This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 4c570c34b7f [Fix](executor)Release resource correctly when drop workload group (#30279)
4c570c34b7f is described below
commit 4c570c34b7f4b2f6ff3e8b19633f26e3e679d6f6
Author: wangbo <[email protected]>
AuthorDate: Thu Jan 25 09:47:57 2024 +0800
[Fix](executor)Release resource correctly when drop workload group (#30279)
---
be/src/runtime/fragment_mgr.cpp | 8 +-
be/src/runtime/query_context.cpp | 16 ++--
be/src/runtime/query_context.h | 4 +-
be/src/runtime/task_group/task_group.h | 17 ++++
be/src/runtime/task_group/task_group_manager.cpp | 98 ++++++++++++++++--------
be/src/runtime/task_group/task_group_manager.h | 4 +
6 files changed, 106 insertions(+), 41 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index a76c7687f02..403c2463c2a 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -685,7 +685,9 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
if (params.__isset.workload_groups && !params.workload_groups.empty())
{
uint64_t tg_id = params.workload_groups[0].id;
auto* tg_mgr = _exec_env->task_group_manager();
- if (auto task_group_ptr = tg_mgr->get_task_group_by_id(tg_id)) {
+ taskgroup::TaskGroupPtr task_group_ptr = nullptr;
+ Status ret = tg_mgr->add_query_to_group(tg_id,
query_ctx->query_id(), &task_group_ptr);
+ if (ret.ok()) {
task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
// set task group to queryctx for memory tracker can be
removed, see QueryContext's destructor
query_ctx->set_task_group(task_group_ptr);
@@ -698,6 +700,10 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo
<< ", is pipeline: " << ((int)is_pipeline)
<< ", enable cgroup soft limit: "
<< ((int)config::enable_cgroup_cpu_soft_limit);
+ } else {
+ LOG(INFO) << "Query/load id: " <<
print_id(query_ctx->query_id())
+ << " carried group info but can not find group in
be, reason: "
+ << ret.to_string();
}
}
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index a70bf6695ac..5b2de639d47 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -64,8 +64,10 @@ QueryContext::~QueryContext() {
}
if (_task_group) {
_task_group->remove_mem_tracker_limiter(query_mem_tracker);
+
_exec_env->task_group_manager()->remove_query_from_group(_task_group->id(),
_query_id);
}
+
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
LOG_INFO("Query {} deconstructed, {}", print_id(_query_id),
mem_tracker_msg);
// Not release the the thread token in query context's dector method,
because the query
// conext may be dectored in the thread token it self. It is very
dangerous and may core.
@@ -75,7 +77,6 @@ QueryContext::~QueryContext() {
static_cast<void>(ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
std::make_shared<DelayReleaseToken>(std::move(_thread_token))));
}
-
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
}
void QueryContext::set_ready_to_execute(bool is_cancelled) {
@@ -160,13 +161,14 @@ void QueryContext::set_query_scheduler(uint64_t tg_id) {
}
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
- if (!config::enable_cgroup_cpu_soft_limit) {
- return _exec_env->pipeline_task_group_scheduler();
- } else if (_task_scheduler) {
- return _task_scheduler;
- } else {
- return _exec_env->pipeline_task_scheduler();
+ if (_task_group) {
+ if (!config::enable_cgroup_cpu_soft_limit) {
+ return _exec_env->pipeline_task_group_scheduler();
+ } else if (_task_scheduler) {
+ return _task_scheduler;
+ }
}
+ return _exec_env->pipeline_task_scheduler();
}
ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d5a8f12cee1..1c5d98b0472 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -153,7 +153,9 @@ public:
void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
- taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); }
+ taskgroup::TaskGroup* get_task_group() const {
+ return _task_group == nullptr ? nullptr : _task_group.get();
+ }
int execution_timeout() const {
return _query_options.__isset.execution_timeout ?
_query_options.execution_timeout
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index 04dbf518f0d..3440bead9df 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -29,6 +29,7 @@
#include <unordered_set>
#include "common/status.h"
+#include "util/hash_util.hpp"
namespace doris {
@@ -141,6 +142,16 @@ public:
return _memory_limit > 0;
}
+ void add_query(TUniqueId query_id) { _query_id_set.insert(query_id); }
+
+ void remove_query(TUniqueId query_id) { _query_id_set.erase(query_id); }
+
+ void shutdown() { _is_shutdown = true; }
+
+ int query_num() { return _query_id_set.size(); }
+
+ bool is_shutdown() { return _is_shutdown; }
+
private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share,
_memory_limit
const uint64_t _id;
@@ -152,6 +163,12 @@ private:
TaskGroupPipelineTaskEntity _task_entity;
std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
std::atomic<int> _cpu_hard_limit;
+
+ // means task group is mark dropped
+ // new query can not submit
+ // waiting running query to be cancelled or finish
+ bool _is_shutdown = false;
+ std::unordered_set<TUniqueId> _query_id_set;
};
using TaskGroupPtr = std::shared_ptr<TaskGroup>;
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index 74694baa9fc..068f5eced37 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -180,68 +180,72 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
}
void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id)
{
- // stop task sche may cost some time, so it should not be locked
- std::set<doris::pipeline::TaskScheduler*> task_sche_to_del;
- std::set<vectorized::SimplifiedScanScheduler*> scan_task_sche_to_del;
- std::set<ThreadPool*> non_pip_thread_pool_to_del;
+ int64_t begin_time = MonotonicMillis();
+ // 1 get delete group without running queries
std::set<uint64_t> deleted_tg_ids;
{
- std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
- for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end();
iter++) {
+ std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+ for (auto iter = _task_groups.begin(); iter != _task_groups.end();
iter++) {
uint64_t tg_id = iter->first;
+ auto* task_group_ptr = iter->second.get();
if (used_wg_id.find(tg_id) == used_wg_id.end()) {
- task_sche_to_del.insert(_tg_sche_map[tg_id].get());
- deleted_tg_ids.insert(tg_id);
+ task_group_ptr->shutdown();
+ // only when no query running in task group, its resource can
be released in BE
+ if (task_group_ptr->query_num() == 0) {
+ deleted_tg_ids.insert(tg_id);
+ }
}
}
+ }
- for (auto iter = _tg_scan_sche_map.begin(); iter !=
_tg_scan_sche_map.end(); iter++) {
- uint64_t tg_id = iter->first;
- if (used_wg_id.find(tg_id) == used_wg_id.end()) {
- scan_task_sche_to_del.insert(_tg_scan_sche_map[tg_id].get());
+ // 2 stop active thread
+ std::vector<doris::pipeline::TaskScheduler*> task_sched_to_stop;
+ std::vector<vectorized::SimplifiedScanScheduler*> scan_task_sched_to_stop;
+ std::vector<ThreadPool*> non_pip_thread_pool_to_stop;
+ {
+ std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
+ for (uint64_t tg_id : deleted_tg_ids) {
+ if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
+ task_sched_to_stop.emplace_back(_tg_sche_map.at(tg_id).get());
}
- }
- for (auto iter = _non_pipe_thread_pool_map.begin(); iter !=
_non_pipe_thread_pool_map.end();
- iter++) {
- uint64_t tg_id = iter->first;
- if (used_wg_id.find(tg_id) == used_wg_id.end()) {
-
non_pip_thread_pool_to_del.insert(_non_pipe_thread_pool_map[tg_id].get());
+ if (_tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
+
scan_task_sched_to_stop.emplace_back(_tg_scan_sche_map.at(tg_id).get());
+ }
+ if (_non_pipe_thread_pool_map.find(tg_id) !=
_non_pipe_thread_pool_map.end()) {
+
non_pip_thread_pool_to_stop.emplace_back(_non_pipe_thread_pool_map.at(tg_id).get());
}
}
}
- // 1 stop all threads
- for (auto* ptr1 : task_sche_to_del) {
+ for (auto* ptr1 : task_sched_to_stop) {
ptr1->stop();
}
- for (auto* ptr2 : scan_task_sche_to_del) {
+ for (auto* ptr2 : scan_task_sched_to_stop) {
ptr2->stop();
}
- for (auto& ptr3 : non_pip_thread_pool_to_del) {
+ for (auto& ptr3 : non_pip_thread_pool_to_stop) {
ptr3->shutdown();
+ ptr3->wait();
}
- // 2 release resource in memory
+
+ // 3 release resource in memory
{
std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
for (uint64_t tg_id : deleted_tg_ids) {
_tg_sche_map.erase(tg_id);
_tg_scan_sche_map.erase(tg_id);
_cgroup_ctl_map.erase(tg_id);
+ _non_pipe_thread_pool_map.erase(tg_id);
}
}
{
std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
- for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
- uint64_t tg_id = iter->first;
- if (used_wg_id.find(tg_id) == used_wg_id.end()) {
- iter = _task_groups.erase(iter);
- } else {
- iter++;
- }
+ for (uint64_t tg_id : deleted_tg_ids) {
+ _task_groups.erase(tg_id);
}
}
- // 3 clear cgroup dir
+ // 4 clear cgroup dir
// NOTE(wb) currently we use rmdir to delete cgroup path,
// this action may be failed until task file is cleared which means all
thread are stopped.
// So the first time to rmdir a cgroup path may failed.
@@ -266,7 +270,37 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
}
}
}
- LOG(INFO) << "finish clear unused cgroup path";
+ int64_t time_cost_ms = MonotonicMillis() - begin_time;
+ LOG(INFO) << "finish clear unused task group, time cost: " << time_cost_ms
+ << "ms, deleted group size:" << deleted_tg_ids.size();
+}
+
+Status TaskGroupManager::add_query_to_group(uint64_t tg_id, TUniqueId query_id,
+ TaskGroupPtr* tg_ptr) {
+ std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+ auto tg_iter = _task_groups.find(tg_id);
+ if (tg_iter != _task_groups.end()) {
+ if (tg_iter->second->is_shutdown()) {
+ return Status::InternalError<false>("workload group {} is
shutdown.", tg_id);
+ }
+ tg_iter->second->add_query(query_id);
+ *tg_ptr = tg_iter->second;
+ return Status::OK();
+ } else {
+ return Status::InternalError<false>("can not find workload group {}.",
tg_id);
+ }
+}
+
+void TaskGroupManager::remove_query_from_group(uint64_t tg_id, TUniqueId
query_id) {
+ std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+ auto tg_iter = _task_groups.find(tg_id);
+ if (tg_iter != _task_groups.end()) {
+ tg_iter->second->remove_query(query_id);
+ } else {
+ //NOTE: This should never happen
+ LOG(INFO) << "can not find task group when remove query, tg:" << tg_id
+ << ", query_id:" << print_id(query_id);
+ }
}
void TaskGroupManager::stop() {
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index a7ccb52f00e..78610b4efc3 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -69,6 +69,10 @@ public:
vectorized::SimplifiedScanScheduler** scan_sched,
ThreadPool** non_pipe_thread_pool);
+ Status add_query_to_group(uint64_t tg_id, TUniqueId query_id,
TaskGroupPtr* tg_ptr);
+
+ void remove_query_from_group(uint64_t tg_id, TUniqueId query_id);
+
private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]