This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 34839437140f92ab96149f531ea4d4dd9e63b2b6 Author: Adam Debreceni <[email protected]> AuthorDate: Thu Jul 25 09:22:37 2024 +0000 MINIFICPP-2314 - Send asset state hash in heartbeat, implement c2 asset sync Closes #1751 Signed-off-by: Marton Szasz <[email protected]> --- C2.md | 9 +- CONFIGURE.md | 9 +- conf/minifi.properties | 3 +- .../cluster/containers/MinifiContainer.py | 4 +- encrypt-config/tests/resources/minifi.properties | 2 +- ...th-additional-sensitive-props.minifi.properties | 2 +- libminifi/include/FlowController.h | 4 +- libminifi/include/c2/C2Agent.h | 8 +- libminifi/include/c2/C2Payload.h | 76 +++++- libminifi/include/c2/PayloadParser.h | 27 +- libminifi/include/c2/PayloadSerializer.h | 10 +- libminifi/include/c2/protocols/RESTProtocol.h | 2 +- .../include/core/state/MetricsPublisherStore.h | 3 +- .../include/core/state/nodes/AssetInformation.h | 42 ++++ .../include/core/state/nodes/ResponseNodeLoader.h | 5 +- libminifi/include/properties/Configuration.h | 1 + libminifi/include/utils/file/AssetManager.h | 73 ++++++ libminifi/include/utils/file/PathUtils.h | 22 ++ libminifi/src/Configuration.cpp | 1 + libminifi/src/FlowController.cpp | 5 +- libminifi/src/c2/C2Agent.cpp | 232 +++++++++++++---- libminifi/src/c2/C2Payload.cpp | 14 +- libminifi/src/c2/HeartbeatJsonSerializer.cpp | 15 +- libminifi/src/c2/protocols/RESTProtocol.cpp | 26 +- libminifi/src/c2/protocols/RESTSender.cpp | 13 + libminifi/src/c2/triggers/FileUpdateTrigger.cpp | 4 +- libminifi/src/core/state/MetricsPublisherStore.cpp | 4 +- .../src/core/state/nodes/AssetInformation.cpp | 47 ++++ .../src/core/state/nodes/ResponseNodeLoader.cpp | 14 +- .../src/core/state/nodes/SupportedOperations.cpp | 4 + libminifi/src/utils/file/AssetManager.cpp | 190 ++++++++++++++ libminifi/test/integration/C2AssetSyncTest.cpp | 280 +++++++++++++++++++++ .../integration/C2ClearCoreComponentStateTest.cpp | 2 +- .../test/integration/C2DescribeMetricsTest.cpp | 4 +- libminifi/test/integration/C2MetricsTest.cpp | 
12 +- libminifi/test/integration/C2UpdateAssetTest.cpp | 47 ++-- .../test/libtest/integration/HTTPHandlers.cpp | 58 +++-- libminifi/test/libtest/integration/HTTPHandlers.h | 8 +- .../test/libtest/integration/IntegrationBase.cpp | 6 +- .../test/libtest/integration/IntegrationBase.h | 2 + .../test/resources/encrypted.minifi.properties | 2 +- libminifi/test/unit/PayloadParserTests.cpp | 16 +- minifi_main/MiNiFiMain.cpp | 9 +- 43 files changed, 1101 insertions(+), 216 deletions(-) diff --git a/C2.md b/C2.md index 9d84974ff..ad492ec57 100644 --- a/C2.md +++ b/C2.md @@ -66,9 +66,10 @@ be requested via C2 DESCRIBE manifest command. # DeviceInfoNode: basic info about the system (OS, number of cores etc) # AgentInformation: info about the MiNiFi agent, may include the manifest # FlowInformation: information about the current flow, including queue sizes + # AssetInformation: the state of the asset directory managed by the agent # ConfigurationChecksums: hashes of the configuration files; can be used to detect unexpected modifications # the default is - nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation + nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation # control c2 heartbeat interval nifi.c2.agent.heartbeat.period=30 sec @@ -80,8 +81,10 @@ be requested via C2 DESCRIBE manifest command. 
nifi.c2.rest.listener.cacert=<SSL Cert path> # specify the rest URIs if using RESTSender - nifi.c2.rest.url=http://<your-c2-server>/<c2-api-path>/c2-protocol/heartbeat - nifi.c2.rest.url.ack=http://<your-c2-server>/<c2-api-path>/c2-protocol/acknowledge + nifi.c2.rest.path.base=https://<your-c2-server>/<c2-api-path> + # specify either absolute url or relative to the nifi.c2.rest.path.base url for heartbeat and acknowledge + nifi.c2.rest.url=/c2-protocol/heartbeat + nifi.c2.rest.url.ack=/c2-protocol/acknowledge nifi.c2.flow.base.url=http://<your-c2-server>/<c2-api-path>/c2-protocol/ # c2 agent identifier -- must be defined to run agent diff --git a/CONFIGURE.md b/CONFIGURE.md index 9f93c8a44..c645a5bc4 100644 --- a/CONFIGURE.md +++ b/CONFIGURE.md @@ -665,8 +665,13 @@ Additionally, a unique hexadecimal uid.minifi.device.segment should be assigned ### Asset directory -It is possible to make agents download an asset (triggered through the c2 protocol). The target directory can be specified -using the `nifi.asset.directory` agent property, which defaults to `${MINIFI_HOME}/asset`. +There is an asset directory specified using the `nifi.asset.directory` agent property, which defaults to `${MINIFI_HOME}/asset`. +The files referenced in the `.state` file in this directory are managed by the agent. They are deleted, updated, or downloaded +using the asset sync c2 command. For the asset sync command to work, the c2 server must be made aware of the current state of the +managed assets by adding the `AssetInformation` entry to the `nifi.c2.root.classes` property. + +Files and directories not referenced in the `.state` file are not directly controlled by the agent, although +it is possible to download an asset (triggered through the c2 protocol) into the asset directory instead. ### Controller Services If you need to reference a controller service in your config.yml file, use the following template.
In the example, below, ControllerServiceClass is the name of the class defining the controller Service. ControllerService1 diff --git a/conf/minifi.properties b/conf/minifi.properties index 5e5c29545..425e6e64b 100644 --- a/conf/minifi.properties +++ b/conf/minifi.properties @@ -84,10 +84,11 @@ nifi.content.repository.class.name=DatabaseContentRepository ## base URL of the c2 server, ## very likely the same base url of rest urls #nifi.c2.flow.base.url= +#nifi.c2.rest.path.base= #nifi.c2.rest.url= #nifi.c2.rest.url.ack= #nifi.c2.rest.ssl.context.service= -nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation +nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation ## Minimize heartbeat payload size by excluding agent manifest from the heartbeat nifi.c2.full.heartbeat=false ## heartbeat twice a minute diff --git a/docker/test/integration/cluster/containers/MinifiContainer.py b/docker/test/integration/cluster/containers/MinifiContainer.py index bbe56a879..fee054f06 100644 --- a/docker/test/integration/cluster/containers/MinifiContainer.py +++ b/docker/test/integration/cluster/containers/MinifiContainer.py @@ -99,7 +99,7 @@ class MinifiContainer(FlowContainer): f.write(f"nifi.c2.rest.url=http://minifi-c2-server-{self.feature_context.id}:10090/c2/config/heartbeat\n") f.write(f"nifi.c2.rest.url.ack=http://minifi-c2-server-{self.feature_context.id}:10090/c2/config/acknowledge\n") f.write(f"nifi.c2.flow.base.url=http://minifi-c2-server-{self.feature_context.id}:10090/c2/config/\n") - f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation\n") + f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation\n") f.write("nifi.c2.full.heartbeat=false\n") f.write("nifi.c2.agent.class=minifi-test-class\n") f.write("nifi.c2.agent.identifier=minifi-test-id\n") @@ -109,7 +109,7 @@ class MinifiContainer(FlowContainer): 
f.write(f"nifi.c2.rest.url.ack=https://minifi-c2-server-{self.feature_context.id}:10090/c2/config/acknowledge\n") f.write("nifi.c2.rest.ssl.context.service=SSLContextService\n") f.write(f"nifi.c2.flow.base.url=https://minifi-c2-server-{self.feature_context.id}:10090/c2/config/\n") - f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation\n") + f.write("nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation\n") f.write("nifi.c2.full.heartbeat=false\n") f.write("nifi.c2.agent.class=minifi-test-class\n") f.write("nifi.c2.agent.identifier=minifi-test-id\n") diff --git a/encrypt-config/tests/resources/minifi.properties b/encrypt-config/tests/resources/minifi.properties index 9bac06775..2f2db68bb 100644 --- a/encrypt-config/tests/resources/minifi.properties +++ b/encrypt-config/tests/resources/minifi.properties @@ -55,7 +55,7 @@ nifi.c2.enable=true nifi.c2.flow.base.url=http://localhost:10080/c2-server/api nifi.c2.rest.url=http://localhost:10080/c2-server/api/c2-protocol/heartbeat nifi.c2.rest.url.ack=http://localhost:10080/c2-server/api/c2-protocol/acknowledge -nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation +nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation ## Minimize heartbeat payload size by excluding agent manifest from the heartbeat #nifi.c2.full.heartbeat=false ## heartbeat 4 times a second diff --git a/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties b/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties index aff36a065..d2702c34d 100644 --- a/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties +++ b/encrypt-config/tests/resources/with-additional-sensitive-props.minifi.properties @@ -57,7 +57,7 @@ nifi.c2.enable=true nifi.c2.flow.base.url=http://localhost:10080/c2-server/api nifi.c2.rest.url=http://localhost:10080/c2-server/api/c2-protocol/heartbeat 
nifi.c2.rest.url.ack=http://localhost:10080/c2-server/api/c2-protocol/acknowledge -nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation +nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation ## Minimize heartbeat payload size by excluding agent manifest from the heartbeat #nifi.c2.full.heartbeat=false ## heartbeat 4 times a second diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 6f14f0446..c4a3402b9 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -55,6 +55,7 @@ #include "TimerDrivenSchedulingAgent.h" #include "utils/Id.h" #include "utils/file/FileSystem.h" +#include "utils/file/AssetManager.h" #include "core/state/nodes/ResponseNodeLoader.h" #include "core/state/MetricsPublisher.h" #include "core/state/MetricsPublisherStore.h" @@ -72,7 +73,8 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, std::shared_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, std::unique_ptr<state::MetricsPublisherStore> metrics_publisher_store = nullptr, - std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>(), std::function<void()> request_restart = []{}); + std::shared_ptr<utils::file::FileSystem> filesystem = std::make_shared<utils::file::FileSystem>(), std::function<void()> request_restart = []{}, + utils::file::AssetManager* asset_manager = {}); ~FlowController() override; diff --git a/libminifi/include/c2/C2Agent.h b/libminifi/include/c2/C2Agent.h index 63829fd2c..3f0e12a7c 100644 --- a/libminifi/include/c2/C2Agent.h +++ b/libminifi/include/c2/C2Agent.h @@ -43,6 +43,7 @@ #include "utils/ThreadPool.h" #include "utils/file/FileSystem.h" #include "C2Utils.h" +#include 
"utils/file/AssetManager.h" namespace org::apache::nifi::minifi::c2 { @@ -62,7 +63,8 @@ class C2Agent : public state::UpdateController { C2Agent(std::shared_ptr<Configure> configuration, std::weak_ptr<state::response::NodeReporter> node_reporter, std::shared_ptr<utils::file::FileSystem> filesystem, - std::function<void()> request_restart); + std::function<void()> request_restart, + utils::file::AssetManager* asset_manager); void initialize(core::controller::ControllerServiceProvider *controller, state::Pausable *pause_handler, state::StateMonitor* update_sink); void start() override; @@ -131,6 +133,8 @@ class C2Agent : public state::UpdateController { */ void handle_describe(const C2ContentResponse &resp); + void handle_sync(const C2ContentResponse &resp); + enum class UpdateResult { NO_UPDATE, @@ -235,6 +239,8 @@ class C2Agent : public state::UpdateController { // time point the last time we performed a heartbeat. std::chrono::steady_clock::time_point last_run_; + + utils::file::AssetManager* asset_manager_; }; } // namespace org::apache::nifi::minifi::c2 diff --git a/libminifi/include/c2/C2Payload.h b/libminifi/include/c2/C2Payload.h index 9ff1aaaa7..09c90540f 100644 --- a/libminifi/include/c2/C2Payload.h +++ b/libminifi/include/c2/C2Payload.h @@ -29,6 +29,7 @@ #include "utils/Enum.h" #include "utils/gsl.h" #include "utils/span.h" +#include "rapidjson/document.h" namespace org::apache::nifi::minifi::c2 { @@ -43,7 +44,8 @@ enum class Operation : uint8_t { clear, transfer, pause, - resume + resume, + sync }; enum class DescribeOperand : uint8_t { @@ -70,6 +72,10 @@ enum class ClearOperand : uint8_t{ corecomponentstate }; +enum class SyncOperand : uint8_t{ + resource +}; + #define PAYLOAD_NO_STATUS 0 #define PAYLOAD_SUCCESS 1 #define PAYLOAD_FAILURE 2 @@ -79,21 +85,65 @@ enum Direction { RECEIVE }; -struct AnnotatedValue : state::response::ValueNode { - using state::response::ValueNode::ValueNode; - using state::response::ValueNode::operator=; +class C2Value { + 
public: + friend std::ostream& operator<<(std::ostream& out, const C2Value& val); + + C2Value() = default; + C2Value(const C2Value& other) { + (*this) = other; + } + C2Value(C2Value&&) = default; + template<typename T> + requires(std::constructible_from<state::response::ValueNode, T>) + explicit C2Value(T&& value) { value_ = state::response::ValueNode{std::forward<T>(value)}; } + explicit C2Value(const rapidjson::Value& json_value) { + value_.emplace<rapidjson::Document>(); + get<rapidjson::Document>(value_).CopyFrom(json_value, get<rapidjson::Document>(value_).GetAllocator()); + } + explicit C2Value(rapidjson::Document&& json_doc) { + value_ = std::move(json_doc); + } - [[nodiscard]] std::optional<std::reference_wrapper<const AnnotatedValue>> getAnnotation(const std::string& name) const { - auto it = annotations.find(name); - if (it == annotations.end()) { - return {}; + C2Value& operator=(const C2Value& other) { + if (auto* other_val_node = get_if<state::response::ValueNode>(&other.value_)) { + value_ = *other_val_node; + } else { + value_.emplace<rapidjson::Document>(); + get<rapidjson::Document>(value_).CopyFrom(get<rapidjson::Document>(other.value_), get<rapidjson::Document>(value_).GetAllocator()); } - return std::cref(it->second); + return *this; + } + + C2Value& operator=(C2Value&&) = default; + + + bool empty() const { + if (auto* val_node = get_if<state::response::ValueNode>(&value_)) { + return val_node->empty(); + } + return false; + } + + std::string to_string() const { + if (auto* val_node = get_if<state::response::ValueNode>(&value_)) { + return val_node->to_string(); + } + return std::string{get<rapidjson::Document>(value_).GetString(), get<rapidjson::Document>(value_).GetStringLength()}; + } + + const rapidjson::Document* json() const { + return get_if<rapidjson::Document>(&value_); + } + + const state::response::ValueNode* valueNode() const { + return get_if<state::response::ValueNode>(&value_); } - friend std::ostream& operator<<(std::ostream& 
out, const AnnotatedValue& val); + bool operator==(const C2Value&) const = default; - std::map<std::string, AnnotatedValue> annotations; + private: + std::variant<state::response::ValueNode, rapidjson::Document> value_; }; struct C2ContentResponse { @@ -115,7 +165,7 @@ struct C2ContentResponse { friend std::ostream& operator<<(std::ostream& out, const C2ContentResponse& response); - std::optional<std::string> getArgument(const std::string& arg_name) const { + std::optional<std::string> getStringArgument(const std::string& arg_name) const { if (auto it = operation_arguments.find(arg_name); it != operation_arguments.end()) { return it->second.to_string(); } @@ -134,7 +184,7 @@ struct C2ContentResponse { // name applied to commands std::string name; // commands that correspond with the operation. - std::map<std::string, AnnotatedValue> operation_arguments; + std::map<std::string, C2Value> operation_arguments; }; /** diff --git a/libminifi/include/c2/PayloadParser.h b/libminifi/include/c2/PayloadParser.h index d6a97642b..01b70a58e 100644 --- a/libminifi/include/c2/PayloadParser.h +++ b/libminifi/include/c2/PayloadParser.h @@ -138,27 +138,20 @@ class PayloadParser { } template<typename T> - inline T getAs(const std::string &field) { + inline T getAs(const std::string &field, const std::optional<T>& fallback = std::nullopt) { for (const auto &cmd : ref_.getContent()) { - auto exists = cmd.operation_arguments.find(field); - if (exists != cmd.operation_arguments.end()) { - return convert_if<T>(exists->second.getValue())(); + if (auto it = cmd.operation_arguments.find(field); it != cmd.operation_arguments.end()) { + if (auto* val_node = it->second.valueNode()) { + return convert_if<T>(val_node->getValue())(); + } } } - std::stringstream ss; - ss << "Invalid Field. 
Could not find " << field << " in " << ref_.getLabel(); - throw PayloadParseException(ss.str()); - } - - template<typename T> - inline T getAs(const std::string &field, const T &fallback) { - for (const auto &cmd : ref_.getContent()) { - auto exists = cmd.operation_arguments.find(field); - if (exists != cmd.operation_arguments.end()) { - return convert_if<T>(exists->second.getValue())(); - } + if (!fallback) { + std::stringstream ss; + ss << "Invalid Field. Could not find " << field << " in " << ref_.getLabel(); + throw PayloadParseException(ss.str()); } - return fallback; + return fallback.value(); } size_t getSize() const { diff --git a/libminifi/include/c2/PayloadSerializer.h b/libminifi/include/c2/PayloadSerializer.h index 17de97792..e9ca060b9 100644 --- a/libminifi/include/c2/PayloadSerializer.h +++ b/libminifi/include/c2/PayloadSerializer.h @@ -42,7 +42,7 @@ class PayloadSerializer { /** * Static function that serializes the value nodes */ - static void serializeValueNode(state::response::ValueNode &value, std::shared_ptr<io::OutputStream> stream) { + static void serializeValueNode(const state::response::ValueNode &value, std::shared_ptr<io::OutputStream> stream) { auto base_type = value.getValue(); if (!base_type) { uint8_t type = 0; @@ -95,7 +95,7 @@ class PayloadSerializer { stream->write(size); for (auto content : payload_content.operation_arguments) { stream->write(content.first); - serializeValueNode(content.second, stream); + serializeValueNode(*gsl::not_null(content.second.valueNode()), stream); } } if (nested_payload.getNestedPayloads().size() > 0) { @@ -170,7 +170,7 @@ class PayloadSerializer { stream->write(size); for (auto content : payload_content.operation_arguments) { stream->write(content.first); - serializeValueNode(content.second, stream); + serializeValueNode(*gsl::not_null(content.second.valueNode()), stream); } } serialize(op, payload, stream); @@ -251,7 +251,7 @@ class PayloadSerializer { for (uint32_t j = 0; j < args; j++) { std::string 
first, second; stream->read(first); - content.operation_arguments[first] = deserializeValueNode(stream); + content.operation_arguments[first] = C2Value{deserializeValueNode(stream)}; } subPayload.addContent(std::move(content)); } @@ -293,7 +293,7 @@ class PayloadSerializer { std::string first, second; stream.read(first); // stream.readUTF(second); - content.operation_arguments[first] = deserializeValueNode(&stream); + content.operation_arguments[first] = C2Value{deserializeValueNode(&stream)}; } newPayload.addContent(std::move(content)); } diff --git a/libminifi/include/c2/protocols/RESTProtocol.h b/libminifi/include/c2/protocols/RESTProtocol.h index 36c741278..cb04a2de1 100644 --- a/libminifi/include/c2/protocols/RESTProtocol.h +++ b/libminifi/include/c2/protocols/RESTProtocol.h @@ -43,7 +43,7 @@ class RESTProtocol : public HeartbeatJsonSerializer { protected: void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure); void serializeNestedPayload(rapidjson::Value& target, const C2Payload& payload, rapidjson::Document::AllocatorType& alloc) override; - static C2Payload parseJsonResponse(const C2Payload &payload, std::span<const std::byte> response); + C2Payload parseJsonResponse(const C2Payload &payload, std::span<const std::byte> response) const; private: bool containsPayload(const C2Payload &o); diff --git a/libminifi/include/core/state/MetricsPublisherStore.h b/libminifi/include/core/state/MetricsPublisherStore.h index 544e82445..a1deb3cec 100644 --- a/libminifi/include/core/state/MetricsPublisherStore.h +++ b/libminifi/include/core/state/MetricsPublisherStore.h @@ -27,13 +27,14 @@ #include "core/state/nodes/ResponseNodeLoader.h" #include "utils/gsl.h" #include "core/ProcessGroup.h" +#include "utils/file/AssetManager.h" namespace org::apache::nifi::minifi::state { class MetricsPublisherStore { public: MetricsPublisherStore(std::shared_ptr<Configure> configuration, const 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration); + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr); void initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink); void loadMetricNodes(core::ProcessGroup* root); void clearMetricNodes(); diff --git a/libminifi/include/core/state/nodes/AssetInformation.h b/libminifi/include/core/state/nodes/AssetInformation.h new file mode 100644 index 000000000..feffa38a0 --- /dev/null +++ b/libminifi/include/core/state/nodes/AssetInformation.h @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +#pragma once + +#include "core/state/nodes/MetricsBase.h" +#include "utils/file/AssetManager.h" +#include "core/logging/Logger.h" + +namespace org::apache::nifi::minifi::state::response { + +class AssetInformation : public ResponseNode { + public: + AssetInformation(); + explicit AssetInformation(std::string_view name, const utils::Identifier& uuid = {}) : ResponseNode(name, uuid) {} + + MINIFIAPI static constexpr const char* Description = "Metric node that defines hash for all asset identifiers"; + + void setAssetManager(utils::file::AssetManager* asset_manager); + + std::string getName() const override { return "resourceInfo"; } + std::vector<SerializedResponseNode> serialize() override; + + private: + utils::file::AssetManager* asset_manager_{nullptr}; + std::shared_ptr<core::logging::Logger> logger_; +}; + +} // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/include/core/state/nodes/ResponseNodeLoader.h b/libminifi/include/core/state/nodes/ResponseNodeLoader.h index af330585c..9eb55f413 100644 --- a/libminifi/include/core/state/nodes/ResponseNodeLoader.h +++ b/libminifi/include/core/state/nodes/ResponseNodeLoader.h @@ -34,13 +34,14 @@ #include "utils/Id.h" #include "utils/expected.h" #include "core/RepositoryMetricsSource.h" +#include "utils/file/AssetManager.h" namespace org::apache::nifi::minifi::state::response { class ResponseNodeLoader { public: ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration); + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager = nullptr); void setNewConfigRoot(core::ProcessGroup* root); void clearConfigRoot(); @@ -62,6 +63,7 @@ class ResponseNodeLoader { void initializeAgentStatus(const SharedResponseNode& response_node) const; void initializeConfigurationChecksums(const SharedResponseNode& 
response_node) const; void initializeFlowMonitor(const SharedResponseNode& response_node) const; + void initializeAssetInformation(const SharedResponseNode& response_node) const; std::vector<SharedResponseNode> getMatchingComponentMetricsNodes(const std::string& regex_str) const; mutable std::mutex root_mutex_; @@ -73,6 +75,7 @@ class ResponseNodeLoader { std::shared_ptr<Configure> configuration_; std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources_; std::shared_ptr<core::FlowConfiguration> flow_configuration_; + utils::file::AssetManager* asset_manager_{}; core::controller::ControllerServiceProvider* controller_{}; state::StateMonitor* update_sink_{}; std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResponseNodeLoader>::getLogger()}; diff --git a/libminifi/include/properties/Configuration.h b/libminifi/include/properties/Configuration.h index 2d9300636..cea438ea9 100644 --- a/libminifi/include/properties/Configuration.h +++ b/libminifi/include/properties/Configuration.h @@ -118,6 +118,7 @@ class Configuration : public Properties { static constexpr const char *nifi_c2_root_class_definitions = "nifi.c2.root.class.definitions"; static constexpr const char *nifi_c2_rest_listener_port = "nifi.c2.rest.listener.port"; static constexpr const char *nifi_c2_rest_listener_cacert = "nifi.c2.rest.listener.cacert"; + static constexpr const char *nifi_c2_rest_path_base = "nifi.c2.rest.path.base"; static constexpr const char *nifi_c2_rest_url = "nifi.c2.rest.url"; static constexpr const char *nifi_c2_rest_url_ack = "nifi.c2.rest.url.ack"; static constexpr const char *nifi_c2_rest_ssl_context_service = "nifi.c2.rest.ssl.context.service"; diff --git a/libminifi/include/utils/file/AssetManager.h b/libminifi/include/utils/file/AssetManager.h new file mode 100644 index 000000000..93424a145 --- /dev/null +++ b/libminifi/include/utils/file/AssetManager.h @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation 
(ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include <filesystem> +#include <functional> +#include <vector> +#include <string> +#include <memory> +#include <set> +#include "core/logging/Logger.h" +#include "utils/expected.h" +#include "properties/Configure.h" + +namespace org::apache::nifi::minifi::utils::file { + +struct AssetDescription { + std::string id; + std::filesystem::path path; + std::string url; + + bool operator<(const AssetDescription& other) const { + return id < other.id; + } +}; + +struct AssetLayout { + std::string digest; + std::set<AssetDescription> assets; + + void clear() { + digest.clear(); + assets.clear(); + } +}; + +class AssetManager { + public: + explicit AssetManager(const Configure& configuration); + + nonstd::expected<void, std::string> sync(const AssetLayout& layout, const std::function<nonstd::expected<std::vector<std::byte>, std::string>(std::string_view /*url*/)>& fetch); + + std::string hash() const; + + std::filesystem::path getRoot() const; + + private: + void refreshState(); + + void persist() const; + + mutable std::recursive_mutex mtx_; + std::filesystem::path root_; + AssetLayout state_; + std::shared_ptr<core::logging::Logger> logger_; +}; + +} // namespace org::apache::nifi::minifi::utils::file diff 
--git a/libminifi/include/utils/file/PathUtils.h b/libminifi/include/utils/file/PathUtils.h index 1df2e9d89..04886c1a6 100644 --- a/libminifi/include/utils/file/PathUtils.h +++ b/libminifi/include/utils/file/PathUtils.h @@ -25,6 +25,7 @@ #include <string> #include <system_error> #include <utility> +#include "utils/expected.h" namespace org::apache::nifi::minifi::utils::file { @@ -42,6 +43,27 @@ inline std::optional<std::filesystem::path> canonicalize(const std::filesystem:: return result; } +inline nonstd::expected<void, std::string> validateRelativeFilePath(const std::filesystem::path& path) { + if (path.empty()) { + return nonstd::make_unexpected("Empty file path"); + } + if (!path.is_relative()) { + return nonstd::make_unexpected("File path must be a relative path '" + path.string() + "'"); + } + if (!path.has_filename()) { + return nonstd::make_unexpected("Filename missing in output path '" + path.string() + "'"); + } + if (path.filename() == "." || path.filename() == "..") { + return nonstd::make_unexpected("Invalid filename '" + path.filename().string() + "'"); + } + for (const auto& segment : path) { + if (segment == "..") { + return nonstd::make_unexpected("Accessing parent directory is forbidden in file path '" + path.string() + "'"); + } + } + return {}; +} + /** * Represents filesystem space information in bytes diff --git a/libminifi/src/Configuration.cpp b/libminifi/src/Configuration.cpp index 024393e57..116746c9c 100644 --- a/libminifi/src/Configuration.cpp +++ b/libminifi/src/Configuration.cpp @@ -88,6 +88,7 @@ const std::unordered_map<std::string_view, gsl::not_null<const core::PropertyVal {Configuration::nifi_c2_root_class_definitions, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, {Configuration::nifi_c2_rest_listener_port, gsl::make_not_null(&core::StandardPropertyTypes::LISTEN_PORT_TYPE)}, {Configuration::nifi_c2_rest_listener_cacert, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, + 
{Configuration::nifi_c2_rest_path_base, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, {Configuration::nifi_c2_rest_url, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, {Configuration::nifi_c2_rest_url_ack, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, {Configuration::nifi_c2_rest_ssl_context_service, gsl::make_not_null(&core::StandardPropertyTypes::VALID_TYPE)}, diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index 8e5739d9d..39c24e320 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -50,7 +50,8 @@ namespace org::apache::nifi::minifi { FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo, std::shared_ptr<core::Repository> flow_file_repo, std::shared_ptr<Configure> configure, std::shared_ptr<core::FlowConfiguration> flow_configuration, std::shared_ptr<core::ContentRepository> content_repo, std::unique_ptr<state::MetricsPublisherStore> metrics_publisher_store, - std::shared_ptr<utils::file::FileSystem> filesystem, std::function<void()> request_restart) + std::shared_ptr<utils::file::FileSystem> filesystem, std::function<void()> request_restart, + utils::file::AssetManager* asset_manager) : core::controller::ForwardingControllerServiceProvider(core::className<FlowController>()), running_(false), initialized_(false), @@ -82,7 +83,7 @@ FlowController::FlowController(std::shared_ptr<core::Repository> provenance_repo if (auto publisher = metrics_publisher_store_->getMetricsPublisher(c2::C2_METRICS_PUBLISHER).lock()) { c2_metrics_publisher = std::dynamic_pointer_cast<c2::C2MetricsPublisher>(publisher); } - c2_agent_ = std::make_unique<c2::C2Agent>(configuration_, c2_metrics_publisher, std::move(filesystem), std::move(request_restart)); + c2_agent_ = std::make_unique<c2::C2Agent>(configuration_, c2_metrics_publisher, std::move(filesystem), std::move(request_restart), asset_manager); } if 
(c2::isControllerSocketEnabled(configuration_)) { diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 5f1097fb4..009c4293a 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -38,6 +38,7 @@ #include "utils/file/FileManager.h" #include "utils/file/FileSystem.h" #include "http/BaseHTTPClient.h" +#include "utils/file/PathUtils.h" #include "utils/Environment.h" #include "utils/Monitors.h" #include "utils/StringUtils.h" @@ -46,6 +47,7 @@ #include "utils/Id.h" #include "c2/C2Utils.h" #include "c2/protocols/RESTSender.h" +#include "rapidjson/error/en.h" using namespace std::literals::chrono_literals; @@ -54,7 +56,8 @@ namespace org::apache::nifi::minifi::c2 { C2Agent::C2Agent(std::shared_ptr<Configure> configuration, std::weak_ptr<state::response::NodeReporter> node_reporter, std::shared_ptr<utils::file::FileSystem> filesystem, - std::function<void()> request_restart) + std::function<void()> request_restart, + utils::file::AssetManager* asset_manager) : heart_beat_period_(3s), max_c2_responses(5), configuration_(std::move(configuration)), @@ -62,7 +65,8 @@ C2Agent::C2Agent(std::shared_ptr<Configure> configuration, filesystem_(std::move(filesystem)), thread_pool_(2, nullptr, "C2 threadpool"), request_restart_(std::move(request_restart)), - last_run_(std::chrono::steady_clock::now()) { + last_run_(std::chrono::steady_clock::now()), + asset_manager_(asset_manager) { if (!configuration_->getAgentClass()) { logger_->log_info("Agent class is not predefined"); } @@ -251,7 +255,7 @@ void C2Agent::serializeMetrics(C2Payload &metric_payload, const std::string &nam } else { C2ContentResponse response(metric_payload.getOperation()); response.name = name; - response.operation_arguments[metric.name] = metric.value; + response.operation_arguments[metric.name] = C2Value{metric.value}; metric_payload.addContent(std::move(response), is_collapsible); } } @@ -381,6 +385,9 @@ void C2Agent::handle_c2_server_response(const C2ContentResponse 
&resp) { } break; } + case Operation::sync: + handle_sync(resp); + break; default: break; // do nothing @@ -401,7 +408,7 @@ C2Payload C2Agent::prepareConfigurationOptions(const C2ContentResponse &resp) co if (configuration_->get(key, value)) { C2ContentResponse option(Operation::acknowledge); option.name = key; - option.operation_arguments[key] = value; + option.operation_arguments[key] = C2Value{value}; options.addContent(std::move(option)); } } @@ -530,7 +537,7 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) { for (const auto &line : trace.getTraces()) { C2ContentResponse option(Operation::acknowledge); option.name = line; - option.operation_arguments[line] = line; + option.operation_arguments[line] = C2Value{line}; options.addContent(std::move(option)); } response.addPayload(std::move(options)); @@ -553,7 +560,7 @@ void C2Agent::handle_describe(const C2ContentResponse &resp) { for (const auto& kv : core_component_state.second) { C2ContentResponse entry(Operation::acknowledge); entry.name = kv.first; - entry.operation_arguments[kv.first] = kv.second; + entry.operation_arguments[kv.first] = C2Value{kv.second}; state.addContent(std::move(entry)); } states.addPayload(std::move(state)); @@ -608,12 +615,18 @@ void C2Agent::handlePropertyUpdate(const C2ContentResponse &resp) { }; for (const auto& [name, value] : resp.operation_arguments) { - bool persist = ( - value.getAnnotation("persist") - | utils::transform(&AnnotatedValue::to_string) - | utils::andThen(utils::string::toBool)).value_or(true); - PropertyChangeLifetime lifetime = persist ? PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT; - changeUpdateState(update_property(name, value.to_string(), lifetime)); + if (auto* json_val = value.json()) { + if (json_val->IsObject() && json_val->HasMember("value")) { + PropertyChangeLifetime lifetime = PropertyChangeLifetime::PERSISTENT; + if (json_val->HasMember("persist")) { + lifetime = (*json_val)["persist"].GetBool() ? 
PropertyChangeLifetime::PERSISTENT : PropertyChangeLifetime::TRANSIENT; + } + std::string property_value{(*json_val)["value"].GetString(), (*json_val)["value"].GetStringLength()}; + changeUpdateState(update_property(name, property_value, lifetime)); + continue; + } + } + changeUpdateState(update_property(name, value.to_string(), PropertyChangeLifetime::PERSISTENT)); } // apply changes and persist properties requested to be persisted const bool propertyWasUpdated = result == state::UpdateState::FULLY_APPLIED || result == state::UpdateState::PARTIALLY_APPLIED; @@ -700,6 +713,160 @@ void C2Agent::handle_transfer(const C2ContentResponse &resp) { } } +void C2Agent::handle_sync(const org::apache::nifi::minifi::c2::C2ContentResponse &resp) { + logger_->log_info("Requested resource synchronization"); + auto send_error = [&] (std::string_view error) { + logger_->log_error("{}", error); + C2Payload response(Operation::acknowledge, state::UpdateState::SET_ERROR, resp.ident, true); + response.setRawData(as_bytes(std::span(error.begin(), error.end()))); + enqueue_c2_response(std::move(response)); + }; + + if (!asset_manager_) { + send_error("Internal error: no asset manager"); + return; + } + + SyncOperand operand = SyncOperand::resource; + try { + operand = utils::enumCast<SyncOperand>(resp.name, true); + } catch(const std::runtime_error&) { + send_error("Unknown operand '" + resp.name + "'"); + return; + } + + gsl_Assert(operand == SyncOperand::resource); + + utils::file::AssetLayout asset_layout; + + auto state_it = resp.operation_arguments.find("globalHash"); + if (state_it == resp.operation_arguments.end()) { + send_error("Malformed request, missing 'globalHash' argument"); + return; + } + + const rapidjson::Document* state_doc = state_it->second.json(); + if (!state_doc) { + send_error("Argument 'globalHash' is malformed"); + return; + } + + if (!state_doc->IsObject()) { + send_error("Malformed request, 'globalHash' is not an object"); + return; + } + + if 
(!state_doc->HasMember("digest")) { + send_error("Malformed request, 'globalHash' has no member 'digest'"); + return; + } + if (!(*state_doc)["digest"].IsString()) { + send_error("Malformed request, 'globalHash.digest' is not a string"); + return; + } + + asset_layout.digest = std::string{(*state_doc)["digest"].GetString(), (*state_doc)["digest"].GetStringLength()}; + + auto resource_list_it = resp.operation_arguments.find("resourceList"); + if (resource_list_it == resp.operation_arguments.end()) { + send_error("Malformed request, missing 'resourceList' argument"); + return; + } + + const rapidjson::Document* resource_list = resource_list_it->second.json(); + if (!resource_list) { + send_error("Argument 'resourceList' is malformed"); + return; + } + if (!resource_list->IsArray()) { + send_error("Malformed request, 'resourceList' is not an array"); + return; + } + + for (rapidjson::SizeType resource_idx = 0; resource_idx < resource_list->Size(); ++resource_idx) { + auto& resource = resource_list->GetArray()[resource_idx]; + if (!resource.IsObject()) { + send_error(fmt::format("Malformed request, 'resourceList[{}]' is not an object", resource_idx)); + return; + } + auto get_member_str = [&] (const char* key) -> nonstd::expected<std::string_view, std::string> { + if (!resource.HasMember(key)) { + return nonstd::make_unexpected(fmt::format("Malformed request, 'resourceList[{}]' has no member '{}'", resource_idx, key)); + } + if (!resource[key].IsString()) { + return nonstd::make_unexpected(fmt::format("Malformed request, 'resourceList[{}].{}' is not a string", resource_idx, key)); + } + return std::string_view{resource[key].GetString(), resource[key].GetStringLength()}; + }; + auto id = get_member_str("resourceId"); + if (!id) { + send_error(id.error()); + return; + } + auto name = get_member_str("resourceName"); + if (!name) { + send_error(name.error()); + return; + } + auto type = get_member_str("resourceType"); + if (!type) { + send_error(type.error()); + return; + 
} + if (type.value() != "ASSET") { + logger_->log_info("Resource (id = '{}', name = '{}') with type '{}' is not yet supported", id.value(), name.value(), type.value()); + continue; + } + auto path = get_member_str("resourcePath"); + if (!path) { + send_error(path.error()); + return; + } + auto url = get_member_str("url"); + if (!url) { + send_error(url.error()); + return; + } + + auto full_path = std::filesystem::path{path.value()} / name.value(); // NOLINT(whitespace/braces) + + auto path_valid = utils::file::validateRelativeFilePath(full_path); + if (!path_valid) { + send_error(path_valid.error()); + return; + } + + asset_layout.assets.insert(utils::file::AssetDescription{ + .id = std::string{id.value()}, + .path = full_path, + .url = std::string{url.value()} + }); + } + + auto fetch = [&] (std::string_view url) -> nonstd::expected<std::vector<std::byte>, std::string> { + auto resolved_url = resolveUrl(std::string{url}); + if (!resolved_url) { + return nonstd::make_unexpected("Couldn't resolve url"); + } + C2Payload file_response = protocol_->fetch(resolved_url.value()); + + if (file_response.getStatus().getState() != state::UpdateState::READ_COMPLETE) { + return nonstd::make_unexpected("Failed to fetch file from " + resolved_url.value()); + } + + return std::move(file_response).moveRawData(); + }; + + auto result = asset_manager_->sync(asset_layout, fetch); + if (!result) { + send_error(result.error()); + return; + } + + C2Payload response(Operation::acknowledge, state::UpdateState::FULLY_APPLIED, resp.ident, true); + enqueue_c2_response(std::move(response)); +} + utils::TaskRescheduleInfo C2Agent::produce() { // place priority on messages to send to the c2 server if (protocol_ != nullptr) { @@ -789,6 +956,9 @@ std::optional<std::string> C2Agent::resolveUrl(const std::string& url) const { return url; } std::string base; + if (configuration_->get(Configuration::nifi_c2_rest_path_base, base)) { + return base + url; + } if 
(!configuration_->get(Configuration::nifi_c2_rest_url, "c2.rest.url", base)) { logger_->log_error("Missing C2 REST URL"); return std::nullopt; @@ -891,27 +1061,6 @@ static auto make_path(const std::string& str) { return std::filesystem::path(str); } -static std::optional<std::string> validateFilePath(const std::filesystem::path& path) { - if (path.empty()) { - return "Empty file path"; - } - if (!path.is_relative()) { - return "File path must be a relative path '" + path.string() + "'"; - } - if (!path.has_filename()) { - return "Filename missing in output path '" + path.string() + "'"; - } - if (path.filename() == "." || path.filename() == "..") { - return "Invalid filename '" + path.filename().string() + "'"; - } - for (const auto& segment : path) { - if (segment == "..") { - return "Accessing parent directory is forbidden in file path '" + path.string() + "'"; - } - } - return std::nullopt; -} - void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { auto send_error = [&] (std::string_view error) { logger_->log_error("{}", error); @@ -919,19 +1068,16 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { response.setRawData(as_bytes(std::span(error.begin(), error.end()))); enqueue_c2_response(std::move(response)); }; - std::filesystem::path asset_dir = configuration_->getHome() / "asset"; - if (auto asset_dir_str = configuration_->get(Configuration::nifi_asset_directory)) { - asset_dir = asset_dir_str.value(); - } // output file std::filesystem::path file_path; - if (auto file_rel = resp.getArgument("file") | utils::transform(make_path)) { - if (auto error = validateFilePath(file_rel.value())) { - send_error(error.value()); + if (auto file_rel = resp.getStringArgument("file") | utils::transform(make_path)) { + auto result = utils::file::validateRelativeFilePath(file_rel.value()); + if (!result) { + send_error(result.error()); return; } - file_path = asset_dir / file_rel.value(); + file_path = asset_manager_->getRoot() / file_rel.value(); } 
else { send_error("Couldn't find 'file' argument"); return; @@ -939,7 +1085,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { // source url std::string url; - if (auto url_str = resp.getArgument("url")) { + if (auto url_str = resp.getStringArgument("url")) { if (auto resolved_url = resolveUrl(*url_str)) { url = resolved_url.value(); } else { @@ -953,7 +1099,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { // forceDownload bool force_download = false; - if (auto force_download_str = resp.getArgument("forceDownload")) { + if (auto force_download_str = resp.getStringArgument("forceDownload")) { if (utils::string::equalsIgnoreCase(force_download_str.value(), "true")) { force_download = true; } else if (utils::string::equalsIgnoreCase(force_download_str.value(), "false")) { diff --git a/libminifi/src/c2/C2Payload.cpp b/libminifi/src/c2/C2Payload.cpp index e70413cd5..f21f60437 100644 --- a/libminifi/src/c2/C2Payload.cpp +++ b/libminifi/src/c2/C2Payload.cpp @@ -131,14 +131,14 @@ std::ostream& operator<<(std::ostream& out, const C2ContentResponse& response) { << "}"; } -std::ostream& operator<<(std::ostream& out, const AnnotatedValue& val) { - if (val.value_) { - out << '"' << val.value_->c_str() << '"'; +std::ostream& operator<<(std::ostream& out, const C2Value& val) { + if (auto* val_ptr = val.valueNode()) { + out << '"' << val_ptr->to_string() << '"'; } else { - out << "<null>"; - } - if (!val.annotations.empty()) { - out << val.annotations; + rapidjson::StringBuffer buffer; + rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); + gsl::not_null(val.json())->Accept(writer); + out << std::string_view{buffer.GetString(), buffer.GetLength()}; } return out; } diff --git a/libminifi/src/c2/HeartbeatJsonSerializer.cpp b/libminifi/src/c2/HeartbeatJsonSerializer.cpp index d15ecf71a..a25b47f77 100644 --- a/libminifi/src/c2/HeartbeatJsonSerializer.cpp +++ b/libminifi/src/c2/HeartbeatJsonSerializer.cpp @@ -60,9 +60,16 @@ static void 
serializeOperationInfo(rapidjson::Value& target, const C2Payload& pa target.AddMember("identifier", rapidjson::Value(id.c_str(), alloc), alloc); } -static void setJsonStr(const std::string& key, const state::response::ValueNode& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) { // NOLINT +static void setJsonStr(const std::string& key, const c2::C2Value& value, rapidjson::Value& parent, rapidjson::Document::AllocatorType& alloc) { rapidjson::Value valueVal; - auto base_type = value.getValue(); + + if (auto* json_val = value.json()) { + valueVal.CopyFrom(*json_val, alloc); + parent.AddMember(rapidjson::Value(key.c_str(), alloc), valueVal, alloc); + return; + } + + auto base_type = gsl::not_null(value.valueNode())->getValue(); auto type_index = base_type->getTypeIndex(); if (auto sub_type = std::dynamic_pointer_cast<core::TransformableValue>(base_type)) { @@ -156,9 +163,9 @@ static rapidjson::Value serializeConnectionQueues(const C2Payload& payload, std: updatedContent.name = uuid; adjusted.setLabel(uuid); adjusted.setIdentifier(uuid); - c2::AnnotatedValue nd; + c2::C2Value nd; // name should be what was previously the TLN ( top level node ) - nd = name; + nd = C2Value{name}; updatedContent.operation_arguments.insert(std::make_pair("name", nd)); // the rvalue reference is an unfortunate side effect of the underlying API decision. 
adjusted.addContent(std::move(updatedContent), true); diff --git a/libminifi/src/c2/protocols/RESTProtocol.cpp b/libminifi/src/c2/protocols/RESTProtocol.cpp index ca4f3faea..fee4b6327 100644 --- a/libminifi/src/c2/protocols/RESTProtocol.cpp +++ b/libminifi/src/c2/protocols/RESTProtocol.cpp @@ -28,6 +28,7 @@ #include <string> #include <utility> +#include "rapidjson/error/en.h" #include "core/TypedValues.h" #include "utils/gsl.h" #include "properties/Configuration.h" @@ -36,26 +37,7 @@ namespace org::apache::nifi::minifi::c2 { - -AnnotatedValue parseAnnotatedValue(const rapidjson::Value& jsonValue) { - AnnotatedValue result; - if (jsonValue.IsObject() && jsonValue.HasMember("value")) { - result = jsonValue["value"].GetString(); - for (const auto& annotation : jsonValue.GetObject()) { - if (annotation.name.GetString() == std::string("value")) { - continue; - } - result.annotations[annotation.name.GetString()] = parseAnnotatedValue(annotation.value); - } - } else if (jsonValue.IsBool()) { - result = jsonValue.GetBool(); - } else { - result = jsonValue.GetString(); - } - return result; -} - -C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, std::span<const std::byte> response) { +C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, std::span<const std::byte> response) const { rapidjson::Document root; try { @@ -123,7 +105,7 @@ C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, std::span<co for (auto key : {"content", "args"}) { if (request.HasMember(key) && request[key].IsObject()) { for (const auto &member : request[key].GetObject()) { - new_command.operation_arguments[member.name.GetString()] = parseAnnotatedValue(member.value); + new_command.operation_arguments[member.name.GetString()] = C2Value{member.value}; } break; } @@ -136,6 +118,8 @@ C2Payload RESTProtocol::parseJsonResponse(const C2Payload &payload, std::span<co // we have a response for this request return new_payload; // } + } else { + 
logger_->log_error("Failed to parse json response: {} at {}", rapidjson::GetParseError_En(ok.Code()), ok.Offset()); } } catch (...) { } diff --git a/libminifi/src/c2/protocols/RESTSender.cpp b/libminifi/src/c2/protocols/RESTSender.cpp index 98642b009..659ed51db 100644 --- a/libminifi/src/c2/protocols/RESTSender.cpp +++ b/libminifi/src/c2/protocols/RESTSender.cpp @@ -41,10 +41,23 @@ void RESTSender::initialize(core::controller::ControllerServiceProvider* control RESTProtocol::initialize(controller, configure); // base URL when one is not specified. if (nullptr != configure) { + std::optional<std::string> rest_base_path = configure->get(Configuration::nifi_c2_rest_path_base); std::string update_str; std::string ssl_context_service_str; configure->get(Configuration::nifi_c2_rest_url, "c2.rest.url", rest_uri_); configure->get(Configuration::nifi_c2_rest_url_ack, "c2.rest.url.ack", ack_uri_); + if (rest_uri_.starts_with("/")) { + if (!rest_base_path) { + throw Exception(ExceptionType::GENERAL_EXCEPTION, "Cannot use relative nifi.c2.rest.url unless the nifi.c2.rest.path.base is set"); + } + rest_uri_ = rest_base_path.value() + rest_uri_; + } + if (ack_uri_.starts_with("/")) { + if (!rest_base_path) { + throw Exception(ExceptionType::GENERAL_EXCEPTION, "Cannot use relative nifi.c2.rest.url.ack unless the nifi.c2.rest.path.base is set"); + } + ack_uri_ = rest_base_path.value() + ack_uri_; + } if (controller && configure->get(Configuration::nifi_c2_rest_ssl_context_service, "c2.rest.ssl.context.service", ssl_context_service_str)) { if (auto service = controller->getControllerService(ssl_context_service_str)) { ssl_context_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service); diff --git a/libminifi/src/c2/triggers/FileUpdateTrigger.cpp b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp index c3e94daa2..28bf3082b 100644 --- a/libminifi/src/c2/triggers/FileUpdateTrigger.cpp +++ b/libminifi/src/c2/triggers/FileUpdateTrigger.cpp @@ -29,8 +29,8 @@ 
C2Payload FileUpdateTrigger::getAction() { C2ContentResponse resp(Operation::update); resp.ident = "triggered"; resp.name = "configuration"; - resp.operation_arguments["location"] = file_; - resp.operation_arguments["persist"] = "true"; + resp.operation_arguments["location"] = C2Value{file_}; + resp.operation_arguments["persist"] = C2Value{"true"}; response_payload.addContent(std::move(resp)); update_ = false; return response_payload; diff --git a/libminifi/src/core/state/MetricsPublisherStore.cpp b/libminifi/src/core/state/MetricsPublisherStore.cpp index 269cb6267..abd1550d8 100644 --- a/libminifi/src/core/state/MetricsPublisherStore.cpp +++ b/libminifi/src/core/state/MetricsPublisherStore.cpp @@ -23,9 +23,9 @@ namespace org::apache::nifi::minifi::state { MetricsPublisherStore::MetricsPublisherStore(std::shared_ptr<Configure> configuration, const std::vector<std::shared_ptr<core::RepositoryMetricsSource>>& repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration) + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager) : configuration_(configuration), - response_node_loader_(std::make_shared<response::ResponseNodeLoader>(std::move(configuration), repository_metric_sources, std::move(flow_configuration))) { + response_node_loader_(std::make_shared<response::ResponseNodeLoader>(std::move(configuration), repository_metric_sources, std::move(flow_configuration), asset_manager)) { } void MetricsPublisherStore::initialize(core::controller::ControllerServiceProvider* controller, state::StateMonitor* update_sink) { diff --git a/libminifi/src/core/state/nodes/AssetInformation.cpp b/libminifi/src/core/state/nodes/AssetInformation.cpp new file mode 100644 index 000000000..4a243b607 --- /dev/null +++ b/libminifi/src/core/state/nodes/AssetInformation.cpp @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "core/state/nodes/AssetInformation.h" +#include "core/Resource.h" +#include "core/logging/LoggerFactory.h" + +namespace org::apache::nifi::minifi::state::response { + +AssetInformation::AssetInformation() + : logger_(core::logging::LoggerFactory<AssetInformation>().getLogger()) {} + +void AssetInformation::setAssetManager(utils::file::AssetManager* asset_manager) { + asset_manager_ = asset_manager; + if (!asset_manager_) { + logger_->log_error("No asset manager is provided, asset information will not be available"); + } +} + +std::vector<SerializedResponseNode> AssetInformation::serialize() { + if (!asset_manager_) { + return {}; + } + SerializedResponseNode node; + node.name = "hash"; + node.value = asset_manager_->hash(); + + return std::vector<SerializedResponseNode>{node}; +} + +REGISTER_RESOURCE(AssetInformation, DescriptionOnly); + +} // namespace org::apache::nifi::minifi::state::response diff --git a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp index 58ff6ecea..8d9dc1845 100644 --- a/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp +++ b/libminifi/src/core/state/nodes/ResponseNodeLoader.cpp @@ -25,6 +25,7 @@ #include "core/state/nodes/QueueMetrics.h" #include 
"core/state/nodes/AgentInformation.h" #include "core/state/nodes/ConfigurationChecksums.h" +#include "core/state/nodes/AssetInformation.h" #include "utils/gsl.h" #include "utils/RegexUtils.h" #include "utils/StringUtils.h" @@ -33,10 +34,11 @@ namespace org::apache::nifi::minifi::state::response { ResponseNodeLoader::ResponseNodeLoader(std::shared_ptr<Configure> configuration, std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repository_metric_sources, - std::shared_ptr<core::FlowConfiguration> flow_configuration) + std::shared_ptr<core::FlowConfiguration> flow_configuration, utils::file::AssetManager* asset_manager) : configuration_(std::move(configuration)), repository_metric_sources_(std::move(repository_metric_sources)), - flow_configuration_(std::move(flow_configuration)) { + flow_configuration_(std::move(flow_configuration)), + asset_manager_(asset_manager) { } void ResponseNodeLoader::clearConfigRoot() { @@ -194,6 +196,13 @@ void ResponseNodeLoader::initializeConfigurationChecksums(const SharedResponseNo } } +void ResponseNodeLoader::initializeAssetInformation(const SharedResponseNode& response_node) const { + auto asset_info = dynamic_cast<state::response::AssetInformation*>(response_node.get()); + if (asset_info) { + asset_info->setAssetManager(asset_manager_); + } +} + void ResponseNodeLoader::initializeFlowMonitor(const SharedResponseNode& response_node) const { auto flowMonitor = dynamic_cast<state::response::FlowMonitor*>(response_node.get()); if (flowMonitor == nullptr) { @@ -231,6 +240,7 @@ std::vector<SharedResponseNode> ResponseNodeLoader::loadResponseNodes(const std: initializeAgentStatus(response_node); initializeConfigurationChecksums(response_node); initializeFlowMonitor(response_node); + initializeAssetInformation(response_node); } return response_nodes; } diff --git a/libminifi/src/core/state/nodes/SupportedOperations.cpp b/libminifi/src/core/state/nodes/SupportedOperations.cpp index b0681415f..51f86c40b 100644 --- 
a/libminifi/src/core/state/nodes/SupportedOperations.cpp +++ b/libminifi/src/core/state/nodes/SupportedOperations.cpp @@ -110,6 +110,10 @@ void SupportedOperations::fillProperties(SerializedResponseNode& properties, min } break; } + case minifi::c2::Operation::sync: { + serializeProperty<minifi::c2::SyncOperand>(properties); + break; + } default: break; } diff --git a/libminifi/src/utils/file/AssetManager.cpp b/libminifi/src/utils/file/AssetManager.cpp new file mode 100644 index 000000000..3444cb55c --- /dev/null +++ b/libminifi/src/utils/file/AssetManager.cpp @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "utils/file/AssetManager.h" +#include "utils/file/FileUtils.h" +#include "rapidjson/document.h" +#include "rapidjson/writer.h" +#include "core/logging/LoggerFactory.h" +#include "utils/Hash.h" + +#undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW, which conflicts with rapidjson + +namespace org::apache::nifi::minifi::utils::file { + +AssetManager::AssetManager(const Configure& configuration) + : root_(configuration.get(Configure::nifi_asset_directory).value_or((configuration.getHome() / "asset").string())), + logger_(core::logging::LoggerFactory<AssetManager>::getLogger()) { + refreshState(); +} + +void AssetManager::refreshState() { + std::lock_guard lock(mtx_); + state_.clear(); + if (!FileUtils::exists(root_)) { + std::filesystem::create_directories(root_); + } + if (!FileUtils::exists(root_ / ".state")) { + std::ofstream{root_ / ".state", std::ios::binary} << R"({"digest": "", "assets": {}})"; + } + rapidjson::Document doc; + + std::string file_content = get_content(root_ / ".state"); + + rapidjson::ParseResult res = doc.Parse(file_content.c_str(), file_content.size()); + if (!res) { + logger_->log_error("Failed to parse asset '.state' file, not a valid json file"); + return; + } + if (!doc.IsObject()) { + logger_->log_error("Asset '.state' file is malformed"); + return; + } + if (!doc.HasMember("digest")) { + logger_->log_error("Asset '.state' file is malformed, missing 'digest'"); + return; + } + if (!doc["digest"].IsString()) { + logger_->log_error("Asset '.state' file is malformed, 'digest' is not a string"); + return; + } + if (!doc.HasMember("assets")) { + logger_->log_error("Asset '.state' file is malformed, missing 'assets'"); + return; + } + if (!doc["assets"].IsObject()) { + logger_->log_error("Asset '.state' file is malformed, 'assets' is not an object"); + return; + } + + + AssetLayout new_state; + new_state.digest = std::string{doc["digest"].GetString(), doc["digest"].GetStringLength()}; + + for (auto& [id, 
entry] : doc["assets"].GetObject()) { + if (!entry.IsObject()) { + logger_->log_error("Asset '.state' file is malformed, 'assets.{}' is not an object", std::string_view{id.GetString(), id.GetStringLength()}); + return; + } + AssetDescription description; + description.id = std::string{id.GetString(), id.GetStringLength()}; + if (!entry.HasMember("path") || !entry["path"].IsString()) { + logger_->log_error("Asset '.state' file is malformed, 'assets.{}.path' does not exist or is not a string", std::string_view{id.GetString(), id.GetStringLength()}); + return; + } + description.path = std::string{entry["path"].GetString(), entry["path"].GetStringLength()}; + if (!entry.HasMember("url") || !entry["url"].IsString()) { + logger_->log_error("Asset '.state' file is malformed, 'assets.{}.url' does not exist or is not a string", std::string_view{id.GetString(), id.GetStringLength()}); + return; + } + description.url = std::string{entry["url"].GetString(), entry["url"].GetStringLength()}; + + if (FileUtils::exists(root_ / description.path)) { + new_state.assets.insert(std::move(description)); + } else { + logger_->log_error("Asset '.state' file contains entry '{}' that does not exist on the filesystem at '{}'", + std::string_view{id.GetString(), id.GetStringLength()}, (root_ / description.path).string()); + } + } + state_ = std::move(new_state); +} + +std::string AssetManager::hash() const { + std::lock_guard lock(mtx_); + return state_.digest.empty() ? 
"null" : state_.digest; +} + +nonstd::expected<void, std::string> AssetManager::sync( + const AssetLayout& layout, + const std::function<nonstd::expected<std::vector<std::byte>, std::string>(std::string_view /*url*/)>& fetch) { + logger_->log_info("Synchronizing assets"); + std::lock_guard lock(mtx_); + AssetLayout new_state{ + .digest = state_.digest, + .assets = {} + }; + std::string fetch_errors; + std::vector<std::pair<std::filesystem::path, std::vector<std::byte>>> new_file_contents; + for (auto& new_entry : layout.assets) { + if (std::find_if(state_.assets.begin(), state_.assets.end(), [&] (auto& entry) {return entry.id == new_entry.id;}) == state_.assets.end()) { + logger_->log_info("Fetching asset (id = '{}', path = '{}') from {}", new_entry.id, new_entry.path.string(), new_entry.url); + if (auto data = fetch(new_entry.url)) { + new_file_contents.emplace_back(new_entry.path, data.value()); + new_state.assets.insert(new_entry); + } else { + logger_->log_error("Failed to fetch asset (id = '{}', path = '{}') from {}: {}", new_entry.id, new_entry.path.string(), new_entry.url, data.error()); + fetch_errors += "Failed to fetch '" + new_entry.id + "' from '" + new_entry.url + "': " + data.error() + "\n"; + } + } else { + logger_->log_info("Asset (id = '{}', path = '{}') already exists", new_entry.id, new_entry.path.string()); + new_state.assets.insert(new_entry); + } + } + if (fetch_errors.empty()) { + new_state.digest = layout.digest; + } + + for (auto& old_entry : state_.assets) { + if (std::find_if(layout.assets.begin(), layout.assets.end(), [&] (auto& entry) {return entry.id == old_entry.id;}) == layout.assets.end()) { + logger_->log_info("We no longer need asset (id = '{}', path = '{}')", old_entry.id, old_entry.path.string()); + std::filesystem::remove(root_ / old_entry.path); + } + } + + for (auto& [path, content] : new_file_contents) { + create_dir((root_ / path).parent_path()); + std::ofstream{root_ / path, std::ios::binary}.write(reinterpret_cast<const 
char*>(content.data()), gsl::narrow<std::streamsize>(content.size())); + } + + state_ = std::move(new_state); + persist(); + + if (!fetch_errors.empty()) { + return nonstd::make_unexpected(fetch_errors); + } + + return {}; +} + +void AssetManager::persist() const { + std::lock_guard lock(mtx_); + rapidjson::Document doc; + doc.SetObject(); + + doc.AddMember(rapidjson::StringRef("digest"), rapidjson::Value{state_.digest, doc.GetAllocator()}, doc.GetAllocator()); + doc.AddMember(rapidjson::StringRef("assets"), rapidjson::Value{rapidjson::kObjectType}, doc.GetAllocator()); + + for (auto& entry : state_.assets) { + rapidjson::Value entry_val(rapidjson::kObjectType); + entry_val.AddMember(rapidjson::StringRef("path"), rapidjson::Value(entry.path.generic_string(), doc.GetAllocator()), doc.GetAllocator()); + entry_val.AddMember(rapidjson::StringRef("url"), rapidjson::StringRef(entry.url), doc.GetAllocator()); + doc["assets"].AddMember(rapidjson::StringRef(entry.id), entry_val, doc.GetAllocator()); + } + + rapidjson::StringBuffer buffer; + rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); + doc.Accept(writer); + + std::ofstream{root_ / ".state", std::ios::binary}.write(buffer.GetString(), gsl::narrow<std::streamsize>(buffer.GetSize())); +} + +std::filesystem::path AssetManager::getRoot() const { + std::lock_guard lock(mtx_); + return root_; +} + +} // namespace org::apache::nifi::minifi::utils::file diff --git a/libminifi/test/integration/C2AssetSyncTest.cpp b/libminifi/test/integration/C2AssetSyncTest.cpp new file mode 100644 index 000000000..5731025eb --- /dev/null +++ b/libminifi/test/integration/C2AssetSyncTest.cpp @@ -0,0 +1,280 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <vector> +#include <string> +#include <fstream> +#include <iterator> + +#include "integration/HTTPIntegrationBase.h" +#include "integration/HTTPHandlers.h" +#include "utils/file/FileUtils.h" +#include "utils/file/AssetManager.h" +#include "unit/TestUtils.h" +#include "unit/Catch.h" + +namespace org::apache::nifi::minifi::test { + +class FileProvider : public ServerAwareHandler { + public: + explicit FileProvider(std::string file_content): file_content_(std::move(file_content)) {} + + bool handleGet(CivetServer* /*server*/, struct mg_connection* conn) override { + mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " + "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", + file_content_.length()); + mg_printf(conn, "%s", file_content_.c_str()); + return true; + } + + private: + std::string file_content_; +}; + +class C2HeartbeatHandler : public HeartbeatHandler { + public: + using HeartbeatHandler::HeartbeatHandler; + using AssetDescription = org::apache::nifi::minifi::utils::file::AssetDescription; + + void handleHeartbeat(const rapidjson::Document& root, struct mg_connection* conn) override { + std::string hb_str = [&] { + rapidjson::StringBuffer buffer; + rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); + root.Accept(writer); + return std::string{buffer.GetString(), buffer.GetSize()}; + }(); + auto& asset_info_node = root["resourceInfo"]; + auto& 
asset_hash_node = asset_info_node["hash"]; + std::string asset_hash{asset_hash_node.GetString(), asset_hash_node.GetStringLength()}; + + std::vector<C2Operation> operations; + { + std::lock_guard guard(asset_mtx_); + agent_asset_hash_ = asset_hash; + if (asset_hash != calculateAssetHash()) { + std::unordered_map<std::string, c2::C2Value> args; + rapidjson::Document global_hash_doc{rapidjson::kObjectType}; + global_hash_doc.AddMember("digest", calculateAssetHash(), global_hash_doc.GetAllocator()); + args["globalHash"] = minifi::c2::C2Value{std::move(global_hash_doc)}; + rapidjson::Document resource_list_doc{rapidjson::kArrayType}; + + for (auto& asset : expected_assets_) { + rapidjson::Value resource_obj{rapidjson::kObjectType}; + resource_obj.AddMember("resourceId", asset.id, resource_list_doc.GetAllocator()); + resource_obj.AddMember("resourceName", asset.path.filename().string(), resource_list_doc.GetAllocator()); + resource_obj.AddMember("resourceType", "ASSET", resource_list_doc.GetAllocator()); + resource_obj.AddMember("resourcePath", asset.path.parent_path().string(), resource_list_doc.GetAllocator()); + resource_obj.AddMember("url", asset.url, resource_list_doc.GetAllocator()); + resource_list_doc.PushBack(resource_obj, resource_list_doc.GetAllocator()); + } + args["resourceList"] = minifi::c2::C2Value{std::move(resource_list_doc)}; + + operations.push_back(C2Operation{ + .operation = "sync", + .operand = "resource", + .operation_id = std::to_string(next_op_id_++), + .args = std::move(args) + }); + } + } + sendHeartbeatResponse(operations, conn); + } + + void addAsset(std::string id, std::string path, std::string url) { + std::lock_guard guard(asset_mtx_); + expected_assets_.insert(AssetDescription{ + .id = std::move(id), + .path = std::move(path), + .url = std::move(url) + }); + } + + void removeAsset(std::string id) { + std::lock_guard guard{asset_mtx_}; + expected_assets_.erase(AssetDescription{.id = std::move(id), .path = {}, .url = {}}); + } + + 
std::optional<std::string> getAgentAssetHash() const { + std::lock_guard lock(asset_mtx_); + return agent_asset_hash_; + } + + std::string calculateAssetHash() const { + std::lock_guard guard{asset_mtx_}; + size_t hash_value{0}; + for (auto& asset : expected_assets_) { + hash_value = minifi::utils::hash_combine(hash_value, std::hash<std::string>{}(asset.id)); + } + return std::to_string(hash_value); + } + + std::string assetState() const { + std::lock_guard guard{asset_mtx_}; + rapidjson::Document doc; + doc.SetObject(); + doc.AddMember(rapidjson::StringRef("digest"), rapidjson::Value{calculateAssetHash(), doc.GetAllocator()}, doc.GetAllocator()); + doc.AddMember(rapidjson::StringRef("assets"), rapidjson::Value{rapidjson::kObjectType}, doc.GetAllocator()); + for (auto& asset : expected_assets_) { + auto path_str = asset.path.string(); + doc["assets"].AddMember(rapidjson::StringRef(asset.id), rapidjson::kObjectType, doc.GetAllocator()); + doc["assets"][asset.id].AddMember(rapidjson::StringRef("path"), rapidjson::Value(path_str, doc.GetAllocator()), doc.GetAllocator()); + doc["assets"][asset.id].AddMember(rapidjson::StringRef("url"), rapidjson::StringRef(asset.url), doc.GetAllocator()); + } + rapidjson::StringBuffer buffer; + rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); + doc.Accept(writer); + + return {buffer.GetString(), buffer.GetSize()}; + } + + private: + mutable std::recursive_mutex asset_mtx_; + std::set<AssetDescription> expected_assets_; + + std::optional<std::string> agent_asset_hash_; + + std::atomic<size_t> next_op_id_{1}; +}; + +class VerifyC2AssetSync : public VerifyC2Base { + public: + void configureC2() override { + configuration->set("nifi.c2.agent.protocol.class", "RESTSender"); + configuration->set("nifi.c2.enable", "true"); + configuration->set("nifi.c2.agent.heartbeat.period", "100"); + configuration->set("nifi.c2.root.classes", "DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation"); + } + + void runAssertions() 
override { + verify_(); + } + + void setVerifier(std::function<void()> verify) { + verify_ = std::move(verify); + } + + private: + std::function<void()> verify_; +}; + +TEST_CASE("C2AssetSync", "[c2test]") { + TestController controller; + + // setup minifi home + const std::filesystem::path home_dir = controller.createTempDirectory(); + const auto asset_dir = home_dir / "asset"; + + std::filesystem::current_path(home_dir); + auto wd_guard = gsl::finally([] { + std::filesystem::current_path(minifi::utils::file::get_executable_dir()); + }); + + C2AcknowledgeHandler ack_handler; + std::string file_A = "hello from file A"; + FileProvider file_A_provider{file_A}; + std::string file_B = "hello from file B"; + FileProvider file_B_provider{file_B}; + std::string file_C = "hello from file C"; + FileProvider file_C_provider{file_C}; + std::string file_A_v2 = "hello from file A version 2"; + FileProvider file_Av2_provider{file_A_v2}; + C2HeartbeatHandler hb_handler{std::make_shared<minifi::Configure>()}; + + VerifyC2AssetSync harness; + harness.setUrl("http://localhost:0/api/file/A.txt", &file_A_provider); + harness.setUrl("http://localhost:0/api/file/Av2.txt", &file_Av2_provider); + harness.setUrl("http://localhost:0/api/file/B.txt", &file_B_provider); + harness.setUrl("http://localhost:0/api/file/C.txt", &file_C_provider); + + std::string absolute_file_A_url = "http://localhost:" + harness.getWebPort() + "/api/file/A.txt"; + + hb_handler.addAsset("Av1", "A.txt", "/api/file/A.txt"); + hb_handler.addAsset("Bv1", "nested/dir/B.txt", "/api/file/B.txt"); + hb_handler.addAsset("Cv1", "nested/C.txt", "/api/file/C.txt"); + + harness.setUrl("http://localhost:0/api/heartbeat", &hb_handler); + harness.setUrl("http://localhost:0/api/acknowledge", &ack_handler); + harness.setC2Url("/api/heartbeat", "/api/acknowledge"); + + auto get_asset_structure = [&] () { + std::unordered_map<std::string, std::string> contents; + for (auto& [dir, file] : minifi::utils::file::list_dir_all(asset_dir, 
controller.getLogger())) { + contents[(dir / file).string()] = minifi::utils::file::get_content(dir / file); + } + return contents; + }; + + harness.setVerifier([&] () { + REQUIRE(utils::verifyEventHappenedInPollTime(10s, [&] { + std::cout << "calculated hash = " << hb_handler.calculateAssetHash() << std::endl; + std::cout << "reported hash = " << hb_handler.getAgentAssetHash().value_or("<missing>") << std::endl; + return hb_handler.calculateAssetHash() == hb_handler.getAgentAssetHash(); + })); + + { + std::unordered_map<std::string, std::string> expected_assets{ + {(asset_dir / "A.txt").string(), file_A}, + {(asset_dir / "nested" / "dir" / "B.txt").string(), file_B}, + {(asset_dir / "nested" / "C.txt").string(), file_C}, + {(asset_dir / ".state").string(), hb_handler.assetState()} + }; + auto actual_assets = get_asset_structure(); + if (actual_assets != expected_assets) { + controller.getLogger()->log_error("Mismatch between expected and actual assets"); + for (auto& [path, content] : expected_assets) { + controller.getLogger()->log_error("Expected asset at {}: {}", path, content); + } + for (auto& [path, content] : actual_assets) { + controller.getLogger()->log_error("Actual asset at {}: {}", path, content); + } + REQUIRE(false); + } + } + + hb_handler.removeAsset("Av1"); + hb_handler.removeAsset("Cv1"); + hb_handler.addAsset("Av2", "A.txt", "/api/file/Av2.txt"); + + + REQUIRE(utils::verifyEventHappenedInPollTime(10s, [&] {return hb_handler.calculateAssetHash() == hb_handler.getAgentAssetHash();})); + + { + std::unordered_map<std::string, std::string> expected_assets{ + {(asset_dir / "A.txt").string(), file_A_v2}, + {(asset_dir / "nested" / "dir" / "B.txt").string(), file_B}, + {(asset_dir / ".state").string(), hb_handler.assetState()} + }; + + auto actual_assets = get_asset_structure(); + if (actual_assets != expected_assets) { + controller.getLogger()->log_error("Mismatch between expected and actual assets"); + for (auto& [path, content] : expected_assets) { + 
controller.getLogger()->log_error("Expected asset at {}: {}", path, content); + } + for (auto& [path, content] : actual_assets) { + controller.getLogger()->log_error("Actual asset at {}: {}", path, content); + } + REQUIRE(false); + } + } + }); + + harness.run(); +} + +} // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/integration/C2ClearCoreComponentStateTest.cpp b/libminifi/test/integration/C2ClearCoreComponentStateTest.cpp index 07081d815..a936bcd88 100644 --- a/libminifi/test/integration/C2ClearCoreComponentStateTest.cpp +++ b/libminifi/test/integration/C2ClearCoreComponentStateTest.cpp @@ -97,7 +97,7 @@ class ClearCoreComponentStateHandler: public HeartbeatHandler { break; case FlowState::FIRST_DESCRIBE_ACK: case FlowState::CLEAR_SENT: { - sendHeartbeatResponse("CLEAR", "corecomponentstate", "889346", conn, { {"corecomponent1", "TailFile1"} }); + sendHeartbeatResponse("CLEAR", "corecomponentstate", "889346", conn, { {"corecomponent1", minifi::c2::C2Value{"TailFile1"}} }); flow_state_ = FlowState::CLEAR_SENT; break; } diff --git a/libminifi/test/integration/C2DescribeMetricsTest.cpp b/libminifi/test/integration/C2DescribeMetricsTest.cpp index 692e0e37b..9e5bcbcdf 100644 --- a/libminifi/test/integration/C2DescribeMetricsTest.cpp +++ b/libminifi/test/integration/C2DescribeMetricsTest.cpp @@ -64,11 +64,11 @@ class MetricsHandler: public HeartbeatHandler { void handleHeartbeat(const rapidjson::Document&, struct mg_connection* conn) override { switch (state_) { case TestState::DESCRIBE_SPECIFIC_PROCESSOR_METRIC: { - sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, {{"metricsClass", "GetFileMetrics"}}); + sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, {{"metricsClass", minifi::c2::C2Value{"GetFileMetrics"}}}); break; } case TestState::DESCRIBE_SPECIFIC_SYSTEM_METRIC: { - sendHeartbeatResponse("DESCRIBE", "metrics", "889347", conn, {{"metricsClass", "QueueMetrics"}}); + sendHeartbeatResponse("DESCRIBE", "metrics", 
"889347", conn, {{"metricsClass", minifi::c2::C2Value{"QueueMetrics"}}}); break; } case TestState::DESCRIBE_ALL_METRICS: { diff --git a/libminifi/test/integration/C2MetricsTest.cpp b/libminifi/test/integration/C2MetricsTest.cpp index 4d356fc2b..39a0a19c7 100644 --- a/libminifi/test/integration/C2MetricsTest.cpp +++ b/libminifi/test/integration/C2MetricsTest.cpp @@ -62,7 +62,7 @@ class MetricsHandler: public HeartbeatHandler { explicit MetricsHandler(std::atomic_bool& metrics_updated_successfully, std::shared_ptr<minifi::Configure> configuration, const std::filesystem::path& replacement_config_path) : HeartbeatHandler(std::move(configuration)), metrics_updated_successfully_(metrics_updated_successfully), - replacement_config_(getReplacementConfigAsJsonValue(replacement_config_path.string())) { + replacement_config_(minifi::utils::file::get_content(replacement_config_path.string())) { } void handleHeartbeat(const rapidjson::Document& root, struct mg_connection* conn) override { @@ -73,7 +73,7 @@ class MetricsHandler: public HeartbeatHandler { break; } case TestState::SEND_NEW_CONFIG: { - sendHeartbeatResponse("UPDATE", "configuration", "889348", conn, {{"configuration_data", replacement_config_}}); + sendHeartbeatResponse("UPDATE", "configuration", "889348", conn, {{"configuration_data", minifi::c2::C2Value{replacement_config_}}}); test_state_ = TestState::VERIFY_UPDATED_METRICS; break; } @@ -178,14 +178,6 @@ class MetricsHandler: public HeartbeatHandler { processor_metrics["GetTCPMetrics"][GETTCP1_UUID].HasMember("TransferredBytes"); } - [[nodiscard]] static std::string getReplacementConfigAsJsonValue(const std::string& replacement_config_path) { - std::ifstream is(replacement_config_path); - auto content = std::string((std::istreambuf_iterator<char>(is)), std::istreambuf_iterator<char>()); - content = minifi::utils::string::replaceAll(content, "\n", "\\n"); - content = minifi::utils::string::replaceAll(content, "\"", "\\\""); - return content; - } - 
std::atomic_bool& metrics_updated_successfully_; TestState test_state_ = TestState::VERIFY_INITIAL_METRICS; std::string replacement_config_; diff --git a/libminifi/test/integration/C2UpdateAssetTest.cpp b/libminifi/test/integration/C2UpdateAssetTest.cpp index 71cd0b737..907b9f921 100644 --- a/libminifi/test/integration/C2UpdateAssetTest.cpp +++ b/libminifi/test/integration/C2UpdateAssetTest.cpp @@ -56,7 +56,7 @@ class C2HeartbeatHandler : public HeartbeatHandler { return true; } - void addOperation(std::string id, std::unordered_map<std::string, std::string> args) { + void addOperation(std::string id, std::unordered_map<std::string, c2::C2Value> args) { std::lock_guard<std::mutex> guard(op_mtx_); operations_.push_back(C2Operation{ .operation = "update", @@ -92,7 +92,7 @@ class VerifyC2AssetUpdate : public VerifyC2Base { struct AssetUpdateOperation { std::string id; - std::unordered_map<std::string, std::string> args; + std::unordered_map<std::string, c2::C2Value> args; std::string state; std::optional<std::string> details; }; @@ -103,7 +103,11 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { // setup minifi home const std::filesystem::path home_dir = controller.createTempDirectory(); const auto asset_dir = home_dir / "asset"; + std::filesystem::current_path(home_dir); + auto wd_guard = gsl::finally([] { + std::filesystem::current_path(minifi::utils::file::get_executable_dir()); + }); C2AcknowledgeHandler ack_handler; std::string file_A = "hello from file A"; @@ -130,7 +134,7 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { operations.push_back({ .id = "2", .args = { - {"file", "my_file.txt"} + {"file", minifi::c2::C2Value{"my_file.txt"}} }, .state = "NOT_APPLIED", .details = "Couldn't find 'url' argument" @@ -139,8 +143,8 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { operations.push_back({ .id = "3", .args = { - {"file", "my_file.txt"}, - {"url", "/api/file/A.txt"} + {"file", minifi::c2::C2Value{"my_file.txt"}}, + {"url", 
minifi::c2::C2Value{"/api/file/A.txt"}} }, .state = "FULLY_APPLIED", .details = std::nullopt @@ -149,8 +153,8 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { operations.push_back({ .id = "4", .args = { - {"file", "my_file.txt"}, - {"url", "/api/file/A.txt"} + {"file", minifi::c2::C2Value{"my_file.txt"}}, + {"url", minifi::c2::C2Value{"/api/file/A.txt"}} }, .state = "NO_OPERATION", .details = std::nullopt @@ -159,9 +163,9 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { operations.push_back({ .id = "5", .args = { - {"file", "my_file.txt"}, - {"url", "/api/file/B.txt"}, - {"forceDownload", "true"} + {"file", minifi::c2::C2Value{"my_file.txt"}}, + {"url", minifi::c2::C2Value{"/api/file/B.txt"}}, + {"forceDownload", minifi::c2::C2Value{"true"}} }, .state = "FULLY_APPLIED", .details = std::nullopt @@ -170,8 +174,8 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { operations.push_back({ .id = "6", .args = { - {"file", "new_dir/inner/my_file.txt"}, - {"url", "/api/file/A.txt"} + {"file", minifi::c2::C2Value{"new_dir/inner/my_file.txt"}}, + {"url", minifi::c2::C2Value{"/api/file/A.txt"}} }, .state = "FULLY_APPLIED", .details = std::nullopt @@ -180,8 +184,8 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { operations.push_back({ .id = "7", .args = { - {"file", "dummy.txt"}, - {"url", "/not_existing_api/file.txt"} + {"file", minifi::c2::C2Value{"dummy.txt"}}, + {"url", minifi::c2::C2Value{"/not_existing_api/file.txt"}} }, .state = "NOT_APPLIED", .details = "Failed to fetch asset" @@ -190,8 +194,8 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { operations.push_back({ .id = "8", .args = { - {"file", "../../system_lib.dll"}, - {"url", "/not_existing_api/file.txt"} + {"file", minifi::c2::C2Value{"../../system_lib.dll"}}, + {"url", minifi::c2::C2Value{"/not_existing_api/file.txt"}} }, .state = "NOT_APPLIED", .details = "Accessing parent directory is forbidden in file path" @@ -200,8 +204,8 @@ TEST_CASE("Test update asset C2 
command", "[c2test]") { operations.push_back({ .id = "9", .args = { - {"file", "other_dir/A.txt"}, - {"url", absolute_file_A_url} + {"file", minifi::c2::C2Value{"other_dir/A.txt"}}, + {"url", minifi::c2::C2Value{absolute_file_A_url}} }, .state = "FULLY_APPLIED", .details = std::nullopt @@ -244,11 +248,12 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { // this op failed no file made on the disk continue; } - expected_files[(asset_dir / op.args["file"]).string()] = minifi::utils::string::endsWith(op.args["url"], "A.txt") ? file_A : file_B; + expected_files[(asset_dir / op.args["file"].to_string()).string()] = minifi::utils::string::endsWith(op.args["url"].to_string(), "A.txt") ? file_A : file_B; } size_t file_count = minifi::utils::file::list_dir_all(asset_dir.string(), controller.getLogger()).size(); - if (file_count != expected_files.size()) { + // + 1 is for the .state file from the AssetManager + if (file_count != expected_files.size() + 1) { controller.getLogger()->log_error("Expected {} files, got {}", expected_files.size(), file_count); REQUIRE(false); } @@ -258,8 +263,6 @@ TEST_CASE("Test update asset C2 command", "[c2test]") { REQUIRE(false); } } - - std::filesystem::current_path(minifi::utils::file::get_executable_dir()); } } // namespace org::apache::nifi::minifi::test diff --git a/libminifi/test/libtest/integration/HTTPHandlers.cpp b/libminifi/test/libtest/integration/HTTPHandlers.cpp index eb357b2b9..31578022a 100644 --- a/libminifi/test/libtest/integration/HTTPHandlers.cpp +++ b/libminifi/test/libtest/integration/HTTPHandlers.cpp @@ -56,7 +56,7 @@ bool PeerResponder::handleGet(CivetServer* /*server*/, struct mg_connection *con #else std::string hostname = "localhost"; #endif - std::string site2site_rest_resp = "{\"peers\" : [{ \"hostname\": \"" + hostname + "\", \"port\": " + port + ", \"secure\": false, \"flowFileCount\" : 0 }] }"; + std::string site2site_rest_resp = R"({"peers" : [{ "hostname": ")" + hostname + R"(", "port": )" + port + 
R"(, "secure": false, "flowFileCount" : 0 }] })"; std::stringstream headers; headers << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\nContent-Length: " << site2site_rest_resp.length() << "\r\nConnection: close\r\n\r\n"; mg_printf(conn, "%s", headers.str().c_str()); @@ -134,7 +134,8 @@ bool FlowFileResponder::handlePost(CivetServer* /*server*/, struct mg_connection const auto flow = std::make_shared<FlowObj>(); for (uint32_t i = 0; i < num_attributes; i++) { - std::string name, value; + std::string name; + std::string value; { const auto read = stream.read(name, true); if (!isServerRunning()) return false; @@ -204,7 +205,7 @@ bool FlowFileResponder::handleGet(CivetServer* /*server*/, struct mg_connection minifi::io::BufferStream serializer; minifi::io::CRCStream <minifi::io::OutputStream> stream(gsl::make_not_null(&serializer)); for (const auto& flow : flows) { - uint32_t num_attributes = gsl::narrow<uint32_t>(flow->attributes.size()); + auto num_attributes = gsl::narrow<uint32_t>(flow->attributes.size()); stream.write(num_attributes); for (const auto& entry : flow->attributes) { stream.write(entry.first); @@ -235,41 +236,38 @@ bool DeleteTransactionResponder::handleDelete(CivetServer* /*server*/, struct mg } void HeartbeatHandler::sendHeartbeatResponse(const std::vector<C2Operation>& operations, struct mg_connection * conn) { - std::string operation_jsons; + rapidjson::Document hb_obj{rapidjson::kObjectType}; + hb_obj.AddMember("operation", "heartbeat", hb_obj.GetAllocator()); + hb_obj.AddMember("requested_operations", rapidjson::kArrayType, hb_obj.GetAllocator()); for (const auto& c2_operation : operations) { - std::string resp_args; + rapidjson::Value op{rapidjson::kObjectType}; + op.AddMember("operation", c2_operation.operation, hb_obj.GetAllocator()); + op.AddMember("operationid", c2_operation.operation_id, hb_obj.GetAllocator()); + op.AddMember("operand", c2_operation.operand, hb_obj.GetAllocator()); if (!c2_operation.args.empty()) { - resp_args = ", 
\"args\": {"; - auto it = c2_operation.args.begin(); - while (it != c2_operation.args.end()) { - resp_args += "\"" + it->first + "\": \"" + it->second + "\""; - ++it; - if (it != c2_operation.args.end()) { - resp_args += ", "; + rapidjson::Value args{rapidjson::kObjectType}; + for (auto& [arg_name, arg_val] : c2_operation.args) { + rapidjson::Value json_arg_val; + if (auto* json_val = arg_val.json()) { + json_arg_val.CopyFrom(*json_val, hb_obj.GetAllocator()); + } else { + json_arg_val.SetString(arg_val.to_string(), hb_obj.GetAllocator()); } + args.AddMember(rapidjson::StringRef(arg_name), json_arg_val, hb_obj.GetAllocator()); } - resp_args += "}"; - } - - std::string operation_json = "{" - "\"operation\" : \"" + c2_operation.operation + "\"," - "\"operationid\" : \"" + c2_operation.operation_id + "\"," - "\"operand\": \"" + c2_operation.operand + "\"" + - resp_args + "}"; - - if (operation_jsons.empty()) { - operation_jsons += operation_json; - } else { - operation_jsons += ", " + operation_json; + op.AddMember("args", args, hb_obj.GetAllocator()); } + hb_obj["requested_operations"].PushBack(op, hb_obj.GetAllocator()); } - std::string heartbeat_response = "{\"operation\" : \"heartbeat\",\"requested_operations\": [ " + operation_jsons + " ]}"; + rapidjson::StringBuffer buffer; + rapidjson::Writer<rapidjson::StringBuffer> writer(buffer); + hb_obj.Accept(writer); mg_printf(conn, "HTTP/1.1 200 OK\r\nContent-Type: " "text/plain\r\nContent-Length: %lu\r\nConnection: close\r\n\r\n", - heartbeat_response.length()); - mg_printf(conn, "%s", heartbeat_response.c_str()); + buffer.GetLength()); + mg_printf(conn, "%s", buffer.GetString()); } void HeartbeatHandler::verifyJsonHasAgentManifest(const rapidjson::Document& root, const std::vector<std::string>& verify_components, const std::vector<std::string>& disallowed_properties) { @@ -477,9 +475,9 @@ bool C2UpdateHandler::handlePost(CivetServer* /*server*/, struct mg_connection * } void C2UpdateHandler::setC2RestResponse(const 
std::string& url, const std::string& name, const std::optional<std::string>& persist) { - std::string content = "{\"location\": \"" + url + "\""; + std::string content = R"({"location": ")" + url + "\""; if (persist) { - content += ", \"persist\": \"" + *persist + "\""; + content += R"(, "persist": ")" + *persist + "\""; } content += "}"; response_ = diff --git a/libminifi/test/libtest/integration/HTTPHandlers.h b/libminifi/test/libtest/integration/HTTPHandlers.h index a47d2a506..c73395ed0 100644 --- a/libminifi/test/libtest/integration/HTTPHandlers.h +++ b/libminifi/test/libtest/integration/HTTPHandlers.h @@ -214,15 +214,15 @@ class HeartbeatHandler : public ServerAwareHandler { std::string operation; std::string operand; std::string operation_id; - std::unordered_map<std::string, std::string> args; + std::unordered_map<std::string, c2::C2Value> args; }; - void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operation_id, struct mg_connection* conn, - const std::unordered_map<std::string, std::string>& args = {}) { + static void sendHeartbeatResponse(const std::string& operation, const std::string& operand, const std::string& operation_id, struct mg_connection* conn, + const std::unordered_map<std::string, c2::C2Value>& args = {}) { sendHeartbeatResponse({{operation, operand, operation_id, args}}, conn); } - void sendHeartbeatResponse(const std::vector<C2Operation>& operations, struct mg_connection * conn); + static void sendHeartbeatResponse(const std::vector<C2Operation>& operations, struct mg_connection * conn); void verifyJsonHasAgentManifest(const rapidjson::Document& root, const std::vector<std::string>& verify_components = {}, const std::vector<std::string>& disallowed_properties = {}); void verify(struct mg_connection *conn); diff --git a/libminifi/test/libtest/integration/IntegrationBase.cpp b/libminifi/test/libtest/integration/IntegrationBase.cpp index 36efc0cee..72ae844fd 100644 --- 
a/libminifi/test/libtest/integration/IntegrationBase.cpp +++ b/libminifi/test/libtest/integration/IntegrationBase.cpp @@ -23,6 +23,7 @@ #include "utils/HTTPUtils.h" #include "unit/ProvenanceTestHelper.h" #include "utils/FifoExecutor.h" +#include "utils/file/AssetManager.h" #include "core/ConfigurationFactory.h" namespace org::apache::nifi::minifi::test { @@ -117,9 +118,10 @@ void IntegrationBase::run(const std::optional<std::filesystem::path>& test_file_ }; std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{test_repo, test_flow_repo, content_repo}; - auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, repo_metric_sources, flow_config); + asset_manager_ = std::make_unique<minifi::utils::file::AssetManager>(*configuration); + auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configuration, repo_metric_sources, flow_config, asset_manager_.get()); flowController_ = std::make_unique<minifi::FlowController>(test_repo, test_flow_repo, configuration, - std::move(flow_config), content_repo, std::move(metrics_publisher_store), filesystem, request_restart); + std::move(flow_config), content_repo, std::move(metrics_publisher_store), filesystem, request_restart, asset_manager_.get()); flowController_->load(); updateProperties(*flowController_); flowController_->start(); diff --git a/libminifi/test/libtest/integration/IntegrationBase.h b/libminifi/test/libtest/integration/IntegrationBase.h index 8c374159d..86a46e3f8 100644 --- a/libminifi/test/libtest/integration/IntegrationBase.h +++ b/libminifi/test/libtest/integration/IntegrationBase.h @@ -28,6 +28,7 @@ #include "core/ProcessGroup.h" #include "FlowController.h" #include "properties/Configure.h" +#include "utils/file/AssetManager.h" namespace minifi = org::apache::nifi::minifi; namespace core = minifi::core; @@ -107,6 +108,7 @@ class IntegrationBase { void configureSecurity(); std::shared_ptr<minifi::Configure> 
configuration; + std::unique_ptr<minifi::utils::file::AssetManager> asset_manager_; std::unique_ptr<minifi::state::response::ResponseNodeLoader> response_node_loader_; std::unique_ptr<minifi::FlowController> flowController_; std::chrono::milliseconds wait_time_; diff --git a/libminifi/test/resources/encrypted.minifi.properties b/libminifi/test/resources/encrypted.minifi.properties index c9f3eac06..f19422e43 100644 --- a/libminifi/test/resources/encrypted.minifi.properties +++ b/libminifi/test/resources/encrypted.minifi.properties @@ -57,7 +57,7 @@ nifi.c2.enable=true nifi.c2.flow.base.url=http://localhost:10080/c2-server/api nifi.c2.rest.url=http://localhost:10080/c2-server/api/c2-protocol/heartbeat nifi.c2.rest.url.ack=http://localhost:10080/c2-server/api/c2-protocol/acknowledge -nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation +nifi.c2.root.classes=DeviceInfoNode,AgentInformation,FlowInformation,AssetInformation ## Minimize heartbeat payload size by excluding agent manifest from the heartbeat #nifi.c2.full.heartbeat=false ## heartbeat 4 times a second diff --git a/libminifi/test/unit/PayloadParserTests.cpp b/libminifi/test/unit/PayloadParserTests.cpp index bdb2f5046..0a9702bfa 100644 --- a/libminifi/test/unit/PayloadParserTests.cpp +++ b/libminifi/test/unit/PayloadParserTests.cpp @@ -30,7 +30,7 @@ TEST_CASE("Test Valid Payload", "[tv1]") { minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident); minifi::c2::C2Payload payload2(minifi::c2::Operation::acknowledge, minifi::state::UpdateState::FULLY_APPLIED, cheese); minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge); - response.operation_arguments["type"] = "munster"; + response.operation_arguments["type"] = minifi::c2::C2Value{"munster"}; payload2.addContent(std::move(response)); payload.addPayload(std::move(payload2)); payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, chips)); @@ -44,7 +44,7 @@ TEST_CASE("Test Invalid not 
found", "[tv2]") { minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident); minifi::c2::C2Payload payload2(minifi::c2::Operation::acknowledge, cheese); minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge); - response.operation_arguments["typeS"] = "munster"; + response.operation_arguments["typeS"] = minifi::c2::C2Value{"munster"}; payload2.addContent(std::move(response)); payload.addPayload(std::move(payload2)); payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, chips)); @@ -59,7 +59,7 @@ TEST_CASE("Test Invalid coercion", "[tv3]") { minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident); minifi::c2::C2Payload payload2(minifi::c2::Operation::acknowledge, minifi::state::UpdateState::FULLY_APPLIED, cheese); minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge); - response.operation_arguments["type"] = "munster"; + response.operation_arguments["type"] = minifi::c2::C2Value{"munster"}; payload2.addContent(std::move(response)); payload.addPayload(std::move(payload2)); payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, chips)); @@ -73,7 +73,7 @@ TEST_CASE("Test Invalid not there", "[tv4]") { minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident); minifi::c2::C2Payload payload2(minifi::c2::Operation::acknowledge, minifi::state::UpdateState::FULLY_APPLIED, cheese); minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge); - response.operation_arguments["type"] = "munster"; + response.operation_arguments["type"] = minifi::c2::C2Value{"munster"}; payload2.addContent(std::move(response)); payload.addPayload(std::move(payload2)); payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, chips)); @@ -89,9 +89,9 @@ TEST_CASE("Test typed conversions", "[tv5]") { minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident); minifi::c2::C2Payload 
payload2(minifi::c2::Operation::acknowledge, minifi::state::UpdateState::FULLY_APPLIED, cheese); minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge); - response.operation_arguments["type"] = "munster"; - response.operation_arguments["isvalid"] = isvalid; - response.operation_arguments["size"] = size; + response.operation_arguments["type"] = minifi::c2::C2Value{"munster"}; + response.operation_arguments["isvalid"] = minifi::c2::C2Value{isvalid}; + response.operation_arguments["size"] = minifi::c2::C2Value{size}; payload2.addContent(std::move(response)); payload.addPayload(std::move(payload2)); payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, chips)); @@ -108,7 +108,7 @@ TEST_CASE("Test Invalid not there deep", "[tv6]") { minifi::c2::C2Payload payload(minifi::c2::Operation::acknowledge, ident); minifi::c2::C2Payload payload2(minifi::c2::Operation::acknowledge, minifi::state::UpdateState::FULLY_APPLIED, cheese); minifi::c2::C2ContentResponse response(minifi::c2::Operation::acknowledge); - response.operation_arguments["type"] = "munster"; + response.operation_arguments["type"] = minifi::c2::C2Value{"munster"}; payload2.addContent(std::move(response)); payload.addPayload(std::move(payload2)); payload.addPayload(minifi::c2::C2Payload(minifi::c2::Operation::acknowledge, chips)); diff --git a/minifi_main/MiNiFiMain.cpp b/minifi_main/MiNiFiMain.cpp index 782b06c80..cf15f5633 100644 --- a/minifi_main/MiNiFiMain.cpp +++ b/minifi_main/MiNiFiMain.cpp @@ -57,6 +57,7 @@ #include "properties/Decryptor.h" #include "utils/file/PathUtils.h" #include "utils/file/FileUtils.h" +#include "utils/file/AssetManager.h" #include "utils/Environment.h" #include "utils/FileMutex.h" #include "FlowController.h" @@ -397,11 +398,13 @@ int main(int argc, char **argv) { .sensitive_values_encryptor = utils::crypto::EncryptionProvider::createSensitivePropertiesEncryptor(minifiHome) }, nifi_configuration_class_name); - 
std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{prov_repo, flow_repo, content_repo}; - auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configure, repo_metric_sources, flow_configuration); + auto asset_manager = std::make_unique<utils::file::AssetManager>(*configure); + std::vector<std::shared_ptr<core::RepositoryMetricsSource>> repo_metric_sources{prov_repo, flow_repo, content_repo}; + auto metrics_publisher_store = std::make_unique<minifi::state::MetricsPublisherStore>(configure, repo_metric_sources, flow_configuration, asset_manager.get()); const auto controller = std::make_unique<minifi::FlowController>( - prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, std::move(metrics_publisher_store), filesystem, request_restart); + prov_repo, flow_repo, configure, std::move(flow_configuration), content_repo, + std::move(metrics_publisher_store), filesystem, request_restart, asset_manager.get()); const bool disk_space_watchdog_enable = configure->get(minifi::Configure::minifi_disk_space_watchdog_enable) | utils::andThen(utils::string::toBool)
