This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 016c293eaef [refactor](taskqueue) remove old task scheduler based wg (#30832)
016c293eaef is described below

commit 016c293eaef8a897538cc12531b1431966a65454
Author: yiguolei <[email protected]>
AuthorDate: Mon Feb 5 18:22:01 2024 +0800

    [refactor](taskqueue) remove old task scheduler based wg (#30832)
    
    
    
    ---------
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/pipeline/pipeline_fragment_context.cpp    |   3 -
 be/src/pipeline/pipeline_fragment_context.h      |   5 -
 be/src/pipeline/pipeline_task.cpp                |   4 -
 be/src/pipeline/pipeline_task.h                  |   2 -
 be/src/pipeline/task_queue.cpp                   | 170 -----------------------
 be/src/pipeline/task_queue.h                     |  52 -------
 be/src/runtime/exec_env.h                        |   4 -
 be/src/runtime/exec_env_init.cpp                 |  10 --
 be/src/runtime/query_context.cpp                 |   4 +-
 be/src/runtime/query_context.h                   |   4 -
 be/src/runtime/task_group/task_group.cpp         |  62 ---------
 be/src/runtime/task_group/task_group.h           |  47 -------
 be/src/runtime/task_group/task_group_manager.cpp |   5 +-
 be/src/runtime/task_group/task_group_manager.h   |   4 +-
 be/src/util/mem_info.cpp                         |   4 +-
 15 files changed, 8 insertions(+), 372 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index a7d33aa0c77..1fb50c9ea06 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -131,9 +131,6 @@ PipelineFragmentContext::PipelineFragmentContext(
           _is_report_on_cancel(true),
           _report_status_cb(report_status_cb),
           _create_time(MonotonicNanos()) {
-    if (_query_ctx->get_task_group()) {
-        _task_group_entity = _query_ctx->get_task_group()->task_entity();
-    }
     _fragment_watcher.start();
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 0ec27c5054f..b8a3bf7e922 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -132,9 +132,6 @@ public:
         return _query_ctx->exec_status();
     }
 
-    [[nodiscard]] taskgroup::TaskGroupPipelineTaskEntity* 
get_task_group_entity() const {
-        return _task_group_entity;
-    }
     void trigger_report_if_necessary();
     virtual void instance_ids(std::vector<TUniqueId>& ins_ids) const {
         ins_ids.resize(1);
@@ -198,8 +195,6 @@ protected:
 
     std::shared_ptr<QueryContext> _query_ctx;
 
-    taskgroup::TaskGroupPipelineTaskEntity* _task_group_entity = nullptr;
-
     std::shared_ptr<RuntimeFilterMergeControllerEntity> 
_merge_controller_handler;
 
     MonotonicStopWatch _fragment_watcher;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6ea1b482c85..f31a39df31a 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -426,8 +426,4 @@ std::string PipelineTask::debug_string() {
     return fmt::to_string(debug_string_buffer);
 }
 
-taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity() 
const {
-    return _fragment_context->get_task_group_entity();
-}
-
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 04bc55908e3..be1531de4e1 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -196,8 +196,6 @@ public:
 
     virtual std::string debug_string();
 
-    taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const;
-
     void set_task_queue(TaskQueue* task_queue);
     TaskQueue* get_task_queue() { return _task_queue; }
 
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index a68a2ba4a7f..a79d865821d 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -204,175 +204,5 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
     return _prio_task_queue_list[core_id].push(task);
 }
 
-bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
-        const taskgroup::TGPTEntityPtr& lhs_ptr, const 
taskgroup::TGPTEntityPtr& rhs_ptr) const {
-    auto lhs_val = lhs_ptr->vruntime_ns();
-    auto rhs_val = rhs_ptr->vruntime_ns();
-    if (lhs_val != rhs_val) {
-        return lhs_val < rhs_val;
-    } else {
-        auto l_share = lhs_ptr->cpu_share();
-        auto r_share = rhs_ptr->cpu_share();
-        if (l_share != r_share) {
-            return l_share < r_share;
-        } else {
-            return lhs_ptr->task_group_id() < rhs_ptr->task_group_id();
-        }
-    }
-}
-
-TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size)
-        : TaskQueue(core_size), _min_tg_entity(nullptr) {}
-
-TaskGroupTaskQueue::~TaskGroupTaskQueue() = default;
-
-void TaskGroupTaskQueue::close() {
-    std::unique_lock<std::mutex> lock(_rs_mutex);
-    _closed = true;
-    _wait_task.notify_all();
-}
-
-Status TaskGroupTaskQueue::push_back(PipelineTask* task) {
-    return _push_back<false>(task);
-}
-
-Status TaskGroupTaskQueue::push_back(PipelineTask* task, size_t core_id) {
-    return _push_back<true>(task);
-}
-
-template <bool from_executor>
-Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
-    task->put_in_runnable_queue();
-    auto* entity = task->get_task_group_entity();
-    std::unique_lock<std::mutex> lock(_rs_mutex);
-    entity->task_queue()->emplace(task);
-    if (_group_entities.find(entity) == _group_entities.end()) {
-        _enqueue_task_group<from_executor>(entity);
-    }
-    _wait_task.notify_one();
-    return Status::OK();
-}
-
-// TODO pipeline support steal
-PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
-    std::unique_lock<std::mutex> lock(_rs_mutex);
-    taskgroup::TGPTEntityPtr entity = nullptr;
-    while (entity == nullptr) {
-        if (_closed) {
-            return nullptr;
-        }
-        if (_group_entities.empty()) {
-            _wait_task.wait(lock);
-        } else {
-            entity = _next_tg_entity();
-            if (!entity) {
-                _wait_task.wait_for(lock, 
std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS));
-            }
-        }
-    }
-    DCHECK(entity->task_size() > 0);
-    if (entity->task_size() == 1) {
-        _dequeue_task_group(entity);
-    }
-    auto task = entity->task_queue()->front();
-    if (task) {
-        entity->task_queue()->pop();
-        task->pop_out_runnable_queue();
-    }
-    return task;
-}
-
-template <bool from_worker>
-void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGPTEntityPtr 
tg_entity) {
-    _total_cpu_share += tg_entity->cpu_share();
-    if constexpr (!from_worker) {
-        /**
-         * If a task group entity leaves task queue for a long time, its v 
runtime will be very
-         * small. This can cause it to preempt too many execution time. So, in 
order to avoid this
-         * situation, it is necessary to adjust the task group's v runtime.
-         * */
-        auto old_v_ns = tg_entity->vruntime_ns();
-        auto* min_entity = _min_tg_entity.load();
-        if (min_entity) {
-            auto min_tg_v = min_entity->vruntime_ns();
-            auto ideal_r = _ideal_runtime_ns(tg_entity) / 2;
-            uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r 
: min_tg_v;
-            if (new_vruntime_ns > old_v_ns) {
-                tg_entity->adjust_vruntime_ns(new_vruntime_ns);
-            }
-        } else if (old_v_ns < _min_tg_v_runtime_ns) {
-            tg_entity->adjust_vruntime_ns(_min_tg_v_runtime_ns);
-        }
-    }
-    _group_entities.emplace(tg_entity);
-    VLOG_DEBUG << "enqueue tg " << tg_entity->debug_string()
-               << ", group entity size: " << _group_entities.size();
-    _update_min_tg();
-}
-
-void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGPTEntityPtr 
tg_entity) {
-    _total_cpu_share -= tg_entity->cpu_share();
-    _group_entities.erase(tg_entity);
-    VLOG_DEBUG << "dequeue tg " << tg_entity->debug_string()
-               << ", group entity size: " << _group_entities.size();
-    _update_min_tg();
-}
-
-void TaskGroupTaskQueue::_update_min_tg() {
-    auto* min_entity = _next_tg_entity();
-    _min_tg_entity = min_entity;
-    if (min_entity) {
-        auto min_v_runtime = min_entity->vruntime_ns();
-        if (min_v_runtime > _min_tg_v_runtime_ns) {
-            _min_tg_v_runtime_ns = min_v_runtime;
-        }
-    }
-}
-
-// like sched_fair.c calc_delta_fair, THREAD_TIME_SLICE maybe a dynamic value.
-uint64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGPTEntityPtr 
tg_entity) const {
-    return PipelineTask::THREAD_TIME_SLICE * _core_size * 
tg_entity->cpu_share() / _total_cpu_share;
-}
-
-taskgroup::TGPTEntityPtr TaskGroupTaskQueue::_next_tg_entity() {
-    taskgroup::TGPTEntityPtr res = nullptr;
-    for (auto* entity : _group_entities) {
-        res = entity;
-        break;
-    }
-    return res;
-}
-
-void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t 
time_spent) {
-    std::unique_lock<std::mutex> lock(_rs_mutex);
-    auto* entity = task->get_task_group_entity();
-    auto find_entity = _group_entities.find(entity);
-    bool is_in_queue = find_entity != _group_entities.end();
-    VLOG_DEBUG << "update_statistics " << entity->debug_string() << ", in 
queue:" << is_in_queue;
-    if (is_in_queue) {
-        _group_entities.erase(entity);
-    }
-    entity->incr_runtime_ns(time_spent);
-    if (is_in_queue) {
-        _group_entities.emplace(entity);
-        _update_min_tg();
-    }
-}
-
-void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& 
task_group_info,
-                                             taskgroup::TGPTEntityPtr entity) {
-    std::unique_lock<std::mutex> lock(_rs_mutex);
-    bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
-    if (is_in_queue) {
-        _group_entities.erase(entity);
-        _total_cpu_share -= entity->cpu_share();
-    }
-    entity->check_and_update_cpu_share(task_group_info);
-    if (is_in_queue) {
-        _group_entities.emplace(entity);
-        _total_cpu_share += entity->cpu_share();
-    }
-}
-
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index d693cbe2168..9440fc18d83 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -52,9 +52,6 @@ public:
 
     virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
 
