This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new d8b4edbd60a [bugfix](wgcore) map.at() only returns a reference, which can 
cause a core dump under multithreading (#31702)
d8b4edbd60a is described below

commit d8b4edbd60af3cd343d4cf7740ceb7157346237f
Author: yiguolei <[email protected]>
AuthorDate: Sun Mar 3 17:42:42 2024 +0800

    [bugfix](wgcore) map.at() only returns a reference, which can cause a core dump 
under multithreading (#31702)
    
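    map.at() only returns a reference to the task group; it does not keep the group alive.
    In a multithreaded environment, the task group may be erased by another thread,
    so calling map.at()->try_stop_schedulers() can crash (core dump).
    The fix collects shared_ptr copies of the groups to delete while holding the lock,
    so each group stays alive while its schedulers are stopped outside the lock.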
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/runtime/task_group/task_group_manager.cpp | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index 819e63c855d..a336cccd3d2 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -72,33 +72,35 @@ TaskGroupPtr 
TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
 void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) 
{
     int64_t begin_time = MonotonicMillis();
     // 1 get delete group without running queries
-    std::set<uint64_t> deleted_tg_ids;
+    std::vector<TaskGroupPtr> deleted_task_groups;
     {
         std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
         for (auto iter = _task_groups.begin(); iter != _task_groups.end(); 
iter++) {
             uint64_t tg_id = iter->first;
-            auto* task_group_ptr = iter->second.get();
+            auto task_group_ptr = iter->second;
             if (used_wg_id.find(tg_id) == used_wg_id.end()) {
                 task_group_ptr->shutdown();
                 // only when no query running in task group, its resource can 
be released in BE
                 if (task_group_ptr->query_num() == 0) {
                     LOG(INFO) << "There is no query in wg " << tg_id << ", 
delete it.";
-                    deleted_tg_ids.insert(tg_id);
+                    deleted_task_groups.push_back(task_group_ptr);
                 }
             }
         }
     }
 
     // 2 stop active thread
-    for (uint64_t tg_id : deleted_tg_ids) {
-        _task_groups.at(tg_id)->try_stop_schedulers();
+    for (auto& tg : deleted_task_groups) {
+        // There is not lock here, but the tg may be released by another
+        // thread, so that we should use shared ptr here, not use tg_id
+        tg->try_stop_schedulers();
     }
 
     // 3 release resource in memory
     {
         std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
-        for (uint64_t tg_id : deleted_tg_ids) {
-            _task_groups.erase(tg_id);
+        for (auto& tg : deleted_task_groups) {
+            _task_groups.erase(tg->id());
         }
     }
 
@@ -129,7 +131,7 @@ void 
TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
     }
     int64_t time_cost_ms = MonotonicMillis() - begin_time;
     LOG(INFO) << "finish clear unused task group, time cost: " << time_cost_ms
-              << "ms, deleted group size:" << deleted_tg_ids.size();
+              << "ms, deleted group size:" << deleted_task_groups.size();
 }
 
 void TaskGroupManager::stop() {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to