This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit f3a6888b60329b33fcd72cdec612afed8e722342
Author: yiguolei <[email protected]>
AuthorDate: Tue Feb 6 13:50:56 2024 +0800

    [refactor](queryctx) move tg related code to task group (#30829)
    
    Initialize the query ctx memtracker in the QueryContext constructor.
    Set all task-group-related properties when the task group is assigned.
---
 be/src/runtime/fragment_mgr.cpp                  | 85 +++++-------------------
 be/src/runtime/query_context.cpp                 | 51 ++++++++++++--
 be/src/runtime/query_context.h                   | 17 ++---
 be/src/runtime/task_group/task_group.h           | 32 +++++++--
 be/src/runtime/task_group/task_group_manager.cpp | 28 --------
 be/src/runtime/task_group/task_group_manager.h   |  4 --
 6 files changed, 91 insertions(+), 126 deletions(-)

diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9d485361bef..3a9a7c2a88d 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -600,15 +600,13 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
         }
         query_ctx = search->second;
     } else {
-        {
-            // Find _query_ctx_map, in case some other request has already
-            // create the query fragments context.
-            std::lock_guard<std::mutex> lock(_lock);
-            auto search = _query_ctx_map.find(query_id);
-            if (search != _query_ctx_map.end()) {
-                query_ctx = search->second;
-                return Status::OK();
-            }
+        // Find _query_ctx_map, in case some other request has already
+        // create the query fragments context.
+        std::lock_guard<std::mutex> lock(_lock);
+        auto search = _query_ctx_map.find(query_id);
+        if (search != _query_ctx_map.end()) {
+            query_ctx = search->second;
+            return Status::OK();
         }
 
         // This may be a first fragment request of the query.
@@ -636,40 +634,8 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
         }
 
         
query_ctx->get_shared_hash_table_controller()->set_pipeline_engine_enabled(pipeline);
-        query_ctx->timeout_second = params.query_options.execution_timeout;
         _set_scan_concurrency(params, query_ctx.get());
 
-        bool has_query_mem_tracker =
-                params.query_options.__isset.mem_limit && 
(params.query_options.mem_limit > 0);
-        int64_t bytes_limit = has_query_mem_tracker ? 
params.query_options.mem_limit : -1;
-        if (bytes_limit > MemInfo::mem_limit()) {
-            VLOG_NOTICE << "Query memory limit " << 
PrettyPrinter::print(bytes_limit, TUnit::BYTES)
-                        << " exceeds process memory limit of "
-                        << PrettyPrinter::print(MemInfo::mem_limit(), 
TUnit::BYTES)
-                        << ". Using process memory limit instead";
-            bytes_limit = MemInfo::mem_limit();
-        }
-        if (params.query_options.query_type == TQueryType::SELECT) {
-            query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
-                    MemTrackerLimiter::Type::QUERY,
-                    fmt::format("Query#Id={}", 
print_id(query_ctx->query_id())), bytes_limit);
-        } else if (params.query_options.query_type == TQueryType::LOAD) {
-            query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
-                    MemTrackerLimiter::Type::LOAD,
-                    fmt::format("Load#Id={}", 
print_id(query_ctx->query_id())), bytes_limit);
-        } else { // EXTERNAL
-            query_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
-                    MemTrackerLimiter::Type::LOAD,
-                    fmt::format("External#Id={}", 
print_id(query_ctx->query_id())), bytes_limit);
-        }
-        if (params.query_options.__isset.is_report_success &&
-            params.query_options.is_report_success) {
-            query_ctx->query_mem_tracker->enable_print_log_usage();
-        }
-
-        query_ctx->register_memory_statistics();
-        query_ctx->register_cpu_statistics();
-
         bool is_pipeline = false;
         if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
             is_pipeline = true;