-    virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo& 
task_group_info,
-                                     taskgroup::TGPTEntityPtr entity) = 0;
-
     int cores() const { return _core_size; }
 
 protected:
@@ -154,11 +151,6 @@ public:
                                                                          
time_spent);
     }
 
-    void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
-                             taskgroup::TGPTEntityPtr entity) override {
-        LOG(FATAL) << "update_tg_cpu_share not implemented";
-    }
-
 private:
     PipelineTask* _steal_take(size_t core_id);
 
@@ -167,49 +159,5 @@ private:
     std::atomic<bool> _closed;
 };
 
-class TaskGroupTaskQueue : public TaskQueue {
-public:
-    explicit TaskGroupTaskQueue(size_t);
-    ~TaskGroupTaskQueue() override;
-
-    void close() override;
-
-    PipelineTask* take(size_t core_id) override;
-
-    // from TaskScheduler or BlockedTaskScheduler
-    Status push_back(PipelineTask* task) override;
-
-    // from worker
-    Status push_back(PipelineTask* task, size_t core_id) override;
-
-    void update_statistics(PipelineTask* task, int64_t time_spent) override;
-
-    void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
-                             taskgroup::TGPTEntityPtr entity) override;
-
-private:
-    template <bool from_executor>
-    Status _push_back(PipelineTask* task);
-    template <bool from_worker>
-    void _enqueue_task_group(taskgroup::TGPTEntityPtr);
-    void _dequeue_task_group(taskgroup::TGPTEntityPtr);
-    taskgroup::TGPTEntityPtr _next_tg_entity();
-    uint64_t _ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const;
-    void _update_min_tg();
-
-    // Like cfs rb tree in sched_entity
-    struct TaskGroupSchedEntityComparator {
-        bool operator()(const taskgroup::TGPTEntityPtr&, const 
taskgroup::TGPTEntityPtr&) const;
-    };
-    using ResouceGroupSet = std::set<taskgroup::TGPTEntityPtr, 
TaskGroupSchedEntityComparator>;
-    ResouceGroupSet _group_entities;
-    std::condition_variable _wait_task;
-    std::mutex _rs_mutex;
-    bool _closed = false;
-    int _total_cpu_share = 0;
-    std::atomic<taskgroup::TGPTEntityPtr> _min_tg_entity = nullptr;
-    uint64_t _min_tg_v_runtime_ns = 0;
-};
-
 } // namespace pipeline
 } // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index b8120f1c731..24316065397 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -147,7 +147,6 @@ public:
     ClientCache<TPaloBrokerServiceClient>* broker_client_cache() { return 
_broker_client_cache; }
 
     pipeline::TaskScheduler* pipeline_task_scheduler() { return 
_without_group_task_scheduler; }
-    pipeline::TaskScheduler* pipeline_task_group_scheduler() { return 
_with_group_task_scheduler; }
     taskgroup::TaskGroupManager* task_group_manager() { return 
_task_group_manager; }
     WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return 
