This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit a57e38c75d08dbd50b369dedc2240ab2708c9ec2
Author: Adam Debreceni <[email protected]>
AuthorDate: Sat Mar 4 23:38:16 2023 +0100

    MINIFICPP-2065 Only accept configuration format the agent can handle
    
    - Only add accept header if the list of accepted formats is non-empty
    
    Closes #1522
    Signed-off-by: Marton Szasz <[email protected]>
---
 extensions/http-curl/protocols/RESTSender.cpp       | 13 ++++++++++---
 extensions/http-curl/protocols/RESTSender.h         |  7 ++++++-
 libminifi/include/FlowController.h                  |  2 ++
 libminifi/include/c2/C2Protocol.h                   |  5 +++++
 libminifi/include/core/FlowConfiguration.h          |  4 ++++
 libminifi/include/core/flow/AdaptiveConfiguration.h |  5 +++++
 libminifi/include/core/state/UpdateController.h     |  2 ++
 libminifi/include/core/yaml/YamlConfiguration.h     |  5 +++++
 libminifi/src/FlowController.cpp                    |  4 ++++
 libminifi/src/c2/C2Agent.cpp                        |  5 ++---
 libminifi/test/unit/ControllerTests.cpp             |  4 ++++
 11 files changed, 49 insertions(+), 7 deletions(-)

diff --git a/extensions/http-curl/protocols/RESTSender.cpp 
b/extensions/http-curl/protocols/RESTSender.cpp
index ad3036cbe..d5934b817 100644
--- a/extensions/http-curl/protocols/RESTSender.cpp
+++ b/extensions/http-curl/protocols/RESTSender.cpp
@@ -94,7 +94,8 @@ void 
RESTSender::setSecurityContext(extensions::curl::HTTPClient &client, const
   client.initialize(type, url, generatedService);
 }
 
