This is an automated email from the ASF dual-hosted git repository.
aboda pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 41fc5ae MINIFICPP-1313 Do a multifile lookup in onSchedule
41fc5ae is described below
commit 41fc5aec2becee1a799b7c69cd8717423e7e6286
Author: Ferenc Gerlits <[email protected]>
AuthorDate: Mon Aug 3 18:19:38 2020 +0200
MINIFICPP-1313 Do a multifile lookup in onSchedule
Signed-off-by: Arpad Boda <[email protected]>
This closes #859
---
.../standard-processors/processors/TailFile.cpp | 18 ++++++++++++------
extensions/standard-processors/processors/TailFile.h | 2 ++
.../standard-processors/tests/unit/TailFileTests.cpp | 20 +++++++++++++++-----
3 files changed, 29 insertions(+), 11 deletions(-)
diff --git a/extensions/standard-processors/processors/TailFile.cpp b/extensions/standard-processors/processors/TailFile.cpp
index 426667e..c5d2a1a 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -366,7 +366,9 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
context->getProperty(LookupFrequency.getName(), lookup_frequency_);
- // in multiple mode, we check for new/removed files in every onTrigger
+ recoverState(context);
+
+ doMultifileLookup();
} else {
tail_mode_ = Mode::SINGLE;
@@ -378,13 +380,13 @@ void TailFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context,
} else {
throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "File to
tail must be a fully qualified file");
}
+
+ recoverState(context);
}
std::string rolling_filename_pattern_glob;
context->getProperty(RollingFilenamePattern.getName(),
rolling_filename_pattern_glob);
rolling_filename_pattern_ =
utils::file::PathUtils::globToRegex(rolling_filename_pattern_glob);
-
- recoverState(context);
}
void TailFile::parseStateFileLine(char *buf, std::map<std::string, TailState>
&state) const {
@@ -633,9 +635,7 @@ void TailFile::onTrigger(const std::shared_ptr<core::ProcessContext> &, const st
if (tail_mode_ == Mode::MULTIPLE) {
if (last_multifile_lookup_ + lookup_frequency_ <
std::chrono::steady_clock::now()) {
logger_->log_debug("Lookup frequency %" PRId64 " ms have elapsed, doing
new multifile lookup", int64_t{lookup_frequency_.count()});
- checkForRemovedFiles();
- checkForNewFiles();
- last_multifile_lookup_ = std::chrono::steady_clock::now();
+ doMultifileLookup();
} else {
logger_->log_trace("Skipping multifile lookup");
}
@@ -745,6 +745,12 @@ void TailFile::updateStateAttributes(TailState &state, uint64_t size, uint64_t c
state.checksum_ = checksum;
}
+void TailFile::doMultifileLookup() {
+ checkForRemovedFiles();
+ checkForNewFiles();
+ last_multifile_lookup_ = std::chrono::steady_clock::now();
+}
+
void TailFile::checkForRemovedFiles() {
std::vector<std::string> file_names_to_remove;
diff --git a/extensions/standard-processors/processors/TailFile.h b/extensions/standard-processors/processors/TailFile.h
index 080505c..c673601 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -168,6 +168,8 @@ class TailFile : public core::Processor {
bool getStateFromLegacyStateFile(const
std::shared_ptr<core::ProcessContext>& context,
std::map<std::string, TailState>
&new_tail_states) const;
+ void doMultifileLookup();
+
void checkForRemovedFiles();
void checkForNewFiles();
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index 4bd803b..7d0e2b7 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -1499,9 +1499,9 @@ TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multi
auto log_attribute = plan->addProcessor("LogAttribute", "Log",
core::Relationship("success", "description"), true);
plan->setProperty(log_attribute,
processors::LogAttribute::FlowFilesToLog.getName(), "0");
- testController.runSession(plan, true);
-
SECTION("Lookup frequency not set => defaults to 10 minutes") {
+ testController.runSession(plan, true);
+
std::shared_ptr<processors::TailFile> tail_file_processor =
std::dynamic_pointer_cast<processors::TailFile>(tail_file);
REQUIRE(tail_file_processor);
REQUIRE(tail_file_processor->getLookupFrequency() ==
std::chrono::minutes{10});
@@ -1509,8 +1509,10 @@ TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multi
SECTION("Lookup frequency set to zero => new files are picked up
immediately") {
plan->setProperty(tail_file,
processors::TailFile::LookupFrequency.getName(), "0 sec");
+ testController.runSession(plan, true);
+ REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
- plan->reset(true);
+ plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createTempFile(directory, "test.blue.log", "sky\n");
@@ -1523,8 +1525,10 @@ TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multi
SECTION("Lookup frequency set to 100 ms => new files are only picked up
after 100 ms") {
plan->setProperty(tail_file,
processors::TailFile::LookupFrequency.getName(), "100 ms");
+ testController.runSession(plan, true);
+ REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
- plan->reset(true);
+ plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
createTempFile(directory, "test.blue.log", "sky\n");
@@ -1533,11 +1537,17 @@ TEST_CASE("TailFile interprets the lookup frequency property correctly", "[multi
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));
- plan->reset(false);
+ plan->reset();
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
std::this_thread::sleep_for(std::chrono::milliseconds(110));
testController.runSession(plan, true);
REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
}
+
+ SECTION("Lookup frequency set to a thousand years => files already present
when started are still picked up") {
+ plan->setProperty(tail_file,
processors::TailFile::LookupFrequency.getName(), "365000 days");
+ testController.runSession(plan, true);
+ REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+ }
}