szaszm commented on a change in pull request #791:
URL: https://github.com/apache/nifi-minifi-cpp/pull/791#discussion_r432671663



##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -58,42 +47,108 @@
 #define S_ISDIR(mode)  (((mode) & S_IFMT) == S_IFDIR)
 #endif
 
-#if defined(__clang__)
-#pragma clang diagnostic push
-#pragma clang diagnostic ignored "-Wsign-compare"
-#elif defined(__GNUC__) || defined(__GNUG__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wsign-compare"
-#endif
-
 namespace org {
 namespace apache {
 namespace nifi {
 namespace minifi {
 namespace processors {
 
-core::Property TailFile::FileName("File to Tail", "Fully-qualified filename of 
the file that should be tailed when using single file mode, or a file regex 
when using multifile mode", "");
-core::Property TailFile::StateFile("State File", "DEPRECATED. Only use it for 
state migration from the legacy state file.",
-                                   "TailFileState");
-core::Property TailFile::Delimiter("Input Delimiter", "Specifies the character 
that should be used for delimiting the data being tailed"
-                                   "from the incoming file."
-                                   "If none is specified, data will be 
ingested as it becomes available.",
-                                   "");
+core::Property TailFile::FileName(
+    core::PropertyBuilder::createProperty("File to Tail")
+        ->withDescription("Fully-qualified filename of the file that should be 
tailed when using single file mode, or a file regex when using multifile mode")
+        ->isRequired(true)
+        ->build());
+
+core::Property TailFile::StateFile(
+    core::PropertyBuilder::createProperty("State File")
+        ->withDescription("DEPRECATED. Only use it for state migration from 
the legacy state file.")
+        ->isRequired(false)
+        ->withDefaultValue<std::string>("TailFileState")
+        ->build());
+
+core::Property TailFile::Delimiter(
+    core::PropertyBuilder::createProperty("Input Delimiter")
+        ->withDescription("Specifies the character that should be used for 
delimiting the data being tailed"
+         "from the incoming file. If none is specified, data will be ingested 
as it becomes available.")
+        ->isRequired(false)
+        ->withDefaultValue<std::string>("\\n")
+        ->build());
 
 core::Property TailFile::TailMode(
-    core::PropertyBuilder::createProperty("tail-mode", "Tailing 
Mode")->withDescription(
-        "Specifies the tail file mode. In 'Single file' mode only a single 
file will be watched. "
+    core::PropertyBuilder::createProperty("tail-mode", "Tailing Mode")
+        ->withDescription("Specifies the tail file mode. In 'Single file' mode 
only a single file will be watched. "
         "In 'Multiple file' mode a regex may be used. Note that in multiple 
file mode we will still continue to watch for rollover on the initial set of 
watched files. "
         "The Regex used to locate multiple files will be run during the 
schedule phrase. Note that if rotated files are matched by the regex, those 
files will be tailed.")->isRequired(true)
-        ->withAllowableValue<std::string>("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single 
file")->build());
-
-core::Property 
TailFile::BaseDirectory(core::PropertyBuilder::createProperty("tail-base-directory",
 "Base Directory")->isRequired(false)->build());
+        ->withAllowableValue<std::string>("Single 
file")->withAllowableValue("Multiple file")->withDefaultValue("Single file")
+        ->build());
+
+core::Property TailFile::BaseDirectory(
+    core::PropertyBuilder::createProperty("tail-base-directory", "Base 
Directory")
+        ->withDescription("Base directory used to look for files to tail. This 
property is required when using Multiple file mode.")
+        ->isRequired(false)
+        ->build());
+
+core::Property TailFile::RollingFilenamePattern(
+    core::PropertyBuilder::createProperty("Rolling Filename Pattern")
+        ->withDescription("If the file to tail \"rolls over\" as would be the 
case with log files, this filename pattern will be used to "
+        "identify files that have rolled over so MiNiFi can read the remaining 
of the rolled-over file and then continue with the new log file. "
+        "This pattern supports the wildcard characters * and ?, it also 
supports the notation ${filename} to specify a pattern based on the name of the 
file "
+        "(without extension), and will assume that the files that have rolled 
over live in the same directory as the file being tailed.")
+        ->isRequired(false)
+        ->withDefaultValue<std::string>("${filename}.*")
+        ->build());
 
 core::Relationship TailFile::Success("success", "All files are routed to 
success");
 
 const char *TailFile::CURRENT_STR = "CURRENT.";
 const char *TailFile::POSITION_STR = "POSITION.";
 
+namespace {
+  template<typename Container, typename Key>
+  bool containsKey(const Container &container, const Key &key) {
+    return container.find(key) != container.end();
+  }
+
+  template <typename Container, typename Key>
+  uint64_t readOptionalUint64(const Container &container, const Key &key) {
+    const auto it = container.find(key);
+    if (it != container.end()) {
+      return std::stoull(it->second);
+    } else {
+      return 0;
+    }
+  }
+
+  // the delimiter is the first character of the input, allowing some escape 
sequences
+  std::string parseDelimiter(const std::string &input) {
+    if (!input.empty()) {
+      if (input[0] == '\\') {
+        if (input.size() > (std::size_t) 1) {
+          switch (input[1]) {
+            case 'r':
+              return "\r";
+            case 't':
+              return "\t";
+            case 'n':
+              return "\n";
+            case '\\':
+              return "\\";
+            default:
+              return input.substr(1, 1);
+          }
+        } else {
+          return "\\";
+        }
+      } else {
+        return input.substr(0, 1);
+      }
+    } else {
+      return "";
+    }
+  }

Review comment:
      I suggest handling the simple cases with early returns to reduce the indentation level.
   ```suggestion
     std::string parseDelimiter(const std::string &input) {
       if (input.empty()) return "";
       if (input[0] != '\\') return std::string{ input[0] };
       if (input.size() == std::size_t{1}) return "\\";
       switch (input[1]) {
         case 'r': return "\r";
         case 't': return "\t";
         case 'n': return "\n";
         default: return std::string{ input[1] };
       }
     }
   ```

##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -335,65 +387,45 @@ bool TailFile::storeState(const 
std::shared_ptr<core::ProcessContext>& context)
   return true;
 }
 
-static bool sortTailMatchedFileItem(TailMatchedFileItem i, TailMatchedFileItem 
j) {
-  return (i.modifiedTime < j.modifiedTime);
-}
-void TailFile::checkRollOver(const std::shared_ptr<core::ProcessContext>& 
context, TailState &file, const std::string &base_file_name) {
-  struct stat statbuf;
-  std::vector<TailMatchedFileItem> matchedFiles;
-  std::string fullPath = file.path_ + utils::file::FileUtils::get_separator() 
+ file.current_file_name_;
-
-  if (stat(fullPath.c_str(), &statbuf) == 0) {
-    logger_->log_trace("Searching for files rolled over");
-    std::string pattern = file.current_file_name_;
-    std::size_t found = file.current_file_name_.find_last_of(".");
-    if (found != std::string::npos)
-      pattern = file.current_file_name_.substr(0, found);
-
-    // Callback, called for each file entry in the listed directory
-    // Return value is used to break (false) or continue (true) listing
-    auto lambda = [&](const std::string& path, const std::string& filename) -> 
bool {
-      struct stat sb;
-      std::string fileFullName = path + 
utils::file::FileUtils::get_separator() + filename;
-      if ((fileFullName.find(pattern) != std::string::npos) && 
stat(fileFullName.c_str(), &sb) == 0) {
-        uint64_t candidateModTime = ((uint64_t) (sb.st_mtime) * 1000);
-        if (candidateModTime >= file.currentTailFileModificationTime_) {
-          logging::LOG_TRACE(logger_) << "File " << filename << " (short name 
" << file.current_file_name_ <<
-          ") disk mod time " << candidateModTime << ", struct mod time " << 
file.currentTailFileModificationTime_ << ", size on disk " << sb.st_size << ", 
position " << file.currentTailFilePosition_;
-          if (filename == file.current_file_name_ && candidateModTime == 
file.currentTailFileModificationTime_ &&
-              sb.st_size == file.currentTailFilePosition_) {
-            return true;  // Skip the current file as a candidate in case it 
wasn't updated
-      }
-      TailMatchedFileItem item;
-      item.fileName = filename;
-      item.modifiedTime = ((uint64_t) (sb.st_mtime) * 1000);
-      matchedFiles.push_back(item);
-    }
-  }
-  return true;};
+std::vector<TailState> TailFile::findRotatedFiles(const TailState &state) 
const {
+  logger_->log_trace("Searching for files rolled over");
 
-    utils::file::FileUtils::list_dir(file.path_, lambda, logger_, false);
+  std::size_t last_dot_position = state.file_name_.find_last_of('.');
+  std::string base_name = state.file_name_.substr(0, last_dot_position);
+  std::string pattern = 
utils::file::PathUtils::replacePlaceholderWithBaseName(rolling_filename_pattern_,
 base_name);
 
-    if (matchedFiles.size() < 1) {
-      logger_->log_debug("No newer files found in directory!");
-      return;
+  std::vector<TailState> matched_files;
+  auto collect_matching_files = [&](const std::string &path, const std::string 
&file_name) -> bool {
+    if (file_name != state.file_name_ && 
utils::Regex::matchesFullInput(pattern, file_name)) {
+      std::string full_file_name = path + 
utils::file::FileUtils::get_separator() + file_name;
+      uint64_t mtime = utils::file::FileUtils::last_write_time(full_file_name);
+      if (mtime >= state.timestamp_ / 1000) {

Review comment:
      Can we use `std::chrono` duration types here instead of the magic division by 1000? This only applies if the change is feasible, since the magic math was already present before this PR.

##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -204,52 +249,78 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
     std::string fileLocation, fileName;
     if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
       logger_->log_debug("State migration received path %s, file %s", 
fileLocation, fileName);
-      tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+      state.emplace(fileName, TailState{fileLocation, fileName, 0, 0, 0, 0});
     } else {
-      tail_states_.insert(std::make_pair(value, TailState { fileLocation, 
value, 0, 0 }));
+      state.emplace(value, TailState{fileLocation, value, 0, 0, 0, 0});
     }
   }
   if (key == "POSITION") {
     // for backwards compatibility
-    if (tail_states_.size() != 1) {
+    if (tail_states_.size() != (std::size_t) 1) {

Review comment:
      I recommend replacing the C-style cast with direct-list-initialization, as suggested here: https://stackoverflow.com/a/22346540/3997716
   
   ```suggestion
       if (tail_states_.size() != std::size_t{ 1 }) {
   ```

##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -204,52 +249,78 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
     std::string fileLocation, fileName;
     if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
       logger_->log_debug("State migration received path %s, file %s", 
fileLocation, fileName);
-      tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+      state.emplace(fileName, TailState{fileLocation, fileName, 0, 0, 0, 0});
     } else {
-      tail_states_.insert(std::make_pair(value, TailState { fileLocation, 
value, 0, 0 }));
+      state.emplace(value, TailState{fileLocation, value, 0, 0, 0, 0});
     }
   }
   if (key == "POSITION") {
     // for backwards compatibility
-    if (tail_states_.size() != 1) {
+    if (tail_states_.size() != (std::size_t) 1) {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, 
"Incompatible state file types");
     }
     const auto position = std::stoull(value);
     logger_->log_debug("Received position %d", position);
-    tail_states_.begin()->second.currentTailFilePosition_ = position;
+    state.begin()->second.position_ = position;
   }
   if (key.find(CURRENT_STR) == 0) {
     const auto file = key.substr(strlen(CURRENT_STR));
     std::string fileLocation, fileName;
     if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
-      tail_states_[file].path_ = fileLocation;
-      tail_states_[file].current_file_name_ = fileName;
+      state[file].path_ = fileLocation;
+      state[file].file_name_ = fileName;
     } else {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file 
contains an invalid file name");
     }
   }
 
   if (key.find(POSITION_STR) == 0) {
     const auto file = key.substr(strlen(POSITION_STR));
-    tail_states_[file].currentTailFilePosition_ = std::stoull(value);
+    state[file].position_ = std::stoull(value);
   }
 }
 
+bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& 
context) {
+  std::map<std::string, TailState> new_tail_states;
+  bool state_load_success = getStateFromStateManager(new_tail_states) ||
+                            getStateFromLegacyStateFile(new_tail_states);

Review comment:
       Nice, elegant solution. :+1: 

##########
File path: libminifi/test/unit/FileUtilsTests.cpp
##########
@@ -178,3 +179,157 @@ TEST_CASE("TestFileUtils::getFullPath", 
"[TestGetFullPath]") {
   REQUIRE(tempDir1 == 
utils::file::PathUtils::getFullPath(".\\test2\\..\\test1"));
 #endif
 }
+
+TEST_CASE("FileUtils::last_write_time works", "[last_write_time]") {
+  uint64_t timeBeforeWrite = getTimeMillis() / 1000;
+
+  TestController testController;
+
+  char format[] = "/tmp/gt.XXXXXX";
+  std::string dir = testController.createTempDirectory(format);
+
+  std::string test_file = dir + FileUtils::get_separator() + "test.txt";
+  REQUIRE(FileUtils::last_write_time(test_file) == 0);
+
+  std::ofstream test_file_stream(test_file);
+  test_file_stream << "foo\n";
+  test_file_stream.flush();
+
+  uint64_t timeAfterFirstWrite = getTimeMillis() / 1000;
+
+  uint64_t first_mtime = FileUtils::last_write_time(test_file);
+  REQUIRE(first_mtime >= timeBeforeWrite);
+  REQUIRE(first_mtime <= timeAfterFirstWrite);
+
+  test_file_stream << "bar\n";
+  test_file_stream.flush();
+
+  uint64_t timeAfterSecondWrite = getTimeMillis() / 1000;
+
+  uint64_t second_mtime = FileUtils::last_write_time(test_file);
+  REQUIRE(second_mtime >= first_mtime);
+  REQUIRE(second_mtime >= timeAfterFirstWrite);
+  REQUIRE(second_mtime <= timeAfterSecondWrite);
+
+  test_file_stream.close();
+  uint64_t third_mtime = FileUtils::last_write_time(test_file);
+  REQUIRE(third_mtime == second_mtime);

Review comment:
      Comparing two closely spaced time measurements for exact equality will make the test flaky, because execution takes some time between them. I suggest comparing with an epsilon, or just skipping the third measurement altogether.

##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -403,102 +435,136 @@ void TailFile::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, c
     state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-    state_recovered_ = true;
-    // recover the state if we have not done so
     this->recoverState(context);
+    state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+    checkForRemovedFiles();
+    checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  // iterate over file states. may modify them
   for (auto &state : tail_states_) {
-    auto fileLocation = state.second.path_;
-
-    checkRollOver(context, state.second, state.first);
-    std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-    struct stat statbuf;
-
-    logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-    if (stat(fullPath.c_str(), &statbuf) == 0) {
-      if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-        logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-        logger_->log_trace("%s", "there are no new input for the current tail 
file");
-        context->yield();
-        return;
-      }
-      std::size_t found = state.first.find_last_of(".");
-      std::string baseName = state.first.substr(0, found);
-      std::string extension = state.first.substr(found + 1);
-
-      if (!delimiter_.empty()) {
-        char delim = delimiter_.c_str()[0];
-        if (delim == '\\') {
-          if (delimiter_.size() > 1) {
-            switch (delimiter_.c_str()[1]) {
-              case 'r':
-                delim = '\r';
-                break;
-              case 't':
-                delim = '\t';
-                break;
-              case 'n':
-                delim = '\n';
-                break;
-              case '\\':
-                delim = '\\';
-                break;
-              default:
-                // previous behavior
-                break;
-            }
-          }
-        }
-        logger_->log_debug("Looking for delimiter 0x%X", delim);
-        std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
-        session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-        logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-        for (auto ffr : flowFiles) {
-          logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-          std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-          ffr->updateKeyedAttribute(PATH, fileLocation);
-          ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          ffr->updateKeyedAttribute(FILENAME, logName);
-          session->transfer(ffr, Success);
-          state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-          storeState(context);
-        }
+    processFile(context, session, state.first, state.second);
+  }
 
-      } else {
-        std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());
-        if (flowFile) {
-          flowFile->updateKeyedAttribute(PATH, fileLocation);
-          flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-          session->transfer(flowFile, Success);
-          logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-          std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-              + extension;
-          flowFile->updateKeyedAttribute(FILENAME, logName);
-          state.second.currentTailFilePosition_ += flowFile->getSize();
-          storeState(context);
-        }
-      }
-      state.second.currentTailFileModificationTime_ = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-    } else {
-      logger_->log_warn("Unable to stat file %s", fullPath);
+  if (!session->existsFlowFileInRelationship(Success)) {
+    yield();
+  }
+}
+
+void TailFile::processFile(const std::shared_ptr<core::ProcessContext> 
&context,
+                           const std::shared_ptr<core::ProcessSession> 
&session,
+                           const std::string &fileName,
+                           TailState &state) {
+  if (utils::file::FileUtils::file_size(state.fileNameWithPath()) < 
state.position_) {
+    processRotatedFiles(context, session, state);
+  }
+
+  processSingleFile(context, session, fileName, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr<core::ProcessContext> 
&context,
+                                   const std::shared_ptr<core::ProcessSession> 
&session,
+                                   TailState &state) {
+    std::vector<TailState> rotated_file_states = findRotatedFiles(state);
+    for (TailState &file_state : rotated_file_states) {
+      processSingleFile(context, session, file_state.file_name_, file_state);
     }
+    state.position_ = 0;
+    state.checksum_ = 0;
+}
+
+void TailFile::processSingleFile(const std::shared_ptr<core::ProcessContext> 
&context,
+                                 const std::shared_ptr<core::ProcessSession> 
&session,
+                                 const std::string &fileName,
+                                 TailState &state) {
+  std::string full_file_name = state.fileNameWithPath();
+
+  if (utils::file::FileUtils::file_size(full_file_name) == 0u) {
+    logger_->log_warn("Unable to read file %s as it does not exist or has size 
zero", full_file_name);
+    return;
+  }
+  logger_->log_debug("Tailing file %s from %llu", full_file_name, 
state.position_);
+
+  std::size_t last_dot_position = fileName.find_last_of('.');
+  std::string baseName = fileName.substr(0, last_dot_position);
+  std::string extension = fileName.substr(last_dot_position + 1);
+
+  if (!delimiter_.empty()) {
+    char delim = delimiter_[0];
+    logger_->log_trace("Looking for delimiter 0x%X", delim);
+
+    std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
+    uint64_t checksum = state.checksum_;
+    session->import(full_file_name, flowFiles, state.position_, delim, true, 
&checksum);
+    logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
+
+    for (auto &ffr : flowFiles) {
+      logger_->log_info("TailFile %s for %u bytes", fileName, ffr->getSize());
+      std::string logName = baseName + "." + std::to_string(state.position_) + 
"-" +
+                            std::to_string(state.position_ + ffr->getSize() - 
1) + "." + extension;
+      ffr->updateKeyedAttribute(PATH, state.path_);
+      ffr->addKeyedAttribute(ABSOLUTE_PATH, full_file_name);
+      ffr->updateKeyedAttribute(FILENAME, logName);
+      session->transfer(ffr, Success);
+      state.position_ += ffr->getSize();
+      state.timestamp_ = getTimeMillis();
+      state.checksum_ = checksum;
+      storeState(context);
+    }
+  } else {
+    std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());
+    if (flowFile) {

Review comment:
      Inverting the condition and returning early would reduce the indentation level.
   ```suggestion
       if (!flowFile) { return; }
   ```

##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -403,102 +435,136 @@ void TailFile::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, c
     state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-    state_recovered_ = true;
-    // recover the state if we have not done so
     this->recoverState(context);
+    state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+    checkForRemovedFiles();
+    checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  // iterate over file states. may modify them
   for (auto &state : tail_states_) {
-    auto fileLocation = state.second.path_;
-
-    checkRollOver(context, state.second, state.first);
-    std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-    struct stat statbuf;
-
-    logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-    if (stat(fullPath.c_str(), &statbuf) == 0) {
-      if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-        logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-        logger_->log_trace("%s", "there are no new input for the current tail 
file");
-        context->yield();
-        return;
-      }
-      std::size_t found = state.first.find_last_of(".");
-      std::string baseName = state.first.substr(0, found);
-      std::string extension = state.first.substr(found + 1);
-
-      if (!delimiter_.empty()) {
-        char delim = delimiter_.c_str()[0];
-        if (delim == '\\') {
-          if (delimiter_.size() > 1) {
-            switch (delimiter_.c_str()[1]) {
-              case 'r':
-                delim = '\r';
-                break;
-              case 't':
-                delim = '\t';
-                break;
-              case 'n':
-                delim = '\n';
-                break;
-              case '\\':
-                delim = '\\';
-                break;
-              default:
-                // previous behavior
-                break;
-            }
-          }
-        }
-        logger_->log_debug("Looking for delimiter 0x%X", delim);
-        std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
-        session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-        logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-        for (auto ffr : flowFiles) {
-          logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-          std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-          ffr->updateKeyedAttribute(PATH, fileLocation);
-          ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          ffr->updateKeyedAttribute(FILENAME, logName);
-          session->transfer(ffr, Success);
-          state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-          storeState(context);
-        }
+    processFile(context, session, state.first, state.second);
+  }
 
-      } else {
-        std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());
-        if (flowFile) {
-          flowFile->updateKeyedAttribute(PATH, fileLocation);
-          flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-          session->transfer(flowFile, Success);
-          logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-          std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-              + extension;
-          flowFile->updateKeyedAttribute(FILENAME, logName);
-          state.second.currentTailFilePosition_ += flowFile->getSize();
-          storeState(context);
-        }
-      }
-      state.second.currentTailFileModificationTime_ = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-    } else {
-      logger_->log_warn("Unable to stat file %s", fullPath);
+  if (!session->existsFlowFileInRelationship(Success)) {
+    yield();
+  }
+}
+
+void TailFile::processFile(const std::shared_ptr<core::ProcessContext> 
&context,
+                           const std::shared_ptr<core::ProcessSession> 
&session,
+                           const std::string &fileName,
+                           TailState &state) {
+  if (utils::file::FileUtils::file_size(state.fileNameWithPath()) < 
state.position_) {
+    processRotatedFiles(context, session, state);
+  }
+
+  processSingleFile(context, session, fileName, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr<core::ProcessContext> 
&context,
+                                   const std::shared_ptr<core::ProcessSession> 
&session,
+                                   TailState &state) {
+    std::vector<TailState> rotated_file_states = findRotatedFiles(state);
+    for (TailState &file_state : rotated_file_states) {
+      processSingleFile(context, session, file_state.file_name_, file_state);
     }
+    state.position_ = 0;
+    state.checksum_ = 0;
+}
+
+void TailFile::processSingleFile(const std::shared_ptr<core::ProcessContext> 
&context,
+                                 const std::shared_ptr<core::ProcessSession> 
&session,
+                                 const std::string &fileName,
+                                 TailState &state) {
+  std::string full_file_name = state.fileNameWithPath();
+
+  if (utils::file::FileUtils::file_size(full_file_name) == 0u) {
+    logger_->log_warn("Unable to read file %s as it does not exist or has size 
zero", full_file_name);
+    return;
+  }
+  logger_->log_debug("Tailing file %s from %llu", full_file_name, 
state.position_);
+
+  std::size_t last_dot_position = fileName.find_last_of('.');
+  std::string baseName = fileName.substr(0, last_dot_position);
+  std::string extension = fileName.substr(last_dot_position + 1);
+
+  if (!delimiter_.empty()) {
+    char delim = delimiter_[0];
+    logger_->log_trace("Looking for delimiter 0x%X", delim);
+
+    std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
+    uint64_t checksum = state.checksum_;
+    session->import(full_file_name, flowFiles, state.position_, delim, true, 
&checksum);
+    logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
+
+    for (auto &ffr : flowFiles) {
+      logger_->log_info("TailFile %s for %u bytes", fileName, ffr->getSize());
+      std::string logName = baseName + "." + std::to_string(state.position_) + 
"-" +
+                            std::to_string(state.position_ + ffr->getSize() - 
1) + "." + extension;
+      ffr->updateKeyedAttribute(PATH, state.path_);
+      ffr->addKeyedAttribute(ABSOLUTE_PATH, full_file_name);
+      ffr->updateKeyedAttribute(FILENAME, logName);
+      session->transfer(ffr, Success);
+      state.position_ += ffr->getSize();
+      state.timestamp_ = getTimeMillis();
+      state.checksum_ = checksum;
+      storeState(context);
+    }
+  } else {
+    std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());

Review comment:
       `auto` could be used to avoid having to repeat the type name.

##########
File path: libminifi/src/utils/RegexUtils.cpp
##########
@@ -158,6 +158,16 @@ const std::vector<std::string>& Regex::getResult() const { 
return results_; }
 
 const std::string& Regex::getSuffix() const { return suffix_; }
 
+bool Regex::matchesFullInput(const std::string &regex, const std::string 
&input) {
+#ifdef NO_MORE_REGFREEE
+  std::regex re{regex};
+  return std::regex_match(input, re);
+#else
+  Regex rgx('^' + regex + '$');
+  return rgx.match(input);
+#endif

Review comment:
      I assume `NO_MORE_REGFREEE` is a transitional macro for migrating from `Regex` to `std::regex`. Would it make sense to drop the non-`std::regex` branch?

##########
File path: libminifi/src/utils/file/FileUtils.cpp
##########
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/file/FileUtils.h"
+
+#include <zlib.h>
+
+#include <algorithm>
+#include <iostream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace file {
+
+  uint64_t FileUtils::computeChecksum(std::string file_name, uint64_t 
up_to_position) {
+    constexpr uint64_t BUFFER_SIZE = 4096u;
+    std::array<char, (std::size_t) BUFFER_SIZE> buffer;
+
+    std::ifstream stream{file_name, std::ios::in | std::ios::binary};
+
+    uint64_t checksum = 0;
+    uint64_t remaining_bytes_to_be_read = up_to_position;
+
+    while (stream && remaining_bytes_to_be_read > 0) {
+      using std::min;  // for min(), to work around 
https://docs.microsoft.com/en-us/windows/win32/multimedia/min

Review comment:
       FYI: You can suppress the macro substitution and force `std::min` by 
putting the name inside parentheses.
   ```
   (std::min)(BUFFER_SIZE, remaining_bytes_to_be_read)
   ```
   
   I'm fine with either version in this case but have a preference for the 
standard version in general to avoid multiple evaluation of expressions with 
side effects.

##########
File path: extensions/standard-processors/tests/unit/TailFileTests.cpp
##########
@@ -47,7 +47,33 @@ static std::string NEWLINE_FILE = ""  // NOLINT
 static const char *TMP_FILE = "minifi-tmpfile.txt";
 static const char *STATE_FILE = "minifi-state-file.txt";
 
-TEST_CASE("TailFileWithDelimiter", "[tailfiletest2]") {
+namespace {
+  std::string createTempFile(const std::string &directory, const std::string 
&file_name, const std::string &contents,
+      std::ios_base::openmode open_mode = std::ios::out | std::ios::binary) {
+    std::string full_file_name = directory + 
utils::file::FileUtils::get_separator() + file_name;
+    std::ofstream tmpfile{full_file_name, open_mode};
+    tmpfile << contents;
+    return full_file_name;
+  }
+
+  void appendTempFile(const std::string &directory, const std::string 
&file_name, const std::string &contents,
+      std::ios_base::openmode open_mode = std::ios::app | std::ios::binary) {
+    createTempFile(directory, file_name, contents, open_mode);
+  }
+
+  void removeFile(const std::string &directory, const std::string &file_name) {
+    std::string full_file_name = directory + 
utils::file::FileUtils::get_separator() + file_name;
+    std::remove(full_file_name.c_str());
+  }
+
+  void renameTempFile(const std::string &directory, const std::string 
&old_file_name, const std::string &new_file_name) {
+    std::string old_full_file_name = directory + 
utils::file::FileUtils::get_separator() + old_file_name;
+    std::string new_full_file_name = directory + 
utils::file::FileUtils::get_separator() + new_file_name;
+    rename(old_full_file_name.c_str(), new_full_file_name.c_str());
+  }
+}  // namespace

Review comment:
       Namespace contents should not be indented. 
   
   https://google.github.io/styleguide/cppguide.html#Namespace_Formatting

##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -403,102 +435,136 @@ void TailFile::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, c
     state_file_ = st_file + "." + getUUIDStr();
   }
   if (!this->state_recovered_) {
-    state_recovered_ = true;
-    // recover the state if we have not done so
     this->recoverState(context);
+    state_recovered_ = true;
+  }
+
+  if (tail_mode_ == Mode::MULTIPLE) {
+    checkForRemovedFiles();
+    checkForNewFiles();
   }
 
-  /**
-   * iterate over file states. may modify them
-   */
+  // iterate over file states. may modify them
   for (auto &state : tail_states_) {
-    auto fileLocation = state.second.path_;
-
-    checkRollOver(context, state.second, state.first);
-    std::string fullPath = fileLocation + 
utils::file::FileUtils::get_separator() + state.second.current_file_name_;
-    struct stat statbuf;
-
-    logger_->log_debug("Tailing file %s from %llu", fullPath, 
state.second.currentTailFilePosition_);
-    if (stat(fullPath.c_str(), &statbuf) == 0) {
-      if ((uint64_t) statbuf.st_size <= state.second.currentTailFilePosition_) 
{
-        logger_->log_trace("Current pos: %llu", 
state.second.currentTailFilePosition_);
-        logger_->log_trace("%s", "there are no new input for the current tail 
file");
-        context->yield();
-        return;
-      }
-      std::size_t found = state.first.find_last_of(".");
-      std::string baseName = state.first.substr(0, found);
-      std::string extension = state.first.substr(found + 1);
-
-      if (!delimiter_.empty()) {
-        char delim = delimiter_.c_str()[0];
-        if (delim == '\\') {
-          if (delimiter_.size() > 1) {
-            switch (delimiter_.c_str()[1]) {
-              case 'r':
-                delim = '\r';
-                break;
-              case 't':
-                delim = '\t';
-                break;
-              case 'n':
-                delim = '\n';
-                break;
-              case '\\':
-                delim = '\\';
-                break;
-              default:
-                // previous behavior
-                break;
-            }
-          }
-        }
-        logger_->log_debug("Looking for delimiter 0x%X", delim);
-        std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
-        session->import(fullPath, flowFiles, 
state.second.currentTailFilePosition_, delim);
-        logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
-
-        for (auto ffr : flowFiles) {
-          logger_->log_info("TailFile %s for %u bytes", state.first, 
ffr->getSize());
-          std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + ffr->getSize()) + "." + 
extension;
-          ffr->updateKeyedAttribute(PATH, fileLocation);
-          ffr->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          ffr->updateKeyedAttribute(FILENAME, logName);
-          session->transfer(ffr, Success);
-          state.second.currentTailFilePosition_ += ffr->getSize() + 1;
-          storeState(context);
-        }
+    processFile(context, session, state.first, state.second);
+  }
 
-      } else {
-        std::shared_ptr<FlowFileRecord> flowFile = 
std::static_pointer_cast<FlowFileRecord>(session->create());
-        if (flowFile) {
-          flowFile->updateKeyedAttribute(PATH, fileLocation);
-          flowFile->addKeyedAttribute(ABSOLUTE_PATH, fullPath);
-          session->import(fullPath, flowFile, true, 
state.second.currentTailFilePosition_);
-          session->transfer(flowFile, Success);
-          logger_->log_info("TailFile %s for %llu bytes", state.first, 
flowFile->getSize());
-          std::string logName = baseName + "." + 
std::to_string(state.second.currentTailFilePosition_) + "-" + 
std::to_string(state.second.currentTailFilePosition_ + flowFile->getSize()) + 
"."
-              + extension;
-          flowFile->updateKeyedAttribute(FILENAME, logName);
-          state.second.currentTailFilePosition_ += flowFile->getSize();
-          storeState(context);
-        }
-      }
-      state.second.currentTailFileModificationTime_ = ((uint64_t) 
(statbuf.st_mtime) * 1000);
-    } else {
-      logger_->log_warn("Unable to stat file %s", fullPath);
+  if (!session->existsFlowFileInRelationship(Success)) {
+    yield();
+  }
+}
+
+void TailFile::processFile(const std::shared_ptr<core::ProcessContext> 
&context,
+                           const std::shared_ptr<core::ProcessSession> 
&session,
+                           const std::string &fileName,
+                           TailState &state) {
+  if (utils::file::FileUtils::file_size(state.fileNameWithPath()) < 
state.position_) {
+    processRotatedFiles(context, session, state);
+  }
+
+  processSingleFile(context, session, fileName, state);
+}
+
+void TailFile::processRotatedFiles(const std::shared_ptr<core::ProcessContext> 
&context,
+                                   const std::shared_ptr<core::ProcessSession> 
&session,
+                                   TailState &state) {
+    std::vector<TailState> rotated_file_states = findRotatedFiles(state);
+    for (TailState &file_state : rotated_file_states) {
+      processSingleFile(context, session, file_state.file_name_, file_state);
     }
+    state.position_ = 0;
+    state.checksum_ = 0;
+}
+
+void TailFile::processSingleFile(const std::shared_ptr<core::ProcessContext> 
&context,
+                                 const std::shared_ptr<core::ProcessSession> 
&session,
+                                 const std::string &fileName,
+                                 TailState &state) {
+  std::string full_file_name = state.fileNameWithPath();
+
+  if (utils::file::FileUtils::file_size(full_file_name) == 0u) {
+    logger_->log_warn("Unable to read file %s as it does not exist or has size 
zero", full_file_name);
+    return;
+  }
+  logger_->log_debug("Tailing file %s from %llu", full_file_name, 
state.position_);
+
+  std::size_t last_dot_position = fileName.find_last_of('.');
+  std::string baseName = fileName.substr(0, last_dot_position);
+  std::string extension = fileName.substr(last_dot_position + 1);
+
+  if (!delimiter_.empty()) {
+    char delim = delimiter_[0];
+    logger_->log_trace("Looking for delimiter 0x%X", delim);
+
+    std::vector<std::shared_ptr<FlowFileRecord>> flowFiles;
+    uint64_t checksum = state.checksum_;
+    session->import(full_file_name, flowFiles, state.position_, delim, true, 
&checksum);
+    logger_->log_info("%u flowfiles were received from TailFile input", 
flowFiles.size());
+
+    for (auto &ffr : flowFiles) {

Review comment:
       This could be a `const auto&`

##########
File path: extensions/standard-processors/processors/TailFile.cpp
##########
@@ -204,52 +230,65 @@ void TailFile::parseStateFileLine(char *buf) {
   }
 
   std::string value = equal;
-  key = trimRight(key);
-  value = trimRight(value);
+  key = utils::StringUtils::trimRight(key);
+  value = utils::StringUtils::trimRight(value);
 
   if (key == "FILENAME") {
     std::string fileLocation, fileName;
     if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
       logger_->log_debug("State migration received path %s, file %s", 
fileLocation, fileName);
-      tail_states_.insert(std::make_pair(fileName, TailState { fileLocation, 
fileName, 0, 0 }));
+      state.insert(std::make_pair(fileName, TailState{fileLocation, fileName, 
0, 0, 0, 0}));
     } else {
-      tail_states_.insert(std::make_pair(value, TailState { fileLocation, 
value, 0, 0 }));
+      state.insert(std::make_pair(value, TailState{fileLocation, value, 0, 0, 
0, 0}));
     }
   }
   if (key == "POSITION") {
     // for backwards compatibility
-    if (tail_states_.size() != 1) {
+    if (tail_states_.size() != (std::size_t) 1) {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, 
"Incompatible state file types");
     }
     const auto position = std::stoull(value);
     logger_->log_debug("Received position %d", position);
-    tail_states_.begin()->second.currentTailFilePosition_ = position;
+    state.begin()->second.position_ = position;
   }
   if (key.find(CURRENT_STR) == 0) {
     const auto file = key.substr(strlen(CURRENT_STR));
     std::string fileLocation, fileName;
     if (utils::file::PathUtils::getFileNameAndPath(value, fileLocation, 
fileName)) {
-      tail_states_[file].path_ = fileLocation;
-      tail_states_[file].current_file_name_ = fileName;
+      state[file].path_ = fileLocation;
+      state[file].file_name_ = fileName;
     } else {
       throw minifi::Exception(ExceptionType::PROCESSOR_EXCEPTION, "State file 
contains an invalid file name");
     }
   }
 
   if (key.find(POSITION_STR) == 0) {
     const auto file = key.substr(strlen(POSITION_STR));
-    tail_states_[file].currentTailFilePosition_ = std::stoull(value);
+    state[file].position_ = std::stoull(value);
   }
 }
 
+bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& 
context) {
+  std::map<std::string, TailState> new_tail_states;
+  bool state_load_success = getStateFromStateManager(new_tail_states) ||
+                            getStateFromLegacyStateFile(new_tail_states);
+  if (!state_load_success) {
+    return false;
+  }
 
+  logger_->log_debug("load state succeeded");
 
-bool TailFile::recoverState(const std::shared_ptr<core::ProcessContext>& 
context) {
-  bool state_load_success = false;
+  tail_states_ = std::move(new_tail_states);
+
+  // Save the state to the state manager
+  storeState(context);

Review comment:
       Is this `storeState()` call required to fix the bug? It is not always 
cheap, because one can configure the state storage to be durable, i.e. flush 
the WAL on every commit in the rocksdb case, causing a blocking IO operation.

##########
File path: libminifi/src/utils/file/PathUtils.cpp
##########
@@ -84,6 +108,19 @@ std::string PathUtils::getFullPath(const std::string& path) 
{
 #endif
 }
 
+  std::string PathUtils::globToRegex(const std::string &glob) {
+    std::string glob_copy = glob;
+    replaceAll(glob_copy, ".", "\\.");
+    replaceAll(glob_copy, "*", ".*");
+    replaceAll(glob_copy, "?", ".");
+    return glob_copy;
+  }
+
+  std::string PathUtils::replacePlaceholderWithBaseName(const std::string 
&pattern, const std::string& base_name) {
+    static const std::string PLACEHOLDER = "${filename}";

Review comment:
       I think this is not generic enough to be in shared code. I'd rather move 
`replaceOne` (as well as `replaceAll`) to StringUtils and use that from 
`TailFile`.

##########
File path: libminifi/src/utils/file/PathUtils.cpp
##########
@@ -84,6 +108,19 @@ std::string PathUtils::getFullPath(const std::string& path) 
{
 #endif
 }
 
+  std::string PathUtils::globToRegex(const std::string &glob) {
+    std::string glob_copy = glob;
+    replaceAll(glob_copy, ".", "\\.");
+    replaceAll(glob_copy, "*", ".*");
+    replaceAll(glob_copy, "?", ".");
+    return glob_copy;
+  }

Review comment:
       Taking `glob` by value and modifying it in place, instead of forcing a copy, may 
save us an allocation when the caller passes a temporary.

##########
File path: extensions/standard-processors/processors/TailFile.h
##########
@@ -33,38 +33,40 @@ namespace nifi {
 namespace minifi {
 namespace processors {
 
-// TailFile Class
+struct TailState {
+  TailState(std::string path, std::string file_name, uint64_t position, 
uint64_t timestamp,
+            uint64_t mtime, uint64_t checksum)
+      : path_(std::move(path)), file_name_(std::move(file_name)), 
position_(position), timestamp_(timestamp),
+        mtime_(mtime), checksum_(checksum) {}
 
+  TailState() : TailState("", "", 0, 0, 0, 0) {}
 
-typedef struct {
-  std::string path_;
-  std::string current_file_name_;
-  uint64_t currentTailFilePosition_;
-  uint64_t currentTailFileModificationTime_;
-} TailState;
-
-// Matched File Item for Roll over check
-typedef struct {
-  std::string fileName;
-  uint64_t modifiedTime;
-} TailMatchedFileItem;
+  std::string fileNameWithPath() const {
+    return path_ + utils::file::FileUtils::get_separator() + file_name_;
+  }
 
+  std::string path_;
+  std::string file_name_;
+  uint64_t position_;
+  uint64_t timestamp_;
+  uint64_t mtime_;

Review comment:
       Now that we have both `timestamp_` and `mtime_`, the meanings are no 
longer obvious (`mtime_` is a timestamp). Could you give `timestamp_` a more 
specific, descriptive name or add a comment?
   
   Optionally, introducing chrono types may improve readability.

##########
File path: libminifi/include/io/CRCStream.h
##########
@@ -45,6 +45,7 @@ class CRCStream : public BaseStream {
    * it will exceed our lifetime.
    */
   explicit CRCStream(T *child_stream);
+  explicit CRCStream(T *child_stream, uint64_t initial_crc);

Review comment:
       I think there's no need to mark non-single-parameter constructors 
`explicit`.

##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -469,6 +470,14 @@ void ProcessSession::import(std::string source, const 
std::shared_ptr<core::Flow
           invalidWrite = true;
         }
       }
+
+      io::BaseStream *stream = baseStream.get();
+      std::unique_ptr<io::CRCStream<io::BaseStream>> crcStream;
+      if (crc) {
+        crcStream = utils::make_unique<io::CRCStream<io::BaseStream>>(stream, 
*crc);
+        stream = crcStream.get();
+      }

Review comment:
       The additional responsibility of calculating the checksum of the stream 
does not belong in `ProcessSession`. Maybe providing a customization point 
(e.g. function param with identity default) for general stream 
composition/replacement is a better approach, but I need to think a bit more 
about this design issue to be able to provide a good solution with high 
confidence.
   
   The main smells that caught my attention are the specific function param 
(`crc`) with a null default and corresponding `if`s, and the newly introduced 
dependency on `CRCStream`.
   
   
https://nifi.apache.org/docs/nifi-docs/html/developer-guide.html#process_session

##########
File path: libminifi/src/core/ProcessSession.cpp
##########
@@ -574,15 +589,16 @@ void ProcessSession::import(const std::string& source, 
std::vector<std::shared_p
           if (zlen < std::numeric_limits<int>::min() || zlen > 
std::numeric_limits<int>::max()) {
             logger_->log_error("narrowing conversion failed");
           }
-          const int len = zlen;
+          const bool foundDelimiter = (delimiterPos != end);
+          const int len = (includeDelimiter && foundDelimiter) ? zlen + 1 : 
zlen;

Review comment:
       The purpose of `len` before was "exactly the same as `zlen`, after 
narrowing type conversion". This is broken with this change.
   
   The conditional should go to (or near) the `zlen` initializer.

##########
File path: libminifi/src/utils/file/FileUtils.cpp
##########
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "utils/file/FileUtils.h"
+
+#include <zlib.h>
+
+#include <algorithm>
+#include <iostream>
+
+namespace org {
+namespace apache {
+namespace nifi {
+namespace minifi {
+namespace utils {
+namespace file {
+
+  uint64_t FileUtils::computeChecksum(std::string file_name, uint64_t 
up_to_position) {
+    constexpr uint64_t BUFFER_SIZE = 4096u;
+    std::array<char, (std::size_t) BUFFER_SIZE> buffer;
+
+    std::ifstream stream{file_name, std::ios::in | std::ios::binary};
+
+    uint64_t checksum = 0;
+    uint64_t remaining_bytes_to_be_read = up_to_position;
+
+    while (stream && remaining_bytes_to_be_read > 0) {
+      using std::min;  // for min(), to work around 
https://docs.microsoft.com/en-us/windows/win32/multimedia/min
+      stream.read(buffer.data(), min(BUFFER_SIZE, remaining_bytes_to_be_read));
+      uint64_t bytes_read = stream.gcount();
+      checksum = crc32(checksum, reinterpret_cast<unsigned 
char*>(buffer.data()), bytes_read);
+      remaining_bytes_to_be_read -= bytes_read;
+    }
+
+    return checksum;
+  }

Review comment:
       Namespace contents shouldn't be indented.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to