This is an automated email from the ASF dual-hosted git repository.

phrocker pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 7cb81f17982f3b408be5ba34c0bf07113b68d915
Author: Arpad Boda <[email protected]>
AuthorDate: Thu Mar 28 12:13:52 2019 +0100

    MINIFICPP-792 - TailFile processor should handle rotation of source file
---
 .../standard-processors/processors/TailFile.cpp    | 134 ++++++---------
 .../standard-processors/processors/TailFile.h      |   2 +
 .../tests/unit/TailFileTests.cpp                   | 179 ++++++++++++++++++++-
 3 files changed, 227 insertions(+), 88 deletions(-)

diff --git a/extensions/standard-processors/processors/TailFile.cpp 
b/extensions/standard-processors/processors/TailFile.cpp
index 71314b5..8302aeb 100644
--- a/extensions/standard-processors/processors/TailFile.cpp
+++ b/extensions/standard-processors/processors/TailFile.cpp
@@ -36,6 +36,7 @@
 #include <sstream>
 #include <string>
 #include <iostream>
+#include "utils/file/FileUtils.h"
 #include "utils/TimeUtil.h"
 #include "utils/StringUtils.h"
 #include "TailFile.h"
@@ -164,90 +165,57 @@ void TailFile::storeState() {
 static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
   return (i.modifiedTime < j.modifiedTime);
 }
