lordgamez commented on a change in pull request #1052:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1052#discussion_r618508219



##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -653,29 +688,57 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &, const st
   if (!session->existsFlowFileInRelationship(Success)) {
     yield();
   }
+
+  first_trigger_ = false;
+}
+
+bool TailFile::isOldFileInitiallyRead(TailState &state) const {
+  // This is our initial processing and no stored state was found
+  return first_trigger_ && state.last_read_time_ == std::chrono::system_clock::time_point{};
 }
 
 void TailFile::processFile(const std::shared_ptr<core::ProcessSession> &session,
                            const std::string &full_file_name,
                            TailState &state) {
-  uint64_t fsize = utils::file::FileUtils::file_size(full_file_name);
-  if (fsize < state.position_) {
-    processRotatedFiles(session, state);
-  } else if (fsize == state.position_) {
-    logger_->log_trace("Skipping file %s as its size hasn't change since last read", state.file_name_);
-    return;
+  if (isOldFileInitiallyRead(state)) {
+    if (initial_start_position_ == InitialStartPositions::BEGINNING_OF_TIME) {
+      processAllRotatedFiles(session, state);
+    } else if (initial_start_position_ == InitialStartPositions::CURRENT_TIME) {
+      state.position_ = utils::file::FileUtils::file_size(full_file_name);
+      state.last_read_time_ = std::chrono::system_clock::now();
+      state.checksum_ = utils::file::FileUtils::computeChecksum(full_file_name, state.position_);
+      storeState();
+      return;
+    }
+  } else {
+    uint64_t fsize = utils::file::FileUtils::file_size(full_file_name);
+    if (fsize < state.position_) {
+      processRotatedFilesAfterLastReadTime(session, state);
+    } else if (fsize == state.position_) {
+      logger_->log_trace("Skipping file %s as its size hasn't change since last read", state.file_name_);
+      return;
+    }
   }
 
   processSingleFile(session, full_file_name, state);
 }
 
-void TailFile::processRotatedFiles(const std::shared_ptr<core::ProcessSession> &session, TailState &state) {
-    std::vector<TailState> rotated_file_states = findRotatedFiles(state);
-    for (TailState &file_state : rotated_file_states) {
-      processSingleFile(session, file_state.fileNameWithPath(), file_state);
-    }
-    state.position_ = 0;
-    state.checksum_ = 0;
+void TailFile::processRotatedFilesAfterLastReadTime(const std::shared_ptr<core::ProcessSession> &session, TailState &state) {
+  std::vector<TailState> rotated_file_states = findRotatedFilesAfterLastReadTime(state);
+  processRotatedFiles(session, state, rotated_file_states);
+}
+
+void TailFile::processAllRotatedFiles(const std::shared_ptr<core::ProcessSession> &session, TailState &state) {
+  std::vector<TailState> rotated_file_states = findAllRotatedFiles(state);
+  processRotatedFiles(session, state, rotated_file_states);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr<core::ProcessSession> &session, TailState &state, std::vector<TailState> &rotated_file_states) {
+  for (TailState &file_state : rotated_file_states) {
+    processSingleFile(session, file_state.fileNameWithPath(), file_state);

Review comment:
       This is for the restart scenario: when the agent is restarted, we
need to know that these files have already been processed and are not new
files, because in that case we do not check for rollover patterns.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to