fgerlits commented on a change in pull request #1052:
URL: https://github.com/apache/nifi-minifi-cpp/pull/1052#discussion_r617430402
##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -653,29 +690,56 @@ void TailFile::onTrigger(const
std::shared_ptr<core::ProcessContext> &, const st
if (!session->existsFlowFileInRelationship(Success)) {
yield();
}
+
+ first_trigger_ = false;
+}
+
+bool TailFile::isOldFileInitiallyRead(TailState &state) const {
+ // This is our initial processing and no stored state was found
+ return first_trigger_ && state.checksum_ == 0;
Review comment:
Checking whether `state.last_read_time_` is still zero (i.e. the epoch)
instead of `state.checksum_ == 0` would be safer: there is a tiny chance that
the stored checksum of a previously read file happens to be zero, whereas its
last read time can never be from 1970.
##########
File path: extensions/standard-processors/processors/TailFile.h
##########
@@ -147,13 +161,27 @@ class TailFile : public core::Processor {
std::string rolling_filename_pattern_;
+ std::string initial_start_position_;
Review comment:
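I think it would be nicer to store this as an enum rather than a string, so
that comparisons such as `initial_start_position_ == "Beginning of Time"`
cannot silently break because of a typo. You could use @adamdebreceni's
`SMART_ENUM`, which has a `values()` method that could be used instead of
`INITIAL_START_POSITIONS`.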
##########
File path: extensions/standard-processors/tests/unit/TailFileTests.cpp
##########
@@ -1536,3 +1514,197 @@ TEST_CASE("TailFile interprets the lookup frequency
property correctly", "[multi
REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
}
}
+
+TEST_CASE("TailFile reads from a single file when Initial Start Position is
set", "[initialStartPosition]") {
+ TestController testController;
+ LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile",
"tailfileProc");
+ std::shared_ptr<core::Processor> logattribute =
plan->addProcessor("LogAttribute", "logattribute",
core::Relationship("success", "description"), true);
+
+ auto dir = minifi::utils::createTempDir(&testController);
+ createTempFile(dir, ROLLED_OVER_TMP_FILE, ROLLED_OVER_TAIL_DATA);
+ auto temp_file_path = createTempFile(dir, TMP_FILE, NEWLINE_FILE);
+
+ plan->setProperty(logattribute,
org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(),
"0");
+ plan->setProperty(tailfile,
org::apache::nifi::minifi::processors::TailFile::FileName.getName(),
temp_file_path);
+ plan->setProperty(tailfile,
org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+
+ SECTION("Initial Start Position is set to Beginning of File") {
+ plan->setProperty(tailfile,
org::apache::nifi::minifi::processors::TailFile::InitialStartPosition.getName(),
"Beginning of File");
+
+ testController.runSession(plan);
+
+ REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+ REQUIRE(LogTestController::getInstance().contains("Size:" +
std::to_string(NEWLINE_FILE.find_first_of('\n') + 1) + " Offset:0"));
+
+ plan->reset(true);
+
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+ appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA);
+
+ testController.runSession(plan);
+
+ REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+ REQUIRE(LogTestController::getInstance().contains("Size:" +
std::to_string(NEWLINE_FILE.size() - NEWLINE_FILE.find_first_of('\n') +
NEW_TAIL_DATA.find_first_of('\n')) + " Offset:0"));
+ }
+
+ SECTION("Initial Start Position is set to Beginning of Time") {
+ plan->setProperty(tailfile,
org::apache::nifi::minifi::processors::TailFile::InitialStartPosition.getName(),
"Beginning of Time");
+
+ testController.runSession(plan);
+
+ REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+ REQUIRE(LogTestController::getInstance().contains("Size:" +
std::to_string(NEWLINE_FILE.find_first_of('\n') + 1) + " Offset:0"));
+ REQUIRE(LogTestController::getInstance().contains("Size:" +
std::to_string(ROLLED_OVER_TAIL_DATA.find_first_of('\n') + 1) + " Offset:0"));
+
+ plan->reset(true);
+
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+ appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA);
+
+ testController.runSession(plan);
+
+ REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+ REQUIRE(LogTestController::getInstance().contains("Size:" +
std::to_string(NEWLINE_FILE.size() - NEWLINE_FILE.find_first_of('\n') +
NEW_TAIL_DATA.find_first_of('\n')) + " Offset:0"));
+ }
+
+ SECTION("Initial Start Position is set to Current Time") {
+ plan->setProperty(tailfile,
org::apache::nifi::minifi::processors::TailFile::InitialStartPosition.getName(),
"Current Time");
+
+ testController.runSession(plan);
+
+ REQUIRE(LogTestController::getInstance().contains("Logged 0 flow files"));
+
+ plan->reset(true);
+
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+ appendTempFile(dir, TMP_FILE, NEW_TAIL_DATA);
+
+ testController.runSession(plan);
+
+ REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+ REQUIRE(LogTestController::getInstance().contains("Size:" +
std::to_string(NEW_TAIL_DATA.find_first_of('\n') + 1) + " Offset:0"));
+ }
+
+ LogTestController::getInstance().reset();
+}
+
+TEST_CASE("TailFile reads multiple files when Initial Start Position is set",
"[initialStartPosition]") {
+ TestController testController;
+ LogTestController::getInstance().setTrace<minifi::processors::TailFile>();
+
LogTestController::getInstance().setDebug<minifi::processors::LogAttribute>();
+
+ std::shared_ptr<TestPlan> plan = testController.createPlan();
+ std::shared_ptr<core::Processor> tailfile = plan->addProcessor("TailFile",
"tailfileProc");
+ std::shared_ptr<core::Processor> logattribute =
plan->addProcessor("LogAttribute", "logattribute",
core::Relationship("success", "description"), true);
+
+ auto dir = minifi::utils::createTempDir(&testController);
+ createTempFile(dir, ROLLED_OVER_TMP_FILE, ROLLED_OVER_TAIL_DATA);
+ createTempFile(dir, TMP_FILE, NEWLINE_FILE);
+ const std::string TMP_FILE_2_DATA = "tmp_file_2_new_line_data\n";
+ createTempFile(dir, "minifi-tmpfile-2.txt", TMP_FILE_2_DATA);
+
+ plan->setProperty(logattribute,
org::apache::nifi::minifi::processors::LogAttribute::FlowFilesToLog.getName(),
"0");
+ plan->setProperty(tailfile,
org::apache::nifi::minifi::processors::TailFile::FileName.getName(),
".*\\.txt");
+ plan->setProperty(tailfile,
org::apache::nifi::minifi::processors::TailFile::Delimiter.getName(), "\n");
+ plan->setProperty(tailfile,
org::apache::nifi::minifi::processors::TailFile::TailMode.getName(), "Multiple
file");
+ plan->setProperty(tailfile,
org::apache::nifi::minifi::processors::TailFile::BaseDirectory.getName(), dir);
+
+ SECTION("Initial Start Position is set to Beginning of File") {
+ plan->setProperty(tailfile,
org::apache::nifi::minifi::processors::TailFile::InitialStartPosition.getName(),
"Beginning of File");
+
+ testController.runSession(plan);
+
+ REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
+ REQUIRE(LogTestController::getInstance().contains("Size:" +
std::to_string(NEWLINE_FILE.find_first_of('\n') + 1) + " Offset:0"));
+ REQUIRE(LogTestController::getInstance().contains("Size:" +
std::to_string(TMP_FILE_2_DATA.find_first_of('\n') + 1) + " Offset:0"));
+
+ plan->reset(true);
+
LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output);
+
+ createTempFile(dir, "minifi-tmpfile-3.txt", NEWLINE_FILE);
Review comment:
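The test could be made clearer if the contents (and length) of
`minifi-tmpfile-3.txt` were different from the contents (and length) of
`TMP_FILE`. Since both files are currently created from `NEWLINE_FILE`, their
flow files log identical `Size:`/`Offset:` values, so the assertions cannot
tell which file was actually read.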
##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -653,29 +690,56 @@ void TailFile::onTrigger(const
std::shared_ptr<core::ProcessContext> &, const st
if (!session->existsFlowFileInRelationship(Success)) {
yield();
}
+
+ first_trigger_ = false;
+}
+
+bool TailFile::isOldFileInitiallyRead(TailState &state) const {
+ // This is our initial processing and no stored state was found
+ return first_trigger_ && state.checksum_ == 0;
}
void TailFile::processFile(const std::shared_ptr<core::ProcessSession>
&session,
const std::string &full_file_name,
TailState &state) {
- uint64_t fsize = utils::file::FileUtils::file_size(full_file_name);
- if (fsize < state.position_) {
- processRotatedFiles(session, state);
- } else if (fsize == state.position_) {
- logger_->log_trace("Skipping file %s as its size hasn't change since last
read", state.file_name_);
- return;
+ if (isOldFileInitiallyRead(state)) {
+ if (initial_start_position_ == "Beginning of Time") {
+ processAllRotatedFiles(session, state);
+ } else if (initial_start_position_ == "Current Time") {
+ state.position_ = utils::file::FileUtils::file_size(full_file_name);
+ state.last_read_time_ = std::chrono::system_clock::now();
Review comment:
I think we also need to compute the checksum of the file up to this
point. Otherwise, consider the following sequence:
1. MiNiFi starts with Initial Start Position set to Current Time, so reading
begins in the middle of `test.log`.
2. `onTrigger()` reads a few more lines (and computes the checksum, but only
from the middle).
3. `test.log` gets rolled over to `test.log.1`.
4. `onTrigger()` gets called again.
At step 4 it will compute the checksum of `test.log.1` from the beginning.
That checksum will not match the one in the stored state, so it will re-read
`test.log.1` from the beginning rather than from the point where it left off.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org