-void TailFile::checkRollOver(const std::string &fileLocation, const 
std::string &fileName) {
+void TailFile::checkRollOver(const std::string &fileLocation, const 
std::string &baseFileName) {
   struct stat statbuf;
   std::vector<TailMatchedFileItem> matchedFiles;
   std::string fullPath = fileLocation + "/" + _currentTailFileName;
 
   if (stat(fullPath.c_str(), &statbuf) == 0) {
-    if (statbuf.st_size > this->_currentTailFilePosition)
-      // there are new input for the current tail file
-      return;
-
-    uint64_t modifiedTimeCurrentTailFile = ((uint64_t) (statbuf.st_mtime) * 
1000);
-    std::string pattern = fileName;
-    std::size_t found = fileName.find_last_of(".");
+    logger_->log_trace("Searching for files rolled over");
+    std::string pattern = baseFileName;
+    std::size_t found = baseFileName.find_last_of(".");
     if (found != std::string::npos)
-      pattern = fileName.substr(0, found);
-#ifndef WIN32
-    DIR *d;
-    d = opendir(fileLocation.c_str());
-    if (!d)
-      return;
-    while (1) {
-      struct dirent *entry;
-      entry = readdir(d);
-      if (!entry)
-        break;
-      std::string d_name = entry->d_name;
-      if (!(entry->d_type & DT_DIR)) {
-        std::string fileName = d_name;
-        std::string fileFullName = fileLocation + "/" + d_name;
-        if (fileFullName.find(pattern) != std::string::npos && 
stat(fileFullName.c_str(), &statbuf) == 0) {
-          if (((uint64_t) (statbuf.st_mtime) * 1000) >= 
modifiedTimeCurrentTailFile) {
-            TailMatchedFileItem item;
-            item.fileName = fileName;
-            item.modifiedTime = ((uint64_t) (statbuf.st_mtime) * 1000);
-            matchedFiles.push_back(item);
+      pattern = baseFileName.substr(0, found);
+
+    // Callback, called for each file entry in the listed directory
+    // Return value is used to break (false) or continue (true) listing
+    auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
+      struct stat sb;
+      std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
+      if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), &sb) == 0) {
+        uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
+        if (candidateModTime >= _currentTailFileModificationTime) {
+          if (filename == _currentTailFileName && candidateModTime == 
_currentTailFileModificationTime &&
+          sb.st_size == _currentTailFilePosition) {
+            return true;  // Skip the current file as a candidate in case it 
wasn't updated
           }
+          TailMatchedFileItem item;
+          item.fileName = filename;
+          item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
+          matchedFiles.push_back(item);
         }
       }
-    }
-    closedir(d);
-#else
-
-    HANDLE hFind;
-    WIN32_FIND_DATA FindFileData;
-
-    if ((hFind = FindFirstFile(fileLocation.c_str(), &FindFileData)) != 
INVALID_HANDLE_VALUE) {
-      do {
-        struct stat statbuf {};
-        if (stat(FindFileData.cFileName, &statbuf) != 0) {
-          logger_->log_warn("Failed to stat %s", FindFileData.cFileName);
-          break;
-        }
+      return true;
+    };
 
-        std::string fileFullName = fileLocation + "/" + FindFileData.cFileName;
+    utils::file::FileUtils::list_dir(fileLocation, lambda, logger_, false);
 
-        if (fileFullName.find(pattern) != std::string::npos && 
stat(fileFullName.c_str(), &statbuf) == 0) {
-          if (((uint64_t)(statbuf.st_mtime) * 1000) >= 
modifiedTimeCurrentTailFile) {
-            TailMatchedFileItem item;
-            item.fileName = fileName;
-            item.modifiedTime = ((uint64_t)(statbuf.st_mtime) * 1000);
-            matchedFiles.push_back(item);
-          }
-        }
-      }while (FindNextFile(hFind, &FindFileData));
-      FindClose(hFind);
+    if (matchedFiles.size() < 1) {
+      logger_->log_debug("No newer files found in directory!");
+      return;
     }
-#endif
 
     // Sort the list based on modified time
     std::sort(matchedFiles.begin(), matchedFiles.end(), 
sortTailMatchedFileItem);
-    for (std::vector<TailMatchedFileItem>::iterator it = matchedFiles.begin(); 
it != matchedFiles.end(); ++it) {
-      TailMatchedFileItem item = *it;
-      if (item.fileName == _currentTailFileName) {
-        ++it;
-        if (it != matchedFiles.end()) {
-          TailMatchedFileItem nextItem = *it;
-          logger_->log_info("TailFile File Roll Over from %s to %s", 
_currentTailFileName, nextItem.fileName);
-          _currentTailFileName = nextItem.fileName;
-          _currentTailFilePosition = 0;
-          storeState();
-        }
-        break;
-      }
+    TailMatchedFileItem item = matchedFiles[0];
+    logger_->log_info("TailFile File Roll Over from %s to %s", 
_currentTailFileName, item.fileName);
+
+    // Keep the read position when moving on from the base file; any other switch restarts at offset 0
+    if (_currentTailFileName != baseFileName) {
+      _currentTailFilePosition = 0;
     }
+    _currentTailFileName = item.fileName;
+    storeState();
   } else {
     return;
   }
@@ -259,7 +227,7 @@ void TailFile::onTrigger(core::ProcessContext *context, 
core::ProcessSession *se
   std::string fileLocation = "";
   std::string fileName = "";
   if (context->getProperty(FileName.getName(), value)) {
-    std::size_t found = value.find_last_of("/\\");
+    std::size_t found = 
value.find_last_of(utils::file::FileUtils::get_separator());
     fileLocation = value.substr(0, found);
     fileName = value.substr(found + 1);
   }
@@ -280,7 +248,8 @@ void TailFile::onTrigger(core::ProcessContext *context, 
core::ProcessSession *se
   logger_->log_debug("Tailing file %s", fullPath);
   if (stat(fullPath.c_str(), &statbuf) == 0) {
     if ((uint64_t) statbuf.st_size <= this->_currentTailFilePosition) {
-      // there are no new input for the current tail file
+      logger_->log_trace("Current pos: %llu", this->_currentTailFilePosition);
+      logger_->log_trace("%s", "there are no new input for the current tail 
file");
       context->yield();
       return;
     }
@@ -329,19 +298,20 @@ void TailFile::onTrigger(core::ProcessContext *context, 
core::ProcessSession *se
 
     } else {
       std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());
-      if (!flowFile)
-        return;
-      flowFile->updateKeyedAttribute(PATH, fileLocation);
-      flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-      session->import(fullPath, flowFile, true, 
this->_currentTailFilePosition);
-      session->transfer(flowFile, Success);
-      logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, 
flowFile->getSize());
-      std::string logName = baseName + "." + 
std::to_string(_currentTailFilePosition) + "-" + 
std::to_string(_currentTailFilePosition + flowFile->getSize()) + "." + 
extension;
-      flowFile->updateKeyedAttribute(FILENAME, logName);
-      this->_currentTailFilePosition += flowFile->getSize();
-      storeState();
+      if (flowFile) {
+        flowFile->updateKeyedAttribute(PATH, fileLocation);
+        flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
+        session->import(fullPath, flowFile, true, 
this->_currentTailFilePosition);
+        session->transfer(flowFile, Success);
+        logger_->log_info("TailFile %s for %llu bytes", _currentTailFileName, 
flowFile->getSize());
+        std::string logName = baseName + "." + 
std::to_string(_currentTailFilePosition) + "-" +
+                              std::to_string(_currentTailFilePosition + 
flowFile->getSize()) + "." + extension;
+        flowFile->updateKeyedAttribute(FILENAME, logName);
+        this->_currentTailFilePosition += flowFile->getSize();
+        storeState();
+      }
     }
-
+    _currentTailFileModificationTime = ((uint64_t) (statbuf.st_mtime) * 1000);
   } else {
     logger_->log_warn("Unable to stat file %s", fullPath);
   }
diff --git a/extensions/standard-processors/processors/TailFile.h 
b/extensions/standard-processors/processors/TailFile.h
index 3714023..54296a2 100644
--- a/extensions/standard-processors/processors/TailFile.h
+++ b/extensions/standard-processors/processors/TailFile.h
@@ -43,6 +43,7 @@ class TailFile : public core::Processor {
   explicit TailFile(std::string name, utils::Identifier uuid = 
utils::Identifier())
       : core::Processor(name, uuid),
         _currentTailFilePosition(0),
+        _currentTailFileModificationTime(0),
         logger_(logging::LoggerFactory<TailFile>::getLogger()) {
     _stateRecovered = false;
   }
@@ -89,6 +90,7 @@ class TailFile : public core::Processor {
   // determine if state is recovered;
   bool _stateRecovered;
   uint64_t _currentTailFilePosition;
+  uint64_t _currentTailFileModificationTime;
   static const int BUFFER_SIZE = 512;
 
   // Utils functions for parse state file
diff --git a/extensions/standard-processors/tests/unit/TailFileTests.cpp 
b/extensions/standard-processors/tests/unit/TailFileTests.cpp
index c5cf392..b7c96ec 100644
--- a/extensions/standard-processors/tests/unit/TailFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/TailFileTests.cpp
@@ -16,6 +16,7 @@
  * limitations under the License.
  */
 
+#include <stdio.h>
 #include <uuid/uuid.h>
 #include <fstream>
 #include <map>
@@ -37,13 +38,13 @@
 #include "LogAttribute.h"
 
 
-static const char *NEWLINE_FILE = ""
+static std::string NEWLINE_FILE = "" // NOLINT
     "one,two,three\n"
     "four,five,six, seven";
 static const char *TMP_FILE = "/tmp/minifi-tmpfile.txt";
 static const char *STATE_FILE = "/tmp/minifi-state-file.txt";
 
-TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
+TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
   // Create and write to the test file
       std::ofstream tmpfile;
       tmpfile.open(TMP_FILE);
@@ -76,8 +77,8 @@ TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
   LogTestController::getInstance().reset();
 
   // Delete the test and state file.
-    std::remove(TMP_FILE);
-    std::remove(STATE_FILE);
+  remove(TMP_FILE);
+  remove(STATE_FILE);
 }
 
 TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
@@ -113,9 +114,175 @@ TEST_CASE("TailFileWithOutDelimiter", "[tailfiletest2]") {
   LogTestController::getInstance().reset();
 
   // Delete the test and state file.
-    std::remove(TMP_FILE);
-    std::remove(STATE_FILE);
+  remove(TMP_FILE);
+  remove(STATE_FILE);
+}
+
+TEST_CASE("TailFileWithRealDelimiterAndRotate", "[tailfiletest2]") {
+  TestController testController;
+
+  const char DELIM = ',';
+  size_t expected_pieces = std::count(NEWLINE_FILE.begin(), 
NEWLINE_FILE.end(), DELIM);  // The last piece is left as considered unfinished
+
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<core::ProcessSession>();
+
+  auto plan = testController.createPlan();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  // Define test input file
+  std::string in_file(dir);
+  in_file.append("/testfifo.txt");
+
+  std::string state_file(dir);
+  state_file.append("tailfile.state");
+
+  std::ofstream in_file_stream(in_file);
+  in_file_stream << NEWLINE_FILE;
+  in_file_stream.flush();
+
+  // Build MiNiFi processing graph
+  auto tail_file = plan->addProcessor(
+      "TailFile",
+      "Tail");
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::FileName.getName(), in_file);
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::StateFile.getName(), state_file);
+  auto log_attr = plan->addProcessor(
+      "LogAttribute",
+      "Log",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      log_attr,
+      processors::LogAttribute::FlowFilesToLog.getName(), "0");
+  // Log as many FFs as it can to make sure exactly the expected amount is 
produced
+
+
+  plan->runNextProcessor();  // Tail
+  plan->runNextProcessor();  // Log
+
+  REQUIRE(LogTestController::getInstance().contains(std::string("Logged ") + 
std::to_string(expected_pieces) + " flow files"));
+
+  in_file_stream << DELIM;
+  in_file_stream.close();
+
+
+  std::string rotated_file = (in_file + ".1");
+
+  REQUIRE(rename(in_file.c_str(), rotated_file.c_str() ) == 0);
+
+  std::this_thread::sleep_for(std::chrono::milliseconds(1000));  // make sure 
the new file gets newer modification time
+
+  std::ofstream new_in_file_stream(in_file);
+  new_in_file_stream << "five" << DELIM << "six" << DELIM;
+  new_in_file_stream.close();
+
+  plan->reset();
+  plan->runNextProcessor();  // Tail
+  plan->runNextProcessor();  // Log
+
+  // Find the last flow file in the rotated file
+  REQUIRE(LogTestController::getInstance().contains("Logged 1 flow files"));
+
+  plan->reset();
+  plan->runNextProcessor();  // Tail
+  plan->runNextProcessor();  // Log
+
+  // Two new flow files from the new file
+  REQUIRE(LogTestController::getInstance().contains("Logged 2 flow files"));
 }
+
+TEST_CASE("TailFileWithMultileRolledOverFiles", "[tailfiletest2]") {
+  TestController testController;
+
+  const char DELIM = ':';
+
+  LogTestController::getInstance().setTrace<TestPlan>();
+  LogTestController::getInstance().setTrace<processors::TailFile>();
+  LogTestController::getInstance().setTrace<processors::LogAttribute>();
+  LogTestController::getInstance().setTrace<core::ProcessSession>();
+
+  auto plan = testController.createPlan();
+
+  char format[] = "/tmp/gt.XXXXXX";
+  char *dir = testController.createTempDirectory(format);
+
+  std::string state_file(dir);
+  state_file.append("tailfile.state");
+
+  // Define test input file
+  std::string in_file(dir);
+  in_file.append("/fruits.txt");
+
+  for (int i = 2; 0 <= i; --i) {
+    if (i < 2) {
+      std::this_thread::sleep_for(
+          std::chrono::milliseconds(1000));  // make sure the new file gets 
newer modification time
+    }
+    std::ofstream in_file_stream(in_file + (i > 0 ? std::to_string(i) : ""));
+    for (int j = 0; j <= i; j++) {
+      in_file_stream << "Apple" << DELIM;
+    }
+    in_file_stream.close();
+  }
+
+  // Build MiNiFi processing graph
+  auto tail_file = plan->addProcessor(
+      "TailFile",
+      "Tail");
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::Delimiter.getName(), std::string(1, DELIM));
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::FileName.getName(), in_file);
+  plan->setProperty(
+      tail_file,
+      processors::TailFile::StateFile.getName(), state_file);
+  auto log_attr = plan->addProcessor(
+      "LogAttribute",
+      "Log",
+      core::Relationship("success", "description"),
+      true);
+  plan->setProperty(
+      log_attr,
+      processors::LogAttribute::FlowFilesToLog.getName(), "0");
+  // Log as many FFs as it can to make sure exactly the expected amount is 
produced
+
+
+  // Each iteration should go through one file and log all flowfiles
+  for (int i = 2; 0 <= i; --i) {
+    plan->reset();
+    plan->runNextProcessor();  // Tail
+    plan->runNextProcessor();  // Log
+
+    REQUIRE(LogTestController::getInstance().contains(std::string("Logged ") + 
std::to_string(i + 1) + " flow files"));
+  }
+
+  // Write some more data to the source file
+  std::ofstream in_file_stream(in_file);
+  in_file_stream << "Pear" << DELIM << "Cherry" << DELIM;
+
+  plan->reset();
+  plan->runNextProcessor();  // Tail
+  plan->runNextProcessor();  // Log
+
+  REQUIRE(LogTestController::getInstance().contains(std::string("Logged 2 flow 
files")));
+}
+
+
 /*
 TEST_CASE("TailFileWithDelimiter", "[tailfiletest1]") {
   try {

Reply via email to