@@ -677,16 +643,12 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
 
         if (params.__isset.workload_groups && !params.workload_groups.empty()) 
{
             uint64_t tg_id = params.workload_groups[0].id;
-            auto* tg_mgr = _exec_env->task_group_manager();
-            taskgroup::TaskGroupPtr task_group_ptr = nullptr;
-            Status ret = tg_mgr->add_query_to_group(tg_id, 
query_ctx->query_id(), &task_group_ptr);
-            if (ret.ok()) {
-                
task_group_ptr->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
-                // set task group to queryctx for memory tracker can be 
removed, see QueryContext's destructor
-                query_ctx->set_task_group(task_group_ptr);
+            taskgroup::TaskGroupPtr task_group_ptr =
+                    
_exec_env->task_group_manager()->get_task_group_by_id(tg_id);
+            if (task_group_ptr != nullptr) {
+                RETURN_IF_ERROR(query_ctx->set_task_group(task_group_ptr));
                 
_exec_env->runtime_query_statistics_mgr()->set_workload_group_id(print_id(query_id),
                                                                                
  tg_id);
-                query_ctx->set_query_scheduler(tg_id);
 
                 LOG(INFO) << "Query/load id: " << 
print_id(query_ctx->query_id())
                           << ", use task group: " << 
task_group_ptr->debug_string()
@@ -695,26 +657,15 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
                           << ((int)config::enable_cgroup_cpu_soft_limit);
             } else {
                 LOG(INFO) << "Query/load id: " << 
print_id(query_ctx->query_id())
-                          << " carried group info but can not find group in 
be, reason: "
-                          << ret.to_string();
-            }
-        }
-
-        {
-            // Find _query_ctx_map again, in case some other request has 
already
-            // create the query fragments context.
-            std::lock_guard<std::mutex> lock(_lock);
-            auto search = _query_ctx_map.find(query_id);
-            if (search == _query_ctx_map.end()) {
-                _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), 
query_ctx));
-                LOG(INFO) << "Register query/load memory tracker, query/load 
id: "
-                          << print_id(query_ctx->query_id())
-                          << " limit: " << PrettyPrinter::print(bytes_limit, 
TUnit::BYTES);
-            } else {
-                // Already has a query fragments context, use it
-                query_ctx = search->second;
+                          << " carried group info but can not find group in 
be";
             }
         }
+        // There is some logic in query ctx's dctor, we could not check if 
exists and delete the
+        // temp query ctx now. For example, the query id maybe removed from 
task group's queryset.
+        _query_ctx_map.insert(std::make_pair(query_ctx->query_id(), 
query_ctx));
+        LOG(INFO) << "Register query/load memory tracker, query/load id: "
+                  << print_id(query_ctx->query_id())
+                  << " limit: " << 
PrettyPrinter::print(query_ctx->mem_limit(), TUnit::BYTES);
     }
     return Status::OK();
 }
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index c5f3ece16f8..e1c26cfcbb9 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -21,6 +21,7 @@
 #include "pipeline/pipeline_x/dependency.h"
 #include "runtime/runtime_query_statistics_mgr.h"
 #include "runtime/task_group/task_group_manager.h"
+#include "util/mem_info.h"
 
 namespace doris {
 
@@ -46,6 +47,37 @@ QueryContext::QueryContext(TUniqueId query_id, int 
total_fragment_num, ExecEnv*
             pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", 
this);
     _runtime_filter_mgr.reset(
             new RuntimeFilterMgr(TUniqueId(), 
RuntimeFilterParamsContext::create(this)));
+
+    timeout_second = query_options.execution_timeout;
+
+    bool has_query_mem_tracker = query_options.__isset.mem_limit && 
(query_options.mem_limit > 0);
+    int64_t _bytes_limit = has_query_mem_tracker ? query_options.mem_limit : 
-1;
+    if (_bytes_limit > MemInfo::mem_limit()) {
+        VLOG_NOTICE << "Query memory limit " << 
PrettyPrinter::print(_bytes_limit, TUnit::BYTES)
+                    << " exceeds process memory limit of "
+                    << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
+                    << ". Using process memory limit instead";
+        _bytes_limit = MemInfo::mem_limit();
+    }
+    if (query_options.query_type == TQueryType::SELECT) {
+        query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+                MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", 
print_id(_query_id)),
+                _bytes_limit);
+    } else if (query_options.query_type == TQueryType::LOAD) {
+        query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+                MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", 
print_id(_query_id)),
+                _bytes_limit);
+    } else { // EXTERNAL
+        query_mem_tracker = std::make_shared<MemTrackerLimiter>(
+                MemTrackerLimiter::Type::LOAD, fmt::format("External#Id={}", 
print_id(_query_id)),
+                _bytes_limit);
+    }
+    if (query_options.__isset.is_report_success && 
query_options.is_report_success) {
+        query_mem_tracker->enable_print_log_usage();
+    }
+
+    register_memory_statistics();
+    register_cpu_statistics();
 }
 
 QueryContext::~QueryContext() {
@@ -64,7 +96,7 @@ QueryContext::~QueryContext() {
     }
     if (_task_group) {
         _task_group->remove_mem_tracker_limiter(query_mem_tracker);
-        
_exec_env->task_group_manager()->remove_query_from_group(_task_group->id(), 
_query_id);
+        _task_group->remove_query(_query_id);
     }
 
     
_exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(_query_id));
@@ -154,12 +186,6 @@ void QueryContext::register_cpu_statistics() {
     }
 }
 
