This is an automated email from the ASF dual-hosted git repository. fgerlits pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi-minifi-cpp.git
commit 35f98753d5b5e6e5eb64e0951a7af98d08ceca3e Author: Gabor Gyimesi <[email protected]> AuthorDate: Tue Sep 2 14:28:47 2025 +0200 MINIFICPP-2619 Fix deadlock of ThreadPool shutdown The test case "Timer driven should respect both yield and run schedule" of SchedulingAgentTests hung sometimes due to a race condition in the ThreadPool class. It could occur that while shutting down, the notify_all call would go out and after that the delayed scheduler thread would still try to process the worker queue one more time then wait for the notify forever. Moving the running_ variable check under the mutual mutex solves the issue. Signed-off-by: Ferenc Gerlits <[email protected]> Closes #2019 --- core-framework/include/utils/ThreadPool.h | 1 - core-framework/src/utils/ThreadPool.cpp | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/core-framework/include/utils/ThreadPool.h b/core-framework/include/utils/ThreadPool.h index 0d0231427..f5a90f698 100644 --- a/core-framework/include/utils/ThreadPool.h +++ b/core-framework/include/utils/ThreadPool.h @@ -271,7 +271,6 @@ class ThreadPool { std::vector<std::shared_ptr<WorkerThread>> thread_queue_; std::thread manager_thread_; std::thread delayed_scheduler_thread_; - std::atomic<bool> adjust_threads_; std::atomic<bool> running_; core::controller::ControllerServiceLookup* controller_service_provider_; std::shared_ptr<controllers::ThreadManagementService> thread_manager_; diff --git a/core-framework/src/utils/ThreadPool.cpp b/core-framework/src/utils/ThreadPool.cpp index 999f3ad5c..d46064722 100644 --- a/core-framework/src/utils/ThreadPool.cpp +++ b/core-framework/src/utils/ThreadPool.cpp @@ -26,7 +26,6 @@ ThreadPool::ThreadPool(int max_worker_threads, core::controller::ControllerServi : thread_reduction_count_(0), max_worker_threads_(max_worker_threads), current_workers_(0), - adjust_threads_(false), running_(false), controller_service_provider_(controller_service_provider), name_(std::move(name)), @@ -98,8 +97,11 @@ void ThreadPool::run_tasks(const std::shared_ptr<WorkerThread>& thread) { } void ThreadPool::manage_delayed_queue() { - while (running_) { + while (true) { std::unique_lock<std::mutex> lock(worker_queue_mutex_); + if (!running_) { + return; + } // Put the tasks ready to run in the worker queue while (!delayed_worker_queue_.empty() && delayed_worker_queue_.top().getNextExecutionTime() <= std::chrono::steady_clock::now()) {
