This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 2689859959e318cc679b4262b5c93b20f36228db Author: Ferenc Gerlits <[email protected]> AuthorDate: Fri Mar 3 14:13:32 2023 +0100 MINIFICPP-2057 Improve the performance of ListFile - change how the Path Filter property works to match NiFi Before this change, files in the input root directory were always listed. After the change, the files in the input root directory are only listed if the value of the Path Filter property matches ".". Also to match NiFi's behavior, we now ignore the Path Filter property if Recurse Subdirectories is false. - list file even if we can't read its last_modification_time - fix a bug in the handling of files with the same modification time (probably never happens in real life) Closes #1518 Signed-off-by: Marton Szasz <[email protected]> --- .../standard-processors/processors/ListFile.cpp | 113 +++++++++++---------- .../standard-processors/processors/ListFile.h | 10 +- .../tests/unit/ListFileTests.cpp | 38 ++++++- 3 files changed, 95 insertions(+), 66 deletions(-) diff --git a/extensions/standard-processors/processors/ListFile.cpp b/extensions/standard-processors/processors/ListFile.cpp index 579728642..0365a6e9a 100644 --- a/extensions/standard-processors/processors/ListFile.cpp +++ b/extensions/standard-processors/processors/ListFile.cpp @@ -19,6 +19,7 @@ #include <filesystem> #include "utils/StringUtils.h" +#include "utils/TimeUtil.h" #include "core/PropertyBuilder.h" #include "core/Resource.h" @@ -94,7 +95,6 @@ void ListFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, } state_manager_ = std::make_unique<minifi::utils::ListingStateManager>(state_manager); - std::string input_directory_str; if (auto input_directory_str = context->getProperty(InputDirectory); !input_directory_str || input_directory_str->empty()) { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory property missing or invalid"); } else { @@ -102,12 +102,13 @@ void ListFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, 
} context->getProperty(RecurseSubdirectories.getName(), recurse_subdirectories_); + std::string value; if (context->getProperty(FileFilter.getName(), value) && !value.empty()) { file_filter_ = std::regex(value); } - if (context->getProperty(PathFilter.getName(), value) && !value.empty()) { + if (recurse_subdirectories_ && context->getProperty(PathFilter.getName(), value) && !value.empty()) { path_filter_ = std::regex(value); } @@ -137,35 +138,49 @@ bool ListFile::fileMatchesFilters(const ListedFile& listed_file) { return false; } - if (file_filter_ && !std::regex_match(listed_file.filename.string(), *file_filter_)) { - logger_->log_debug("File '%s' does not match file filter so it will not be listed", listed_file.full_file_path.string()); - return false; - } + if (file_filter_) { + const auto file_name = listed_file.full_file_path.filename(); - if (path_filter_ && listed_file.relative_path != "." && !std::regex_match(listed_file.relative_path.string(), *path_filter_)) { - logger_->log_debug("Relative path '%s' does not match path filter so file '%s' will not be listed", listed_file.relative_path.string(), listed_file.full_file_path.string()); - return false; + if (!std::regex_match(file_name.string(), *file_filter_)) { + logger_->log_debug("File '%s' does not match file filter so it will not be listed", listed_file.full_file_path.string()); + return false; + } } - auto file_age = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - listed_file.getLastModified()); - if (minimum_file_age_ && file_age < *minimum_file_age_) { - logger_->log_debug("File '%s' does not meet the minimum file age requirement so it will not be listed", listed_file.full_file_path.string()); - return false; + if (path_filter_) { + const auto relative_path = std::filesystem::relative(listed_file.full_file_path.parent_path(), input_directory_); + if (!std::regex_match(relative_path.string(), *path_filter_)) { + logger_->log_debug("Relative path '%s' does not 
match path filter so file '%s' will not be listed", relative_path.string(), listed_file.full_file_path.string()); + return false; + } } - if (maximum_file_age_ && file_age > *maximum_file_age_) { - logger_->log_debug("File '%s' does not meet the maximum file age requirement so it will not be listed", listed_file.full_file_path.string()); - return false; - } + if (minimum_file_age_ || maximum_file_age_) { + const auto file_age = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - listed_file.getLastModified()); - if (minimum_file_size_ && listed_file.file_size < *minimum_file_size_) { - logger_->log_debug("File '%s' does not meet the minimum file size requirement so it will not be listed", listed_file.full_file_path.string()); - return false; + if (minimum_file_age_ && file_age < *minimum_file_age_) { + logger_->log_debug("File '%s' does not meet the minimum file age requirement so it will not be listed", listed_file.full_file_path.string()); + return false; + } + + if (maximum_file_age_ && file_age > *maximum_file_age_) { + logger_->log_debug("File '%s' does not meet the maximum file age requirement so it will not be listed", listed_file.full_file_path.string()); + return false; + } } - if (maximum_file_size_ && *maximum_file_size_ < listed_file.file_size) { - logger_->log_debug("File '%s' does not meet the maximum file size requirement so it will not be listed", listed_file.full_file_path.string()); - return false; + if (minimum_file_size_ || maximum_file_size_) { + const auto file_size = utils::file::file_size(listed_file.full_file_path); + + if (minimum_file_size_ && file_size < *minimum_file_size_) { + logger_->log_debug("File '%s' does not meet the minimum file size requirement so it will not be listed", listed_file.full_file_path.string()); + return false; + } + + if (maximum_file_size_ && *maximum_file_size_ < file_size) { + logger_->log_debug("File '%s' does not meet the maximum file size requirement so it will not be 
listed", listed_file.full_file_path.string()); + return false; + } } return true; @@ -173,17 +188,14 @@ bool ListFile::fileMatchesFilters(const ListedFile& listed_file) { std::shared_ptr<core::FlowFile> ListFile::createFlowFile(core::ProcessSession& session, const ListedFile& listed_file) { auto flow_file = session.create(); - session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, listed_file.filename.string()); - session.putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, listed_file.absolute_path.string()); - session.putAttribute(flow_file, core::SpecialFlowAttribute::PATH, listed_file.relative_path == "." ? - (std::filesystem::path(".") / "").string() : (listed_file.relative_path / "").string()); - session.putAttribute(flow_file, "file.size", std::to_string(listed_file.file_size)); - if (auto last_modified_str = utils::file::FileUtils::get_last_modified_time_formatted_string(listed_file.full_file_path, "%Y-%m-%dT%H:%M:%SZ")) { - session.putAttribute(flow_file, "file.lastModifiedTime", *last_modified_str); - } else { - session.putAttribute(flow_file, "file.lastModifiedTime", ""); - logger_->log_warn("Could not get last modification time of file '%s'", listed_file.full_file_path.string()); - } + session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, listed_file.full_file_path.filename().string()); + session.putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, (listed_file.full_file_path.parent_path() / "").string()); + + auto relative_path = std::filesystem::relative(listed_file.full_file_path.parent_path(), input_directory_); + session.putAttribute(flow_file, core::SpecialFlowAttribute::PATH, (relative_path / "").string()); + + session.putAttribute(flow_file, "file.size", std::to_string(utils::file::file_size(listed_file.full_file_path))); + session.putAttribute(flow_file, "file.lastModifiedTime", 
utils::timeutils::getDateTimeStr(std::chrono::time_point_cast<std::chrono::seconds>(listed_file.last_modified_time))); if (auto permission_string = utils::file::FileUtils::get_permission_string(listed_file.full_file_path)) { session.putAttribute(flow_file, "file.permissions", *permission_string); @@ -221,39 +233,32 @@ void ListFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, c auto latest_listing_state = stored_listing_state; uint32_t files_listed = 0; - auto file_list = utils::file::FileUtils::list_dir_all(input_directory_, logger_, recurse_subdirectories_); - for (const auto& [path, filename] : file_list) { + auto process_files = [&](const std::filesystem::path& path, const std::filesystem::path& filename) { ListedFile listed_file; listed_file.full_file_path = path / filename; - listed_file.absolute_path = path / ""; - if (auto relative_path = utils::file::FileUtils::get_relative_path(path, input_directory_)) { - listed_file.relative_path = *relative_path; - } else { - logger_->log_warn("Failed to get group of file '%s' to input directory '%s'", listed_file.full_file_path.string(), input_directory_.string()); - } - listed_file.file_size = utils::file::FileUtils::file_size(listed_file.full_file_path); - listed_file.filename = filename; - if (auto last_modified_time = utils::file::FileUtils::last_write_time(listed_file.full_file_path)) { - listed_file.last_modified_time = *last_modified_time; + if (auto last_modified_time = utils::file::last_write_time(listed_file.full_file_path)) { + listed_file.last_modified_time = std::chrono::time_point_cast<std::chrono::milliseconds>(utils::file::to_sys(*last_modified_time)); } else { - logger_->log_error("Could not get last modification time of file '%s'", listed_file.full_file_path.string()); - continue; - } - - if (!fileMatchesFilters(listed_file)) { - continue; + logger_->log_warn("Could not get last modification time of file '%s'", listed_file.full_file_path.string()); + listed_file.last_modified_time 
= {}; } if (stored_listing_state.wasObjectListedAlready(listed_file)) { logger_->log_debug("File '%s' was already listed.", listed_file.full_file_path.string()); - continue; + return true; + } + + if (!fileMatchesFilters(listed_file)) { + return true; } auto flow_file = createFlowFile(*session, listed_file); session->transfer(flow_file, Success); ++files_listed; latest_listing_state.updateState(listed_file); - } + return true; + }; + utils::file::list_dir(input_directory_, process_files, logger_, recurse_subdirectories_); state_manager_->storeState(latest_listing_state); diff --git a/extensions/standard-processors/processors/ListFile.h b/extensions/standard-processors/processors/ListFile.h index 05b72ba23..970f28cf0 100644 --- a/extensions/standard-processors/processors/ListFile.h +++ b/extensions/standard-processors/processors/ListFile.h @@ -81,19 +81,15 @@ class ListFile : public core::Processor { private: struct ListedFile : public utils::ListedObject { [[nodiscard]] std::chrono::time_point<std::chrono::system_clock> getLastModified() const override { - return std::chrono::time_point_cast<std::chrono::milliseconds>(utils::file::FileUtils::to_sys(last_modified_time)); + return last_modified_time; } [[nodiscard]] std::string getKey() const override { - return absolute_path.string(); + return full_file_path.string(); } - std::filesystem::path filename; - std::filesystem::path absolute_path; - std::filesystem::file_time_type last_modified_time; - std::filesystem::path relative_path; + std::chrono::time_point<std::chrono::system_clock> last_modified_time; std::filesystem::path full_file_path; - uint64_t file_size = 0; }; bool fileMatchesFilters(const ListedFile& listed_file); diff --git a/extensions/standard-processors/tests/unit/ListFileTests.cpp b/extensions/standard-processors/tests/unit/ListFileTests.cpp index 32172c05f..7428f4a7c 100644 --- a/extensions/standard-processors/tests/unit/ListFileTests.cpp +++ 
b/extensions/standard-processors/tests/unit/ListFileTests.cpp @@ -166,10 +166,17 @@ TEST_CASE_METHOD(ListFileTestFixture, "Test listing files matching the File Filt TEST_CASE_METHOD(ListFileTestFixture, "Test listing files matching the Path Filter pattern", "[testListFile]") { plan_->setProperty(list_file_processor_, "Path Filter", "first.*"); test_controller_.runSession(plan_); - REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:standard_file.log")); - REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:empty_file.txt")); - REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_one.txt")); - REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:sub_file_two.txt") == 0); + CHECK(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_one.txt")); + CHECK(LogTestController::getInstance().countOccurrences("key:filename value:") == 1); +} + +TEST_CASE_METHOD(ListFileTestFixture, "Test listing files matching the Path Filter pattern when the pattern also matches .", "[testListFile]") { + plan_->setProperty(list_file_processor_, "Path Filter", "second.*|\\."); + test_controller_.runSession(plan_); + CHECK(verifyLogLinePresenceInPollTime(3s, "key:filename value:standard_file.log")); + CHECK(verifyLogLinePresenceInPollTime(3s, "key:filename value:empty_file.txt")); + CHECK(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_two.txt")); + CHECK(LogTestController::getInstance().countOccurrences("key:filename value:") == 3); } TEST_CASE_METHOD(ListFileTestFixture, "Test listing files with restriction on the minimum file age", "[testListFile]") { @@ -221,7 +228,7 @@ TEST_CASE_METHOD(ListFileTestFixture, "Test listing hidden files", "[testListFil TEST_CASE("ListFile sets attributes correctly") { using minifi::processors::ListFile; - const auto list_file = std::make_shared<ListFile>("GetFile"); + const auto list_file = std::make_shared<ListFile>("ListFile"); 
LogTestController::getInstance().setTrace<ListFile>(); minifi::test::SingleProcessorTestController test_controller(list_file); std::filesystem::path dir = test_controller.createTempDirectory(); @@ -247,4 +254,25 @@ TEST_CASE("ListFile sets attributes correctly") { } } +TEST_CASE("If a second file with the same modification time shows up later, then it will get listed") { + using minifi::processors::ListFile; + + const auto list_file = std::make_shared<ListFile>("ListFile"); + minifi::test::SingleProcessorTestController test_controller(list_file); + + const auto input_dir = test_controller.createTempDirectory(); + list_file->setProperty(ListFile::InputDirectory, input_dir.string()); + + const auto common_timestamp = std::chrono::file_clock::now(); + + const auto file_one = utils::putFileToDir(input_dir, "file_one.txt", "When I was one, I had just begun."); + std::filesystem::last_write_time(file_one, common_timestamp); + const auto result_one = test_controller.trigger(); + CHECK(result_one.at(ListFile::Success).size() == 1); + + const auto file_two = utils::putFileToDir(input_dir, "file_two.txt", "When I was two, I was nearly new."); + std::filesystem::last_write_time(file_two, common_timestamp); + const auto result_two = test_controller.trigger(); + CHECK(result_two.at(ListFile::Success).size() == 1); +} } // namespace
