This is an automated email from the ASF dual-hosted git repository.

aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 41fc5ae  MINIFICPP-1313 Do a multifile lookup in onSchedule
41fc5ae is described below

commit 41fc5aec2becee1a799b7c69cd8717423e7e6286
Author: Ferenc Gerlits <[email protected]>
AuthorDate: Mon Aug 3 18:19:38 2020 +0200

    MINIFICPP-1313 Do a multifile lookup in onSchedule
    
    Signed-off-by: Arpad Boda <[email protected]>
    
    This closes #859
---
 .../standard-processors/processors/TailFile.cpp      | 18 ++++++++++++------
 extensions/standard-processors/processors/TailFile.h |  2 ++
 .../standard-processors/tests/unit/TailFileTests.cpp | 20 +++++++++++++++-----
 3 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 426667e..c5d2a1a 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -366,7 +366,9 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
 
     context->getProperty(LookupFrequency.getName(), lookup_frequency_);
 
-    // in multiple mode, we check for new/removed files in every onTrigger
+    recoverState(context);
+
+    doMultifileLookup();
 
   } else {
     tail_mode_ = Mode::SINGLE;
@@ -378,13 +380,13 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
     } else {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to tail must be a fully qualified file");
     }
+
+    recoverState(context);
   }
 
   std::string rolling_filename_pattern_glob;
   context->getProperty(RollingFilenamePattern.getName(), rolling_filename_pattern_glob);
   rolling_filename_pattern_ = utils::file::PathUtils::globToRegex(rolling_filename_pattern_glob);
-
-  recoverState(context);
 }
 
 void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState> &state) const {
@@ -633,9 +635,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &, const st
   if (tail_mode_ == Mode::MULTIPLE) {
     if (last_multifile_lookup_ + lookup_frequency_ < std::chrono::steady_clock::now()) {
       logger_->log_debug("Lookup frequency %" PRId64 " ms have elapsed, doing new multifile lookup", int64_t{lookup_frequency_.count()});
-      checkForRemovedFiles();
-      checkForNewFiles();
-      last_multifile_lookup_ = std::chrono::steady_clock::now();
+      doMultifileLookup();
     } else {
       logger_->log_trace("Skipping multifile lookup");
     }
@@ -745,6 +745,12 @@ void TailFile::updateStateAttributes(TailState &state, uint64_t size, uint64_t c
   state.checksum_ = checksum;
 }
 
+void TailFile::doMultifileLookup() {
+  checkForRemovedFiles();
+  checkForNewFiles();
+  last_multifile_lookup_ = std::chrono::steady_clock::now();
+}
+
 void TailFile::checkForRemovedFiles() {
   std::vector<std::string> file_names_to_remove;
 
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 080505c..c673601 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -168,6 +168,8 @@ class TailFile : public core::Processor {
   bool getStateFromLegacyStateFile(const std::shared_ptr<core::ProcessContext>& context,
                                    std::map<std::string, TailState> &new_tail_states) const;
 
+  void doMultifileLookup();
+
   void checkForRemovedFiles();
 
   void checkForNewFiles();
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 4bd803b..7d0e2b7 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -1499,9 +1499,9 @@ TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multi
   auto log_attribute = plan->addProcessor("LogAttribute", "Log", core::Relationship("success", "description"), true);
   plan->setProperty(log_attribute, processors::LogAttribute::FlowFilesToLog.getName(), "0");
 
-  testController.runSession(plan, true);
-
   SECTION("Lookup frequency not set => defaults to 10 minutes") {
+    testController.runSession(plan, true);
+
     std::shared_ptr<processors::TailFile> tail_file_processor = std::dynamic_pointer_cast<processors::TailFile>(tail_file);
     REQUIRE(tail_file_processor);
     REQUIRE(tail_file_processor->getLookupFrequency() == std::chrono::minutes{10});
@@ -1509,8 +1509,10 @@ TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multi
 
   SECTION("Lookup frequency set to zero => new files are picked up immediately") {
     plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "0 sec");
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
 
-    plan->reset(true);
+    plan->reset();
     LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
     createTempFile(directory, "test.blue.log", "sky\n");
@@ -1523,8 +1525,10 @@ TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multi
 
   SECTION("Lookup frequency set to 100 ms => new files are only picked up after 100 ms") {
     plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "100 ms");
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
 
-    plan->reset(true);
+    plan->reset();
     LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
     createTempFile(directory, "test.blue.log", "sky\n");
@@ -1533,11 +1537,17 @@ TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multi
     testController.runSession(plan, true);
     REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));
 
-    plan->reset(false);
+    plan->reset();
     LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
 
     std::this_thread::sleep_for(std::chrono::milliseconds(110));
     testController.runSession(plan, true);
     REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
   }
+
+  SECTION("Lookup frequency set to a thousand years => files already present when started are still picked up") {
+    plan->setProperty(tail_file, processors::TailFile::LookupFrequency.getName(), "365000 days");
+    testController.runSession(plan, true);
+    REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+  }
 }

Reply via email to