This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 53487b1233838f7e5c8602c0cf8aacbd897cf8f0 Author: Gabor Gyimesi <[email protected]> AuthorDate: Wed Apr 20 17:50:03 2022 +0200 MINIFICPP-1760 Implement ListFile processor Closes #1283 Signed-off-by: Marton Szasz <[email protected]> --- PROCESSORS.md | 41 +++ README.md | 2 +- .../standard-processors/processors/ListFile.cpp | 278 +++++++++++++++++++++ .../standard-processors/processors/ListFile.h | 93 +++++++ .../tests/unit/ListFileTests.cpp | 206 +++++++++++++++ libminifi/include/utils/file/FileUtils.h | 186 +++++++++++++- libminifi/src/utils/file/FileUtils.cpp | 16 +- libminifi/test/unit/FileUtilsTests.cpp | 24 ++ 8 files changed, 837 insertions(+), 9 deletions(-) diff --git a/PROCESSORS.md b/PROCESSORS.md index 464539960..0d7c8692e 100644 --- a/PROCESSORS.md +++ b/PROCESSORS.md @@ -39,6 +39,7 @@ - [ListAzureDataLakeStorage](#listazuredatalakestorage) - [ListenHTTP](#listenhttp) - [ListenSyslog](#listensyslog) +- [ListFile](#listfile) - [ListS3](#lists3) - [ListSFTP](#listsftp) - [LogAttribute](#logattribute) @@ -1051,6 +1052,46 @@ In the list below, the names of required properties appear in bold. Any other pr +## ListFile + +### Description + +Retrieves a listing of files from the local filesystem. For each file that is listed, creates a FlowFile that represents the file so that it can be fetched in conjunction with FetchFile. +### Properties + +In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language. 
+ +| Name | Default Value | Allowable Values | Description | +| - | - | - | - | +|**Input Directory**|||The input directory from which files to pull files| +|**Recurse Subdirectories**|true||Indicates whether to list files from subdirectories of the directory| +|File Filter|||Only files whose names match the given regular expression will be picked up| +|Path Filter|||When Recurse Subdirectories is true, then only subdirectories whose path matches the given regular expression will be scanned| +|**Minimum File Age**|0 sec||The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored| +|Maximum File Age|||The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored| +|**Minimum File Size**|0 B||The minimum size that a file must be in order to be pulled| +|Maximum File Size|||The maximum size that a file can be in order to be pulled| +|**Ignore Hidden Files**|true||Indicates whether or not hidden files should be ignored| +### Relationships + +| Name | Description | +| - | - | +|success|All FlowFiles that are received are routed to success| + +### Output Attributes + +| Attribute | Relationship | Description | +|----------------------------|--------------|--------------------------------------------------------------------| +| _filename_ | success | The name of the file that was read from filesystem. | +| _path_ | success | The path is set to the relative path of the file's directory on filesystem compared to the Input Directory property. For example, if Input Directory is set to /tmp, then files picked up from /tmp will have the path attribute set to "./". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to "abc/1/2/3/". 
| +| _absolute.path_ | success | The absolute.path is set to the absolute path of the file's directory on filesystem. For example, if the Input Directory property is set to /tmp, then files picked up from /tmp will have the path attribute set to "/tmp/". If the Recurse Subdirectories property is set to true and a file is picked up from /tmp/abc/1/2/3, then the path attribute will be set to "/tmp/abc/1/2/3/". | +| _file.owner_ | success | The user that owns the file in filesystem | +| _file.group_ | success | The group that owns the file in filesystem | +| _file.size_ | success | The number of bytes in the file in filesystem | +| _file.permissions_ | success | The permissions for the file in filesystem. This is formatted as 3 characters for the owner, 3 for the group, and 3 for other users. For example rw-rw-r-- | +| _file.lastModifiedTime_ | success | The timestamp of when the file in filesystem was last modified as 'yyyy-MM-dd'T'HH:mm:ssZ' | + + ## ListS3 ### Description diff --git a/README.md b/README.md index 6fa381fd5..d4fe1e3f0 100644 --- a/README.md +++ b/README.md @@ -65,7 +65,7 @@ The following table lists the base set of processors. | Extension Set | Processors [...] |---------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ [...] 
-| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[LogAttribute](P [...] +| **Base** | [AppendHostInfo](PROCESSORS.md#appendhostinfo)<br/>[DefragmentText](PROCESSORS.md#defragmenttext)<br/>[ExecuteProcess](PROCESSORS.md#executeprocess)<br/>[ExtractText](PROCESSORS.md#extracttext)<br/>[FetchFile](PROCESSORS.md#fetchfile)<br/>[GenerateFlowFile](PROCESSORS.md#generateflowfile)<br/>[GetFile](PROCESSORS.md#getfile)<br/>[GetTCP](PROCESSORS.md#gettcp)<br/>[HashContent](PROCESSORS.md#hashcontent)<br/>[ListenSyslog](PROCESSORS.md#listensyslog)<br/>[ListFile](PROCE [...] The next table outlines CMAKE flags that correspond with MiNiFi extensions. Extensions that are enabled by default ( such as CURL ), can be disabled with the respective CMAKE flag on the command line. diff --git a/extensions/standard-processors/processors/ListFile.cpp b/extensions/standard-processors/processors/ListFile.cpp new file mode 100644 index 000000000..6387976d3 --- /dev/null +++ b/extensions/standard-processors/processors/ListFile.cpp @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "ListFile.h" + +#include <filesystem> + +#include "utils/FileReaderCallback.h" +#include "utils/StringUtils.h" +#include "core/Resource.h" + +namespace org::apache::nifi::minifi::processors { + +const core::Property ListFile::InputDirectory( + core::PropertyBuilder::createProperty("Input Directory") + ->withDescription("The input directory from which files to pull files") + ->isRequired(true) + ->build()); + +const core::Property ListFile::RecurseSubdirectories( + core::PropertyBuilder::createProperty("Recurse Subdirectories") + ->withDescription("Indicates whether to list files from subdirectories of the directory") + ->withDefaultValue(true) + ->isRequired(true) + ->build()); + +const core::Property ListFile::FileFilter( + core::PropertyBuilder::createProperty("File Filter") + ->withDescription("Only files whose names match the given regular expression will be picked up") + ->build()); + +const core::Property ListFile::PathFilter( + core::PropertyBuilder::createProperty("Path Filter") + ->withDescription("When Recurse Subdirectories is true, then only subdirectories whose path matches the given regular expression will be scanned") + ->build()); + +const core::Property ListFile::MinimumFileAge( + core::PropertyBuilder::createProperty("Minimum File Age") + ->withDescription("The minimum age that a file must be in order to be pulled; any file younger than this amount of time (according to last modification date) will be ignored") + ->isRequired(true) + ->withDefaultValue<core::TimePeriodValue>("0 sec") + ->build()); + +const 
core::Property ListFile::MaximumFileAge( + core::PropertyBuilder::createProperty("Maximum File Age") + ->withDescription("The maximum age that a file must be in order to be pulled; any file older than this amount of time (according to last modification date) will be ignored") + ->build()); + +const core::Property ListFile::MinimumFileSize( + core::PropertyBuilder::createProperty("Minimum File Size") + ->withDescription("The minimum size that a file must be in order to be pulled") + ->isRequired(true) + ->withDefaultValue<core::DataSizeValue>("0 B") + ->build()); + +const core::Property ListFile::MaximumFileSize( + core::PropertyBuilder::createProperty("Maximum File Size") + ->withDescription("The maximum size that a file can be in order to be pulled") + ->build()); + +const core::Property ListFile::IgnoreHiddenFiles( + core::PropertyBuilder::createProperty("Ignore Hidden Files") + ->withDescription("Indicates whether or not hidden files should be ignored") + ->withDefaultValue(true) + ->isRequired(true) + ->build()); + +const core::Relationship ListFile::Success("success", "All FlowFiles that are received are routed to success"); + +void ListFile::initialize() { + setSupportedProperties({ + InputDirectory, + RecurseSubdirectories, + FileFilter, + PathFilter, + MinimumFileAge, + MaximumFileAge, + MinimumFileSize, + MaximumFileSize, + IgnoreHiddenFiles + }); + + setSupportedRelationships({ + Success + }); +} + +void ListFile::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &/*sessionFactory*/) { + gsl_Expects(context); + + auto state_manager = context->getStateManager(); + if (state_manager == nullptr) { + throw Exception(PROCESSOR_EXCEPTION, "Failed to get StateManager"); + } + state_manager_ = std::make_unique<minifi::utils::ListingStateManager>(state_manager); + + if (!context->getProperty(InputDirectory.getName(), input_directory_) || input_directory_.empty()) { + throw 
Exception(PROCESS_SCHEDULE_EXCEPTION, "Input Directory property missing or invalid"); + } + + context->getProperty(RecurseSubdirectories.getName(), recurse_subdirectories_); + std::string value; + if (context->getProperty(FileFilter.getName(), value) && !value.empty()) { + file_filter_ = std::regex(value); + } + + if (context->getProperty(PathFilter.getName(), value) && !value.empty()) { + path_filter_ = std::regex(value); + } + + if (auto minimum_file_age = context->getProperty<core::TimePeriodValue>(MinimumFileAge)) { + minimum_file_age_ = minimum_file_age->getMilliseconds(); + } + + if (auto maximum_file_age = context->getProperty<core::TimePeriodValue>(MaximumFileAge)) { + maximum_file_age_ = maximum_file_age->getMilliseconds(); + } + + uint64_t int_value = 0; + if (context->getProperty(MinimumFileSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, int_value)) { + minimum_file_size_ = int_value; + } + + if (context->getProperty(MaximumFileSize.getName(), value) && !value.empty() && core::Property::StringToInt(value, int_value)) { + maximum_file_size_ = int_value; + } + + context->getProperty(IgnoreHiddenFiles.getName(), ignore_hidden_files_); +} + +bool ListFile::fileMatchesFilters(const ListedFile& listed_file) { + if (ignore_hidden_files_ && utils::file::FileUtils::is_hidden(listed_file.absolute_path)) { + logger_->log_debug("File '%s' is hidden so it will not be listed", listed_file.absolute_path); + return false; + } + + if (file_filter_ && !std::regex_match(listed_file.filename, *file_filter_)) { + logger_->log_debug("File '%s' does not match file filter so it will not be listed", listed_file.absolute_path); + return false; + } + + if (path_filter_ && listed_file.relative_path != "." 
&& !std::regex_match(listed_file.relative_path, *path_filter_)) { + logger_->log_debug("Relative path '%s' does not match path filter so file '%s' will not be listed", listed_file.relative_path, listed_file.absolute_path); + return false; + } + + auto file_age = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now() - listed_file.getLastModified()); + if (minimum_file_age_ && file_age < *minimum_file_age_) { + logger_->log_debug("File '%s' does not meet the minimum file age requirement so it will not be listed", listed_file.absolute_path); + return false; + } + + if (maximum_file_age_ && file_age > *maximum_file_age_) { + logger_->log_debug("File '%s' does not meet the maximum file age requirement so it will not be listed", listed_file.absolute_path); + return false; + } + + if (minimum_file_size_ && listed_file.file_size < *minimum_file_size_) { + logger_->log_debug("File '%s' does not meet the minimum file size requirement so it will not be listed", listed_file.absolute_path); + return false; + } + + if (maximum_file_size_ && *maximum_file_size_ < listed_file.file_size) { + logger_->log_debug("File '%s' does not meet the maximum file size requirement so it will not be listed", listed_file.absolute_path); + return false; + } + + return true; +} + +std::shared_ptr<core::FlowFile> ListFile::createFlowFile(core::ProcessSession& session, const ListedFile& listed_file) { + auto flow_file = session.create(); + session.putAttribute(flow_file, core::SpecialFlowAttribute::FILENAME, listed_file.filename); + session.putAttribute(flow_file, core::SpecialFlowAttribute::ABSOLUTE_PATH, listed_file.absolute_path); + session.putAttribute(flow_file, core::SpecialFlowAttribute::PATH, listed_file.relative_path == "." ? 
+ std::string(".") + utils::file::FileUtils::get_separator() : listed_file.relative_path + utils::file::FileUtils::get_separator()); + session.putAttribute(flow_file, "file.size", std::to_string(listed_file.file_size)); + if (auto last_modified_str = utils::file::FileUtils::get_last_modified_time_formatted_string(listed_file.absolute_path, "%Y-%m-%dT%H:%M:%SZ")) { + session.putAttribute(flow_file, "file.lastModifiedTime", *last_modified_str); + } else { + session.putAttribute(flow_file, "file.lastModifiedTime", ""); + logger_->log_warn("Could not get last modification time of file '%s'", listed_file.absolute_path); + } + + if (auto permission_string = utils::file::FileUtils::get_permission_string(listed_file.absolute_path)) { + session.putAttribute(flow_file, "file.permissions", *permission_string); + } else { + logger_->log_warn("Failed to get permissions of file '%s'", listed_file.absolute_path); + session.putAttribute(flow_file, "file.permissions", ""); + } + + if (auto owner = utils::file::FileUtils::get_file_owner(listed_file.absolute_path)) { + session.putAttribute(flow_file, "file.owner", *owner); + } else { + logger_->log_warn("Failed to get owner of file '%s'", listed_file.absolute_path); + session.putAttribute(flow_file, "file.owner", ""); + } + +#ifndef WIN32 + if (auto group = utils::file::FileUtils::get_file_group(listed_file.absolute_path)) { + session.putAttribute(flow_file, "file.group", *group); + } else { + logger_->log_warn("Failed to get group of file '%s'", listed_file.absolute_path); + session.putAttribute(flow_file, "file.group", ""); + } +#else + session.putAttribute(flow_file, "file.group", ""); +#endif + + return flow_file; +} + +void ListFile::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) { + gsl_Expects(context && session); + logger_->log_trace("ListFile onTrigger"); + + auto stored_listing_state = state_manager_->getCurrentState(); + auto latest_listing_state = 
stored_listing_state; + uint32_t files_listed = 0; + + auto file_list = utils::file::FileUtils::list_dir_all(input_directory_, logger_, recurse_subdirectories_); + for (const auto& [path, filename] : file_list) { + ListedFile listed_file; + listed_file.absolute_path = (std::filesystem::path(path) / filename).string(); + if (auto relative_path = utils::file::FileUtils::get_relative_path(path, input_directory_)) { + listed_file.relative_path = *relative_path; + } else { + logger_->log_warn("Failed to get relative path of file '%s' to input directory '%s'", listed_file.absolute_path, input_directory_); + } + listed_file.file_size = utils::file::FileUtils::file_size(listed_file.absolute_path); + listed_file.filename = filename; + if (auto last_modified_time = utils::file::FileUtils::last_write_time(listed_file.absolute_path)) { + listed_file.last_modified_time = *last_modified_time; + } else { + logger_->log_error("Could not get last modification time of file '%s'", listed_file.absolute_path); + continue; + } + + if (!fileMatchesFilters(listed_file)) { + continue; + } + + if (stored_listing_state.wasObjectListedAlready(listed_file)) { + logger_->log_debug("File '%s' was already listed.", listed_file.absolute_path); + continue; + } + + auto flow_file = createFlowFile(*session, listed_file); + session->transfer(flow_file, Success); + ++files_listed; + latest_listing_state.updateState(listed_file); + } + + state_manager_->storeState(latest_listing_state); + + if (files_listed == 0) { + logger_->log_debug("No new files were found in input directory '%s' to list", input_directory_); + context->yield(); + } +} + +REGISTER_RESOURCE(ListFile, "Retrieves a listing of files from the local filesystem. 
For each file that is listed, " + "creates a FlowFile that represents the file so that it can be fetched in conjunction with FetchFile."); + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/processors/ListFile.h b/extensions/standard-processors/processors/ListFile.h new file mode 100644 index 000000000..cef41b6b1 --- /dev/null +++ b/extensions/standard-processors/processors/ListFile.h @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include <string> +#include <regex> +#include <optional> +#include <memory> + +#include "core/Processor.h" +#include "core/ProcessSession.h" +#include "core/Property.h" +#include "core/logging/LoggerConfiguration.h" +#include "utils/Enum.h" +#include "utils/ListingStateManager.h" +#include "utils/file/FileUtils.h" + +namespace org::apache::nifi::minifi::processors { + +class ListFile : public core::Processor { + public: + explicit ListFile(const std::string& name, const utils::Identifier& uuid = {}) + : core::Processor(name, uuid) { + } + + EXTENSIONAPI static const core::Property InputDirectory; + EXTENSIONAPI static const core::Property RecurseSubdirectories; + EXTENSIONAPI static const core::Property FileFilter; + EXTENSIONAPI static const core::Property PathFilter; + EXTENSIONAPI static const core::Property MinimumFileAge; + EXTENSIONAPI static const core::Property MaximumFileAge; + EXTENSIONAPI static const core::Property MinimumFileSize; + EXTENSIONAPI static const core::Property MaximumFileSize; + EXTENSIONAPI static const core::Property IgnoreHiddenFiles; + + EXTENSIONAPI static const core::Relationship Success; + + void initialize() override; + void onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &session_factory) override; + void onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) override; + + core::annotation::Input getInputRequirement() const override { + return core::annotation::Input::INPUT_FORBIDDEN; + } + + private: + struct ListedFile : public utils::ListedObject { + [[nodiscard]] std::chrono::time_point<std::chrono::system_clock> getLastModified() const override { + return std::chrono::time_point_cast<std::chrono::milliseconds>(utils::file::FileUtils::to_sys_time_point(last_modified_time)); + } + + [[nodiscard]] std::string getKey() const override { + return absolute_path; + } + + std::string 
filename; + std::string absolute_path; + std::filesystem::file_time_type last_modified_time; + std::string relative_path; + uint64_t file_size = 0; + }; + + bool fileMatchesFilters(const ListedFile& listed_file); + std::shared_ptr<core::FlowFile> createFlowFile(core::ProcessSession& session, const ListedFile& listed_file); + + std::shared_ptr<core::logging::Logger> logger_ = core::logging::LoggerFactory<ListFile>::getLogger(); + std::string input_directory_; + std::unique_ptr<minifi::utils::ListingStateManager> state_manager_; + bool recurse_subdirectories_ = true; + std::optional<std::regex> file_filter_; + std::optional<std::regex> path_filter_; + std::optional<std::chrono::milliseconds> minimum_file_age_; + std::optional<std::chrono::milliseconds> maximum_file_age_; + std::optional<uint64_t> minimum_file_size_; + std::optional<uint64_t> maximum_file_size_; + bool ignore_hidden_files_ = true; +}; + +} // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/tests/unit/ListFileTests.cpp b/extensions/standard-processors/tests/unit/ListFileTests.cpp new file mode 100644 index 000000000..06314f53b --- /dev/null +++ b/extensions/standard-processors/tests/unit/ListFileTests.cpp @@ -0,0 +1,206 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <memory> +#include <string> + +#include "TestBase.h" +#include "Catch.h" +#include "core/Property.h" +#include "core/Processor.h" +#include "processors/LogAttribute.h" +#include "processors/ListFile.h" +#include "utils/TestUtils.h" +#include "utils/IntegrationTestUtils.h" + +using namespace std::literals::chrono_literals; + +namespace { + +using org::apache::nifi::minifi::utils::verifyLogLinePresenceInPollTime; + +class ListFileTestFixture { + public: + static const std::string FORMAT_STRING; + ListFileTestFixture(); + + protected: + TestController test_controller_; + std::shared_ptr<TestPlan> plan_; + const std::string input_dir_; + std::shared_ptr<core::Processor> list_file_processor_; + std::string hidden_file_path_; + std::string empty_file_abs_path_; + std::string standard_file_abs_path_; + std::string first_sub_file_abs_path_; + std::string second_sub_file_abs_path_; +}; + +const std::string ListFileTestFixture::FORMAT_STRING = "%Y-%m-%dT%H:%M:%SZ"; + +ListFileTestFixture::ListFileTestFixture() + : plan_(test_controller_.createPlan()), + input_dir_(test_controller_.createTempDirectory()) { + LogTestController::getInstance().setTrace<TestPlan>(); + LogTestController::getInstance().setTrace<minifi::processors::ListFile>(); + LogTestController::getInstance().setTrace<minifi::processors::LogAttribute>(); + + REQUIRE(!input_dir_.empty()); + + list_file_processor_ = plan_->addProcessor("ListFile", "ListFile"); + plan_->setProperty(list_file_processor_, "Input Directory", input_dir_); + auto log_attribute = plan_->addProcessor("LogAttribute", "logAttribute", core::Relationship("success", "description"), true); + plan_->setProperty(log_attribute, "FlowFiles To Log", "0"); + + hidden_file_path_ = utils::putFileToDir(input_dir_, ".hidden_file.txt", "hidden"); + standard_file_abs_path_ = utils::putFileToDir(input_dir_, "standard_file.log", "test"); + 
empty_file_abs_path_ = utils::putFileToDir(input_dir_, "empty_file.txt", ""); + utils::file::FileUtils::create_dir(input_dir_ + utils::file::FileUtils::get_separator() + "first_subdir"); + first_sub_file_abs_path_ = utils::putFileToDir(input_dir_ + utils::file::FileUtils::get_separator() + "first_subdir", "sub_file_one.txt", "the"); + utils::file::FileUtils::create_dir(input_dir_ + utils::file::FileUtils::get_separator() + "second_subdir"); + second_sub_file_abs_path_ = utils::putFileToDir(input_dir_ + utils::file::FileUtils::get_separator() + "second_subdir", "sub_file_two.txt", "some_other_content"); + + auto last_write_time = *utils::file::FileUtils::last_write_time(standard_file_abs_path_); + utils::file::FileUtils::set_last_write_time(empty_file_abs_path_, last_write_time - 1h); + utils::file::FileUtils::set_last_write_time(first_sub_file_abs_path_, last_write_time - 2h); + utils::file::FileUtils::set_last_write_time(second_sub_file_abs_path_, last_write_time - 3h); +#ifndef WIN32 + REQUIRE(0 == utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + "empty_file.txt", 0755)); + REQUIRE(0 == utils::file::FileUtils::set_permissions(input_dir_ + utils::file::FileUtils::get_separator() + "standard_file.log", 0644)); +#endif + +#ifdef WIN32 + const auto hide_file_error = utils::file::FileUtils::hide_file(hidden_file_path_.c_str()); + REQUIRE(!hide_file_error); +#endif +} + +TEST_CASE_METHOD(ListFileTestFixture, "Input Directory is empty", "[testListFile]") { + plan_->setProperty(list_file_processor_, "Input Directory", ""); + REQUIRE_THROWS_AS(test_controller_.runSession(plan_, true), minifi::Exception); +} + +TEST_CASE_METHOD(ListFileTestFixture, "Test listing files only once with default parameters", "[testListFile]") { + test_controller_.runSession(plan_); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:standard_file.log")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename 
value:empty_file.txt")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_one.txt")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_two.txt")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + empty_file_abs_path_)); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + standard_file_abs_path_)); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + first_sub_file_abs_path_)); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:absolute.path value:" + second_sub_file_abs_path_)); + REQUIRE(LogTestController::getInstance().countOccurrences(std::string("key:path value:.") + utils::file::FileUtils::get_separator() + "\n") == 2); + REQUIRE(verifyLogLinePresenceInPollTime(3s, std::string("key:path value:first_subdir") + utils::file::FileUtils::get_separator())); + REQUIRE(verifyLogLinePresenceInPollTime(3s, std::string("key:path value:second_subdir") + utils::file::FileUtils::get_separator())); + REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:.hidden_file.txt") == 0); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.size value:0")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.size value:4")); +#ifndef WIN32 + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.permissions value:rwxr-xr-x")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.permissions value:rw-r--r--")); + if (auto group = utils::file::FileUtils::get_file_group(standard_file_abs_path_)) { + REQUIRE(LogTestController::getInstance().countOccurrences("key:file.group value:" + *group) == 4); + } +#endif + if (auto owner = utils::file::FileUtils::get_file_owner(standard_file_abs_path_)) { + REQUIRE(LogTestController::getInstance().countOccurrences("key:file.owner value:" + *owner) == 4); + } + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.lastModifiedTime value:" + 
*utils::file::FileUtils::get_last_modified_time_formatted_string(empty_file_abs_path_, FORMAT_STRING))); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.lastModifiedTime value:" + *utils::file::FileUtils::get_last_modified_time_formatted_string(standard_file_abs_path_, FORMAT_STRING))); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.lastModifiedTime value:" + *utils::file::FileUtils::get_last_modified_time_formatted_string(first_sub_file_abs_path_, FORMAT_STRING))); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:file.lastModifiedTime value:" + *utils::file::FileUtils::get_last_modified_time_formatted_string(second_sub_file_abs_path_, FORMAT_STRING))); + plan_->reset(); + LogTestController::getInstance().resetStream(LogTestController::getInstance().log_output); + test_controller_.runSession(plan_, true); + REQUIRE_FALSE(LogTestController::getInstance().contains("key:file.size", 0s, 0ms)); +} + +TEST_CASE_METHOD(ListFileTestFixture, "Test turning off recursive file listing", "[testListFile]") { + plan_->setProperty(list_file_processor_, "Recurse Subdirectories", "false"); + test_controller_.runSession(plan_); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:standard_file.log")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:empty_file.txt")); + REQUIRE_FALSE(LogTestController::getInstance().contains("key:filename value:sub_file_one.txt", 0s, 0ms)); + REQUIRE_FALSE(LogTestController::getInstance().contains("key:filename value:sub_file_two.txt", 0s, 0ms)); +} + +TEST_CASE_METHOD(ListFileTestFixture, "Test listing files matching the File Filter pattern", "[testListFile]") { + plan_->setProperty(list_file_processor_, "File Filter", "stand\\w+\\.log"); + test_controller_.runSession(plan_); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:standard_file.log")); + REQUIRE_FALSE(LogTestController::getInstance().contains("key:filename value:empty_file.txt", 0s, 0ms)); + 
REQUIRE_FALSE(LogTestController::getInstance().contains("key:filename value:sub_file_one.txt", 0s, 0ms)); + REQUIRE_FALSE(LogTestController::getInstance().contains("key:filename value:sub_file_two.txt", 0s, 0ms)); +} + +TEST_CASE_METHOD(ListFileTestFixture, "Test listing files matching the Path Filter pattern", "[testListFile]") { + plan_->setProperty(list_file_processor_, "Path Filter", "first.*"); + test_controller_.runSession(plan_); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:standard_file.log")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:empty_file.txt")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_one.txt")); + REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:sub_file_two.txt") == 0); +} + +TEST_CASE_METHOD(ListFileTestFixture, "Test listing files with restriction on the minimum file age", "[testListFile]") { + plan_->setProperty(list_file_processor_, "Minimum File Age", "90 min"); + test_controller_.runSession(plan_); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_one.txt")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_two.txt")); + REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:empty_file.txt") == 0); + REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:standard_file.log") == 0); +} + +TEST_CASE_METHOD(ListFileTestFixture, "Test listing files with restriction on the maximum file age", "[testListFile]") { + plan_->setProperty(list_file_processor_, "Maximum File Age", "90 min"); + test_controller_.runSession(plan_); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:empty_file.txt")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:standard_file.log")); + REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:sub_file_one.txt") == 0); + 
REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:sub_file_two.txt") == 0); +} + +TEST_CASE_METHOD(ListFileTestFixture, "Test listing files with restriction on the minimum file size", "[testListFile]") { + plan_->setProperty(list_file_processor_, "Minimum File Size", "4 B"); + test_controller_.runSession(plan_); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:standard_file.log")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_two.txt")); + REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:empty_file.txt") == 0); + REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:sub_file_one.txt") == 0); +} + +TEST_CASE_METHOD(ListFileTestFixture, "Test listing files with restriction on the maximum file size", "[testListFile]") { + plan_->setProperty(list_file_processor_, "Maximum File Size", "4 B"); + test_controller_.runSession(plan_); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:standard_file.log")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:empty_file.txt")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_one.txt")); + REQUIRE(LogTestController::getInstance().countOccurrences("key:filename value:sub_file_two.txt") == 0); +} + +TEST_CASE_METHOD(ListFileTestFixture, "Test listing hidden files", "[testListFile]") { + plan_->setProperty(list_file_processor_, "Ignore Hidden Files", "false"); + test_controller_.runSession(plan_); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:standard_file.log")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:empty_file.txt")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_one.txt")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:sub_file_two.txt")); + REQUIRE(verifyLogLinePresenceInPollTime(3s, "key:filename value:.hidden_file.txt")); +} + +} // namespace diff 
--git a/libminifi/include/utils/file/FileUtils.h b/libminifi/include/utils/file/FileUtils.h index c3b85b336..75ad4e833 100644 --- a/libminifi/include/utils/file/FileUtils.h +++ b/libminifi/include/utils/file/FileUtils.h @@ -25,16 +25,20 @@ #include <utility> #include <vector> #include <cstdio> +#include <algorithm> #ifndef WIN32 #include <unistd.h> #include <sys/stat.h> //NOLINT +#include <pwd.h> +#include <grp.h> #endif #include <fcntl.h> #ifdef WIN32 +#include <stdio.h> #include <direct.h> #include <sys/stat.h> // stat // NOLINT #include <sys/types.h> // NOLINT @@ -51,12 +55,15 @@ #pragma comment(lib, "Ws2_32.lib") -#include <algorithm> // replace -#include <string> // string +#include <string> #include "properties/Properties.h" #include "utils/Id.h" +#include "accctrl.h" +#include "aclapi.h" +#pragma comment(lib, "advapi32.lib") + #endif #ifdef __APPLE__ #include <mach-o/dyld.h> @@ -109,7 +116,8 @@ inline char get_separator(bool /*force_posix*/ = false) { return '/'; } #endif -time_t to_time_t(const std::filesystem::file_time_type time); +time_t to_time_t(const std::filesystem::file_time_type& time); +std::chrono::time_point<std::chrono::system_clock> to_sys_time_point(const std::filesystem::file_time_type& time); inline std::string normalize_path_separators(std::string path, bool force_posix = false) { const auto normalize_separators = [force_posix](const char c) { @@ -171,6 +179,19 @@ inline const std::optional<std::filesystem::file_time_type> last_write_time(cons return std::nullopt; } +inline std::optional<std::string> format_time(const std::filesystem::file_time_type& time, const std::string& format) { + auto last_write_time_t = to_time_t(time); + std::array<char, 128U> result; + if (std::strftime(result.data(), result.size(), format.c_str(), gmtime(&last_write_time_t)) != 0) { + return std::string(result.data()); + } + return std::nullopt; +} + +inline std::optional<std::string> get_last_modified_time_formatted_string(const std::string& path, const 
std::string& format_string) { + return last_write_time(path) | utils::flatMap([format_string](auto time) { return format_time(time, format_string); }); +} + inline bool set_last_write_time(const std::string &path, std::filesystem::file_time_type new_time) { std::error_code ec; std::filesystem::last_write_time(path, new_time, ec); @@ -192,7 +213,6 @@ inline uint64_t file_size(const std::string &path) { return 0; } -#ifndef WIN32 inline bool get_permissions(const std::string &path, uint32_t &permissions) { std::error_code ec; permissions = static_cast<uint32_t>(std::filesystem::status(path, ec).permissions()); @@ -204,7 +224,26 @@ inline int set_permissions(const std::string &path, const uint32_t permissions) std::filesystem::permissions(path, static_cast<std::filesystem::perms>(permissions), ec); return ec.value(); } -#endif + +inline std::optional<std::string> get_permission_string(const std::string &path) { + std::error_code ec; + auto permissions = std::filesystem::status(path, ec).permissions(); + if (ec.value() != 0) { + return std::nullopt; + } + + std::string permission_string; + permission_string += (permissions & std::filesystem::perms::owner_read) != std::filesystem::perms::none ? "r" : "-"; + permission_string += (permissions & std::filesystem::perms::owner_write) != std::filesystem::perms::none ? "w" : "-"; + permission_string += (permissions & std::filesystem::perms::owner_exec) != std::filesystem::perms::none ? "x" : "-"; + permission_string += (permissions & std::filesystem::perms::group_read) != std::filesystem::perms::none ? "r" : "-"; + permission_string += (permissions & std::filesystem::perms::group_write) != std::filesystem::perms::none ? "w" : "-"; + permission_string += (permissions & std::filesystem::perms::group_exec) != std::filesystem::perms::none ? "x" : "-"; + permission_string += (permissions & std::filesystem::perms::others_read) != std::filesystem::perms::none ? 
"r" : "-"; + permission_string += (permissions & std::filesystem::perms::others_write) != std::filesystem::perms::none ? "w" : "-"; + permission_string += (permissions & std::filesystem::perms::others_exec) != std::filesystem::perms::none ? "x" : "-"; + return permission_string; +} #ifndef WIN32 inline bool get_uid_gid(const std::string &path, uint64_t &uid, uint64_t &gid) { @@ -577,6 +616,143 @@ inline std::string get_file_content(const std::string &file_name) { bool contains(const std::filesystem::path& file_path, std::string_view text_to_search); +inline std::optional<std::string> get_file_owner(const std::string& file_path) { +#ifndef WIN32 + struct stat info; + if (stat(file_path.c_str(), &info) != 0) { + return std::nullopt; + } + + struct passwd pw; + pw.pw_name = 0; + struct passwd *result = nullptr; + char localbuf[1024] = {}; + if (getpwuid_r(info.st_uid, &pw, localbuf, sizeof(localbuf), &result) != 0 || pw.pw_name == 0) { + return std::nullopt; + } + + return std::string(pw.pw_name); +#else + DWORD return_code = 0; + PSID sid_owner = NULL; + BOOL bool_return = TRUE; + LPTSTR account_name = NULL; + LPTSTR domain_name = NULL; + DWORD account_name_dword = 1; + DWORD domain_name_dword = 1; + SID_NAME_USE sid_type = SidTypeUnknown; + HANDLE file_handle; + PSECURITY_DESCRIPTOR sec_descriptor = NULL; + + // Get the handle of the file object. + file_handle = CreateFile( + TEXT(file_path.c_str()), + GENERIC_READ, + FILE_SHARE_READ, + NULL, + OPEN_EXISTING, + FILE_ATTRIBUTE_NORMAL, + NULL); + + // Check GetLastError for CreateFile error code. + if (file_handle == INVALID_HANDLE_VALUE) { + return std::nullopt; + } + + // Get the owner SID of the file. + return_code = GetSecurityInfo( + file_handle, + SE_FILE_OBJECT, + OWNER_SECURITY_INFORMATION, + &sid_owner, + NULL, + NULL, + NULL, + &sec_descriptor); + + // Check GetLastError for GetSecurityInfo error condition. 
+ if (return_code != ERROR_SUCCESS) { + return std::nullopt; + } + + // First call to LookupAccountSid to get the buffer sizes. + bool_return = LookupAccountSid( + NULL, + sid_owner, + account_name, + (LPDWORD)&account_name_dword, + domain_name, + (LPDWORD)&domain_name_dword, + &sid_type); + + // Reallocate memory for the buffers. + account_name = (LPTSTR)GlobalAlloc( + GMEM_FIXED, + account_name_dword); + + // Check GetLastError for GlobalAlloc error condition. + if (account_name == NULL) { + return std::nullopt; + } + auto cleanup_account_name = gsl::finally([&account_name] { GlobalFree(account_name); }); + + domain_name = (LPTSTR)GlobalAlloc( + GMEM_FIXED, + domain_name_dword); + + // Check GetLastError for GlobalAlloc error condition. + if (domain_name == NULL) { + return std::nullopt; + } + auto cleanup_domain_name = gsl::finally([&domain_name] { GlobalFree(domain_name); }); + + // Second call to LookupAccountSid to get the account name. + bool_return = LookupAccountSid( + NULL, // name of local or remote computer + sid_owner, // security identifier + account_name, // account name buffer + (LPDWORD)&account_name_dword, // size of account name buffer + domain_name, // domain name + (LPDWORD)&domain_name_dword, // size of domain name buffer + &sid_type); // SID type + + // Check GetLastError for LookupAccountSid error condition. 
+ if (bool_return == FALSE) { + return std::nullopt; + } + + auto result = std::string(account_name); + return result; +#endif +} + +#ifndef WIN32 +inline std::optional<std::string> get_file_group(const std::string& file_path) { /* Returns the name of the group that owns file_path, or std::nullopt when stat() or the group database lookup fails. */ + struct stat info; + if (stat(file_path.c_str(), &info) != 0) { + return std::nullopt; + } + + struct group gr; + gr.gr_name = 0; + struct group *result = nullptr; + char localbuf[1024] = {}; + if ((getgrgid_r(info.st_gid, &gr, localbuf, sizeof(localbuf), &result) != 0) || gr.gr_name == 0) { /* the lookup key is the file's group id (st_gid), not the owner's user id (st_uid) */ + return std::nullopt; + } + + return std::string(gr.gr_name); +} +#endif + +inline std::optional<std::string> get_relative_path(const std::string& path, const std::string& base_path) { + if (!utils::StringUtils::startsWith(path, base_path)) { + return std::nullopt; + } + + return std::filesystem::relative(path, base_path).string(); +} + } // namespace file } // namespace utils } // namespace minifi diff --git a/libminifi/src/utils/file/FileUtils.cpp b/libminifi/src/utils/file/FileUtils.cpp index 3fc50fae3..456211130 100644 --- a/libminifi/src/utils/file/FileUtils.cpp +++ b/libminifi/src/utils/file/FileUtils.cpp @@ -86,13 +86,23 @@ bool contains(const std::filesystem::path& file_path, std::string_view text_to_s return check_range(left.size(), left.size() + right.size()); } -time_t to_time_t(const std::filesystem::file_time_type file_time) { +time_t to_time_t(const std::filesystem::file_time_type& file_time) { #if defined(WIN32) - return std::chrono::system_clock::to_time_t(std::chrono::utc_clock::to_sys(std::chrono::file_clock::to_utc(file_time))); + return std::chrono::system_clock::to_time_t(to_sys_time_point(file_time)); #elif defined(_LIBCPP_VERSION) return std::chrono::file_clock::to_time_t(file_time); #else - return std::chrono::system_clock::to_time_t(std::chrono::file_clock::to_sys(file_time)); + return std::chrono::system_clock::to_time_t(to_sys_time_point(file_time)); +#endif +} + +std::chrono::time_point<std::chrono::system_clock> 
to_sys_time_point(const std::filesystem::file_time_type& file_time) { +#if defined(WIN32) + return std::chrono::time_point_cast<std::chrono::system_clock::duration>(file_time - std::filesystem::file_time_type::clock::now() + std::chrono::system_clock::now()); +#elif defined(_LIBCPP_VERSION) + return std::chrono::system_clock::from_time_t(std::chrono::file_clock::to_time_t(file_time)); +#else + return std::chrono::file_clock::to_sys(file_time); #endif } diff --git a/libminifi/test/unit/FileUtilsTests.cpp b/libminifi/test/unit/FileUtilsTests.cpp index 137f8b150..eabb29ee0 100644 --- a/libminifi/test/unit/FileUtilsTests.cpp +++ b/libminifi/test/unit/FileUtilsTests.cpp @@ -500,6 +500,19 @@ TEST_CASE("FileUtils::set_permissions and get_permissions", "[TestSetPermissions REQUIRE(FileUtils::get_permissions(path, perms)); REQUIRE(perms == 0644); } + +TEST_CASE("FileUtils::get_permission_string", "[TestGetPermissionString]") { + TestController testController; + + auto dir = testController.createTempDirectory(); + auto path = dir + FileUtils::get_separator() + "test_file.txt"; + std::ofstream outfile(path, std::ios::out | std::ios::binary); + + REQUIRE(FileUtils::set_permissions(path, 0644) == 0); + auto perms = FileUtils::get_permission_string(path); + REQUIRE(perms != std::nullopt); + REQUIRE(*perms == "rw-r--r--"); +} #endif TEST_CASE("FileUtils::exists", "[TestExists]") { @@ -581,3 +594,14 @@ TEST_CASE("FileUtils::contains", "[utils][file][contains]") { REQUIRE(utils::file::contains(file_path, "ABC")); } } + +TEST_CASE("FileUtils::get_relative_path", "[TestGetRelativePath]") { + TestController test_controller; + const auto base_path = test_controller.createTempDirectory(); + auto path = std::filesystem::path{"/random/non-existent/dir"}; + REQUIRE(FileUtils::get_relative_path(path.string(), base_path) == std::nullopt); + path = std::filesystem::path{base_path} / "subdir" / "file.log"; + REQUIRE(*FileUtils::get_relative_path(path.string(), base_path) == 
std::string("subdir") + FileUtils::get_separator() + "file.log"); + REQUIRE(*FileUtils::get_relative_path(path.string(), base_path + FileUtils::get_separator()) == std::string("subdir") + FileUtils::get_separator() + "file.log"); + REQUIRE(*FileUtils::get_relative_path(base_path, base_path) == "."); +}
