This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 4054419f34b26ed552be97bb2d8c7ee6f867ceec
Author: Adam Debreceni <[email protected]>
AuthorDate: Mon Jan 20 16:54:06 2025 +0100

    MINIFICPP-2485 fix: Support Expression Language in InvokeHTTP "Remote URL" property
    
    Closes #1904
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 .../standard-processors/processors/InvokeHTTP.cpp  |  68 +++++--
 .../standard-processors/processors/InvokeHTTP.h    |  71 ++++++-
 .../tests/unit/HttpClientStoreTests.cpp            | 219 +++++++++++++++++++++
 3 files changed, 339 insertions(+), 19 deletions(-)

diff --git a/extensions/standard-processors/processors/InvokeHTTP.cpp 
b/extensions/standard-processors/processors/InvokeHTTP.cpp
index 7104359a8..19f0f4361 100644
--- a/extensions/standard-processors/processors/InvokeHTTP.cpp
+++ b/extensions/standard-processors/processors/InvokeHTTP.cpp
@@ -35,6 +35,45 @@
 #include "range/v3/algorithm/any_of.hpp"
 
 namespace org::apache::nifi::minifi::processors {
+namespace invoke_http {
+
+HttpClientStore::HttpClientWrapper HttpClientStore::getClient(const std::string& url) {  // returns a wrapper around a client for `url`; the wrapper's destructor hands the client back to this store
+  std::unique_lock lock(clients_mutex_);  // guards both client lists; also the lock used by the cv_ wait below
+  const auto it = std::find_if(std::begin(unused_clients_), std::end(unused_clients_), [&url](const auto& client) {  // first idle client whose URL equals the requested one
+    return client->getURL() == url;
+  });
+  if (it != std::end(unused_clients_)) {
+    used_clients_.splice(used_clients_.end(), unused_clients_, it);  // std::list::splice keeps `it` valid, so it is still dereferenceable on the next line
+    return {*this, **it};
+  }
+
+  if (used_clients_.size() + unused_clients_.size() < max_size_) {  // fewer than max_size_ clients exist: create an additional one
+    auto client = create_client_function_(url);
+    used_clients_.push_back(std::move(client));
+    return {*this, *used_clients_.back()};
+  } else {
+    cv_.wait(lock, [this] { return !unused_clients_.empty(); });  // store is full: wait (if necessary) until at least one idle client exists
+    auto client = create_client_function_(url);
+    unused_clients_.front() = std::move(client);  // the front idle client is overwritten by a new client for the requested URL
+    used_clients_.splice(used_clients_.end(), unused_clients_, unused_clients_.begin());
+    return {*this, *used_clients_.back()};
+  }
+}
+
+void HttpClientStore::returnClient(http::HTTPClient& client) {  // moves `client` from the in-use list to the back of the idle list and wakes one waiter
+  std::unique_lock lock(clients_mutex_);
+  const auto it = std::find_if(std::begin(used_clients_), std::end(used_clients_),
+    [&client](const auto& elem) { return &client == elem.get(); });  // match by object address, not by URL
+  if (it == std::end(used_clients_)) {
+    logger_->log_error("Couldn't find HTTP client in client store to be returned");
+    return;
+  }
+  unused_clients_.splice(unused_clients_.end(), used_clients_, it);
+  lock.unlock();  // release the mutex before notifying
+  cv_.notify_one();
+}
+
+}  // namespace invoke_http
 
 std::string InvokeHTTP::DefaultContentType = "application/octet-stream";
 
@@ -64,8 +103,9 @@ void setupClientTransferEncoding(http::HTTPClient& client, 
bool use_chunked_enco
 }  // namespace
 
 void InvokeHTTP::setupMembersFromProperties(const core::ProcessContext& 
context) {
-  if (!context.getProperty(URL, url_))
-    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or 
invalid");
+  std::string url;
+  if (!context.getProperty(URL, url) || url.empty())
+    throw Exception(PROCESS_SCHEDULE_EXCEPTION, "URL property missing or 
empty");
 
   method_ = utils::parseEnumProperty<http::HttpRequestMethod>(context, Method);
 
@@ -121,9 +161,9 @@ void InvokeHTTP::setupMembersFromProperties(const 
core::ProcessContext& context)
   }
 }
 
-std::unique_ptr<minifi::http::HTTPClient> 
InvokeHTTP::createHTTPClientFromMembers() const {
-  auto client = std::make_unique<minifi::http::HTTPClient>();
-  client->initialize(method_, url_, ssl_context_service_);
+gsl::not_null<std::unique_ptr<http::HTTPClient>> 
InvokeHTTP::createHTTPClientFromMembers(const std::string& url) const {
+  auto client = std::make_unique<http::HTTPClient>();
+  client->initialize(method_, url, ssl_context_service_);
   setupClientTimeouts(*client, connect_timeout_, read_timeout_);
   client->setHTTPProxy(proxy_);
   client->setFollowRedirects(follow_redirects_);
@@ -133,18 +173,18 @@ std::unique_ptr<minifi::http::HTTPClient> 
InvokeHTTP::createHTTPClientFromMember
   client->setMaximumUploadSpeed(maximum_upload_speed_.getValue());
   client->setMaximumDownloadSpeed(maximum_download_speed_.getValue());
 
-  return client;
+  return gsl::make_not_null(std::move(client));
 }
 
 
 void InvokeHTTP::onSchedule(core::ProcessContext& context, 
core::ProcessSessionFactory&) {
   setupMembersFromProperties(context);
 
-  auto create_client = [this]() -> std::unique_ptr<minifi::http::HTTPClient> {
-    return createHTTPClientFromMembers();
+  auto create_client = [this](const std::string& url) -> 
gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>> {
+    return createHTTPClientFromMembers(url);
   };
 
-  client_queue_ = 
utils::ResourceQueue<http::HTTPClient>::create(create_client, 
getMaxConcurrentTasks(), std::nullopt, logger_);
+  client_queue_ = 
std::make_unique<invoke_http::HttpClientStore>(getMaxConcurrentTasks() * 2, 
create_client);
 }
 
 bool InvokeHTTP::shouldEmitFlowFile() const {
@@ -202,9 +242,15 @@ void InvokeHTTP::onTrigger(core::ProcessContext& context, 
core::ProcessSession&
     logger_->log_debug("InvokeHTTP -- Received flowfile");
   }
 
-  auto client = client_queue_->getResource();
+  auto url = context.getProperty(URL, flow_file.get());
+  if (!url || url->empty()) {
+    logger_->log_error("InvokeHTTP -- URL is empty, transferring to failure");
+    session.transfer(flow_file, RelFailure);
+    return;
+  }
 
-  onTriggerWithClient(context, session, flow_file, *client);
+  auto client = client_queue_->getClient(*url);
+  onTriggerWithClient(context, session, flow_file, client.get());
 }
 
 void InvokeHTTP::onTriggerWithClient(core::ProcessContext& context, 
core::ProcessSession& session,
diff --git a/extensions/standard-processors/processors/InvokeHTTP.h 
b/extensions/standard-processors/processors/InvokeHTTP.h
index bb5e9d283..46caabdf6 100644
--- a/extensions/standard-processors/processors/InvokeHTTP.h
+++ b/extensions/standard-processors/processors/InvokeHTTP.h
@@ -23,6 +23,8 @@
 #include <string>
 #include <unordered_map>
 #include <utility>
+#include <optional>
+#include <list>
 
 #include "Core.h"
 #include "FlowFileRecord.h"
@@ -41,9 +43,62 @@
 #include "utils/Enum.h"
 #include "utils/RegexUtils.h"
 
+namespace org::apache::nifi::minifi::test {
+class HttpClientStoreTestAccessor;
+}
+
 namespace org::apache::nifi::minifi::processors {
 
 namespace invoke_http {
+class HttpClientStore {  // store of HTTP clients handed out per URL through getClient(); keeps an in-use list and an idle list
+ public:
+  HttpClientStore(const size_t size, std::function<gsl::not_null<std::unique_ptr<http::HTTPClient>>(const std::string&)> create_client_function)  // size is stored as max_size_; create_client_function builds a client for a given URL
+      : max_size_(size),
+        create_client_function_(std::move(create_client_function)) {
+  }
+  HttpClientStore(const HttpClientStore&) = delete;  // neither copyable nor movable
+  HttpClientStore& operator=(const HttpClientStore&) = delete;
+  HttpClientStore(HttpClientStore&&) = delete;
+  HttpClientStore& operator=(HttpClientStore&&) = delete;
+  ~HttpClientStore() = default;
+
+  class HttpClientWrapper {  // scoped handle: references a client and its store, and returns the client to the store in its destructor
+   public:
+    HttpClientWrapper(HttpClientStore& client_store, http::HTTPClient& client)
+      :client_(client), client_store_(client_store) { }
+
+    HttpClientWrapper(const HttpClientWrapper&) = delete;  // copy and move are deleted
+    HttpClientWrapper& operator=(const HttpClientWrapper&) = delete;
+    HttpClientWrapper(HttpClientWrapper&& src) = delete;
+    HttpClientWrapper& operator=(HttpClientWrapper&& src) = delete;
+    ~HttpClientWrapper() {
+      client_store_.returnClient(client_);
+    }
+
+    [[nodiscard]] http::HTTPClient& get() const {  // access to the wrapped client
+      return client_;
+    }
+
+   private:
+    http::HTTPClient& client_;
+    HttpClientStore& client_store_;
+  };
+
+  [[nodiscard]] HttpClientWrapper getClient(const std::string& url);  // obtain a client for `url`; defined in InvokeHTTP.cpp
+
+ private:
+  friend class test::HttpClientStoreTestAccessor;  // lets the unit tests inspect the private lists and max_size_
+  void returnClient(http::HTTPClient& client);  // called from ~HttpClientWrapper
+
+  std::mutex clients_mutex_;
+  std::condition_variable cv_;
+  const size_t max_size_;
+  std::list<gsl::not_null<std::unique_ptr<http::HTTPClient>>> used_clients_;  // clients currently handed out
+  std::list<gsl::not_null<std::unique_ptr<http::HTTPClient>>> unused_clients_;  // idle clients available for reuse or replacement
+  std::function<gsl::not_null<std::unique_ptr<http::HTTPClient>>(const std::string&)> create_client_function_;
+  std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<HttpClientWrapper>::getLogger()};  // NOTE(review): logger is obtained for HttpClientWrapper, not HttpClientStore - confirm this is intended
+};
+
 enum class InvalidHTTPHeaderFieldHandlingOption {
     fail,
     transform,
@@ -64,8 +119,8 @@ class InvokeHTTP : public core::Processor {
   }
 
   EXTENSIONAPI static constexpr const char* Description = "An HTTP client 
processor which can interact with a configurable HTTP Endpoint. "
-                                                          "The destination URL 
and HTTP Method are configurable. FlowFile attributes are converted to HTTP 
headers and the "
-                                                          "FlowFile contents 
are included as the body of the request (if the HTTP Method is PUT, POST or 
PATCH).";
+      "The destination URL and HTTP Method are configurable. FlowFile 
attributes are converted to HTTP headers and the "
+      "FlowFile contents are included as the body of the request (if the HTTP 
Method is PUT, POST or PATCH).";
 
   EXTENSIONAPI static constexpr auto Method = 
core::PropertyDefinitionBuilder<magic_enum::enum_count<http::HttpRequestMethod>()>::createProperty("HTTP
 Method")
       .withDescription("HTTP request method. Methods other than POST, PUT and 
PATCH will be sent without a message body.")
@@ -106,7 +161,7 @@ class InvokeHTTP : public core::Processor {
   EXTENSIONAPI static constexpr auto SSLContext = 
core::PropertyDefinitionBuilder<0, 0, 1>::createProperty("SSL Context Service")
       .withDescription("The SSL Context Service used to provide client 
certificate information for TLS/SSL (https) connections.")
       .isRequired(false)
-      .withAllowedTypes<minifi::controllers::SSLContextService>()
+      .withAllowedTypes<controllers::SSLContextService>()
       .withExclusiveOfProperties({{{"Remote URL", "^http:.*$"}}})
       .build();
   EXTENSIONAPI static constexpr auto ProxyHost = 
core::PropertyDefinitionBuilder<>::createProperty("Proxy Host")
@@ -268,15 +323,15 @@ class InvokeHTTP : public core::Processor {
 
  private:
   void route(const std::shared_ptr<core::FlowFile>& request, const 
std::shared_ptr<core::FlowFile>& response, core::ProcessSession& session,
-             core::ProcessContext& context, bool is_success, int64_t 
status_code);
+      core::ProcessContext& context, bool is_success, int64_t status_code);
   [[nodiscard]] bool shouldEmitFlowFile() const;
   void onTriggerWithClient(core::ProcessContext& context, 
core::ProcessSession& session,
-                           const std::shared_ptr<core::FlowFile>& flow_file, 
minifi::http::HTTPClient& client);
+      const std::shared_ptr<core::FlowFile>& flow_file, http::HTTPClient& 
client);
   [[nodiscard]] bool appendHeaders(const core::FlowFile& flow_file, 
/*std::invocable<std::string, std::string>*/ auto append_header);
 
 
   void setupMembersFromProperties(const core::ProcessContext& context);
-  std::unique_ptr<minifi::http::HTTPClient> createHTTPClientFromMembers() 
const;
+  gsl::not_null<std::unique_ptr<http::HTTPClient>> 
createHTTPClientFromMembers(const std::string& url) const;
 
   http::HttpRequestMethod method_{};
   std::optional<utils::Regex> attributes_to_send_;
@@ -289,7 +344,6 @@ class InvokeHTTP : public core::Processor {
   core::DataTransferSpeedValue maximum_upload_speed_{0};
   core::DataTransferSpeedValue maximum_download_speed_{0};
 
-  std::string url_;
   std::shared_ptr<minifi::controllers::SSLContextService> ssl_context_service_;
 
   std::chrono::milliseconds connect_timeout_{std::chrono::seconds(30)};
@@ -303,7 +357,8 @@ class InvokeHTTP : public core::Processor {
   invoke_http::InvalidHTTPHeaderFieldHandlingOption 
invalid_http_header_field_handling_strategy_{};
 
   std::shared_ptr<core::logging::Logger> 
logger_{core::logging::LoggerFactory<InvokeHTTP>::getLogger(uuid_)};
-  std::shared_ptr<utils::ResourceQueue<http::HTTPClient>> client_queue_;
+
+  std::unique_ptr<invoke_http::HttpClientStore> client_queue_;
 };
 
 }  // namespace org::apache::nifi::minifi::processors
diff --git a/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp 
b/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp
new file mode 100644
index 000000000..c3bc7f985
--- /dev/null
+++ b/extensions/standard-processors/tests/unit/HttpClientStoreTests.cpp
@@ -0,0 +1,219 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <thread>
+
+#include "unit/TestBase.h"
+#include "unit/Catch.h"
+#include "InvokeHTTP.h"
+#include "http/BaseHTTPClient.h"
+
+using namespace std::literals::chrono_literals;
+
+namespace org::apache::nifi::minifi::test {
+
+class HttpClientStoreTestAccessor {  // exposes private members of HttpClientStore to the tests in this file
+ public:
+  static const std::list<gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>>>& getUsedClients(minifi::processors::invoke_http::HttpClientStore& store) {
+    return store.used_clients_;
+  }
+
+  static const std::list<gsl::not_null<std::unique_ptr<minifi::http::HTTPClient>>>& getUnusedClients(minifi::processors::invoke_http::HttpClientStore& store) {
+    return store.unused_clients_;
+  }
+
+  static size_t getMaxSize(minifi::processors::invoke_http::HttpClientStore& store) {
+    return store.max_size_;
+  }
+};
+
+TEST_CASE("HttpClientStore can create new client for a url and is returned after it's not used anymore") {
+  minifi::processors::invoke_http::HttpClientStore store(2, [](const std::string& url) {
+    auto client = std::make_unique<minifi::http::HTTPClient>();
+    client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr);
+    return gsl::make_not_null(std::move(client));
+  });
+  REQUIRE(HttpClientStoreTestAccessor::getMaxSize(store) == 2);
+  const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store);
+  const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store);
+  REQUIRE(used_clients.empty());
+  REQUIRE(unused_clients.empty());
+  {
+    [[maybe_unused]] auto client = store.getClient("http://localhost:8080");  // wrapper lives until the end of this scope
+    REQUIRE(used_clients.size() == 1);
+    REQUIRE(unused_clients.empty());
+  }
+
+  REQUIRE(used_clients.empty());  // the wrapper's destructor moved the client to the idle list
+  REQUIRE(unused_clients.size() == 1);
+}
+
+TEST_CASE("A http client can be reused") {
+  minifi::processors::invoke_http::HttpClientStore store(2, [](const std::string& url) {
+    auto client = std::make_unique<minifi::http::HTTPClient>();
+    client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr);
+    return gsl::make_not_null(std::move(client));
+  });
+  minifi::http::HTTPClient* client_ptr = nullptr;
+  const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store);
+  const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store);
+  {
+    auto client = store.getClient("http://localhost:8080");
+    client_ptr = &client.get();  // remember the address to compare against the second request
+  }
+
+  {
+    auto client = store.getClient("http://localhost:8080");
+    REQUIRE(&client.get() == client_ptr);  // same URL: the same client object is handed out again
+    REQUIRE(used_clients.size() == 1);
+    REQUIRE(unused_clients.empty());
+  }
+  REQUIRE(used_clients.empty());
+  REQUIRE(unused_clients.size() == 1);
+}
+
+TEST_CASE("A new url always creates a new client") {
+  minifi::processors::invoke_http::HttpClientStore store(3, [](const std::string& url) {
+    auto client = std::make_unique<minifi::http::HTTPClient>();
+    client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr);
+    return gsl::make_not_null(std::move(client));
+  });
+  const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store);
+  const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store);
+  {
+    [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080");
+    [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081");
+    [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082");
+    REQUIRE(used_clients.size() == 3);
+    REQUIRE(unused_clients.empty());
+    CHECK(used_clients.front()->getURL() == "http://localhost:8080");
+    CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8081");
+    CHECK((*std::next(used_clients.begin(), 2))->getURL() == "http://localhost:8082");
+  }
+  REQUIRE(used_clients.empty());
+  REQUIRE(unused_clients.size() == 3);
+  CHECK(unused_clients.front()->getURL() == "http://localhost:8082");  // wrappers are destroyed in reverse declaration order, so client3 is returned first
+  CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8081");
+  CHECK((*std::next(unused_clients.begin(), 2))->getURL() == "http://localhost:8080");
+}
+
+TEST_CASE("If a store is full, the first unused client is replaced by the newly requested one") {
+  minifi::processors::invoke_http::HttpClientStore store(3, [](const std::string& url) {
+    auto client = std::make_unique<minifi::http::HTTPClient>();
+    client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr);
+    return gsl::make_not_null(std::move(client));
+  });
+  const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store);
+  const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store);
+  {
+    [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080");
+    {
+      [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081");  // returned immediately at the end of this inner scope
+    }
+    [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082");
+    REQUIRE(used_clients.size() == 2);
+    REQUIRE(unused_clients.size() == 1);
+    CHECK(unused_clients.front()->getURL() == "http://localhost:8081");
+    CHECK(used_clients.front()->getURL() == "http://localhost:8080");
+    CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8082");
+
+    [[maybe_unused]] auto client4 = store.getClient("http://localhost:8083");  // store holds 3 clients already: the idle 8081 client is replaced
+    REQUIRE(used_clients.size() == 3);
+    REQUIRE(unused_clients.empty());
+    CHECK(used_clients.front()->getURL() == "http://localhost:8080");
+    CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8082");
+    CHECK((*std::next(used_clients.begin(), 2))->getURL() == "http://localhost:8083");
+  }
+  REQUIRE(used_clients.empty());
+  REQUIRE(unused_clients.size() == 3);
+  CHECK(unused_clients.front()->getURL() == "http://localhost:8083");
+  CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8082");
+  CHECK((*std::next(unused_clients.begin(), 2))->getURL() == "http://localhost:8080");
+}
+
+TEST_CASE("Multiple unused clients are present the oldest one is replaced") {
+  minifi::processors::invoke_http::HttpClientStore store(4, [](const std::string& url) {
+    auto client = std::make_unique<minifi::http::HTTPClient>();
+    client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr);
+    return gsl::make_not_null(std::move(client));
+  });
+  const auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store);
+  const auto& unused_clients = HttpClientStoreTestAccessor::getUnusedClients(store);
+  {
+    [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080");
+    {
+      [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081");
+      [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082");  // destroyed first at scope exit, so 8082 is returned before 8081
+    }
+    [[maybe_unused]] auto client4 = store.getClient("http://localhost:8083");
+    REQUIRE(used_clients.size() == 2);
+    REQUIRE(unused_clients.size() == 2);
+    CHECK(unused_clients.front()->getURL() == "http://localhost:8082");
+    CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8081");
+    CHECK(used_clients.front()->getURL() == "http://localhost:8080");
+    CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8083");
+
+    [[maybe_unused]] auto client5 = store.getClient("http://localhost:8084");  // store is full: the front idle client (8082) is replaced
+    CHECK(unused_clients.front()->getURL() == "http://localhost:8081");
+
+    CHECK(used_clients.front()->getURL() == "http://localhost:8080");
+    CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8083");
+    CHECK((*std::next(used_clients.begin(), 2))->getURL() == "http://localhost:8084");
+  }
+  REQUIRE(used_clients.empty());
+  REQUIRE(unused_clients.size() == 4);
+  CHECK(unused_clients.front()->getURL() == "http://localhost:8081");
+  CHECK((*std::next(unused_clients.begin(), 1))->getURL() == "http://localhost:8084");
+  CHECK((*std::next(unused_clients.begin(), 2))->getURL() == "http://localhost:8083");
+  CHECK((*std::next(unused_clients.begin(), 3))->getURL() == "http://localhost:8080");
+}
+
+TEST_CASE("If all clients are in use, the call will block until a client is returned") {
+  minifi::processors::invoke_http::HttpClientStore store(2, [](const std::string& url) {
+    auto client = std::make_unique<minifi::http::HTTPClient>();
+    client->initialize(minifi::http::HttpRequestMethod::GET, url, nullptr);
+    return gsl::make_not_null(std::move(client));
+  });
+  bool client2_created{false};
+  std::mutex mutex;
+  std::condition_variable client2_created_cv;
+  [[maybe_unused]] auto client1 = store.getClient("http://localhost:8080");  // occupies the first of the two slots for the whole test
+
+  std::thread thread1([&store, &mutex, &client2_created, &client2_created_cv] {
+    std::unique_lock lock(mutex);
+    [[maybe_unused]] auto client2 = store.getClient("http://localhost:8081");  // takes the second slot; released when this lambda returns
+    client2_created = true;
+    lock.unlock();
+    client2_created_cv.notify_one();
+    std::this_thread::sleep_for(300ms);  // keep client2 in use for a while so thread2 has to wait in getClient
+  });
+
+  std::thread thread2([&store, &mutex, &client2_created, &client2_created_cv] {  // NOTE(review): REQUIRE/CHECK run on a spawned thread here - confirm the Catch2 version in use supports that
+    std::unique_lock lock(mutex);
+    client2_created_cv.wait(lock, [&client2_created] { return client2_created; });
+    [[maybe_unused]] auto client3 = store.getClient("http://localhost:8082");  // store is full with both clients in use: blocks until client2 is returned
+    auto& used_clients = HttpClientStoreTestAccessor::getUsedClients(store);
+    REQUIRE(used_clients.size() == 2);
+    CHECK(used_clients.front()->getURL() == "http://localhost:8080");
+    CHECK((*std::next(used_clients.begin(), 1))->getURL() == "http://localhost:8082");
+  });
+
+  thread1.join();
+  thread2.join();
+}
+
+}  // namespace org::apache::nifi::minifi::test

Reply via email to