This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 54780c62e0d [improvement](executor)Using cgroup to implement cpu hard 
limit (#25489)
54780c62e0d is described below

commit 54780c62e0d0e8a03cc9a45d1a7edd1386fb5ede
Author: wangbo <[email protected]>
AuthorDate: Thu Oct 19 18:56:26 2023 +0800

    [improvement](executor)Using cgroup to implement cpu hard limit (#25489)
    
    * Using cgroup to implement cpu hard limit
    
    * code style
---
 be/src/agent/cgroup_cpu_ctl.cpp                    | 25 +++++--
 be/src/agent/cgroup_cpu_ctl.h                      | 23 ++++---
 be/src/pipeline/pipeline_fragment_context.cpp      |  4 +-
 be/src/pipeline/pipeline_task.h                    |  1 +
 be/src/pipeline/task_scheduler.cpp                 | 12 ++--
 be/src/pipeline/task_scheduler.h                   |  6 +-
 be/src/runtime/exec_env.h                          | 14 +++-
 be/src/runtime/exec_env_init.cpp                   | 29 ++++----
 be/src/runtime/fragment_mgr.cpp                    | 24 ++++---
 be/src/runtime/query_context.h                     | 15 +++++
 be/src/runtime/task_group/task_group.cpp           | 11 +--
 be/src/runtime/task_group/task_group.h             |  3 +-
 be/src/runtime/task_group/task_group_manager.cpp   | 72 ++++++++++++++++++++
 be/src/runtime/task_group/task_group_manager.h     | 26 ++++++++
 be/src/vec/exec/scan/scanner_context.cpp           |  4 ++
 be/src/vec/exec/scan/scanner_context.h             |  5 ++
 be/src/vec/exec/scan/scanner_scheduler.cpp         | 16 +++--
 be/src/vec/exec/scan/scanner_scheduler.h           | 60 +++++++++++++++++
 .../resource/workloadgroup/WorkloadGroupMgr.java   | 78 +++-------------------
 19 files changed, 297 insertions(+), 131 deletions(-)

diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index d8a18c8c130..4a9aed2bb0f 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -86,14 +86,25 @@ Status CgroupV1CpuCtl::init() {
         }
     }
 
+    // workload group path
+    _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + 
std::to_string(_tg_id);
+    if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
+        int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
+        if (ret != 0) {
+            LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" << 
_cgroup_v1_cpu_tg_path;
+            return Status::InternalError("cgroup v1 mkdir workload group 
failed, path=",
+                                         _cgroup_v1_cpu_tg_path);
+        }
+    }
+
     // quota path
-    _cgroup_v1_cpu_query_quota_path = _cgroup_v1_cpu_query_path + 
"/cpu.cfs_quota_us";
+    _cgroup_v1_cpu_tg_quota_file = _cgroup_v1_cpu_tg_path + 
"/cpu.cfs_quota_us";
     // task path
-    _cgroup_v1_cpu_query_task_path = _cgroup_v1_cpu_query_path + "/tasks";
+    _cgroup_v1_cpu_tg_task_file = _cgroup_v1_cpu_tg_path + "/tasks";
     LOG(INFO) << "cgroup v1 cpu path init success"
-              << ", query path=" << _cgroup_v1_cpu_query_path
-              << ", query quota path=" << _cgroup_v1_cpu_query_quota_path
-              << ", query tasks path=" << _cgroup_v1_cpu_query_task_path
+              << ", query tg path=" << _cgroup_v1_cpu_tg_path
+              << ", query tg quota file path=" << _cgroup_v1_cpu_tg_quota_file
+              << ", query tg tasks file path=" << _cgroup_v1_cpu_tg_task_file
               << ", core num=" << _cpu_core_num;
     _init_succ = true;
     return Status::OK();
