This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit a57e38c75d08dbd50b369dedc2240ab2708c9ec2 Author: Adam Debreceni <[email protected]> AuthorDate: Sat Mar 4 23:38:16 2023 +0100 MINIFICPP-2065 Only accept configuration format the agent can handle - Only add accept header if the list of accepted formats is non-empty Closes #1522 Signed-off-by: Marton Szasz <[email protected]> --- extensions/http-curl/protocols/RESTSender.cpp | 13 ++++++++++--- extensions/http-curl/protocols/RESTSender.h | 7 ++++++- libminifi/include/FlowController.h | 2 ++ libminifi/include/c2/C2Protocol.h | 5 +++++ libminifi/include/core/FlowConfiguration.h | 4 ++++ libminifi/include/core/flow/AdaptiveConfiguration.h | 5 +++++ libminifi/include/core/state/UpdateController.h | 2 ++ libminifi/include/core/yaml/YamlConfiguration.h | 5 +++++ libminifi/src/FlowController.cpp | 4 ++++ libminifi/src/c2/C2Agent.cpp | 5 ++--- libminifi/test/unit/ControllerTests.cpp | 4 ++++ 11 files changed, 49 insertions(+), 7 deletions(-) diff --git a/extensions/http-curl/protocols/RESTSender.cpp b/extensions/http-curl/protocols/RESTSender.cpp index ad3036cbe..d5934b817 100644 --- a/extensions/http-curl/protocols/RESTSender.cpp +++ b/extensions/http-curl/protocols/RESTSender.cpp @@ -94,7 +94,8 @@ void RESTSender::setSecurityContext(extensions::curl::HTTPClient &client, const client.initialize(type, url, generatedService); } -C2Payload RESTSender::sendPayload(const std::string& url, const Direction direction, const C2Payload &payload, std::optional<std::string> data) { +C2Payload RESTSender::sendPayload(const std::string& url, const Direction direction, const C2Payload &payload, std::optional<std::string> data, + const std::optional<std::vector<std::string>>& accepted_formats) { if (url.empty()) { return {payload.getOperation(), state::UpdateState::READ_ERROR}; } @@ -162,7 +163,9 @@ C2Payload RESTSender::sendPayload(const std::string& url, const Direction direct if (payload.getOperation() == Operation::TRANSFER) { auto read = std::make_unique<utils::HTTPReadCallback>(std::numeric_limits<size_t>::max()); client.setReadCallback(std::move(read)); - client.setRequestHeader("Accept", "application/vnd.minifi-c2+json;version=1, text/yml"); + if (accepted_formats && !accepted_formats->empty()) { + client.setRequestHeader("Accept", utils::StringUtils::join(", ", accepted_formats.value())); + } } else { // Due to a bug in MiNiFi C2 the Accept header is not handled properly thus we need to exclude it to be compatible // TODO(lordgamez): The header should be re-added when the issue in MiNiFi C2 is fixed: https://issues.apache.org/jira/browse/NIFI-10535 @@ -181,7 +184,7 @@ C2Payload RESTSender::sendPayload(const std::string& url, const Direction direct const auto response_body_bytes = gsl::make_span(client.getResponseBody()).as_span<const std::byte>(); logger_->log_trace("Received response: \"%s\"", [&] {return utils::StringUtils::escapeUnprintableBytes(response_body_bytes);}); if (isOkay && !clientError && !serverError) { - if (payload.isRaw()) { + if (accepted_formats) { C2Payload response_payload(payload.getOperation(), state::UpdateState::READ_COMPLETE, true); response_payload.setRawData(response_body_bytes); return response_payload; @@ -192,6 +195,10 @@ C2Payload RESTSender::sendPayload(const std::string& url, const Direction direct } } +C2Payload RESTSender::fetch(const std::string& url, const std::vector<std::string>& accepted_formats, bool /*async*/) { + return sendPayload(url, Direction::RECEIVE, C2Payload(Operation::TRANSFER, true), std::nullopt, accepted_formats); +} + REGISTER_RESOURCE(RESTSender, DescriptionOnly); } // namespace org::apache::nifi::minifi::c2 diff --git a/extensions/http-curl/protocols/RESTSender.h b/extensions/http-curl/protocols/RESTSender.h index f4337866e..22463a272 100644 --- a/extensions/http-curl/protocols/RESTSender.h +++ b/extensions/http-curl/protocols/RESTSender.h @@ -18,6 +18,8 @@ #include <string> #include <memory> +#include <vector> +#include <optional> #include "c2/C2Protocol.h" #include "c2/protocols/RESTProtocol.h" @@ -50,12 +52,15 @@ class RESTSender : public RESTProtocol, public C2Protocol { C2Payload consumePayload(const C2Payload &payload, Direction direction, bool async) override; + C2Payload fetch(const std::string& url, const std::vector<std::string>& accepted_formats, bool async) override; + void update(const std::shared_ptr<Configure> &configure) override; void initialize(core::controller::ControllerServiceProvider* controller, const std::shared_ptr<Configure> &configure) override; protected: - C2Payload sendPayload(const std::string& url, const Direction direction, const C2Payload &payload, std::optional<std::string> data); + C2Payload sendPayload(const std::string& url, const Direction direction, const C2Payload &payload, std::optional<std::string> data, + const std::optional<std::vector<std::string>>& accepted_formats = std::nullopt); /** * Initializes the SSLContextService onto the HTTP client if one is needed diff --git a/libminifi/include/FlowController.h b/libminifi/include/FlowController.h index 84ac20af8..d412427d3 100644 --- a/libminifi/include/FlowController.h +++ b/libminifi/include/FlowController.h @@ -115,6 +115,8 @@ class FlowController : public core::controller::ForwardingControllerServiceProvi int16_t clearConnection(const std::string &connection) override; + std::vector<std::string> getSupportedConfigurationFormats() const override; + int16_t applyUpdate(const std::string& /*source*/, const std::shared_ptr<state::Update>&) override { return -1; } // Asynchronous function trigger unloading and wait for a period of time virtual void waitUnload(uint64_t timeToWaitMs); diff --git a/libminifi/include/c2/C2Protocol.h b/libminifi/include/c2/C2Protocol.h index ce248a4d7..03d18722f 100644 --- a/libminifi/include/c2/C2Protocol.h +++ b/libminifi/include/c2/C2Protocol.h @@ -21,6 +21,7 @@ #include <memory> #include <string> #include <utility> +#include <vector> #include "C2Payload.h" #include "core/controller/ControllerServiceProvider.h" @@ -72,6 +73,10 @@ class C2Protocol : public core::Connectable { */ virtual C2Payload consumePayload(const C2Payload &operation, Direction direction = TRANSMIT, bool async = false) = 0; + virtual C2Payload fetch(const std::string& url, const std::vector<std::string>& /*accepted_formats*/ = {}, bool async = false) { + return consumePayload(url, C2Payload(Operation::TRANSFER, true), Direction::RECEIVE, async); + } + /** * Determines if we are connected and operating */ diff --git a/libminifi/include/core/FlowConfiguration.h b/libminifi/include/core/FlowConfiguration.h index f58aea588..be4e75df2 100644 --- a/libminifi/include/core/FlowConfiguration.h +++ b/libminifi/include/core/FlowConfiguration.h @@ -96,6 +96,10 @@ class FlowConfiguration : public CoreComponent { return flow_version_; } + virtual std::vector<std::string> getSupportedFormats() const { + return {}; + } + std::shared_ptr<Configure> getConfiguration() { // cannot be const as getters mutate the underlying map return configuration_; } diff --git a/libminifi/include/core/flow/AdaptiveConfiguration.h b/libminifi/include/core/flow/AdaptiveConfiguration.h index 67a4c5885..e897905c9 100644 --- a/libminifi/include/core/flow/AdaptiveConfiguration.h +++ b/libminifi/include/core/flow/AdaptiveConfiguration.h @@ -19,6 +19,7 @@ #include <string> #include <memory> +#include <vector> #include "StructuredConfiguration.h" @@ -28,6 +29,10 @@ class AdaptiveConfiguration : public StructuredConfiguration { public: explicit AdaptiveConfiguration(ConfigurationContext ctx); + std::vector<std::string> getSupportedFormats() const override { + return {"application/vnd.minifi-c2+json;version=1", "text/yml"}; + } + std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string &payload) override; }; diff --git a/libminifi/include/core/state/UpdateController.h b/libminifi/include/core/state/UpdateController.h index 4ee42bc3e..c63b385bc 100644 --- a/libminifi/include/core/state/UpdateController.h +++ b/libminifi/include/core/state/UpdateController.h @@ -158,6 +158,8 @@ class StateMonitor : public StateController { */ virtual int16_t clearConnection(const std::string &connection) = 0; + virtual std::vector<std::string> getSupportedConfigurationFormats() const = 0; + /** * Apply an update with the provided string. * diff --git a/libminifi/include/core/yaml/YamlConfiguration.h b/libminifi/include/core/yaml/YamlConfiguration.h index 4d82ea394..5db71ea0b 100644 --- a/libminifi/include/core/yaml/YamlConfiguration.h +++ b/libminifi/include/core/yaml/YamlConfiguration.h @@ -21,6 +21,7 @@ #include <optional> #include <string> #include <unordered_set> +#include <vector> #include "core/FlowConfiguration.h" #include "core/logging/LoggerFactory.h" @@ -44,6 +45,10 @@ class YamlConfiguration : public flow::StructuredConfiguration { ~YamlConfiguration() override = default; + std::vector<std::string> getSupportedFormats() const override { + return {"text/yml"}; + } + /** * Returns a shared pointer to a ProcessGroup object containing the * flow configuration. The yamlConfigStream argument must point to diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp index a31e6536b..efc69a36b 100644 --- a/libminifi/src/FlowController.cpp +++ b/libminifi/src/FlowController.cpp @@ -419,6 +419,10 @@ int16_t FlowController::resume() { return 0; } +std::vector<std::string> FlowController::getSupportedConfigurationFormats() const { + return flow_configuration_->getSupportedFormats(); +} + int16_t FlowController::applyUpdate(const std::string &source, const std::string &configuration, bool persist, const std::optional<std::string>& flow_id) { if (applyConfiguration(source, configuration, flow_id)) { if (persist) { diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp index 76f5be0ad..faa89b971 100644 --- a/libminifi/src/c2/C2Agent.cpp +++ b/libminifi/src/c2/C2Agent.cpp @@ -842,8 +842,7 @@ std::optional<std::string> C2Agent::fetchFlow(const std::string& uri) const { return std::nullopt; } - C2Payload payload(Operation::TRANSFER, true); - C2Payload &&response = protocol_.load()->consumePayload(resolved_url.value(), payload, RECEIVE, false); + C2Payload response = protocol_.load()->fetch(resolved_url.value(), update_sink_->getSupportedConfigurationFormats()); return response.getRawDataAsString(); } @@ -995,7 +994,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& resp) { return; } - C2Payload file_response = protocol_.load()->consumePayload(url, C2Payload(Operation::TRANSFER, true), RECEIVE, false); + C2Payload file_response = protocol_.load()->fetch(url); if (file_response.getStatus().getState() != state::UpdateState::READ_COMPLETE) { send_error("Failed to fetch asset from '" + url + "'"); diff --git a/libminifi/test/unit/ControllerTests.cpp b/libminifi/test/unit/ControllerTests.cpp index 796d1eae2..58b990649 100644 --- a/libminifi/test/unit/ControllerTests.cpp +++ b/libminifi/test/unit/ControllerTests.cpp @@ -152,6 +152,10 @@ class TestUpdateSink : public minifi::state::StateMonitor { return 0; } + std::vector<std::string> getSupportedConfigurationFormats() const override { + return {}; + } + /** * Apply an update with the provided string. *