_workload_sched_mgr; }
     RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() {
@@ -320,7 +319,6 @@ private:
 
     FragmentMgr* _fragment_mgr = nullptr;
     pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
-    pipeline::TaskScheduler* _with_group_task_scheduler = nullptr;
     taskgroup::TaskGroupManager* _task_group_manager = nullptr;
 
     ResultCache* _result_cache = nullptr;
@@ -374,8 +372,6 @@ private:
     std::shared_ptr<doris::pipeline::BlockedTaskScheduler> 
_global_block_scheduler;
     // used for query without workload group
     std::shared_ptr<doris::pipeline::BlockedTaskScheduler> 
_without_group_block_scheduler;
-    // used for query with workload group cpu soft limit
-    std::shared_ptr<doris::pipeline::BlockedTaskScheduler> 
_with_group_block_scheduler;
 
     doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = 
nullptr;
 
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 9942262f9c9..1883a6f2dbe 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -301,13 +301,6 @@ Status ExecEnv::init_pipeline_task_scheduler() {
     RETURN_IF_ERROR(_without_group_task_scheduler->start());
     RETURN_IF_ERROR(_without_group_block_scheduler->start());
 
-    auto tg_queue = 
std::make_shared<pipeline::TaskGroupTaskQueue>(executors_size);
-    _with_group_block_scheduler = 
std::make_shared<pipeline::BlockedTaskScheduler>("PipeGSchePool");
-    _with_group_task_scheduler = new pipeline::TaskScheduler(this, 
_with_group_block_scheduler,
-                                                             tg_queue, 
"PipeGSchePool", nullptr);
-    RETURN_IF_ERROR(_with_group_task_scheduler->start());
-    RETURN_IF_ERROR(_with_group_block_scheduler->start());
-
     _global_block_scheduler = 
std::make_shared<pipeline::BlockedTaskScheduler>("PipeGBlockSche");
     RETURN_IF_ERROR(_global_block_scheduler->start());
     _runtime_filter_timer_queue = new 
doris::pipeline::RuntimeFilterTimerQueue();
@@ -542,8 +535,6 @@ void ExecEnv::destroy() {
     // stop pipline step 1, non-cgroup execution
     SAFE_SHUTDOWN(_without_group_block_scheduler.get());
     SAFE_STOP(_without_group_task_scheduler);
-    SAFE_SHUTDOWN(_with_group_block_scheduler.get());
-    SAFE_STOP(_with_group_task_scheduler);
     // stop pipline step 2, cgroup execution
     SAFE_SHUTDOWN(_global_block_scheduler.get());
     SAFE_STOP(_task_group_manager);
@@ -611,7 +602,6 @@ void ExecEnv::destroy() {
     SAFE_DELETE(_fragment_mgr);
     SAFE_DELETE(_workload_sched_mgr);
     SAFE_DELETE(_task_group_manager);
-    SAFE_DELETE(_with_group_task_scheduler);
     SAFE_DELETE(_without_group_task_scheduler);
     SAFE_DELETE(_file_cache_factory);
     SAFE_DELETE(_runtime_filter_timer_queue);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 5b2de639d47..c5f3ece16f8 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -162,9 +162,7 @@ void QueryContext::set_query_scheduler(uint64_t tg_id) {
 
 doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
     if (_task_group) {
-        if (!config::enable_cgroup_cpu_soft_limit) {
-            return _exec_env->pipeline_task_group_scheduler();
-        } else if (_task_scheduler) {
+        if (_task_scheduler) {
             return _task_scheduler;
         }
     }
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d0dcd02452b..948b2427ddc 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -152,10 +152,6 @@ public:
 
     void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
 
-    taskgroup::TaskGroup* get_task_group() const {
-        return _task_group == nullptr ? nullptr : _task_group.get();
-    }
-
     int execution_timeout() const {
         return _query_options.__isset.execution_timeout ? 
_query_options.execution_timeout
                                                         : 
_query_options.query_timeout;
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index 9e86f8b831b..e0b0dc1fb5e 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -43,65 +43,6 @@ const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
 const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
 const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
 
-template <typename QueueType>
-TaskGroupEntity<QueueType>::TaskGroupEntity(taskgroup::TaskGroup* tg, 
std::string type)
-        : _tg(tg), _type(type), _version(tg->version()), 
_cpu_share(tg->cpu_share()) {
-    _task_queue = new QueueType();
-}
-
-template <typename QueueType>
-TaskGroupEntity<QueueType>::~TaskGroupEntity() {
-    delete _task_queue;
-}
-
-template <typename QueueType>
-QueueType* TaskGroupEntity<QueueType>::task_queue() {
-    return _task_queue;
-}
-
-template <typename QueueType>
-void TaskGroupEntity<QueueType>::incr_runtime_ns(uint64_t runtime_ns) {
-    auto v_time = runtime_ns / _cpu_share;
-    _vruntime_ns += v_time;
-}
-
-template <typename QueueType>
-void TaskGroupEntity<QueueType>::adjust_vruntime_ns(uint64_t vruntime_ns) {
-    VLOG_DEBUG << "adjust " << debug_string() << "vtime to " << vruntime_ns;
-    _vruntime_ns = vruntime_ns;
-}
-
-template <typename QueueType>
-size_t TaskGroupEntity<QueueType>::task_size() const {
-    return _task_queue->size();
-}
-
-template <typename QueueType>
-uint64_t TaskGroupEntity<QueueType>::cpu_share() const {
-    return _cpu_share;
-}
-
-template <typename QueueType>
-uint64_t TaskGroupEntity<QueueType>::task_group_id() const {
-    return _tg->id();
-}
-
-template <typename QueueType>
-void TaskGroupEntity<QueueType>::check_and_update_cpu_share(const 
TaskGroupInfo& tg_info) {
-    if (tg_info.version > _version) {
-        _cpu_share = tg_info.cpu_share;
-        _version = tg_info.version;
-    }
-}
-
-template <typename QueueType>
-std::string TaskGroupEntity<QueueType>::debug_string() const {
-    return fmt::format("TGE[id = {}, name = {}-{}, cpu_share = {}, task size: 
{}, v_time:{} ns]",
-                       _tg->id(), _tg->name(), _type, cpu_share(), 
task_size(), _vruntime_ns);
-}
-
-template class TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
-
 TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
         : _id(tg_info.id),
           _name(tg_info.name),
@@ -109,7 +50,6 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
           _memory_limit(tg_info.memory_limit),
           _enable_memory_overcommit(tg_info.enable_memory_overcommit),
           _cpu_share(tg_info.cpu_share),
-          _task_entity(this, "pipeline task entity"),
           _mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
           _cpu_hard_limit(tg_info.cpu_hard_limit) {}
 
@@ -145,8 +85,6 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
             return;
         }
     }
-    
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share(
-            tg_info, &_task_entity);
 }
 
 int64_t TaskGroup::memory_used() {
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index 3440bead9df..cf82f35fa20 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -44,50 +44,6 @@ namespace taskgroup {
 
 class TaskGroup;
 struct TaskGroupInfo;
-
-template <typename QueueType>
-class TaskGroupEntity {
-public:
-    explicit TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type);
-    ~TaskGroupEntity();
-
-    uint64_t vruntime_ns() const { return _vruntime_ns; }
-
-    QueueType* task_queue();
-
-    void incr_runtime_ns(uint64_t runtime_ns);
-
-    void adjust_vruntime_ns(uint64_t vruntime_ns);
-
-    size_t task_size() const;
-
-    uint64_t cpu_share() const;
-
-    std::string debug_string() const;
-
-    uint64_t task_group_id() const;
-
-    void check_and_update_cpu_share(const TaskGroupInfo& tg_info);
-
-private:
-    QueueType* _task_queue = nullptr;
-
-    uint64_t _vruntime_ns = 0;
-    taskgroup::TaskGroup* _tg = nullptr;
-
-    std::string _type;
-
-    // Because updating cpu share of entity requires locking the task 
queue(pipeline task queue or
-    // scan task queue) contains that entity, we kept version and cpu share in 
entity for
-    // independent updates.
-    int64_t _version;
-    uint64_t _cpu_share;
-};
-
-// TODO llj tg use PriorityTaskQueue to replace std::queue
-using TaskGroupPipelineTaskEntity = 
TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
-using TGPTEntityPtr = TaskGroupPipelineTaskEntity*;
-
 struct TgTrackerLimiterGroup {
     std::unordered_set<std::shared_ptr<MemTrackerLimiter>> trackers;
     std::mutex group_lock;
@@ -97,8 +53,6 @@ class TaskGroup : public std::enable_shared_from_this<TaskGroup> {
 public:
     explicit TaskGroup(const TaskGroupInfo& tg_info);
 
-    TaskGroupPipelineTaskEntity* task_entity() { return &_task_entity; }
-
     int64_t version() const { return _version; }
 
     uint64_t cpu_share() const { return _cpu_share.load(); }
@@ -160,7 +114,6 @@ private:
     int64_t _memory_limit; // bytes
     bool _enable_memory_overcommit;
     std::atomic<uint64_t> _cpu_share;
-    TaskGroupPipelineTaskEntity _task_entity;
     std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
     std::atomic<int> _cpu_hard_limit;
 
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index 068f5eced37..9a2bca4ff6a 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -49,8 +49,9 @@ TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& tas
     return new_task_group;
 }
 
-void TaskGroupManager::get_resource_groups(const std::function<bool(const 
TaskGroupPtr& ptr)>& pred,
-                                           std::vector<TaskGroupPtr>* 
task_groups) {
+void TaskGroupManager::get_related_taskgroups(
+        const std::function<bool(const TaskGroupPtr& ptr)>& pred,
+        std::vector<TaskGroupPtr>* task_groups) {
     std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
     for (const auto& [id, task_group] : _task_groups) {
         if (pred(task_group)) {
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index 78610b4efc3..19e056d5258 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -48,8 +48,8 @@ public:
 
     TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& 
task_group_info);
 
-    void get_resource_groups(const std::function<bool(const TaskGroupPtr& 
ptr)>& pred,
-                             std::vector<TaskGroupPtr>* task_groups);
+    void get_related_taskgroups(const std::function<bool(const TaskGroupPtr& 
ptr)>& pred,
+                                std::vector<TaskGroupPtr>* task_groups);
 
     Status upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, 
ExecEnv* exec_env);
 
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index ec79a8f1cc3..1eaa3eacaf5 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -245,7 +245,7 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
     std::unique_ptr<RuntimeProfile> tg_profile = 
std::make_unique<RuntimeProfile>("WorkloadGroup");
     int64_t total_free_memory = 0;
 
-    ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
+    ExecEnv::GetInstance()->task_group_manager()->get_related_taskgroups(
             [](const taskgroup::TaskGroupPtr& task_group) {
                 return task_group->is_mem_limit_valid() && 
!task_group->enable_memory_overcommit();
             },
@@ -301,7 +301,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
     MonotonicStopWatch watch;
     watch.start();
     std::vector<taskgroup::TaskGroupPtr> task_groups;
-    ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
+    ExecEnv::GetInstance()->task_group_manager()->get_related_taskgroups(
             [](const taskgroup::TaskGroupPtr& task_group) {
                 return task_group->is_mem_limit_valid() && 
task_group->enable_memory_overcommit();
             },


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to