@@ -102,7 +113,7 @@ Status CgroupV1CpuCtl::init() {
 Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
     int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
     std::string msg = "modify cpu quota value to " + std::to_string(val);
-    return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_quota_path, 
val, msg, false);
+    return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val, 
msg, false);
 }
 
 Status CgroupV1CpuCtl::add_thread_to_cgroup() {
@@ -112,6 +123,6 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
     int tid = static_cast<int>(syscall(SYS_gettid));
     std::string msg = "add thread " + std::to_string(tid) + " to group";
     std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
-    return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_task_path, 
tid, msg, true);
+    return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, 
msg, true);
 }
 } // namespace doris
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 21eb367e444..c3a30660147 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -30,8 +30,8 @@ namespace doris {
 
 class CgroupCpuCtl {
 public:
-    CgroupCpuCtl() {}
-    virtual ~CgroupCpuCtl() {}
+    virtual ~CgroupCpuCtl() = default;
+    CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; }
 
     virtual Status init();
 
@@ -50,6 +50,7 @@ protected:
     uint64_t _cpu_hard_limit = 0;
     std::shared_mutex _lock_mutex;
     bool _init_succ = false;
+    uint64_t _tg_id; // workload group id
 };
 
 /*
@@ -66,23 +67,27 @@ protected:
     4 doris query path
         /sys/fs/cgroup/cpu/{doris_home}/query
     
-    5 doris query quota file:
-        /sys/fs/cgroup/cpu/{doris_home}/query/cpu.cfs_quota_us
+    5 workload group path
+        /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}
     
-    6 doris query tasks file:
-        /sys/fs/cgroup/cpu/{doris_home}/query/tasks
+    6 workload group quota file:
+        /sys/fs/cgroup/cpu/{doris_home}/query/{workload group 
id}/cpu.cfs_quota_us
+    
+    7 workload group tasks file:
+        /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/tasks
 */
 class CgroupV1CpuCtl : public CgroupCpuCtl {
 public:
+    CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
     Status init() override;
     Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
     Status add_thread_to_cgroup() override;
 
 private:
-    // todo(wb) support load/compaction path
     std::string _cgroup_v1_cpu_query_path;
-    std::string _cgroup_v1_cpu_query_quota_path;
-    std::string _cgroup_v1_cpu_query_task_path;
+    std::string _cgroup_v1_cpu_tg_path; // workload group path
+    std::string _cgroup_v1_cpu_tg_quota_file;
+    std::string _cgroup_v1_cpu_tg_task_file;
 };
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index a9eedb3a489..22a533c233e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -687,7 +687,9 @@ Status PipelineFragmentContext::submit() {
     int submit_tasks = 0;
     Status st;
     auto* scheduler = _exec_env->pipeline_task_scheduler();
-    if (_task_group_entity) {
+    if (_query_ctx->get_task_scheduler()) {
+        scheduler = _query_ctx->get_task_scheduler();
+    } else if (_task_group_entity) {
         scheduler = _exec_env->pipeline_task_group_scheduler();
     }
     for (auto& task : _tasks) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 1f517d6c94f..417fab35b97 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -193,6 +193,7 @@ public:
     taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const;
 
     void set_task_queue(TaskQueue* task_queue);
+    TaskQueue* get_task_queue() { return _task_queue; }
 
     static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;
     static constexpr auto THREAD_TIME_SLICE_US = 100000L; // 100ms
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index 1a44c57a74f..c58e4c5cf07 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -45,14 +45,12 @@
 
 namespace doris::pipeline {
 
-BlockedTaskScheduler::BlockedTaskScheduler(std::shared_ptr<TaskQueue> 
task_queue)
-        : _task_queue(std::move(task_queue)), _started(false), 
_shutdown(false) {}
+BlockedTaskScheduler::BlockedTaskScheduler() : _started(false), 
_shutdown(false) {}
 
-Status BlockedTaskScheduler::start() {
+Status BlockedTaskScheduler::start(std::string sche_name) {
     LOG(INFO) << "BlockedTaskScheduler start";
     RETURN_IF_ERROR(Thread::create(
-            "BlockedTaskScheduler", "schedule_blocked_pipeline", [this]() { 
this->_schedule(); },
-            &_thread));
+            "BlockedTaskScheduler", sche_name, [this]() { this->_schedule(); 
}, &_thread));
     while (!this->_started.load()) {
         std::this_thread::sleep_for(std::chrono::milliseconds(5));
     }
@@ -185,7 +183,7 @@ void 
BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
     auto task = *task_itr;
     task->set_state(t_state);
     local_tasks.erase(task_itr++);
-    static_cast<void>(_task_queue->push_back(task));
+    static_cast<void>(task->get_task_queue()->push_back(task));
 }
 
 TaskScheduler::~TaskScheduler() {
@@ -207,7 +205,7 @@ Status TaskScheduler::start() {
         RETURN_IF_ERROR(
                 
_fix_thread_pool->submit_func(std::bind(&TaskScheduler::_do_work, this, i)));
     }
-    return _blocked_task_scheduler->start();
+    return Status::OK();
 }
 
 Status TaskScheduler::schedule_task(PipelineTask* task) {
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 943b11a8a21..9b85ec420ee 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -46,17 +46,15 @@ namespace doris::pipeline {
 
 class BlockedTaskScheduler {
 public:
-    explicit BlockedTaskScheduler(std::shared_ptr<TaskQueue> task_queue);
+    explicit BlockedTaskScheduler();
 
     ~BlockedTaskScheduler() = default;
 
-    Status start();
+    Status start(std::string sche_name);
     void shutdown();
     Status add_blocked_task(PipelineTask* task);
 
 private:
-    std::shared_ptr<TaskQueue> _task_queue;
-
     std::mutex _task_mutex;
     std::condition_variable _task_cond;
     std::list<PipelineTask*> _blocked_tasks;
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index c1e1e77f87b..c6aa8216d90 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -45,7 +45,8 @@ class DeltaWriterV2Pool;
 } // namespace vectorized
 namespace pipeline {
 class TaskScheduler;
-}
+class BlockedTaskScheduler;
+} // namespace pipeline
 namespace taskgroup {
 class TaskGroupManager;
 }
@@ -268,7 +269,9 @@ public:
         return _inverted_index_query_cache;
     }
 
-    CgroupCpuCtl* get_cgroup_cpu_ctl() { return _cgroup_cpu_ctl.get(); }
+    std::shared_ptr<doris::pipeline::BlockedTaskScheduler> 
get_global_block_scheduler() {
+        return _global_block_scheduler;
+    }
 
 private:
     ExecEnv();
@@ -375,7 +378,12 @@ private:
     segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache = 
nullptr;
     segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
 
-    std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl = nullptr;
+    // used for query with group cpu hard limit
+    std::shared_ptr<doris::pipeline::BlockedTaskScheduler> 
_global_block_scheduler;
+    // used for query without workload group
+    std::shared_ptr<doris::pipeline::BlockedTaskScheduler> 
_without_group_block_scheduler;
+    // used for query with workload group cpu soft limit
+    std::shared_ptr<doris::pipeline::BlockedTaskScheduler> 
_with_group_block_scheduler;
 };
 
 template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index d9f7f31f7d2..c526c836ef5 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -280,29 +280,23 @@ Status ExecEnv::init_pipeline_task_scheduler() {
         executors_size = CpuInfo::num_cores();
     }
 
-    if (!config::doris_cgroup_cpu_path.empty()) {
-        _cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
-        Status ret = _cgroup_cpu_ctl->init();
-        if (!ret.ok()) {
-            LOG(ERROR) << "init cgroup cpu controller failed";
-        }
-    } else {
-        LOG(INFO) << "cgroup cpu controller is not inited";
-    }
-
     // TODO pipeline task group combie two blocked schedulers.
     auto t_queue = 
std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
-    auto b_scheduler = 
std::make_shared<pipeline::BlockedTaskScheduler>(t_queue);
-    _pipeline_task_scheduler = new pipeline::TaskScheduler(this, b_scheduler, 
t_queue,
-                                                           
"WithoutGroupTaskSchePool", nullptr);
+    _without_group_block_scheduler = 
std::make_shared<pipeline::BlockedTaskScheduler>();
+    _pipeline_task_scheduler = new pipeline::TaskScheduler(
+            this, _without_group_block_scheduler, t_queue, 
"WithoutGroupTaskSchePool", nullptr);
     RETURN_IF_ERROR(_pipeline_task_scheduler->start());
+    
RETURN_IF_ERROR(_without_group_block_scheduler->start("WithoutGroupBlockSche"));
 
     auto tg_queue = 
std::make_shared<pipeline::TaskGroupTaskQueue>(executors_size);
-    auto tg_b_scheduler = 
std::make_shared<pipeline::BlockedTaskScheduler>(tg_queue);
+    _with_group_block_scheduler = 
std::make_shared<pipeline::BlockedTaskScheduler>();
     _pipeline_task_group_scheduler = new pipeline::TaskScheduler(
-            this, tg_b_scheduler, tg_queue, "WithGroupTaskSchePool", 
_cgroup_cpu_ctl.get());
+            this, _with_group_block_scheduler, tg_queue, 
"WithGroupTaskSchePool", nullptr);
     RETURN_IF_ERROR(_pipeline_task_group_scheduler->start());