-C2Payload RESTSender::sendPayload(const std::string& url, const Direction 
direction, const C2Payload &payload, std::optional<std::string> data) {
+C2Payload RESTSender::sendPayload(const std::string& url, const Direction 
direction, const C2Payload &payload, std::optional<std::string> data,
+                                  const 
std::optional<std::vector<std::string>>& accepted_formats) {
   if (url.empty()) {
     return {payload.getOperation(), state::UpdateState::READ_ERROR};
   }
@@ -162,7 +163,9 @@ C2Payload RESTSender::sendPayload(const std::string& url, 
const Direction direct
   if (payload.getOperation() == Operation::TRANSFER) {
     auto read = 
std::make_unique<utils::HTTPReadCallback>(std::numeric_limits<size_t>::max());
     client.setReadCallback(std::move(read));
-    client.setRequestHeader("Accept", 
"application/vnd.minifi-c2+json;version=1, text/yml");
+    if (accepted_formats && !accepted_formats->empty()) {
+      client.setRequestHeader("Accept", utils::StringUtils::join(", ", 
accepted_formats.value()));
+    }
   } else {
     // Due to a bug in MiNiFi C2 the Accept header is not handled properly 
thus we need to exclude it to be compatible
     // TODO(lordgamez): The header should be re-added when the issue in MiNiFi 
C2 is fixed: https://issues.apache.org/jira/browse/NIFI-10535
@@ -181,7 +184,7 @@ C2Payload RESTSender::sendPayload(const std::string& url, 
const Direction direct
   const auto response_body_bytes = 
gsl::make_span(client.getResponseBody()).as_span<const std::byte>();
   logger_->log_trace("Received response: \"%s\"", [&] {return 
utils::StringUtils::escapeUnprintableBytes(response_body_bytes);});
   if (isOkay && !clientError && !serverError) {
-    if (payload.isRaw()) {
+    if (accepted_formats) {
       C2Payload response_payload(payload.getOperation(), 
state::UpdateState::READ_COMPLETE, true);
       response_payload.setRawData(response_body_bytes);
       return response_payload;
@@ -192,6 +195,10 @@ C2Payload RESTSender::sendPayload(const std::string& url, 
const Direction direct
   }
 }
 
+C2Payload RESTSender::fetch(const std::string& url, const 
std::vector<std::string>& accepted_formats, bool /*async*/) {
+  return sendPayload(url, Direction::RECEIVE, C2Payload(Operation::TRANSFER, 
true), std::nullopt, accepted_formats);
+}
+
 REGISTER_RESOURCE(RESTSender, DescriptionOnly);
 
 }  // namespace org::apache::nifi::minifi::c2
diff --git a/extensions/http-curl/protocols/RESTSender.h 
b/extensions/http-curl/protocols/RESTSender.h
index f4337866e..22463a272 100644
--- a/extensions/http-curl/protocols/RESTSender.h
+++ b/extensions/http-curl/protocols/RESTSender.h
@@ -18,6 +18,8 @@
 
 #include <string>
 #include <memory>
+#include <vector>
+#include <optional>
 
 #include "c2/C2Protocol.h"
 #include "c2/protocols/RESTProtocol.h"
@@ -50,12 +52,15 @@ class RESTSender : public RESTProtocol, public C2Protocol {
 
   C2Payload consumePayload(const C2Payload &payload, Direction direction, bool 
async) override;
 
+  C2Payload fetch(const std::string& url, const std::vector<std::string>& 
accepted_formats, bool async) override;
+
   void update(const std::shared_ptr<Configure> &configure) override;
 
   void initialize(core::controller::ControllerServiceProvider* controller, 
const std::shared_ptr<Configure> &configure) override;
 
  protected:
-  C2Payload sendPayload(const std::string& url, const Direction direction, 
const C2Payload &payload, std::optional<std::string> data);
+  C2Payload sendPayload(const std::string& url, const Direction direction, 
const C2Payload &payload, std::optional<std::string> data,
+                        const std::optional<std::vector<std::string>>& 
accepted_formats = std::nullopt);
 
   /**
    * Initializes the SSLContextService onto the HTTP client if one is needed
diff --git a/libminifi/include/FlowController.h 
b/libminifi/include/FlowController.h
index 84ac20af8..d412427d3 100644
--- a/libminifi/include/FlowController.h
+++ b/libminifi/include/FlowController.h
@@ -115,6 +115,8 @@ class FlowController : public 
core::controller::ForwardingControllerServiceProvi
 
   int16_t clearConnection(const std::string &connection) override;
 
+  std::vector<std::string> getSupportedConfigurationFormats() const override;
+
   int16_t applyUpdate(const std::string& /*source*/, const 
std::shared_ptr<state::Update>&) override { return -1; }
   // Asynchronous function trigger unloading and wait for a period of time
   virtual void waitUnload(uint64_t timeToWaitMs);
diff --git a/libminifi/include/c2/C2Protocol.h 
b/libminifi/include/c2/C2Protocol.h
index ce248a4d7..03d18722f 100644
--- a/libminifi/include/c2/C2Protocol.h
+++ b/libminifi/include/c2/C2Protocol.h
@@ -21,6 +21,7 @@
 #include <memory>
 #include <string>
 #include <utility>
+#include <vector>
 
 #include "C2Payload.h"
 #include "core/controller/ControllerServiceProvider.h"
@@ -72,6 +73,10 @@ class C2Protocol : public core::Connectable {
    */
   virtual C2Payload consumePayload(const C2Payload &operation, Direction 
direction = TRANSMIT, bool async = false) = 0;
 
+  virtual C2Payload fetch(const std::string& url, const 
std::vector<std::string>& /*accepted_formats*/ = {}, bool async = false) {
+    return consumePayload(url, C2Payload(Operation::TRANSFER, true), 
Direction::RECEIVE, async);
+  }
+
   /**
    * Determines if we are connected and operating
    */
diff --git a/libminifi/include/core/FlowConfiguration.h 
b/libminifi/include/core/FlowConfiguration.h
index f58aea588..be4e75df2 100644
--- a/libminifi/include/core/FlowConfiguration.h
+++ b/libminifi/include/core/FlowConfiguration.h
@@ -96,6 +96,10 @@ class FlowConfiguration : public CoreComponent {
     return flow_version_;
   }
 
+  virtual std::vector<std::string> getSupportedFormats() const {
+    return {};
+  }
+
   std::shared_ptr<Configure> getConfiguration() {  // cannot be const as 
getters mutate the underlying map
     return configuration_;
   }
diff --git a/libminifi/include/core/flow/AdaptiveConfiguration.h 
b/libminifi/include/core/flow/AdaptiveConfiguration.h
index 67a4c5885..e897905c9 100644
--- a/libminifi/include/core/flow/AdaptiveConfiguration.h
+++ b/libminifi/include/core/flow/AdaptiveConfiguration.h
@@ -19,6 +19,7 @@
 
 #include <string>
 #include <memory>
+#include <vector>
 
 #include "StructuredConfiguration.h"
 
@@ -28,6 +29,10 @@ class AdaptiveConfiguration : public StructuredConfiguration 
{
  public:
   explicit AdaptiveConfiguration(ConfigurationContext ctx);
 
+  std::vector<std::string> getSupportedFormats() const override {
+    return {"application/vnd.minifi-c2+json;version=1", "text/yml"};
+  }
+
   std::unique_ptr<core::ProcessGroup> getRootFromPayload(const std::string 
&payload) override;
 };
 
diff --git a/libminifi/include/core/state/UpdateController.h 
b/libminifi/include/core/state/UpdateController.h
index 4ee42bc3e..c63b385bc 100644
--- a/libminifi/include/core/state/UpdateController.h
+++ b/libminifi/include/core/state/UpdateController.h
@@ -158,6 +158,8 @@ class StateMonitor : public StateController {
    */
   virtual int16_t clearConnection(const std::string &connection) = 0;
 
+  virtual std::vector<std::string> getSupportedConfigurationFormats() const = 
0;
+
   /**
    * Apply an update with the provided string.
    *
diff --git a/libminifi/include/core/yaml/YamlConfiguration.h 
b/libminifi/include/core/yaml/YamlConfiguration.h
index 4d82ea394..5db71ea0b 100644
--- a/libminifi/include/core/yaml/YamlConfiguration.h
+++ b/libminifi/include/core/yaml/YamlConfiguration.h
@@ -21,6 +21,7 @@
 #include <optional>
 #include <string>
 #include <unordered_set>
+#include <vector>
 
 #include "core/FlowConfiguration.h"
 #include "core/logging/LoggerFactory.h"
@@ -44,6 +45,10 @@ class YamlConfiguration : public 
flow::StructuredConfiguration {
 
   ~YamlConfiguration() override = default;
 
+  std::vector<std::string> getSupportedFormats() const override {
+    return {"text/yml"};
+  }
+
   /**
    * Returns a shared pointer to a ProcessGroup object containing the
    * flow configuration. The yamlConfigStream argument must point to
diff --git a/libminifi/src/FlowController.cpp b/libminifi/src/FlowController.cpp
index a31e6536b..efc69a36b 100644
--- a/libminifi/src/FlowController.cpp
+++ b/libminifi/src/FlowController.cpp
@@ -419,6 +419,10 @@ int16_t FlowController::resume() {
   return 0;
 }
 
+std::vector<std::string> FlowController::getSupportedConfigurationFormats() 
const {
+  return flow_configuration_->getSupportedFormats();
+}
+
 int16_t FlowController::applyUpdate(const std::string &source, const 
std::string &configuration, bool persist, const std::optional<std::string>& 
flow_id) {
   if (applyConfiguration(source, configuration, flow_id)) {
     if (persist) {
diff --git a/libminifi/src/c2/C2Agent.cpp b/libminifi/src/c2/C2Agent.cpp
index 76f5be0ad..faa89b971 100644
--- a/libminifi/src/c2/C2Agent.cpp
+++ b/libminifi/src/c2/C2Agent.cpp
@@ -842,8 +842,7 @@ std::optional<std::string> C2Agent::fetchFlow(const 
std::string& uri) const {
     return std::nullopt;
   }
 
-  C2Payload payload(Operation::TRANSFER, true);
-  C2Payload &&response = 
protocol_.load()->consumePayload(resolved_url.value(), payload, RECEIVE, false);
+  C2Payload response = protocol_.load()->fetch(resolved_url.value(), 
update_sink_->getSupportedConfigurationFormats());
 
   return response.getRawDataAsString();
 }
@@ -995,7 +994,7 @@ void C2Agent::handleAssetUpdate(const C2ContentResponse& 
resp) {
     return;
   }
 
-  C2Payload file_response = protocol_.load()->consumePayload(url, 
C2Payload(Operation::TRANSFER, true), RECEIVE, false);
+  C2Payload file_response = protocol_.load()->fetch(url);
 
   if (file_response.getStatus().getState() != 
state::UpdateState::READ_COMPLETE) {
     send_error("Failed to fetch asset from '" + url + "'");
diff --git a/libminifi/test/unit/ControllerTests.cpp 
b/libminifi/test/unit/ControllerTests.cpp
index 796d1eae2..58b990649 100644
--- a/libminifi/test/unit/ControllerTests.cpp
+++ b/libminifi/test/unit/ControllerTests.cpp
@@ -152,6 +152,10 @@ class TestUpdateSink : public minifi::state::StateMonitor {
     return 0;
   }
 
+  std::vector<std::string> getSupportedConfigurationFormats() const override {
+    return {};
+  }
+
   /**
    * Apply an update with the provided string.
    *

Reply via email to