This is an automated email from the ASF dual-hosted git repository.

szaszm pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git

commit 212b1458c72dab812bc99eb3b47fd887cc3c190f
Author: Martin Zink <[email protected]>
AuthorDate: Sun Dec 29 21:00:29 2024 +0100

    MINIFICPP-2505 SchedulingAgent::scheduled_processors_ isn't thread safe
    
    Closes #1910
    
    Signed-off-by: Marton Szasz <[email protected]>
---
 libminifi/include/SchedulingAgent.h |  4 ++--
 libminifi/src/SchedulingAgent.cpp   | 33 ++++++++++++++++++---------------
 2 files changed, 20 insertions(+), 17 deletions(-)

diff --git a/libminifi/include/SchedulingAgent.h b/libminifi/include/SchedulingAgent.h
index 99ebd62c8..f50ddc571 100644
--- a/libminifi/include/SchedulingAgent.h
+++ b/libminifi/include/SchedulingAgent.h
@@ -150,8 +150,8 @@ class SchedulingAgent {
   };
 
   std::shared_ptr<core::logging::Logger> logger_;
-  mutable std::mutex watchdog_mtx_;  // used to protect the set below
-  std::set<SchedulingInfo> scheduled_processors_;  // set was chosen to avoid iterator invalidation
+  mutable std::mutex watchdog_mtx_;  // used to protect the vector below
+  std::vector<gsl::not_null<SchedulingInfo*>> scheduled_processors_;
   std::unique_ptr<utils::CallBackTimer> watchDogTimer_;
   std::chrono::milliseconds alert_time_;
 };
diff --git a/libminifi/src/SchedulingAgent.cpp b/libminifi/src/SchedulingAgent.cpp
index f42ca057f..90da81971 100644
--- a/libminifi/src/SchedulingAgent.cpp
+++ b/libminifi/src/SchedulingAgent.cpp
@@ -18,10 +18,12 @@
  * limitations under the License.
  */
 #include "SchedulingAgent.h"
+
 #include <chrono>
+#include <memory>
 #include <thread>
 #include <utility>
-#include <memory>
+
 #include "core/Processor.h"
 #include "utils/gsl.h"
 
@@ -68,18 +70,19 @@ nonstd::expected<void, std::exception_ptr> SchedulingAgent::triggerAndCommit(cor
     return {};
   }
 
-  auto schedule_it = scheduled_processors_.end();
-
+  auto processor_scheduling_info = SchedulingInfo(processor);
   {
     std::lock_guard<std::mutex> lock(watchdog_mtx_);
-    schedule_it = scheduled_processors_.emplace(processor).first;
+    scheduled_processors_.push_back(gsl::make_not_null(&processor_scheduling_info));
   }
 
-  const auto guard = gsl::finally([this, &schedule_it](){
+  const auto guard = gsl::finally([this, &processor_scheduling_info](){
     std::lock_guard<std::mutex> lock(watchdog_mtx_);
-    scheduled_processors_.erase(schedule_it);
+    [[maybe_unused]] const auto erased_scheduling_infos_count = std::erase(scheduled_processors_, gsl::make_not_null(&processor_scheduling_info));
+    gsl_Assert(1 == erased_scheduling_infos_count);
   });
 
+
   processor->incrementActiveTasks();
   auto decrement_task = gsl::finally([processor]() { processor->decrementActiveTask(); });
 
@@ -107,16 +110,16 @@ nonstd::expected<bool, std::exception_ptr> SchedulingAgent::trigger(core::Proces
     return false;
   }
 
-  auto schedule_it = scheduled_processors_.end();
-
+  auto processor_scheduling_info = SchedulingInfo(processor);
   {
     std::lock_guard<std::mutex> lock(watchdog_mtx_);
-    schedule_it = scheduled_processors_.emplace(processor).first;
+    scheduled_processors_.push_back(gsl::make_not_null(&processor_scheduling_info));
   }
 
-  const auto guard = gsl::finally([this, &schedule_it](){
+  const auto guard = gsl::finally([this, &processor_scheduling_info](){
     std::lock_guard<std::mutex> lock(watchdog_mtx_);
-    scheduled_processors_.erase(schedule_it);
+    [[maybe_unused]] const auto erased_scheduling_infos_count = std::erase(scheduled_processors_, gsl::make_not_null(&processor_scheduling_info));
+    gsl_Assert(1 == erased_scheduling_infos_count);
   });
 
   processor->incrementActiveTasks();
@@ -141,11 +144,11 @@ void SchedulingAgent::watchDogFunc() {
   std::lock_guard<std::mutex> lock(watchdog_mtx_);
   auto now = std::chrono::steady_clock::now();
   for (const auto& info : scheduled_processors_) {
-    auto elapsed = now - info.last_alert_time_;
+    auto elapsed = now - info->last_alert_time_;
     if (elapsed > alert_time_) {
-      int64_t elapsed_ms{ std::chrono::duration_cast<std::chrono::milliseconds>(now - info.start_time_).count() };
-      logger_->log_warn("{}::onTrigger has been running for {}  ms in {}", info.name_, elapsed_ms, info.uuid_);
-      info.last_alert_time_ = now;
+      int64_t elapsed_ms{ std::chrono::duration_cast<std::chrono::milliseconds>(now - info->start_time_).count() };
+      logger_->log_warn("{}::onTrigger has been running for {}  ms in {}", info->name_, elapsed_ms, info->uuid_);
+      info->last_alert_time_ = now;
     }
   }
 }

Reply via email to