This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 5e2a5501c86c68b9414ac1133adad49d29b53459 Author: Ferenc Gerlits <[email protected]> AuthorDate: Tue Sep 12 16:00:00 2023 +0200 MINIFICPP-2088 InvokeHTTP should check whether it needs to run ... before it creates/retrieves the HTTP client. Closes #1643 Signed-off-by: Marton Szasz <[email protected]> --- extensions/http-curl/client/HTTPClient.h | 1 - extensions/http-curl/processors/InvokeHTTP.cpp | 41 +++++++++++--------------- extensions/http-curl/processors/InvokeHTTP.h | 9 +++--- 3 files changed, 22 insertions(+), 29 deletions(-) diff --git a/extensions/http-curl/client/HTTPClient.h b/extensions/http-curl/client/HTTPClient.h index 2e98bab67..d6a1894f7 100644 --- a/extensions/http-curl/client/HTTPClient.h +++ b/extensions/http-curl/client/HTTPClient.h @@ -119,7 +119,6 @@ class HTTPClient : public utils::BaseHTTPClient, public core::Connectable { const std::vector<char>& getResponseBody() override; void set_request_method(std::string method) override; - std::string& getRequestMethod() { return method_; } void setPeerVerification(bool peer_verification) override; void setHostVerification(bool host_verification) override; diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp index 03412c2a5..f9696e425 100644 --- a/extensions/http-curl/processors/InvokeHTTP.cpp +++ b/extensions/http-curl/processors/InvokeHTTP.cpp @@ -19,7 +19,6 @@ #include "InvokeHTTP.h" #include <cinttypes> -#include <cstdint> #include <memory> #include <string> #include <utility> @@ -27,10 +26,8 @@ #include "core/FlowFile.h" #include "core/ProcessContext.h" -#include "core/Relationship.h" #include "core/Resource.h" #include "io/BufferStream.h" -#include "ResourceClaim.h" #include "utils/gsl.h" #include "utils/ProcessorConfigUtils.h" #include "utils/OptionalUtils.h" @@ -96,6 +93,8 @@ void setupClientTransferEncoding(extensions::curl::HTTPClient& client, bool use_ } // namespace void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) { + method_ = (context.getProperty(Method) | utils::orElse([] { throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Method property missing or invalid"); })).value(); + context.getProperty(SendMessageBody, send_message_body_); attributes_to_send_ = context.getProperty(AttributesToSend) @@ -103,7 +102,6 @@ void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) | utils::map([](const std::string& regex_str) { return utils::Regex{regex_str}; }) | utils::orElse([this] { logger_->log_debug("%s is missing, so the default value will be used", std::string{AttributesToSend.name}); }); - always_output_response_ = (context.getProperty(AlwaysOutputResponse) | utils::flatMap(&utils::StringUtils::toBool)).value_or(false); penalize_no_retry_ = (context.getProperty(PenalizeOnNoRetry) | utils::flatMap(&utils::StringUtils::toBool)).value_or(false); @@ -120,10 +118,6 @@ void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) } std::unique_ptr<minifi::extensions::curl::HTTPClient> InvokeHTTP::createHTTPClientFromPropertiesAndMembers(const core::ProcessContext& context) const { - std::string method; - if (!context.getProperty(Method, method)) - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Method property missing or invalid"); - std::string url; if (!context.getProperty(URL, url)) throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or invalid"); @@ -140,7 +134,7 @@ std::unique_ptr<minifi::extensions::curl::HTTPClient> InvokeHTTP::createHTTPClie } auto client = std::make_unique<minifi::extensions::curl::HTTPClient>(); - client->initialize(std::move(method), std::move(url), std::move(ssl_context_service)); + client->initialize(method_, std::move(url), std::move(ssl_context_service)); setupClientTimeouts(*client, context); setupClientProxy(*client, context); setupClientFollowRedirects(*client, context); @@ -167,9 +161,8 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_); } -bool InvokeHTTP::shouldEmitFlowFile(minifi::extensions::curl::HTTPClient& client) { - auto method = client.getRequestMethod(); - return ("POST" == method || "PUT" == method || "PATCH" == method); +bool InvokeHTTP::shouldEmitFlowFile() const { + return ("POST" == method_ || "PUT" == method_ || "PATCH" == method_); } /** @@ -208,21 +201,14 @@ bool InvokeHTTP::appendHeaders(const core::FlowFile& flow_file, /*std::invocable void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { gsl_Expects(session && context && client_queue_); - auto client = client_queue_->getResource(); - - onTriggerWithClient(context, session, *client); -} - -void InvokeHTTP::onTriggerWithClient(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session, - minifi::extensions::curl::HTTPClient& client) { auto flow_file = session->get(); if (flow_file == nullptr) { - if (!shouldEmitFlowFile(client)) { - logger_->log_debug("InvokeHTTP -- create flow file with %s", client.getRequestMethod()); + if (!shouldEmitFlowFile()) { + logger_->log_debug("InvokeHTTP -- create flow file with %s", method_); flow_file = session->create(); } else { - logger_->log_debug("Exiting because method is %s and there is no flowfile available to execute it, yielding", client.getRequestMethod()); + logger_->log_debug("Exiting because method is %s and there is no flowfile available to execute it, yielding", method_); yield(); return; } @@ -230,7 +216,14 @@ void InvokeHTTP::onTriggerWithClient(const std::shared_ptr<core::ProcessContext> logger_->log_debug("InvokeHTTP -- Received flowfile"); } - logger_->log_debug("onTrigger InvokeHTTP with %s to %s", client.getRequestMethod(), client.getURL()); + auto client = client_queue_->getResource(); + + onTriggerWithClient(context, session, flow_file, *client); +} + +void InvokeHTTP::onTriggerWithClient(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session, + const std::shared_ptr<core::FlowFile>& flow_file, minifi::extensions::curl::HTTPClient& client) { + logger_->log_debug("onTrigger InvokeHTTP with %s to %s", method_, client.getURL()); const auto remove_callback_from_client_at_exit = gsl::finally([&client] { client.setUploadCallback({}); @@ -238,7 +231,7 @@ void InvokeHTTP::onTriggerWithClient(const std::shared_ptr<core::ProcessContext> std::string transaction_id = utils::IdGenerator::getIdGenerator()->generate().to_string(); - if (shouldEmitFlowFile(client)) { + if (shouldEmitFlowFile()) { logger_->log_trace("InvokeHTTP -- reading flowfile"); const auto flow_file_reader_stream = session->getFlowFileContentStream(flow_file); if (flow_file_reader_stream) { diff --git a/extensions/http-curl/processors/InvokeHTTP.h b/extensions/http-curl/processors/InvokeHTTP.h index d82be800e..89fd640c3 100644 --- a/extensions/http-curl/processors/InvokeHTTP.h +++ b/extensions/http-curl/processors/InvokeHTTP.h @@ -258,16 +258,17 @@ class InvokeHTTP : public core::Processor { private: void route(const std::shared_ptr<core::FlowFile>& request, const std::shared_ptr<core::FlowFile>& response, const std::shared_ptr<core::ProcessSession>& session, const std::shared_ptr<core::ProcessContext>& context, bool is_success, int64_t status_code); - static bool shouldEmitFlowFile(minifi::extensions::curl::HTTPClient& client); - void onTriggerWithClient(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session, minifi::extensions::curl::HTTPClient& client); + [[nodiscard]] bool shouldEmitFlowFile() const; + void onTriggerWithClient(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session, + const std::shared_ptr<core::FlowFile>& flow_file, minifi::extensions::curl::HTTPClient& client); [[nodiscard]] bool appendHeaders(const core::FlowFile& flow_file, /*std::invocable<std::string, std::string>*/ auto append_header); void setupMembersFromProperties(const core::ProcessContext& context); std::unique_ptr<minifi::extensions::curl::HTTPClient> createHTTPClientFromPropertiesAndMembers(const core::ProcessContext& context) const; + std::string method_; std::optional<utils::Regex> attributes_to_send_; - std::optional<std::string> put_response_body_in_attribute_; bool always_output_response_{false}; bool use_chunked_encoding_{false}; @@ -275,7 +276,7 @@ class InvokeHTTP : public core::Processor { bool send_message_body_{true}; bool send_date_header_{true}; - invoke_http::InvalidHTTPHeaderFieldHandlingOption invalid_http_header_field_handling_strategy_; + invoke_http::InvalidHTTPHeaderFieldHandlingOption invalid_http_header_field_handling_strategy_{}; std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<InvokeHTTP>::getLogger(uuid_)}; std::shared_ptr<utils::ResourceQueue<extensions::curl::HTTPClient>> client_queue_;
