fgerlits commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r439388761



##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -335,170 +563,210 @@ bool TailFile::storeState(const 
std::shared_ptr<core::ProcessContext>& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr<core::ProcessContext>& 
context, TailState &file, const std::string &base_file_name) {
-  struct stat statbuf;
-  std::vector<TailMatchedFileItem> matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), &statbuf) == 0) {
-    logger_->log_trace("Searching for files rolled over");
-    std::string pattern = file.current_file_name_;
-    std::size_t found = file.current_file_name_.find_last_of(".");
-    if (found != std::string::npos)
-      pattern = file.current_file_name_.substr(0, found);
-
-    // Callback, called for each file entry in the listed directory
-    // Return value is used to break (false) or continue (true) listing
-    auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-      struct stat sb;
-      std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-      if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), &sb) == 0) {
-        uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-        if (candidateModTime >= file.currentTailFileModificationTime_) {
-          logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-          ") disk mod time " << candidateModTime << ", struct mod time " << 
file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", 
position " << file.currentTailFilePosition_;
-          if (filename == file.current_file_name_ && candidateModTime == 
file.currentTailFileModificationTime_ &&
-              sb.st_size == file.currentTailFilePosition_) {
-            return true;  // Skip the current file as a candidate in case it 
wasn't updated
+std::vector<TailState> TailFile::findRotatedFiles(const TailState &state) 
const {
+  logger_->log_debug("Searching for files rolled over; last read time is %llu",
+      
std::chrono::time_point_cast<std::chrono::seconds>(state.last_read_time_).time_since_epoch().count());
+
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  std::string pattern = 
utils::StringUtils::replaceOne(rolling_filename_pattern_, "${filename}", 
base_name);
+
+  std::vector<TailStateWithMtime> matched_files_with_mtime;
+  auto collect_matching_files = [&](const std::string &path, const std::string 
&file_name) -> bool {
+    if (file_name != state.file_name_ && 
utils::Regex::matchesFullInput(pattern, file_name)) {
+      std::string full_file_name = path + 
utils::file::FileUtils::get_separator() + file_name;
+      TailStateWithMtime::TimePoint 
mtime{utils::file::FileUtils::last_write_time_point(full_file_name)};
+      logger_->log_debug("File %s with mtime %llu matches rolling filename 
pattern %s", file_name, mtime.time_since_epoch().count(), pattern);
+      if (mtime >= 
std::chrono::time_point_cast<std::chrono::seconds>(state.last_read_time_)) {
+        logger_->log_debug("File %s has mtime >= last read time, so we are 
going to read it", file_name);
+        matched_files_with_mtime.emplace_back(TailState{path, file_name}, 
mtime);
+      }
+    }
+    return true;
+  };
+
+  utils::file::FileUtils::list_dir(state.path_, collect_matching_files, 
logger_, false);
+
+  std::sort(matched_files_with_mtime.begin(), matched_files_with_mtime.end(), 
[](const TailStateWithMtime &left, const TailStateWithMtime &right) {
+    return std::tie(left.mtime_, left.tail_state_.file_name_) <
+           std::tie(right.mtime_, right.tail_state_.file_name_);
+  });
+
+  if (!matched_files_with_mtime.empty() && state.position_ > 0) {
+    TailState &first_rotated_file = matched_files_with_mtime[0].tail_state_;
+    std::string full_file_name = first_rotated_file.fileNameWithPath();
+    if (utils::file::FileUtils::file_size(full_file_name) >= state.position_) {
+      uint64_t checksum = 
utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+      if (checksum == state.checksum_) {
+        first_rotated_file.position_ = state.position_;
+        first_rotated_file.checksum_ = state.checksum_;
       }
-      TailMatchedFileItem item;
-      item.fileName = filename;
-      item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-      matchedFiles.push_back(item);
     }
   }
-  return true;};
-
-    utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
 
-    if (matchedFiles.size() < 1) {
-      logger_->log_debug("No newer files found in directory!");
-      return;
-    }
+  std::vector<TailState> matched_files;
+  std::transform(matched_files_with_mtime.begin(), 
matched_files_with_mtime.end(), std::back_inserter(matched_files),
+                 [](TailStateWithMtime &tail_state_with_mtime) { return 
std::move(tail_state_with_mtime.tail_state_); });
+  return matched_files;
+}
 
-    // Sort the list based on modified time
-    std::sort(matchedFiles.begin(), matchedFiles.end(), 
sortTailMatchedFileItem);
-    TailMatchedFileItem item = matchedFiles[0];
-    logger_->log_info("TailFile File Roll Over from %s to %s", 
file.current_file_name_, item.fileName);
+void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, 
const std::shared_ptr<core::ProcessSession> &session) {
+  std::lock_guard<std::mutex> tail_lock(tail_file_mutex_);
 
-    // Going ahead in the file rolled over
-    if (file.current_file_name_ != base_file_name) {
-      logger_->log_debug("Resetting posotion since %s != %s", base_file_name, 
file.current_file_name_);
-      file.currentTailFilePosition_ = 0;
+  if (tail_mode_ == Mode::MULTIPLE) {
+    if (last_multifile_lookup_ + lookup_frequency_ < 
std::chrono::steady_clock::now()) {
+      logger_->log_debug("Lookup frequency %d ms have elapsed, doing new 
multifile lookup", lookup_frequency_.count());

Review comment:
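       The underlying types of `std::chrono::seconds` and 
`std::chrono::milliseconds` are "signed integer type of at least 35 bits" and 
"signed integer type of at least 45 bits", respectively, so `%d` (which expects 
an `int`) is not guaranteed to match the type returned by `count()`. I therefore 
changed this (and similar logs) to `int64_t{some_duration.count()}` and `PRId64`.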




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to