+    RETURN_IF_ERROR(_with_group_block_scheduler->start("WithGroupBlockSche"));
 
+    _global_block_scheduler = 
std::make_shared<pipeline::BlockedTaskScheduler>();
+    RETURN_IF_ERROR(_global_block_scheduler->start("GlobalBlockSche"));
     return Status::OK();
 }
 
@@ -547,6 +541,7 @@ void ExecEnv::destroy() {
     SAFE_STOP(_routine_load_task_executor);
     SAFE_STOP(_pipeline_task_scheduler);
     SAFE_STOP(_pipeline_task_group_scheduler);
+    SAFE_STOP(_task_group_manager);
     SAFE_STOP(_external_scan_context_mgr);
     SAFE_STOP(_fragment_mgr);
     // NewLoadStreamMgr should be destoried before storage_engine & after 
fragment_mgr stopped.
@@ -644,6 +639,10 @@ void ExecEnv::destroy() {
     // info is deconstructed then BE process will core at coordinator back 
method in fragment mgr.
     SAFE_DELETE(_master_info);
 
+    SAFE_SHUTDOWN(_global_block_scheduler.get());
+    SAFE_SHUTDOWN(_without_group_block_scheduler.get());
+    SAFE_SHUTDOWN(_with_group_block_scheduler.get());
+
     LOG(INFO) << "Doris exec envorinment is destoried.";
 }
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e3be7b94839..71297e0d4c3 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -640,21 +640,29 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
         if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
             if (params.__isset.workload_groups && 
!params.workload_groups.empty()) {
                 taskgroup::TaskGroupInfo task_group_info;
-                int query_cpu_hard_limit = -1;
-                auto status = taskgroup::TaskGroupInfo::parse_group_info(
-                        params.workload_groups[0], &task_group_info, 
&query_cpu_hard_limit);
+                auto status = 
taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0],
+                                                                         
&task_group_info);
                 if (status.ok()) {
                     auto tg = 
_exec_env->task_group_manager()->get_or_create_task_group(
                             task_group_info);
                     tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
-                    query_ctx->set_task_group(tg);
+                    uint64_t tg_id = tg->id();
+                    std::string tg_name = tg->name();
                     LOG(INFO) << "Query/load id: " << 
print_id(query_ctx->query_id())
                               << " use task group: " << tg->debug_string()
-                              << " query_cpu_hard_limit: " << 
query_cpu_hard_limit
+                              << " cpu_hard_limit: " << 
task_group_info.cpu_hard_limit
                               << " cpu_share:" << task_group_info.cpu_share;
-                    if (query_cpu_hard_limit > 0 && 
_exec_env->get_cgroup_cpu_ctl() != nullptr) {
-                        _exec_env->get_cgroup_cpu_ctl()->update_cpu_hard_limit(
-                                query_cpu_hard_limit);
+                    if (task_group_info.cpu_hard_limit > 0) {
+                        Status ret = 
_exec_env->task_group_manager()->create_and_get_task_scheduler(
+                                tg_id, tg_name, 
task_group_info.cpu_hard_limit, _exec_env,
+                                query_ctx.get());
+                        if (!ret.ok()) {
+                            LOG(INFO) << "workload group init failed "
+                                      << ", name=" << tg_name << ", id=" << 
tg_id
+                                      << ", reason=" << ret.to_string();
+                        }
+                    } else {
+                        query_ctx->set_task_group(tg);
                     }
                 }
             } else {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 2791291bf4d..24bb52b1634 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -225,6 +225,18 @@ public:
 
     TUniqueId query_id() const { return _query_id; }
 
+    void set_task_scheduler(pipeline::TaskScheduler* task_scheduler) {
+        _task_scheduler = task_scheduler;
+    }
+
+    pipeline::TaskScheduler* get_task_scheduler() { return _task_scheduler; }
+
+    void set_scan_task_scheduler(vectorized::SimplifiedScanScheduler* 
scan_task_scheduler) {
+        _scan_task_scheduler = scan_task_scheduler;
+    }
+
+    vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return 
_scan_task_scheduler; }
+
 public:
     DescriptorTbl* desc_tbl;
     bool set_rsc_info = false;
