This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0a6270b76f9 [fix](executor) Fix rare self-deadlock that can cause the time-sharing task executor to hang. (#58273)
0a6270b76f9 is described below
commit 0a6270b76f9945cbbb2599cba35a545564f53028
Author: Qi Chen <[email protected]>
AuthorDate: Tue Jan 13 10:55:16 2026 +0800
[fix](executor) Fix rare self-deadlock that can cause the time-sharing task executor to hang. (#58273)
### What problem does this PR solve?
### Release note
#### Summary
1. Fix a self-deadlock that can occur in `_dispatch_thread` when a destructor
path attempts to re-acquire the mutex the thread already holds. The root cause
is a destructor, triggered while `_lock` is held, performing an operation that
tries to re-lock that same mutex. The fix ensures that destructors which may
call `remove_task()` run outside the `_lock` scope, so no destructor path
re-locks a mutex its own thread is holding.
2. Call `_task_executor->wait()` in
`TaskExecutorSimplifiedScanScheduler::stop()`, so that `stop()` does not return
until the executor's tasks have completed (a stop/wait sketch is shown after
this list).
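
The sketch below is not the Doris implementation; the class names (`SimpleExecutor`, `Scheduler`) and the threading model are simplified assumptions. It only illustrates the split of responsibilities that item 2 relies on: `stop()` merely signals shutdown, while a separate `wait()` blocks until the worker threads have actually exited, so a caller's own `stop()` can be made synchronous.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical executor: stop() only signals, wait() joins the workers.
class SimpleExecutor {
public:
    void start(int num_threads) {
        for (int i = 0; i < num_threads; ++i) {
            _workers.emplace_back([this] {
                while (!_closed.load(std::memory_order_acquire)) {
                    // ... take the next split from the queue and process it ...
                    // (simplified: a real executor would block on the queue)
                }
            });
        }
    }

    // Signal shutdown; workers may still be draining when this returns.
    void stop() { _closed.store(true, std::memory_order_release); }

    // Block until every worker thread has actually exited.
    void wait() {
        for (auto& t : _workers) {
            if (t.joinable()) {
                t.join();
            }
        }
    }

private:
    std::atomic<bool> _closed{false};
    std::vector<std::thread> _workers;
};

// A scheduler that owns the executor can then make its own stop() synchronous:
//   void Scheduler::stop() {
//       _is_stop.store(true);
//       _executor->stop();
//       _executor->wait();  // no thread touches scheduler state after this line
//   }
```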
---
#### Details / Reproduction steps
1. `std::shared_ptr<PrioritizedSplitRunner> split = _tokenless->_entries->take();`
2. `l.lock();` — `_dispatch_thread` acquires `_lock`.
3. After the `while` loop finishes, `split` goes out of scope and the
`shared_ptr` is destroyed.
4. The `PrioritizedSplitRunner` destructor runs → it destroys `_split_runner`
(`ScannerSplitRunner`).
5. The `ScannerSplitRunner::_scan_func` destructor runs → it destroys the
captured `ctx` (`std::shared_ptr<ScannerContext>`).
6. `ScannerContext::~ScannerContext()` calls `remove_task()`.
7. `remove_task()` attempts to acquire `_lock`.
8. Result: **self-deadlock**, because `_lock` is already held by
`_dispatch_thread`. A minimal stand-alone sketch of this pattern follows the
list.
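
To make the chain above concrete, here is a minimal, self-contained sketch of the same failure mode. The types (`Registry`, `Context`) are hypothetical stand-ins, not the Doris classes: a `shared_ptr` whose destructor calls back into a function that locks a non-recursive `std::mutex` the destroying thread already holds.

```cpp
#include <memory>
#include <mutex>

// Stand-in for the executor: remove_task() takes the same mutex as the dispatch loop.
struct Registry {
    std::mutex lock;
    void remove_task() { std::lock_guard<std::mutex> g(lock); /* unregister ... */ }
};

// Stand-in for the captured ScannerContext: its destructor calls back into the registry.
struct Context {
    explicit Context(Registry* r) : registry(r) {}
    ~Context() { registry->remove_task(); }  // re-locks Registry::lock
    Registry* registry;
};

int main() {
    Registry registry;
    {
        std::unique_lock<std::mutex> l(registry.lock, std::defer_lock);
        auto ctx = std::make_shared<Context>(&registry);  // analogous to take()
        // ... process ctx ...
        l.lock();  // the thread now holds registry.lock
        // End of scope: ctx is destroyed after the lock was taken and before l
        // is destroyed, so ~Context() -> remove_task() -> lock() runs on a mutex
        // this thread already holds. With a non-recursive std::mutex that is
        // undefined behaviour and in practice the thread hangs (self-deadlock).
    }
    return 0;
}
```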
---
#### Solution
We must explicitly release `split` BEFORE acquiring `_lock`, to avoid the
self-deadlock. The destructor chain (`PrioritizedSplitRunner` ->
`ScannerSplitRunner` -> `_scan_func` lambda -> captured `ScannerContext`) may
call `remove_task()`, which tries to acquire `_lock`. Since `_lock` is not a
recursive mutex, this would deadlock. A minimal sketch of the fixed ordering is
shown below.
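
Using the same hypothetical `Registry`/`Context` stand-ins as in the reproduction sketch (not the actual Doris types), the fix amounts to running the destructor chain while no lock is held, and only then re-acquiring the mutex:

```cpp
#include <memory>
#include <mutex>

struct Registry {
    std::mutex lock;
    void remove_task() { std::lock_guard<std::mutex> g(lock); /* unregister ... */ }
};

struct Context {
    explicit Context(Registry* r) : registry(r) {}
    ~Context() { registry->remove_task(); }
    Registry* registry;
};

void dispatch_once(Registry& registry) {
    std::unique_lock<std::mutex> l(registry.lock, std::defer_lock);
    auto ctx = std::make_shared<Context>(&registry);
    // ... process ctx ...

    // Drop the last owning reference while no lock is held, so the destructor
    // chain (and its remove_task() call) can take registry.lock freely.
    ctx.reset();

    l.lock();  // safe: nothing is left to destroy while the lock is held
}
```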
---
be/src/common/config.cpp | 2 +-
be/src/vec/exec/executor/task_executor.h | 1 +
.../time_sharing/time_sharing_task_executor.cpp | 19 +++++++++++--------
.../time_sharing/time_sharing_task_executor.h | 3 +--
be/src/vec/exec/scan/scanner_scheduler.h | 1 +
5 files changed, 15 insertions(+), 11 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 5f78c7d9294..ae172414b65 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -308,7 +308,7 @@ DEFINE_Int32(task_executor_max_concurrency_per_task, "-1");
DEFINE_Int32(task_executor_initial_max_concurrency_per_task, "-1");
// Enable task executor in internal table scan.
-DEFINE_Bool(enable_task_executor_in_internal_table, "false");
+DEFINE_Bool(enable_task_executor_in_internal_table, "true");
// Enable task executor in external table scan.
DEFINE_Bool(enable_task_executor_in_external_table, "true");
diff --git a/be/src/vec/exec/executor/task_executor.h b/be/src/vec/exec/executor/task_executor.h
index 59ea00d460e..98526b1b81c 100644
--- a/be/src/vec/exec/executor/task_executor.h
+++ b/be/src/vec/exec/executor/task_executor.h
@@ -37,6 +37,7 @@ public:
virtual Status init() = 0;
virtual Status start() = 0;
virtual void stop() = 0;
+ virtual void wait() = 0;
virtual Result<std::shared_ptr<TaskHandle>> create_task(
const TaskId& task_id, std::function<double()> utilization_supplier,
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
index 5d104290177..4459f20037b 100644
--- a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
+++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.cpp
@@ -562,10 +562,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
std::lock_guard<std::mutex> guard(_mutex);
_running_splits.insert(split);
}
- Defer defer {[&]() {
- std::lock_guard<std::mutex> guard(_mutex);
- _running_splits.erase(split);
- }};
Result<SharedListenableFuture<Void>> blocked_future_result = split->process();
@@ -577,10 +573,6 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
auto blocked_future = blocked_future_result.value();
if (split->is_finished()) {
- {
- std::ostringstream _oss;
- _oss << std::this_thread::get_id();
- }
_split_finished(split, split->finished_status());
} else {
if (split->is_auto_reschedule()) {
@@ -625,6 +617,17 @@ void TimeSharingTaskExecutor::_dispatch_thread() {
// In the worst case, the destructor might even try to do something
// with this SplitThreadPool, and produce a deadlock.
// task.runnable.reset();
+
+ // IMPORTANT: We must explicitly release 'split' BEFORE acquiring _lock to avoid
+ // self-deadlock. The destructor chain (PrioritizedSplitRunner -> ScannerSplitRunner
+ // -> _scan_func lambda -> captured ScannerContext) may call remove_task() which
+ // tries to acquire _lock. Since _lock is not a recursive mutex, this would deadlock.
+ {
+ std::lock_guard<std::mutex> guard(_mutex);
+ _running_splits.erase(split);
+ }
+ split.reset();
+
l.lock();
thread_pool_task_execution_time_ns_total->increment(
task_execution_time_watch.elapsed_time());
diff --git a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
index f40cb88b424..3550449beb7 100644
--- a/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
+++ b/be/src/vec/exec/executor/time_sharing/time_sharing_task_executor.h
@@ -97,6 +97,7 @@ public:
Status start() override;
void stop() override;
+ void wait() override;
Result<std::shared_ptr<TaskHandle>> create_task(
const TaskId& task_id, std::function<double()> utilization_supplier,
@@ -245,8 +246,6 @@ private:
std::unique_lock<std::mutex>& lock);
void _record_leaf_splits_size(std::unique_lock<std::mutex>& lock);
void _split_finished(std::shared_ptr<PrioritizedSplitRunner> split, const Status& status);
- // Waits until all the tasks are completed.
- void wait();
int64_t _get_running_tasks_for_level(int level) const;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h b/be/src/vec/exec/scan/scanner_scheduler.h
index 21fa4aefa5c..089f3e1e5b7 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -268,6 +268,7 @@ public:
void stop() override {
_is_stop.store(true);
_task_executor->stop();
+ _task_executor->wait();
}
Status start(int max_thread_num, int min_thread_num, int queue_size,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]