This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 3d38c3ac6d0534288c973f27b3c5e8a8a1cc2d9e
Author: Ferenc Gerlits <[email protected]>
AuthorDate: Thu Jan 22 15:41:31 2026 +0100

    MINIFICPP-2693 Do not share ProcessContextImpl objects among threads
    
    In the case of multi-threaded processors, give each instance a separate 
ProcessContextImpl object, so they don't have to share the caches.
    
    Closes #2085
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 libminifi/include/core/ProcessContextImpl.h     |  2 +-
 libminifi/src/ThreadedSchedulingAgent.cpp       |  9 +++------
 libminifi/src/core/ProcessContextImpl.cpp       |  6 ------
 libminifi/test/unit/ProcessContextExprTests.cpp | 25 -------------------------
 4 files changed, 4 insertions(+), 38 deletions(-)

diff --git a/libminifi/include/core/ProcessContextImpl.h 
b/libminifi/include/core/ProcessContextImpl.h
index 31f6ad188..9017de724 100644
--- a/libminifi/include/core/ProcessContextImpl.h
+++ b/libminifi/include/core/ProcessContextImpl.h
@@ -206,7 +206,7 @@ class ProcessContextImpl : public 
core::VariableRegistryImpl, public virtual Pro
   gsl::not_null<std::shared_ptr<Configure>> configure_;
   std::unique_ptr<ProcessorInfo> info_;
 
-  mutable std::mutex mutex_;
+  // each ProcessContextImpl instance is only accessed from one thread at a 
time, so no synchronization is needed on these caches
   mutable std::unordered_map<std::string, expression::Expression, 
utils::string::transparent_string_hash, std::equal_to<>> cached_expressions_;
   mutable std::unordered_map<std::string, expression::Expression, 
utils::string::transparent_string_hash, std::equal_to<>> 
cached_dynamic_expressions_;
 };