@@ -283,6 +295,9 @@ private:
     // All pipeline tasks use the same query context to report status. So we 
need a `_exec_status`
     // to report the real message if failed.
     Status _exec_status = Status::OK();
+
+    pipeline::TaskScheduler* _task_scheduler = nullptr;
+    vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
 };
 
 } // namespace doris
diff --git a/be/src/runtime/task_group/task_group.cpp 
b/be/src/runtime/task_group/task_group.cpp
index 758e543b034..7abc08d3c27 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -42,8 +42,7 @@ namespace taskgroup {
 const static std::string CPU_SHARE = "cpu_share";
 const static std::string MEMORY_LIMIT = "memory_limit";
 const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
-const static std::string QUERY_CPU_HARD_LIMIT =
-        "query_cpu_hard_limit"; // sum of all query's cpu_hard_limit
+const static std::string CPU_HARD_LIMIT = "cpu_hard_limit";
 
 template <typename QueueType>
 TaskGroupEntity<QueueType>::TaskGroupEntity(taskgroup::TaskGroup* tg, 
std::string type)
@@ -202,7 +201,7 @@ void TaskGroup::task_group_info(TaskGroupInfo* tg_info) 
const {
 }
 
 Status TaskGroupInfo::parse_group_info(const TPipelineWorkloadGroup& 
resource_group,
-                                       TaskGroupInfo* task_group_info, int* 
query_cpu_hard_limit) {
+                                       TaskGroupInfo* task_group_info) {
     if (UNLIKELY(!check_group_info(resource_group))) {
         std::stringstream ss;
         ss << "incomplete resource group parameters: ";
@@ -215,14 +214,16 @@ Status TaskGroupInfo::parse_group_info(const 
TPipelineWorkloadGroup& resource_gr
     uint64_t share = 0;
     std::from_chars(iter->second.c_str(), iter->second.c_str() + 
iter->second.size(), share);
 
-    auto iter2 = resource_group.properties.find(QUERY_CPU_HARD_LIMIT);
+    int cpu_hard_limit = 0;
+    auto iter2 = resource_group.properties.find(CPU_HARD_LIMIT);
     std::from_chars(iter2->second.c_str(), iter2->second.c_str() + 
iter2->second.size(),
-                    *query_cpu_hard_limit);
+                    cpu_hard_limit);
 
     task_group_info->id = resource_group.id;
     task_group_info->name = resource_group.name;
     task_group_info->version = resource_group.version;
     task_group_info->cpu_share = share;
+    task_group_info->cpu_hard_limit = cpu_hard_limit;
 
     bool is_percent = true;
     auto mem_limit_str = resource_group.properties.find(MEMORY_LIMIT)->second;
diff --git a/be/src/runtime/task_group/task_group.h 
b/be/src/runtime/task_group/task_group.h
index 8dd8b75fd10..1dc634469ee 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -168,9 +168,10 @@ struct TaskGroupInfo {
     int64_t memory_limit;
     bool enable_memory_overcommit;
     int64_t version;
+    int cpu_hard_limit;
 
     static Status parse_group_info(const TPipelineWorkloadGroup& 
resource_group,
-                                   TaskGroupInfo* task_group_info, int* 
query_cpu_hard_limit);
+                                   TaskGroupInfo* task_group_info);
 
 private:
     static bool check_group_info(const TPipelineWorkloadGroup& resource_group);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index 179bf8911a2..e6ed60148e0 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -59,4 +59,76 @@ void TaskGroupManager::get_resource_groups(const 
std::function<bool(const TaskGr
     }
 }
 
+Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id, 
std::string tg_name,
+                                                       int cpu_hard_limit, 
ExecEnv* exec_env,
+                                                       QueryContext* 
query_ctx_ptr) {
+    std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+    // step 1: init cgroup cpu controller
+    CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
+    if (_cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
+        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = 
std::make_unique<CgroupV1CpuCtl>(tg_id);
+        Status ret = cgroup_cpu_ctl->init();
+        if (ret.ok()) {
+            cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
+            _cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
+        } else {
+            return Status::Error<INTERNAL_ERROR, false>("cgroup init failed, 
gid={}", tg_id);
+        }
+    }
+
+    // step 2: init task scheduler
+    if (_tg_sche_map.find(tg_id) == _tg_sche_map.end()) {
+        int32_t executors_size = config::pipeline_executor_size;
+        if (executors_size <= 0) {
+            executors_size = CpuInfo::num_cores();
+        }
+        auto task_queue = 
std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
+
+        auto pipeline_task_scheduler = 
std::make_unique<pipeline::TaskScheduler>(
+                exec_env, exec_env->get_global_block_scheduler(), 
std::move(task_queue),
+                "Exec_" + tg_name, cg_cu_ctl_ptr);
+        Status ret = pipeline_task_scheduler->start();
+        if (ret.ok()) {
+            _tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
+        } else {
+            return Status::Error<INTERNAL_ERROR, false>("task scheduler start 
failed, gid={}",
+                                                        tg_id);
+        }
+    }
+
+    // step 3: init scan scheduler
+    if (_tg_scan_sche_map.find(tg_id) == _tg_scan_sche_map.end()) {
+        auto scan_scheduler =
+                std::make_unique<vectorized::SimplifiedScanScheduler>(tg_name, 
cg_cu_ctl_ptr);
+        Status ret = scan_scheduler->start();
+        if (ret.ok()) {
+            _tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
+        } else {
+            return Status::Error<INTERNAL_ERROR, false>("scan scheduler start 
failed, gid={}",
+                                                        tg_id);
+        }
+    }
+
+    // step 4 set exec/scan task scheduler to query ctx
+    pipeline::TaskScheduler* task_sche = _tg_sche_map.at(tg_id).get();
+    query_ctx_ptr->set_task_scheduler(task_sche);
+
+    vectorized::SimplifiedScanScheduler* scan_task_sche = 
_tg_scan_sche_map.at(tg_id).get();
+    query_ctx_ptr->set_scan_task_scheduler(scan_task_sche);
+
+    // step 5 update cgroup cpu if needed
+    _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
+
+    return Status::OK();
+}
+
+void TaskGroupManager::stop() {
+    for (auto& task_sche : _tg_sche_map) {
+        task_sche.second->stop();
+    }
+    for (auto& task_sche : _tg_scan_sche_map) {
+        task_sche.second->stop();
+    }
+}
+
 } // namespace doris::taskgroup
diff --git a/be/src/runtime/task_group/task_group_manager.h 
b/be/src/runtime/task_group/task_group_manager.h
index baa6579b15d..e45cdeca7ea 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -21,10 +21,24 @@
 #include <shared_mutex>
 #include <unordered_map>
 
+#include "pipeline/task_queue.h"
+#include "pipeline/task_scheduler.h"
 #include "task_group.h"
 
 namespace doris {
 class ExecEnv;
+class QueryContext;
+class CgroupCpuCtl;
+
+namespace vectorized {
+class SimplifiedScanScheduler;
+}
+
+namespace pipeline {
+class TaskScheduler;
+class MultiCoreTaskQueue;
+} // namespace pipeline
+
 namespace taskgroup {
 
 class TaskGroupManager {
@@ -37,9 +51,21 @@ public:
     void get_resource_groups(const std::function<bool(const TaskGroupPtr& 
ptr)>& pred,
                              std::vector<TaskGroupPtr>* task_groups);
 
+    Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name, 
int cpu_hard_limit,
+                                         ExecEnv* exec_env, QueryContext* 
query_ctx_ptr);
+
+    void stop();
+
 private:
     std::shared_mutex _group_mutex;
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
+
+    // map for workload group id and task scheduler pool
+    // used for cpu hard limit
+    std::mutex _task_scheduler_lock;
+    std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>> 
_tg_sche_map;
+    std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> 
_tg_scan_sche_map;
+    std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;
 };
 
 } // namespace taskgroup
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 93637599411..e1c29d569aa 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -123,6 +123,10 @@ Status ScannerContext::init() {
     // 3. get thread token
     if (_state->get_query_ctx()) {
         thread_token = _state->get_query_ctx()->get_token();
+        _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
+        if (_simple_scan_scheduler) {
+            _should_reset_thread_name = false;
+        }
     }
 #endif
 
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index 07f9f055517..932fd294ff9 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -57,6 +57,7 @@ namespace vectorized {
 class VScanner;
 class VScanNode;
 class ScannerScheduler;
+class SimplifiedScanScheduler;
 
 // ScannerContext is responsible for recording the execution status
 // of a group of Scanners corresponding to a ScanNode.
@@ -164,6 +165,7 @@ public:
     }
 
     taskgroup::TaskGroup* get_task_group() const;
+    SimplifiedScanScheduler* get_simple_scan_scheduler() { return 
_simple_scan_scheduler; }
 
     void reschedule_scanner_ctx();
 
@@ -173,6 +175,8 @@ public:
     ThreadPoolToken* thread_token = nullptr;
     std::vector<bthread_t> _btids;
 
+    bool _should_reset_thread_name = true;
+
 private:
     template <typename Parent>
     Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
@@ -252,6 +256,7 @@ protected:
     const int64_t _max_bytes_in_queue;
 
     doris::vectorized::ScannerScheduler* _scanner_scheduler;
+    SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; // used for cpu 
hard limit
     // List "scanners" saves all "unfinished" scanners.
     // The scanner scheduler will pop scanners from this list, run scanner,
     // and then if the scanner is not finished, will be pushed back to this 
list.
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index b7f05f0ec94..7c0064a1f4c 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -136,7 +136,6 @@ Status ScannerScheduler::init(ExecEnv* env) {
     static_cast<void>(ThreadPoolBuilder("local_scan_group")
                               
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
                               
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
-                              .set_cgroup_cpu_ctl(env->get_cgroup_cpu_ctl())
                               .build(&_group_local_scan_thread_pool));
     for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
         static_cast<void>(_group_local_scan_thread_pool->submit_func([this] {
@@ -237,7 +236,13 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* 
ctx) {
                 TabletStorageType type = (*iter)->get_storage_type();
                 bool ret = false;
                 if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
-                    if (ctx->get_task_group() && 
config::enable_workload_group_for_scan) {
+                    if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
+                        auto work_func = [this, scanner = *iter, ctx] {
+                            this->_scanner_scan(this, ctx, scanner);
+                        };
+                        SimplifiedScanTask simple_scan_task = {work_func, ctx};
+                        ret = 
scan_sche->get_scan_queue()->try_put(simple_scan_task);
+                    } else if (ctx->get_task_group() && 
config::enable_workload_group_for_scan) {
                         auto work_func = [this, scanner = *iter, ctx] {
                             this->_scanner_scan(this, ctx, scanner);
                         };
@@ -312,10 +317,13 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* 
ctx) {
 void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, 
ScannerContext* ctx,
                                      VScannerSPtr scanner) {
     SCOPED_ATTACH_TASK(scanner->runtime_state());
+    // for cpu hard limit, thread name should not be reset
 #if !defined(USE_BTHREAD_SCANNER)
-    Thread::set_self_name("_scanner_scan");
+    if (ctx->_should_reset_thread_name) {
+        Thread::set_self_name("_scanner_scan");
+    }
 #else
-    if (dynamic_cast<NewOlapScanner*>(scanner) == nullptr) {
+    if (dynamic_cast<NewOlapScanner*>(scanner) == nullptr && 
ctx->_should_reset_thread_name) {
         Thread::set_self_name("_scanner_scan");
     }
 #endif
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index 3a198b6c746..ad6f86c4f18 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -121,4 +121,64 @@ private:
             config::doris_scanner_thread_pool_thread_num + 
config::pipeline_executor_size;
 };
 
+struct SimplifiedScanTask {
+    SimplifiedScanTask() = default;
+    SimplifiedScanTask(std::function<void()> scan_func,
+                       vectorized::ScannerContext* scanner_context) {
+        this->scan_func = scan_func;
+        this->scanner_context = scanner_context;
+    }
+
+    std::function<void()> scan_func;
+    vectorized::ScannerContext* scanner_context;
+};
+
+// used for cpu hard limit
+class SimplifiedScanScheduler {
+public:
+    SimplifiedScanScheduler(std::string wg_name, CgroupCpuCtl* cgroup_cpu_ctl) 
{
+        _scan_task_queue = std::make_unique<BlockingQueue<SimplifiedScanTask>>(
+                config::doris_scanner_thread_pool_queue_size);
+        _is_stop.store(false);
+        _cgroup_cpu_ctl = cgroup_cpu_ctl;
+        _wg_name = wg_name;
+    }
+
+    void stop() {
+        _is_stop.store(true);
+        _scan_thread_pool->shutdown();
+        _scan_thread_pool->wait();
+    }
+
+    Status start() {
+        RETURN_IF_ERROR(ThreadPoolBuilder("Scan_" + _wg_name)
+                                
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
+                                
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
+                                .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
+                                .build(&_scan_thread_pool));
+
+        for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) 
{
+            RETURN_IF_ERROR(_scan_thread_pool->submit_func([this] { 
this->_work(); }));
+        }
+        return Status::OK();
+    }
+
+    BlockingQueue<SimplifiedScanTask>* get_scan_queue() { return 
_scan_task_queue.get(); }
+
+private:
+    void _work() {
+        while (!_is_stop.load()) {
+            SimplifiedScanTask scan_task;
+            _scan_task_queue->blocking_get(&scan_task);
+            scan_task.scan_func();
+        }
+    }
+
+    std::unique_ptr<ThreadPool> _scan_thread_pool;
+    std::unique_ptr<BlockingQueue<SimplifiedScanTask>> _scan_task_queue;
+    std::atomic<bool> _is_stop;
+    CgroupCpuCtl* _cgroup_cpu_ctl;
+    std::string _wg_name;
+};
+
 } // namespace doris::vectorized
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 3f0053237a9..1ec47d053fe 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -72,11 +72,6 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
     private final ResourceProcNode procNode = new ResourceProcNode();
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public static final String QUERY_CPU_HARD_LIMIT = "query_cpu_hard_limit";
-    private int queryCPUHardLimit = 0;
-    // works when user not set cpu hard limit, we fill a default value
-    private int cpuHardLimitDefaultVal = 0;
-
     public WorkloadGroupMgr() {
     }
 
@@ -124,19 +119,13 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
                 throw new UserException("Workload group " + groupName + " does 
not exist");
             }
             workloadGroups.add(workloadGroup.toThrift());
-            // note(wb) -1 to tell be no need to update cgroup
-            int thriftVal = -1;
-            if (Config.enable_cpu_hard_limit) {
-                // reset cpu_share according to cpu hard limit
-                int cpuHardLimitShare = workloadGroup.getCpuHardLimit() == 0
-                        ? this.cpuHardLimitDefaultVal : 
workloadGroup.getCpuHardLimit();
-                workloadGroups.get(0).getProperties()
-                        .put(WorkloadGroup.CPU_SHARE, 
String.valueOf(cpuHardLimitShare));
-
-                // reset sum of all groups cpu hard limit
-                thriftVal = this.queryCPUHardLimit;
+            // note(wb) -1 tells BE not to use the cpu hard limit
+            int cpuHardLimitThriftVal = -1;
+            if (Config.enable_cpu_hard_limit && 
workloadGroup.getCpuHardLimit() > 0) {
+                cpuHardLimitThriftVal = workloadGroup.getCpuHardLimit();
             }
-            workloadGroups.get(0).getProperties().put(QUERY_CPU_HARD_LIMIT, 
String.valueOf(thriftVal));
+            
workloadGroups.get(0).getProperties().put(WorkloadGroup.CPU_HARD_LIMIT,
+                    String.valueOf(cpuHardLimitThriftVal));
             context.setWorkloadGroupName(groupName);
         } finally {
             readUnlock();
@@ -213,7 +202,6 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
             checkGlobalUnlock(workloadGroup, null);
             nameToWorkloadGroup.put(workloadGroupName, workloadGroup);
             idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
-            calQueryCPUHardLimit();
             
Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(workloadGroup);
         } finally {
             writeUnlock();
@@ -240,44 +228,20 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
                             + " value can not be greater than 100% or less 
than or equal 0%");
         }
 
-        // 2, calculate new query hard cpu limit
-        int tmpCpuHardLimit = 0;
-        int zeroCpuHardLimitCount = 0;
+        // 2, check sum of all cpu hard limit
+        int sumOfAllCpuHardLimit = 0;
         for (Map.Entry<Long, WorkloadGroup> entry : 
idToWorkloadGroup.entrySet()) {
             if (old != null && entry.getKey() == old.getId()) {
                 continue;
             }
-            int cpuHardLimit = entry.getValue().getCpuHardLimit();
-            if (cpuHardLimit == 0) {
-                zeroCpuHardLimitCount++;
-            }
-            tmpCpuHardLimit += cpuHardLimit;
+            sumOfAllCpuHardLimit += entry.getValue().getCpuHardLimit();
         }
-        if (newGroupCpuHardLimit == 0) {
-            zeroCpuHardLimitCount++;
-        }
-        tmpCpuHardLimit += newGroupCpuHardLimit;
+        sumOfAllCpuHardLimit += newGroupCpuHardLimit;
 
-        if (tmpCpuHardLimit > 100) {
+        if (sumOfAllCpuHardLimit > 100) {
             throw new DdlException("sum of all workload group " + 
WorkloadGroup.CPU_HARD_LIMIT
                     + " can not be greater than 100% ");
         }
-
-        if (tmpCpuHardLimit == 100 && zeroCpuHardLimitCount > 0) {
-            throw new DdlException("some workload group may not be assigned "
-                    + "cpu hard limit but all query cpu hard limit exceeds 
100%");
-        }
-
-        int leftCpuHardLimitVal = 100 - tmpCpuHardLimit;
-        if (zeroCpuHardLimitCount != 0) {
-            int tmpCpuHardLimitDefaultVal = leftCpuHardLimitVal / 
zeroCpuHardLimitCount;
-            if (tmpCpuHardLimitDefaultVal == 0) {
-                throw new DdlException("remaining cpu can not be assigned to 
the "
-                        + "workload group without cpu hard limit value; "
-                        + leftCpuHardLimitVal + "%," + newGroupCpuHardLimit
-                        + "%," + zeroCpuHardLimitCount);
-            }
-        }
     }
 
     public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws 
DdlException {
@@ -296,7 +260,6 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
             checkGlobalUnlock(newWorkloadGroup, workloadGroup);
             nameToWorkloadGroup.put(workloadGroupName, newWorkloadGroup);
             idToWorkloadGroup.put(newWorkloadGroup.getId(), newWorkloadGroup);
-            calQueryCPUHardLimit();
             
Env.getCurrentEnv().getEditLog().logAlterWorkloadGroup(newWorkloadGroup);
         } finally {
             writeUnlock();
@@ -331,7 +294,6 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
             long groupId = workloadGroup.getId();
             idToWorkloadGroup.remove(groupId);
             nameToWorkloadGroup.remove(workloadGroupName);
-            calQueryCPUHardLimit();
             Env.getCurrentEnv().getEditLog().logDropWorkloadGroup(new 
DropWorkloadGroupOperationLog(groupId));
         } finally {
             writeUnlock();
@@ -344,7 +306,6 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
         try {
             nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup);
             idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
-            calQueryCPUHardLimit();
         } finally {
             writeUnlock();
         }
@@ -377,7 +338,6 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
             WorkloadGroup workloadGroup = idToWorkloadGroup.get(id);
             nameToWorkloadGroup.remove(workloadGroup.getName());
             idToWorkloadGroup.remove(id);
-            calQueryCPUHardLimit();
         } finally {
             writeUnlock();
         }
@@ -403,21 +363,6 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
         return idToWorkloadGroup;
     }
 
-    private void calQueryCPUHardLimit() {
-        int zeroCpuHardLimitCount = 0;
-        int ret = 0;
-        for (Map.Entry<Long, WorkloadGroup> entry : 
idToWorkloadGroup.entrySet()) {
-            if (entry.getValue().getCpuHardLimit() == 0) {
-                zeroCpuHardLimitCount++;
-            }
-            ret += entry.getValue().getCpuHardLimit();
-        }
-        this.queryCPUHardLimit = ret;
-        if (zeroCpuHardLimitCount != 0) {
-            this.cpuHardLimitDefaultVal = (100 - this.queryCPUHardLimit) / 
zeroCpuHardLimitCount;
-        }
-    }
-
     @Override
     public void write(DataOutput out) throws IOException {
         String json = GsonUtils.GSON.toJson(this);
@@ -428,7 +373,6 @@ public class WorkloadGroupMgr implements Writable, 
GsonPostProcessable {
     public void gsonPostProcess() throws IOException {
         idToWorkloadGroup.forEach(
                 (id, workloadGroup) -> 
nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup));
-        calQueryCPUHardLimit();
     }
 
     public class ResourceProcNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to