-void QueryContext::set_query_scheduler(uint64_t tg_id) {
-    auto* tg_mgr = _exec_env->task_group_manager();
-    tg_mgr->get_query_scheduler(tg_id, &_task_scheduler, &_scan_task_scheduler,
-                                &_non_pipe_thread_pool);
-}
-
 doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
     if (_task_group) {
         if (_task_scheduler) {
@@ -177,4 +203,15 @@ ThreadPool* QueryContext::get_non_pipe_exec_thread_pool() {
     }
 }
 
+Status QueryContext::set_task_group(taskgroup::TaskGroupPtr& tg) {
+    _task_group = tg;
+    // Should add query first, then the task group will not be deleted.
+    // see task_group_manager::delete_task_group_by_ids
+    RETURN_IF_ERROR(_task_group->add_query(_query_id));
+    _task_group->add_mem_tracker_limiter(query_mem_tracker);
+    _exec_env->task_group_manager()->get_query_scheduler(
+            _task_group->id(), &_task_scheduler, &_scan_task_scheduler, 
&_non_pipe_thread_pool);
+    return Status::OK();
+}
+
 } // namespace doris
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 948b2427ddc..806470fbd4c 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -150,7 +150,7 @@ public:
 
     vectorized::RuntimePredicate& get_runtime_predicate() { return 
_runtime_predicate; }
 
-    void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
+    Status set_task_group(taskgroup::TaskGroupPtr& tg);
 
     int execution_timeout() const {
         return _query_options.__isset.execution_timeout ? 
_query_options.execution_timeout
@@ -191,16 +191,6 @@ public:
 
     TUniqueId query_id() const { return _query_id; }
 
-    void set_task_scheduler(pipeline::TaskScheduler* task_scheduler) {
-        _task_scheduler = task_scheduler;
-    }
-
-    pipeline::TaskScheduler* get_task_scheduler() { return _task_scheduler; }
-
-    void set_scan_task_scheduler(vectorized::SimplifiedScanScheduler* 
scan_task_scheduler) {
-        _scan_task_scheduler = scan_task_scheduler;
-    }
-
     vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return 
_scan_task_scheduler; }
 
     pipeline::Dependency* get_execution_dependency() { return 
_execution_dependency.get(); }
@@ -215,12 +205,12 @@ public:
 
     std::shared_ptr<QueryStatistics> get_cpu_statistics() { return 
_cpu_statistics; }
 
-    void set_query_scheduler(uint64_t wg_id);
-
     doris::pipeline::TaskScheduler* get_pipe_exec_scheduler();
 
     ThreadPool* get_non_pipe_exec_thread_pool();
 
+    int64_t mem_limit() { return _bytes_limit; }
+
 public:
     DescriptorTbl* desc_tbl = nullptr;
     bool set_rsc_info = false;
@@ -254,6 +244,7 @@ private:
     TUniqueId _query_id;
     ExecEnv* _exec_env = nullptr;
     VecDateTimeValue _start_time;
