This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 9dae1258c9b [fix](be) Release workload group schedulers on shutdown 
(#65112)
9dae1258c9b is described below

commit 9dae1258c9bad38209ef403bf33f349b38c8ead4
Author: TengJianPing <[email protected]>
AuthorDate: Fri Jul 3 11:39:40 2026 +0800

    [fix](be) Release workload group schedulers on shutdown (#65112)
    
    Problem Summary:
    
    There are reference cycles between smart pointers:
    ```
      ResourceContext
        -> shared_ptr<WorkloadGroup>
        -> WorkloadGroup::_task_sched
        -> HybridTaskScheduler
        -> TaskScheduler::_task_queue
        -> MultiCoreTaskQueue
        -> PriorityTaskQueue
        -> SubTaskQueue::_queue
        -> shared_ptr<PipelineTask>
        -> ScanLocalState / ScannerContext / ScanTask / operator local state
        -> shared_ptr<ResourceContext>
    ```
    and
    ```
    ResourceContext
        -> WorkloadGroup
        -> _scan_task_sched / _remote_scan_task_sched
        -> TaskExecutorSimplifiedScanScheduler
        -> queued scan task / split runner
        -> ScannerContext / ScanTask
        -> shared_ptr<ResourceContext>
    ```
    Graceful shutdown with enable_graceful_exit_check could report ASan/LSan
    leaks after ExecEnv::destroy(). The leak stacks showed pending pipeline
    tasks in MultiCoreTaskQueue, scanner splits in TimeSharingTaskExecutor,
    memtable flush tasks in the workload group ThreadPool, and
    QueryContext/ResourceContext retaining WorkloadGroup.
    
    ExecEnv stopped workload group schedulers before fragment and load
    resources were released, and WorkloadGroupMgr::stop only stopped
    scheduler threads without destroying scheduler objects or their queued
    resources. Pending queues and executor task maps could therefore retain
    query/load resources until process exit. This change clears pending
    pipeline task queues on close, makes queue close and submit mutually
    exclusive, only marks tasks runnable after a successful enqueue, and
    releases scheduler-owned queues after fragment/load resources have
    stopped.
---
 be/src/exec/pipeline/task_queue.cpp                | 35 +++++++++++++++++-----
 be/src/exec/pipeline/task_queue.h                  |  2 ++
 be/src/exec/scan/scanner_scheduler.h               |  8 +++--
 .../time_sharing/time_sharing_task_executor.cpp    |  7 +++--
 be/src/runtime/exec_env_init.cpp                   |  8 ++++-
 be/src/runtime/workload_group/workload_group.cpp   | 13 ++++++++
 be/src/runtime/workload_group/workload_group.h     |  2 ++
 .../workload_group/workload_group_manager.cpp      |  7 +++++
 .../workload_group/workload_group_manager.h        |  2 ++
 9 files changed, 71 insertions(+), 13 deletions(-)

diff --git a/be/src/exec/pipeline/task_queue.cpp 
b/be/src/exec/pipeline/task_queue.cpp
index 40e6d60eb90..a9fee38b459 100644
--- a/be/src/exec/pipeline/task_queue.cpp
+++ b/be/src/exec/pipeline/task_queue.cpp
@@ -21,6 +21,7 @@
 #include <chrono> // IWYU pragma: keep
 #include <memory>
 #include <string>
+#include <vector>
 
 #include "common/logging.h"
 #include "exec/pipeline/pipeline_task.h"
@@ -37,6 +38,12 @@ PipelineTaskSPtr SubTaskQueue::try_take(bool is_steal) {
     return task;
 }
 
+std::queue<PipelineTaskSPtr> SubTaskQueue::take_all() {
+    std::queue<PipelineTaskSPtr> tasks;
+    tasks.swap(_queue);
+    return tasks;
+}
+
 ////////////////////  PriorityTaskQueue ////////////////////
 
 PriorityTaskQueue::PriorityTaskQueue() : _closed(false) {
@@ -48,10 +55,24 @@ PriorityTaskQueue::PriorityTaskQueue() : _closed(false) {
 }
 
 void PriorityTaskQueue::close() {
-    std::unique_lock<std::mutex> lock(_work_size_mutex);
-    _closed = true;
-    _wait_task.notify_all();
-    
DorisMetrics::instance()->pipeline_task_queue_size->increment(-_total_task_size);
+    std::vector<std::queue<PipelineTaskSPtr>> tasks_to_release;
+    {
+        std::unique_lock<std::mutex> lock(_work_size_mutex);
+        _closed = true;
+        const auto total_task_size = _total_task_size.exchange(0);
+        _wait_task.notify_all();
+        DorisMetrics::instance()->pipeline_task_queue_size->increment(
+                -static_cast<int64_t>(total_task_size));
+        for (auto& sub_queue : _sub_queues) {
+            tasks_to_release.emplace_back(sub_queue.take_all());
+        }
+    }
+    for (auto& tasks : tasks_to_release) {
+        while (!tasks.empty()) {
+            tasks.front()->pop_out_runnable_queue();
+            tasks.pop();
+        }
+    }
 }
 
 PipelineTaskSPtr PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
@@ -113,11 +134,11 @@ PipelineTaskSPtr PriorityTaskQueue::take(uint32_t 
timeout_ms) {
 }
 
 Status PriorityTaskQueue::push(PipelineTaskSPtr task) {
+    auto level = _compute_level(task->get_runtime_ns());
+    std::unique_lock<std::mutex> lock(_work_size_mutex);
     if (_closed) {
         return Status::InternalError("WorkTaskQueue closed");
     }
-    auto level = _compute_level(task->get_runtime_ns());
-    std::unique_lock<std::mutex> lock(_work_size_mutex);
 
     // update empty queue's  runtime, to avoid too high priority
     if (_sub_queues[level].empty() &&
@@ -125,6 +146,7 @@ Status PriorityTaskQueue::push(PipelineTaskSPtr task) {
         _sub_queues[level].adjust_runtime(_queue_level_min_vruntime);
     }
 
+    task->put_in_runnable_queue();
     _sub_queues[level].push_back(task);
     _total_task_size++;
     DorisMetrics::instance()->pipeline_task_queue_size->increment(1);
@@ -199,7 +221,6 @@ Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task) 
{
 
 Status MultiCoreTaskQueue::push_back(PipelineTaskSPtr task, int core_id) {
     DCHECK(core_id < _core_size);
-    task->put_in_runnable_queue();
     return _prio_task_queues[core_id].push(task);
 }
 
diff --git a/be/src/exec/pipeline/task_queue.h 
b/be/src/exec/pipeline/task_queue.h
index ad55a4f82db..f5a33a91699 100644
--- a/be/src/exec/pipeline/task_queue.h
+++ b/be/src/exec/pipeline/task_queue.h
@@ -56,6 +56,8 @@ public:
 
     bool empty() { return _queue.empty(); }
 
+    std::queue<PipelineTaskSPtr> take_all();
+
 private:
     std::queue<PipelineTaskSPtr> _queue;
     // depends on LEVEL_QUEUE_TIME_FACTOR
diff --git a/be/src/exec/scan/scanner_scheduler.h 
b/be/src/exec/scan/scanner_scheduler.h
index bc4ac2c8075..57f4406a1ae 100644
--- a/be/src/exec/scan/scanner_scheduler.h
+++ b/be/src/exec/scan/scanner_scheduler.h
@@ -164,7 +164,9 @@ public:
     }
 
     void stop() override {
-        _is_stop.store(true);
+        if (_is_stop.exchange(true)) {
+            return;
+        }
         _scan_thread_pool->shutdown();
         _scan_thread_pool->wait();
     }
@@ -264,7 +266,9 @@ public:
     }
 
     void stop() override {
-        _is_stop.store(true);
+        if (_is_stop.exchange(true)) {
+            return;
+        }
         _task_executor->stop();
         _task_executor->wait();
     }
diff --git 
a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp 
b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
index dc60da2e39d..05d07263d0c 100644
--- a/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
+++ b/be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp
@@ -273,9 +273,7 @@ Status TimeSharingTaskExecutor::init() {
 }
 
 TimeSharingTaskExecutor::~TimeSharingTaskExecutor() {
-    if (!_stopped.exchange(true)) {
-        stop();
-    }
+    stop();
 
     std::vector<std::shared_ptr<PrioritizedSplitRunner>> splits_to_destroy;
     {
@@ -312,6 +310,9 @@ Status TimeSharingTaskExecutor::start() {
 }
 
 void TimeSharingTaskExecutor::stop() {
+    if (_stopped.exchange(true)) {
+        return;
+    }
     // Why access to doris_metrics is safe here?
     // Since DorisMetrics is a singleton, it will be destroyed only after 
doris_main is exited.
     // The shutdown/destroy of SplitThreadPool is guaranteed to take place 
before doris_main exits by
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index e3201d890a1..c010c1deeba 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -872,7 +872,8 @@ void ExecEnv::destroy() {
     SAFE_STOP(_stream_load_recorder_manager);
     // stop workload scheduler
     SAFE_STOP(_workload_sched_mgr);
-    // stop pipline step 2, cgroup execution
+    // Stop workload group execution threads before FragmentMgr. Running 
pipeline tasks can still
+    // report status through FragmentMgr's async thread pool.
     SAFE_STOP(_workload_group_manager);
 
     SAFE_STOP(_external_scan_context_mgr);
@@ -885,6 +886,11 @@ void ExecEnv::destroy() {
     _memtable_memory_limiter.reset();
     _delta_writer_v2_pool.reset();
     _load_stream_map_pool.reset();
+    // Workload group schedulers own the query pipeline, scan, and memtable 
flush queues.
+    // Release them after fragment/load resources have stopped submitting 
cleanup work.
+    if (_workload_group_manager) {
+        _workload_group_manager->destroy_schedulers();
+    }
     SAFE_STOP(_write_cooldown_meta_executors);
 
     // _id_manager must be destoried before tablet schema cache
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index ad74647a21c..0c0bdf5f74c 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -743,6 +743,10 @@ int64_t WorkloadGroup::get_mem_used() {
 
 void WorkloadGroup::try_stop_schedulers() {
     std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    stop_schedulers_no_lock();
+}
+
+void WorkloadGroup::stop_schedulers_no_lock() {
     if (_task_sched) {
         _task_sched->stop();
     }
@@ -766,6 +770,15 @@ void WorkloadGroup::try_stop_schedulers() {
     }
 }
 
+void WorkloadGroup::destroy_schedulers() {
+    std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    _task_sched.reset();
+    _scan_task_sched.reset();
+    _remote_scan_task_sched.reset();
+    _memtable_flush_pool.reset();
+    _cgroup_cpu_ctl.reset();
+}
+
 void WorkloadGroup::update_memtable_flush_threads() {
     if (_memtable_flush_pool == nullptr) {
         return;
diff --git a/be/src/runtime/workload_group/workload_group.h 
b/be/src/runtime/workload_group/workload_group.h
index 9b764664adb..2a310bf66a0 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -219,6 +219,8 @@ private:
     void upsert_cgroup_cpu_ctl_no_lock(WorkloadGroupInfo* wg_info);
     Status upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
                                       std::shared_ptr<CgroupCpuCtl> 
cg_cpu_ctl_ptr);
+    void stop_schedulers_no_lock();
+    void destroy_schedulers();
 
     std::string _memory_debug_string() const;
 
diff --git a/be/src/runtime/workload_group/workload_group_manager.cpp 
b/be/src/runtime/workload_group/workload_group_manager.cpp
index f7abad1c0c5..bfc3b94509d 100644
--- a/be/src/runtime/workload_group/workload_group_manager.cpp
+++ b/be/src/runtime/workload_group/workload_group_manager.cpp
@@ -971,6 +971,13 @@ void WorkloadGroupMgr::stop() {
     }
 }
 
+void WorkloadGroupMgr::destroy_schedulers() {
+    std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
+    for (auto iter = _workload_groups.begin(); iter != _workload_groups.end(); 
iter++) {
+        iter->second->destroy_schedulers();
+    }
+}
+
 Status WorkloadGroupMgr::create_internal_wg() {
     TWorkloadGroupInfo twg_info;
     twg_info.__set_id(INTERNAL_NORMAL_WG_ID);
diff --git a/be/src/runtime/workload_group/workload_group_manager.h 
b/be/src/runtime/workload_group/workload_group_manager.h
index 6cd63e6e5fa..f4c4504ad08 100644
--- a/be/src/runtime/workload_group/workload_group_manager.h
+++ b/be/src/runtime/workload_group/workload_group_manager.h
@@ -81,6 +81,8 @@ public:
 
     void stop();
 
+    void destroy_schedulers();
+
     void refresh_workload_group_memory_state();
 
     void get_wg_resource_usage(Block* block);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to