This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 2689859959e318cc679b4262b5c93b20f36228db
Author: Ferenc Gerlits <[email protected]>
AuthorDate: Fri Mar 3 14:13:32 2023 +0100

    MINIFICPP-2057 Improve the performance of ListFile
    
    - change how the Path Filter property works to match NiFi
      Before this change, files in the input root directory were always listed.
      After the change, the files in the input root directory are only listed if
      the value of the Path Filter property matches ".".
    
      Also to match NiFi's behavior, we now ignore the Path Filter property if
      Recurse Subdirectories is false.
    - list file even if we can't read its last_modification_time
    - fix a bug in the handling of files with the same modification time
      (probably never happens in real life)
    
    Closes #1518
    Signed-off-by: Marton Szasz <[email protected]>
---
 .../standard-processors/processors/ListFile.cpp    | 113 +++++++++++----------
 .../standard-processors/processors/ListFile.h      |  10 +-
 .../tests/unit/ListFileTests.cpp                   |  38 ++++++-
 3 files changed, 95 insertions(+), 66 deletions(-)

diff --git a/extensions/standard-processors/processors/ListFile.cpp b/extensions/standard-processors/processors/ListFile.cpp
index 579728642..0365a6e9a 100644
--- a/extensions/standard-processors/processors/ListFile.cpp
+++ b/extensions/standard-processors/processors/ListFile.cpp
@@ -19,6 +19,7 @@
 #include <filesystem>
 
 #include "utils/StringUtils.h"
+#include "utils/TimeUtil.h"
 #include "core/PropertyBuilder.h"
 #include "core/Resource.h"
 
@@ -94,7 +95,6 @@ void ListFile::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context,
   }
   state_manager_ = 
std::make_unique<minifi::utils::ListingStateManager>(state_manager);
 
