This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d8b4edbd60a [bugfix](wgcore) map at only get reference and it will
core in multithread (#31702)
d8b4edbd60a is described below
commit d8b4edbd60af3cd343d4cf7740ceb7157346237f
Author: yiguolei <[email protected]>
AuthorDate: Sun Mar 3 17:42:42 2024 +0800
[bugfix](wgcore) map at only get reference and it will core in multithread
(#31702)
map.at() only returns a reference to the task group's entry in the map.
In a multi-threaded environment, the task group may be erased by another thread,
so calling map.at()->stop_task_schedulers without holding the lock can crash (core dump).
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/runtime/task_group/task_group_manager.cpp | 18 ++++++++++--------
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/be/src/runtime/task_group/task_group_manager.cpp
b/be/src/runtime/task_group/task_group_manager.cpp
index 819e63c855d..a336cccd3d2 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -72,33 +72,35 @@ TaskGroupPtr
TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id)
{
int64_t begin_time = MonotonicMillis();
// 1 get delete group without running queries
- std::set<uint64_t> deleted_tg_ids;
+ std::vector<TaskGroupPtr> deleted_task_groups;
{
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
for (auto iter = _task_groups.begin(); iter != _task_groups.end();
iter++) {
uint64_t tg_id = iter->first;
- auto* task_group_ptr = iter->second.get();
+ auto task_group_ptr = iter->second;
if (used_wg_id.find(tg_id) == used_wg_id.end()) {
task_group_ptr->shutdown();
// only when no query running in task group, its resource can
be released in BE
if (task_group_ptr->query_num() == 0) {
LOG(INFO) << "There is no query in wg " << tg_id << ",
delete it.";
- deleted_tg_ids.insert(tg_id);
+ deleted_task_groups.push_back(task_group_ptr);
}
}
}
}
// 2 stop active thread
- for (uint64_t tg_id : deleted_tg_ids) {
- _task_groups.at(tg_id)->try_stop_schedulers();
+ for (auto& tg : deleted_task_groups) {
+ // There is not lock here, but the tg may be released by another
+ // thread, so that we should use shared ptr here, not use tg_id
+ tg->try_stop_schedulers();
}
// 3 release resource in memory
{
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
- for (uint64_t tg_id : deleted_tg_ids) {
- _task_groups.erase(tg_id);
+ for (auto& tg : deleted_task_groups) {
+ _task_groups.erase(tg->id());
}
}
@@ -129,7 +131,7 @@ void
TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
}
int64_t time_cost_ms = MonotonicMillis() - begin_time;
LOG(INFO) << "finish clear unused task group, time cost: " << time_cost_ms
- << "ms, deleted group size:" << deleted_tg_ids.size();
+ << "ms, deleted group size:" << deleted_task_groups.size();
}
void TaskGroupManager::stop() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]