This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 0a6270b76f9 [fix](executor) Fix rare self-deadlock that can cause the time-sharing task executor to hang. (#58273)
0a6270b76f9 is described below

commit 0a6270b76f9945cbbb2599cba35a545564f53028
Author: Qi Chen <[email protected]>
AuthorDate: Tue Jan 13 10:55:16 2026 +0800

    [fix](executor) Fix rare self-deadlock that can cause the time-sharing task executor to hang. (#58273)
    
    ### What problem does this PR solve?
    
    ### Release note
    
    #### Summary
    
    1. Fix a self-deadlock that can occur in `_dispatch_thread` when a
    destructor path attempts to re-acquire the same mutex already held by
    the thread. The root cause is destructors (triggered while `_lock` is
    held) performing operations that try to re-acquire that same mutex. A
    safe fix ensures that destructors which may call `remove_task()` either
    run outside the `_lock` scope or avoid re-locking the same mutex inside
    destructor paths (see the sketch after this list).
    2. Call `_task_executor->wait()` in
    `TaskExecutorSimplifiedScanScheduler::stop()`.
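
    The failure mode in item 1 is the classic re-entry problem of a
    non-recursive mutex: calling `std::mutex::lock()` on a mutex the same
    thread already owns is undefined behavior and in practice blocks forever.
    A minimal, self-contained sketch of that failure mode (illustrative
    names, not the actual Doris classes):

    ```cpp
    #include <memory>
    #include <mutex>
    #include <unordered_set>

    std::mutex g_lock;               // plays the role of `_lock`
    std::unordered_set<int> g_tasks; // plays the role of the task table

    void remove_task(int id) {
        std::lock_guard<std::mutex> guard(g_lock); // re-locks the same mutex
        g_tasks.erase(id);
    }

    struct Ctx { // stands in for ScannerContext
        int id = 0;
        ~Ctx() { remove_task(id); } // the destructor needs the lock
    };

    int main() {
        auto ctx = std::make_shared<Ctx>();
        ctx->id = 42;
        std::lock_guard<std::mutex> guard(g_lock); // "dispatch thread" holds the lock
        g_tasks.insert(42);
        ctx.reset(); // ~Ctx() -> remove_task() -> g_lock.lock() on the same
                     // thread: self-deadlock
        return 0;
    }
    ```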
    
    ---
    
    #### Details / Reproduction steps
    1. `std::shared_ptr<PrioritizedSplitRunner> split =
    _tokenless->_entries->take();`
    2. `l.lock();` — `_dispatch_thread` acquires `_lock`.
    3. After the `while` loop finishes, `split` goes out of scope and the
    `shared_ptr` is destroyed.
    4. The `PrioritizedSplitRunner` destructor runs → destroys `_split_runner`
    (`ScannerSplitRunner`).
    5. The `ScannerSplitRunner::_scan_func` destructor runs → destroys the
    captured `ctx` (`std::shared_ptr<ScannerContext>`).
    6. `ScannerContext::~ScannerContext()` calls `remove_task()`.
    7. `remove_task()` attempts to acquire `_lock`.
    8. Result: **self-deadlock**, because `_lock` is already held by
    `_dispatch_thread` (see the ownership sketch below).
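
    The chain above is driven purely by ownership: the split owns the runner,
    the runner owns a `std::function` whose closure captures the context's
    `shared_ptr`, so dropping the last `split` reference unwinds the whole
    chain. A compressed sketch of that ownership chain (hypothetical minimal
    stand-in types, not the real ones):

    ```cpp
    #include <functional>
    #include <memory>

    struct ScannerContextLike {
        ~ScannerContextLike() {
            // The real ScannerContext calls remove_task() here, which
            // acquires _lock.
        }
    };

    struct SplitRunnerLike {
        // Mirrors ScannerSplitRunner::_scan_func: the closure keeps the
        // context alive.
        std::function<void()> scan_func;
    };

    struct PrioritizedSplitLike {
        std::shared_ptr<SplitRunnerLike> runner; // mirrors _split_runner
    };

    int main() {
        auto ctx = std::make_shared<ScannerContextLike>();
        auto runner = std::make_shared<SplitRunnerLike>();
        runner->scan_func = [ctx] { (void)ctx; /* scan work */ }; // captures ctx
        auto split = std::make_shared<PrioritizedSplitLike>();
        split->runner = runner;

        ctx.reset();    // context now lives only inside the captured lambda
        runner.reset(); // runner now lives only inside the split

        // Destroying the last `split` reference runs:
        //   ~PrioritizedSplitLike -> ~SplitRunnerLike -> ~std::function
        //   (drops the captured ctx) -> ~ScannerContextLike ->
        //   remove_task() -> lock(_lock).
        // If that happens while the dispatch thread already holds _lock
        // (step 2 above), the thread deadlocks on itself.
        split.reset();
        return 0;
    }
    ```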
    
    ---
    
    #### Solution
    We must explicitly release `split` BEFORE acquiring `_lock` to avoid the
    self-deadlock. The destructor chain (`PrioritizedSplitRunner` ->
    `ScannerSplitRunner` -> `_scan_func` lambda -> captured `ScannerContext`)
    may call `remove_task()`, which tries to acquire `_lock`. Since `_lock`
    is not a recursive mutex, this would deadlock.
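
    Applied to the toy sketch under the summary above, the fix is purely an
    ordering change: let the destructor chain run while no lock is held, and
    only re-acquire the lock afterwards for bookkeeping (same hypothetical
    names as before):

    ```cpp
    #include <memory>
    #include <mutex>
    #include <unordered_set>

    std::mutex g_lock;
    std::unordered_set<int> g_tasks;

    void remove_task(int id) {
        std::lock_guard<std::mutex> guard(g_lock);
        g_tasks.erase(id);
    }

    struct Ctx {
        int id = 0;
        ~Ctx() { remove_task(id); }
    };

    int main() {
        auto ctx = std::make_shared<Ctx>();
        ctx->id = 42;
        {
            std::lock_guard<std::mutex> guard(g_lock);
            g_tasks.insert(42);
        }            // lock released before any destructor can run
        ctx.reset(); // ~Ctx() -> remove_task() takes g_lock with no lock held
        std::lock_guard<std::mutex> guard(g_lock); // safe to re-lock afterwards
        return 0;
    }
    ```

    This mirrors the patch below: erase from `_running_splits` under `_mutex`,
    then `split.reset()`, and only then `l.lock()`.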
---
 be/src/common/config.cpp                              |  2 +-
 be/src/vec/exec/executor/task_executor.h              |  1 +
 .../time_sharing/time_sharing_task_executor.cpp       | 19 +++++++++++--------
 .../time_sharing/time_sharing_task_executor.h         |  3 +--
 be/src/vec/exec/scan/scanner_scheduler.h              |  1 +
 5 files changed, 15 insertions(+), 11 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5f78c7d9294..ae172414b65 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -308,7 +308,7 @@ DEFINE_Int32(task_executor_max_concurrency_per_task, "-1");
 DEFINE_Int32(task_executor_initial_max_concurrency_per_task, "-1");
 
 // Enable task executor in internal table scan.
-DEFINE_Bool(enable_task_executor_in_internal_table, "false");
+DEFINE_Bool(enable_task_executor_in_internal_table, "true");
 // Enable task executor in external table scan.
 DEFINE_Bool(enable_task_executor_in_external_table, "true");
 
diff --git a/be/src/vec/exec/executor/task_executor.h b/be/src/vec/exec/executor/task_executor.h
index 59ea00d460e..98526b1b81c 100644
--- a/be/src/vec/exec/executor/task_executor.h
+++ b/be/src/vec/exec/executor/task_executor.h
@@ -37,6 +37,7 @@ public:
     virtual Status init() = 0;
     virtual Status start() = 0;
     virtual void stop() = 0;
+    virtual void wait() = 0;
 
     virtual Result<std::shared_ptr<TaskHandle>> create_task(
             const TaskId& task_id, std::function<double()> utilization_supplier,
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
index 5d104290177..4459f20037b 100644
--- a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
+++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
@@ -562,10 +562,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
             std::lock_guard<std::mutex> guard(_mutex);
             _running_splits.insert(split);
         }
-        Defer defer {[&]() {
-            std::lock_guard<std::mutex> guard(_mutex);
-            _running_splits.erase(split);
-        }};
 
         Result<SharedListenableFuture<Void>> blocked_future_result = split->process();
 
@@ -577,10 +573,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
             auto blocked_future = blocked_future_result.value();
 
             if (split->is_finished()) {
-                {
-                    std::ostringstream _oss;
-                    _oss << std::this_thread::get_id();
-                }
                 _split_finished(split, split->finished_status());
             } else {
                 if (split->is_auto_reschedule()) {
@@ -625,6 +617,17 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
         // In the worst case, the destructor might even try to do something
         // with this SplitThreadPool, and produce a deadlock.
         // task.runnable.reset();
+
+        // IMPORTANT: We must explicitly release 'split' BEFORE acquiring _lock to avoid
+        // self-deadlock. The destructor chain (PrioritizedSplitRunner -> ScannerSplitRunner
+        // -> _scan_func lambda -> captured ScannerContext) may call remove_task() which
+        // tries to acquire _lock. Since _lock is not a recursive mutex, this would deadlock.
+        {
+            std::lock_guard<std::mutex> guard(_mutex);
+            _running_splits.erase(split);
+        }
+        split.reset();
+
         l.lock();
         thread_pool_task_execution_time_ns_total->increment(
                 task_execution_time_watch.elapsed_time());
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
index f40cb88b424..3550449beb7 100644
--- a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
+++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
@@ -97,6 +97,7 @@ public:
 
     Status start() override;
     void stop() override;
+    void wait() override;
 
     Result<std::shared_ptr<TaskHandle>> create_task(
             const TaskId& task_id, std::function<double()> utilization_supplier,
@@ -245,8 +246,6 @@ private:
             std::unique_lock<std::mutex>& lock);
     void _record_leaf_splits_size(std::unique_lock<std::mutex>& lock);
     void _split_finished(std::shared_ptr<PrioritizedSplitRunner> split, const Status& status);
-    // Waits until all the tasks are completed.
-    void wait();
 
     int64_t _get_running_tasks_for_level(int level) const;
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 21fa4aefa5c..089f3e1e5b7 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -268,6 +268,7 @@ public:
     void stop() override {
         _is_stop.store(true);
         _task_executor->stop();
+        _task_executor->wait();
     }
 
     Status start(int max_thread_num, int min_thread_num, int queue_size,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
