This is an automated email from the ASF dual-hosted git repository.
HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9dae1258c9b [fix](be) Release workload group schedulers on shutdown
(#65112)
9dae1258c9b is described below
commit 9dae1258c9bad38209ef403bf33f349b38c8ead4
Author: TengJianPing <[email protected]>
AuthorDate: Fri Jul 3 11:39:40 2026 +0800
[fix](be) Release workload group schedulers on shutdown (#65112)
Problem Summary:
There are shared_ptr reference cycles:
```
ResourceContext
-> shared_ptr<WorkloadGroup>
-> WorkloadGroup::_task_sched
-> HybridTaskScheduler
-> TaskScheduler::_task_queue
-> MultiCoreTaskQueue
-> PriorityTaskQueue
-> SubTaskQueue::_queue
-> shared_ptr<PipelineTask>
-> ScanLocalState / ScannerContext / ScanTask / operator local state
-> shared_ptr<ResourceContext>
```
and
```
ResourceContext
-> WorkloadGroup
-> _scan_task_sched / _remote_scan_task_sched
-> TaskExecutorSimplifiedScanScheduler
-> queued scan task / split runner
-> ScannerContext / ScanTask
-> shared_ptr<ResourceContext>
```
Graceful shutdown with enable_graceful_exit_check could report ASan/LSan
leaks after ExecEnv::destroy(). The leak stacks showed pending pipeline
tasks in MultiCoreTaskQueue, scanner splits in TimeSharingTaskExecutor,
memtable flush tasks in the workload group ThreadPool, and
QueryContext/ResourceContext objects retaining their WorkloadGroup.
ExecEnv stopped the workload group schedulers before fragment and load
resources were released, and WorkloadGroupMgr::stop only stopped the
scheduler threads without destroying the scheduler objects or their queued
resources. Pending queues and executor task maps could therefore retain
query/load resources until process exit. This change clears pending
pipeline task queues on close, makes queue close and submit mutually
exclusive, marks tasks runnable only after a successful enqueue, makes
scheduler/executor stop() idempotent, and releases scheduler-owned queues
after fragment/load resources have stopped.
---
be/src/exec/pipeline/task_queue.cpp | 35 +++++++++++++++++-----
be/src/exec/pipeline/task_queue.h | 2 ++
be/src/exec/scan/scanner_scheduler.h | 8 +++--
.../time_sharing/time_sharing_task_executor.cpp | 7 +++--
be/src/runtime/exec_env_init.cpp | 8 ++++-
be/src/runtime/workload_group/workload_group.cpp | 13 ++++++++
be/src/runtime/workload_group/workload_group.h | 2 ++
.../workload_group/workload_group_manager.cpp | 7 +++++
.../workload_group/workload_group_manager.h | 2 ++
9 files changed, 71 insertions(+), 13 deletions(-)
diff --git a/be/src/exec/pipeline/task_queue.cpp
b/be/src/exec/pipeline/task_queue.cpp
index 40e6d60eb90..a9fee38b459 100644
--- a/be/src/exec/pipeline/task_queue.cpp
+++ b/be/src/exec/pipeline/task_queue.cpp
@@ -21,6 +21,7 @@
#include <chrono> // IWYU pragma: keep
#include <memory>
#include <string>
+#include <vector>
#include "common/logging.h"
#include "exec/pipeline/pipeline_task.h"
@@ -37,6 +38,12 @@ PipelineTaskSPtr SubTaskQueue::try_take(bool is_steal) {
return task;
}
+std::queue<PipelineTaskSPtr> SubTaskQueue::take_all() {
+ std::queue<PipelineTaskSPtr> tasks;
+ tasks.swap(_queue);
+ return tasks;
+}
+
//////////////////// PriorityTaskQueue ////////////////////
PriorityTaskQueue::PriorityTaskQueue() : _closed(false) {
@@ -48,10 +55,24 @@ PriorityTaskQueue::PriorityTaskQueue() : _closed(false) {
}
void PriorityTaskQueue::close() {
- std::unique_lock<std::mutex> lock(_work_size_mutex);
- _closed = true;
- _wait_task.notify_all();
-
DorisMetrics::instance()->pipeline_task_queue_size->increment(-_total_task_size);
+ std::vector<std::queue<PipelineTaskSPtr>> tasks_to_release;
+ {
+ std::unique_lock<std::mutex> lock(_work_size_mutex);
+ _closed = true;
+ const auto total_task_size = _total_task_size.exchange(0);
+ _wait_task.notify_all();
+ DorisMetrics::instance()->pipeline_task_queue_size->increment(
+ -static_cast<int64_t>(total_task_size));
+ for (auto& sub_queue : _sub_queues) {
+ tasks_to_release.emplace_back(sub_queue.take_all());
+ }
+ }
+ for (auto& tasks : tasks_to_release) {
+ while (!tasks.empty()) {
+ tasks.front()->pop_out_runnable_queue();
+ tasks.pop();
+ }
+ }
}
PipelineTaskSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
@@ -113,11 +134,11 @@ PipelineTaskSPtr PriorityTaskQueue::take(uint32_t
timeout_ms) {
}
Status PriorityTaskQueue::push(PipelineTaskSPtr task) {
+ auto level = _compute_level(task->get_runtime_ns());
+ std::unique_lock<std::mutex> lock(_work_size_mutex);
if (_closed) {
return Status::InternalError("WorkTaskQueue closed");
}
- auto level = _compute_level(task->get_runtime_ns());
- std::unique_lock<std::mutex> lock(_work_size_mutex);
// update empty queue's runtime, to avoid too high priority
if (_sub_queues[level].empty() &&
@@ -125,6 +146,7 @@ Status PriorityTaskQueue::push(PipelineTaskSPtr task) {
_sub_queues[level].adjust_runtime(_queue_level_min_vruntime);
}
+ task->put_in_runnable_queue();
_sub_queues[level].push_back(task);
_total_task_size++;
DorisMetrics::instance()->pipeline_task_queue_size->increment(1);
@@ -199,7 +221,6 @@ Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task)
{
Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
DCHECK(core_id < _core_size);
- task->put_in_runnable_queue();
return _prio_task_queues[core_id].push(task);
}
diff --git a/be/src/exec/pipeline/task_queue.h
b/be/src/exec/pipeline/task_queue.h
index ad55a4f82db..f5a33a91699 100644
--- a/be/src/exec/pipeline/task_queue.h
+++ b/be/src/exec/pipeline/task_queue.h
@@ -56,6 +56,8 @@ public:
bool empty() { return _queue.empty(); }
+ std::queue<PipelineTaskSPtr> take_all();
+
private:
std::queue<PipelineTaskSPtr> _queue;
// depends on LEVEL_QUEUE_TIME_FACTOR
diff --git a/be/src/exec/scan/scanner_scheduler.h
b/be/src/exec/scan/scanner_scheduler.h
index bc4ac2c8075..57f4406a1ae 100644
--- a/be/src/exec/scan/scanner_scheduler.h
+++ b/be/src/exec/scan/scanner_scheduler.h
@@ -164,7 +164,9 @@ public:
}
void stop() override {
- _is_stop.store(true);
+ if (_is_stop.exchange(true)) {
+ return;
+ }
_scan_thread_pool->shutdown();
_scan_thread_pool->wait();
}
@@ -264,7 +266,9 @@ public:
}
void stop() override {
- _is_stop.store(true);
+ if (_is_stop.exchange(true)) {
+ return;
+ }
_task_executor->stop();
_task_executor->wait();
}
diff --git
a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
index dc60da2e39d..05d07263d0c 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
@@ -273,9 +273,7 @@ Status TimeSharingTaskExecutor::init() {
}
TimeSharingTaskExecutor::~TimeSharingTaskExecutor() {
- if (!_stopped.exchange(true)) {
- stop();
- }
+ stop();
std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
{
@@ -312,6 +310,9 @@ Status TimeSharingTaskExecutor::start() {
}
void TimeSharingTaskExecutor::stop() {
+ if (_stopped.exchange(true)) {
+ return;
+ }
// Why access to doris_metrics is safe here?
// Since DorisMetrics is a singleton, it will be destroyed only after
doris_main is exited.
// The shutdown/destroy of SplitThreadPool is guaranteed to take place
before doris_main exits by
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index e3201d890a1..c010c1deeba 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -872,7 +872,8 @@ void ExecEnv::destroy() {
SAFE_STOP(_stream_load_recorder_manager);
// stop workload scheduler
SAFE_STOP(_workload_sched_mgr);
- // stop pipline step 2, cgroup execution
+ // Stop workload group execution threads before FragmentMgr. Running
pipeline tasks can still
+ // report status through FragmentMgr's async thread pool.
SAFE_STOP(_workload_group_manager);
SAFE_STOP(_external_scan_context_mgr);
@@ -885,6 +886,11 @@ void ExecEnv::destroy() {
_memtable_memory_limiter.reset();
_delta_writer_v2_pool.reset();
_load_stream_map_pool.reset();
+ // Workload group schedulers own the query pipeline, scan, and memtable
flush queues.
+ // Release them after fragment/load resources have stopped submitting
cleanup work.
+ if (_workload_group_manager) {
+ _workload_group_manager->destroy_schedulers();
+ }
SAFE_STOP(_write_cooldown_meta_executors);
// _id_manager must be destoried before tablet schema cache
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index ad74647a21c..0c0bdf5f74c 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -743,6 +743,10 @@ int64_t WorkloadGroup::get_mem_used() {
void WorkloadGroup::try_stop_schedulers() {
std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+ stop_schedulers_no_lock();
+}
+
+void WorkloadGroup::stop_schedulers_no_lock() {
if (_task_sched) {
_task_sched->stop();
}
@@ -766,6 +770,15 @@ void WorkloadGroup::try_stop_schedulers() {
}
}
+void WorkloadGroup::destroy_schedulers() {
+ std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+ _task_sched.reset();
+ _scan_task_sched.reset();
+ _remote_scan_task_sched.reset();
+ _memtable_flush_pool.reset();
+ _cgroup_cpu_ctl.reset();
+}
+
void WorkloadGroup::update_memtable_flush_threads() {
if (_memtable_flush_pool == nullptr) {
return;
diff --git a/be/src/runtime/workload_group/workload_group.h
b/be/src/runtime/workload_group/workload_group.h
index 9b764664adb..2a310bf66a0 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -219,6 +219,8 @@ private:
void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
Status upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
std::shared_ptr<CgroupCpuCtl>
cg_cpu_ctl_ptr);
+ void stop_schedulers_no_lock();
+ void destroy_schedulers();
std::string _memory_debug_string() const;
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp
b/be/src/runtime/workload_group/workload_group_manager.cpp
index f7abad1c0c5..bfc3b94509d 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -971,6 +971,13 @@ void WorkloadGroupMgr::stop() {
}
}
+void WorkloadGroupMgr::destroy_schedulers() {
+ std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+ for (auto iter = _workload_groups.begin(); iter != _workload_groups.end();
iter++) {
+ iter->second->destroy_schedulers();
+ }
+}
+
Status WorkloadGroupMgr::create_internal_wg() {
TWorkloadGroupInfo twg_info;
twg_info.__set_id(INTERNAL_NORMAL_WG_ID);
diff --git a/be/src/runtime/workload_group/workload_group_manager.h
b/be/src/runtime/workload_group/workload_group_manager.h
index 6cd63e6e5fa..f4c4504ad08 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -81,6 +81,8 @@ public:
void stop();
+ void destroy_schedulers();
+
void refresh_workload_group_memory_state();
void get_wg_resource_usage(Block* block);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]