-  std::string input_directory_str;
   if (auto input_directory_str = context->getProperty(InputDirectory); 
!input_directory_str || input_directory_str->empty()) {
     throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory property 
missing or invalid");
   } else {
@@ -102,12 +102,13 @@ void ListFile::onSchedule(const 
std::shared_ptr<core::ProcessContext> &context,
   }
 
   context->getProperty(RecurseSubdirectories.getName(), 
recurse_subdirectories_);
+
   std::string value;
   if (context->getProperty(FileFilter.getName(), value) && !value.empty()) {
     file_filter_ = std::regex(value);
   }
 
-  if (context->getProperty(PathFilter.getName(), value) && !value.empty()) {
+  if (recurse_subdirectories_ && context->getProperty(PathFilter.getName(), 
value) && !value.empty()) {
     path_filter_ = std::regex(value);
   }
 
@@ -137,35 +138,49 @@ bool ListFile::fileMatchesFilters(const ListedFile& 
listed_file) {
     return false;
   }
 
-  if (file_filter_ && !std::regex_match(listed_file.filename.string(), 
*file_filter_)) {
-    logger_->log_debug("File '%s' does not match file filter so it will not be 
listed", listed_file.full_file_path.string());
-    return false;
-  }
+  if (file_filter_) {
+    const auto file_name = listed_file.full_file_path.filename();
 
-  if (path_filter_ && listed_file.relative_path != "." && 
!std::regex_match(listed_file.relative_path.string(), *path_filter_)) {
-    logger_->log_debug("Relative path '%s' does not match path filter so file 
'%s' will not be listed", listed_file.relative_path.string(), 
listed_file.full_file_path.string());
-    return false;
+    if (!std::regex_match(file_name.string(), *file_filter_)) {
+      logger_->log_debug("File '%s' does not match file filter so it will not 
be listed", listed_file.full_file_path.string());
+      return false;
+    }
   }
 
-  auto file_age = 
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()
 - listed_file.getLastModified());
-  if (minimum_file_age_ && file_age < *minimum_file_age_) {
-    logger_->log_debug("File '%s' does not meet the minimum file age 
requirement so it will not be listed", listed_file.full_file_path.string());
-    return false;
+  if (path_filter_) {
+    const auto relative_path = 
std::filesystem::relative(listed_file.full_file_path.parent_path(), 
input_directory_);
+    if (!std::regex_match(relative_path.string(), *path_filter_)) {
+      logger_->log_debug("Relative path '%s' does not match path filter so 
file '%s' will not be listed", relative_path.string(), 
listed_file.full_file_path.string());
+      return false;
+    }
   }
 
-  if (maximum_file_age_ && file_age > *maximum_file_age_) {
-    logger_->log_debug("File '%s' does not meet the maximum file age 
requirement so it will not be listed", listed_file.full_file_path.string());
-    return false;
-  }
+  if (minimum_file_age_ || maximum_file_age_) {
+    const auto file_age = 
std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now()
 - listed_file.getLastModified());
 
-  if (minimum_file_size_ && listed_file.file_size < *minimum_file_size_) {
-    logger_->log_debug("File '%s' does not meet the minimum file size 
requirement so it will not be listed", listed_file.full_file_path.string());
-    return false;
+    if (minimum_file_age_ && file_age < *minimum_file_age_) {
+      logger_->log_debug("File '%s' does not meet the minimum file age 
requirement so it will not be listed", listed_file.full_file_path.string());
+      return false;
+    }
+
+    if (maximum_file_age_ && file_age > *maximum_file_age_) {
+      logger_->log_debug("File '%s' does not meet the maximum file age 
requirement so it will not be listed", listed_file.full_file_path.string());
+      return false;
+    }
   }
 
-  if (maximum_file_size_ && *maximum_file_size_ < listed_file.file_size) {
-    logger_->log_debug("File '%s' does not meet the maximum file size 
requirement so it will not be listed", listed_file.full_file_path.string());
-    return false;
+  if (minimum_file_size_ || maximum_file_size_) {
+    const auto file_size = utils::file::file_size(listed_file.full_file_path);
+
+    if (minimum_file_size_ && file_size < *minimum_file_size_) {
+      logger_->log_debug("File '%s' does not meet the minimum file size 
requirement so it will not be listed", listed_file.full_file_path.string());
+      return false;
+    }
+
+    if (maximum_file_size_ && *maximum_file_size_ < file_size) {
+      logger_->log_debug("File '%s' does not meet the maximum file size 
requirement so it will not be listed", listed_file.full_file_path.string());
+      return false;
+    }
   }
 
   return true;
@@ -173,17 +188,14 @@ bool ListFile::fileMatchesFilters(const ListedFile& 
listed_file) {
 
 std::shared_ptr<core::FlowFile> ListFile::createFlowFile(core::ProcessSession& 
session, const ListedFile& listed_file) {
   auto flow_file = session.create();
-  session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, 
listed_file.filename.string());
-  session.putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, 
listed_file.absolute_path.string());
-  session.putAttribute(flow_file, core::SpecialFlowAttribute::PATH, 
listed_file.relative_path == "." ?
-    (std::filesystem::path(".") / "").string() : (listed_file.relative_path / 
"").string());
-  session.putAttribute(flow_file, "file.size", 
std::to_string(listed_file.file_size));
-  if (auto last_modified_str = 
utils::file::FileUtils::get_last_modified_time_formatted_string(listed_file.full_file_path,
 "%Y-%m-%dT%H:%M:%SZ")) {
-    session.putAttribute(flow_file, "file.lastModifiedTime", 
*last_modified_str);
-  } else {
-    session.putAttribute(flow_file, "file.lastModifiedTime", "");
-    logger_->log_warn("Could not get last modification time of file '%s'", 
listed_file.full_file_path.string());
-  }
+  session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, 
listed_file.full_file_path.filename().string());
+  session.putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, 
(listed_file.full_file_path.parent_path() / "").string());
+
+  auto relative_path = 
std::filesystem::relative(listed_file.full_file_path.parent_path(), 
input_directory_);
+  session.putAttribute(flow_file, core::SpecialFlowAttribute::PATH, 
(relative_path / "").string());
+
+  session.putAttribute(flow_file, "file.size", 
std::to_string(utils::file::file_size(listed_file.full_file_path)));
+  session.putAttribute(flow_file, "file.lastModifiedTime", 
utils::timeutils::getDateTimeStr(std::chrono::time_point_cast<std::chrono::seconds>(listed_file.last_modified_time)));
 
   if (auto permission_string = 
utils::file::FileUtils::get_permission_string(listed_file.full_file_path)) {
     session.putAttribute(flow_file, "file.permissions", *permission_string);
@@ -221,39 +233,32 @@ void ListFile::onTrigger(const 
std::shared_ptr<core::ProcessContext> &context, c
   auto latest_listing_state = stored_listing_state;
   uint32_t files_listed = 0;
 
-  auto file_list = utils::file::FileUtils::list_dir_all(input_directory_, 
logger_, recurse_subdirectories_);
-  for (const auto& [path, filename] : file_list) {
+  auto process_files = [&](const std::filesystem::path& path, const 
std::filesystem::path& filename) {
     ListedFile listed_file;
     listed_file.full_file_path = path / filename;
-    listed_file.absolute_path = path / "";
-    if (auto relative_path = utils::file::FileUtils::get_relative_path(path, 
input_directory_)) {
-      listed_file.relative_path = *relative_path;
-    } else {
-      logger_->log_warn("Failed to get group of file '%s' to input directory 
'%s'", listed_file.full_file_path.string(), input_directory_.string());
-    }
-    listed_file.file_size = 
utils::file::FileUtils::file_size(listed_file.full_file_path);
-    listed_file.filename = filename;
-    if (auto last_modified_time = 
utils::file::FileUtils::last_write_time(listed_file.full_file_path)) {
-      listed_file.last_modified_time = *last_modified_time;
+    if (auto last_modified_time = 
utils::file::last_write_time(listed_file.full_file_path)) {
+      listed_file.last_modified_time = 
std::chrono::time_point_cast<std::chrono::milliseconds>(utils::file::to_sys(*last_modified_time));
     } else {
-      logger_->log_error("Could not get last modification time of file '%s'", 
listed_file.full_file_path.string());
-      continue;
-    }
-
-    if (!fileMatchesFilters(listed_file)) {
-      continue;
+      logger_->log_warn("Could not get last modification time of file '%s'", 
listed_file.full_file_path.string());
+      listed_file.last_modified_time = {};
     }
 
     if (stored_listing_state.wasObjectListedAlready(listed_file)) {
       logger_->log_debug("File '%s' was already listed.", 
listed_file.full_file_path.string());
-      continue;
+      return true;
+    }
+
+    if (!fileMatchesFilters(listed_file)) {
+      return true;
     }
 
     auto flow_file = createFlowFile(*session, listed_file);
     session->transfer(flow_file, Success);
     ++files_listed;
     latest_listing_state.updateState(listed_file);
-  }
+    return true;
+  };
+  utils::file::list_dir(input_directory_, process_files, logger_, 
recurse_subdirectories_);
 
   state_manager_->storeState(latest_listing_state);
 
diff --git a/extensions/standard-processors/processors/ListFile.h b/extensions/standard-processors/processors/ListFile.h
index 05b72ba23..970f28cf0 100644
--- a/extensions/standard-processors/processors/ListFile.h
+++ b/extensions/standard-processors/processors/ListFile.h
@@ -81,19 +81,15 @@ class ListFile : public core::Processor {
  private:
   struct ListedFile : public utils::ListedObject {
     [[nodiscard]] std::chrono::time_point<std::chrono::system_clock> 
getLastModified() const override {
-      return 
std::chrono::time_point_cast<std::chrono::milliseconds>(utils::file::FileUtils::to_sys(last_modified_time));
+      return last_modified_time;
     }
 
     [[nodiscard]] std::string getKey() const override {
-      return absolute_path.string();
+      return full_file_path.string();
     }
 
-    std::filesystem::path filename;
-    std::filesystem::path absolute_path;
-    std::filesystem::file_time_type last_modified_time;
-    std::filesystem::path relative_path;
+    std::chrono::time_point<std::chrono::system_clock> last_modified_time;
     std::filesystem::path full_file_path;
-    uint64_t file_size = 0;
   };
 
   bool fileMatchesFilters(const ListedFile& listed_file);
diff --git a/extensions/standard-processors/tests/unit/ListFileTests.cpp b/extensions/standard-processors/tests/unit/ListFileTests.cpp
index 32172c05f..7428f4a7c 100644
--- a/extensions/standard-processors/tests/unit/ListFileTests.cpp
+++ b/extensions/standard-processors/tests/unit/ListFileTests.cpp
@@ -166,10 +166,17 @@ TEST_CASE_METHOD(ListFileTestFixture, "Test listing files 
matching the File Filt
 TEST_CASE_METHOD(ListFileTestFixture, "Test listing files matching the Path 
Filter pattern", "[testListFile]") {
   plan_->setProperty(list_file_processor_, "Path Filter", "first.*");
   test_controller_.runSession(plan_);
-  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename 
value:standard_file.log"));
-  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename 
value:empty_file.txt"));
-  REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename 
value:sub_file_one.txt"));
-  REQUIRE(LogTestController::getInstance().countOccurrences("key:filename 
value:sub_file_two.txt") == 0);
+  CHECK(verifyLogLinePresenceInPollTime(3s, "key:filename 
value:sub_file_one.txt"));
+  CHECK(LogTestController::getInstance().countOccurrences("key:filename 
value:") == 1);
+}
+
+TEST_CASE_METHOD(ListFileTestFixture, "Test listing files matching the Path 
Filter pattern when the pattern also matches .", "[testListFile]") {
+  plan_->setProperty(list_file_processor_, "Path Filter", "second.*|\\.");
+  test_controller_.runSession(plan_);
+  CHECK(verifyLogLinePresenceInPollTime(3s, "key:filename 
value:standard_file.log"));
+  CHECK(verifyLogLinePresenceInPollTime(3s, "key:filename 
value:empty_file.txt"));
+  CHECK(verifyLogLinePresenceInPollTime(3s, "key:filename 
value:sub_file_two.txt"));
+  CHECK(LogTestController::getInstance().countOccurrences("key:filename 
value:") == 3);
 }
 
 TEST_CASE_METHOD(ListFileTestFixture, "Test listing files with restriction on 
the minimum file age", "[testListFile]") {
@@ -221,7 +228,7 @@ TEST_CASE_METHOD(ListFileTestFixture, "Test listing hidden 
files", "[testListFil
 TEST_CASE("ListFile sets attributes correctly") {
   using minifi::processors::ListFile;
 
-  const auto list_file = std::make_shared<ListFile>("GetFile");
+  const auto list_file = std::make_shared<ListFile>("ListFile");
   LogTestController::getInstance().setTrace<ListFile>();
   minifi::test::SingleProcessorTestController test_controller(list_file);
   std::filesystem::path dir = test_controller.createTempDirectory();
@@ -247,4 +254,25 @@ TEST_CASE("ListFile sets attributes correctly") {
   }
 }
 
+TEST_CASE("If a second file with the same modification time shows up later, 
then it will get listed") {
+  using minifi::processors::ListFile;
+
+  const auto list_file = std::make_shared<ListFile>("ListFile");
+  minifi::test::SingleProcessorTestController test_controller(list_file);
+
+  const auto input_dir = test_controller.createTempDirectory();
+  list_file->setProperty(ListFile::InputDirectory, input_dir.string());
+
+  const auto common_timestamp = std::chrono::file_clock::now();
+
+  const auto file_one = utils::putFileToDir(input_dir, "file_one.txt", "When I 
was one, I had just begun.");
+  std::filesystem::last_write_time(file_one, common_timestamp);
+  const auto result_one = test_controller.trigger();
+  CHECK(result_one.at(ListFile::Success).size() == 1);
+
+  const auto file_two = utils::putFileToDir(input_dir, "file_two.txt", "When I 
was two, I was nearly new.");
+  std::filesystem::last_write_time(file_two, common_timestamp);
+  const auto result_two = test_controller.trigger();
+  CHECK(result_two.at(ListFile::Success).size() == 1);
+}
 }  // namespace

Reply via email to