This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 4054419f34b26ed552be97bb2d8c7ee6f867ceec Author: Adam Debreceni <[email protected]> AuthorDate: Mon Jan 20 16:54:06 2025 +0100 MINIFICPP-2485 fix: Support Expression Language in InvokeHTTP "Remote URL" property Closes #1904 Signed-off-by: Marton Szasz <[email protected]> --- .../standard-processors/processors/InvokeHTTP.cpp | 68 +++++-- .../standard-processors/processors/InvokeHTTP.h | 71 ++++++- .../tests/unit/HttpClientStoreTests.cpp | 219 +++++++++++++++++++++ 3 files changed, 339 insertions(+), 19 deletions(-) diff --git a/extensions/standard-processors/processors/InvokeHTTP.cpp b/extensions/standard-processors/processors/InvokeHTTP.cpp index 7104359a8..19f0f4361 100644 --- a/extensions/standard-processors/processors/InvokeHTTP.cpp +++ b/extensions/standard-processors/processors/InvokeHTTP.cpp @@ -35,6 +35,45 @@ #include "range/v3/algorithm/any_of.hpp" namespace org::apache::nifi::minifi::processors { +namespace invoke_http { + +HttpClientStore::HttpClientWrapper HttpClientStore::getClient(const std::string& url) { + std::unique_lock lock(clients_mutex_); + const auto it = std::find_if(std::begin(unused_clients_), std::end(unused_clients_), [&url](const auto& client) { + return client->getURL() == url; + }); + if (it != std::end(unused_clients_)) { + used_clients_.splice(used_clients_.end(), unused_clients_, it); + return {*this, **it}; + } + + if (used_clients_.size() + unused_clients_.size() < max_size_) { + auto client = create_client_function_(url); + used_clients_.push_back(std::move(client)); + return {*this, *used_clients_.back()}; + } else { + cv_.wait(lock, [this] { return !unused_clients_.empty(); }); + auto client = create_client_function_(url); + unused_clients_.front() = std::move(client); + used_clients_.splice(used_clients_.end(), unused_clients_, unused_clients_.begin()); + return {*this, *used_clients_.back()}; + } +} + +void HttpClientStore::returnClient(http::HTTPClient& client) { + std::unique_lock lock(clients_mutex_); + const auto it = std::find_if(std::begin(used_clients_), std::end(used_clients_), + [&client](const auto& elem) { return &client == elem.get(); }); + if (it == std::end(used_clients_)) { + logger_->log_error("Couldn't find HTTP client in client store to be returned"); + return; + } + unused_clients_.splice(unused_clients_.end(), used_clients_, it); + lock.unlock(); + cv_.notify_one(); +} + +} // namespace invoke_http std::string InvokeHTTP::DefaultContentType = "application/octet-stream"; @@ -64,8 +103,9 @@ void setupClientTransferEncoding(http::HTTPClient& client, bool use_chunked_enco } // namespace void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) { - if (!context.getProperty(URL, url_)) - throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or invalid"); + std::string url; + if (!context.getProperty(URL, url) || url.empty()) + throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or empty"); method_ = utils::parseEnumProperty<http::HttpRequestMethod>(context, Method); @@ -121,9 +161,9 @@ void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& context) } } -std::unique_ptr<minifi::http::HTTPClient> InvokeHTTP::createHTTPClientFromMembers() const { - auto client = std::make_unique<minifi::http::HTTPClient>(); - client->initialize(method_, url_, ssl_context_service_); +gsl::not_null<std::unique_ptr<http::HTTPClient>> InvokeHTTP::createHTTPClientFromMembers(const std::string& url) const { + auto client = std::make_unique<http::HTTPClient>(); + client->initialize(method_, url, ssl_context_service_); setupClientTimeouts(*client, connect_timeout_, read_timeout_); client->setHTTPProxy(proxy_); client->setFollowRedirects(follow_redirects_); @@ -133,18 +173,18 @@ std::unique_ptr<minifi::http::HTTPClient> InvokeHTTP::createHTTPClientFromMember client->setMaximumUploadSpeed(maximum_upload_speed_.getValue()); client->setMaximumDownloadSpeed(maximum_download_speed_.getValue()); - return client; + return gsl::make_not_null(std::move(client)); } void InvokeHTTP::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) { setupMembersFromProperties(context); - auto create_client = [this]() -> std::unique_ptr<minifi::http::HTTPClient> { - return createHTTPClientFromMembers(); + auto create_client = [this](const std::string& url) -> gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>> { + return createHTTPClientFromMembers(url); }; - client_queue_ = utils::ResourceQueue<http::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_); + client_queue_ = std::make_unique<invoke_http::HttpClientStore>(getMaxConcurrentTasks() * 2, create_client); } bool InvokeHTTP::shouldEmitFlowFile() const { @@ -202,9 +242,15 @@ void InvokeHTTP::onTrigger(core::ProcessContext& context, core::ProcessSession& logger_->log_debug("InvokeHTTP -- Received flowfile"); } - auto client = client_queue_->getResource(); + auto url = context.getProperty(URL, flow_file.get()); + if (!url || url->empty()) { + logger_->log_error("InvokeHTTP -- URL is empty, transferring to failure"); + session.transfer(flow_file, RelFailure); + return; + } - onTriggerWithClient(context, session, flow_file, *client); + auto client = client_queue_->getClient(*url); + onTriggerWithClient(context, session, flow_file, client.get()); } void InvokeHTTP::onTriggerWithClient(core::ProcessContext& context, core::ProcessSession& session, diff --git a/extensions/standard-processors/processors/InvokeHTTP.h b/extensions/standard-processors/processors/InvokeHTTP.h index bb5e9d283..46caabdf6 100644 --- a/extensions/standard-processors/processors/InvokeHTTP.h +++ b/extensions/standard-processors/processors/InvokeHTTP.h @@ -23,6 +23,8 @@ #include <string> #include <unordered_map> #include <utility> +#include <optional> +#include <list> #include "Core.h" #include "FlowFileRecord.h" @@ -41,9 +43,62 @@ #include "utils/Enum.h" #include "utils/RegexUtils.h" +namespace org::apache::nifi::minifi::test { +class HttpClientStoreTestAccessor; +} + namespace org::apache::nifi::minifi::processors { namespace invoke_http { +class HttpClientStore { + public: + HttpClientStore(const size_t size, std::function<gsl::not_null<std::unique_ptr<http::HTTPClient>>(const std::string&)> create_client_function) + : max_size_(size), + create_client_function_(std::move(create_client_function)) { + } + HttpClientStore(const HttpClientStore&) = delete; + HttpClientStore& operator=(const HttpClientStore&) = delete; + HttpClientStore(HttpClientStore&&) = delete; + HttpClientStore& operator=(HttpClientStore&&) = delete; + ~HttpClientStore() = default; + + class HttpClientWrapper { + public: + HttpClientWrapper(HttpClientStore& client_store, http::HTTPClient& client) + :client_(client), client_store_(client_store) { } + + HttpClientWrapper(const HttpClientWrapper&) = delete; + HttpClientWrapper& operator=(const HttpClientWrapper&) = delete; + HttpClientWrapper(HttpClientWrapper&& src) = delete; + HttpClientWrapper& operator=(HttpClientWrapper&& src) = delete; + ~HttpClientWrapper() { + client_store_.returnClient(client_); + } + + [[nodiscard]] http::HTTPClient& get() const { + return client_; + } + + private: + http::HTTPClient& client_; + HttpClientStore& client_store_; + }; + + [[nodiscard]] HttpClientWrapper getClient(const std::string& url); + + private: + friend class test::HttpClientStoreTestAccessor; + void returnClient(http::HTTPClient& client); + + std::mutex clients_mutex_; + std::condition_variable cv_; + const size_t max_size_; + std::list<gsl::not_null<std::unique_ptr<http::HTTPClient>>> used_clients_; + std::list<gsl::not_null<std::unique_ptr<http::HTTPClient>>> unused_clients_; + std::function<gsl::not_null<std::unique_ptr<http::HTTPClient>>(const std::string&)> create_client_function_; + std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<HttpClientWrapper>::getLogger()}; +}; + enum class InvalidHTTPHeaderFieldHandlingOption { fail, transform, @@ -64,8 +119,8 @@ class InvokeHTTP : public core::Processor { } EXTENSIONAPI static constexpr const char* Description = "An HTTP client processor which can interact with a configurable HTTP Endpoint. " - "The destination URL and HTTP Method are configurable. FlowFile attributes are converted to HTTP headers and the " - "FlowFile contents are included as the body of the request (if the HTTP Method is PUT, POST or PATCH)."; + "The destination URL and HTTP Method are configurable. FlowFile attributes are converted to HTTP headers and the " + "FlowFile contents are included as the body of the request (if the HTTP Method is PUT, POST or PATCH)."; EXTENSIONAPI static constexpr auto Method = core::PropertyDefinitionBuilder<magic_enum::enum_count<http::HttpRequestMethod>()>::createProperty("HTTP Method") .withDescription("HTTP request method. Methods other than POST, PUT and PATCH will be sent without a message body.") @@ -106,7 +161,7 @@ class InvokeHTTP : public core::Processor { EXTENSIONAPI static constexpr auto SSLContext = core::PropertyDefinitionBuilder<0, 0, 1>::createProperty("SSL Context Service") .withDescription("The SSL Context Service used to provide client certificate information for TLS/SSL (https) connections.") .isRequired(false) - .withAllowedTypes<minifi::controllers::SSLContextService>() + .withAllowedTypes<controllers::SSLContextService>() .withExclusiveOfProperties({{{"Remote URL", "^http:.*$"}}}) .build(); EXTENSIONAPI static constexpr auto ProxyHost = core::PropertyDefinitionBuilder<>::createProperty("Proxy Host") @@ -268,15 +323,15 @@ class InvokeHTTP : public core::Processor { private: void route(const std::shared_ptr<core::FlowFile>& request, const std::shared_ptr<core::FlowFile>& response, core::ProcessSession& session, - core::ProcessContext& context, bool is_success, int64_t status_code); + core::ProcessContext& context, bool is_success, int64_t status_code); [[nodiscard]] bool shouldEmitFlowFile() const; void onTriggerWithClient(core::ProcessContext& context, core::ProcessSession& session, - const std::shared_ptr<core::FlowFile>& flow_file, minifi::http::HTTPClient& client); + const std::shared_ptr<core::FlowFile>& flow_file, http::HTTPClient& client); [[nodiscard]] bool appendHeaders(const core::FlowFile& flow_file, /*std::invocable<std::string, std::string>*/ auto append_header); void setupMembersFromProperties(const core::ProcessContext& context); - std::unique_ptr<minifi::http::HTTPClient> createHTTPClientFromMembers() const; + gsl::not_null<std::unique_ptr<http::HTTPClient>> createHTTPClientFromMembers(const std::string& url) const; http::HttpRequestMethod method_{}; std::optional<utils::Regex> attributes_to_send_; @@ -289,7 +344,6 @@ class InvokeHTTP : public core::Processor { core::DataTransferSpeedValue maximum_upload_speed_{0}; core::DataTransferSpeedValue maximum_download_speed_{0}; - std::string url_; std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_; std::chrono::milliseconds connect_timeout_{std::chrono::seconds(30)}; @@ -303,7 +357,8 @@ class InvokeHTTP : public core::Processor { invoke_http::InvalidHTTPHeaderFieldHandlingOption invalid_http_header_field_handling_strategy_{}; std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<InvokeHTTP>::getLogger(uuid_)}; - std::shared_ptr<utils::ResourceQueue<http::HTTPClient>> client_queue_; + + std::unique_ptr<invoke_http::HttpClientStore> client_queue_; }; } // namespace org::apache::nifi::minifi::processors diff --git a/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp b/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp new file mode 100644 index 000000000..c3bc7f985 --- /dev/null +++ b/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp @@ -0,0 +1,219 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include <thread> + +#include "unit/TestBase.h" +#include "unit/Catch.h" +#include "InvokeHTTP.h" +#include "http/BaseHTTPClient.h" + +using namespace std::literals::chrono_literals; + +namespace org::apache::nifi::minifi::test { + +class HttpClientStoreTestAccessor { + public: + static const std::list<gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>>>& getUsedClients(minifi::processors::invoke_http::HttpClientStore& store) { + return store.used_clients_; + } + + static const std::list<gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>>>& getUnusedClients(minifi::processors::invoke_http::HttpClientStore& store) { + return store.unused_clients_; + } + + static size_t getMaxSize(minifi::processors::invoke_http::HttpClientStore& store) { + return store.max_size_; + } +}; + +TEST_CASE("HttpClientStore can create new client for a url and is returned after it's not used anymore") { + minifi::processors::invoke_http::HttpClientStore store(2, [](const std::string& url) { + auto client = std::make_unique<minifi::http::HTTPClient>(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + REQUIRE(HttpClientStoreTestAccessor::getMaxSize(store) == 2); + const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store); + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.empty()); + { + [[maybe_unused]] auto client = store.getClient("http://localhost:8080"); + REQUIRE(used_clients.size() == 1); + REQUIRE(unused_clients.empty()); + } + + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.size() == 1); +} + +TEST_CASE("A http client can be reused") { + minifi::processors::invoke_http::HttpClientStore store(2, [](const std::string& url) { + auto client = std::make_unique<minifi::http::HTTPClient>(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + minifi::http::HTTPClient* client_ptr = nullptr; + const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store); + { + auto client = store.getClient("http://localhost:8080"); + client_ptr = &client.get(); + } + + { + auto client = store.getClient("http://localhost:8080"); + REQUIRE(&client.get() == client_ptr); + REQUIRE(used_clients.size() == 1); + REQUIRE(unused_clients.empty()); + } + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.size() == 1); +} + +TEST_CASE("A new url always creates a new client") { + minifi::processors::invoke_http::HttpClientStore store(3, [](const std::string& url) { + auto client = std::make_unique<minifi::http::HTTPClient>(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store); + { + [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); + [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); + [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); + REQUIRE(used_clients.size() == 3); + REQUIRE(unused_clients.empty()); + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8081"); + CHECK((*std::next(used_clients.begin(), 2))->getURL() == "http://localhost:8082"); + } + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.size() == 3); + CHECK(unused_clients.front()->getURL() == "http://localhost:8082"); + CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8081"); + CHECK((*std::next(unused_clients.begin(), 2))->getURL() == "http://localhost:8080"); +} + +TEST_CASE("If a store is full, the first unused client is replaced by the newly requested one") { + minifi::processors::invoke_http::HttpClientStore store(3, [](const std::string& url) { + auto client = std::make_unique<minifi::http::HTTPClient>(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store); + { + [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); + { + [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); + } + [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); + REQUIRE(used_clients.size() == 2); + REQUIRE(unused_clients.size() == 1); + CHECK(unused_clients.front()->getURL() == "http://localhost:8081"); + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8082"); + + [[maybe_unused]] auto client4 = store.getClient("http://localhost:8083"); + REQUIRE(used_clients.size() == 3); + REQUIRE(unused_clients.empty()); + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8082"); + CHECK((*std::next(used_clients.begin(), 2))->getURL() == "http://localhost:8083"); + } + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.size() == 3); + CHECK(unused_clients.front()->getURL() == "http://localhost:8083"); + CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8082"); + CHECK((*std::next(unused_clients.begin(), 2))->getURL() == "http://localhost:8080"); +} + +TEST_CASE("Multiple unused clients are present the oldest one is replaced") { + minifi::processors::invoke_http::HttpClientStore store(4, [](const std::string& url) { + auto client = std::make_unique<minifi::http::HTTPClient>(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store); + { + [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); + { + [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); + [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); + } + [[maybe_unused]] auto client4 = store.getClient("http://localhost:8083"); + REQUIRE(used_clients.size() == 2); + REQUIRE(unused_clients.size() == 2); + CHECK(unused_clients.front()->getURL() == "http://localhost:8082"); + CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8081"); + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8083"); + + [[maybe_unused]] auto client5 = store.getClient("http://localhost:8084"); + CHECK(unused_clients.front()->getURL() == "http://localhost:8081"); + + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8083"); + CHECK((*std::next(used_clients.begin(), 2))->getURL() == "http://localhost:8084"); + } + REQUIRE(used_clients.empty()); + REQUIRE(unused_clients.size() == 4); + CHECK(unused_clients.front()->getURL() == "http://localhost:8081"); + CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8084"); + CHECK((*std::next(unused_clients.begin(), 2))->getURL() == "http://localhost:8083"); + CHECK((*std::next(unused_clients.begin(), 3))->getURL() == "http://localhost:8080"); +} + +TEST_CASE("If all clients are in use, the call will block until a client is returned") { + minifi::processors::invoke_http::HttpClientStore store(2, [](const std::string& url) { + auto client = std::make_unique<minifi::http::HTTPClient>(); + client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr); + return gsl::make_not_null(std::move(client)); + }); + bool client2_created{false}; + std::mutex mutex; + std::condition_variable client2_created_cv; + [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080"); + + std::thread thread1([&store, &mutex, &client2_created, &client2_created_cv] { + std::unique_lock lock(mutex); + [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081"); + client2_created = true; + lock.unlock(); + client2_created_cv.notify_one(); + std::this_thread::sleep_for(300ms); + }); + + std::thread thread2([&store, &mutex, &client2_created, &client2_created_cv] { + std::unique_lock lock(mutex); + client2_created_cv.wait(lock, [&client2_created] { return client2_created; }); + [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082"); + auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store); + REQUIRE(used_clients.size() == 2); + CHECK(used_clients.front()->getURL() == "http://localhost:8080"); + CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8082"); + }); + + thread1.join(); + thread2.join(); +} + +} // namespace org::apache::nifi::minifi::test
