This is an automated email from the ASF dual-hosted git repository. lordgamez pushed a commit to branch MINIFICPP-2502 in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 1889eaaf7e09d253d68c4800dd3180db3af43648 Author: Gabor Gyimesi <[email protected]> AuthorDate: Tue Jan 7 15:49:09 2025 +0100 MINIFICPP-2502 Add processorBulletins C2 metric node to FlowInformation --- C2.md | 31 ++++++ conf/minifi.properties | 3 + extensions/python/ExecutePythonProcessor.cpp | 9 +- extensions/python/ExecutePythonProcessor.h | 2 + .../tests/unit/FlowJsonTests.cpp | 2 + .../tests/unit/YamlConfigurationTests.cpp | 2 + libminifi/include/core/BulletinStore.h | 70 +++++++++++++ libminifi/include/core/FlowConfiguration.h | 3 + libminifi/include/core/ProcessGroup.h | 37 +------ libminifi/include/core/ProcessorConfig.h | 19 +--- libminifi/include/core/flow/FlowSchema.h | 1 + libminifi/include/core/logging/LoggerBase.h | 32 ++++++ .../include/core/state/MetricsPublisherStore.h | 3 +- .../include/core/state/nodes/FlowInformation.h | 6 ++ .../include/core/state/nodes/ResponseNodeLoader.h | 4 +- libminifi/src/Configuration.cpp | 1 + libminifi/src/core/BulletinStore.cpp | 76 +++++++++++++++ libminifi/src/core/FlowConfiguration.cpp | 1 + libminifi/src/core/ProcessGroup.cpp | 13 +++ libminifi/src/core/flow/FlowSchema.cpp | 2 + .../src/core/flow/StructuredConfiguration.cpp | 17 ++++ libminifi/src/core/logging/LoggerBase.cpp | 7 ++ libminifi/src/core/state/MetricsPublisherStore.cpp | 4 +- libminifi/src/core/state/nodes/FlowInformation.cpp | 27 ++++++ .../src/core/state/nodes/ResponseNodeLoader.cpp | 9 +- libminifi/test/integration/C2MetricsTest.cpp | 27 +++++- .../test/libtest/integration/IntegrationBase.cpp | 6 +- .../test/libtest/integration/IntegrationBase.h | 2 + libminifi/test/resources/TestC2Metrics.yml | 2 + libminifi/test/unit/BulletinStoreTests.cpp | 108 +++++++++++++++++++++ minifi-api/include/minifi-cpp/core/Processor.h | 8 ++ .../include/minifi-cpp/core/logging/Logger.h | 5 +- .../include/minifi-cpp/properties/Configuration.h | 1 + minifi_main/MiNiFiMain.cpp | 9 +- utils/include/core/Processor.h | 29 ++++++ utils/include/utils/TimeUtil.h | 4 + utils/src/core/Processor.cpp | 5 +- 37 files changed, 519 insertions(+), 68 deletions(-) diff --git a/C2.md b/C2.md index c990829d4..244e0e887 100644 --- a/C2.md +++ b/C2.md @@ -115,6 +115,9 @@ be requested via C2 DESCRIBE manifest command. # minimize REST heartbeat updates #nifi.c2.rest.heartbeat.minimize.updates=true + # specify the maximum number of bulletins to send in a heartbeat + # nifi.c2.flow.info.processor.bulletin.limit=1000 + #### Flow Id and URL Flow id and URL are usually retrieved from the C2 server. These identify the last updated flow version and where the flow was downloaded from. These properties are persisted in the minifi.properties file. @@ -194,6 +197,20 @@ configuration produces the following JSON: "uuid": "2438e3c8-015a-1000-79ca-83af40ec1997" } }, + "processorBulletins": [ + { + "id": 1, + "timestamp": "Mon Jan 27 12:10:47 UTC 2025", + "level": "ERROR", + "category": "Log Message", + "message": "Error connecting to localhost:8776 due to Connection refused (2438e3c8-015a-1000-79ca-83af40ec1991)", + "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990", + "groupName": "MiNiFi Flow", + "groupPath": "MiNiFi Flow", + "sourceId": "2438e3c8-015a-1000-79ca-83af40ec1991", + "sourceName": "GetTCP" + } + ], "processorStatuses": [ { "id": "5128e3c8-015a-1000-79ca-83af40ec1990", @@ -536,6 +553,20 @@ Contains information about the flow the agent is running, including the versione "uuid": "8368e3c8-015a-1003-52ca-83af40ec1332" } }, + "processorBulletins": [ + { + "id": 1, + "timestamp": "Mon Jan 27 12:10:47 UTC 2025", + "level": "ERROR", + "category": "Log Message", + "message": "Error connecting to localhost:8776 due to Connection refused (2438e3c8-015a-1000-79ca-83af40ec1991)", + "groupId": "2438e3c8-015a-1000-79ca-83af40ec1990", + "groupName": "MiNiFi Flow", + "groupPath": "MiNiFi Flow", + "sourceId": "2438e3c8-015a-1000-79ca-83af40ec1991", + "sourceName": "GetTCP" + } + ], "processorStatuses": [ { "id": "5128e3c8-015a-1000-79ca-83af40ec1990", diff --git a/conf/minifi.properties b/conf/minifi.properties index f681e9a28..286f87f4b 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -120,6 +120,9 @@ nifi.c2.full.heartbeat=false # specify encoding strategy for c2 requests (gzip, none) #nifi.c2.rest.request.encoding=none +# specify the maximum number of bulletins to send in a heartbeat +#nifi.c2.flow.info.processor.bulletin.limit=1000 + ## enable the controller socket provider on port 9998 ## off by default. #controller.socket.enable=true diff --git a/extensions/python/ExecutePythonProcessor.cpp b/extensions/python/ExecutePythonProcessor.cpp index 707e23806..29ae001db 100644 --- a/extensions/python/ExecutePythonProcessor.cpp +++ b/extensions/python/ExecutePythonProcessor.cpp @@ -158,10 +158,7 @@ void ExecutePythonProcessor::reloadScriptIfUsingScriptFileProperty() { std::unique_ptr<PythonScriptEngine> ExecutePythonProcessor::createScriptEngine() { auto engine = std::make_unique<PythonScriptEngine>(); - - python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName()); engine->initialize(Success, Failure, Original, python_logger_); - return engine; } @@ -283,6 +280,12 @@ std::vector<core::Relationship> ExecutePythonProcessor::getPythonRelationships() return relationships; } +void ExecutePythonProcessor::setLoggerCallback(const std::function<void(core::logging::LOG_LEVEL level, const std::string& message)>& callback) { + gsl_Expects(logger_ && python_logger_); + logger_->addLogCallback(callback); + python_logger_->addLogCallback(callback); +} + REGISTER_RESOURCE(ExecutePythonProcessor, Processor); } // namespace org::apache::nifi::minifi::extensions::python::processors diff --git a/extensions/python/ExecutePythonProcessor.h b/extensions/python/ExecutePythonProcessor.h index 9c0138565..b9e12da07 100644 --- a/extensions/python/ExecutePythonProcessor.h +++ b/extensions/python/ExecutePythonProcessor.h @@ -46,6 +46,7 @@ class ExecutePythonProcessor : public core::ProcessorImpl { python_dynamic_(false), reload_on_script_change_(true) { logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getLogger(uuid_); + python_logger_ = core::logging::LoggerFactory<ExecutePythonProcessor>::getAliasedLogger(getName()); } EXTENSIONAPI static constexpr const char* Description = "Executes a script given the flow file and a process session. " @@ -141,6 +142,7 @@ class ExecutePythonProcessor : public core::ProcessorImpl { nonstd::expected<core::Property, std::error_code> getSupportedProperty(std::string_view name) const override; std::vector<core::Relationship> getPythonRelationships() const; + void setLoggerCallback(const std::function<void(core::logging::LOG_LEVEL level, const std::string& message)>& callback) override; nonstd::expected<std::string, std::error_code> getProperty(std::string_view name) const override; nonstd::expected<void, std::error_code> setProperty(std::string_view name, std::string value) override; diff --git a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp index 5ef3abf55..88a3fd1f8 100644 --- a/extensions/standard-processors/tests/unit/FlowJsonTests.cpp +++ b/extensions/standard-processors/tests/unit/FlowJsonTests.cpp @@ -79,6 +79,7 @@ TEST_CASE("NiFi flow json format is correctly parsed") { "schedulingPeriod": "3 sec", "penaltyDuration": "12 sec", "yieldDuration": "4 sec", + "bulletinLevel": "ERROR", "runDurationMillis": 12, "autoTerminatedRelationships": ["one", "two"], "properties": { @@ -148,6 +149,7 @@ TEST_CASE("NiFi flow json format is correctly parsed") { CHECK(3s == proc->getSchedulingPeriod()); CHECK(12s == proc->getPenalizationPeriod()); CHECK(4s == proc->getYieldPeriod()); + CHECK(proc->getLogBulletinLevel() == logging::LOG_LEVEL::err); CHECK(proc->isAutoTerminated({"one", ""})); CHECK(proc->isAutoTerminated({"two", ""})); CHECK_FALSE(proc->isAutoTerminated({"three", ""})); diff --git a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp index 4e2a31485..72746076d 100644 --- a/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp +++ b/extensions/standard-processors/tests/unit/YamlConfigurationTests.cpp @@ -102,6 +102,7 @@ Processors: scheduling period: 1 sec penalization period: 30 sec yield period: 1 sec + bulletin level: ERROR run duration nanos: 0 auto-terminated relationships list: Properties: @@ -159,6 +160,7 @@ Provenance Reporting: REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getSchedulingPeriod()); REQUIRE(30s == rootFlowConfig->findProcessorByName("TailFile")->getPenalizationPeriod()); REQUIRE(1s == rootFlowConfig->findProcessorByName("TailFile")->getYieldPeriod()); + REQUIRE(rootFlowConfig->findProcessorByName("TailFile")->getLogBulletinLevel() == logging::LOG_LEVEL::err); REQUIRE(0s == rootFlowConfig->findProcessorByName("TailFile")->getRunDurationNano()); std::map<std::string, minifi::Connection*> connectionMap; diff --git a/libminifi/include/core/BulletinStore.h b/libminifi/include/core/BulletinStore.h new file mode 100644 index 000000000..372601454 --- /dev/null +++ b/libminifi/include/core/BulletinStore.h @@ -0,0 +1,70 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include <string> +#include <deque> +#include <mutex> +#include <memory> +#include <optional> +#include <chrono> + +#include "properties/Configure.h" +#include "core/logging/LoggerFactory.h" +#include "core/Processor.h" + +namespace org::apache::nifi::minifi { +namespace test { +class BulletinStoreTestAccessor; +} // namespace test + +namespace core { + +struct Bulletin { + uint64_t id = 0; + std::chrono::time_point<std::chrono::system_clock> timestamp; + std::string level; + std::string category; + std::string message; + std::string group_id; + std::string group_name; + std::string group_path; + std::string source_id; + std::string source_name; +}; + +class BulletinStore { + public: + explicit BulletinStore(const Configure& configure); + void addProcessorBulletin(const core::Processor& processor, core::logging::LOG_LEVEL log_level, const std::string& message); + std::deque<Bulletin> getBulletins(std::optional<std::chrono::system_clock::duration> time_interval_to_include = {}) const; + size_t getMaxBulletinCount() const; + + private: + friend class minifi::test::BulletinStoreTestAccessor; + + static constexpr size_t DEFAULT_BULLETIN_COUNT = 1000; + size_t max_bulletin_count_; + mutable std::mutex mutex_; + uint64_t id_counter = 1; + std::deque<Bulletin> bulletins_; + std::shared_ptr<logging::Logger> logger_{logging::LoggerFactory<BulletinStore>::getLogger()}; +}; + +} // namespace core +} // namespace org::apache::nifi::minifi diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index a4066f969..e6c9249e9 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -43,6 +43,7 @@ #include "utils/ChecksumCalculator.h" #include "ParameterContext.h" #include "ParameterProvider.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::core { @@ -61,6 +62,7 @@ struct ConfigurationContext { std::optional<std::filesystem::path> path{std::nullopt}; std::shared_ptr<utils::file::FileSystem> filesystem{std::make_shared<utils::file::FileSystem>()}; std::optional<utils::crypto::EncryptionProvider> sensitive_values_encryptor{std::nullopt}; + core::BulletinStore* bulletin_store{nullptr}; }; enum class FlowSerializationType { Json, NifiJson, Yaml }; @@ -153,6 +155,7 @@ class FlowConfiguration : public CoreComponentImpl { std::shared_ptr<utils::file::FileSystem> filesystem_; utils::crypto::EncryptionProvider sensitive_values_encryptor_; utils::ChecksumCalculator checksum_calculator_; + core::BulletinStore* bulletin_store_ = nullptr; private: virtual std::string serialize(const ProcessGroup&) { return ""; } diff --git a/libminifi/include/core/ProcessGroup.h b/libminifi/include/core/ProcessGroup.h index bb3b018b7..1a267b04e 100644 --- a/libminifi/include/core/ProcessGroup.h +++ b/libminifi/include/core/ProcessGroup.h @@ -73,17 +73,13 @@ class ProcessGroup : public CoreComponentImpl { ProcessGroup(ProcessGroupType type, std::string_view name); ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid); ProcessGroup(ProcessGroupType type, std::string_view name, const utils::Identifier& uuid, int version); - // Destructor ~ProcessGroup() override; - // Set URL void setURL(std::string url) { url_ = std::move(url); } - // Get URL std::string getURL() { return (url_); } - // SetTransmitting void setTransmitting(bool val) { transmitting_ = val; } @@ -93,7 +89,6 @@ class ProcessGroup : public CoreComponentImpl { uint64_t getTimeout() { return timeout_; } - // setInterface void setInterface(std::string &ifc) { local_network_interface_ = ifc; } @@ -124,11 +119,9 @@ class ProcessGroup : public CoreComponentImpl { http::HTTPProxy getHTTPProxy() { return proxy_; } - // Set Processor yield period in MilliSecond void setYieldPeriodMsec(std::chrono::milliseconds period) { yield_period_msec_ = period; } - // Get Processor yield period in MilliSecond std::chrono::milliseconds getYieldPeriodMsec() { return (yield_period_msec_); } @@ -147,13 +140,11 @@ class ProcessGroup : public CoreComponentImpl { const std::function<bool(const Processor*)>& filter = nullptr); bool isRemoteProcessGroup(); - // set parent process group void setParent(ProcessGroup *parent) { std::lock_guard<std::recursive_mutex> lock(mutex_); parent_process_group_ = parent; } - // get parent process group - ProcessGroup *getParent() { + ProcessGroup *getParent() const { std::lock_guard<std::recursive_mutex> lock(mutex_); return parent_process_group_; } @@ -185,25 +176,17 @@ class ProcessGroup : public CoreComponentImpl { } return nullptr; } - // findProcessor based on UUID Processor* findProcessorById(const utils::Identifier& uuid, Traverse traverse = Traverse::IncludeChildren) const; - // findProcessor based on name Processor* findProcessorByName(const std::string &processorName, Traverse traverse = Traverse::IncludeChildren) const; void getAllProcessors(std::vector<Processor*>& processor_vec) const; - /** - * Add controller service - * @param nodeId node identifier - * @param node controller service node. - */ void addControllerService(const std::string &nodeId, const std::shared_ptr<core::controller::ControllerServiceNode> &node); core::controller::ControllerServiceNode* findControllerService(const std::string &nodeId, Traverse traverse = Traverse::ExcludeChildren) const; std::vector<const core::controller::ControllerServiceNode*> getAllControllerServices() const; - // update property value void updatePropertyValue(const std::string& processorName, const std::string& propertyName, const std::string& propertyValue); void getConnections(std::map<std::string, Connection*>& connectionMap); @@ -228,45 +211,29 @@ class ProcessGroup : public CoreComponentImpl { protected: void startProcessingProcessors(TimerDrivenSchedulingAgent& timeScheduler, EventDrivenSchedulingAgent& eventScheduler, CronDrivenSchedulingAgent& cronScheduler); - // version int config_version_; - // Process Group Type const ProcessGroupType type_; - // Processors (ProcessNode) inside this process group which include Remote Process Group input/Output port std::set<std::unique_ptr<Processor>> processors_; std::set<Processor*> failed_processors_; std::set<Port*> ports_; std::set<std::unique_ptr<ProcessGroup>> child_process_groups_; - // Connections between the processor inside the group; std::set<std::unique_ptr<Connection>> connections_; - // Parent Process Group ProcessGroup* parent_process_group_; - // Yield Period in Milliseconds std::atomic<std::chrono::milliseconds> yield_period_msec_; std::atomic<uint64_t> timeout_; - - // URL std::string url_; - // local network interface std::string local_network_interface_; - // Transmitting std::atomic<bool> transmitting_; - // http proxy http::HTTPProxy proxy_; std::string transport_protocol_; - - // controller services - core::controller::ControllerServiceNodeMap controller_service_map_; - ParameterContext* parameter_context_ = nullptr; private: static Port* findPortById(const std::set<Port*>& ports, const utils::Identifier& uuid); + std::string buildGroupPath() const; - // Mutex for protection mutable std::recursive_mutex mutex_; - // Logger std::shared_ptr<logging::Logger> logger_; ProcessGroup(const ProcessGroup &parent); diff --git a/libminifi/include/core/ProcessorConfig.h b/libminifi/include/core/ProcessorConfig.h index e4706204c..dd29562aa 100644 --- a/libminifi/include/core/ProcessorConfig.h +++ b/libminifi/include/core/ProcessorConfig.h @@ -14,8 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -#ifndef LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ -#define LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ +#pragma once #include <string> #include <vector> @@ -23,12 +22,7 @@ #include "core/Core.h" #include "core/Property.h" -namespace org { -namespace apache { -namespace nifi { -namespace minifi { -namespace core { - +namespace org::apache::nifi::minifi::core { constexpr const char* DEFAULT_SCHEDULING_STRATEGY{"TIMER_DRIVEN"}; constexpr const char* DEFAULT_SCHEDULING_PERIOD_STR{"1 sec"}; @@ -47,16 +41,11 @@ struct ProcessorConfig { std::string schedulingPeriod; std::string penalizationPeriod; std::string yieldPeriod; + std::string bulletinLevel; std::string runDurationNanos; std::vector<std::string> autoTerminatedRelationships; std::vector<core::Property> properties; std::string parameterContextName; }; -} // namespace core -} // namespace minifi -} // namespace nifi -} // namespace apache -} // namespace org - -#endif // LIBMINIFI_INCLUDE_CORE_PROCESSORCONFIG_H_ +} // namespace org::apache::nifi::minifi::core diff --git a/libminifi/include/core/flow/FlowSchema.h b/libminifi/include/core/flow/FlowSchema.h index e751d7bff..ebbb2ab26 100644 --- a/libminifi/include/core/flow/FlowSchema.h +++ b/libminifi/include/core/flow/FlowSchema.h @@ -35,6 +35,7 @@ struct FlowSchema { Keys max_concurrent_tasks; Keys penalization_period; Keys proc_yield_period; + Keys bulletin_level; Keys runduration_nanos; Keys connections; diff --git a/libminifi/include/core/logging/LoggerBase.h b/libminifi/include/core/logging/LoggerBase.h index 69142b473..29cd40ed5 100644 --- a/libminifi/include/core/logging/LoggerBase.h +++ b/libminifi/include/core/logging/LoggerBase.h @@ -80,6 +80,36 @@ inline LOG_LEVEL mapFromSpdLogLevel(spdlog::level::level_enum level) { throw std::invalid_argument(fmt::format("Invalid spdlog::level::level_enum {}", magic_enum::enum_underlying(level))); } +inline std::string mapLogLevelToString(LOG_LEVEL level) { + switch (level) { + case trace: return "TRACE"; + case debug: return "DEBUG"; + case info: return "INFO"; + case warn: return "WARN"; + case err: return "ERROR"; + case critical: return "CRITICAL"; + case off: return "OFF"; + } + throw std::invalid_argument(fmt::format("Invalid LOG_LEVEL {}", magic_enum::enum_underlying(level))); +} + +inline LOG_LEVEL mapStringToLogLevel(const std::string& level_str) { + if (level_str == "TRACE") { + return trace; + } else if (level_str == "DEBUG") { + return debug; + } else if (level_str == "INFO") { + return info; + } else if (level_str == "WARN") { + return warn; + } else if (level_str == "ERROR") { + return err; + } else if (level_str == "CRITICAL") { + return critical; + } + throw std::invalid_argument(fmt::format("Invalid LOG_LEVEL {}", level_str)); +} + class LoggerBase : public Logger { public: LoggerBase(LoggerBase const&) = delete; @@ -92,6 +122,7 @@ class LoggerBase : public Logger { bool should_log(LOG_LEVEL level) override; void log_string(LOG_LEVEL level, std::string str) override; LOG_LEVEL level() const override; + void addLogCallback(const std::function<void(LOG_LEVEL level, const std::string&)>& callback) override; protected: LoggerBase(std::shared_ptr<spdlog::logger> delegate, std::shared_ptr<LoggerControl> controller); @@ -109,6 +140,7 @@ class LoggerBase : public Logger { private: std::atomic<int> max_log_size_{LOG_BUFFER_SIZE}; + std::vector<std::function<void(LOG_LEVEL level, const std::string&)>> log_callbacks_; }; } // namespace org::apache::nifi::minifi::core::logging diff --git a/libminifi/include/core/state/MetricsPublisherStore.h b/libminifi/include/core/state/MetricsPublisherStore.h index 4be8c3504..61cad498b 100644 --- a/libminifi/include/core/state/MetricsPublisherStore.h +++ b/libminifi/include/core/state/MetricsPublisherStore.h @@ -28,13 +28,14 @@ #include "utils/gsl.h" #include "core/ProcessGroup.h" #include "utils/file/AssetManager.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::state { class MetricsPublisherStore { public: MetricsPublisherStore(std::shared_ptr<Configure> configuration, const std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr); + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr, core::BulletinStore* bulletin_store = nullptr); void initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink); void loadMetricNodes(core::ProcessGroup* root); void clearMetricNodes(); diff --git a/libminifi/include/core/state/nodes/FlowInformation.h b/libminifi/include/core/state/nodes/FlowInformation.h index 7ba9a4931..e0f69b2ca 100644 --- a/libminifi/include/core/state/nodes/FlowInformation.h +++ b/libminifi/include/core/state/nodes/FlowInformation.h @@ -27,6 +27,7 @@ #include "Connection.h" #include "core/state/ConnectionStore.h" #include "core/Processor.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::state::response { @@ -121,6 +122,10 @@ class FlowInformation : public StateMonitorNode { processors_ = std::move(processors); } + void setBulletinStore(core::BulletinStore* bulletin_store) { + bulletin_store_ = bulletin_store; + } + std::vector<SerializedResponseNode> serialize() override; std::vector<PublishedMetric> calculateMetrics() override; @@ -128,6 +133,7 @@ class FlowInformation : public StateMonitorNode { std::shared_ptr<state::response::FlowVersion> flow_version_; ConnectionStore connection_store_; std::vector<core::Processor*> processors_; + core::BulletinStore* bulletin_store_ = nullptr; }; } // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h index 69fc73a7b..018bc5e08 100644 --- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h +++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h @@ -36,13 +36,14 @@ #include "core/RepositoryMetricsSource.h" #include "utils/file/AssetManager.h" #include "minifi-cpp/core/state/nodes/ResponseNodeLoader.h" +#include "core/BulletinStore.h" namespace org::apache::nifi::minifi::state::response { class ResponseNodeLoaderImpl : public ResponseNodeLoader { public: ResponseNodeLoaderImpl(std::shared_ptr<Configure> configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr); + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr, core::BulletinStore* bulletin_store = nullptr); void setNewConfigRoot(core::ProcessGroup* root) override; void clearConfigRoot() override; @@ -81,6 +82,7 @@ class ResponseNodeLoaderImpl : public ResponseNodeLoader { utils::file::AssetManager* asset_manager_{}; core::controller::ControllerServiceProvider* controller_{}; state::StateMonitor* update_sink_{}; + core::BulletinStore* bulletin_store_{}; std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResponseNodeLoader>::getLogger()}; }; diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index e9bf0ba1c..e41874fdb 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -98,6 +98,7 @@ const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyVal {Configuration::nifi_c2_rest_ssl_context_service, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, {Configuration::nifi_c2_rest_request_encoding, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, {Configuration::nifi_c2_rest_heartbeat_minimize_updates, gsl::make_not_null(&core::StandardPropertyValidators::BOOLEAN_VALIDATOR)}, + {Configuration::nifi_c2_flow_info_processor_bulletin_limit, gsl::make_not_null(&core::StandardPropertyValidators::UNSIGNED_INTEGER_VALIDATOR)}, {Configuration::nifi_state_storage_local, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, {Configuration::nifi_state_storage_local_old, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, {Configuration::nifi_state_storage_local_class_name, gsl::make_not_null(&core::StandardPropertyValidators::ALWAYS_VALID_VALIDATOR)}, diff --git a/libminifi/src/core/BulletinStore.cpp b/libminifi/src/core/BulletinStore.cpp new file mode 100644 index 000000000..89443b7d8 --- /dev/null +++ b/libminifi/src/core/BulletinStore.cpp @@ -0,0 +1,76 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "core/BulletinStore.h" + +#include "core/logging/LoggerBase.h" + +namespace org::apache::nifi::minifi::core { + +BulletinStore::BulletinStore(const Configure &configure) { + auto max_bulletin_count_str = configure.get(Configuration::nifi_c2_flow_info_processor_bulletin_limit); + if (!max_bulletin_count_str) { + logger_->log_debug("Bulletin limit not set, using default value of {}", DEFAULT_BULLETIN_COUNT); + max_bulletin_count_ = DEFAULT_BULLETIN_COUNT; + return; + } + try { + max_bulletin_count_ = std::stoul(*max_bulletin_count_str); + } catch(const std::exception&) { + logger_->log_warn("Invalid value for bulletin limit, using default value of {}", DEFAULT_BULLETIN_COUNT); + max_bulletin_count_ = DEFAULT_BULLETIN_COUNT; + } +} + +void BulletinStore::addProcessorBulletin(const core::Processor& processor, core::logging::LOG_LEVEL log_level, const std::string& message) { + std::lock_guard<std::mutex> lock(mutex_); + Bulletin bulletin; + bulletin.id = id_counter++; + bulletin.timestamp = std::chrono::system_clock::now(); + bulletin.level = core::logging::mapLogLevelToString(log_level); + bulletin.category = "Log Message"; + bulletin.message = message; + bulletin.group_id = processor.getProcessGroupUUIDStr(); + bulletin.group_name = processor.getProcessGroupName(); + bulletin.group_path = processor.getProcessGroupPath(); + bulletin.source_id = processor.getUUIDStr(); + bulletin.source_name = processor.getName(); + if (bulletins_.size() >= max_bulletin_count_) { + bulletins_.pop_front(); + } + bulletins_.push_back(std::move(bulletin)); +} + +std::deque<Bulletin> BulletinStore::getBulletins(std::optional<std::chrono::system_clock::duration> time_interval_to_include) const { + std::lock_guard<std::mutex> lock(mutex_); + if (!time_interval_to_include) { + return bulletins_; + } + for (auto it = bulletins_.begin(); it != bulletins_.end(); ++it) { + if (std::chrono::system_clock::now() - it->timestamp <= *time_interval_to_include) { + return {it, bulletins_.end()}; + } + } + return {}; +} + +size_t BulletinStore::getMaxBulletinCount() const { + std::lock_guard<std::mutex> lock(mutex_); + return max_bulletin_count_; +} + +} // namespace org::apache::nifi::minifi::core diff --git a/libminifi/src/core/FlowConfiguration.cpp b/libminifi/src/core/FlowConfiguration.cpp index 302595af3..b5af58826 100644 --- a/libminifi/src/core/FlowConfiguration.cpp +++ b/libminifi/src/core/FlowConfiguration.cpp @@ -37,6 +37,7 @@ FlowConfiguration::FlowConfiguration(ConfigurationContext ctx) service_provider_(std::make_shared<core::controller::StandardControllerServiceProvider>(std::make_unique<core::controller::ControllerServiceNodeMap>(), configuration_)), filesystem_(std::move(ctx.filesystem)), sensitive_values_encryptor_(std::move(ctx.sensitive_values_encryptor.value())), + bulletin_store_(ctx.bulletin_store), logger_(logging::LoggerFactory<FlowConfiguration>::getLogger()) { std::string flowUrl; std::string bucket_id = "default"; diff --git a/libminifi/src/core/ProcessGroup.cpp b/libminifi/src/core/ProcessGroup.cpp index 4f274575d..df183661c 100644 --- a/libminifi/src/core/ProcessGroup.cpp +++ b/libminifi/src/core/ProcessGroup.cpp @@ -87,6 +87,8 @@ std::tuple<Processor*, bool> ProcessGroup::addProcessor(std::unique_ptr<Processo const auto name = processor->getName(); std::lock_guard<std::recursive_mutex> lock(mutex_); processor->setProcessGroupUUIDStr(getUUIDStr()); + processor->setProcessGroupName(getName()); + processor->setProcessGroupPath(buildGroupPath()); const auto [iter, inserted] = processors_.insert(std::move(processor)); if (inserted) { logger_->log_debug("Add processor {} into process group {}", name, name_); @@ -485,4 +487,15 @@ ParameterContext* ProcessGroup::getParameterContext() const { return parameter_context_; } +std::string ProcessGroup::buildGroupPath() const { + std::lock_guard<std::recursive_mutex> lock(mutex_); + std::string path = name_; + auto parent = parent_process_group_; + while (parent != nullptr) { + path.insert(0, parent->getName() + " / "); + parent = parent->getParent(); + } + return path; +} + } // namespace org::apache::nifi::minifi::core diff --git a/libminifi/src/core/flow/FlowSchema.cpp b/libminifi/src/core/flow/FlowSchema.cpp index 812b578f0..7d80603d9 100644 --- a/libminifi/src/core/flow/FlowSchema.cpp +++ b/libminifi/src/core/flow/FlowSchema.cpp @@ -31,6 +31,7 @@ FlowSchema FlowSchema::getDefault() { .max_concurrent_tasks = {"max concurrent tasks"}, .penalization_period = {"penalization period"}, .proc_yield_period = {"yield period"}, + .bulletin_level = {"bulletin level"}, .runduration_nanos = {"run duration nanos"}, .connections = {"Connections"}, @@ -101,6 +102,7 @@ FlowSchema FlowSchema::getNiFiFlowJson() { .max_concurrent_tasks = {"concurrentlySchedulableTaskCount"}, .penalization_period = {"penaltyDuration"}, .proc_yield_period = {"yieldDuration"}, + .bulletin_level = {"bulletinLevel"}, // TODO(adebreceni): MINIFICPP-2033 since this is unused the mismatch between nano and milli is not an issue .runduration_nanos = {"runDurationMillis"}, diff --git a/libminifi/src/core/flow/StructuredConfiguration.cpp b/libminifi/src/core/flow/StructuredConfiguration.cpp index 5eb856c5f..2d41a0747 100644 --- a/libminifi/src/core/flow/StructuredConfiguration.cpp +++ b/libminifi/src/core/flow/StructuredConfiguration.cpp @@ -356,6 +356,11 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co logger_->log_debug("parseProcessorNode: yield period => [{}]", procCfg.yieldPeriod); } + if (auto bulletin_level_node = procNode[schema_.bulletin_level]) { + procCfg.bulletinLevel = bulletin_level_node.getString().value(); + logger_->log_debug("parseProcessorNode: bulletin level => [{}]", procCfg.bulletinLevel); + } + if (auto runNode = procNode[schema_.runduration_nanos]) { procCfg.runDurationNanos = runNode.getIntegerAsString().value(); logger_->log_debug("parseProcessorNode: run duration nanos => [{}]", procCfg.runDurationNanos); @@ -398,6 +403,18 @@ void StructuredConfiguration::parseProcessorNode(const Node& processors_node, co processor->setYieldPeriodMsec(yield_period.value()); } + if (!procCfg.bulletinLevel.empty()) { + processor->setLogBulletinLevel(core::logging::mapStringToLogLevel(procCfg.bulletinLevel)); + } + processor->setLoggerCallback([this, processor = processor.get()](core::logging::LOG_LEVEL level, const std::string& message) { + if (level < processor->getLogBulletinLevel()) { + return; + } + if (bulletin_store_) { + bulletin_store_->addProcessorBulletin(*processor, level, message); + } + }); + // Default to running processor->setScheduledState(core::RUNNING); diff --git a/libminifi/src/core/logging/LoggerBase.cpp b/libminifi/src/core/logging/LoggerBase.cpp index f4415a891..9df1a890a 100644 --- a/libminifi/src/core/logging/LoggerBase.cpp +++ b/libminifi/src/core/logging/LoggerBase.cpp @@ -36,6 +36,10 @@ void LoggerControl::setEnabled(bool status) { is_enabled_ = status; } +void LoggerBase::addLogCallback(const std::function<void(LOG_LEVEL level, const std::string&)>& callback) { + std::lock_guard<std::mutex> lock(mutex_); + log_callbacks_.push_back(callback); +} bool LoggerBase::should_log(LOG_LEVEL level) { if (controller_ && !controller_->is_enabled()) @@ -46,6 +50,9 @@ bool LoggerBase::should_log(LOG_LEVEL level) { } void LoggerBase::log_string(LOG_LEVEL level, std::string str) { + for (const auto& callback : log_callbacks_) { + callback(level, str); + } delegate_->log(mapToSpdLogLevel(level), str.c_str()); } diff --git a/libminifi/src/core/state/MetricsPublisherStore.cpp b/libminifi/src/core/state/MetricsPublisherStore.cpp index 85fddb39a..6c89e58c1 100644 --- a/libminifi/src/core/state/MetricsPublisherStore.cpp +++ b/libminifi/src/core/state/MetricsPublisherStore.cpp @@ -23,9 +23,9 @@ namespace org::apache::nifi::minifi::state { MetricsPublisherStore::MetricsPublisherStore(std::shared_ptr<Configure> configuration, const std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager) + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager, core::BulletinStore* bulletin_store) : configuration_(configuration), - response_node_loader_(std::make_shared<response::ResponseNodeLoaderImpl>(std::move(configuration), repository_metric_sources, std::move(flow_configuration), asset_manager)) { + response_node_loader_(std::make_shared<response::ResponseNodeLoaderImpl>(std::move(configuration), repository_metric_sources, std::move(flow_configuration), asset_manager, bulletin_store)) { } void MetricsPublisherStore::initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink) { diff --git a/libminifi/src/core/state/nodes/FlowInformation.cpp b/libminifi/src/core/state/nodes/FlowInformation.cpp index bbe9fc451..56584156b 100644 --- a/libminifi/src/core/state/nodes/FlowInformation.cpp +++ b/libminifi/src/core/state/nodes/FlowInformation.cpp @@ -18,6 +18,9 @@ #include "core/state/nodes/FlowInformation.h" #include "core/Resource.h" #include "core/state/Value.h" +#include "utils/TimeUtil.h" + +using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::state::response { @@ -99,6 +102,30 @@ std::vector<SerializedResponseNode> FlowInformation::serialize() { serialized.push_back(processorsStatusesNode); } + if (bulletin_store_) { + SerializedResponseNode processorBulletinsNode{.name = "processorBulletins", .array = true, .collapsible = false}; + auto bulletins = bulletin_store_->getBulletins(5min); + for (const auto& bulletin : bulletins) { + processorBulletinsNode.children.push_back({ + .name = std::to_string(bulletin.id), + .collapsible = false, + .children = { + {.name = "id", .value = bulletin.id}, + {.name = "timestamp", .value = utils::timeutils::getNiFiDateTimeFormat(std::chrono::time_point_cast<std::chrono::seconds>(bulletin.timestamp))}, + {.name = "level", .value = bulletin.level}, + {.name = "category", .value = bulletin.category}, + {.name = "message", .value = bulletin.message}, + {.name = "groupId", .value = bulletin.group_id}, + {.name = "groupName", .value = bulletin.group_name}, + {.name = "groupPath", .value = bulletin.group_path}, + {.name = "sourceId", .value = bulletin.source_id}, + {.name = "sourceName", .value = bulletin.source_name} + } + }); + } + serialized.push_back(processorBulletinsNode); + } + return serialized; } diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index ef8d6e867..257eab3df 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -34,11 +34,12 @@ namespace org::apache::nifi::minifi::state::response { ResponseNodeLoaderImpl::ResponseNodeLoaderImpl(std::shared_ptr<Configure> configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager) + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager, core::BulletinStore* bulletin_store) : configuration_(std::move(configuration)), repository_metric_sources_(std::move(repository_metric_sources)), flow_configuration_(std::move(flow_configuration)), - asset_manager_(asset_manager) { + asset_manager_(asset_manager), + bulletin_store_(bulletin_store) { } void ResponseNodeLoaderImpl::clearConfigRoot() { @@ -233,6 +234,10 @@ void ResponseNodeLoaderImpl::initializeFlowInformation(const SharedResponseNode& flow_information->setFlowVersion(flow_configuration_->getFlowVersion()); } + if (bulletin_store_) { + flow_information->setBulletinStore(bulletin_store_); + } + if (root_) { std::vector<core::Processor*> processors; root_->getAllProcessors(processors); diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 1aad28e5e..fb51805b4 100644 --- a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -45,7 +45,7 @@ class VerifyC2Metrics : public VerifyC2Base { LogTestController::getInstance().setTrace<minifi::c2::C2Agent>(); LogTestController::getInstance().setDebug<minifi::c2::RESTSender>(); LogTestController::getInstance().setDebug<minifi::FlowController>(); - LogTestController::getInstance().setOff<minifi::processors::GetTCP>(); + LogTestController::getInstance().setDebug<minifi::processors::GetTCP>(); VerifyC2Base::testSetup(); } @@ -93,6 +93,7 @@ class MetricsHandler: public HeartbeatHandler { VERIFY_UPDATED_METRICS }; + static constexpr const char* PROCESS_GROUP_UUID = "2438e3c8-015a-1000-79ca-83af40ec1990"; static constexpr const char* GETTCP_UUID = "2438e3c8-015a-1000-79ca-83af40ec1991"; static constexpr const char* LOGATTRIBUTE1_UUID = "2438e3c8-015a-1000-79ca-83af40ec1992"; static constexpr const char* LOGATTRIBUTE2_UUID = "5128e3c8-015a-1000-79ca-83af40ec1990"; @@ -160,6 +161,27 @@ class MetricsHandler: public HeartbeatHandler { runtime_metrics["flowInfo"].HasMember("processorStatuses"); } + static bool verifyProcessorBulletins(const rapidjson::Value& runtime_metrics) { + if (!runtime_metrics["flowInfo"].HasMember("processorBulletins")) { + return false; + } + auto bulletins = runtime_metrics["flowInfo"]["processorBulletins"].GetArray(); + return std::any_of(bulletins.begin(), bulletins.end(), [](const auto& bulletin) { + std::string message = bulletin["message"].GetString(); + return bulletin["id"].GetInt() > 0 && + !std::string{bulletin["timestamp"].GetString()}.empty() && + bulletin["level"].GetString() == std::string("ERROR") && + bulletin["category"].GetString() == std::string("Log Message") && + message.find("Error connecting to") != std::string::npos && + message.find(GETTCP_UUID) != std::string::npos && + bulletin["groupId"].GetString() == std::string(PROCESS_GROUP_UUID) && + bulletin["groupName"].GetString() == std::string("MiNiFi Flow") && + bulletin["groupPath"].GetString() == std::string("MiNiFi Flow") && + bulletin["sourceId"].GetString() == std::string(GETTCP_UUID) && + bulletin["sourceName"].GetString() == std::string("GetTCP"); + }); + } + static bool verifyRuntimeMetrics(const rapidjson::Value& runtime_metrics) { return verifyCommonRuntimeMetricNodes(runtime_metrics, "2438e3c8-015a-1000-79ca-83af40ec1997") && [&]() { @@ -173,7 +195,8 @@ class MetricsHandler: public HeartbeatHandler { } return processorMetricsAreValid(processor); }); - }(); + }() && + verifyProcessorBulletins(runtime_metrics); } static bool verifyUpdatedRuntimeMetrics(const rapidjson::Value& runtime_metrics) { diff --git a/libminifi/test/libtest/integration/IntegrationBase.cpp b/libminifi/test/libtest/integration/IntegrationBase.cpp index 3652e0ccb..a7facb26d 100644 --- a/libminifi/test/libtest/integration/IntegrationBase.cpp +++ b/libminifi/test/libtest/integration/IntegrationBase.cpp @@ -91,6 +91,7 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_ std::string nifi_configuration_class_name = "adaptiveconfiguration"; configuration->get(minifi::Configure::nifi_configuration_class_name, nifi_configuration_class_name); + bulletin_store_ = std::make_unique<core::BulletinStore>(*configuration); std::shared_ptr<core::FlowConfiguration> flow_config = core::createFlowConfiguration( core::ConfigurationContext{ .flow_file_repo = test_repo, @@ -98,7 +99,8 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_ .configuration = configuration, .path = test_file_location, .filesystem = filesystem, - .sensitive_values_encryptor = sensitive_values_encryptor + .sensitive_values_encryptor = sensitive_values_encryptor, + .bulletin_store = bulletin_store_.get() }, nifi_configuration_class_name); auto controller_service_provider = flow_config->getControllerServiceProvider(); @@ -119,7 +121,7 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_ std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{test_repo, test_flow_repo, content_repo}; asset_manager_ = std::make_unique<minifi::utils::file::AssetManager>(*configuration); - auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, repo_metric_sources, flow_config, asset_manager_.get()); + auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, repo_metric_sources, flow_config, asset_manager_.get(), bulletin_store_.get()); flowController_ = std::make_unique<minifi::FlowController>(test_repo, test_flow_repo, configuration, std::move(flow_config), content_repo, std::move(metrics_publisher_store), filesystem, request_restart, asset_manager_.get()); flowController_->load(); diff --git a/libminifi/test/libtest/integration/IntegrationBase.h b/libminifi/test/libtest/integration/IntegrationBase.h index 76c518606..002820486 100644 --- a/libminifi/test/libtest/integration/IntegrationBase.h +++ b/libminifi/test/libtest/integration/IntegrationBase.h @@ -30,6 +30,7 @@ #include "properties/Configure.h" #include "utils/file/AssetManager.h" #include "utils/file/FileUtils.h" +#include "core/BulletinStore.h" namespace minifi = org::apache::nifi::minifi; namespace core = minifi::core; @@ -110,6 +111,7 @@ class IntegrationBase { void configureSecurity(); std::shared_ptr<minifi::Configure> configuration; std::unique_ptr<minifi::utils::file::AssetManager> asset_manager_; + std::unique_ptr<core::BulletinStore> bulletin_store_; std::unique_ptr<minifi::state::response::ResponseNodeLoader> response_node_loader_; std::unique_ptr<minifi::FlowController> flowController_; std::chrono::milliseconds wait_time_; diff --git a/libminifi/test/resources/TestC2Metrics.yml b/libminifi/test/resources/TestC2Metrics.yml index 6a0af5e4c..fdb892419 100644 --- a/libminifi/test/resources/TestC2Metrics.yml +++ b/libminifi/test/resources/TestC2Metrics.yml @@ -29,6 +29,7 @@ Processors: penalization period: 30 sec yield period: 10 sec run duration nanos: 0 + bulletin level: ERROR auto-terminated relationships list: Properties: Endpoint List: localhost:8776 @@ -43,6 +44,7 @@ Processors: scheduling period: 30 sec penalization period: 30 sec yield period: 1 sec + bulletin level: ERROR run duration nanos: 0 auto-terminated relationships list: - response diff --git a/libminifi/test/unit/BulletinStoreTests.cpp b/libminifi/test/unit/BulletinStoreTests.cpp new file mode 100644 index 000000000..bb32ca303 --- /dev/null +++ b/libminifi/test/unit/BulletinStoreTests.cpp @@ -0,0 +1,108 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <memory> + +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "core/BulletinStore.h" +#include "properties/Configure.h" +#include "unit/DummyProcessor.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::test { + +class BulletinStoreTestAccessor { + public: + static std::deque<core::Bulletin>& getBulletins(core::BulletinStore& store) { + return store.bulletins_; + } +}; + +std::unique_ptr<core::Processor> createDummyProcessor() { + auto processor = std::make_unique<DummyProcessor>("DummyProcessor", minifi::utils::Identifier::parse("4d7fa7e6-2459-46dd-b2ba-61517239edf5").value()); + processor->setProcessGroupUUIDStr("68fa9ae4-b9fc-4873-b0d9-edab59fdb0c2"); + processor->setProcessGroupName("sub_group"); + processor->setProcessGroupPath("root/sub_group"); + return processor; +} + +TEST_CASE("Create BulletinStore with default max size of 1000", "[bulletinStore]") { + ConfigureImpl configuration; + SECTION("No limit is configured") {} + SECTION("Invalid value is configured") { + configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, "invalid"); + } + core::BulletinStore bulletin_store(configuration); + REQUIRE(bulletin_store.getMaxBulletinCount() == 1000); +} + +TEST_CASE("Create BulletinStore with custom max size of 10000", "[bulletinStore]") { + ConfigureImpl configuration; + configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, "10000"); + core::BulletinStore bulletin_store(configuration); + REQUIRE(bulletin_store.getMaxBulletinCount() == 10000); +} + +TEST_CASE("Remove oldest entries when limit is reached", "[bulletinStore]") { + ConfigureImpl configuration; + configuration.set(Configure::nifi_c2_flow_info_processor_bulletin_limit, "2"); + core::BulletinStore bulletin_store(configuration); + auto processor = createDummyProcessor(); + for (size_t i = 0; i < 3; ++i) { + bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, "Warning message"); + } + auto bulletins = bulletin_store.getBulletins(); + REQUIRE(bulletins.size() == 2); + REQUIRE(bulletins[0].id == 2); + REQUIRE(bulletins[1].id == 3); + REQUIRE(bulletins[0].message == "Warning message"); +} + +TEST_CASE("Return all bulletins when no time interval is defined or all entries are part of the time interval", "[bulletinStore]") { + ConfigureImpl configuration; + core::BulletinStore bulletin_store(configuration); + auto processor = createDummyProcessor(); + for (size_t i = 0; i < 3; ++i) { + bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, "Warning message"); + } + auto bulletins = bulletin_store.getBulletins(); + REQUIRE(bulletins.size() == 3); + REQUIRE(bulletins[0].id == 1); + REQUIRE(bulletins[1].id == 2); + REQUIRE(bulletins[2].id == 3); + REQUIRE(bulletins[2].message == "Warning message"); +} + +TEST_CASE("Return only bulletins that are inside the defined time interval", "[bulletinStore]") { + ConfigureImpl configuration; + core::BulletinStore bulletin_store(configuration); + auto processor = createDummyProcessor(); + for (size_t i = 0; i < 3; ++i) { + bulletin_store.addProcessorBulletin(*processor, logging::LOG_LEVEL::warn, "Warning message"); + } + BulletinStoreTestAccessor::getBulletins(bulletin_store)[0].timestamp -= 5min; + + auto bulletins = bulletin_store.getBulletins(3min); + REQUIRE(bulletins.size() == 2); + REQUIRE(bulletins[0].id == 2); + REQUIRE(bulletins[1].id == 3); + REQUIRE(bulletins[0].message == "Warning message"); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/minifi-api/include/minifi-cpp/core/Processor.h b/minifi-api/include/minifi-cpp/core/Processor.h index b732c21eb..ded0f7570 100644 --- a/minifi-api/include/minifi-cpp/core/Processor.h +++ b/minifi-api/include/minifi-cpp/core/Processor.h @@ -33,6 +33,7 @@ #include "minifi-cpp/core/state/nodes/MetricsBase.h" #include "ProcessorMetrics.h" #include "utils/gsl.h" +#include "core/logging/Logger.h" namespace org::apache::nifi::minifi { @@ -88,6 +89,13 @@ class Processor : public virtual Connectable, public virtual ConfigurableCompone virtual gsl::not_null<std::shared_ptr<ProcessorMetrics>> getMetrics() const = 0; virtual std::string getProcessGroupUUIDStr() const = 0; virtual void setProcessGroupUUIDStr(const std::string &uuid) = 0; + virtual std::string getProcessGroupName() const = 0; + virtual void setProcessGroupName(const std::string &name) = 0; + virtual std::string getProcessGroupPath() const = 0; + virtual void setProcessGroupPath(const std::string &path) = 0; + virtual logging::LOG_LEVEL getLogBulletinLevel() const = 0; + virtual void setLogBulletinLevel(logging::LOG_LEVEL level) = 0; + virtual void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) = 0; virtual void updateReachability(const std::lock_guard<std::mutex>& graph_lock, bool force = false) = 0; virtual const std::unordered_map<Connection*, std::unordered_set<Processor*>>& reachable_processors() const = 0; diff --git a/minifi-api/include/minifi-cpp/core/logging/Logger.h b/minifi-api/include/minifi-cpp/core/logging/Logger.h index f94a46758..d73681a91 100644 --- a/minifi-api/include/minifi-cpp/core/logging/Logger.h +++ b/minifi-api/include/minifi-cpp/core/logging/Logger.h @@ -99,6 +99,8 @@ class Logger { virtual ~Logger() = default; + virtual void addLogCallback(const std::function<void(LOG_LEVEL level, const std::string&)>& callback) = 0; + protected: virtual int getMaxLogSize() = 0; @@ -124,7 +126,8 @@ class Logger { if (!should_log(level)) { return; } - log_string(level, stringify(std::move(fmt), map_args(std::forward<Args>(args))...)); + auto message = stringify(std::move(fmt), map_args(std::forward<Args>(args))...); + log_string(level, message); } }; diff --git a/minifi-api/include/minifi-cpp/properties/Configuration.h b/minifi-api/include/minifi-cpp/properties/Configuration.h index 2009e746f..46d29f6ba 100644 --- a/minifi-api/include/minifi-cpp/properties/Configuration.h +++ b/minifi-api/include/minifi-cpp/properties/Configuration.h @@ -128,6 +128,7 @@ class Configuration : public virtual Properties { static constexpr const char *nifi_c2_rest_ssl_context_service = "nifi.c2.rest.ssl.context.service"; static constexpr const char *nifi_c2_rest_heartbeat_minimize_updates = "nifi.c2.rest.heartbeat.minimize.updates"; static constexpr const char *nifi_c2_rest_request_encoding = "nifi.c2.rest.request.encoding"; + static constexpr const char *nifi_c2_flow_info_processor_bulletin_limit = "nifi.c2.flow.info.processor.bulletin.limit"; // state management options static constexpr const char *nifi_state_storage_local = "nifi.state.storage.local"; diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp index 849ab2672..c7f832eff 100644 --- a/minifi_main/MiNiFiMain.cpp +++ b/minifi_main/MiNiFiMain.cpp @@ -65,11 +65,11 @@ #include "MainHelper.h" #include "agent/JsonSchema.h" #include "core/state/nodes/ResponseNodeLoader.h" -#include "c2/C2Agent.h" #include "core/state/MetricsPublisherStore.h" #include "argparse/argparse.hpp" #include "agent/agent_version.h" #include "Fips.h" +#include "core/BulletinStore.h" namespace minifi = org::apache::nifi::minifi; namespace core = minifi::core; @@ -393,6 +393,8 @@ int main(int argc, char **argv) { should_encrypt_flow_config, utils::crypto::EncryptionProvider::create(minifiHome)); + std::unique_ptr<core::BulletinStore> bulletin_store = std::make_unique<core::BulletinStore>(*configure); + std::shared_ptr<core::FlowConfiguration> flow_configuration = core::createFlowConfiguration( core::ConfigurationContext{ .flow_file_repo = flow_repo, @@ -400,13 +402,14 @@ int main(int argc, char **argv) { .configuration = configure, .path = configure->get(minifi::Configure::nifi_flow_configuration_file), .filesystem = filesystem, - .sensitive_values_encryptor = utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome) + .sensitive_values_encryptor = utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome), + .bulletin_store = bulletin_store.get() }, nifi_configuration_class_name); auto asset_manager = std::make_unique<utils::file::AssetManager>(*configure); std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{prov_repo, flow_repo, content_repo}; - auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configure, repo_metric_sources, flow_configuration, asset_manager.get()); + auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configure, repo_metric_sources, flow_configuration, asset_manager.get(), bulletin_store.get()); const auto controller = std::make_unique<minifi::FlowController>( prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, std::move(metrics_publisher_store), filesystem, request_restart, asset_manager.get()); diff --git a/utils/include/core/Processor.h b/utils/include/core/Processor.h index df19776a8..11c779b00 100644 --- a/utils/include/core/Processor.h +++ b/utils/include/core/Processor.h @@ -173,6 +173,22 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C process_group_uuid_ = uuid; } + std::string getProcessGroupName() const override { + return process_group_name_; + } + + void setProcessGroupName(const std::string &name) override { + process_group_name_ = name; + } + + std::string getProcessGroupPath() const override { + return process_group_path_; + } + + void setProcessGroupPath(const std::string &path) override { + process_group_path_ = path; + } + void yield() override; void yield(std::chrono::steady_clock::duration delta_time) override; @@ -226,6 +242,16 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C return metrics_; } + logging::LOG_LEVEL getLogBulletinLevel() const override { + return log_bulletin_level_; + } + + void setLogBulletinLevel(logging::LOG_LEVEL level) override { + log_bulletin_level_ = level; + } + + void setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) override; + static constexpr auto DynamicProperties = std::array<DynamicProperty, 0>{}; static constexpr auto OutputAttributes = std::array<OutputAttributeReference, 0>{}; @@ -247,6 +273,7 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C gsl::not_null<std::shared_ptr<ProcessorMetrics>> metrics_; std::shared_ptr<logging::Logger> logger_; + logging::LOG_LEVEL log_bulletin_level_ = logging::LOG_LEVEL::warn; private: mutable std::mutex mutex_; @@ -270,6 +297,8 @@ class ProcessorImpl : public virtual Processor, public ConnectableImpl, public C std::unordered_map<Connection*, std::unordered_set<Processor*>> reachable_processors_; std::string process_group_uuid_; + std::string process_group_name_; + std::string process_group_path_; }; } // namespace core diff --git a/utils/include/utils/TimeUtil.h b/utils/include/utils/TimeUtil.h index c9b1d4534..b8766f42b 100644 --- a/utils/include/utils/TimeUtil.h +++ b/utils/include/utils/TimeUtil.h @@ -104,6 +104,10 @@ inline std::string getRFC2616Format(std::chrono::sys_seconds tp) { return date::format("%a, %d %b %Y %H:%M:%S %Z", tp); } +inline std::string getNiFiDateTimeFormat(std::chrono::sys_seconds tp) { + return date::format("%a %b %d %H:%M:%S %Z %Y", tp); +} + inline date::sys_seconds to_sys_time(const std::tm& t) { using date::year; using date::month; diff --git a/utils/src/core/Processor.cpp b/utils/src/core/Processor.cpp index cf2eb6b53..41d665686 100644 --- a/utils/src/core/Processor.cpp +++ b/utils/src/core/Processor.cpp @@ -37,7 +37,6 @@ #include "range/v3/algorithm/any_of.hpp" #include "fmt/format.h" #include "Exception.h" -#include "core/Processor.h" #include "core/ProcessorMetrics.h" using namespace std::literals::chrono_literals; @@ -370,4 +369,8 @@ std::chrono::steady_clock::duration ProcessorImpl::getYieldTime() const { return std::max(yield_expiration_.load()-std::chrono::steady_clock::now(), std::chrono::steady_clock::duration{0}); } +void ProcessorImpl::setLoggerCallback(const std::function<void(logging::LOG_LEVEL level, const std::string& message)>& callback) { + logger_->addLogCallback(callback); +} + } // namespace org::apache::nifi::minifi::core