+    int64_t _bytes_limit = 0;
 
     // A token used to submit olap scanner to the "_limited_scan_thread_pool",
     // This thread pool token is created from "_limited_scan_thread_pool" from 
exec env.
diff --git a/be/src/runtime/task_group/task_group.h 
b/be/src/runtime/task_group/task_group.h
index cf82f35fa20..647d088e817 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -29,11 +29,11 @@
 #include <unordered_set>
 
 #include "common/status.h"
+#include "service/backend_options.h"
 #include "util/hash_util.hpp"
 
 namespace doris {
 
-class TPipelineWorkloadGroup;
 class MemTrackerLimiter;
 
 namespace pipeline {
@@ -96,15 +96,33 @@ public:
         return _memory_limit > 0;
     }
 
-    void add_query(TUniqueId query_id) { _query_id_set.insert(query_id); }
-
-    void remove_query(TUniqueId query_id) { _query_id_set.erase(query_id); }
+    Status add_query(TUniqueId query_id) {
+        std::unique_lock<std::shared_mutex> wlock(_mutex);
+        if (_is_shutdown) {
+            // If the task group is set shutdown, then should not run any more,
+            // because the scheduler pool and other pointer may be released.
+            return Status::InternalError(
+                    "Failed add query to workload group, the workload group is 
shutdown. host: {}",
+                    BackendOptions::get_localhost());
+        }
+        _query_id_set.insert(query_id);
+        return Status::OK();
+    }
 
-    void shutdown() { _is_shutdown = true; }
+    void remove_query(TUniqueId query_id) {
+        std::unique_lock<std::shared_mutex> wlock(_mutex);
+        _query_id_set.erase(query_id);
+    }
 
-    int query_num() { return _query_id_set.size(); }
+    void shutdown() {
+        std::unique_lock<std::shared_mutex> wlock(_mutex);
+        _is_shutdown = true;
+    }
 
-    bool is_shutdown() { return _is_shutdown; }
+    int query_num() {
+        std::shared_lock<std::shared_mutex> r_lock(_mutex);
+        return _query_id_set.size();
+    }
 
 private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, 
_memory_limit
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index 9a2bca4ff6a..05be653747d 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -276,34 +276,6 @@ void 
TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
               << "ms, deleted group size:" << deleted_tg_ids.size();
 }
 
-Status TaskGroupManager::add_query_to_group(uint64_t tg_id, TUniqueId query_id,
-                                            TaskGroupPtr* tg_ptr) {
-    std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
-    auto tg_iter = _task_groups.find(tg_id);
-    if (tg_iter != _task_groups.end()) {
-        if (tg_iter->second->is_shutdown()) {
-            return Status::InternalError<false>("workload group {} is 
shutdown.", tg_id);
-        }
-        tg_iter->second->add_query(query_id);
-        *tg_ptr = tg_iter->second;
-        return Status::OK();
-    } else {
-        return Status::InternalError<false>("can not find workload group {}.", 
tg_id);
-    }
-}
-
-void TaskGroupManager::remove_query_from_group(uint64_t tg_id, TUniqueId 
query_id) {
-    std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
-    auto tg_iter = _task_groups.find(tg_id);
-    if (tg_iter != _task_groups.end()) {
-        tg_iter->second->remove_query(query_id);
-    } else {
-        //NOTE: This should never happen
-        LOG(INFO) << "can not find task group when remove query, tg:" << tg_id
-                  << ", query_id:" << print_id(query_id);
-    }
-}
-
 void TaskGroupManager::stop() {
     for (auto& task_sche : _tg_sche_map) {
         task_sche.second->stop();
diff --git a/be/src/runtime/task_group/task_group_manager.h 
b/be/src/runtime/task_group/task_group_manager.h
index 19e056d5258..1a1a614d068 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -69,10 +69,6 @@ public:
                              vectorized::SimplifiedScanScheduler** scan_sched,
                              ThreadPool** non_pipe_thread_pool);
 
-    Status add_query_to_group(uint64_t tg_id, TUniqueId query_id, 
TaskGroupPtr* tg_ptr);
-
-    void remove_query_from_group(uint64_t tg_id, TUniqueId query_id);
-
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to