This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 55dda00ff627106b607ee33d8ebbb7b774d21399 Author: Marton Szasz <[email protected]> AuthorDate: Mon Apr 17 15:51:32 2023 +0200 MINIFICPP-2074 Map properties to match validators Fix time-period/integer and data-size/integer validated properties during loadConfigureFile Closes #1543 Signed-off-by: Marton Szasz <[email protected]> --- conf/minifi-log.properties | 2 +- extensions/coap/tests/CoapIntegrationBase.h | 2 +- .../tests/ControllerServiceIntegrationTests.cpp | 2 +- extensions/http-curl/tests/HTTPHandlers.h | 25 +- extensions/sftp/processors/SFTPProcessorBase.cpp | 4 +- .../tests/unit/ConfigurationTests.cpp | 123 +++++++++- libminifi/include/EventDrivenSchedulingAgent.h | 13 +- libminifi/include/FlowController.h | 2 +- libminifi/include/SchedulingAgent.h | 17 +- libminifi/include/core/PropertyBuilder.h | 11 - libminifi/include/core/TypedValues.h | 28 +-- libminifi/include/properties/Configuration.h | 5 +- libminifi/include/properties/Properties.h | 2 +- libminifi/include/utils/StringUtils.h | 21 +- libminifi/include/utils/TimeUtil.h | 30 +-- libminifi/src/Configuration.cpp | 273 ++++++++++----------- libminifi/src/FlowController.cpp | 4 +- libminifi/src/c2/C2Agent.cpp | 3 +- libminifi/src/c2/C2MetricsPublisher.cpp | 8 +- libminifi/src/core/logging/LoggerConfiguration.cpp | 6 +- .../src/core/state/nodes/SupportedOperations.cpp | 33 +-- libminifi/src/properties/Properties.cpp | 106 +++++++- libminifi/src/utils/StringUtils.cpp | 27 +- .../test/integration/ProvenanceReportingTest.cpp | 2 +- libminifi/test/unit/ProvenanceTestHelper.h | 2 +- libminifi/test/unit/StringUtilsTests.cpp | 38 +++ libminifi/test/unit/TimeUtilTests.cpp | 62 ++--- minifi_main/MiNiFiMain.cpp | 28 +-- 28 files changed, 562 insertions(+), 317 deletions(-) diff --git a/conf/minifi-log.properties b/conf/minifi-log.properties index f27ef967b..87fb9694f 100644 --- a/conf/minifi-log.properties +++ b/conf/minifi-log.properties @@ -29,7 +29,7 @@ appender.rolling=rollingappender 
#appender.rolling.directory=${MINIFI_HOME}/logs appender.rolling.file_name=minifi-app.log appender.rolling.max_files=3 -appender.rolling.max_file_size=5242880 +appender.rolling.max_file_size=5 MB #Other possible appenders #appender.stdout=stdout diff --git a/extensions/coap/tests/CoapIntegrationBase.h b/extensions/coap/tests/CoapIntegrationBase.h index 593bf4dac..b498e1240 100644 --- a/extensions/coap/tests/CoapIntegrationBase.h +++ b/extensions/coap/tests/CoapIntegrationBase.h @@ -81,7 +81,7 @@ class CoapIntegrationBase : public IntegrationBase { runAssertions(); shutdownBeforeFlowController(); - controller->waitUnload(wait_time_.count()); + controller->waitUnload(wait_time_); cleanup(); } diff --git a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp index 3c8abb228..bb501b74f 100644 --- a/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp +++ b/extensions/http-curl/tests/ControllerServiceIntegrationTests.cpp @@ -127,6 +127,6 @@ int main(int argc, char **argv) { // } // assert(verifyEventHappenedInPollTime(std::chrono::seconds(2), checkCsIdEnabledMatchesDisabledFlag)); - controller->waitUnload(60000); + controller->waitUnload(60s); return 0; } diff --git a/extensions/http-curl/tests/HTTPHandlers.h b/extensions/http-curl/tests/HTTPHandlers.h index 88484d5a8..f95d372bc 100644 --- a/extensions/http-curl/tests/HTTPHandlers.h +++ b/extensions/http-curl/tests/HTTPHandlers.h @@ -42,7 +42,9 @@ #include "c2/C2Payload.h" #include "core/PropertyBuilder.h" #include "properties/Configuration.h" -#include "range/v3/algorithm/find.hpp" +#include "range/v3/algorithm/contains.hpp" +#include "range/v3/view/filter.hpp" +#include "range/v3/view/view.hpp" static std::atomic<int> transaction_id; static std::atomic<int> transaction_id_output; @@ -556,20 +558,19 @@ class HeartbeatHandler : public ServerAwareHandler { std::vector<std::unordered_map<std::string, std::string>> config_properties; 
const auto prop_reader = [this](const std::string& sensitive_props) { return configuration_->getString(sensitive_props); }; const auto sensitive_props = minifi::Configuration::getSensitiveProperties(prop_reader); - for (const auto& property : minifi::Configuration::CONFIGURATION_PROPERTIES) { - if (ranges::find(sensitive_props, property.name) != ranges::end(sensitive_props)) { - continue; - } + auto allowed_not_sensitive_configuration_properties = minifi::Configuration::CONFIGURATION_PROPERTIES | ranges::views::filter([&](const auto& configuration_property) { + const auto& configuration_property_name = configuration_property.first; + return !ranges::contains(sensitive_props, configuration_property_name) && !ranges::contains(disallowed_properties, configuration_property_name); + }); + for (const auto& [property_name, property_validator] : allowed_not_sensitive_configuration_properties) { std::unordered_map<std::string, std::string> config_property; - if (ranges::find(disallowed_properties, property.name) == ranges::end(disallowed_properties)) { - config_property.emplace("propertyName", property.name); - if (auto value = configuration_->getRawValue(std::string(property.name))) { - config_property.emplace("propertyValue", *value); - } - config_property.emplace("validator", property.validator->getName()); - config_properties.push_back(config_property); + config_property.emplace("propertyName", property_name); + if (auto value = configuration_->getRawValue(std::string(property_name))) { + config_property.emplace("propertyValue", *value); } + config_property.emplace("validator", property_validator->getName()); + config_properties.push_back(config_property); } Metadata metadata; metadata.emplace("availableProperties", config_properties); diff --git a/extensions/sftp/processors/SFTPProcessorBase.cpp b/extensions/sftp/processors/SFTPProcessorBase.cpp index bc4c8fce4..10fde8d45 100644 --- a/extensions/sftp/processors/SFTPProcessorBase.cpp +++ 
b/extensions/sftp/processors/SFTPProcessorBase.cpp @@ -112,7 +112,7 @@ bool SFTPProcessorBase::parseCommonPropertiesOnTrigger(const std::shared_ptr<cor logger_->log_error("Port attribute is missing or invalid"); return false; } else { - int port_tmp; + int port_tmp = 0; if (!core::Property::StringToInt(value, port_tmp) || port_tmp <= std::numeric_limits<uint16_t>::min() || port_tmp > std::numeric_limits<uint16_t>::max()) { @@ -132,7 +132,7 @@ bool SFTPProcessorBase::parseCommonPropertiesOnTrigger(const std::shared_ptr<cor context->getProperty(Password, common_properties.password, flow_file); context->getProperty(ProxyHost, common_properties.proxy_host, flow_file); if (context->getProperty(ProxyPort, value, flow_file) && !value.empty()) { - int port_tmp; + int port_tmp = 0; if (!core::Property::StringToInt(value, port_tmp) || port_tmp <= std::numeric_limits<uint16_t>::min() || port_tmp > std::numeric_limits<uint16_t>::max()) { diff --git a/extensions/standard-processors/tests/unit/ConfigurationTests.cpp b/extensions/standard-processors/tests/unit/ConfigurationTests.cpp index 463f86212..7ffb9b3e0 100644 --- a/extensions/standard-processors/tests/unit/ConfigurationTests.cpp +++ b/extensions/standard-processors/tests/unit/ConfigurationTests.cpp @@ -19,13 +19,14 @@ #include "Catch.h" #include "properties/Configuration.h" +#include "Environment.h" namespace org::apache::nifi::minifi::test { TEST_CASE("Configuration can merge lists of property names", "[mergeProperties]") { using vector = std::vector<std::string>; - REQUIRE(Configuration::mergeProperties(vector{}, vector{}) == vector{}); + REQUIRE(Configuration::mergeProperties(vector{}, vector{}) == vector{}); // NOLINT(readability-container-size-empty) REQUIRE(Configuration::mergeProperties(vector{"a"}, vector{}) == vector{"a"}); REQUIRE(Configuration::mergeProperties(vector{"a"}, vector{"a"}) == vector{"a"}); @@ -53,4 +54,124 @@ TEST_CASE("Configuration can validate values to be assigned to specific properti 
REQUIRE(Configuration::validatePropertyValue("random.property", "random_value")); } +TEST_CASE("Configuration can fix misconfigured timeperiod<->integer validated properties") { + LogTestController::getInstance().setInfo<minifi::Configure>(); + LogTestController::getInstance().setInfo<minifi::Properties>(); + + auto properties_path = std::filesystem::temp_directory_path() / "test.properties"; + + std::ofstream{properties_path} + << "nifi.c2.agent.heartbeat.period=1min\n" + << "nifi.administrative.yield.duration=30000\n"; + auto properties_file_time_after_creation = std::filesystem::last_write_time(properties_path); + const std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(); + + configure->loadConfigureFile(properties_path); + CHECK(configure->get("nifi.c2.agent.heartbeat.period") == "60000"); + CHECK(configure->get("nifi.administrative.yield.duration") == "30000 ms"); + + { + CHECK(properties_file_time_after_creation == std::filesystem::last_write_time(properties_path)); + std::ifstream properties_file(properties_path); + std::string first_line; + std::string second_line; + CHECK(std::getline(properties_file, first_line)); + CHECK(std::getline(properties_file, second_line)); + CHECK(first_line == "nifi.c2.agent.heartbeat.period=1min"); + CHECK(second_line == "nifi.administrative.yield.duration=30000"); + } + + CHECK(configure->commitChanges()); + + { + CHECK(properties_file_time_after_creation <= std::filesystem::last_write_time(properties_path)); + std::ifstream properties_file(properties_path); + std::string first_line; + std::string second_line; + CHECK(std::getline(properties_file, first_line)); + CHECK(std::getline(properties_file, second_line)); + CHECK(first_line == "nifi.c2.agent.heartbeat.period=60000"); + CHECK(second_line == "nifi.administrative.yield.duration=30000 ms"); + } +} + +TEST_CASE("Configuration can fix misconfigured datasize<->integer validated properties") { + 
LogTestController::getInstance().setInfo<minifi::Configure>(); + LogTestController::getInstance().setInfo<minifi::Properties>(); + + auto properties_path = std::filesystem::temp_directory_path() / "test.properties"; + + { + std::ofstream properties_file(properties_path); + properties_file << "appender.rolling.max_file_size=6000" << std::endl; + properties_file.close(); + } + auto properties_file_time_after_creation = std::filesystem::last_write_time(properties_path); + const std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(); + + configure->loadConfigureFile(properties_path, "nifi.log."); + CHECK(configure->get("appender.rolling.max_file_size") == "6000 B"); + + { + CHECK(properties_file_time_after_creation <= std::filesystem::last_write_time(properties_path)); + std::ifstream properties_file(properties_path); + std::string first_line; + CHECK(std::getline(properties_file, first_line)); + CHECK(first_line == "appender.rolling.max_file_size=6000"); + } + + CHECK(configure->commitChanges()); + + { + CHECK(properties_file_time_after_creation <= std::filesystem::last_write_time(properties_path)); + std::ifstream properties_file(properties_path); + std::string first_line; + CHECK(std::getline(properties_file, first_line)); + CHECK(first_line == "appender.rolling.max_file_size=6000 B"); + } +} + + +TEST_CASE("Configuration can fix misconfigured validated properties within environmental variables") { + LogTestController::getInstance().setInfo<minifi::Configure>(); + LogTestController::getInstance().setInfo<minifi::Properties>(); + auto properties_path = std::filesystem::temp_directory_path() / "test.properties"; + + CHECK(minifi::utils::Environment::setEnvironmentVariable("SOME_VARIABLE", "4000")); + + std::ofstream{properties_path} + << "compression.cached.log.max.size=${SOME_VARIABLE}\n" + << "compression.compressed.log.max.size=3000\n"; + auto properties_file_time_after_creation = std::filesystem::last_write_time(properties_path); + 
const std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(); + + configure->loadConfigureFile(properties_path, "nifi.log."); + CHECK(configure->get("compression.cached.log.max.size") == "4000 B"); + CHECK(configure->get("compression.compressed.log.max.size") == "3000 B"); + + { + CHECK(properties_file_time_after_creation <= std::filesystem::last_write_time(properties_path)); + std::ifstream properties_file(properties_path); + std::string first_line; + std::string second_line; + CHECK(std::getline(properties_file, first_line)); + CHECK(std::getline(properties_file, second_line)); + CHECK(first_line == "compression.cached.log.max.size=${SOME_VARIABLE}"); + CHECK(second_line == "compression.compressed.log.max.size=3000"); + } + + CHECK(configure->commitChanges()); + + { + CHECK(properties_file_time_after_creation <= std::filesystem::last_write_time(properties_path)); + std::ifstream properties_file(properties_path); + std::string first_line; + std::string second_line; + CHECK(std::getline(properties_file, first_line)); + CHECK(std::getline(properties_file, second_line)); + CHECK(first_line == "compression.cached.log.max.size=${SOME_VARIABLE}"); + CHECK(second_line == "compression.compressed.log.max.size=3000 B"); + } +} + } // namespace org::apache::nifi::minifi::test diff --git a/libminifi/include/EventDrivenSchedulingAgent.h b/libminifi/include/EventDrivenSchedulingAgent.h index d017da38c..07a173059 100644 --- a/libminifi/include/EventDrivenSchedulingAgent.h +++ b/libminifi/include/EventDrivenSchedulingAgent.h @@ -21,8 +21,9 @@ #include <memory> #include <string> +#include <chrono> -#define DEFAULT_TIME_SLICE_MS 500 +constexpr auto DEFAULT_TIME_SLICE = std::chrono::milliseconds(500); #include "core/logging/Logger.h" #include "core/Processor.h" @@ -38,11 +39,15 @@ class EventDrivenSchedulingAgent : public ThreadedSchedulingAgent { std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, 
std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool) : ThreadedSchedulingAgent(controller_service_provider, repo, flow_repo, content_repo, configuration, thread_pool) { - int slice = configuration->getInt(Configure::nifi_flow_engine_event_driven_time_slice, DEFAULT_TIME_SLICE_MS); - if (slice < 10 || 1000 < slice) { + using namespace std::literals::chrono_literals; + + time_slice_ = configuration->get(Configure::nifi_flow_engine_event_driven_time_slice) + | utils::flatMap(utils::timeutils::StringToDuration<std::chrono::milliseconds>) + | utils::valueOrElse([] { return DEFAULT_TIME_SLICE; }); + + if (time_slice_ < 10ms || 1000ms < time_slice_) { throw Exception(FLOW_EXCEPTION, std::string(Configure::nifi_flow_engine_event_driven_time_slice) + " is out of reasonable range!"); } - time_slice_ = std::chrono::milliseconds(slice); } void schedule(core::Processor* processor) override; diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 187c622c4..60a8832e8 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -115,7 +115,7 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi int16_t applyUpdate(const std::string& /*source*/, const std::shared_ptr<state::Update>&) override { return -1; } // Asynchronous function trigger unloading and wait for a period of time - virtual void waitUnload(uint64_t timeToWaitMs); + virtual void waitUnload(const std::chrono::milliseconds time_to_wait); void updatePropertyValue(std::string processorName, std::string propertyName, std::string propertyValue) { root_wrapper_.updatePropertyValue(std::move(processorName), std::move(propertyName), std::move(propertyValue)); } diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h index 968d03860..fec5e9598 100644 --- a/libminifi/include/SchedulingAgent.h +++ b/libminifi/include/SchedulingAgent.h @@ -45,17 +45,13 @@ 
#include "core/controller/ControllerServiceProvider.h" #include "core/controller/ControllerServiceNode.h" -#define SCHEDULING_WATCHDOG_CHECK_PERIOD_MS 1000 // msec -#define SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD_MS 5000 // msec +constexpr std::chrono::milliseconds SCHEDULING_WATCHDOG_CHECK_PERIOD = std::chrono::seconds(1); +constexpr std::chrono::milliseconds SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD = std::chrono::seconds(5); namespace org::apache::nifi::minifi { class SchedulingAgent { public: - // Constructor - /*! - * Create a new scheduling agent. - */ SchedulingAgent(const gsl::not_null<core::controller::ControllerServiceProvider*> controller_service_provider, std::shared_ptr<core::Repository> repo, std::shared_ptr<core::Repository> flow_repo, std::shared_ptr<core::ContentRepository> content_repo, std::shared_ptr<Configure> configuration, utils::ThreadPool<utils::TaskRescheduleInfo> &thread_pool) : admin_yield_duration_(), @@ -64,15 +60,18 @@ class SchedulingAgent { content_repo_(content_repo), thread_pool_(thread_pool), controller_service_provider_(controller_service_provider), - logger_(core::logging::LoggerFactory<SchedulingAgent>::getLogger()), - alert_time_(configuration->getInt(Configure::nifi_flow_engine_alert_period, SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD_MS)) { + logger_(core::logging::LoggerFactory<SchedulingAgent>::getLogger()) { running_ = false; repo_ = repo; flow_repo_ = flow_repo; + alert_time_ = configuration->get(Configure::nifi_flow_engine_alert_period) + | utils::flatMap(utils::timeutils::StringToDuration<std::chrono::milliseconds>) + | utils::valueOrElse([] { return SCHEDULING_WATCHDOG_DEFAULT_ALERT_PERIOD; }); + if (alert_time_ > std::chrono::milliseconds(0)) { std::function<void(void)> f = std::bind(&SchedulingAgent::watchDogFunc, this); - watchDogTimer_.reset(new utils::CallBackTimer(std::chrono::milliseconds(SCHEDULING_WATCHDOG_CHECK_PERIOD_MS), f)); + watchDogTimer_.reset(new utils::CallBackTimer(SCHEDULING_WATCHDOG_CHECK_PERIOD, 
f)); watchDogTimer_->start(); } diff --git a/libminifi/include/core/PropertyBuilder.h b/libminifi/include/core/PropertyBuilder.h index 169717a64..0c02e592a 100644 --- a/libminifi/include/core/PropertyBuilder.h +++ b/libminifi/include/core/PropertyBuilder.h @@ -207,17 +207,6 @@ class ConstrainedProperty : public std::enable_shared_from_this<ConstrainedPrope friend class PropertyBuilder; }; -struct ConfigurationProperty { - explicit ConfigurationProperty(std::string_view name, - gsl::not_null<PropertyValidator*> validator = gsl::make_not_null(StandardValidators::get().VALID_VALIDATOR.get())) - : name(name), - validator(validator) { - } - - std::string_view name; - gsl::not_null<PropertyValidator*> validator; -}; - } // namespace core } // namespace minifi } // namespace nifi diff --git a/libminifi/include/core/TypedValues.h b/libminifi/include/core/TypedValues.h index a309a449b..053c79b2b 100644 --- a/libminifi/include/core/TypedValues.h +++ b/libminifi/include/core/TypedValues.h @@ -110,20 +110,23 @@ class DataSizeValue : public TransformableValue, public state::response::UInt64V : state::response::UInt64Value(0) { } - - // Convert String to Integer - template<typename T, typename std::enable_if< - std::is_integral<T>::value>::type* = nullptr> - static bool StringToInt(const std::string &input, T &output) { + static std::optional<int64_t> getUnitMultiplier(std::string unit_str) { // TODO(adebreceni): this mapping is to preserve backwards compatibility, // we should entertain the idea of moving to standardized units in // the configuration (i.e. 
K = 1000, Ki = 1024) static std::map<std::string, int64_t> unit_map{ - {"B", 1}, - {"K", 1_KB}, {"M", 1_MB}, {"G", 1_GB}, {"T", 1_TB}, {"P", 1_PB}, - {"KB", 1_KiB}, {"MB", 1_MiB}, {"GB", 1_GiB}, {"TB", 1_TiB}, {"PB", 1_PiB}, + {"B", 1}, + {"K", 1_KB}, {"M", 1_MB}, {"G", 1_GB}, {"T", 1_TB}, {"P", 1_PB}, + {"KB", 1_KiB}, {"MB", 1_MiB}, {"GB", 1_GiB}, {"TB", 1_TiB}, {"PB", 1_PiB}, }; + unit_str = utils::StringUtils::toUpper(unit_str); + + return unit_map.contains(unit_str) ? std::optional(unit_map.at(unit_str)) : std::nullopt; + } + // Convert String to Integer + template<std::integral T> + static bool StringToInt(const std::string &input, T &output) { int64_t value; std::string unit_str; try { @@ -133,14 +136,11 @@ class DataSizeValue : public TransformableValue, public state::response::UInt64V } if (!unit_str.empty()) { - std::transform(unit_str.begin(), unit_str.end(), unit_str.begin(), ::toupper); - auto multiplierIt = unit_map.find(unit_str); - if (multiplierIt == unit_map.end()) { + if (auto unit_multiplier = getUnitMultiplier(unit_str)) { + value *= *unit_multiplier; + } else { getLogger()->log_warn("Unrecognized data unit: '%s', in the future this will constitute as an error", unit_str); // backwards compatibility - // return false; - } else { - value *= multiplierIt->second; } } diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index a580f3f74..dfc50e920 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -18,6 +18,7 @@ #pragma once #include <vector> +#include <unordered_map> #include <string> #include "properties/Properties.h" @@ -27,7 +28,7 @@ namespace org::apache::nifi::minifi { namespace core { - struct ConfigurationProperty; +class PropertyValidator; } class Configuration : public Properties { @@ -191,7 +192,7 @@ class Configuration : public Properties { static constexpr const char *controller_socket_port = "controller.socket.port"; static 
constexpr const char *controller_ssl_context_service = "controller.ssl.context.service"; - MINIFIAPI static const std::vector<core::ConfigurationProperty> CONFIGURATION_PROPERTIES; + MINIFIAPI static const std::unordered_map<std::string_view, gsl::not_null<core::PropertyValidator*>> CONFIGURATION_PROPERTIES; MINIFIAPI static const std::array<const char*, 2> DEFAULT_SENSITIVE_PROPERTIES; static std::vector<std::string> mergeProperties(std::vector<std::string> properties, diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h index 8979f649e..7d3948b68 100644 --- a/libminifi/include/properties/Properties.h +++ b/libminifi/include/properties/Properties.h @@ -113,7 +113,7 @@ class Properties { * Load configure file * @param fileName path of the configuration file RELATIVE to MINIFI_HOME set by setHome() */ - void loadConfigureFile(const std::filesystem::path& configuration_file); + void loadConfigureFile(const std::filesystem::path& configuration_file, std::string_view prefix = ""); // Set the determined MINIFI_HOME void setHome(std::filesystem::path minifiHome) { diff --git a/libminifi/include/utils/StringUtils.h b/libminifi/include/utils/StringUtils.h index 6d96fc232..2e6f7285c 100644 --- a/libminifi/include/utils/StringUtils.h +++ b/libminifi/include/utils/StringUtils.h @@ -35,6 +35,9 @@ #include "utils/FailurePolicy.h" #include "utils/gsl.h" #include "utils/meta/detected.h" +#include "range/v3/view/transform.hpp" +#include "range/v3/range/conversion.hpp" +#include "range/v3/view/join.hpp" // libc++ doesn't define operator<=> on strings, and apparently the operator rewrite rules don't automagically make one #if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION < 16000 @@ -45,10 +48,6 @@ constexpr std::strong_ordering operator<=>(const std::string& lhs, const std::st } #endif -#include "range/v3/view/transform.hpp" -#include "range/v3/view/join.hpp" -#include "range/v3/range/conversion.hpp" - namespace 
org::apache::nifi::minifi::utils { template<class Char> @@ -88,7 +87,17 @@ class StringUtils { */ static std::optional<bool> toBool(const std::string& input); - static std::string toLower(std::string_view str); + static inline std::string toLower(std::string str) { + const auto tolower = [](auto c) { return std::tolower(static_cast<unsigned char>(c)); }; + std::transform(std::begin(str), std::end(str), std::begin(str), tolower); + return str; + } + + static inline std::string toUpper(std::string str) { + const auto toupper = [](auto c) { return std::toupper(static_cast<unsigned char>(c)); }; + std::transform(std::begin(str), std::end(str), std::begin(str), toupper); + return str; + } /** * Strips the line ending (\n or \r\n) from the end of the input line. @@ -480,7 +489,7 @@ class StringUtils { */ static bool matchesSequence(std::string_view str, const std::vector<std::string>& patterns); - private: + static bool splitToValueAndUnit(std::string_view input, int64_t& value, std::string& unit); }; } // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/include/utils/TimeUtil.h b/libminifi/include/utils/TimeUtil.h index 2080c1ebe..4eff16fb6 100644 --- a/libminifi/include/utils/TimeUtil.h +++ b/libminifi/include/utils/TimeUtil.h @@ -32,6 +32,8 @@ #include <condition_variable> #include <memory> +#include "StringUtils.h" + // libc++ doesn't define operator<=> on durations, and apparently the operator rewrite rules don't automagically make one #if defined(_LIBCPP_VERSION) && _LIBCPP_VERSION < 16000 #include <compare> @@ -203,38 +205,18 @@ std::optional<TargetDuration> cast_to_matching_unit(std::string& unit, const int ((result = cast_if_unit_matches<TargetDuration, T>(unit, value)) || ...); return result; } - -inline bool get_unit_and_value(const std::string& input, std::string& unit, int64_t& value) { - const char* begin = input.c_str(); - char *end; - errno = 0; - value = std::strtoll(begin, &end, 0); - if (end == begin || errno == ERANGE) { - return 
false; - } - - if (end[0] == '\0') { - return false; - } - - while (*end == ' ') { - // Skip the spaces - end++; - } - unit = std::string(end); - std::transform(unit.begin(), unit.end(), unit.begin(), ::tolower); - return true; -} - } // namespace details template<class TargetDuration> std::optional<TargetDuration> StringToDuration(const std::string& input) { std::string unit; int64_t value; - if (!details::get_unit_and_value(input, unit, value)) + if (!StringUtils::splitToValueAndUnit(input, value, unit)) return std::nullopt; + + unit = utils::StringUtils::toLower(unit); + return details::cast_to_matching_unit<TargetDuration, std::chrono::nanoseconds, std::chrono::microseconds, diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index ab313f367..893e967fb 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -17,140 +17,140 @@ #include <algorithm> #include "properties/Configuration.h" -#include "core/PropertyBuilder.h" +#include "core/PropertyValidation.h" namespace org::apache::nifi::minifi { -const std::vector<core::ConfigurationProperty> Configuration::CONFIGURATION_PROPERTIES{ - core::ConfigurationProperty{Configuration::nifi_default_directory}, - core::ConfigurationProperty{Configuration::nifi_flow_configuration_file}, - core::ConfigurationProperty{Configuration::nifi_flow_configuration_encrypt, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_flow_configuration_file_backup_update, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_flow_engine_threads, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_flow_engine_alert_period, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, - 
core::ConfigurationProperty{Configuration::nifi_flow_engine_event_driven_time_slice, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_administrative_yield_duration, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_bored_yield_duration, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_graceful_shutdown_seconds, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_flowcontroller_drain_timeout, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_server_name}, - core::ConfigurationProperty{Configuration::nifi_configuration_class_name}, - core::ConfigurationProperty{Configuration::nifi_flow_repository_class_name}, - core::ConfigurationProperty{Configuration::nifi_flow_repository_rocksdb_compression}, - core::ConfigurationProperty{Configuration::nifi_content_repository_class_name}, - core::ConfigurationProperty{Configuration::nifi_content_repository_rocksdb_compression}, - core::ConfigurationProperty{Configuration::nifi_provenance_repository_class_name}, - core::ConfigurationProperty{Configuration::nifi_volatile_repository_options_flowfile_max_count, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_volatile_repository_options_flowfile_max_bytes, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_volatile_repository_options_provenance_max_count, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, - 
core::ConfigurationProperty{Configuration::nifi_volatile_repository_options_provenance_max_bytes, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_volatile_repository_options_content_max_count, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_volatile_repository_options_content_max_bytes, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_volatile_repository_options_content_minimal_locking, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_server_port, gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_server_report_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_provenance_repository_max_storage_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_provenance_repository_max_storage_time, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_provenance_repository_directory_default}, - core::ConfigurationProperty{Configuration::nifi_flowfile_repository_directory_default}, - core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_directory_default}, - core::ConfigurationProperty{Configuration::nifi_flowfile_repository_rocksdb_compaction_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_dbcontent_repository_rocksdb_compaction_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, - 
core::ConfigurationProperty{Configuration::nifi_remote_input_secure, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_sensitive_props_additional_keys}, - core::ConfigurationProperty{Configuration::nifi_python_processor_dir}, - core::ConfigurationProperty{Configuration::nifi_extension_path}, - core::ConfigurationProperty{Configuration::nifi_security_client_certificate}, - core::ConfigurationProperty{Configuration::nifi_security_client_private_key}, - core::ConfigurationProperty{Configuration::nifi_security_client_pass_phrase}, - core::ConfigurationProperty{Configuration::nifi_security_client_ca_certificate}, - core::ConfigurationProperty{Configuration::nifi_security_use_system_cert_store}, - core::ConfigurationProperty{Configuration::nifi_security_windows_cert_store_location}, - core::ConfigurationProperty{Configuration::nifi_security_windows_server_cert_store}, - core::ConfigurationProperty{Configuration::nifi_security_windows_client_cert_store}, - core::ConfigurationProperty{Configuration::nifi_security_windows_client_cert_cn}, - core::ConfigurationProperty{Configuration::nifi_security_windows_client_cert_key_usage}, - core::ConfigurationProperty{Configuration::nifi_rest_api_user_name}, - core::ConfigurationProperty{Configuration::nifi_rest_api_password}, - core::ConfigurationProperty{Configuration::nifi_c2_enable, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_c2_file_watch}, - core::ConfigurationProperty{Configuration::nifi_c2_flow_id}, - core::ConfigurationProperty{Configuration::nifi_c2_flow_url}, - core::ConfigurationProperty{Configuration::nifi_c2_flow_base_url}, - core::ConfigurationProperty{Configuration::nifi_c2_full_heartbeat, 
gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_c2_coap_connector_service}, - core::ConfigurationProperty{Configuration::nifi_c2_agent_heartbeat_period, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_c2_agent_heartbeat_reporter_classes}, - core::ConfigurationProperty{Configuration::nifi_c2_agent_class}, - core::ConfigurationProperty{Configuration::nifi_c2_agent_coap_host}, - core::ConfigurationProperty{Configuration::nifi_c2_agent_coap_port, gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_c2_agent_protocol_class}, - core::ConfigurationProperty{Configuration::nifi_c2_agent_identifier}, - core::ConfigurationProperty{Configuration::nifi_c2_agent_identifier_fallback}, - core::ConfigurationProperty{Configuration::nifi_c2_agent_trigger_classes}, - core::ConfigurationProperty{Configuration::nifi_c2_root_classes}, - core::ConfigurationProperty{Configuration::nifi_c2_root_class_definitions}, - core::ConfigurationProperty{Configuration::nifi_c2_rest_listener_port, gsl::make_not_null(core::StandardValidators::get().LISTEN_PORT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_c2_rest_listener_cacert}, - core::ConfigurationProperty{Configuration::nifi_c2_rest_url}, - core::ConfigurationProperty{Configuration::nifi_c2_rest_url_ack}, - core::ConfigurationProperty{Configuration::nifi_c2_rest_ssl_context_service}, - core::ConfigurationProperty{Configuration::nifi_c2_rest_request_encoding}, - core::ConfigurationProperty{Configuration::nifi_c2_rest_heartbeat_minimize_updates, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_c2_mqtt_connector_service}, - core::ConfigurationProperty{Configuration::nifi_c2_mqtt_heartbeat_topic}, - 
core::ConfigurationProperty{Configuration::nifi_c2_mqtt_update_topic}, - core::ConfigurationProperty{Configuration::nifi_state_storage_local}, - core::ConfigurationProperty{Configuration::nifi_state_storage_local_old}, - core::ConfigurationProperty{Configuration::nifi_state_storage_local_class_name}, - core::ConfigurationProperty{Configuration::nifi_state_storage_local_class_name_old}, - core::ConfigurationProperty{Configuration::nifi_state_storage_local_always_persist, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_state_storage_local_always_persist_old, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_state_storage_local_auto_persistence_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_state_storage_local_auto_persistence_interval_old, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_state_storage_local_path}, - core::ConfigurationProperty{Configuration::nifi_state_storage_local_path_old}, - core::ConfigurationProperty{Configuration::minifi_disk_space_watchdog_enable, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::minifi_disk_space_watchdog_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::minifi_disk_space_watchdog_stop_threshold, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_LONG_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::minifi_disk_space_watchdog_restart_threshold, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_LONG_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_framework_dir}, - 
core::ConfigurationProperty{Configuration::nifi_jvm_options}, - core::ConfigurationProperty{Configuration::nifi_nar_directory}, - core::ConfigurationProperty{Configuration::nifi_nar_deploy_directory}, - core::ConfigurationProperty{Configuration::nifi_log_spdlog_pattern}, - core::ConfigurationProperty{Configuration::nifi_log_spdlog_shorten_names, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_log_appender_rolling}, - core::ConfigurationProperty{Configuration::nifi_log_appender_rolling_directory}, - core::ConfigurationProperty{Configuration::nifi_log_appender_rolling_file_name}, - core::ConfigurationProperty{Configuration::nifi_log_appender_rolling_max_files, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_log_appender_rolling_max_file_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_log_appender_stdout}, - core::ConfigurationProperty{Configuration::nifi_log_appender_stderr}, - core::ConfigurationProperty{Configuration::nifi_log_appender_null}, - core::ConfigurationProperty{Configuration::nifi_log_appender_syslog}, - core::ConfigurationProperty{Configuration::nifi_log_logger_root}, - core::ConfigurationProperty{Configuration::nifi_log_compression_cached_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_log_compression_compressed_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_log_alert_url}, - core::ConfigurationProperty{Configuration::nifi_log_alert_ssl_context_service}, - core::ConfigurationProperty{Configuration::nifi_log_alert_batch_size}, - core::ConfigurationProperty{Configuration::nifi_log_alert_flush_period}, - 
core::ConfigurationProperty{Configuration::nifi_log_alert_filter}, - core::ConfigurationProperty{Configuration::nifi_log_alert_rate_limit}, - core::ConfigurationProperty{Configuration::nifi_log_alert_buffer_limit}, - core::ConfigurationProperty{Configuration::nifi_log_alert_level}, - core::ConfigurationProperty{Configuration::nifi_asset_directory}, - core::ConfigurationProperty{Configuration::nifi_metrics_publisher_agent_identifier}, - core::ConfigurationProperty{Configuration::nifi_metrics_publisher_class}, - core::ConfigurationProperty{Configuration::nifi_metrics_publisher_prometheus_metrics_publisher_port, gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::nifi_metrics_publisher_metrics}, - core::ConfigurationProperty{Configuration::controller_socket_enable, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::controller_socket_local_any_interface, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::controller_socket_host}, - core::ConfigurationProperty{Configuration::controller_socket_port, gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())}, - core::ConfigurationProperty{Configuration::controller_ssl_context_service} +const std::unordered_map<std::string_view, gsl::not_null<core::PropertyValidator*>> Configuration::CONFIGURATION_PROPERTIES{ + {Configuration::nifi_default_directory, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_flow_configuration_file, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_flow_configuration_encrypt, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::nifi_flow_configuration_file_backup_update, 
gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::nifi_flow_engine_threads, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, + {Configuration::nifi_flow_engine_alert_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_flow_engine_event_driven_time_slice, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_administrative_yield_duration, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_bored_yield_duration, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_graceful_shutdown_seconds, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_flowcontroller_drain_timeout, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_server_name, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_configuration_class_name, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_flow_repository_class_name, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_flow_repository_rocksdb_compression, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_content_repository_class_name, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_content_repository_rocksdb_compression, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_provenance_repository_class_name, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_volatile_repository_options_flowfile_max_count, 
gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, + {Configuration::nifi_volatile_repository_options_flowfile_max_bytes, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, + {Configuration::nifi_volatile_repository_options_provenance_max_count, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, + {Configuration::nifi_volatile_repository_options_provenance_max_bytes, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, + {Configuration::nifi_volatile_repository_options_content_max_count, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, + {Configuration::nifi_volatile_repository_options_content_max_bytes, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, + {Configuration::nifi_volatile_repository_options_content_minimal_locking, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::nifi_server_port, gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())}, + {Configuration::nifi_server_report_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_provenance_repository_max_storage_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, + {Configuration::nifi_provenance_repository_max_storage_time, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_provenance_repository_directory_default, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_flowfile_repository_directory_default, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_dbcontent_repository_directory_default, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + 
{Configuration::nifi_flowfile_repository_rocksdb_compaction_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_dbcontent_repository_rocksdb_compaction_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_remote_input_secure, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::nifi_security_need_ClientAuth, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::nifi_sensitive_props_additional_keys, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_python_processor_dir, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_extension_path, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_security_client_certificate, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_security_client_private_key, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_security_client_pass_phrase, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_security_client_ca_certificate, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_security_use_system_cert_store, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_security_windows_cert_store_location, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_security_windows_server_cert_store, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_security_windows_client_cert_store, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + 
{Configuration::nifi_security_windows_client_cert_cn, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_security_windows_client_cert_key_usage, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_rest_api_user_name, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_rest_api_password, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_enable, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::nifi_c2_file_watch, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_flow_id, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_flow_url, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_flow_base_url, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_full_heartbeat, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::nifi_c2_coap_connector_service, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_agent_heartbeat_period, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, + {Configuration::nifi_c2_agent_heartbeat_reporter_classes, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_agent_class, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_agent_coap_host, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_agent_coap_port, gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())}, + {Configuration::nifi_c2_agent_protocol_class, 
gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_agent_identifier, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_agent_identifier_fallback, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_agent_trigger_classes, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_root_classes, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_root_class_definitions, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_rest_listener_port, gsl::make_not_null(core::StandardValidators::get().LISTEN_PORT_VALIDATOR.get())}, + {Configuration::nifi_c2_rest_listener_cacert, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_rest_url, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_rest_url_ack, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_rest_ssl_context_service, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_rest_request_encoding, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_rest_heartbeat_minimize_updates, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::nifi_c2_mqtt_connector_service, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_mqtt_heartbeat_topic, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_c2_mqtt_update_topic, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_state_storage_local, 
gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_state_storage_local_old, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_state_storage_local_class_name, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_state_storage_local_class_name_old, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_state_storage_local_always_persist, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::nifi_state_storage_local_always_persist_old, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::nifi_state_storage_local_auto_persistence_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_state_storage_local_auto_persistence_interval_old, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_state_storage_local_path, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_state_storage_local_path_old, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::minifi_disk_space_watchdog_enable, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::minifi_disk_space_watchdog_interval, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::minifi_disk_space_watchdog_stop_threshold, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_LONG_VALIDATOR.get())}, + {Configuration::minifi_disk_space_watchdog_restart_threshold, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_LONG_VALIDATOR.get())}, + {Configuration::nifi_framework_dir, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + 
{Configuration::nifi_jvm_options, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_nar_directory, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_nar_deploy_directory, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_spdlog_pattern, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_spdlog_shorten_names, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::nifi_log_appender_rolling, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_appender_rolling_directory, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_appender_rolling_file_name, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_appender_rolling_max_files, gsl::make_not_null(core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get())}, + {Configuration::nifi_log_appender_rolling_max_file_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, + {Configuration::nifi_log_appender_stdout, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_appender_stderr, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_appender_null, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_appender_syslog, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_logger_root, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_compression_cached_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, + 
{Configuration::nifi_log_compression_compressed_log_max_size, gsl::make_not_null(core::StandardValidators::get().DATA_SIZE_VALIDATOR.get())}, + {Configuration::nifi_log_alert_url, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_alert_ssl_context_service, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_alert_batch_size, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_alert_flush_period, gsl::make_not_null(core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get())}, + {Configuration::nifi_log_alert_filter, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_alert_rate_limit, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_alert_buffer_limit, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_log_alert_level, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_asset_directory, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_metrics_publisher_agent_identifier, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_metrics_publisher_class, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::nifi_metrics_publisher_prometheus_metrics_publisher_port, gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())}, + {Configuration::nifi_metrics_publisher_metrics, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::controller_socket_enable, gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::controller_socket_local_any_interface, 
gsl::make_not_null(core::StandardValidators::get().BOOLEAN_VALIDATOR.get())}, + {Configuration::controller_socket_host, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())}, + {Configuration::controller_socket_port, gsl::make_not_null(core::StandardValidators::get().PORT_VALIDATOR.get())}, + {Configuration::controller_ssl_context_service, gsl::make_not_null(core::StandardValidators::get().VALID_VALIDATOR.get())} }; const std::array<const char*, 2> Configuration::DEFAULT_SENSITIVE_PROPERTIES = {Configuration::nifi_security_client_pass_phrase, @@ -184,12 +184,11 @@ std::vector<std::string> Configuration::getSensitiveProperties(const std::functi } bool Configuration::validatePropertyValue(const std::string& property_name, const std::string& property_value) { - for (const auto& config_property: Configuration::CONFIGURATION_PROPERTIES) { - if (config_property.name == property_name) { - return config_property.validator->validate(property_name, property_value).valid(); - } - } - return true; + const auto validator = Configuration::CONFIGURATION_PROPERTIES.find(property_name); + if (validator == std::end(Configuration::CONFIGURATION_PROPERTIES)) + return true; + + return validator->second->validate(property_name, property_value).valid(); } } // namespace org::apache::nifi::minifi diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 20b159fe8..896dd88d9 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -208,10 +208,10 @@ int16_t FlowController::stop() { * @param timeToWaitMs Maximum time to wait before manually * marking running as false. */ -void FlowController::waitUnload(const uint64_t timeToWaitMs) { +void FlowController::waitUnload(const std::chrono::milliseconds time_to_wait) { if (running_) { // use the current time and increment with the provided argument. 
- std::chrono::system_clock::time_point wait_time = std::chrono::system_clock::now() + std::chrono::milliseconds(timeToWaitMs); + std::chrono::system_clock::time_point wait_time = std::chrono::system_clock::now() + time_to_wait; // create an asynchronous future. std::future<void> unload_task = std::async(std::launch::async, [this]() {stop();}); diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index f3038de6d..434faf16b 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -179,17 +179,18 @@ void C2Agent::configure(const std::shared_ptr<Configure> &configure, bool reconf try { if (auto heartbeat_period_ms = utils::timeutils::StringToDuration<std::chrono::milliseconds>(heartbeat_period)) { heart_beat_period_ = *heartbeat_period_ms; - logger_->log_debug("Using %u ms as the heartbeat period", heart_beat_period_.count()); } else { heart_beat_period_ = std::chrono::milliseconds(std::stoi(heartbeat_period)); } } catch (const std::invalid_argument &) { + logger_->log_error("Invalid heartbeat period: %s", heartbeat_period); heart_beat_period_ = 3s; } } else { if (!reconfigure) heart_beat_period_ = 3s; } + logger_->log_debug("Using %" PRId64 " ms as the heartbeat period", heart_beat_period_.count()); std::string heartbeat_reporters; if (configure->get(Configuration::nifi_c2_agent_heartbeat_reporter_classes, "c2.agent.heartbeat.reporter.classes", heartbeat_reporters)) { diff --git a/libminifi/src/c2/C2MetricsPublisher.cpp b/libminifi/src/c2/C2MetricsPublisher.cpp index 8f71e1f38..bc9cd8ad1 100644 --- a/libminifi/src/c2/C2MetricsPublisher.cpp +++ b/libminifi/src/c2/C2MetricsPublisher.cpp @@ -165,9 +165,11 @@ std::optional<state::response::NodeReporter::ReportedNode> C2MetricsPublisher::g std::vector<state::response::NodeReporter::ReportedNode> C2MetricsPublisher::getHeartbeatNodes(bool include_manifest) const { gsl_Expects(configuration_); - std::string fullHb{"true"}; - 
configuration_->get(minifi::Configuration::nifi_c2_full_heartbeat, fullHb); - const bool include = include_manifest || fullHb == "true"; + bool full_heartbeat = configuration_->get(minifi::Configuration::nifi_c2_full_heartbeat) + | utils::flatMap(utils::StringUtils::toBool) + | utils::valueOrElse([] {return true;}); + + bool include = include_manifest || full_heartbeat; std::vector<state::response::NodeReporter::ReportedNode> reported_nodes; std::lock_guard<std::mutex> guard{metrics_mutex_}; diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp index 07709dde7..78199b214 100644 --- a/libminifi/src/core/logging/LoggerConfiguration.cpp +++ b/libminifi/src/core/logging/LoggerConfiguration.cpp @@ -378,11 +378,7 @@ std::shared_ptr<spdlog::sinks::rotating_file_sink_mt> LoggerConfiguration::getRo int max_file_size = 5_MiB; std::string max_file_size_str; if (properties->getString(appender_key + ".max_file_size", max_file_size_str)) { - try { - max_file_size = std::stoi(max_file_size_str); - } catch (const std::invalid_argument &) { - } catch (const std::out_of_range &) { - } + core::DataSizeValue::StringToInt(max_file_size_str, max_file_size); } std::lock_guard<std::mutex> guard(sink_map_mtx); diff --git a/libminifi/src/core/state/nodes/SupportedOperations.cpp b/libminifi/src/core/state/nodes/SupportedOperations.cpp index e3469e1d5..8e8b49f0c 100644 --- a/libminifi/src/core/state/nodes/SupportedOperations.cpp +++ b/libminifi/src/core/state/nodes/SupportedOperations.cpp @@ -18,7 +18,9 @@ #include "core/state/nodes/SupportedOperations.h" #include "core/PropertyBuilder.h" #include "core/Resource.h" -#include "range/v3/algorithm/find.hpp" +#include "range/v3/algorithm/contains.hpp" +#include "range/v3/view/filter.hpp" +#include "range/v3/view/view.hpp" namespace org::apache::nifi::minifi::state::response { @@ -65,22 +67,23 @@ void SupportedOperations::addProperty(SerializedResponseNode& properties, const 
SupportedOperations::Metadata SupportedOperations::buildUpdatePropertiesMetadata() const { std::vector<std::unordered_map<std::string, std::string>> supported_config_updates; - for (const auto& config_property : Configuration::CONFIGURATION_PROPERTIES) { - auto sensitive_properties = Configuration::getSensitiveProperties(configuration_reader_); - if (ranges::find(sensitive_properties, config_property.name) != ranges::end(sensitive_properties)) { - continue; - } - if (!update_policy_controller_ || update_policy_controller_->canUpdate(std::string(config_property.name))) { - std::unordered_map<std::string, std::string> property; - property.emplace("propertyName", config_property.name); - property.emplace("validator", config_property.validator->getName()); - if (configuration_reader_) { - if (auto property_value = configuration_reader_(std::string(config_property.name))) { - property.emplace("propertyValue", *property_value); - } + auto sensitive_properties = Configuration::getSensitiveProperties(configuration_reader_); + auto updatable_not_sensitive_configuration_properties = minifi::Configuration::CONFIGURATION_PROPERTIES | ranges::views::filter([&](const auto& configuration_property) { + const auto& configuration_property_name = configuration_property.first; + return !ranges::contains(sensitive_properties, configuration_property_name) + && (!update_policy_controller_ || update_policy_controller_->canUpdate(std::string(configuration_property_name))); + }); + + for (const auto& [config_property_name, config_property_validator] : updatable_not_sensitive_configuration_properties) { + std::unordered_map<std::string, std::string> property; + property.emplace("propertyName", config_property_name); + property.emplace("validator", config_property_validator->getName()); + if (configuration_reader_) { + if (auto property_value = configuration_reader_(std::string(config_property_name))) { + property.emplace("propertyValue", *property_value); } - 
supported_config_updates.push_back(property); } + supported_config_updates.push_back(property); } Metadata available_properties; available_properties.emplace("availableProperties", supported_config_updates); diff --git a/libminifi/src/properties/Properties.cpp b/libminifi/src/properties/Properties.cpp index 56951c2dc..9b694b3a0 100644 --- a/libminifi/src/properties/Properties.cpp +++ b/libminifi/src/properties/Properties.cpp @@ -24,6 +24,7 @@ #include "utils/file/PathUtils.h" #include "core/logging/LoggerConfiguration.h" #include "properties/PropertiesFile.h" +#include "properties/Configuration.h" namespace org::apache::nifi::minifi { @@ -62,8 +63,103 @@ int Properties::getInt(const std::string &key, int default_value) const { return it != properties_.end() ? std::stoi(it->second.active_value) : default_value; } +namespace { +const core::PropertyValidator* getValidator(const std::string& lookup_value) { + auto configuration_property = Configuration::CONFIGURATION_PROPERTIES.find(lookup_value); + + if (configuration_property != Configuration::CONFIGURATION_PROPERTIES.end()) + return configuration_property->second; + return nullptr; +} + +std::optional<std::string> ensureTimePeriodValidatedPropertyHasExplicitUnit(const core::PropertyValidator* const validator, std::string& value) { + if (validator != core::StandardValidators::get().TIME_PERIOD_VALIDATOR.get()) + return std::nullopt; + if (value.empty() || !std::all_of(value.begin(), value.end(), [](unsigned char c){ return ::isdigit(c); })) + return std::nullopt; + + return value + " ms"; +} + +std::optional<std::string> ensureDataSizeValidatedPropertyHasExplicitUnit(const core::PropertyValidator* const validator, std::string& value) { + if (validator != core::StandardValidators::get().DATA_SIZE_VALIDATOR.get()) + return std::nullopt; + if (value.empty() || !std::all_of(value.begin(), value.end(), [](unsigned char c){ return ::isdigit(c); })) + return std::nullopt; + + return value + " B"; +} + +bool 
integerValidatedProperty(const core::PropertyValidator* const validator) { + return validator == core::StandardValidators::get().INTEGER_VALIDATOR.get() + || validator == core::StandardValidators::get().UNSIGNED_INT_VALIDATOR.get() + || validator == core::StandardValidators::get().LONG_VALIDATOR.get() + || validator == core::StandardValidators::get().UNSIGNED_LONG_VALIDATOR.get(); +} + +std::optional<int64_t> stringToDataSize(std::string_view input) { + int64_t value; + std::string unit_str; + if (!utils::StringUtils::splitToValueAndUnit(input, value, unit_str)) { + return std::nullopt; + } + + if (auto unit_multiplier = core::DataSizeValue::getUnitMultiplier(unit_str)) { + return value * *unit_multiplier; + } + return std::nullopt; +} + +std::optional<std::string> ensureIntegerValidatedPropertyHasNoUnit(const core::PropertyValidator* const validator, std::string& value) { + if (!integerValidatedProperty(validator)) { + return std::nullopt; + } + + if (auto parsed_time = utils::timeutils::StringToDuration<std::chrono::milliseconds>(value)) { + return fmt::format("{}", parsed_time->count()); + } + + if (auto parsed_data_size = stringToDataSize(value)) { + return fmt::format("{}", *parsed_data_size); + } + + return std::nullopt; +} + +void fixValidatedProperty(const std::string& property_name, + std::string& persisted_value, + std::string& value, + bool& needs_to_persist_new_value, + core::logging::Logger& logger) { + auto validator = getValidator(property_name); + if (!validator) + return; + + auto fixed_property_value = ensureTimePeriodValidatedPropertyHasExplicitUnit(validator, value) + | utils::valueOrElse([&] { return ensureDataSizeValidatedPropertyHasExplicitUnit(validator, value);}) + | utils::valueOrElse([&] { return ensureIntegerValidatedPropertyHasNoUnit(validator, value);}); + + if (!fixed_property_value) { + return; + } + + if (persisted_value == value) { + logger.log_info("Changed validated property from %s to %s, this change will be persisted", value, 
*fixed_property_value); + value = *fixed_property_value; + persisted_value = value; + needs_to_persist_new_value = true; + } else { + logger.log_info("Changed validated property from %s to %s, this change won't be persisted", value, *fixed_property_value); + value = *fixed_property_value; + needs_to_persist_new_value = false; + } +} +} // namespace + // Load Configure File -void Properties::loadConfigureFile(const std::filesystem::path& configuration_file) { +// If the loaded property is time-period or data-size validated and has no explicit unit, "ms" or "B" is appended, respectively. +// If the loaded property is integer validated and has an explicit unit (time-period or data-size), it is converted to ms/B and its unit is cut off. +void Properties::loadConfigureFile(const std::filesystem::path& configuration_file, std::string_view prefix) { std::lock_guard<std::mutex> lock(mutex_); if (configuration_file.empty()) { logger_->log_error("Configuration file path for %s is empty!", getName()); @@ -87,13 +183,17 @@ void Properties::loadConfigureFile(const std::filesystem::path& configuration_fi return; } properties_.clear(); + dirty_ = false; for (const auto& line : PropertiesFile{file}) { + auto key = line.getKey(); auto persisted_value = line.getValue(); auto value = utils::StringUtils::replaceEnvironmentVariables(persisted_value); - properties_[line.getKey()] = {persisted_value, value, false}; + bool need_to_persist_new_value = false; + fixValidatedProperty(std::string(prefix) + key, persisted_value, value, need_to_persist_new_value, *logger_); + dirty_ = dirty_ || need_to_persist_new_value; + properties_[key] = {persisted_value, value, need_to_persist_new_value}; } checksum_calculator_.setFileLocation(properties_file_); - dirty_ = false; } std::filesystem::path Properties::getFilePath() const { diff --git a/libminifi/src/utils/StringUtils.cpp b/libminifi/src/utils/StringUtils.cpp index 25215e97d..84b7a531b 100644 --- a/libminifi/src/utils/StringUtils.cpp +++ 
b/libminifi/src/utils/StringUtils.cpp @@ -16,9 +16,7 @@ */ #include <limits> - -#include "range/v3/view/transform.hpp" -#include "range/v3/range/conversion.hpp" +#include <charconv> #include "utils/Environment.h" #include "utils/GeneralUtils.h" @@ -26,8 +24,6 @@ namespace org::apache::nifi::minifi::utils { -namespace views = ranges::views; - std::optional<bool> StringUtils::toBool(const std::string& input) { std::string trimmed = trim(input); if (equalsIgnoreCase(trimmed, "true")) { @@ -39,11 +35,6 @@ std::optional<bool> StringUtils::toBool(const std::string& input) { return std::nullopt; } -std::string StringUtils::toLower(std::string_view str) { - const auto tolower = [](auto c) { return std::tolower(static_cast<unsigned char>(c)); }; - return str | views::transform(tolower) | ranges::to<std::string>(); -} - std::pair<std::string, std::string> StringUtils::chomp(const std::string& input_line) { if (endsWith(input_line, "\r\n")) { return std::make_pair(input_line.substr(0, input_line.size() - 2), "\r\n"); @@ -510,4 +501,20 @@ bool StringUtils::matchesSequence(std::string_view str, const std::vector<std::s return true; } +bool StringUtils::splitToValueAndUnit(std::string_view input, int64_t& value, std::string& unit) { + const char* begin = input.data(); + const char* end = begin + input.size(); + auto [ptr, ec] = std::from_chars(begin, end, value); + if (ptr == begin || ec != std::errc()) { + return false; + } + + while (ptr != end && *ptr == ' ') { + // Skip the spaces + ptr++; + } + unit = std::string(ptr, end); + return true; +} + } // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/test/integration/ProvenanceReportingTest.cpp b/libminifi/test/integration/ProvenanceReportingTest.cpp index 91953c41f..6b2fecacf 100644 --- a/libminifi/test/integration/ProvenanceReportingTest.cpp +++ b/libminifi/test/integration/ProvenanceReportingTest.cpp @@ -72,7 +72,7 @@ int main(int argc, char **argv) { 
assert(verifyLogLinePresenceInPollTime(std::chrono::milliseconds(std::chrono::seconds(2)), "Add processor SiteToSiteProvenanceReportingTask into process group MiNiFi Flow")); - controller->waitUnload(60000); + controller->waitUnload(60s); LogTestController::getInstance().reset(); rmdir("./content_repository"); rmdir("/tmp/aljs39/"); diff --git a/libminifi/test/unit/ProvenanceTestHelper.h b/libminifi/test/unit/ProvenanceTestHelper.h index b60d0a890..f2d04ed70 100644 --- a/libminifi/test/unit/ProvenanceTestHelper.h +++ b/libminifi/test/unit/ProvenanceTestHelper.h @@ -214,7 +214,7 @@ class TestFlowController : public org::apache::nifi::minifi::FlowController { return 0; } - void waitUnload(const uint64_t /*timeToWaitMs*/) override { + void waitUnload(const std::chrono::milliseconds /*time_to_wait*/) override { stop(); } diff --git a/libminifi/test/unit/StringUtilsTests.cpp b/libminifi/test/unit/StringUtilsTests.cpp index 1f12032f6..2b64b5af5 100644 --- a/libminifi/test/unit/StringUtilsTests.cpp +++ b/libminifi/test/unit/StringUtilsTests.cpp @@ -556,4 +556,42 @@ TEST_CASE("StringUtils::matchesSequence works correctly", "[matchesSequence]") { REQUIRE(!StringUtils::matchesSequence("xxxabcxxxdefxxx", {"abc", "abc", "def"})); } +TEST_CASE("StringUtils::toLower and toUpper tests") { + CHECK(StringUtils::toUpper("Lorem ipsum dolor sit amet") == "LOREM IPSUM DOLOR SIT AMET"); + CHECK(StringUtils::toLower("Lorem ipsum dolor sit amet") == "lorem ipsum dolor sit amet"); + + CHECK(StringUtils::toUpper("SuSpenDISse") == "SUSPENDISSE"); + CHECK(StringUtils::toLower("SuSpenDISse") == "suspendisse"); +} + +TEST_CASE("StringUtils::splitToValueAndUnit tests") { + int64_t value; + std::string unit_str; + SECTION("Simple case") { + CHECK(StringUtils::splitToValueAndUnit("1 horse", value, unit_str)); + CHECK(value == 1); + CHECK(unit_str == "horse"); + } + + SECTION("Without whitespace") { + CHECK(StringUtils::splitToValueAndUnit("112KiB", value, unit_str)); + CHECK(value == 112); + 
CHECK(unit_str == "KiB"); + } + + SECTION("Additional whitespace in the middle") { + CHECK(StringUtils::splitToValueAndUnit("100 hOrSe", value, unit_str)); + CHECK(value == 100); + CHECK(unit_str == "hOrSe"); + } + + SECTION("Invalid value") { + CHECK_FALSE(StringUtils::splitToValueAndUnit("one horse", value, unit_str)); + } + + SECTION("Empty string") { + CHECK_FALSE(StringUtils::splitToValueAndUnit("", value, unit_str)); + } +} + // NOLINTEND(readability-container-size-empty) diff --git a/libminifi/test/unit/TimeUtilTests.cpp b/libminifi/test/unit/TimeUtilTests.cpp index ef9d9ca37..48753efd2 100644 --- a/libminifi/test/unit/TimeUtilTests.cpp +++ b/libminifi/test/unit/TimeUtilTests.cpp @@ -131,37 +131,37 @@ TEST_CASE("Test string to duration conversion", "[timedurationtests]") { CHECK(one_hour.value() == 1h); CHECK(one_hour.value() == 3600s); - REQUIRE(StringToDuration<std::chrono::milliseconds>("1 hour")); - REQUIRE(StringToDuration<std::chrono::seconds>("102 hours") == 102h); - REQUIRE(StringToDuration<std::chrono::days>("102 hours") == std::chrono::days(4)); - REQUIRE(StringToDuration<std::chrono::milliseconds>("5 ns") == 0ms); - - REQUIRE(StringToDuration<std::chrono::seconds>("1d") == std::chrono::days(1)); - REQUIRE(StringToDuration<std::chrono::seconds>("10 days") == std::chrono::days(10)); - REQUIRE(StringToDuration<std::chrono::seconds>("100ms") == 0ms); - REQUIRE(StringToDuration<std::chrono::seconds>("20 us") == 0s); - REQUIRE(StringToDuration<std::chrono::seconds>("1ns") == 0ns); - REQUIRE(StringToDuration<std::chrono::seconds>("1min") == 1min); - REQUIRE(StringToDuration<std::chrono::seconds>("1 hour") == 1h); - REQUIRE(StringToDuration<std::chrono::seconds>("100 SEC") == 100s); - REQUIRE(StringToDuration<std::chrono::seconds>("10 ms") == 0ms); - REQUIRE(StringToDuration<std::chrono::seconds>("100 ns") == 0ns); - REQUIRE(StringToDuration<std::chrono::seconds>("1 minute") == 1min); - - REQUIRE(StringToDuration<std::chrono::nanoseconds>("1d") == 
std::chrono::days(1)); - REQUIRE(StringToDuration<std::chrono::nanoseconds>("10 days") == std::chrono::days(10)); - REQUIRE(StringToDuration<std::chrono::nanoseconds>("100ms") == 100ms); - REQUIRE(StringToDuration<std::chrono::nanoseconds>("20 us") == 20us); - REQUIRE(StringToDuration<std::chrono::nanoseconds>("1ns") == 1ns); - REQUIRE(StringToDuration<std::chrono::nanoseconds>("1min") == 1min); - REQUIRE(StringToDuration<std::chrono::nanoseconds>("1 hour") == 1h); - REQUIRE(StringToDuration<std::chrono::nanoseconds>("100 SEC") == 100s); - REQUIRE(StringToDuration<std::chrono::nanoseconds>("10 ms") == 10ms); - REQUIRE(StringToDuration<std::chrono::nanoseconds>("100 ns") == 100ns); - REQUIRE(StringToDuration<std::chrono::nanoseconds>("1 minute") == 1min); - - REQUIRE_FALSE(StringToDuration<std::chrono::seconds>("5 apples") == 1s); - REQUIRE_FALSE(StringToDuration<std::chrono::seconds>("1 year") == 1s); + CHECK(StringToDuration<std::chrono::milliseconds>("1 hour")); + CHECK(StringToDuration<std::chrono::seconds>("102 hours") == 102h); + CHECK(StringToDuration<std::chrono::days>("102 hours") == std::chrono::days(4)); + CHECK(StringToDuration<std::chrono::milliseconds>("5 ns") == 0ms); + + CHECK(StringToDuration<std::chrono::seconds>("1d") == std::chrono::days(1)); + CHECK(StringToDuration<std::chrono::seconds>("10 days") == std::chrono::days(10)); + CHECK(StringToDuration<std::chrono::seconds>("100ms") == 0ms); + CHECK(StringToDuration<std::chrono::seconds>("20 us") == 0s); + CHECK(StringToDuration<std::chrono::seconds>("1ns") == 0ns); + CHECK(StringToDuration<std::chrono::seconds>("1min") == 1min); + CHECK(StringToDuration<std::chrono::seconds>("1 hour") == 1h); + CHECK(StringToDuration<std::chrono::seconds>("100 SEC") == 100s); + CHECK(StringToDuration<std::chrono::seconds>("10 ms") == 0ms); + CHECK(StringToDuration<std::chrono::seconds>("100 ns") == 0ns); + CHECK(StringToDuration<std::chrono::seconds>("1 minute") == 1min); + + 
CHECK(StringToDuration<std::chrono::nanoseconds>("1d") == std::chrono::days(1)); + CHECK(StringToDuration<std::chrono::nanoseconds>("10 days") == std::chrono::days(10)); + CHECK(StringToDuration<std::chrono::nanoseconds>("100ms") == 100ms); + CHECK(StringToDuration<std::chrono::nanoseconds>("20 us") == 20us); + CHECK(StringToDuration<std::chrono::nanoseconds>("1ns") == 1ns); + CHECK(StringToDuration<std::chrono::nanoseconds>("1min") == 1min); + CHECK(StringToDuration<std::chrono::nanoseconds>("1 hour") == 1h); + CHECK(StringToDuration<std::chrono::nanoseconds>("100 SEC") == 100s); + CHECK(StringToDuration<std::chrono::nanoseconds>("10 ms") == 10ms); + CHECK(StringToDuration<std::chrono::nanoseconds>("100 ns") == 100ns); + CHECK(StringToDuration<std::chrono::nanoseconds>("1 minute") == 1min); + + CHECK(StringToDuration<std::chrono::seconds>("5 apples") == std::nullopt); + CHECK(StringToDuration<std::chrono::seconds>("1 year") == std::nullopt); } namespace { diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp index 0c1cc4915..29476495c 100644 --- a/minifi_main/MiNiFiMain.cpp +++ b/minifi_main/MiNiFiMain.cpp @@ -231,8 +231,6 @@ int main(int argc, char **argv) { return -1; } - uint16_t stop_wait_time = STOP_WAIT_TIME_MS; - std::string graceful_shutdown_seconds; std::string prov_repo_class = "provenancerepository"; std::string flow_repo_class = "flowfilerepository"; @@ -241,7 +239,8 @@ int main(int argc, char **argv) { auto log_properties = std::make_shared<core::logging::LoggerProperties>(); log_properties->setHome(minifiHome); - log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE); + log_properties->loadConfigureFile(DEFAULT_LOG_PROPERTIES_FILE, "nifi.log."); + core::logging::LoggerConfiguration::getConfiguration().initialize(log_properties); std::shared_ptr<minifi::Properties> uid_properties = std::make_shared<minifi::Properties>("UID properties"); @@ -308,20 +307,10 @@ int main(int argc, char **argv) { std::exit(0); } - if 
(configure->get(minifi::Configure::nifi_graceful_shutdown_seconds, graceful_shutdown_seconds)) { - try { - stop_wait_time = std::stoi(graceful_shutdown_seconds); - } - catch (const std::out_of_range& e) { - logger->log_error("%s is out of range. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what()); - } - catch (const std::invalid_argument& e) { - logger->log_error("%s contains an invalid argument set. %s", minifi::Configure::nifi_graceful_shutdown_seconds, e.what()); - } - } else { - logger->log_debug("%s not set, defaulting to %d", minifi::Configure::nifi_graceful_shutdown_seconds, - STOP_WAIT_TIME_MS); - } + std::chrono::milliseconds stop_wait_time = configure->get(minifi::Configure::nifi_graceful_shutdown_seconds) + | utils::flatMap(utils::timeutils::StringToDuration<std::chrono::milliseconds>) + | utils::valueOrElse([] { return std::chrono::milliseconds(STOP_WAIT_TIME_MS);}); + configure->get(minifi::Configure::nifi_provenance_repository_class_name, prov_repo_class); // Create repos for flow record and provenance @@ -387,7 +376,10 @@ int main(int argc, char **argv) { const auto controller = std::make_unique<minifi::FlowController>( prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, std::move(metrics_publisher_store), filesystem, request_restart); - const bool disk_space_watchdog_enable = (configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) | utils::map([](const std::string& v) { return v == "true"; })).value_or(true); + const bool disk_space_watchdog_enable = configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) + | utils::flatMap(utils::StringUtils::toBool) + | utils::valueOrElse([] { return true; }); + std::unique_ptr<utils::CallBackTimer> disk_space_watchdog; if (disk_space_watchdog_enable) { try {
