arpadboda commented on a change in pull request #529: MINIFICPP-792 - TailFile 
processor should handle rotation of source file
URL: https://github.com/apache/nifi-minifi-cpp/pull/529#discussion_r288627223
 
 

 ##########
 File path: extensions/standard-processors/tests/unit/TailFileTests.cpp
 ##########
 @@ -113,9 +114,175 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") 
{
   LogTestController::getInstance().reset();
 
   // Delete the test and state file.
-    std::remove(TMP_FILE);
-    std::remove(STATE_FILE);
+  remove(TMP_FILE);
+  remove(STATE_FILE);
+}
+
+TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
+  TestController testController;
+
+  const char DELIM = ',';
+  size_t expected_pieces = std::count(NEWLINE_FILE.begin(), 
NEWLINE_FILE.end(), DELIM); // The last piece is left as considered unfinished
+
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<core::ProcessSession>();
+
+  auto plan = testController.createPlan();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  // Define test input file
+  std::string in_file(dir);
+  in_file.append("/testfifo.txt");
+
+  std::string state_file(dir);
+  state_file.append("tailfile.state");
+
+  std::ofstream in_file_stream(in_file);
+  in_file_stream << NEWLINE_FILE;
+  in_file_stream.flush();
+
+  // Build MiNiFi processing graph
+  auto tail_file = plan->addProcessor(
+      "TailFile",
+      "Tail");
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::FileName.getName(), in_file);
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::StateFile.getName(), state_file);
+  auto log_attr = plan->addProcessor(
+      "LogAttribute",
+      "Log",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      log_attr,
+      processors::LogAttribute::FlowFilesToLog.getName(), "0");
+  // Log as many FFs as it can to make sure exactly the expected amount is 
produced
+
+
+  plan->runNextProcessor();  // Tail
+  plan->runNextProcessor();  // Log
+
+  REQUIRE(LogTestController::getInstance().contains(std::string("Logged ") + 
std::to_string(expected_pieces) + " flow files"));
+
+  in_file_stream << DELIM;
+  in_file_stream.close();
+
+
+  std::string rotated_file = (in_file + ".1");
+
+  REQUIRE(rename(in_file.c_str(), rotated_file.c_str() ) == 0);
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000)); // make sure 
the new file gets newer modification time
+
+  std::ofstream new_in_file_stream(in_file);
+  new_in_file_stream << "five" << DELIM << "six" << DELIM;
+  new_in_file_stream.close();
+
+  plan->reset();
+  plan->runNextProcessor();  // Tail
+  plan->runNextProcessor();  // Log
+
+  //Find the last flow file in the rotated file
+  REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+
+  plan->reset();
+  plan->runNextProcessor();  // Tail
+  plan->runNextProcessor();  // Log
+
+  //Two new flow files in the new file
+  REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
 }
+
+TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
+  TestController testController;
+
+  const char DELIM = ':';
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<core::ProcessSession>();
+
+  auto plan = testController.createPlan();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::string state_file(dir);
+  state_file.append("tailfile.state");
+
+  // Define test input file
+  std::string in_file(dir);
+  in_file.append("/fruits.txt");
+
+  for (int i = 2; 0 <= i; --i) {
+    if (i < 2) {
+      std::this_thread::sleep_for(
+          std::chrono::milliseconds(1000)); // make sure the new file gets 
newer modification time
+    }
+    std::ofstream in_file_stream(in_file + (i > 0 ? std::to_string(i) : ""));
+    for (int j = 0; j <= i; j++) {
+      in_file_stream << "Apple" << DELIM;
+    }
+    in_file_stream.close();
+  }
+
+  // Build MiNiFi processing graph
+  auto tail_file = plan->addProcessor(
+      "TailFile",
+      "Tail");
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::FileName.getName(), in_file);
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::StateFile.getName(), state_file);
+  auto log_attr = plan->addProcessor(
+      "LogAttribute",
+      "Log",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      log_attr,
+      processors::LogAttribute::FlowFilesToLog.getName(), "0");
 
 Review comment:
   @phrocker you deserve some good IPAs for this, it's so cool to test with! 🍺 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to