This is an automated email from the ASF dual-hosted git repository. szaszm pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 3d38c3ac6d0534288c973f27b3c5e8a8a1cc2d9e Author: Ferenc Gerlits <[email protected]> AuthorDate: Thu Jan 22 15:41:31 2026 +0100 MINIFICPP-2693 Do not share ProcessContextImpl objects among threads In the case of multi-threaded processors, give each instance a separate ProcessContextImpl object, so they don't have to share the caches. Closes #2085 Signed-off-by: Marton Szasz <[email protected]> --- libminifi/include/core/ProcessContextImpl.h | 2 +- libminifi/src/ThreadedSchedulingAgent.cpp | 9 +++------ libminifi/src/core/ProcessContextImpl.cpp | 6 ------ libminifi/test/unit/ProcessContextExprTests.cpp | 25 ------------------------- 4 files changed, 4 insertions(+), 38 deletions(-) diff --git a/libminifi/include/core/ProcessContextImpl.h b/libminifi/include/core/ProcessContextImpl.h index 31f6ad188..9017de724 100644 --- a/libminifi/include/core/ProcessContextImpl.h +++ b/libminifi/include/core/ProcessContextImpl.h @@ -206,7 +206,7 @@ class ProcessContextImpl : public core::VariableRegistryImpl, public virtual Pro gsl::not_null<std::shared_ptr<Configure>> configure_; std::unique_ptr<ProcessorInfo> info_; - mutable std::mutex mutex_; + // each ProcessContextImpl instance is only accessed from one thread at a time, so no synchronization is needed on these caches mutable std::unordered_map<std::string, expression::Expression, utils::string::transparent_string_hash, std::equal_to<>> cached_expressions_; mutable std::unordered_map<std::string, expression::Expression, utils::string::transparent_string_hash, std::equal_to<>> cached_dynamic_expressions_; }; diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp b/libminifi/src/ThreadedSchedulingAgent.cpp index 05cbb11e6..c37603e4d 100644 --- a/libminifi/src/ThreadedSchedulingAgent.cpp +++ b/libminifi/src/ThreadedSchedulingAgent.cpp @@ -77,15 +77,12 @@ void ThreadedSchedulingAgent::schedule(core::Processor* processor) { processor->onSchedule(*process_context, *session_factory); - std::vector<std::thread *> threads; - ThreadedSchedulingAgent *agent = this; for (uint8_t i = 0; i < processor->getMaxConcurrentTasks(); i++) { - // reference the disable function from serviceNode processor->incrementActiveTasks(); - - std::function<utils::TaskRescheduleInfo()> f_ex = [agent, processor, process_context, session_factory] () { - return agent->run(processor, process_context, session_factory); + auto thread_process_context = std::make_shared<core::ProcessContextImpl>(*processor, controller_service_provider_, repo_, flow_repo_, configure_, content_repo_); + std::function<utils::TaskRescheduleInfo()> f_ex = [agent, processor, thread_process_context, session_factory] () { + return agent->run(processor, thread_process_context, session_factory); }; std::future<utils::TaskRescheduleInfo> future; diff --git a/libminifi/src/core/ProcessContextImpl.cpp b/libminifi/src/core/ProcessContextImpl.cpp index 94caeab6c..f0ba1e215 100644 --- a/libminifi/src/core/ProcessContextImpl.cpp +++ b/libminifi/src/core/ProcessContextImpl.cpp @@ -77,7 +77,6 @@ bool ProcessContextImpl::hasNonEmptyProperty(std::string_view name) const { std::vector<std::string> ProcessContextImpl::getDynamicPropertyKeys() const { return processor_.getDynamicPropertyKeys(); } std::map<std::string, std::string> ProcessContextImpl::getDynamicProperties(const FlowFile* flow_file) const { - std::lock_guard<std::mutex> lock(mutex_); auto dynamic_props = processor_.getDynamicProperties(); const expression::Parameters params{this, flow_file}; for (auto& [dynamic_property_name, dynamic_property_value]: dynamic_props) { @@ -101,7 +100,6 @@ uint8_t ProcessContextImpl::getMaxConcurrentTasks() const { return processor_.ge void ProcessContextImpl::yield() { processor_.yield(); } nonstd::expected<std::string, std::error_code> ProcessContextImpl::getProperty(const std::string_view name, const FlowFile* flow_file) const { - std::lock_guard<std::mutex> lock(mutex_); const auto property = getProcessorInfo().getSupportedProperty(name); if (!property) { return nonstd::make_unexpected(PropertyErrorCode::NotSupportedProperty); @@ -124,19 +122,16 @@ nonstd::expected<std::string, std::error_code> ProcessContextImpl::getProperty(c } nonstd::expected<void, std::error_code> ProcessContextImpl::setProperty(const std::string_view name, std::string value) { - std::lock_guard<std::mutex> lock(mutex_); cached_expressions_.erase(std::string{name}); return getProcessor().setProperty(name, std::move(value)); } nonstd::expected<void, std::error_code> ProcessContextImpl::clearProperty(const std::string_view name) { - std::lock_guard<std::mutex> lock(mutex_); cached_expressions_.erase(std::string{name}); return getProcessor().clearProperty(name); } nonstd::expected<std::string, std::error_code> ProcessContextImpl::getDynamicProperty(const std::string_view name, const FlowFile* flow_file) const { - std::lock_guard<std::mutex> lock(mutex_); if (!cached_dynamic_expressions_.contains(name)) { auto expression_str = getProcessor().getDynamicProperty(name); if (!expression_str) { return expression_str; } @@ -155,7 +150,6 @@ nonstd::expected<std::string, std::error_code> ProcessContextImpl::getRawDynamic } nonstd::expected<void, std::error_code> ProcessContextImpl::setDynamicProperty(std::string name, std::string value) { - std::lock_guard<std::mutex> lock(mutex_); cached_dynamic_expressions_.erase(name); return getProcessor().setDynamicProperty(std::move(name), std::move(value)); } diff --git a/libminifi/test/unit/ProcessContextExprTests.cpp b/libminifi/test/unit/ProcessContextExprTests.cpp index a8cd6a117..0bca2a891 100644 --- a/libminifi/test/unit/ProcessContextExprTests.cpp +++ b/libminifi/test/unit/ProcessContextExprTests.cpp @@ -159,28 +159,3 @@ TEST_CASE("ProcessContextExpr can use expression language in dynamic properties" } } } - -TEST_CASE("ProcessContextExpr is mutex guarded properly") { - TestController test_controller; - const std::shared_ptr<TestPlan> test_plan = test_controller.createPlan(); - std::ignore = test_plan->addProcessor("DummyProcessor", "dummy_processor"); - test_plan->runNextProcessor(); - const auto context = test_plan->getCurrentContext(); - REQUIRE(dynamic_pointer_cast<core::ProcessContextImpl>(context) != nullptr); - - auto play_with_context = [=]() { - for (auto i = 0; i < 100; ++i) { - CHECK(context->setDynamicProperty("foo", fmt::format("${{literal('{}')}}", std::this_thread::get_id()))); - const auto dynamic_properties = context->getDynamicProperties(); - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - } - }; - - std::thread thread_one{play_with_context}; - std::thread thread_two{play_with_context}; - std::thread thread_three{play_with_context}; - - REQUIRE_NOTHROW(thread_one.join()); - REQUIRE_NOTHROW(thread_two.join()); - REQUIRE_NOTHROW(thread_three.join()); -}
