This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit e6d8271870c37a4dcea87a35b9b70e5e5b178ee5 Author: Martin Zink <[email protected]> AuthorDate: Tue Nov 29 13:10:00 2022 +0100 MINIFICPP-1973 Refactor ResourceQueue Signed-off-by: Ferenc Gerlits <[email protected]> This closes #1473 --- extensions/http-curl/processors/InvokeHTTP.cpp | 15 +++-- extensions/script/ExecuteScript.cpp | 10 ++-- extensions/splunk/PutSplunkHTTP.cpp | 48 ++++++++-------- libminifi/include/utils/ResourceQueue.h | 32 ++++++++--- libminifi/test/unit/ResourceQueueTests.cpp | 77 ++++++++++++++++++-------- 5 files changed, 117 insertions(+), 65 deletions(-) diff --git a/extensions/http-curl/processors/InvokeHTTP.cpp b/extensions/http-curl/processors/InvokeHTTP.cpp index b7397badd..9e803eec1 100644 --- a/extensions/http-curl/processors/InvokeHTTP.cpp +++ b/extensions/http-curl/processors/InvokeHTTP.cpp @@ -263,7 +263,15 @@ void InvokeHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context gsl_Expects(context); setupMembersFromProperties(*context); - client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(getMaxConcurrentTasks(), logger_); + std::weak_ptr<core::ProcessContext> weak_context = context; + auto create_client = [this, weak_context]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> { + if (auto context = weak_context.lock()) + return createHTTPClientFromPropertiesAndMembers(*context); + else + return nullptr; + }; + + client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_); } bool InvokeHTTP::shouldEmitFlowFile(minifi::extensions::curl::HTTPClient& client) { @@ -306,11 +314,8 @@ bool InvokeHTTP::appendHeaders(const core::FlowFile& flow_file, /*std::invocable void InvokeHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { gsl_Expects(session && context && client_queue_); - auto create_client = [&]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> { - return createHTTPClientFromPropertiesAndMembers(*context); - }; - auto client = client_queue_->getResource(create_client); + auto client = client_queue_->getResource(); onTriggerWithClient(context, session, *client); } diff --git a/extensions/script/ExecuteScript.cpp b/extensions/script/ExecuteScript.cpp index 28717c122..7124abbba 100644 --- a/extensions/script/ExecuteScript.cpp +++ b/extensions/script/ExecuteScript.cpp @@ -71,7 +71,10 @@ void ExecuteScript::initialize() { void ExecuteScript::onSchedule(core::ProcessContext *context, core::ProcessSessionFactory* /*sessionFactory*/) { #ifdef LUA_SUPPORT - lua_script_engine_queue_ = utils::ResourceQueue<lua::LuaScriptEngine>::create(getMaxConcurrentTasks(), logger_); + auto create_engine = [this]() -> std::unique_ptr<lua::LuaScriptEngine> { + return engine_factory_.createEngine<lua::LuaScriptEngine>(); + }; + lua_script_engine_queue_ = utils::ResourceQueue<lua::LuaScriptEngine>::create(create_engine, getMaxConcurrentTasks(), std::nullopt, logger_); #endif // LUA_SUPPORT #ifdef PYTHON_SUPPORT python_script_engine_ = engine_factory_.createEngine<python::PythonScriptEngine>(); @@ -114,11 +117,8 @@ void ExecuteScript::onTrigger(const std::shared_ptr<core::ProcessContext> &conte } else if (script_engine_ == ScriptEngineOption::LUA) { #ifdef LUA_SUPPORT gsl_Expects(lua_script_engine_queue_); - auto create_engine = [&]() -> std::unique_ptr<lua::LuaScriptEngine> { - return engine_factory_.createEngine<lua::LuaScriptEngine>(); - }; - lua_script_engine.emplace(lua_script_engine_queue_->getResource(create_engine)); + lua_script_engine.emplace(lua_script_engine_queue_->getResource()); engine = lua_script_engine->get(); #else throw std::runtime_error("Lua support is disabled in this build."); diff --git a/extensions/splunk/PutSplunkHTTP.cpp b/extensions/splunk/PutSplunkHTTP.cpp index 0969d836e..f51f4f3af 100644 --- a/extensions/splunk/PutSplunkHTTP.cpp +++ b/extensions/splunk/PutSplunkHTTP.cpp @@ -40,32 +40,26 @@ void PutSplunkHTTP::initialize() { setSupportedRelationships(relationships()); } -void PutSplunkHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) { - SplunkHECProcessor::onSchedule(context, sessionFactory); - client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(getMaxConcurrentTasks(), logger_); -} - namespace { std::optional<std::string> getContentType(core::ProcessContext& context, const core::FlowFile& flow_file) { return context.getProperty(PutSplunkHTTP::ContentType) | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); } -std::string getEndpoint(core::ProcessContext& context, const gsl::not_null<std::shared_ptr<core::FlowFile>>& flow_file, curl::HTTPClient& client) { +std::string getEndpoint(core::ProcessContext& context, curl::HTTPClient& client) { std::stringstream endpoint; endpoint << "/services/collector/raw"; std::vector<std::string> parameters; - std::string prop_value; - if (context.getProperty(PutSplunkHTTP::SourceType, prop_value, flow_file)) { - parameters.push_back("sourcetype=" + client.escape(prop_value)); + if (auto source_type = context.getProperty(PutSplunkHTTP::SourceType)) { + parameters.push_back("sourcetype=" + client.escape(*source_type)); } - if (context.getProperty(PutSplunkHTTP::Source, prop_value, flow_file)) { - parameters.push_back("source=" + client.escape(prop_value)); + if (auto source = context.getProperty(PutSplunkHTTP::Source)) { + parameters.push_back("source=" + client.escape(*source)); } - if (context.getProperty(PutSplunkHTTP::Host, prop_value, flow_file)) { - parameters.push_back("host=" + client.escape(prop_value)); + if (auto host = context.getProperty(PutSplunkHTTP::Host)) { + parameters.push_back("host=" + client.escape(*host)); } - if (context.getProperty(PutSplunkHTTP::Index, prop_value, flow_file)) { - parameters.push_back("index=" + client.escape(prop_value)); + if (auto index = context.getProperty(PutSplunkHTTP::Index)) { + parameters.push_back("index=" + client.escape(*index)); } if (!parameters.empty()) { endpoint << "?" << utils::StringUtils::join("&", parameters); @@ -117,6 +111,21 @@ void setFlowFileAsPayload(core::ProcessSession& session, } } // namespace +void PutSplunkHTTP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& sessionFactory) { + SplunkHECProcessor::onSchedule(context, sessionFactory); + std::weak_ptr<core::ProcessContext> weak_context = context; + auto create_client = [this, weak_context]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> { + if (auto context = weak_context.lock()) { + auto client = std::make_unique<curl::HTTPClient>(); + initializeClient(*client, getNetworkLocation().append(getEndpoint(*context, *client)), getSSLContextService(*context)); + return client; + } + return nullptr; + }; + + client_queue_ = utils::ResourceQueue<extensions::curl::HTTPClient>::create(create_client, getMaxConcurrentTasks(), std::nullopt, logger_); +} + void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSession>& session) { gsl_Expects(context && session && client_queue_); @@ -127,13 +136,7 @@ void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& conte } auto flow_file = gsl::not_null(std::move(ff)); - auto create_client = [&]() -> std::unique_ptr<minifi::extensions::curl::HTTPClient> { - auto client = std::make_unique<curl::HTTPClient>(); - initializeClient(*client, getNetworkLocation().append(getEndpoint(*context, flow_file, *client)), getSSLContextService(*context)); - return client; - }; - - auto client = client_queue_->getResource(create_client); + auto client = client_queue_->getResource(); setFlowFileAsPayload(*session, *context, *client, flow_file); @@ -145,4 +148,3 @@ void PutSplunkHTTP::onTrigger(const std::shared_ptr<core::ProcessContext>& conte } } // namespace org::apache::nifi::minifi::extensions::splunk - diff --git a/libminifi/include/utils/ResourceQueue.h b/libminifi/include/utils/ResourceQueue.h index acb5be4fa..7385c18c4 100644 --- a/libminifi/include/utils/ResourceQueue.h +++ b/libminifi/include/utils/ResourceQueue.h @@ -65,10 +65,12 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource std::unique_ptr<ResourceType> resource_; }; - static auto create(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger); + static auto create(std::function<std::unique_ptr<ResourceType>()> creator, + std::optional<size_t> maximum_number_of_creatable_resources = std::nullopt, + std::optional<std::function<void(ResourceType&)>> reset_fn = std::nullopt, + std::shared_ptr<core::logging::Logger> logger = nullptr); - template<typename Fn> - [[nodiscard]] std::enable_if_t<std::is_invocable_v<std::unique_ptr<ResourceType>()>, ResourceWrapper> getResource(const Fn& create_resource) { + [[nodiscard]] ResourceWrapper getResource() { std::unique_ptr<ResourceType> resource; // Use an existing resource, if one is available if (internal_queue_.tryDequeue(resource)) { @@ -78,7 +80,7 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource const std::lock_guard<std::mutex> lock(counter_mutex_); if (!maximum_number_of_creatable_resources_ || resources_created_ < maximum_number_of_creatable_resources_) { ++resources_created_; - resource = create_resource(); + resource = create_new_resource_(); logDebug("Created new [%p] resource instance. Number of instances: %d%s.", resource.get(), resources_created_, @@ -94,14 +96,21 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource } protected: - ResourceQueue(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger) - : maximum_number_of_creatable_resources_(maximum_number_of_creatable_resources), + ResourceQueue(std::function<std::unique_ptr<ResourceType>()> create_new_resource, + std::optional<size_t> maximum_number_of_creatable_resources, + std::optional<std::function<void(ResourceType&)>> reset_fn, + std::shared_ptr<core::logging::Logger> logger) + : create_new_resource_(std::move(create_new_resource)), + maximum_number_of_creatable_resources_(maximum_number_of_creatable_resources), + reset_fn_(std::move(reset_fn)), logger_(std::move(logger)) { } private: void returnResource(std::unique_ptr<ResourceType> resource) { logDebug("Returning [%p] resource", resource.get()); + if (reset_fn_) + reset_fn_.value()(*resource); internal_queue_.enqueue(std::move(resource)); } @@ -111,8 +120,10 @@ class ResourceQueue : public std::enable_shared_from_this<ResourceQueue<Resource logger_->log_debug(format, std::forward<Args>(args)...); } + const std::function<std::unique_ptr<ResourceType>()> create_new_resource_; const std::optional<size_t> maximum_number_of_creatable_resources_; - std::shared_ptr<core::logging::Logger> logger_; + const std::optional<std::function<void(ResourceType&)>> reset_fn_; + const std::shared_ptr<core::logging::Logger> logger_; ConditionConcurrentQueue<std::unique_ptr<ResourceType>> internal_queue_; size_t resources_created_ = 0; std::mutex counter_mutex_; @@ -126,7 +137,10 @@ struct ResourceQueue<ResourceType>::make_shared_enabler : public ResourceQueue<R }; template<class ResourceType> -auto ResourceQueue<ResourceType>::create(std::optional<size_t> maximum_number_of_creatable_resources, std::shared_ptr<core::logging::Logger> logger) { - return std::make_shared<make_shared_enabler>(maximum_number_of_creatable_resources, std::move(logger)); +auto ResourceQueue<ResourceType>::create(std::function<std::unique_ptr<ResourceType>()> creator, + std::optional<size_t> maximum_number_of_creatable_resources, + std::optional<std::function<void(ResourceType&)>> reset_fn, + std::shared_ptr<core::logging::Logger> logger) { + return std::make_shared<make_shared_enabler>(std::move(creator), maximum_number_of_creatable_resources, std::move(reset_fn), std::move(logger)); } } // namespace org::apache::nifi::minifi::utils diff --git a/libminifi/test/unit/ResourceQueueTests.cpp b/libminifi/test/unit/ResourceQueueTests.cpp index 526050477..8266d3b50 100644 --- a/libminifi/test/unit/ResourceQueueTests.cpp +++ b/libminifi/test/unit/ResourceQueueTests.cpp @@ -29,23 +29,26 @@ using namespace std::literals::chrono_literals; namespace org::apache::nifi::minifi::utils::testing { TEST_CASE("Limiting resource queue to a maximum of 2 resources", "[utils::ResourceQueue]") { - std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResourceQueue<int>>::getLogger()}; - LogTestController::getInstance().setTrace<ResourceQueue<int>>(); + using std::chrono::steady_clock; + + std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()}; + + LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>(); std::mutex resources_created_mutex; - std::set<int> resources_created; + std::set<steady_clock::time_point> resources_created; - auto worker = [&](int value, const std::shared_ptr<ResourceQueue<int>>& resource_queue) { - auto resource = resource_queue->getResource([value]{return std::make_unique<int>(value);}); + auto worker = [&](const std::shared_ptr<ResourceQueue<steady_clock::time_point>>& resource_queue) { + auto resource = resource_queue->getResource(); std::this_thread::sleep_for(10ms); std::lock_guard<std::mutex> lock(resources_created_mutex); resources_created.emplace(*resource); }; - auto resource_queue = ResourceQueue<int>::create(2, logger_); - std::thread thread_one{[&] { worker(1, resource_queue); }}; - std::thread thread_two{[&] { worker(2, resource_queue); }}; - std::thread thread_three{[&] { worker(3, resource_queue); }}; + auto resource_queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::now()); }, 2, std::nullopt, logger); + std::thread thread_one{[&] { worker(resource_queue); }}; + std::thread thread_two{[&] { worker(resource_queue); }}; + std::thread thread_three{[&] { worker(resource_queue); }}; thread_one.join(); thread_two.join(); @@ -56,15 +59,17 @@ TEST_CASE("Limiting resource queue to a maximum of 2 resources", "[utils::Resour } TEST_CASE("Resource limitation is not set to the resource queue", "[utils::ResourceQueue]") { - std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<ResourceQueue<int>>::getLogger()}; - LogTestController::getInstance().setTrace<ResourceQueue<int>>(); + using std::chrono::steady_clock; + + std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()}; + LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>(); LogTestController::getInstance().clear(); - auto resource_queue = ResourceQueue<int>::create(std::nullopt, logger_); - std::set<int> resources_created; + auto resource_queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::now()); }, std::nullopt, std::nullopt, logger); + std::set<steady_clock::time_point> resources_created; - auto resource_wrapper_one = resource_queue->getResource([]{return std::make_unique<int>(1);}); - auto resource_wrapper_two = resource_queue->getResource([]{return std::make_unique<int>(2);}); - auto resource_wrapper_three = resource_queue->getResource([]{return std::make_unique<int>(3);}); + auto resource_wrapper_one = resource_queue->getResource(); + auto resource_wrapper_two = resource_queue->getResource(); + auto resource_wrapper_three = resource_queue->getResource(); resources_created.emplace(*resource_wrapper_one); resources_created.emplace(*resource_wrapper_two); @@ -76,20 +81,46 @@ TEST_CASE("Resource limitation is not set to the resource queue", "[utils::Resou } TEST_CASE("resource returns when it goes out of scope", "[utils::ResourceQueue]") { - auto queue = utils::ResourceQueue<int>::create(std::nullopt, nullptr); + using std::chrono::steady_clock; + auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); }); + { + auto resource = queue->getResource(); + CHECK(*resource == steady_clock::time_point::min()); + *resource = steady_clock::now(); + } + { + auto resource = queue->getResource(); + CHECK(*resource != steady_clock::time_point::min()); + } +} + +TEST_CASE("resource resets when it goes out of scope", "[utils::ResourceQueue]") { + using std::chrono::steady_clock; + std::shared_ptr<core::logging::Logger> logger{core::logging::LoggerFactory<ResourceQueue<steady_clock::time_point>>::getLogger()}; + LogTestController::getInstance().setTrace<ResourceQueue<steady_clock::time_point>>(); + LogTestController::getInstance().clear(); + auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); }, + std::nullopt, + [](steady_clock::time_point& resource){ resource = steady_clock::time_point::min();}, + logger); { - auto resource = queue->getResource([] { return std::make_unique<int>(1); }); - CHECK(*resource == 1); + auto resource = queue->getResource(); + CHECK(*resource == steady_clock::time_point::min()); + *resource = steady_clock::now(); } { - auto resource = queue->getResource([] { return std::make_unique<int>(2); }); - CHECK(*resource == 1); + CHECK(LogTestController::getInstance().matchesRegex("Returning .* resource", 0ms)); + auto resource = queue->getResource(); + CHECK(*resource == steady_clock::time_point::min()); + CHECK(LogTestController::getInstance().matchesRegex("Using available .* resource instance", 0ms)); } } TEST_CASE("queue destroyed before resource", "[utils::ResourceQueue]") { - auto queue = utils::ResourceQueue<int>::create(std::nullopt, nullptr); - auto resource = queue->getResource([]{ return std::make_unique<int>(1); }); + using std::chrono::steady_clock; + auto queue = utils::ResourceQueue<steady_clock::time_point>::create([]{ return std::make_unique<steady_clock::time_point>(steady_clock::time_point::min()); }); + auto resource = queue->getResource(); REQUIRE_NOTHROW(queue.reset()); + REQUIRE_NOTHROW(*resource); } } // namespace org::apache::nifi::minifi::utils::testing
