This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 8bb8cc7e2f2b0cd0455e59b77c8a70188d4bb419 Author: Martin Zink <[email protected]> AuthorDate: Thu Aug 14 11:11:41 2025 +0200 MINIFICPP-2607 ProcessContextExpr thread safety fix Signed-off-by: Ferenc Gerlits <[email protected]> Closes #2009 --- .../expression-language/ProcessContextExpr.cpp | 5 +++++ .../expression-language/ProcessContextExpr.h | 1 + .../tests/ProcessContextExprTests.cpp | 25 ++++++++++++++++++++++ 3 files changed, 31 insertions(+) diff --git a/extensions/expression-language/ProcessContextExpr.cpp b/extensions/expression-language/ProcessContextExpr.cpp index fb917045e..71a748b35 100644 --- a/extensions/expression-language/ProcessContextExpr.cpp +++ b/extensions/expression-language/ProcessContextExpr.cpp @@ -26,6 +26,7 @@ namespace org::apache::nifi::minifi::core { nonstd::expected<std::string, std::error_code> ProcessContextExpr::getProperty(const std::string_view name, const FlowFile* flow_file) const { + std::lock_guard<std::mutex> lock(mutex_); const auto property = getProcessorInfo().getSupportedProperty(name); if (!property) { return nonstd::make_unexpected(PropertyErrorCode::NotSupportedProperty); @@ -48,6 +49,7 @@ nonstd::expected<std::string, std::error_code> ProcessContextExpr::getProperty(c } nonstd::expected<std::string, std::error_code> ProcessContextExpr::getDynamicProperty(const std::string_view name, const FlowFile* flow_file) const { + std::lock_guard<std::mutex> lock(mutex_); if (!cached_dynamic_expressions_.contains(name)) { auto expression_str = ProcessContextImpl::getDynamicProperty(name, flow_file); if (!expression_str) { return expression_str; } @@ -58,16 +60,19 @@ nonstd::expected<std::string, std::error_code> ProcessContextExpr::getDynamicPro } nonstd::expected<void, std::error_code> ProcessContextExpr::setProperty(const std::string_view name, std::string value) { + std::lock_guard<std::mutex> lock(mutex_); cached_expressions_.erase(std::string{name}); return ProcessContextImpl::setProperty(name, std::move(value)); } nonstd::expected<void, std::error_code> ProcessContextExpr::setDynamicProperty(std::string name, std::string value) { + std::lock_guard<std::mutex> lock(mutex_); cached_dynamic_expressions_.erase(name); return ProcessContextImpl::setDynamicProperty(std::move(name), std::move(value)); } std::map<std::string, std::string> ProcessContextExpr::getDynamicProperties(const FlowFile* flow_file) const { + std::lock_guard<std::mutex> lock(mutex_); auto dynamic_props = ProcessContextImpl::getDynamicProperties(flow_file); const expression::Parameters params{this, flow_file}; for (auto& [dynamic_property_name, dynamic_property_value]: dynamic_props) { diff --git a/extensions/expression-language/ProcessContextExpr.h b/extensions/expression-language/ProcessContextExpr.h index 3aa822d83..9e69e332e 100644 --- a/extensions/expression-language/ProcessContextExpr.h +++ b/extensions/expression-language/ProcessContextExpr.h @@ -46,6 +46,7 @@ class ProcessContextExpr final : public core::ProcessContextImpl { std::map<std::string, std::string> getDynamicProperties(const FlowFile*) const override; private: + mutable std::mutex mutex_; mutable std::unordered_map<std::string, expression::Expression, utils::string::transparent_string_hash, std::equal_to<>> cached_expressions_; mutable std::unordered_map<std::string, expression::Expression, utils::string::transparent_string_hash, std::equal_to<>> cached_dynamic_expressions_; }; diff --git a/extensions/expression-language/tests/ProcessContextExprTests.cpp b/extensions/expression-language/tests/ProcessContextExprTests.cpp index 595a79033..0537cb7a8 100644 --- a/extensions/expression-language/tests/ProcessContextExprTests.cpp +++ b/extensions/expression-language/tests/ProcessContextExprTests.cpp @@ -129,3 +129,28 @@ TEST_CASE("ProcessContextExpr can use expression language in dynamic properties" } } } + +TEST_CASE("ProcessContextExpr is mutex guarded properly") { + TestController test_controller; + const std::shared_ptr<TestPlan> test_plan = test_controller.createPlan(); + std::ignore = test_plan->addProcessor("DummyProcessor", "dummy_processor"); + test_plan->runNextProcessor(); + const auto context = test_plan->getCurrentContext(); + REQUIRE(dynamic_pointer_cast<core::ProcessContextExpr>(context) != nullptr); + + auto play_with_context = [=]() { + for (auto i = 0; i < 100; ++i) { + CHECK(context->setDynamicProperty("foo", fmt::format("${{literal('{}')}}", std::this_thread::get_id()))); + const auto dynamic_properties = context->getDynamicProperties(); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + }; + + std::thread thread_one{play_with_context}; + std::thread thread_two{play_with_context}; + std::thread thread_three{play_with_context}; + + REQUIRE_NOTHROW(thread_one.join()); + REQUIRE_NOTHROW(thread_two.join()); + REQUIRE_NOTHROW(thread_three.join()); +}
