fgerlits commented on a change in pull request #791: URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439388761
########## File path: extensions/standard-processors/processors/TailFile.cpp ########## @@ -335,170 +563,210 @@ bool TailFile::storeState(const std::shared_ptr<core::ProcessContext>& context) return true; } -static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem j) { - return (i.modifiedTime < j.modifiedTime); -} -void TailFile::checkRollOver(const std::shared_ptr<core::ProcessContext>& context, TailState &file, const std::string &base_file_name) { - struct stat statbuf; - std::vector<TailMatchedFileItem> matchedFiles; - std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() + file.current_file_name_; - - if (stat(fullPath.c_str(), &statbuf) == 0) { - logger_->log_trace("Searching for files rolled over"); - std::string pattern = file.current_file_name_; - std::size_t found = file.current_file_name_.find_last_of("."); - if (found != std::string::npos) - pattern = file.current_file_name_.substr(0, found); - - // Callback, called for each file entry in the listed directory - // Return value is used to break (false) or continue (true) listing - auto lambda = [&](const std::string& path, const std::string& filename) -> bool { - struct stat sb; - std::string fileFullName = path + utils::file::FileUtils::get_separator() + filename; - if ((fileFullName.find(pattern) != std::string::npos) && stat(fileFullName.c_str(), &sb) == 0) { - uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000); - if (candidateModTime >= file.currentTailFileModificationTime_) { - logging::LOG_TRACE(logger_) << "File " << filename << " (short name " << file.current_file_name_ << - ") disk mod time " << candidateModTime << ", struct mod time " << file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", position " << file.currentTailFilePosition_; - if (filename == file.current_file_name_ && candidateModTime == file.currentTailFileModificationTime_ && - sb.st_size == file.currentTailFilePosition_) { - return true; // Skip the 
current file as a candidate in case it wasn't updated +std::vector<TailState> TailFile::findRotatedFiles(const TailState &state) const { + logger_->log_debug("Searching for files rolled over; last read time is %llu", + std::chrono::time_point_cast<std::chrono::seconds>(state.last_read_time_).time_since_epoch().count()); + + std::size_t last_dot_position = state.file_name_.find_last_of('.'); + std::string base_name = state.file_name_.substr(0, last_dot_position); + std::string pattern = utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", base_name); + + std::vector<TailStateWithMtime> matched_files_with_mtime; + auto collect_matching_files = [&](const std::string &path, const std::string &file_name) -> bool { + if (file_name != state.file_name_ && utils::Regex::matchesFullInput(pattern, file_name)) { + std::string full_file_name = path + utils::file::FileUtils::get_separator() + file_name; + TailStateWithMtime::TimePoint mtime{utils::file::FileUtils::last_write_time_point(full_file_name)}; + logger_->log_debug("File %s with mtime %llu matches rolling filename pattern %s", file_name, mtime.time_since_epoch().count(), pattern); + if (mtime >= std::chrono::time_point_cast<std::chrono::seconds>(state.last_read_time_)) { + logger_->log_debug("File %s has mtime >= last read time, so we are going to read it", file_name); + matched_files_with_mtime.emplace_back(TailState{path, file_name}, mtime); + } + } + return true; + }; + + utils::file::FileUtils::list_dir(state.path_, collect_matching_files, logger_, false); + + std::sort(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), [](const TailStateWithMtime &left, const TailStateWithMtime &right) { + return std::tie(left.mtime_, left.tail_state_.file_name_) < + std::tie(right.mtime_, right.tail_state_.file_name_); + }); + + if (!matched_files_with_mtime.empty() && state.position_ > 0) { + TailState &first_rotated_file = matched_files_with_mtime[0].tail_state_; + std::string full_file_name = 
first_rotated_file.fileNameWithPath(); + if (utils::file::FileUtils::file_size(full_file_name) >= state.position_) { + uint64_t checksum = utils::file::FileUtils::computeChecksum(full_file_name, state.position_); + if (checksum == state.checksum_) { + first_rotated_file.position_ = state.position_; + first_rotated_file.checksum_ = state.checksum_; } - TailMatchedFileItem item; - item.fileName = filename; - item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000); - matchedFiles.push_back(item); } } - return true;}; - - utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false); - if (matchedFiles.size() < 1) { - logger_->log_debug("No newer files found in directory!"); - return; - } + std::vector<TailState> matched_files; + std::transform(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), std::back_inserter(matched_files), + [](TailStateWithMtime &tail_state_with_mtime) { return std::move(tail_state_with_mtime.tail_state_); }); + return matched_files; +} - // Sort the list based on modified time - std::sort(matchedFiles.begin(), matchedFiles.end(), sortTailMatchedFileItem); - TailMatchedFileItem item = matchedFiles[0]; - logger_->log_info("TailFile File Roll Over from %s to %s", file.current_file_name_, item.fileName); +void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { + std::lock_guard<std::mutex> tail_lock(tail_file_mutex_); - // Going ahead in the file rolled over - if (file.current_file_name_ != base_file_name) { - logger_->log_debug("Resetting posotion since %s != %s", base_file_name, file.current_file_name_); - file.currentTailFilePosition_ = 0; + if (tail_mode_ == Mode::MULTIPLE) { + if (last_multifile_lookup_ + lookup_frequency_ < std::chrono::steady_clock::now()) { + logger_->log_debug("Lookup frequency %d ms have elapsed, doing new multifile lookup", lookup_frequency_.count()); Review comment: The underlying types of `seconds` and `milliseconds` are 
"signed integer type of at least 35 bits" and "signed integer type of at least 45 bits", respectively. Their `count()` values are therefore not guaranteed to be `int` (or `unsigned long long`), so passing them to a `%d` (or `%llu`) format specifier is not portable. I changed this (and similar log calls) to pass `int64_t{some_duration.count()}` with the `PRId64` format macro. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org