This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 9bd409b5f19c63a945af617b4516c95e6c3db3c9 Author: Adam Debreceni <[email protected]> AuthorDate: Thu Feb 3 12:30:35 2022 +0100 MINIFICPP-1748 - Make log properties configurable through c2 protocol Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1259 --- extensions/http-curl/tests/C2DebugBundleTest.cpp | 10 +- .../http-curl/tests/C2PropertiesUpdateTests.cpp | 195 +++++++++++++++++++++ extensions/http-curl/tests/CMakeLists.txt | 1 + extensions/http-curl/tests/EmptyFlow.h | 28 +++ libminifi/include/c2/C2Agent.h | 2 +- libminifi/include/core/Core.h | 19 +- .../include/core/logging/LoggerConfiguration.h | 2 + .../core/logging/internal/LogCompressorSink.h | 8 + libminifi/include/properties/Configure.h | 21 ++- libminifi/include/properties/Properties.h | 37 +++- libminifi/include/utils/StagingQueue.h | 4 + libminifi/src/Configure.cpp | 26 +++ libminifi/src/c2/C2Agent.cpp | 18 +- libminifi/src/core/logging/LoggerConfiguration.cpp | 93 ++++++---- .../core/logging/internal/CompressionManager.cpp | 18 +- libminifi/src/properties/Properties.cpp | 18 +- libminifi/src/utils/crypto/EncryptionManager.cpp | 2 +- libminifi/test/TestBase.cpp | 10 +- libminifi/test/TestBase.h | 67 ++++--- main/MiNiFiMain.cpp | 2 +- 20 files changed, 450 insertions(+), 131 deletions(-) diff --git a/extensions/http-curl/tests/C2DebugBundleTest.cpp b/extensions/http-curl/tests/C2DebugBundleTest.cpp index 954b524..29d8814 100644 --- a/extensions/http-curl/tests/C2DebugBundleTest.cpp +++ b/extensions/http-curl/tests/C2DebugBundleTest.cpp @@ -26,6 +26,7 @@ #include "HTTPIntegrationBase.h" #include "HTTPHandlers.h" #include "io/ArchiveStream.h" +#include "EmptyFlow.h" using std::literals::chrono_literals::operator""s; @@ -133,14 +134,7 @@ class C2HeartbeatHandler : public ServerAwareHandler { }; static std::string properties_file = "some.dummy.content = here\n"; -static std::string flow_config_file = R"( - Flow Controller: - name: Banana Bread - Processors: [] - Connections: [] - Remote Processing Groups: [] - Provenance Reporting: -)"; +static std::string flow_config_file = empty_flow; int main() { TestController controller; diff --git a/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp new file mode 100644 index 0000000..36812d0 --- /dev/null +++ b/extensions/http-curl/tests/C2PropertiesUpdateTests.cpp @@ -0,0 +1,195 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#undef NDEBUG +#include "HTTPIntegrationBase.h" +#include "HTTPHandlers.h" +#include "utils/gsl.h" +#include "utils/IntegrationTestUtils.h" +#include "EmptyFlow.h" +#include "spdlog/spdlog.h" +#include "spdlog/sinks/stdout_sinks.h" +#include "spdlog/sinks/ostream_sink.h" +#include "spdlog/sinks/dist_sink.h" +#include "LogUtils.h" +#include "properties/PropertiesFile.h" + +struct PropertyChange { + std::string name; + std::string value; + bool persist; +}; + +class C2HeartbeatHandler : public ServerAwareHandler { + public: + bool handlePost(CivetServer* /*server*/, struct mg_connection *conn) override { + if (response_) { + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + response_->length()); + mg_printf(conn, "%s", response_->c_str()); + response_.reset(); + } else { + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: 0\r\nConnection: close\r\n\r\n"); + } + + return true; + } + + void setProperties(const std::vector<PropertyChange>& changes) { + std::vector<std::string> fields; + for (const auto& change : changes) { + fields.push_back(fmt::format(R"("{}": {{"value": "{}", "persist": {}}})", change.name, change.value, change.persist)); + } + response_ = + R"({ + "operation" : "heartbeat", + "requested_operations": [{ + "operation" : "update", + "operationid" : "79", + "name": "properties", + "args": {)" + + utils::StringUtils::join(", ", fields) + + R"(} + }] + })"; + } + + private: + std::optional<std::string> response_; +}; + +class VerifyPropertyUpdate : public HTTPIntegrationBase { + public: + explicit VerifyPropertyUpdate(std::function<void()> fn) : fn_(std::move(fn)) {} + + void testSetup() {} + + void runAssertions() { + fn_(); + } + + std::function<void()> fn_; +}; + +static const std::string properties_file = + "nifi.property.one=tree\n" + "nifi.c2.agent.protocol.class=RESTSender\n" + "nifi.c2.enable=true\n" + "nifi.c2.agent.class=test\n" + "nifi.c2.agent.heartbeat.period=100\n"; + +static const std::string log_properties_file = + "logger.root=INFO,ostream\n"; + +using std::literals::chrono_literals::operator""s; + +struct DummyClass1 {}; +struct DummyClass2 {}; +namespace test { +struct DummyClass3 {}; +} // namespace test + +struct ConfigTestAccessor { + static void call_setLoggerProperties(const std::shared_ptr<minifi::Configure>& config, std::shared_ptr<core::logging::LoggerProperties> props) { + config->setLoggerProperties(props); + } +}; + +int main() { + TempDirectory tmp_dir; + + std::filesystem::path home_dir = tmp_dir.getPath(); + + utils::file::PathUtils::create_dir((home_dir / "conf").string()); + std::ofstream{home_dir / "conf/minifi.properties"} << properties_file; + std::ofstream{home_dir / "conf/minifi-log.properties"} << log_properties_file; + std::ofstream{home_dir / "conf/config.yml"} << empty_flow; + + C2HeartbeatHandler hb_handler{}; + C2AcknowledgeHandler ack_handler{}; + + auto logger_properties = std::make_shared<core::logging::LoggerProperties>(); + // this sets the ostream logger + auto log_test_controller = LogTestController::getInstance(logger_properties); + + logger_properties->setHome(home_dir.string()); + logger_properties->loadConfigureFile("conf/minifi-log.properties"); + core::logging::LoggerConfiguration::getConfiguration().initialize(logger_properties); + + auto logger1 = core::logging::LoggerFactory<DummyClass1>::getLogger(); + auto logger2 = core::logging::LoggerFactory<DummyClass2>::getLogger(); + auto logger3 = core::logging::LoggerFactory<test::DummyClass3>::getLogger(); + + { + // verify initial log levels, none of these should be logged + logger1->log_debug("DummyClass1::before"); + logger2->log_debug("DummyClass2::before"); + logger3->log_debug("DummyClass3::before"); + + assert(!log_test_controller->contains("DummyClass1::before", 0s)); + assert(!log_test_controller->contains("DummyClass2::before", 0s)); + assert(!log_test_controller->contains("DummyClass3::before", 0s)); + } + + VerifyPropertyUpdate harness([&] { + assert(utils::verifyEventHappenedInPollTime(3s, [&] {return ack_handler.isAcknowledged("79");})); + // update operation acknowledged + { + // verify final log levels + logger1->log_debug("DummyClass1::after"); + logger2->log_debug("DummyClass2::after"); // this should still not log + logger3->log_debug("DummyClass3::after"); + } + assert(log_test_controller->contains("DummyClass1::after", 0s)); + assert(!log_test_controller->contains("DummyClass2::after", 0s)); + assert(log_test_controller->contains("DummyClass3::after", 0s)); + + { + minifi::PropertiesFile minifi_properties(std::ifstream{home_dir / "conf/minifi.properties"}); + assert(!minifi_properties.hasValue("nifi.dummy.property")); + assert(minifi_properties.getValue("nifi.property.one") == "bush"); + assert(minifi_properties.getValue("nifi.property.two") == "ring"); + } + + { + minifi::PropertiesFile minifi_log_properties(std::ifstream{home_dir / "conf/minifi-log.properties"}); + assert(!minifi_log_properties.hasValue("logger.test")); + assert(minifi_log_properties.getValue("logger.DummyClass1") == "DEBUG,ostream"); + } + }); + + harness.getConfiguration()->setHome(home_dir.string()); + harness.getConfiguration()->loadConfigureFile("conf/minifi.properties"); + ConfigTestAccessor::call_setLoggerProperties(harness.getConfiguration(), logger_properties); + + harness.setUrl("http://localhost:0/heartbeat", &hb_handler); + harness.setUrl("http://localhost:0/acknowledge", &ack_handler); + harness.setC2Url("/heartbeat", "/acknowledge"); + + hb_handler.setProperties({ + {"nifi.dummy.property", "banana", false}, + {"nifi.property.one", "bush", true}, + {"nifi.property.two", "ring", true}, + {"nifi.log.logger.test", "DEBUG,ostream", false}, + {"nifi.log.logger.DummyClass1", "DEBUG,ostream", true} + }); + + harness.run((home_dir / "conf/config.yml").string()); +} diff --git a/extensions/http-curl/tests/CMakeLists.txt b/extensions/http-curl/tests/CMakeLists.txt index 800cd37..e5410b5 100644 --- a/extensions/http-curl/tests/CMakeLists.txt +++ b/extensions/http-curl/tests/CMakeLists.txt @@ -97,3 +97,4 @@ add_test(NAME AbsoluteTimeoutTest COMMAND AbsoluteTimeoutTest) add_test(NAME C2PauseResumeTest COMMAND C2PauseResumeTest "${TEST_RESOURCES}/C2PauseResumeTest.yml" "${TEST_RESOURCES}/") add_test(NAME C2LogHeartbeatTest COMMAND C2LogHeartbeatTest) add_test(NAME C2DebugBundleTest COMMAND C2DebugBundleTest) +add_test(NAME C2PropertiesUpdateTests COMMAND C2PropertiesUpdateTests) diff --git a/extensions/http-curl/tests/EmptyFlow.h b/extensions/http-curl/tests/EmptyFlow.h new file mode 100644 index 0000000..0ece051 --- /dev/null +++ b/extensions/http-curl/tests/EmptyFlow.h @@ -0,0 +1,28 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +constexpr const char* empty_flow = R"( + Flow Controller: + name: Banana Bread + Processors: [] + Connections: [] + Remote Processing Groups: [] + Provenance Reporting: +)"; diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index f8d73e6..a62adc1 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -151,7 +151,7 @@ class C2Agent : public state::UpdateController { /** * Updates a property */ - bool update_property(const std::string &property_name, const std::string &property_value, bool persist); + bool update_property(const std::string &property_name, const std::string &property_value, PropertyChangeLifetime lifetime); void handle_transfer(const C2ContentResponse &resp); diff --git a/libminifi/include/core/Core.h b/libminifi/include/core/Core.h index c4c1ea7..aa7ed90 100644 --- a/libminifi/include/core/Core.h +++ b/libminifi/include/core/Core.h @@ -23,6 +23,7 @@ #include <memory> #include <string> +#include <string_view> #ifdef WIN32 #pragma comment(lib, "shlwapi.lib") @@ -64,6 +65,7 @@ #include "utils/Id.h" #include "properties/Configure.h" +#include "utils/StringUtils.h" /** * namespace aliasing @@ -80,13 +82,16 @@ static inline std::string getClassName() { std::free(b); return name; #else - std::string adjusted_name = typeid(T).name(); - // can probably skip class manually for slightly higher performance - const std::string clazz = "class "; - auto haz_clazz = adjusted_name.find(clazz); - if (haz_clazz == 0) - adjusted_name = adjusted_name.substr(clazz.length(), adjusted_name.length() - clazz.length()); - return adjusted_name; + std::string_view name = typeid(T).name(); + const std::string_view class_prefix = "class "; + const std::string_view struct_prefix = "struct "; + + if (utils::StringUtils::startsWith(name, class_prefix)) { + name.remove_prefix(class_prefix.length()); + } else if (utils::StringUtils::startsWith(name, struct_prefix)) { + name.remove_prefix(struct_prefix.length()); + } + return std::string{name}; #endif } diff --git a/libminifi/include/core/logging/LoggerConfiguration.h b/libminifi/include/core/logging/LoggerConfiguration.h index 7c2fb59..f201260 100644 --- a/libminifi/include/core/logging/LoggerConfiguration.h +++ b/libminifi/include/core/logging/LoggerConfiguration.h @@ -145,6 +145,8 @@ class LoggerConfiguration { const std::string name; }; + static std::shared_ptr<spdlog::sinks::rotating_file_sink_mt> getRotatingFileSink(const std::string& appender_key, const std::shared_ptr<LoggerProperties>& properties); + LoggerConfiguration(); internal::CompressionManager compression_manager_; std::shared_ptr<internal::LoggerNamespace> root_namespace_; diff --git a/libminifi/include/core/logging/internal/LogCompressorSink.h b/libminifi/include/core/logging/internal/LogCompressorSink.h index f1b28d6..fd02521 100644 --- a/libminifi/include/core/logging/internal/LogCompressorSink.h +++ b/libminifi/include/core/logging/internal/LogCompressorSink.h @@ -69,6 +69,14 @@ class LogCompressorSink : public spdlog::sinks::base_sink<std::mutex> { return std::move(compressed.buffer_); } + size_t getMaxCacheSize() const { + return cached_logs_.getMaxSize(); + } + + size_t getMaxCompressedSize() const { + return compressed_logs_.getMaxSize(); + } + private: enum class CompressionResult { Success, diff --git a/libminifi/include/properties/Configure.h b/libminifi/include/properties/Configure.h index 55bb74f..88b6878 100644 --- a/libminifi/include/properties/Configure.h +++ b/libminifi/include/properties/Configure.h @@ -19,10 +19,14 @@ #include <optional> #include <string> #include <utility> +#include <memory> #include "properties/Configuration.h" #include "properties/Decryptor.h" #include "core/AgentIdentificationProvider.h" +#include "core/logging/LoggerProperties.h" + +struct ConfigTestAccessor; namespace org { namespace apache { @@ -30,9 +34,10 @@ namespace nifi { namespace minifi { class Configure : public Configuration, public core::AgentIdentificationProvider { + friend struct ::ConfigTestAccessor; public: - explicit Configure(std::optional<Decryptor> decryptor = std::nullopt) - : Configuration{}, decryptor_(std::move(decryptor)) {} + explicit Configure(std::optional<Decryptor> decryptor = std::nullopt, std::shared_ptr<core::logging::LoggerProperties> logger_properties = {}) + : Configuration{}, decryptor_(std::move(decryptor)), logger_properties_(std::move(logger_properties)) {} bool get(const std::string& key, std::string& value) const; bool get(const std::string& key, const std::string& alternate_key, std::string& value) const; @@ -42,12 +47,24 @@ class Configure : public Configuration, public core::AgentIdentificationProvider std::string getAgentIdentifier() const override; void setFallbackAgentIdentifier(const std::string& id); + using Configuration::set; + void set(const std::string& key, const std::string& value, PropertyChangeLifetime lifetime) override; + bool commitChanges() override; + + private: + // WARNING! a test utility + void setLoggerProperties(std::shared_ptr<core::logging::LoggerProperties> new_properties) { + logger_properties_ = new_properties; + } + bool isEncrypted(const std::string& key) const; std::optional<Decryptor> decryptor_; mutable std::mutex fallback_identifier_mutex_; std::string fallback_identifier_; + std::atomic_bool logger_properties_changed_{false}; + std::shared_ptr<core::logging::LoggerProperties> logger_properties_; }; } // namespace minifi diff --git a/libminifi/include/properties/Properties.h b/libminifi/include/properties/Properties.h index d8a51fe..2a8325d 100644 --- a/libminifi/include/properties/Properties.h +++ b/libminifi/include/properties/Properties.h @@ -29,16 +29,23 @@ #include "core/logging/Logger.h" #include "utils/ChecksumCalculator.h" +#include "utils/StringUtils.h" namespace org { namespace apache { namespace nifi { namespace minifi { +enum class PropertyChangeLifetime { + TRANSIENT, // the changed value will not be committed to disk + PERSISTENT // the changed value will be written to the source file +}; + class Properties { struct PropertyValue { - std::string value; - bool changed; + std::string persisted_value; + std::string active_value; + bool need_to_persist_new_value{false}; }; public: @@ -55,11 +62,29 @@ class Properties { std::lock_guard<std::mutex> lock(mutex_); properties_.clear(); } + void set(const std::string& key, const std::string& value) { + set(key, value, PropertyChangeLifetime::PERSISTENT); + } // Set the config value - void set(const std::string &key, const std::string &value) { + virtual void set(const std::string &key, const std::string &value, PropertyChangeLifetime lifetime) { + auto active_value = utils::StringUtils::replaceEnvironmentVariables(value); std::lock_guard<std::mutex> lock(mutex_); - properties_[key] = PropertyValue{value, true}; - dirty_ = true; + bool should_persist = lifetime == PropertyChangeLifetime::PERSISTENT; + if (auto it = properties_.find(key); it != properties_.end()) { + // update an existing property + it->second.active_value = active_value; + if (should_persist) { + it->second.persisted_value = value; + it->second.need_to_persist_new_value = true; + } + } else { + // brand new property + properties_[key] = PropertyValue{value, active_value, should_persist}; + } + + if (should_persist) { + dirty_ = true; + } } // Check whether the config value existed bool has(const std::string& key) const { @@ -112,7 +137,7 @@ class Properties { return minifi_home_; } - bool persistProperties(); + virtual bool commitChanges(); utils::ChecksumCalculator& getChecksumCalculator() { return checksum_calculator_; } diff --git a/libminifi/include/utils/StagingQueue.h b/libminifi/include/utils/StagingQueue.h index 2714a3a..854ea34 100644 --- a/libminifi/include/utils/StagingQueue.h +++ b/libminifi/include/utils/StagingQueue.h @@ -139,6 +139,10 @@ class StagingQueue { return false; } + size_t getMaxSize() const { + return max_size_; + } + void discardOverflow() { while (total_size_ > max_size_) { Item item; diff --git a/libminifi/src/Configure.cpp b/libminifi/src/Configure.cpp index 2832295..62605b8 100644 --- a/libminifi/src/Configure.cpp +++ b/libminifi/src/Configure.cpp @@ -21,6 +21,7 @@ #include "utils/gsl.h" #include "core/logging/LoggerConfiguration.h" +#include "utils/StringUtils.h" namespace org { namespace apache { @@ -88,6 +89,31 @@ void Configure::setFallbackAgentIdentifier(const std::string& id) { fallback_identifier_ = id; } +void Configure::set(const std::string& key, const std::string& value, PropertyChangeLifetime lifetime) { + const std::string_view log_prefix = "nifi.log."; + if (utils::StringUtils::startsWith(key, log_prefix)) { + if (logger_properties_) { + logger_properties_changed_ = true; + logger_properties_->set(key.substr(log_prefix.length()), value, lifetime); + } + } else { + Configuration::set(key, value, lifetime); + } +} + +bool Configure::commitChanges() { + bool success = true; + if (logger_properties_) { + success &= logger_properties_->commitChanges(); + if (logger_properties_changed_) { + core::logging::LoggerConfiguration::getConfiguration().initialize(logger_properties_); + logger_properties_changed_ = false; + } + } + success &= Configuration::commitChanges(); + return success; +} + } /* namespace minifi */ } /* namespace nifi */ } /* namespace apache */ diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 01f7660..50ed9b6 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -567,10 +567,15 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { entry.second.getAnnotation("persist") | utils::map(&AnnotatedValue::to_string) | utils::flatMap(utils::StringUtils::toBool)).value_or(false); - if (!update_property(entry.first, entry.second.to_string(), persist)) { + PropertyChangeLifetime lifetime = persist ? PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT; + if (!update_property(entry.first, entry.second.to_string(), lifetime)) { result = state::UpdateState::PARTIALLY_APPLIED; } } + // apply changes and persist properties requested to be persisted + if (!configuration_->commitChanges()) { + result = state::UpdateState::PARTIALLY_APPLIED; + } C2Payload response(Operation::ACKNOWLEDGE, result, resp.ident, true); enqueue_c2_response(std::move(response)); } else if (resp.name == "c2") { @@ -601,15 +606,12 @@ void C2Agent::handle_update(const C2ContentResponse &resp) { /** * Updates a property */ -bool C2Agent::update_property(const std::string &property_name, const std::string &property_value, bool persist) { +bool C2Agent::update_property(const std::string &property_name, const std::string &property_value, PropertyChangeLifetime lifetime) { if (update_service_ && !update_service_->canUpdate(property_name)) { return false; } - configuration_->set(property_name, property_value); - if (!persist) { - return true; - } - return configuration_->persistProperties(); + configuration_->set(property_name, property_value, lifetime); + return true; } C2Payload C2Agent::bundleDebugInfo(std::map<std::string, std::unique_ptr<io::InputStream>>& files) { @@ -849,7 +851,7 @@ bool C2Agent::handleConfigurationUpdate(const C2ContentResponse &resp) { if (should_persist) { // update the flow id - configuration_->persistProperties(); + configuration_->commitChanges(); } return true; diff --git a/libminifi/src/core/logging/LoggerConfiguration.cpp b/libminifi/src/core/logging/LoggerConfiguration.cpp index 289faaf..ae8e4c8 100644 --- a/libminifi/src/core/logging/LoggerConfiguration.cpp +++ b/libminifi/src/core/logging/LoggerConfiguration.cpp @@ -185,43 +185,7 @@ std::shared_ptr<internal::LoggerNamespace> LoggerConfiguration::initialize_names if ("nullappender" == appender_type || "null appender" == appender_type || "null" == appender_type) { sink_map[appender_name] = std::make_shared<spdlog::sinks::null_sink_st>(); } else if ("rollingappender" == appender_type || "rolling appender" == appender_type || "rolling" == appender_type) { - std::string file_name; - if (!logger_properties->getString(appender_key + ".file_name", file_name)) { - file_name = "minifi-app.log"; - } - std::string directory; - if (!logger_properties->getString(appender_key + ".directory", directory)) { - // The below part assumes logger_properties->getHome() is existing - // Cause minifiHome must be set at MiNiFiMain.cpp? - directory = logger_properties->getHome() + utils::file::FileUtils::get_separator() + "logs"; - } - - if (utils::file::FileUtils::create_dir(directory) == -1) { - std::cerr << directory << " cannot be created\n"; - exit(1); - } - file_name = directory + utils::file::FileUtils::get_separator() + file_name; - - int max_files = 3; - std::string max_files_str = ""; - if (logger_properties->getString(appender_key + ".max_files", max_files_str)) { - try { - max_files = std::stoi(max_files_str); - } catch (const std::invalid_argument &) { - } catch (const std::out_of_range &) { - } - } - - int max_file_size = 5 * 1024 * 1024; - std::string max_file_size_str = ""; - if (logger_properties->getString(appender_key + ".max_file_size", max_file_size_str)) { - try { - max_file_size = std::stoi(max_file_size_str); - } catch (const std::invalid_argument &) { - } catch (const std::out_of_range &) { - } - } - sink_map[appender_name] = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(file_name, max_file_size, max_files); + sink_map[appender_name] = getRotatingFileSink(appender_key, logger_properties); } else if ("stdout" == appender_type) { sink_map[appender_name] = std::make_shared<spdlog::sinks::stdout_sink_mt>(); } else if ("stderr" == appender_type) { @@ -318,7 +282,7 @@ std::shared_ptr<spdlog::logger> LoggerConfiguration::get_logger(std::shared_ptr< std::copy(inherited_sinks.begin(), inherited_sinks.end(), std::back_inserter(sinks)); spdlogger = std::make_shared<spdlog::logger>(name, begin(sinks), end(sinks)); spdlogger->set_level(level); - spdlogger->set_formatter(formatter -> clone()); + spdlogger->set_formatter(formatter->clone()); spdlogger->flush_on(std::max(spdlog::level::info, current_namespace->level)); try { spdlog::register_logger(spdlogger); @@ -359,6 +323,59 @@ void LoggerConfiguration::initializeCompression(const std::lock_guard<std::mutex } } +std::shared_ptr<spdlog::sinks::rotating_file_sink_mt> LoggerConfiguration::getRotatingFileSink(const std::string& appender_key, const std::shared_ptr<LoggerProperties>& properties) { + // According to spdlog docs, if two loggers write to the same file, they must use the same sink object. + // Note that some logging configuration changes will not take effect until MiNiFi is restarted. + static std::map<std::filesystem::path, std::shared_ptr<spdlog::sinks::rotating_file_sink_mt>> rotating_file_sinks; + static std::mutex sink_map_mtx; + + std::string file_name; + if (!properties->getString(appender_key + ".file_name", file_name)) { + file_name = "minifi-app.log"; + } + std::string directory; + if (!properties->getString(appender_key + ".directory", directory)) { + // The below part assumes logger_properties->getHome() is existing + // Cause minifiHome must be set at MiNiFiMain.cpp? + directory = properties->getHome() + utils::file::FileUtils::get_separator() + "logs"; + } + + file_name = directory + utils::file::FileUtils::get_separator() + file_name; + if (utils::file::FileUtils::create_dir(directory) == -1) { + std::cerr << directory << " cannot be created\n"; + exit(1); + } + + int max_files = 3; + std::string max_files_str = ""; + if (properties->getString(appender_key + ".max_files", max_files_str)) { + try { + max_files = std::stoi(max_files_str); + } catch (const std::invalid_argument &) { + } catch (const std::out_of_range &) { + } + } + + int max_file_size = 5_MiB; + std::string max_file_size_str = ""; + if (properties->getString(appender_key + ".max_file_size", max_file_size_str)) { + try { + max_file_size = std::stoi(max_file_size_str); + } catch (const std::invalid_argument &) { + } catch (const std::out_of_range &) { + } + } + + std::lock_guard<std::mutex> guard(sink_map_mtx); + auto it = rotating_file_sinks.find(file_name); + if (it != rotating_file_sinks.end()) { + return it->second; + } + auto sink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(file_name, max_file_size, max_files); + rotating_file_sinks.emplace(file_name, sink); + return sink; +} + } /* namespace logging */ } /* namespace core */ } /* namespace minifi */ diff --git a/libminifi/src/core/logging/internal/CompressionManager.cpp b/libminifi/src/core/logging/internal/CompressionManager.cpp index b7453f5..8733f8d 100644 --- a/libminifi/src/core/logging/internal/CompressionManager.cpp +++ b/libminifi/src/core/logging/internal/CompressionManager.cpp @@ -51,19 +51,19 @@ std::shared_ptr<LogCompressorSink> CompressionManager::initialize( }; auto cached_log_max_size = get_size(compression_cached_log_max_size_).value_or(8_MiB); auto compressed_log_max_size = get_size(compression_compressed_log_max_size_).value_or(8_MiB); - std::shared_ptr<internal::LogCompressorSink> sink; - if (cached_log_max_size != 0 && compressed_log_max_size != 0) { - sink = std::make_shared<internal::LogCompressorSink>( + std::lock_guard<std::mutex> lock(mtx_); + if (cached_log_max_size == 0 || compressed_log_max_size == 0) { + sink_.reset(); + return sink_; + } + // do not create new sink if all relevant parameters match + if (!sink_ || sink_->getMaxCacheSize() != cached_log_max_size || sink_->getMaxCompressedSize() != compressed_log_max_size) { + sink_ = std::make_shared<internal::LogCompressorSink>( LogQueueSize{cached_log_max_size, cache_segment_size}, LogQueueSize{compressed_log_max_size, compressed_segment_size}, logger_factory(getClassName<LogCompressorSink>())); } - { - // gcc4.8 bug => cannot use std::atomic_store - std::lock_guard<std::mutex> lock(mtx_); - sink_ = sink; - } - return sink; + return sink_; } } // namespace internal diff --git a/libminifi/src/properties/Properties.cpp b/libminifi/src/properties/Properties.cpp index 8df8202..b8d127a 100644 --- a/libminifi/src/properties/Properties.cpp +++ b/libminifi/src/properties/Properties.cpp @@ -43,7 +43,7 @@ bool Properties::getString(const std::string &key, std::string &value) const { auto it = properties_.find(key); if (it != properties_.end()) { - value = it->second.value; + value = it->second.active_value; return true; } else { return false; @@ -64,7 +64,7 @@ int Properties::getInt(const std::string &key, int default_value) const { std::lock_guard<std::mutex> lock(mutex_); auto it = properties_.find(key); - return it != properties_.end() ? std::stoi(it->second.value) : default_value; + return it != properties_.end() ? std::stoi(it->second.active_value) : default_value; } // Load Configure File @@ -91,7 +91,9 @@ void Properties::loadConfigureFile(const char *fileName) { } properties_.clear(); for (const auto& line : PropertiesFile{file}) { - properties_[line.getKey()] = {utils::StringUtils::replaceEnvironmentVariables(line.getValue()), false}; + auto persisted_value = line.getValue(); + auto value = utils::StringUtils::replaceEnvironmentVariables(persisted_value); + properties_[line.getKey()] = {persisted_value, value, false}; } checksum_calculator_.setFileLocation(properties_file_); dirty_ = false; @@ -102,7 +104,7 @@ std::string Properties::getFilePath() const { return properties_file_; } -bool Properties::persistProperties() { +bool Properties::commitChanges() { std::lock_guard<std::mutex> lock(mutex_); if (!dirty_) { logger_->log_info("Attempt to persist, but properties are not updated"); @@ -118,13 +120,13 @@ bool Properties::persistProperties() { PropertiesFile current_content{file}; for (const auto& prop : properties_) { - if (!prop.second.changed) { + if (!prop.second.need_to_persist_new_value) { continue; } if (current_content.hasValue(prop.first)) { - current_content.update(prop.first, prop.second.value); + current_content.update(prop.first, prop.second.persisted_value); } else { - current_content.append(prop.first, prop.second.value); + current_content.append(prop.first, prop.second.persisted_value); } } @@ -151,7 +153,7 @@ std::map<std::string, std::string> Properties::getProperties() const { std::lock_guard<std::mutex> lock(mutex_); std::map<std::string, std::string> properties; for (const auto& prop : properties_) { - properties[prop.first] = prop.second.value; + properties[prop.first] = prop.second.active_value; } return properties; } diff --git a/libminifi/src/utils/crypto/EncryptionManager.cpp b/libminifi/src/utils/crypto/EncryptionManager.cpp index 5f37318..122071a 100644 --- a/libminifi/src/utils/crypto/EncryptionManager.cpp +++ b/libminifi/src/utils/crypto/EncryptionManager.cpp @@ -78,7 +78,7 @@ bool EncryptionManager::writeKey(const std::string &key_name, const Bytes& key) bootstrap_conf.setHome(key_dir_); bootstrap_conf.loadConfigureFile(DEFAULT_NIFI_BOOTSTRAP_FILE); bootstrap_conf.set(key_name, utils::StringUtils::to_hex(key)); - return bootstrap_conf.persistProperties(); + return bootstrap_conf.commitChanges(); } } // namespace crypto diff --git a/libminifi/test/TestBase.cpp b/libminifi/test/TestBase.cpp index dad6c95..dc6daf7 100644 --- a/libminifi/test/TestBase.cpp +++ b/libminifi/test/TestBase.cpp @@ -197,9 +197,9 @@ TestPlan::TestPlan(std::shared_ptr<minifi::core::ContentRepository> content_repo controller_services_provider_ = std::make_shared<minifi::core::controller::StandardControllerServiceProvider>(controller_services_, nullptr, configuration_); /* Inject the default state provider ahead of ProcessContext to make sure we have a unique state directory */ if (state_dir == nullptr) { - state_dir_ = std::make_unique<StateDir>(); + state_dir_ = std::make_unique<TempDirectory>(); } else { - state_dir_ = std::make_unique<StateDir>(state_dir); + state_dir_ = std::make_unique<TempDirectory>(state_dir); } if (!configuration_->get(minifi::Configure::nifi_state_management_provider_local_path)) { configuration_->set(minifi::Configure::nifi_state_management_provider_local_path, state_dir_->getPath()); @@ -625,8 +625,6 @@ std::shared_ptr<TestPlan> TestController::createPlan(std::shared_ptr<minifi::Con } std::string TestController::createTempDirectory() { - char format[] = "/var/tmp/nifi-minifi-cpp.test.XXXXXX"; - auto dir = minifi::utils::file::FileUtils::create_temp_directory(format); - directories.push_back(dir); - return dir; + directories.push_back(std::make_unique<TempDirectory>()); + return directories.back()->getPath(); } diff --git a/libminifi/test/TestBase.h b/libminifi/test/TestBase.h index 13bf4c8..8b40dca 100644 --- a/libminifi/test/TestBase.h +++ b/libminifi/test/TestBase.h @@ -166,6 +166,35 @@ class LogTestController { std::vector<std::string> modified_loggers; }; +class TempDirectory { + public: + TempDirectory() { + char format[] = "/var/tmp/nifi-minifi-cpp.test.XXXXXX"; + path_ = minifi::utils::file::FileUtils::create_temp_directory(format); + is_owner_ = true; + } + explicit TempDirectory(std::string path): path_{std::move(path)}, is_owner_{false} {} + + // disable copy + TempDirectory(const TempDirectory&) = delete; + TempDirectory& operator=(const TempDirectory&) = delete; + + ~TempDirectory() { + if (is_owner_) { + minifi::utils::file::FileUtils::delete_dir(path_, true); + } + } + + [[nodiscard]] + std::string getPath() const { + return path_; + } + + private: + std::string path_; + bool is_owner_; +}; + class TestPlan { public: explicit TestPlan(std::shared_ptr<minifi::core::ContentRepository> content_repo, std::shared_ptr<minifi::core::Repository> flow_repo, std::shared_ptr<minifi::core::Repository> prov_repo, @@ -256,35 +285,7 @@ class TestPlan { void validateAnnotations() const; protected: - class StateDir { - public: - StateDir() { - char state_dir_name_template[] = "/var/tmp/teststate.XXXXXX"; - path_ = minifi::utils::file::FileUtils::create_temp_directory(state_dir_name_template); - is_owner_ = true; - } - - explicit StateDir(std::string path) : path_(std::move(path)), is_owner_(false) {} - - StateDir(const StateDir&) = delete; - StateDir& operator=(const StateDir&) = delete; - - ~StateDir() { - if (is_owner_) { - minifi::utils::file::FileUtils::delete_dir(path_, true); - } - } - - [[nodiscard]] std::string getPath() const { - return path_; - } - - private: - std::string path_; - bool is_owner_; - }; - - std::unique_ptr<StateDir> state_dir_; + std::unique_ptr<TempDirectory> state_dir_; std::shared_ptr<minifi::Connection> buildFinalConnection(const std::shared_ptr<minifi::core::Processor>& processor, bool setDest = false); @@ -349,18 +350,12 @@ class TestController { return log; } - ~TestController() { - for (const auto& dir : directories) { - minifi::utils::file::FileUtils::delete_dir(dir, true); - } - } - std::string createTempDirectory(); protected: std::shared_ptr<minifi::state::response::FlowVersion> flow_version_; LogTestController &log; - std::vector<std::string> directories; + std::vector<std::unique_ptr<TempDirectory>> directories; }; static bool disableAwsMetadata = [] { diff --git a/main/MiNiFiMain.cpp b/main/MiNiFiMain.cpp index 45d0128..4763423 100644 --- a/main/MiNiFiMain.cpp +++ b/main/MiNiFiMain.cpp @@ -218,7 +218,7 @@ int main(int argc, char **argv) { logger->log_info("No encryption key found, will not decrypt sensitive properties in the configuration"); } - const std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(std::move(decryptor)); + const std::shared_ptr<minifi::Configure> configure = std::make_shared<minifi::Configure>(std::move(decryptor), std::move(log_properties)); configure->setHome(minifiHome); configure->loadConfigureFile(DEFAULT_NIFI_PROPERTIES_FILE);