diff --git a/libminifi/src/ThreadedSchedulingAgent.cpp 
b/libminifi/src/ThreadedSchedulingAgent.cpp
index 05cbb11e6..c37603e4d 100644
--- a/libminifi/src/ThreadedSchedulingAgent.cpp
+++ b/libminifi/src/ThreadedSchedulingAgent.cpp
@@ -77,15 +77,12 @@ void ThreadedSchedulingAgent::schedule(core::Processor* 
processor) {
 
   processor->onSchedule(*process_context, *session_factory);
 
-  std::vector<std::thread *> threads;
-
   ThreadedSchedulingAgent *agent = this;
   for (uint8_t i = 0; i < processor->getMaxConcurrentTasks(); i++) {
-    // reference the disable function from serviceNode
     processor->incrementActiveTasks();
-
-    std::function<utils::TaskRescheduleInfo()> f_ex = [agent, processor, 
process_context, session_factory] () {
-      return agent->run(processor, process_context, session_factory);
+    auto thread_process_context = 
std::make_shared<core::ProcessContextImpl>(*processor, 
controller_service_provider_, repo_, flow_repo_, configure_, content_repo_);
+    std::function<utils::TaskRescheduleInfo()> f_ex = [agent, processor, 
thread_process_context, session_factory] () {
+      return agent->run(processor, thread_process_context, session_factory);
     };
 
     std::future<utils::TaskRescheduleInfo> future;
diff --git a/libminifi/src/core/ProcessContextImpl.cpp 
b/libminifi/src/core/ProcessContextImpl.cpp
index 94caeab6c..f0ba1e215 100644
--- a/libminifi/src/core/ProcessContextImpl.cpp
+++ b/libminifi/src/core/ProcessContextImpl.cpp
@@ -77,7 +77,6 @@ bool ProcessContextImpl::hasNonEmptyProperty(std::string_view 
name) const {
 std::vector<std::string> ProcessContextImpl::getDynamicPropertyKeys() const { 
return processor_.getDynamicPropertyKeys(); }
 
 std::map<std::string, std::string> 
ProcessContextImpl::getDynamicProperties(const FlowFile* flow_file) const {
-  std::lock_guard<std::mutex> lock(mutex_);
   auto dynamic_props = processor_.getDynamicProperties();
   const expression::Parameters params{this, flow_file};
   for (auto& [dynamic_property_name, dynamic_property_value]: dynamic_props) {
@@ -101,7 +100,6 @@ uint8_t ProcessContextImpl::getMaxConcurrentTasks() const { 
return processor_.ge
 void ProcessContextImpl::yield() { processor_.yield(); }
 
 nonstd::expected<std::string, std::error_code> 
ProcessContextImpl::getProperty(const std::string_view name, const FlowFile* 
flow_file) const {
-  std::lock_guard<std::mutex> lock(mutex_);
   const auto property = getProcessorInfo().getSupportedProperty(name);
   if (!property) {
     return nonstd::make_unexpected(PropertyErrorCode::NotSupportedProperty);
@@ -124,19 +122,16 @@ nonstd::expected<std::string, std::error_code> 
ProcessContextImpl::getProperty(c
 }
 
 nonstd::expected<void, std::error_code> ProcessContextImpl::setProperty(const 
std::string_view name, std::string value) {
-  std::lock_guard<std::mutex> lock(mutex_);
   cached_expressions_.erase(std::string{name});
   return getProcessor().setProperty(name, std::move(value));
 }
 
 nonstd::expected<void, std::error_code> 
ProcessContextImpl::clearProperty(const std::string_view name) {
-  std::lock_guard<std::mutex> lock(mutex_);
   cached_expressions_.erase(std::string{name});
   return getProcessor().clearProperty(name);
 }
 
 nonstd::expected<std::string, std::error_code> 
ProcessContextImpl::getDynamicProperty(const std::string_view name, const 
FlowFile* flow_file) const {
-  std::lock_guard<std::mutex> lock(mutex_);
   if (!cached_dynamic_expressions_.contains(name)) {
     auto expression_str = getProcessor().getDynamicProperty(name);
     if (!expression_str) { return expression_str; }
@@ -155,7 +150,6 @@ nonstd::expected<std::string, std::error_code> 
ProcessContextImpl::getRawDynamic
 }
 
 nonstd::expected<void, std::error_code> 
ProcessContextImpl::setDynamicProperty(std::string name, std::string value) {
-  std::lock_guard<std::mutex> lock(mutex_);
   cached_dynamic_expressions_.erase(name);
   return getProcessor().setDynamicProperty(std::move(name), std::move(value));
 }
diff --git a/libminifi/test/unit/ProcessContextExprTests.cpp 
b/libminifi/test/unit/ProcessContextExprTests.cpp
index a8cd6a117..0bca2a891 100644
--- a/libminifi/test/unit/ProcessContextExprTests.cpp
+++ b/libminifi/test/unit/ProcessContextExprTests.cpp
@@ -159,28 +159,3 @@ TEST_CASE("ProcessContextExpr can use expression language 
in dynamic properties"
     }
   }
 }
-
-TEST_CASE("ProcessContextExpr is mutex guarded properly") {
-  TestController test_controller;
-  const std::shared_ptr<TestPlan> test_plan = test_controller.createPlan();
-  std::ignore = test_plan->addProcessor("DummyProcessor", "dummy_processor");
-  test_plan->runNextProcessor();
-  const auto context = test_plan->getCurrentContext();
-  REQUIRE(dynamic_pointer_cast<core::ProcessContextImpl>(context) != nullptr);
-
-  auto play_with_context = [=]() {
-    for (auto i = 0; i < 100; ++i) {
-      CHECK(context->setDynamicProperty("foo", 
fmt::format("${{literal('{}')}}", std::this_thread::get_id())));
-      const auto dynamic_properties = context->getDynamicProperties();
-      std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    }
-  };
-
-  std::thread thread_one{play_with_context};
-  std::thread thread_two{play_with_context};
-  std::thread thread_three{play_with_context};
-
-  REQUIRE_NOTHROW(thread_one.join());
-  REQUIRE_NOTHROW(thread_two.join());
-  REQUIRE_NOTHROW(thread_three.join());
-}

Reply via email to