This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit f3a6888b60329b33fcd72cdec612afed8e722342 Author: yiguolei <[email protected]> AuthorDate: Tue Feb 6 13:50:56 2024 +0800 [refactor](queryctx) move tg related code to task group (#30829) init query ctx memtracker in queryctx constructor set all task group related property during set taskgroup --- be/src/runtime/fragment_mgr.cpp | 85 +++++------------------- be/src/runtime/query_context.cpp | 51 ++++++++++++-- be/src/runtime/query_context.h | 17 ++--- be/src/runtime/task_group/task_group.h | 32 +++++++-- be/src/runtime/task_group/task_group_manager.cpp | 28 -------- be/src/runtime/task_group/task_group_manager.h | 4 -- 6 files changed, 91 insertions(+), 126 deletions(-) diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 9d485361bef..3a9a7c2a88d 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -600,15 +600,13 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo } query_ctx = search->second; } else { - { - // Find _query_ctx_map, in case some other request has already - // create the query fragments context. - std::lock_guard<std::mutex> lock(_lock); - auto search = _query_ctx_map.find(query_id); - if (search != _query_ctx_map.end()) { - query_ctx = search->second; - return Status::OK(); - } + // Find _query_ctx_map, in case some other request has already + // create the query fragments context. + std::lock_guard<std::mutex> lock(_lock); + auto search = _query_ctx_map.find(query_id); + if (search != _query_ctx_map.end()) { + query_ctx = search->second; + return Status::OK(); } // This may be a first fragment request of the query. @@ -636,40 +634,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo } query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline); - query_ctx->timeout_second = params.query_options.execution_timeout; _set_scan_concurrency(params, query_ctx.get()); - bool has_query_mem_tracker = - params.query_options.__isset.mem_limit && (params.query_options.mem_limit > 0); - int64_t bytes_limit = has_query_mem_tracker ? params.query_options.mem_limit : -1; - if (bytes_limit > MemInfo::mem_limit()) { - VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES) - << " exceeds process memory limit of " - << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES) - << ". Using process memory limit instead"; - bytes_limit = MemInfo::mem_limit(); - } - if (params.query_options.query_type == TQueryType::SELECT) { - query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>( - MemTrackerLimiter::Type::QUERY, - fmt::format("Query#Id={}", print_id(query_ctx->query_id())), bytes_limit); - } else if (params.query_options.query_type == TQueryType::LOAD) { - query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>( - MemTrackerLimiter::Type::LOAD, - fmt::format("Load#Id={}", print_id(query_ctx->query_id())), bytes_limit); - } else { // EXTERNAL - query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>( - MemTrackerLimiter::Type::LOAD, - fmt::format("External#Id={}", print_id(query_ctx->query_id())), bytes_limit); - } - if (params.query_options.__isset.is_report_success && - params.query_options.is_report_success) { - query_ctx->query_mem_tracker->enable_print_log_usage(); - } - - query_ctx->register_memory_statistics(); - query_ctx->register_cpu_statistics(); - bool is_pipeline = false; if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) { is_pipeline = true; @@ -677,16 +643,12 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo if (params.__isset.workload_groups && !params.workload_groups.empty()) { uint64_t tg_id = params.workload_groups[0].id; - auto* tg_mgr = _exec_env->task_group_manager(); - taskgroup::TaskGroupPtr task_group_ptr = nullptr; - Status ret = tg_mgr->add_query_to_group(tg_id, query_ctx->query_id(), &task_group_ptr); - if (ret.ok()) { - task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker); - // set task group to queryctx for memory tracker can be removed, see QueryContext's destructor - query_ctx->set_task_group(task_group_ptr); + taskgroup::TaskGroupPtr task_group_ptr = + _exec_env->task_group_manager()->get_task_group_by_id(tg_id); + if (task_group_ptr != nullptr) { + RETURN_IF_ERROR(query_ctx->set_task_group(task_group_ptr)); _exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id), tg_id); - query_ctx->set_query_scheduler(tg_id); LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) << ", use task group: " << task_group_ptr->debug_string() @@ -695,26 +657,15 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo << ((int)config::enable_cgroup_cpu_soft_limit); } else { LOG(INFO) << "Query/load id: " << print_id(query_ctx->query_id()) - << " carried group info but can not find group in be, reason: " - << ret.to_string(); - } - } - - { - // Find _query_ctx_map again, in case some other request has already - // create the query fragments context. - std::lock_guard<std::mutex> lock(_lock); - auto search = _query_ctx_map.find(query_id); - if (search == _query_ctx_map.end()) { - _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx)); - LOG(INFO) << "Register query/load memory tracker, query/load id: " - << print_id(query_ctx->query_id()) - << " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES); - } else { - // Already has a query fragments context, use it - query_ctx = search->second; + << " carried group info but can not find group in be"; } } + // There is some logic in query ctx's dctor, we could not check if exists and delete the + // temp query ctx now. For example, the query id maybe removed from task group's queryset. + _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), query_ctx)); + LOG(INFO) << "Register query/load memory tracker, query/load id: " + << print_id(query_ctx->query_id()) + << " limit: " << PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES); } return Status::OK(); } diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index c5f3ece16f8..e1c26cfcbb9 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -21,6 +21,7 @@ #include "pipeline/pipeline_x/dependency.h" #include "runtime/runtime_query_statistics_mgr.h" #include "runtime/task_group/task_group_manager.h" +#include "util/mem_info.h" namespace doris { @@ -46,6 +47,37 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", this); _runtime_filter_mgr.reset( new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this))); + + timeout_second = query_options.execution_timeout; + + bool has_query_mem_tracker = query_options.__isset.mem_limit && (query_options.mem_limit > 0); + int64_t _bytes_limit = has_query_mem_tracker ? query_options.mem_limit : -1; + if (_bytes_limit > MemInfo::mem_limit()) { + VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(_bytes_limit, TUnit::BYTES) + << " exceeds process memory limit of " + << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES) + << ". Using process memory limit instead"; + _bytes_limit = MemInfo::mem_limit(); + } + if (query_options.query_type == TQueryType::SELECT) { + query_mem_tracker = std::make_shared<MemTrackerLimiter>( + MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)), + _bytes_limit); + } else if (query_options.query_type == TQueryType::LOAD) { + query_mem_tracker = std::make_shared<MemTrackerLimiter>( + MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)), + _bytes_limit); + } else { // EXTERNAL + query_mem_tracker = std::make_shared<MemTrackerLimiter>( + MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}", print_id(_query_id)), + _bytes_limit); + } + if (query_options.__isset.is_report_success && query_options.is_report_success) { + query_mem_tracker->enable_print_log_usage(); + } + + register_memory_statistics(); + register_cpu_statistics(); } QueryContext::~QueryContext() { @@ -64,7 +96,7 @@ QueryContext::~QueryContext() { } if (_task_group) { _task_group->remove_mem_tracker_limiter(query_mem_tracker); - _exec_env->task_group_manager()->remove_query_from_group(_task_group->id(), _query_id); + _task_group->remove_query(_query_id); } _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id)); @@ -154,12 +186,6 @@ void QueryContext::register_cpu_statistics() { } } -void QueryContext::set_query_scheduler(uint64_t tg_id) { - auto* tg_mgr = _exec_env->task_group_manager(); - tg_mgr->get_query_scheduler(tg_id, &_task_scheduler, &_scan_task_scheduler, - &_non_pipe_thread_pool); -} - doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() { if (_task_group) { if (_task_scheduler) { @@ -177,4 +203,15 @@ ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() { } } +Status QueryContext::set_task_group(taskgroup::TaskGroupPtr& tg) { + _task_group = tg; + // Should add query first, then the task group will not be deleted. + // see task_group_manager::delete_task_group_by_ids + RETURN_IF_ERROR(_task_group->add_query(_query_id)); + _task_group->add_mem_tracker_limiter(query_mem_tracker); + _exec_env->task_group_manager()->get_query_scheduler( + _task_group->id(), &_task_scheduler, &_scan_task_scheduler, &_non_pipe_thread_pool); + return Status::OK(); +} + } // namespace doris diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index 948b2427ddc..806470fbd4c 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -150,7 +150,7 @@ public: vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; } - void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; } + Status set_task_group(taskgroup::TaskGroupPtr& tg); int execution_timeout() const { return _query_options.__isset.execution_timeout ? _query_options.execution_timeout @@ -191,16 +191,6 @@ public: TUniqueId query_id() const { return _query_id; } - void set_task_scheduler(pipeline::TaskScheduler* task_scheduler) { - _task_scheduler = task_scheduler; - } - - pipeline::TaskScheduler* get_task_scheduler() { return _task_scheduler; } - - void set_scan_task_scheduler(vectorized::SimplifiedScanScheduler* scan_task_scheduler) { - _scan_task_scheduler = scan_task_scheduler; - } - vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; } pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); } @@ -215,12 +205,12 @@ public: std::shared_ptr<QueryStatistics> get_cpu_statistics() { return _cpu_statistics; } - void set_query_scheduler(uint64_t wg_id); - doris::pipeline::TaskScheduler* get_pipe_exec_scheduler(); ThreadPool* get_non_pipe_exec_thread_pool(); + int64_t mem_limit() { return _bytes_limit; } + public: DescriptorTbl* desc_tbl = nullptr; bool set_rsc_info = false; @@ -254,6 +244,7 @@ private: TUniqueId _query_id; ExecEnv* _exec_env = nullptr; VecDateTimeValue _start_time; + int64_t _bytes_limit = 0; // A token used to submit olap scanner to the "_limited_scan_thread_pool", // This thread pool token is created from "_limited_scan_thread_pool" from exec env. diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h index cf82f35fa20..647d088e817 100644 --- a/be/src/runtime/task_group/task_group.h +++ b/be/src/runtime/task_group/task_group.h @@ -29,11 +29,11 @@ #include <unordered_set> #include "common/status.h" +#include "service/backend_options.h" #include "util/hash_util.hpp" namespace doris { -class TPipelineWorkloadGroup; class MemTrackerLimiter; namespace pipeline { @@ -96,15 +96,33 @@ public: return _memory_limit > 0; } - void add_query(TUniqueId query_id) { _query_id_set.insert(query_id); } - - void remove_query(TUniqueId query_id) { _query_id_set.erase(query_id); } + Status add_query(TUniqueId query_id) { + std::unique_lock<std::shared_mutex> wlock(_mutex); + if (_is_shutdown) { + // If the task group is set shutdown, then should not run any more, + // because the scheduler pool and other pointer may be released. + return Status::InternalError( + "Failed add query to workload group, the workload group is shutdown. host: {}", + BackendOptions::get_localhost()); + } + _query_id_set.insert(query_id); + return Status::OK(); + } - void shutdown() { _is_shutdown = true; } + void remove_query(TUniqueId query_id) { + std::unique_lock<std::shared_mutex> wlock(_mutex); + _query_id_set.erase(query_id); + } - int query_num() { return _query_id_set.size(); } + void shutdown() { + std::unique_lock<std::shared_mutex> wlock(_mutex); + _is_shutdown = true; + } - bool is_shutdown() { return _is_shutdown; } + int query_num() { + std::shared_lock<std::shared_mutex> r_lock(_mutex); + return _query_id_set.size(); + } private: mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, _memory_limit diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp index 9a2bca4ff6a..05be653747d 100644 --- a/be/src/runtime/task_group/task_group_manager.cpp +++ b/be/src/runtime/task_group/task_group_manager.cpp @@ -276,34 +276,6 @@ void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) { << "ms, deleted group size:" << deleted_tg_ids.size(); } -Status TaskGroupManager::add_query_to_group(uint64_t tg_id, TUniqueId query_id, - TaskGroupPtr* tg_ptr) { - std::lock_guard<std::shared_mutex> write_lock(_group_mutex); - auto tg_iter = _task_groups.find(tg_id); - if (tg_iter != _task_groups.end()) { - if (tg_iter->second->is_shutdown()) { - return Status::InternalError<false>("workload group {} is shutdown.", tg_id); - } - tg_iter->second->add_query(query_id); - *tg_ptr = tg_iter->second; - return Status::OK(); - } else { - return Status::InternalError<false>("can not find workload group {}.", tg_id); - } -} - -void TaskGroupManager::remove_query_from_group(uint64_t tg_id, TUniqueId query_id) { - std::lock_guard<std::shared_mutex> write_lock(_group_mutex); - auto tg_iter = _task_groups.find(tg_id); - if (tg_iter != _task_groups.end()) { - tg_iter->second->remove_query(query_id); - } else { - //NOTE: This should never happen - LOG(INFO) << "can not find task group when remove query, tg:" << tg_id - << ", query_id:" << print_id(query_id); - } -} - void TaskGroupManager::stop() { for (auto& task_sche : _tg_sche_map) { task_sche.second->stop(); diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h index 19e056d5258..1a1a614d068 100644 --- a/be/src/runtime/task_group/task_group_manager.h +++ b/be/src/runtime/task_group/task_group_manager.h @@ -69,10 +69,6 @@ public: vectorized::SimplifiedScanScheduler** scan_sched, ThreadPool** non_pipe_thread_pool); - Status add_query_to_group(uint64_t tg_id, TUniqueId query_id, TaskGroupPtr* tg_ptr); - - void remove_query_from_group(uint64_t tg_id, TUniqueId query_id); - private: std::shared_mutex _group_mutex; std::unordered_map<uint64_t, TaskGroupPtr> _task_groups; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
