This is an automated email from the ASF dual-hosted git repository.

wangbo pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 002d3e63e41 [improvement](pipeline) task group scan entity (#19924) 
(#27040)
002d3e63e41 is described below

commit 002d3e63e41736c95c0cbad8787a688cb86a5a66
Author: wangbo <[email protected]>
AuthorDate: Thu Nov 16 09:42:44 2023 +0800

    [improvement](pipeline) task group scan entity (#19924) (#27040)
    
    Co-authored-by: Lijia Liu <[email protected]>
---
 be/src/common/config.cpp                           |   2 +-
 be/src/common/config.h                             |   2 +-
 be/src/pipeline/pipeline_fragment_context.cpp      |   5 +-
 be/src/pipeline/pipeline_fragment_context.h        |  10 +-
 be/src/pipeline/pipeline_task.cpp                  |  10 +-
 be/src/pipeline/pipeline_task.h                    |   9 +-
 be/src/pipeline/task_queue.cpp                     |  59 +++---
 be/src/pipeline/task_queue.h                       |  28 +--
 be/src/pipeline/task_scheduler.cpp                 |   6 +-
 be/src/pipeline/task_scheduler.h                   |   3 +-
 be/src/runtime/exec_env.h                          |   5 +
 be/src/runtime/exec_env_init.cpp                   |   3 +
 be/src/runtime/fragment_mgr.cpp                    |   2 +-
 be/src/runtime/task_group/task_group.cpp           |  98 ++++++----
 be/src/runtime/task_group/task_group.h             |  59 ++++--
 be/src/runtime/task_group/task_group_manager.cpp   |   8 +-
 be/src/runtime/task_group/task_group_manager.h     |   8 +-
 be/src/util/mem_info.cpp                           |   4 +-
 be/src/vec/exec/scan/scan_task_queue.cpp           | 213 +++++++++++++++++++++
 be/src/vec/exec/scan/scan_task_queue.h             |  98 ++++++++++
 be/src/vec/exec/scan/scanner_context.cpp           |  19 ++
 be/src/vec/exec/scan/scanner_context.h             |   6 +
 be/src/vec/exec/scan/scanner_scheduler.cpp         |  54 +++++-
 be/src/vec/exec/scan/scanner_scheduler.h           |  13 ++
 be/src/vec/exec/scan/vscanner.h                    |   2 +
 .../main/java/org/apache/doris/common/Config.java  |   5 +-
 26 files changed, 592 insertions(+), 139 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 22ec143c6da..b09e77f7eb3 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -955,7 +955,7 @@ DEFINE_Bool(enable_fuzzy_mode, "false");
 DEFINE_Bool(enable_debug_points, "false");
 
 DEFINE_Int32(pipeline_executor_size, "0");
-DEFINE_mInt16(pipeline_short_query_timeout_s, "20");
+DEFINE_mBool(enable_workload_group_for_scan, "false");
 
 // Temp config. True to use optimization for bitmap_index apply predicate 
except leaf node of the and node.
 // Will remove after fully test.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0a11a4b46d7..de372deaf95 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -997,7 +997,7 @@ DECLARE_Bool(enable_fuzzy_mode);
 DECLARE_Bool(enable_debug_points);
 
 DECLARE_Int32(pipeline_executor_size);
-DECLARE_mInt16(pipeline_short_query_timeout_s);
+DECLARE_mBool(enable_workload_group_for_scan);
 
 // Temp config. True to use optimization for bitmap_index apply predicate 
except leaf node of the and node.
 // Will remove after fully test.
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index d718e1c6eee..1b7dffae1c1 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -128,6 +128,9 @@ PipelineFragmentContext::PipelineFragmentContext(
           _report_thread_active(false),
           _report_status_cb(report_status_cb),
           _is_report_on_cancel(true) {
+    if (_query_ctx->get_task_group()) {
+        _task_group_entity = _query_ctx->get_task_group()->task_entity();
+    }
     _report_thread_future = _report_thread_promise.get_future();
     _fragment_watcher.start();
 }
@@ -689,7 +692,7 @@ Status PipelineFragmentContext::submit() {
     int submit_tasks = 0;
     Status st;
     auto* scheduler = _exec_env->pipeline_task_scheduler();
-    if (get_task_group()) {
+    if (_task_group_entity) {
         scheduler = _exec_env->pipeline_task_group_scheduler();
     }
     for (auto& task : _tasks) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index ed79800ec60..1b44894b842 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -48,10 +48,6 @@ class RuntimeFilterMergeControllerEntity;
 class TDataSink;
 class TPipelineFragmentParams;
 
-namespace taskgroup {
-class TaskGroup;
-} // namespace taskgroup
-
 namespace pipeline {
 
 class PipelineFragmentContext : public 
std::enable_shared_from_this<PipelineFragmentContext> {
@@ -121,7 +117,9 @@ public:
         return _exec_status;
     }
 
-    taskgroup::TaskGroup* get_task_group() const { return 
_query_ctx->get_task_group(); }
+    taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const {
+        return _task_group_entity;
+    }
 
 private:
     Status _create_sink(int sender_id, const TDataSink& t_data_sink, 
RuntimeState* state);
@@ -176,6 +174,8 @@ private:
 
     std::shared_ptr<QueryContext> _query_ctx;
 
+    taskgroup::TaskGroupPipelineTaskEntity* _task_group_entity = nullptr;
+
     std::shared_ptr<RuntimeFilterMergeControllerEntity> 
_merge_controller_handler;
 
     MonotonicStopWatch _fragment_watcher;
diff --git a/be/src/pipeline/pipeline_task.cpp 
b/be/src/pipeline/pipeline_task.cpp
index 9e40b85f917..0d3cb413bf7 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -37,9 +37,6 @@
 
 namespace doris {
 class RuntimeState;
-namespace taskgroup {
-class TaskGroup;
-} // namespace taskgroup
 } // namespace doris
 
 namespace doris::pipeline {
@@ -402,7 +399,8 @@ std::string PipelineTask::debug_string() {
         _task_profile->pretty_print(&profile_ss, "");
         fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
     }
-    fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state = 
{}]\noperators: ", _index,
+    fmt::format_to(debug_string_buffer,
+                   "PipelineTask[this = {}, state = {}]\noperators: ", 
(void*)this,
                    get_state_name(_cur_state));
     for (size_t i = 0; i < _operators.size(); i++) {
         fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '),
@@ -422,8 +420,8 @@ std::string PipelineTask::debug_string() {
     return fmt::to_string(debug_string_buffer);
 }
 
-taskgroup::TaskGroup* PipelineTask::get_task_group() const {
-    return _fragment_context->get_task_group();
+taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity() 
const {
+    return _fragment_context->get_task_group_entity();
 }
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 9fdf3c82e05..ea78d795006 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -26,6 +26,7 @@
 #include "common/status.h"
 #include "exec/operator.h"
 #include "pipeline.h"
+#include "runtime/task_group/task_group.h"
 #include "util/runtime_profile.h"
 #include "util/stopwatch.hpp"
 #include "vec/core/block.h"
@@ -36,9 +37,6 @@ class RuntimeState;
 namespace pipeline {
 class PipelineFragmentContext;
 } // namespace pipeline
-namespace taskgroup {
-class TaskGroup;
-} // namespace taskgroup
 } // namespace doris
 
 namespace doris::pipeline {
@@ -107,6 +105,7 @@ inline const char* get_state_name(PipelineTaskState idx) {
 }
 
 class TaskQueue;
+class PriorityTaskQueue;
 
 // This class executes a pipeline task. It is the smallest scheduling unit of the task scheduler.
 class PipelineTask {
@@ -185,11 +184,11 @@ public:
 
     std::string debug_string();
 
-    taskgroup::TaskGroup* get_task_group() const;
+    taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const;
 
     void set_task_queue(TaskQueue* task_queue);
 
-    static constexpr auto THREAD_TIME_SLICE = 100'000'000L;
+    static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;
 
     // 1 used for update priority queue
     // note(wb) an ugly implementation, need refactor later
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 715c9601cdf..a68a2ba4a7f 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -23,7 +23,6 @@
 
 #include "common/logging.h"
 #include "pipeline/pipeline_task.h"
-#include "runtime/task_group/task_group.h"
 
 namespace doris {
 namespace pipeline {
@@ -55,7 +54,7 @@ void PriorityTaskQueue::close() {
     _wait_task.notify_all();
 }
 
-PipelineTask* PriorityTaskQueue::try_take_unprotected(bool is_steal) {
+PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
     if (_total_task_size == 0 || _closed) {
         return nullptr;
     }
@@ -94,12 +93,12 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
 PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
     // TODO other efficient lock? e.g. if get lock fail, return null_ptr
     std::unique_lock<std::mutex> lock(_work_size_mutex);
-    return try_take_unprotected(is_steal);
+    return _try_take_unprotected(is_steal);
 }
 
 PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
     std::unique_lock<std::mutex> lock(_work_size_mutex);
-    auto task = try_take_unprotected(false);
+    auto task = _try_take_unprotected(false);
     if (task) {
         return task;
     } else {
@@ -108,7 +107,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
         } else {
             _wait_task.wait(lock);
         }
-        return try_take_unprotected(false);
+        return _try_take_unprotected(false);
     }
 }
 
@@ -131,6 +130,11 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
     return Status::OK();
 }
 
+int PriorityTaskQueue::task_size() {
+    std::unique_lock<std::mutex> lock(_work_size_mutex);
+    return _total_task_size;
+}
+
 MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
 
 MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) : 
TaskQueue(core_size), _closed(false) {
@@ -201,9 +205,9 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, 
size_t core_id) {
 }
 
 bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
-        const taskgroup::TGEntityPtr& lhs_ptr, const taskgroup::TGEntityPtr& 
rhs_ptr) const {
-    int64_t lhs_val = lhs_ptr->vruntime_ns();
-    int64_t rhs_val = rhs_ptr->vruntime_ns();
+        const taskgroup::TGPTEntityPtr& lhs_ptr, const 
taskgroup::TGPTEntityPtr& rhs_ptr) const {
+    auto lhs_val = lhs_ptr->vruntime_ns();
+    auto rhs_val = rhs_ptr->vruntime_ns();
     if (lhs_val != rhs_val) {
         return lhs_val < rhs_val;
     } else {
@@ -217,7 +221,8 @@ bool 
TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
     }
 }
 
-TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) : 
TaskQueue(core_size) {}
+TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size)
+        : TaskQueue(core_size), _min_tg_entity(nullptr) {}
 
 TaskGroupTaskQueue::~TaskGroupTaskQueue() = default;
 
@@ -237,9 +242,10 @@ Status TaskGroupTaskQueue::push_back(PipelineTask* task, 
size_t core_id) {
 
 template <bool from_executor>
 Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
-    auto* entity = task->get_task_group()->task_entity();
+    task->put_in_runnable_queue();
+    auto* entity = task->get_task_group_entity();
     std::unique_lock<std::mutex> lock(_rs_mutex);
-    entity->push_back(task);
+    entity->task_queue()->emplace(task);
     if (_group_entities.find(entity) == _group_entities.end()) {
         _enqueue_task_group<from_executor>(entity);
     }
@@ -250,7 +256,7 @@ Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
 // TODO pipeline support steal
 PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
     std::unique_lock<std::mutex> lock(_rs_mutex);
-    taskgroup::TGEntityPtr entity = nullptr;
+    taskgroup::TGPTEntityPtr entity = nullptr;
     while (entity == nullptr) {
         if (_closed) {
             return nullptr;
@@ -268,11 +274,16 @@ PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
     if (entity->task_size() == 1) {
         _dequeue_task_group(entity);
     }
-    return entity->take();
+    auto task = entity->task_queue()->front();
+    if (task) {
+        entity->task_queue()->pop();
+        task->pop_out_runnable_queue();
+    }
+    return task;
 }
 
 template <bool from_worker>
-void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) 
{
+void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGPTEntityPtr 
tg_entity) {
     _total_cpu_share += tg_entity->cpu_share();
     if constexpr (!from_worker) {
         /**
@@ -283,7 +294,9 @@ void 
TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) {
         auto old_v_ns = tg_entity->vruntime_ns();
         auto* min_entity = _min_tg_entity.load();
         if (min_entity) {
-            int64_t new_vruntime_ns = min_entity->vruntime_ns() - 
_ideal_runtime_ns(tg_entity) / 2;
+            auto min_tg_v = min_entity->vruntime_ns();
+            auto ideal_r = _ideal_runtime_ns(tg_entity) / 2;
+            uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r 
: min_tg_v;
             if (new_vruntime_ns > old_v_ns) {
                 tg_entity->adjust_vruntime_ns(new_vruntime_ns);
             }
@@ -297,7 +310,7 @@ void 
TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) {
     _update_min_tg();
 }
 
-void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGEntityPtr tg_entity) 
{
+void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGPTEntityPtr 
tg_entity) {
     _total_cpu_share -= tg_entity->cpu_share();
     _group_entities.erase(tg_entity);
     VLOG_DEBUG << "dequeue tg " << tg_entity->debug_string()
@@ -317,12 +330,12 @@ void TaskGroupTaskQueue::_update_min_tg() {
 }
 
 // Like calc_delta_fair in sched_fair.c; THREAD_TIME_SLICE may be a dynamic value.
-int64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGEntityPtr 
tg_entity) const {
+uint64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGPTEntityPtr 
tg_entity) const {
     return PipelineTask::THREAD_TIME_SLICE * _core_size * 
tg_entity->cpu_share() / _total_cpu_share;
 }
 
-taskgroup::TGEntityPtr TaskGroupTaskQueue::_next_tg_entity() {
-    taskgroup::TGEntityPtr res = nullptr;
+taskgroup::TGPTEntityPtr TaskGroupTaskQueue::_next_tg_entity() {
+    taskgroup::TGPTEntityPtr res = nullptr;
     for (auto* entity : _group_entities) {
         res = entity;
         break;
@@ -332,8 +345,7 @@ taskgroup::TGEntityPtr 
TaskGroupTaskQueue::_next_tg_entity() {
 
 void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t 
time_spent) {
     std::unique_lock<std::mutex> lock(_rs_mutex);
-    auto* group = task->get_task_group();
-    auto* entity = group->task_entity();
+    auto* entity = task->get_task_group_entity();
     auto find_entity = _group_entities.find(entity);
     bool is_in_queue = find_entity != _group_entities.end();
     VLOG_DEBUG << "update_statistics " << entity->debug_string() << ", in 
queue:" << is_in_queue;
@@ -348,15 +360,14 @@ void TaskGroupTaskQueue::update_statistics(PipelineTask* 
task, int64_t time_spen
 }
 
 void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& 
task_group_info,
-                                             taskgroup::TaskGroupPtr 
task_group) {
+                                             taskgroup::TGPTEntityPtr entity) {
     std::unique_lock<std::mutex> lock(_rs_mutex);
-    auto* entity = task_group->task_entity();
     bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
     if (is_in_queue) {
         _group_entities.erase(entity);
         _total_cpu_share -= entity->cpu_share();
     }
-    task_group->update_cpu_share_unlock(task_group_info);
+    entity->check_and_update_cpu_share(task_group_info);
     if (is_in_queue) {
         _group_entities.emplace(entity);
         _total_cpu_share += entity->cpu_share();
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 9956ba3cb98..d693cbe2168 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -33,7 +33,6 @@
 #include "runtime/task_group/task_group.h"
 
 namespace doris {
-
 namespace pipeline {
 
 class TaskQueue {
@@ -54,7 +53,7 @@ public:
     virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
 
     virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo& 
task_group_info,
-                                     taskgroup::TaskGroupPtr task_group) = 0;
+                                     taskgroup::TGPTEntityPtr entity) = 0;
 
     int cores() const { return _core_size; }
 
@@ -95,12 +94,10 @@ private:
 // A Multilevel Feedback Queue
 class PriorityTaskQueue {
 public:
-    explicit PriorityTaskQueue();
+    PriorityTaskQueue();
 
     void close();
 
-    PipelineTask* try_take_unprotected(bool is_steal);
-
     PipelineTask* try_take(bool is_steal);
 
     PipelineTask* take(uint32_t timeout_ms = 0);
@@ -111,7 +108,10 @@ public:
         _sub_queues[level].inc_runtime(runtime);
     }
 
+    int task_size();
+
 private:
+    PipelineTask* _try_take_unprotected(bool is_steal);
     static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
     static constexpr size_t SUB_QUEUE_LEVEL = 6;
     SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
@@ -155,7 +155,7 @@ public:
     }
 
     void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
-                             taskgroup::TaskGroupPtr task_group) override {
+                             taskgroup::TGPTEntityPtr entity) override {
         LOG(FATAL) << "update_tg_cpu_share not implemented";
     }
 
@@ -185,29 +185,29 @@ public:
     void update_statistics(PipelineTask* task, int64_t time_spent) override;
 
     void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
-                             taskgroup::TaskGroupPtr task_group) override;
+                             taskgroup::TGPTEntityPtr entity) override;
 
 private:
     template <bool from_executor>
     Status _push_back(PipelineTask* task);
     template <bool from_worker>
-    void _enqueue_task_group(taskgroup::TGEntityPtr);
-    void _dequeue_task_group(taskgroup::TGEntityPtr);
-    taskgroup::TGEntityPtr _next_tg_entity();
-    int64_t _ideal_runtime_ns(taskgroup::TGEntityPtr tg_entity) const;
+    void _enqueue_task_group(taskgroup::TGPTEntityPtr);
+    void _dequeue_task_group(taskgroup::TGPTEntityPtr);
+    taskgroup::TGPTEntityPtr _next_tg_entity();
+    uint64_t _ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const;
     void _update_min_tg();
 
     // Like cfs rb tree in sched_entity
     struct TaskGroupSchedEntityComparator {
-        bool operator()(const taskgroup::TGEntityPtr&, const 
taskgroup::TGEntityPtr&) const;
+        bool operator()(const taskgroup::TGPTEntityPtr&, const 
taskgroup::TGPTEntityPtr&) const;
     };
-    using ResouceGroupSet = std::set<taskgroup::TGEntityPtr, 
TaskGroupSchedEntityComparator>;
+    using ResouceGroupSet = std::set<taskgroup::TGPTEntityPtr, 
TaskGroupSchedEntityComparator>;
     ResouceGroupSet _group_entities;
     std::condition_variable _wait_task;
     std::mutex _rs_mutex;
     bool _closed = false;
     int _total_cpu_share = 0;
-    std::atomic<taskgroup::TGEntityPtr> _min_tg_entity = nullptr;
+    std::atomic<taskgroup::TGPTEntityPtr> _min_tg_entity = nullptr;
     uint64_t _min_tg_v_runtime_ns = 0;
 };
 
diff --git a/be/src/pipeline/task_scheduler.cpp 
b/be/src/pipeline/task_scheduler.cpp
index b83a51ecde2..4298efd01db 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -112,6 +112,7 @@ void BlockedTaskScheduler::_schedule() {
             if (state == PipelineTaskState::PENDING_FINISH) {
                 // should cancel or should finish
                 if (task->is_pending_finish()) {
+                    VLOG_DEBUG << "Task pending" << task->debug_string();
                     iter++;
                 } else {
                     _make_task_run(local_blocked_tasks, iter, 
PipelineTaskState::PENDING_FINISH);
@@ -365,9 +366,4 @@ void TaskScheduler::shutdown() {
     }
 }
 
-void TaskScheduler::update_tg_cpu_share(const taskgroup::TaskGroupInfo& 
task_group_info,
-                                        taskgroup::TaskGroupPtr task_group) {
-    _task_queue->update_tg_cpu_share(task_group_info, task_group);
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index ad69e10d8b0..ac9389c0887 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -91,8 +91,7 @@ public:
 
     void shutdown();
 
-    void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
-                             taskgroup::TaskGroupPtr task_group);
+    TaskQueue* task_queue() const { return _task_queue.get(); }
 
 private:
     std::unique_ptr<ThreadPool> _fix_thread_pool;
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index afc64f65f4b..2032d788392 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -39,6 +39,9 @@ class ScannerScheduler;
 namespace pipeline {
 class TaskScheduler;
 }
+namespace taskgroup {
+class TaskGroupManager;
+}
 class BfdParser;
 class BrokerMgr;
 template <class T>
@@ -108,6 +111,7 @@ public:
     pipeline::TaskScheduler* pipeline_task_group_scheduler() {
         return _pipeline_task_group_scheduler;
     }
+    taskgroup::TaskGroupManager* task_group_manager() { return 
_task_group_manager; }
 
     // using template to simplify client cache management
     template <typename T>
@@ -233,6 +237,7 @@ private:
     FragmentMgr* _fragment_mgr = nullptr;
     pipeline::TaskScheduler* _pipeline_task_scheduler = nullptr;
     pipeline::TaskScheduler* _pipeline_task_group_scheduler = nullptr;
+    taskgroup::TaskGroupManager* _task_group_manager = nullptr;
 
     ResultCache* _result_cache = nullptr;
     TMasterInfo* _master_info = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index a4e1479fc4e..d0fab65a75f 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -65,6 +65,7 @@
 #include "runtime/small_file_mgr.h"
 #include "runtime/stream_load/new_load_stream_mgr.h"
 #include "runtime/stream_load/stream_load_executor.h"
+#include "runtime/task_group/task_group_manager.h"
 #include "runtime/thread_context.h"
 #include "service/point_query_executor.h"
 #include "util/bfd_parser.h"
@@ -147,6 +148,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& 
store_paths) {
             .build(&_join_node_thread_pool);
 
     RETURN_IF_ERROR(init_pipeline_task_scheduler());
+    _task_group_manager = new taskgroup::TaskGroupManager();
     _scanner_scheduler = new doris::vectorized::ScannerScheduler();
     _fragment_mgr = new FragmentMgr(this);
     _result_cache = new ResultCache(config::query_cache_max_size_mb,
@@ -389,6 +391,7 @@ void ExecEnv::_destroy() {
     SAFE_DELETE(_load_path_mgr);
     SAFE_DELETE(_pipeline_task_scheduler);
     SAFE_DELETE(_pipeline_task_group_scheduler);
+    SAFE_DELETE(_task_group_manager);
     SAFE_DELETE(_fragment_mgr);
     SAFE_DELETE(_broker_client_cache);
     SAFE_DELETE(_frontend_client_cache);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 37588f36616..7e7330a43ec 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -736,7 +736,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, 
TUniqueId query_id, boo
                 auto status = 
taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0],
                                                                          
&task_group_info);
                 if (status.ok()) {
-                    auto tg = 
taskgroup::TaskGroupManager::instance()->get_or_create_task_group(
+                    auto tg = 
_exec_env->task_group_manager()->get_or_create_task_group(
                             task_group_info);
                     tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
                     query_ctx->set_task_group(tg);
diff --git a/be/src/runtime/task_group/task_group.cpp 
b/be/src/runtime/task_group/task_group.cpp
index 37149ddfd8b..7c3d8ff42b0 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -27,11 +27,14 @@
 #include <utility>
 
 #include "common/logging.h"
+#include "pipeline/task_queue.h"
 #include "pipeline/task_scheduler.h"
 #include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "util/mem_info.h"
 #include "util/parse_util.h"
+#include "vec/exec/scan/scan_task_queue.h"
+#include "vec/exec/scan/scanner_scheduler.h"
 
 namespace doris {
 namespace taskgroup {
@@ -40,50 +43,75 @@ const static std::string CPU_SHARE = "cpu_share";
 const static std::string MEMORY_LIMIT = "memory_limit";
 const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
 
-pipeline::PipelineTask* TaskGroupEntity::take() {
-    if (_queue.empty()) {
-        return nullptr;
-    }
-    auto task = _queue.front();
-    _queue.pop();
-    return task;
+template <typename QueueType>
+TaskGroupEntity<QueueType>::TaskGroupEntity(taskgroup::TaskGroup* tg, 
std::string type)
+        : _tg(tg), _type(type), _version(tg->version()), 
_cpu_share(tg->cpu_share()) {
+    _task_queue = new QueueType();
+}
+
+template <typename QueueType>
+TaskGroupEntity<QueueType>::~TaskGroupEntity() {
+    delete _task_queue;
 }
 
-void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) {
-    auto v_time = runtime_ns / _tg->cpu_share();
+template <typename QueueType>
+QueueType* TaskGroupEntity<QueueType>::task_queue() {
+    return _task_queue;
+}
+
+template <typename QueueType>
+void TaskGroupEntity<QueueType>::incr_runtime_ns(uint64_t runtime_ns) {
+    auto v_time = runtime_ns / _cpu_share;
     _vruntime_ns += v_time;
 }
 
-void TaskGroupEntity::adjust_vruntime_ns(uint64_t vruntime_ns) {
+template <typename QueueType>
+void TaskGroupEntity<QueueType>::adjust_vruntime_ns(uint64_t vruntime_ns) {
     VLOG_DEBUG << "adjust " << debug_string() << "vtime to " << vruntime_ns;
     _vruntime_ns = vruntime_ns;
 }
 
-void TaskGroupEntity::push_back(pipeline::PipelineTask* task) {
-    _queue.emplace(task);
+template <typename QueueType>
+size_t TaskGroupEntity<QueueType>::task_size() const {
+    return _task_queue->size();
 }
 
-uint64_t TaskGroupEntity::cpu_share() const {
-    return _tg->cpu_share();
+template <typename QueueType>
+uint64_t TaskGroupEntity<QueueType>::cpu_share() const {
+    return _cpu_share;
 }
 
-uint64_t TaskGroupEntity::task_group_id() const {
+template <typename QueueType>
+uint64_t TaskGroupEntity<QueueType>::task_group_id() const {
     return _tg->id();
 }
 
-std::string TaskGroupEntity::debug_string() const {
-    return fmt::format("TGE[id = {}, cpu_share = {}, task size: {}, 
v_time:{}ns]", _tg->id(),
-                       cpu_share(), _queue.size(), _vruntime_ns);
+template <typename QueueType>
+void TaskGroupEntity<QueueType>::check_and_update_cpu_share(const 
TaskGroupInfo& tg_info) {
+    if (tg_info.version > _version) {
+        _cpu_share = tg_info.cpu_share;
+        _version = tg_info.version;
+    }
 }
 
+template <typename QueueType>
+std::string TaskGroupEntity<QueueType>::debug_string() const {
+    return fmt::format("TGE[id = {}, name = {}-{}, cpu_share = {}, task size: 
{}, v_time:{} ns]",
+                       _tg->id(), _tg->name(), _type, cpu_share(), 
task_size(), _vruntime_ns);
+}
+
+template class TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
+template class TaskGroupEntity<ScanTaskQueue>;
+
 TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
         : _id(tg_info.id),
           _name(tg_info.name),
-          _cpu_share(tg_info.cpu_share),
+          _version(tg_info.version),
           _memory_limit(tg_info.memory_limit),
           _enable_memory_overcommit(tg_info.enable_memory_overcommit),
-          _version(tg_info.version),
-          _task_entity(this),
+          _cpu_share(tg_info.cpu_share),
+          _task_entity(this, "pipeline task entity"),
+          _local_scan_entity(this, "local scan entity"),
           _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {}
 
 std::string TaskGroup::debug_string() const {
@@ -105,22 +133,22 @@ void TaskGroup::check_and_update(const TaskGroupInfo& 
tg_info) {
             return;
         }
     }
-
-    std::lock_guard<std::shared_mutex> wl {_mutex};
-    if (tg_info.version > _version) {
-        _name = tg_info.name;
-        _version = tg_info.version;
-        _memory_limit = tg_info.memory_limit;
-        _enable_memory_overcommit = tg_info.enable_memory_overcommit;
-        if (_cpu_share != tg_info.cpu_share) {
-            
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share(
-                    tg_info, shared_from_this());
+    {
+        std::lock_guard<std::shared_mutex> wl {_mutex};
+        if (tg_info.version > _version) {
+            _name = tg_info.name;
+            _version = tg_info.version;
+            _memory_limit = tg_info.memory_limit;
+            _enable_memory_overcommit = tg_info.enable_memory_overcommit;
+            _cpu_share = tg_info.cpu_share;
+        } else {
+            return;
         }
     }
-}
-
-void TaskGroup::update_cpu_share_unlock(const TaskGroupInfo& tg_info) {
-    _cpu_share = tg_info.cpu_share;
+    
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share(
+            tg_info, &_task_entity);
+    
ExecEnv::GetInstance()->scanner_scheduler()->local_scan_task_queue()->update_tg_cpu_share(
+            tg_info, &_local_scan_entity);
 }
 
 int64_t TaskGroup::memory_used() {
diff --git a/be/src/runtime/task_group/task_group.h 
b/be/src/runtime/task_group/task_group.h
index c96664fc532..cd547c9c7e9 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -31,31 +31,34 @@
 
 namespace doris {
 
-namespace pipeline {
-class PipelineTask;
-}
-
 class TPipelineWorkloadGroup;
 class MemTrackerLimiter;
 
+namespace pipeline {
+class PipelineTask;
+} // namespace pipeline
+
 namespace taskgroup {
 
 class TaskGroup;
 struct TaskGroupInfo;
+class ScanTaskQueue;
 
+template <typename QueueType>
 class TaskGroupEntity {
 public:
-    explicit TaskGroupEntity(taskgroup::TaskGroup* ts) : _tg(ts) {}
-    void push_back(pipeline::PipelineTask* task);
+    explicit TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type);
+    ~TaskGroupEntity();
+
     uint64_t vruntime_ns() const { return _vruntime_ns; }
 
-    pipeline::PipelineTask* take();
+    QueueType* task_queue();
 
     void incr_runtime_ns(uint64_t runtime_ns);
 
     void adjust_vruntime_ns(uint64_t vruntime_ns);
 
-    size_t task_size() const { return _queue.size(); }
+    size_t task_size() const;
 
     uint64_t cpu_share() const;
 
@@ -63,14 +66,29 @@ public:
 
     uint64_t task_group_id() const;
 
+    void check_and_update_cpu_share(const TaskGroupInfo& tg_info);
+
 private:
-    // TODO pipeline use MLFQ
-    std::queue<pipeline::PipelineTask*> _queue;
-    taskgroup::TaskGroup* _tg;
+    QueueType* _task_queue;
+
     uint64_t _vruntime_ns = 0;
+    taskgroup::TaskGroup* _tg;
+
+    std::string _type;
+
+    // Updating an entity's cpu share requires holding the lock of the task
+    // queue (pipeline task queue or scan task queue) that contains it, so each
+    // entity keeps its own version and cpu share and is updated independently.
+    int64_t _version;
+    uint64_t _cpu_share;
 };
 
-using TGEntityPtr = TaskGroupEntity*;
+// TODO llj tg use PriorityTaskQueue to replace std::queue
+using TaskGroupPipelineTaskEntity = 
TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
+using TGPTEntityPtr = TaskGroupPipelineTaskEntity*;
+
+using TaskGroupScanTaskEntity = TaskGroupEntity<ScanTaskQueue>;
+using TGSTEntityPtr = TaskGroupScanTaskEntity*;
 
 struct TgTrackerLimiterGroup {
     std::unordered_set<std::shared_ptr<MemTrackerLimiter>> trackers;
@@ -81,12 +99,17 @@ class TaskGroup : public 
std::enable_shared_from_this<TaskGroup> {
 public:
     explicit TaskGroup(const TaskGroupInfo& tg_info);
 
-    TaskGroupEntity* task_entity() { return &_task_entity; }
+    TaskGroupPipelineTaskEntity* task_entity() { return &_task_entity; }
+    TGSTEntityPtr local_scan_task_entity() { return &_local_scan_entity; }
+
+    int64_t version() const { return _version; }
 
     uint64_t cpu_share() const { return _cpu_share.load(); }
 
     uint64_t id() const { return _id; }
 
+    std::string name() const { return _name; };
+
     bool enable_memory_overcommit() const {
         std::shared_lock<std::shared_mutex> r_lock(_mutex);
         return _enable_memory_overcommit;
@@ -103,8 +126,6 @@ public:
 
     void check_and_update(const TaskGroupInfo& tg_info);
 
-    void update_cpu_share_unlock(const TaskGroupInfo& tg_info);
-
     void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> 
mem_tracker_ptr);
 
     void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter> 
mem_tracker_ptr);
@@ -119,12 +140,12 @@ private:
     mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share, 
_memory_limit
     const uint64_t _id;
     std::string _name;
-    std::atomic<uint64_t> _cpu_share;
+    int64_t _version;
     int64_t _memory_limit; // bytes
     bool _enable_memory_overcommit;
-    int64_t _version;
-    TaskGroupEntity _task_entity;
-
+    std::atomic<uint64_t> _cpu_share;
+    TaskGroupPipelineTaskEntity _task_entity;
+    TaskGroupScanTaskEntity _local_scan_entity;
     std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
 };
 
diff --git a/be/src/runtime/task_group/task_group_manager.cpp 
b/be/src/runtime/task_group/task_group_manager.cpp
index 552ab2c0a92..6ce6d316048 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -20,19 +20,17 @@
 #include <memory>
 #include <mutex>
 
+#include "pipeline/task_scheduler.h"
+#include "runtime/exec_env.h"
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/task_group/task_group.h"
+#include "vec/exec/scan/scanner_scheduler.h"
 
 namespace doris::taskgroup {
 
 TaskGroupManager::TaskGroupManager() = default;
 TaskGroupManager::~TaskGroupManager() = default;
 
-TaskGroupManager* TaskGroupManager::instance() {
-    static TaskGroupManager tgm;
-    return &tgm;
-}
-
 TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& 
task_group_info) {
     {
         std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
diff --git a/be/src/runtime/task_group/task_group_manager.h 
b/be/src/runtime/task_group/task_group_manager.h
index 0b7472438c3..375208dc6e5 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -23,13 +23,14 @@
 
 #include "task_group.h"
 
-namespace doris::taskgroup {
+namespace doris {
+class ExecEnv;
+namespace taskgroup {
 
 class TaskGroupManager {
 public:
     TaskGroupManager();
     ~TaskGroupManager();
-    static TaskGroupManager* instance();
 
     TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& 
task_group_info);
 
@@ -41,4 +42,5 @@ private:
     std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
 };
 
-} // namespace doris::taskgroup
+} // namespace taskgroup
+} // namespace doris
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 9250101e67d..38f9039816c 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -235,7 +235,7 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
     std::unique_ptr<RuntimeProfile> tg_profile = 
std::make_unique<RuntimeProfile>("WorkloadGroup");
     int64_t total_free_memory = 0;
 
-    taskgroup::TaskGroupManager::instance()->get_resource_groups(
+    ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
             [](const taskgroup::TaskGroupPtr& task_group) {
                 return !task_group->enable_memory_overcommit();
             },
@@ -277,7 +277,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t 
request_free_memory,
     MonotonicStopWatch watch;
     watch.start();
     std::vector<taskgroup::TaskGroupPtr> task_groups;
-    taskgroup::TaskGroupManager::instance()->get_resource_groups(
+    ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
             [](const taskgroup::TaskGroupPtr& task_group) {
                 return task_group->enable_memory_overcommit();
             },
diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp 
b/be/src/vec/exec/scan/scan_task_queue.cpp
new file mode 100644
index 00000000000..538f77211c3
--- /dev/null
+++ b/be/src/vec/exec/scan/scan_task_queue.cpp
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scan_task_queue.h"
+
+#include "pipeline/pipeline_task.h"
+#include "runtime/task_group/task_group.h"
+#include "vec/exec/scan/scanner_context.h"
+
+namespace doris {
+namespace taskgroup {
+static void empty_function() {}
+ScanTask::ScanTask() : ScanTask(empty_function, nullptr, 1) {}
+
+// Builds a scan task from its work function, the scanner context it belongs to
+// (stored as a raw pointer) and its scheduling priority.
+ScanTask::ScanTask(WorkFunction scan_func, vectorized::ScannerContext* 
scanner_context,
+                   int priority)
+        : scan_func(std::move(scan_func)), scanner_context(scanner_context), 
priority(priority) {}
+
+// Constructs the underlying priority queue with
+// config::doris_scanner_thread_pool_queue_size as its constructor argument
+// (presumably the capacity -- TODO confirm against BlockingPriorityQueue).
+ScanTaskQueue::ScanTaskQueue() : 
_queue(config::doris_scanner_thread_pool_queue_size) {}
+
+// Tries to insert `scan_task` into the underlying priority queue via try_put().
+// Returns Status::OK() when try_put() accepts the task, otherwise an
+// InternalError status.
+Status ScanTaskQueue::try_push_back(ScanTask scan_task) {
+    // Read the fields needed for logging before the task is moved into the
+    // queue, so that the moved-from object is never touched afterwards.
+    auto* scanner_context = scan_task.scanner_context;
+    const int priority = scan_task.priority;
+    if (!_queue.try_put(std::move(scan_task))) {
+        return Status::InternalError("failed to submit scan task to ScanTaskQueue");
+    }
+    VLOG_DEBUG << "try_push_back scan task " << scanner_context->ctx_id << " " << priority;
+    return Status::OK();
+}
+// Fetches one task from the underlying queue through blocking_get(), passing
+// `timeout_ms` along, and returns whatever blocking_get() reports.
+bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) {
+    auto fetched = _queue.blocking_get(scan_task, timeout_ms);
+    if (!fetched) {
+        return fetched;
+    }
+    VLOG_DEBUG << "try get scan task " << scan_task->scanner_context->ctx_id << " "
+               << scan_task->priority;
+    return fetched;
+}
+
+// Remembers `core_size`; it is only used by _ideal_runtime_ns() as a factor of
+// the ideal runtime of an entity.
+ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size) : 
_core_size(core_size) {}
+ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default;
+
+// Marks the queue as closed and wakes every thread waiting in take(), which
+// then observes `_closed` and returns false.
+void ScanTaskTaskGroupQueue::close() {
+    std::lock_guard<std::mutex> guard(_rs_mutex);
+    _closed = true;
+    _wait_task.notify_all();
+}
+
+// Waits until a task group entity is queued, then fetches one scan task from
+// that entity's own queue into `*scan_task`.
+// Returns false when the queue has been closed; otherwise returns the result of
+// the entity queue's try_get(). `_rs_mutex` is held for the whole call, except
+// while waiting on `_wait_task`.
+bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    taskgroup::TGSTEntityPtr entity = nullptr;
+    while (entity == nullptr) {
+        if (_closed) {
+            return false;
+        }
+        if (_group_entities.empty()) {
+            // Nothing is queued: sleep until notified (push_back/close) or until
+            // the timeout expires, then re-check the conditions above.
+            _wait_task.wait_for(lock, 
std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS * 5));
+        } else {
+            // The first entity of the ordered set is the next one to serve.
+            entity = _next_tg_entity();
+            if (!entity) {
+                _wait_task.wait_for(lock, 
std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS));
+            }
+        }
+    }
+    DCHECK(entity->task_size() > 0);
+    // The task taken below is the entity's last one, so remove the entity from
+    // the set; push_back() re-enqueues it when a new task arrives.
+    if (entity->task_size() == 1) {
+        _dequeue_task_group(entity);
+    }
+    return entity->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS 
/* timeout_ms */);
+}
+
+// Submits `scan_task` to the scan queue of the task group that owns the task's
+// scanner context, and makes sure that group's entity is part of the scheduling
+// set. Wakes one thread waiting in take() on success.
+// Returns false (after logging a warning) if the entity's queue rejects the task.
+bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
+    auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity();
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    // `scan_task` is not used after this point, so hand it over instead of
+    // copying it (and its std::function) once more.
+    auto status = entity->task_queue()->try_push_back(std::move(scan_task));
+    if (!status.ok()) {
+        LOG(WARNING) << "try_push_back scan task fail: " << status;
+        return false;
+    }
+    // An entity is only kept in the set while it has pending tasks.
+    if (_group_entities.find(entity) == _group_entities.end()) {
+        _enqueue_task_group(entity);
+    }
+    _wait_task.notify_one();
+    return true;
+}
+
+// Adds `time_spent` to the runtime of the task group entity that owns
+// `scan_task`. The entity's vruntime is part of the set's ordering key, so a
+// queued entity is removed before the update and re-inserted afterwards.
+void ScanTaskTaskGroupQueue::update_statistics(ScanTask scan_task, int64_t time_spent) {
+    auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity();
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    const bool queued = _group_entities.find(entity) != _group_entities.end();
+    VLOG_DEBUG << "scan task task group queue update_statistics " << entity->debug_string()
+               << ", in queue:" << queued << ", time_spent: " << time_spent;
+    if (!queued) {
+        // Not part of the ordered set: the runtime can be bumped in place.
+        entity->incr_runtime_ns(time_spent);
+        return;
+    }
+    _group_entities.erase(entity);
+    entity->incr_runtime_ns(time_spent);
+    _group_entities.emplace(entity);
+    _update_min_tg();
+}
+
+// Applies `task_group_info` to `entity` through check_and_update_cpu_share().
+// The cpu share takes part in the set's ordering and in `_total_cpu_share`, so a
+// queued entity is taken out of both before the update and added back after it.
+void ScanTaskTaskGroupQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
+                                                 taskgroup::TGSTEntityPtr entity) {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    if (_group_entities.find(entity) == _group_entities.end()) {
+        // Not queued: nothing to re-order and nothing counted in the total.
+        entity->check_and_update_cpu_share(task_group_info);
+        return;
+    }
+    _group_entities.erase(entity);
+    _total_cpu_share -= entity->cpu_share();
+    entity->check_and_update_cpu_share(task_group_info);
+    _group_entities.emplace(entity);
+    _total_cpu_share += entity->cpu_share();
+}
+
+// Adds `tg_entity` to the scheduling set: accounts for its cpu share, lifts its
+// vruntime if it lags behind, inserts it and refreshes the cached minimum.
+// Called from push_back(), which holds `_rs_mutex` at that point.
+void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) {
+    // Must happen before _ideal_runtime_ns() below, which divides by the total.
+    _total_cpu_share += tg_entity->cpu_share();
+    // TODO llj tg If submitted back to this queue from the scanner thread,
+    // `adjust_vruntime_ns` should be avoided.
+    /**
+     * If a task group entity has been out of the task queue for a long time, its
+     * vruntime will be very small, which would let it preempt too much execution
+     * time. To avoid this, the task group's vruntime is adjusted here.
+     * */
+    auto old_v_ns = tg_entity->vruntime_ns();
+    auto* min_entity = _min_tg_entity.load();
+    if (min_entity) {
+        // Target: the current minimum minus half of this entity's ideal runtime,
+        // or the minimum itself when the subtraction would underflow.
+        auto min_tg_v = min_entity->vruntime_ns();
+        auto ideal_r = _ideal_runtime_ns(tg_entity) / 2;
+        uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r : 
min_tg_v;
+        // Only ever move the vruntime forward.
+        if (new_vruntime_ns > old_v_ns) {
+            VLOG_DEBUG << tg_entity->debug_string() << ", adjust to new " << 
new_vruntime_ns;
+            tg_entity->adjust_vruntime_ns(new_vruntime_ns);
+        }
+    } else if (old_v_ns < _min_tg_v_runtime_ns) {
+        // No entity is queued: fall back to the largest minimum seen so far.
+        VLOG_DEBUG << tg_entity->debug_string() << ", adjust to " << 
_min_tg_v_runtime_ns;
+        tg_entity->adjust_vruntime_ns(_min_tg_v_runtime_ns);
+    }
+    _group_entities.emplace(tg_entity);
+    VLOG_DEBUG << "scan enqueue tg " << tg_entity->debug_string()
+               << ", group entity size: " << _group_entities.size();
+    _update_min_tg();
+}
+
+// Removes `tg_entity` from the scheduling set, subtracts its cpu share from the
+// running total and refreshes the cached minimum entity.
+// Called from take(), which holds `_rs_mutex` at that point.
+void ScanTaskTaskGroupQueue::_dequeue_task_group(TGSTEntityPtr tg_entity) {
+    _total_cpu_share -= tg_entity->cpu_share();
+    _group_entities.erase(tg_entity);
+    VLOG_DEBUG << "scan task group queue dequeue tg " << 
tg_entity->debug_string()
+               << ", group entity size: " << _group_entities.size();
+    _update_min_tg();
+}
+
+// Returns the first entity of the ordered set, or nullptr when the set is empty.
+TGSTEntityPtr ScanTaskTaskGroupQueue::_next_tg_entity() {
+    if (_group_entities.empty()) {
+        return nullptr;
+    }
+    return *_group_entities.begin();
+}
+
+// Returns the entity's proportional share of one scheduling round:
+// THREAD_TIME_SLICE * _core_size * cpu_share / _total_cpu_share.
+// NOTE(review): divides by `_total_cpu_share`. The only caller visible here,
+// _enqueue_task_group(), adds the entity's cpu share to the total first, so the
+// divisor is non-zero as long as cpu shares are positive -- TODO confirm.
+uint64_t ScanTaskTaskGroupQueue::_ideal_runtime_ns(TGSTEntityPtr tg_entity) 
const {
+    // Scan task does not have time slice, so we use pipeline task's instead.
+    return pipeline::PipelineTask::THREAD_TIME_SLICE * _core_size * 
tg_entity->cpu_share() /
+           _total_cpu_share;
+}
+
+// Caches the current head of the ordered set in `_min_tg_entity` and raises
+// `_min_tg_v_runtime_ns` to that entity's vruntime if it is larger; the stored
+// minimum never decreases.
+void ScanTaskTaskGroupQueue::_update_min_tg() {
+    auto* head = _next_tg_entity();
+    _min_tg_entity = head;
+    if (head == nullptr) {
+        return;
+    }
+    auto head_vruntime = head->vruntime_ns();
+    if (head_vruntime > _min_tg_v_runtime_ns) {
+        _min_tg_v_runtime_ns = head_vruntime;
+    }
+}
+
+// Strict weak ordering for the scheduling set: lower vruntime first, then lower
+// cpu share, then lower task group id as the final tie-breaker.
+bool ScanTaskTaskGroupQueue::TaskGroupSchedEntityComparator::operator()(
+        const taskgroup::TGSTEntityPtr& lhs_ptr, const taskgroup::TGSTEntityPtr& rhs_ptr) const {
+    auto lhs_vruntime = lhs_ptr->vruntime_ns();
+    auto rhs_vruntime = rhs_ptr->vruntime_ns();
+    if (lhs_vruntime != rhs_vruntime) {
+        return lhs_vruntime < rhs_vruntime;
+    }
+    auto lhs_share = lhs_ptr->cpu_share();
+    auto rhs_share = rhs_ptr->cpu_share();
+    if (lhs_share != rhs_share) {
+        return lhs_share < rhs_share;
+    }
+    return lhs_ptr->task_group_id() < rhs_ptr->task_group_id();
+}
+
+} // namespace taskgroup
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/scan_task_queue.h 
b/be/src/vec/exec/scan/scan_task_queue.h
new file mode 100644
index 00000000000..f3c3b792a48
--- /dev/null
+++ b/be/src/vec/exec/scan/scan_task_queue.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "olap/tablet.h"
+#include "runtime/task_group/task_group.h"
+#include "util/blocking_priority_queue.hpp"
+
+namespace doris {
+namespace vectorized {
+class ScannerContext;
+};
+
+namespace taskgroup {
+
+using WorkFunction = std::function<void()>;
+static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
+
+// Like PriorityThreadPool::Task
+// A unit of scan work together with the scanner context it belongs to and the
+// priority it is ordered by.
+struct ScanTask {
+    // Default task: no-op work function, null scanner context, priority 1
+    // (see the definition in scan_task_queue.cpp).
+    ScanTask();
+    ScanTask(WorkFunction scan_func, vectorized::ScannerContext* 
scanner_context, int priority);
+    // Orders tasks by `priority` only.
+    bool operator<(const ScanTask& o) const { return priority < o.priority; }
+    // Raises `priority` by 2 and returns this task.
+    ScanTask& operator++() {
+        priority += 2;
+        return *this;
+    }
+
+    // The work to run for this task.
+    WorkFunction scan_func;
+    // Raw pointer to the owning scanner context; null for a default-constructed task.
+    vectorized::ScannerContext* scanner_context;
+    int priority;
+};
+
+// Like pipeline::PriorityTaskQueue use BlockingPriorityQueue directly?
+// Thin wrapper around a BlockingPriorityQueue<ScanTask>; one instance serves as
+// the task queue of a TaskGroupScanTaskEntity.
+class ScanTaskQueue {
+public:
+    ScanTaskQueue();
+    // Tries to insert a task; returns an error status if the queue rejects it.
+    Status try_push_back(ScanTask);
+    // Fetches a task into `*scan_task`, passing `timeout_ms` to the underlying
+    // queue's blocking_get(); returns whether a task was obtained.
+    bool try_get(ScanTask* scan_task, uint32_t timeout_ms);
+    // Size reported by the underlying queue.
+    int size() { return _queue.get_size(); }
+
+private:
+    BlockingPriorityQueue<ScanTask> _queue;
+};
+
+// Like TaskGroupTaskQueue
+// Schedules scan tasks across task groups: entities with pending tasks are kept
+// in a set ordered by vruntime, and take() always serves the first one.
+class ScanTaskTaskGroupQueue {
+public:
+    explicit ScanTaskTaskGroupQueue(size_t core_size);
+    ~ScanTaskTaskGroupQueue();
+
+    // Marks the queue closed and wakes all threads waiting in take().
+    void close();
+    // Waits for a schedulable entity and fetches one of its tasks; returns false
+    // once the queue is closed or when no task could be fetched.
+    bool take(ScanTask* scan_task);
+    // Queues a task on its task group's entity; returns false on failure.
+    bool push_back(ScanTask);
+
+    // Adds `time_spent` to the runtime of the task's task group entity.
+    void update_statistics(ScanTask task, int64_t time_spent);
+
+    // Updates the entity's cpu share while keeping the set and total consistent.
+    void update_tg_cpu_share(const taskgroup::TaskGroupInfo&, 
taskgroup::TGSTEntityPtr);
+
+private:
+    // NOTE(review): declared here, but no definition appears in
+    // scan_task_queue.cpp as added by this patch -- confirm whether it is needed.
+    TGSTEntityPtr _task_entity(ScanTask& scan_task);
+    void _enqueue_task_group(TGSTEntityPtr);
+    void _dequeue_task_group(TGSTEntityPtr);
+    TGSTEntityPtr _next_tg_entity();
+    uint64_t _ideal_runtime_ns(TGSTEntityPtr tg_entity) const;
+    void _update_min_tg();
+
+    // Like cfs rb tree in sched_entity
+    struct TaskGroupSchedEntityComparator {
+        bool operator()(const taskgroup::TGSTEntityPtr&, const 
taskgroup::TGSTEntityPtr&) const;
+    };
+    using ResouceGroupSet = std::set<taskgroup::TGSTEntityPtr, 
TaskGroupSchedEntityComparator>;
+    // Entities that currently have pending tasks, ordered by the comparator above.
+    ResouceGroupSet _group_entities;
+    // Signalled by push_back() and close(); waited on in take().
+    std::condition_variable _wait_task;
+    // Locked by every public method of this class.
+    std::mutex _rs_mutex;
+    bool _closed = false;
+    // Sum of cpu_share() over the entities currently in `_group_entities`.
+    int _total_cpu_share = 0;
+    // Head of `_group_entities` as of the last _update_min_tg() call.
+    std::atomic<taskgroup::TGSTEntityPtr> _min_tg_entity = nullptr;
+    // Largest head vruntime observed so far; only ever increased.
+    uint64_t _min_tg_v_runtime_ns = 0;
+    size_t _core_size;
+};
+
+} // namespace taskgroup
+} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 478d9fb4cb7..27ed7509987 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -261,8 +261,10 @@ Status 
ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState*
     if (state->enable_profile()) {
         std::stringstream scanner_statistics;
         std::stringstream scanner_rows_read;
+        std::stringstream scanner_wait_worker_time;
         scanner_statistics << "[";
         scanner_rows_read << "[";
+        scanner_wait_worker_time << "[";
         for (auto finished_scanner_time : _finished_scanner_runtime) {
             scanner_statistics << PrettyPrinter::print(finished_scanner_time, 
TUnit::TIME_NS)
                                << ", ";
@@ -270,6 +272,10 @@ Status 
ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState*
         for (auto finished_scanner_rows : _finished_scanner_rows_read) {
             scanner_rows_read << PrettyPrinter::print(finished_scanner_rows, 
TUnit::UNIT) << ", ";
         }
+        for (auto finished_scanner_wait_time : 
_finished_scanner_wait_worker_time) {
+            scanner_wait_worker_time
+                    << PrettyPrinter::print(finished_scanner_wait_time, 
TUnit::TIME_NS) << ", ";
+        }
         // Only unfinished scanners here
         for (auto& scanner : _scanners) {
             // Scanners are in ObjPool in ScanNode,
@@ -279,11 +285,18 @@ Status 
ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState*
                                << ", ";
             scanner_rows_read << 
PrettyPrinter::print(scanner->get_rows_read(), TUnit::UNIT)
                               << ", ";
+            scanner_wait_worker_time
+                    << 
PrettyPrinter::print(scanner->get_scanner_wait_worker_timer(),
+                                            TUnit::TIME_NS)
+                    << ", ";
         }
         scanner_statistics << "]";
         scanner_rows_read << "]";
+        scanner_wait_worker_time << "]";
         node->_scanner_profile->add_info_string("PerScannerRunningTime", 
scanner_statistics.str());
         node->_scanner_profile->add_info_string("PerScannerRowsRead", 
scanner_rows_read.str());
+        node->_scanner_profile->add_info_string("PerScannerWaitTime",
+                                                
scanner_wait_worker_time.str());
     }
     // Only unfinished scanners here
     for (auto& scanner : _scanners) {
@@ -397,6 +410,8 @@ void 
ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
             if (scanner->need_to_close()) {
                 
_finished_scanner_runtime.push_back(scanner->get_time_cost_ns());
                 
_finished_scanner_rows_read.push_back(scanner->get_rows_read());
+                _finished_scanner_wait_worker_time.push_back(
+                        scanner->get_scanner_wait_worker_timer());
                 scanner->close(_state);
             } else {
                 current_run->push_back(scanner);
@@ -406,4 +421,8 @@ void 
ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
     }
 }
 
+// Returns the task group reported by the query context of `_state`.
+// NOTE(review): the result may be null -- _schedule_scanners() tests it before
+// choosing the task-group scan queue.
+taskgroup::TaskGroup* ScannerContext::get_task_group() const {
+    return _state->get_query_ctx()->get_task_group();
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index 3aad0d6263f..c2c8612f861 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -41,6 +41,10 @@ class ThreadPoolToken;
 class RuntimeState;
 class TupleDescriptor;
 
+namespace taskgroup {
+class TaskGroup;
+} // namespace taskgroup
+
 namespace vectorized {
 
 class VScanner;
@@ -149,6 +153,7 @@ public:
         }
         return thread_slot_num;
     }
+    taskgroup::TaskGroup* get_task_group() const;
 
     void reschedule_scanner_ctx();
 
@@ -241,6 +246,7 @@ protected:
     std::list<VScannerSPtr> _scanners;
     std::vector<int64_t> _finished_scanner_runtime;
     std::vector<int64_t> _finished_scanner_rows_read;
+    std::vector<int64_t> _finished_scanner_wait_worker_time;
 
     const int _num_parallel_instances;
 
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index f63b04692fd..b1c783a8520 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -36,6 +36,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
+#include "scan_task_queue.h"
 #include "util/async_io.h" // IWYU pragma: keep
 #include "util/blocking_queue.hpp"
 #include "util/cpu_info.h"
@@ -80,6 +81,7 @@ ScannerScheduler::~ScannerScheduler() {
     _local_scan_thread_pool->shutdown();
     _remote_scan_thread_pool->shutdown();
     _limited_scan_thread_pool->shutdown();
+    _group_local_scan_thread_pool->shutdown();
 
     _scheduler_pool->wait();
     _local_scan_thread_pool->join();
@@ -88,6 +90,9 @@ ScannerScheduler::~ScannerScheduler() {
         delete _pending_queues[i];
     }
     delete[] _pending_queues;
+
+    _task_group_local_scan_queue->close();
+    _group_local_scan_thread_pool->wait();
 }
 
 Status ScannerScheduler::init(ExecEnv* env) {
@@ -127,6 +132,19 @@ Status ScannerScheduler::init(ExecEnv* env) {
 
     _register_metrics();
 
+    // 5. task group local scan
+    _task_group_local_scan_queue = 
std::make_unique<taskgroup::ScanTaskTaskGroupQueue>(
+            config::doris_scanner_thread_pool_thread_num);
+    ThreadPoolBuilder("local_scan_group")
+            .set_min_threads(config::doris_scanner_thread_pool_thread_num)
+            .set_max_threads(config::doris_scanner_thread_pool_thread_num)
+            .build(&_group_local_scan_thread_pool);
+    for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
+        _group_local_scan_thread_pool->submit_func([this] {
+            this->_task_group_scanner_scan(this, 
_task_group_local_scan_queue.get());
+        });
+    }
+
     _is_init = true;
     return Status::OK();
 }
@@ -219,12 +237,20 @@ void ScannerScheduler::_schedule_scanners(ScannerContext* 
ctx) {
                 TabletStorageType type = (*iter)->get_storage_type();
                 bool ret = false;
                 if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
-                    PriorityThreadPool::Task task;
-                    task.work_function = [this, scanner = *iter, ctx] {
-                        this->_scanner_scan(this, ctx, scanner);
-                    };
-                    task.priority = nice;
-                    ret = _local_scan_thread_pool->offer(task);
+                    if (ctx->get_task_group() && 
config::enable_workload_group_for_scan) {
+                        auto work_func = [this, scanner = *iter, ctx] {
+                            this->_scanner_scan(this, ctx, scanner);
+                        };
+                        taskgroup::ScanTask scan_task = {work_func, ctx, nice};
+                        ret = 
_task_group_local_scan_queue->push_back(scan_task);
+                    } else {
+                        PriorityThreadPool::Task task;
+                        task.work_function = [this, scanner = *iter, ctx] {
+                            this->_scanner_scan(this, ctx, scanner);
+                        };
+                        task.priority = nice;
+                        ret = _local_scan_thread_pool->offer(task);
+                    }
                 } else {
                     ret = _remote_scan_thread_pool->submit_func([this, scanner 
= *iter, ctx] {
                         this->_scanner_scan(this, ctx, scanner);
@@ -438,4 +464,20 @@ void ScannerScheduler::_deregister_metrics() {
     DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num);
 }
 
+// Worker loop for the task-group local scan threads: repeatedly takes a scan
+// task from `scan_queue`, runs it while timing the call, and reports the
+// measured time back to the queue. The loop ends once `_is_closed` is set.
+void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
+                                                taskgroup::ScanTaskTaskGroupQueue* scan_queue) {
+    while (!_is_closed) {
+        taskgroup::ScanTask scan_task;
+        if (!scan_queue->take(&scan_task)) {
+            // Nothing was fetched (timeout or closed queue); re-check `_is_closed`.
+            continue;
+        }
+        int64_t time_spent = 0;
+        {
+            // The timer covers the work function only.
+            SCOPED_RAW_TIMER(&time_spent);
+            scan_task.scan_func();
+        }
+        scan_queue->update_statistics(scan_task, time_spent);
+    }
+}
+
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index df81fcf8b47..87a9498e77c 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -21,6 +21,7 @@
 #include <memory>
 
 #include "common/status.h"
+#include "scan_task_queue.h"
 #include "util/threadpool.h"
 #include "vec/exec/scan/vscanner.h"
 
@@ -30,6 +31,9 @@ class ExecEnv;
 namespace vectorized {
 class VScanner;
 } // namespace vectorized
+namespace taskgroup {
+class ScanTaskTaskGroupQueue;
+}
 template <typename T>
 class BlockingQueue;
 } // namespace doris
@@ -66,6 +70,9 @@ public:
 
     std::unique_ptr<ThreadPoolToken> 
new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
                                                                  int 
max_concurrency);
+    // Returns a non-owning pointer to the task-group local scan queue; the queue
+    // itself stays owned by `_task_group_local_scan_queue`.
+    taskgroup::ScanTaskTaskGroupQueue* local_scan_task_queue() {
+        return _task_group_local_scan_queue.get();
+    }
 
     int remote_thread_pool_max_size() const { return 
_remote_thread_pool_max_size; }
 
@@ -81,6 +88,9 @@ private:
 
     static void _deregister_metrics();
 
+    void _task_group_scanner_scan(ScannerScheduler* scheduler,
+                                  taskgroup::ScanTaskTaskGroupQueue* 
scan_queue);
+
     // Scheduling queue number.
     // TODO: make it configurable.
     static const int QUEUE_NUM = 4;
@@ -103,6 +113,9 @@ private:
     std::unique_ptr<ThreadPool> _remote_scan_thread_pool;
     std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
 
+    std::unique_ptr<taskgroup::ScanTaskTaskGroupQueue> 
_task_group_local_scan_queue;
+    std::unique_ptr<ThreadPool> _group_local_scan_thread_pool;
+
     // true is the scheduler is closed.
     std::atomic_bool _is_closed = {false};
     bool _is_init = false;
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 2ee39979563..c29aafafc04 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -108,6 +108,8 @@ public:
 
     void update_wait_worker_timer() { _scanner_wait_worker_timer += 
_watch.elapsed_time(); }
 
+    // Returns the value accumulated in `_scanner_wait_worker_timer`
+    // (see update_wait_worker_timer()).
+    int64_t get_scanner_wait_worker_timer() { return 
_scanner_wait_worker_timer; }
+
     void update_scan_cpu_timer() { _scan_cpu_timer += 
_cpu_watch.elapsed_time(); }
 
     RuntimeState* runtime_state() { return _state; }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 142616f2a0a..f81545f3b90 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1581,7 +1581,7 @@ public class Config extends ConfigBase {
     public static boolean enable_pipeline_load = false;
 
     // enable_workload_group should be immutable and temporarily set to 
mutable during the development test phase
-    @ConfField(mutable = true, masterOnly = true, expType = 
ExperimentalType.EXPERIMENTAL)
+    @ConfField(mutable = true, expType = ExperimentalType.EXPERIMENTAL)
     public static boolean enable_workload_group = false;
 
     @ConfField(mutable = true)
@@ -1590,9 +1590,6 @@ public class Config extends ConfigBase {
     @ConfField(mutable = true)
     public static boolean disable_shared_scan = false;
 
-    @ConfField(mutable = true, expType = ExperimentalType.EXPERIMENTAL)
-    public static boolean enable_cpu_hard_limit = false;
-
     @ConfField(mutable = false, masterOnly = true)
     public static int backend_rpc_timeout_ms = 60000; // 1 min
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to