This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6bf0c0e1060 [refactor](queryctx) move tg related code to task group (#30829)
6bf0c0e1060 is described below
commit 6bf0c0e10609ba0aa57a829e585f8b393f71fe5d
Author: yiguolei <[email protected]>
AuthorDate: Tue Feb 6 13:50:56 2024 +0800
[refactor](queryctx) move tg related code to task group (#30829)
Initialize the query context's memory tracker in the QueryContext constructor.
Set all task-group-related properties in QueryContext::set_task_group.
---
be/src/runtime/fragment_mgr.cpp | 85 +++++-------------------
be/src/runtime/query_context.cpp | 51 ++++++++++++--
be/src/runtime/query_context.h | 17 ++---
be/src/runtime/task_group/task_group.h | 32 +++++++--
be/src/runtime/task_group/task_group_manager.cpp | 28 --------
be/src/runtime/task_group/task_group_manager.h | 4 --
6 files changed, 91 insertions(+), 126 deletions(-)
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9d485361bef..3a9a7c2a88d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -600,15 +600,13 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
}
query_ctx = search->second;
} else {
- {
- // Find _query_ctx_map, in case some other request has already
- // create the query fragments context.
- std::lock_guard<std::mutex> lock(_lock);
- auto search = _query_ctx_map.find(query_id);
- if (search != _query_ctx_map.end()) {
- query_ctx = search->second;
- return Status::OK();
- }
+ // Find _query_ctx_map, in case some other request has already
+ // create the query fragments context.
+ std::lock_guard<std::mutex> lock(_lock);
+ auto search = _query_ctx_map.find(query_id);
+ if (search != _query_ctx_map.end()) {
+ query_ctx = search->second;
+ return Status::OK();
}
// This may be a first fragment request of the query.
@@ -636,40 +634,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
}
query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
- query_ctx->timeout_second = params.query_options.execution_timeout;
_set_scan_concurrency(params, query_ctx.get());
- bool has_query_mem_tracker =
- params.query_options.__isset.mem_limit &&
(params.query_options.mem_limit > 0);
- int64_t bytes_limit = has_query_mem_tracker ?
params.query_options.mem_limit : -1;
- if (bytes_limit > MemInfo::mem_limit()) {
- VLOG_NOTICE << "Query memory limit " <<
PrettyPrinter::print(bytes_limit, TUnit::BYTES)
- << " exceeds process memory limit of "
- << PrettyPrinter::print(MemInfo::mem_limit(),
TUnit::BYTES)
- << ". Using process memory limit instead";
- bytes_limit = MemInfo::mem_limit();
- }
- if (params.query_options.query_type == TQueryType::SELECT) {
- query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
- MemTrackerLimiter::Type::QUERY,
- fmt::format("Query#Id={}",
print_id(query_ctx->query_id())), bytes_limit);
- } else if (params.query_options.query_type == TQueryType::LOAD) {
- query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
- MemTrackerLimiter::Type::LOAD,
- fmt::format("Load#Id={}",
print_id(query_ctx->query_id())), bytes_limit);
- } else { // EXTERNAL
- query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
- MemTrackerLimiter::Type::LOAD,
- fmt::format("External#Id={}",
print_id(query_ctx->query_id())), bytes_limit);
- }
- if (params.query_options.__isset.is_report_success &&
- params.query_options.is_report_success) {
- query_ctx->query_mem_tracker->enable_print_log_usage();
- }
-
- query_ctx->register_memory_statistics();
- query_ctx->register_cpu_statistics();
-
bool is_pipeline = false;
if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
is_pipeline = true;
@@ -677,16 +643,12 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
if (params.__isset.workload_groups && !params.workload_groups.empty())
{
uint64_t tg_id = params.workload_groups[0].id;
- auto* tg_mgr = _exec_env->task_group_manager();
- taskgroup::TaskGroupPtr task_group_ptr = nullptr;
- Status ret = tg_mgr->add_query_to_group(tg_id,
query_ctx->query_id(), &task_group_ptr);
- if (ret.ok()) {
-
task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
- // set task group to queryctx for memory tracker can be
removed, see QueryContext's destructor
- query_ctx->set_task_group(task_group_ptr);
+ taskgroup::TaskGroupPtr task_group_ptr =
+
_exec_env->task_group_manager()->get_task_group_by_id(tg_id);
+ if (task_group_ptr != nullptr) {
+ RETURN_IF_ERROR(query_ctx->set_task_group(task_group_ptr));
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
tg_id);
- query_ctx->set_query_scheduler(tg_id);
LOG(INFO) << "Query/load id: " <<
print_id(query_ctx->query_id())
<< ", use task group: " <<
task_group_ptr->debug_string()
@@ -695,26 +657,15 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
<< ((int)config::enable_cgroup_cpu_soft_limit);
} else {
LOG(INFO) << "Query/load id: " <<
print_id(query_ctx->query_id())
- << " carried group info but can not find group in
be, reason: "
- << ret.to_string();
- }
- }
-
- {
- // Find _query_ctx_map again, in case some other request has
already
- // create the query fragments context.
- std::lock_guard<std::mutex> lock(_lock);
- auto search = _query_ctx_map.find(query_id);
- if (search == _query_ctx_map.end()) {
- _query_ctx_map.insert(std::make_pair(query_ctx->query_id(),
query_ctx));
- LOG(INFO) << "Register query/load memory tracker, query/load
id: "
- << print_id(query_ctx->query_id())
- << " limit: " << PrettyPrinter::print(bytes_limit,
TUnit::BYTES);
- } else {
- // Already has a query fragments context, use it
- query_ctx = search->second;
+ << " carried group info but can not find group in
be";
}
}
+ // There is some logic in query ctx's dctor, we could not check if
exists and delete the
+ // temp query ctx now. For example, the query id maybe removed from
task group's queryset.
+ _query_ctx_map.insert(std::make_pair(query_ctx->query_id(),
query_ctx));
+ LOG(INFO) << "Register query/load memory tracker, query/load id: "
+ << print_id(query_ctx->query_id())
+ << " limit: " <<
PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
}
return Status::OK();
}
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c5f3ece16f8..e1c26cfcbb9 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -21,6 +21,7 @@
#include "pipeline/pipeline_x/dependency.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/task_group/task_group_manager.h"
+#include "util/mem_info.h"
namespace doris {
@@ -46,6 +47,37 @@ QueryContext::QueryContext(TUniqueId query_id, int
total_fragment_num, ExecEnv*
pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency",
this);
_runtime_filter_mgr.reset(
new RuntimeFilterMgr(TUniqueId(),
RuntimeFilterParamsContext::create(this)));
+
+ timeout_second = query_options.execution_timeout;
+
+ bool has_query_mem_tracker = query_options.__isset.mem_limit &&
(query_options.mem_limit > 0);
+ int64_t _bytes_limit = has_query_mem_tracker ? query_options.mem_limit :
-1;
+ if (_bytes_limit > MemInfo::mem_limit()) {
+ VLOG_NOTICE << "Query memory limit " <<
PrettyPrinter::print(_bytes_limit, TUnit::BYTES)
+ << " exceeds process memory limit of "
+ << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
+ << ". Using process memory limit instead";
+ _bytes_limit = MemInfo::mem_limit();
+ }
+ if (query_options.query_type == TQueryType::SELECT) {
+ query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+ MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}",
print_id(_query_id)),
+ _bytes_limit);
+ } else if (query_options.query_type == TQueryType::LOAD) {
+ query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+ MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}",
print_id(_query_id)),
+ _bytes_limit);
+ } else { // EXTERNAL
+ query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+ MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}",
print_id(_query_id)),
+ _bytes_limit);
+ }
+ if (query_options.__isset.is_report_success &&
query_options.is_report_success) {
+ query_mem_tracker->enable_print_log_usage();
+ }
+
+ register_memory_statistics();
+ register_cpu_statistics();
}
QueryContext::~QueryContext() {
@@ -64,7 +96,7 @@ QueryContext::~QueryContext() {
}
if (_task_group) {
_task_group->remove_mem_tracker_limiter(query_mem_tracker);
-
_exec_env->task_group_manager()->remove_query_from_group(_task_group->id(),
_query_id);
+ _task_group->remove_query(_query_id);
}
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
@@ -154,12 +186,6 @@ void QueryContext::register_cpu_statistics() {
}
}
-void QueryContext::set_query_scheduler(uint64_t tg_id) {
- auto* tg_mgr = _exec_env->task_group_manager();
- tg_mgr->get_query_scheduler(tg_id, &_task_scheduler, &_scan_task_scheduler,
- &_non_pipe_thread_pool);
-}
-
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
if (_task_group) {
if (_task_scheduler) {
@@ -177,4 +203,15 @@ ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
}
}
+Status QueryContext::set_task_group(taskgroup::TaskGroupPtr& tg) {
+ _task_group = tg;
+ // Should add query first, then the task group will not be deleted.
+ // see task_group_manager::delete_task_group_by_ids
+ RETURN_IF_ERROR(_task_group->add_query(_query_id));
+ _task_group->add_mem_tracker_limiter(query_mem_tracker);
+ _exec_env->task_group_manager()->get_query_scheduler(
+ _task_group->id(), &_task_scheduler, &_scan_task_scheduler,
&_non_pipe_thread_pool);
+ return Status::OK();
+}
+
} // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 948b2427ddc..806470fbd4c 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -150,7 +150,7 @@ public:
vectorized::RuntimePredicate& get_runtime_predicate() { return
_runtime_predicate; }
- void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
+ Status set_task_group(taskgroup::TaskGroupPtr& tg);
int execution_timeout() const {
return _query_options.__isset.execution_timeout ?
_query_options.execution_timeout
@@ -191,16 +191,6 @@ public:
TUniqueId query_id() const { return _query_id; }
- void set_task_scheduler(pipeline::TaskScheduler* task_scheduler) {
- _task_scheduler = task_scheduler;
- }
-
- pipeline::TaskScheduler* get_task_scheduler() { return _task_scheduler; }
-
- void set_scan_task_scheduler(vectorized::SimplifiedScanScheduler*
scan_task_scheduler) {
- _scan_task_scheduler = scan_task_scheduler;
- }
-
vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return
_scan_task_scheduler; }
pipeline::Dependency* get_execution_dependency() { return
_execution_dependency.get(); }
@@ -215,12 +205,12 @@ public:
std::shared_ptr<QueryStatistics> get_cpu_statistics() { return
_cpu_statistics; }
- void set_query_scheduler(uint64_t wg_id);
-
doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
ThreadPool* get_non_pipe_exec_thread_pool();
+ int64_t mem_limit() { return _bytes_limit; }
+
public:
DescriptorTbl* desc_tbl = nullptr;
bool set_rsc_info = false;
@@ -254,6 +244,7 @@ private:
TUniqueId _query_id;
ExecEnv* _exec_env = nullptr;
VecDateTimeValue _start_time;
+ int64_t _bytes_limit = 0;
// A token used to submit olap scanner to the "_limited_scan_thread_pool",
// This thread pool token is created from "_limited_scan_thread_pool" from
exec env.
diff --git a/be/src/runtime/task_group/task_group.h
b/be/src/runtime/task_group/task_group.h
index cf82f35fa20..647d088e817 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -29,11 +29,11 @@
#include <unordered_set>
#include "common/status.h"
+#include "service/backend_options.h"
#include "util/hash_util.hpp"
namespace doris {
-class TPipelineWorkloadGroup;
class MemTrackerLimiter;
namespace pipeline {
@@ -96,15 +96,33 @@ public:
return _memory_limit > 0;
}
- void add_query(TUniqueId query_id) { _query_id_set.insert(query_id); }
-
- void remove_query(TUniqueId query_id) { _query_id_set.erase(query_id); }
+ Status add_query(TUniqueId query_id) {
+ std::unique_lock<std::shared_mutex> wlock(_mutex);
+ if (_is_shutdown) {
+ // If the task group is set shutdown, then should not run any more,
+ // because the scheduler pool and other pointer may be released.
+ return Status::InternalError(
+ "Failed add query to workload group, the workload group is
shutdown. host: {}",
+ BackendOptions::get_localhost());
+ }
+ _query_id_set.insert(query_id);
+ return Status::OK();
+ }
- void shutdown() { _is_shutdown = true; }
+ void remove_query(TUniqueId query_id) {
+ std::unique_lock<std::shared_mutex> wlock(_mutex);
+ _query_id_set.erase(query_id);
+ }
- int query_num() { return _query_id_set.size(); }
+ void shutdown() {
+ std::unique_lock<std::shared_mutex> wlock(_mutex);
+ _is_shutdown = true;
+ }
- bool is_shutdown() { return _is_shutdown; }
+ int query_num() {
+ std::shared_lock<std::shared_mutex> r_lock(_mutex);
+ return _query_id_set.size();
+ }
private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share,
_memory_limit
diff --git a/be/src/runtime/task_group/task_group_manager.cpp
b/be/src/runtime/task_group/task_group_manager.cpp
index 9a2bca4ff6a..05be653747d 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -276,34 +276,6 @@ void
TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
<< "ms, deleted group size:" << deleted_tg_ids.size();
}
-Status TaskGroupManager::add_query_to_group(uint64_t tg_id, TUniqueId query_id,
- TaskGroupPtr* tg_ptr) {
- std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
- auto tg_iter = _task_groups.find(tg_id);
- if (tg_iter != _task_groups.end()) {
- if (tg_iter->second->is_shutdown()) {
- return Status::InternalError<false>("workload group {} is
shutdown.", tg_id);
- }
- tg_iter->second->add_query(query_id);
- *tg_ptr = tg_iter->second;
- return Status::OK();
- } else {
- return Status::InternalError<false>("can not find workload group {}.",
tg_id);
- }
-}
-
-void TaskGroupManager::remove_query_from_group(uint64_t tg_id, TUniqueId
query_id) {
- std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
- auto tg_iter = _task_groups.find(tg_id);
- if (tg_iter != _task_groups.end()) {
- tg_iter->second->remove_query(query_id);
- } else {
- //NOTE: This should never happen
- LOG(INFO) << "can not find task group when remove query, tg:" << tg_id
- << ", query_id:" << print_id(query_id);
- }
-}
-
void TaskGroupManager::stop() {
for (auto& task_sche : _tg_sche_map) {
task_sche.second->stop();
diff --git a/be/src/runtime/task_group/task_group_manager.h
b/be/src/runtime/task_group/task_group_manager.h
index 19e056d5258..1a1a614d068 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -69,10 +69,6 @@ public:
vectorized::SimplifiedScanScheduler** scan_sched,
ThreadPool** non_pipe_thread_pool);
- Status add_query_to_group(uint64_t tg_id, TUniqueId query_id,
TaskGroupPtr* tg_ptr);
-
- void remove_query_from_group(uint64_t tg_id, TUniqueId query_id);
-
private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]