This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 016c293eaef [refactor](taskqueue) remove old task scheduler based wg (#30832)
016c293eaef is described below
commit 016c293eaef8a897538cc12531b1431966a65454
Author: yiguolei <[email protected]>
AuthorDate: Mon Feb 5 18:22:01 2024 +0800
[refactor](taskqueue) remove old task scheduler based wg (#30832)
---------
Co-authored-by: yiguolei <[email protected]>
---
be/src/pipeline/pipeline_fragment_context.cpp | 3 -
be/src/pipeline/pipeline_fragment_context.h | 5 -
be/src/pipeline/pipeline_task.cpp | 4 -
be/src/pipeline/pipeline_task.h | 2 -
be/src/pipeline/task_queue.cpp | 170 -----------------------
be/src/pipeline/task_queue.h | 52 -------
be/src/runtime/exec_env.h | 4 -
be/src/runtime/exec_env_init.cpp | 10 --
be/src/runtime/query_context.cpp | 4 +-
be/src/runtime/query_context.h | 4 -
be/src/runtime/task_group/task_group.cpp | 62 ---------
be/src/runtime/task_group/task_group.h | 47 -------
be/src/runtime/task_group/task_group_manager.cpp | 5 +-
be/src/runtime/task_group/task_group_manager.h | 4 +-
be/src/util/mem_info.cpp | 4 +-
15 files changed, 8 insertions(+), 372 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index a7d33aa0c77..1fb50c9ea06 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -131,9 +131,6 @@ PipelineFragmentContext::PipelineFragmentContext(
_is_report_on_cancel(true),
_report_status_cb(report_status_cb),
_create_time(MonotonicNanos()) {
- if (_query_ctx->get_task_group()) {
- _task_group_entity = _query_ctx->get_task_group()->task_entity();
- }
_fragment_watcher.start();
}
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 0ec27c5054f..b8a3bf7e922 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -132,9 +132,6 @@ public:
return _query_ctx->exec_status();
}
- [[nodiscard]] taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const {
- return _task_group_entity;
- }
void trigger_report_if_necessary();
virtual void instance_ids(std::vector<TUniqueId>& ins_ids) const {
ins_ids.resize(1);
@@ -198,8 +195,6 @@ protected:
std::shared_ptr<QueryContext> _query_ctx;
- taskgroup::TaskGroupPipelineTaskEntity* _task_group_entity = nullptr;
-
std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
MonotonicStopWatch _fragment_watcher;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 6ea1b482c85..f31a39df31a 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -426,8 +426,4 @@ std::string PipelineTask::debug_string() {
return fmt::to_string(debug_string_buffer);
}
-taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity() const {
- return _fragment_context->get_task_group_entity();
-}
-
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 04bc55908e3..be1531de4e1 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -196,8 +196,6 @@ public:
virtual std::string debug_string();
- taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const;
-
void set_task_queue(TaskQueue* task_queue);
TaskQueue* get_task_queue() { return _task_queue; }
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index a68a2ba4a7f..a79d865821d 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -204,175 +204,5 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task, size_t core_id) {
return _prio_task_queue_list[core_id].push(task);
}
-bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
- const taskgroup::TGPTEntityPtr& lhs_ptr, const taskgroup::TGPTEntityPtr& rhs_ptr) const {
- auto lhs_val = lhs_ptr->vruntime_ns();
- auto rhs_val = rhs_ptr->vruntime_ns();
- if (lhs_val != rhs_val) {
- return lhs_val < rhs_val;
- } else {
- auto l_share = lhs_ptr->cpu_share();
- auto r_share = rhs_ptr->cpu_share();
- if (l_share != r_share) {
- return l_share < r_share;
- } else {
- return lhs_ptr->task_group_id() < rhs_ptr->task_group_id();
- }
- }
-}
-
-TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size)
- : TaskQueue(core_size), _min_tg_entity(nullptr) {}
-
-TaskGroupTaskQueue::~TaskGroupTaskQueue() = default;
-
-void TaskGroupTaskQueue::close() {
- std::unique_lock<std::mutex> lock(_rs_mutex);
- _closed = true;
- _wait_task.notify_all();
-}
-
-Status TaskGroupTaskQueue::push_back(PipelineTask* task) {
- return _push_back<false>(task);
-}
-
-Status TaskGroupTaskQueue::push_back(PipelineTask* task, size_t core_id) {
- return _push_back<true>(task);
-}
-
-template <bool from_executor>
-Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
- task->put_in_runnable_queue();
- auto* entity = task->get_task_group_entity();
- std::unique_lock<std::mutex> lock(_rs_mutex);
- entity->task_queue()->emplace(task);
- if (_group_entities.find(entity) == _group_entities.end()) {
- _enqueue_task_group<from_executor>(entity);
- }
- _wait_task.notify_one();
- return Status::OK();
-}
-
-// TODO pipeline support steal
-PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
- std::unique_lock<std::mutex> lock(_rs_mutex);
- taskgroup::TGPTEntityPtr entity = nullptr;
- while (entity == nullptr) {
- if (_closed) {
- return nullptr;
- }
- if (_group_entities.empty()) {
- _wait_task.wait(lock);
- } else {
- entity = _next_tg_entity();
- if (!entity) {
- _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS));
- }
- }
- }
- DCHECK(entity->task_size() > 0);
- if (entity->task_size() == 1) {
- _dequeue_task_group(entity);
- }
- auto task = entity->task_queue()->front();
- if (task) {
- entity->task_queue()->pop();
- task->pop_out_runnable_queue();
- }
- return task;
-}
-
-template <bool from_worker>
-void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGPTEntityPtr tg_entity) {
- _total_cpu_share += tg_entity->cpu_share();
- if constexpr (!from_worker) {
- /**
- * If a task group entity leaves task queue for a long time, its v runtime will be very
- * small. This can cause it to preempt too many execution time. So, in order to avoid this
- * situation, it is necessary to adjust the task group's v runtime.
- * */
- auto old_v_ns = tg_entity->vruntime_ns();
- auto* min_entity = _min_tg_entity.load();
- if (min_entity) {
- auto min_tg_v = min_entity->vruntime_ns();
- auto ideal_r = _ideal_runtime_ns(tg_entity) / 2;
- uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r : min_tg_v;
- if (new_vruntime_ns > old_v_ns) {
- tg_entity->adjust_vruntime_ns(new_vruntime_ns);
- }
- } else if (old_v_ns < _min_tg_v_runtime_ns) {
- tg_entity->adjust_vruntime_ns(_min_tg_v_runtime_ns);
- }
- }
- _group_entities.emplace(tg_entity);
- VLOG_DEBUG << "enqueue tg " << tg_entity->debug_string()
- << ", group entity size: " << _group_entities.size();
- _update_min_tg();
-}
-
-void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGPTEntityPtr tg_entity) {
- _total_cpu_share -= tg_entity->cpu_share();
- _group_entities.erase(tg_entity);
- VLOG_DEBUG << "dequeue tg " << tg_entity->debug_string()
- << ", group entity size: " << _group_entities.size();
- _update_min_tg();
-}
-
-void TaskGroupTaskQueue::_update_min_tg() {
- auto* min_entity = _next_tg_entity();
- _min_tg_entity = min_entity;
- if (min_entity) {
- auto min_v_runtime = min_entity->vruntime_ns();
- if (min_v_runtime > _min_tg_v_runtime_ns) {
- _min_tg_v_runtime_ns = min_v_runtime;
- }
- }
-}
-
-// like sched_fair.c calc_delta_fair, THREAD_TIME_SLICE maybe a dynamic value.
-uint64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const {
- return PipelineTask::THREAD_TIME_SLICE * _core_size * tg_entity->cpu_share() / _total_cpu_share;
-}
-
-taskgroup::TGPTEntityPtr TaskGroupTaskQueue::_next_tg_entity() {
- taskgroup::TGPTEntityPtr res = nullptr;
- for (auto* entity : _group_entities) {
- res = entity;
- break;
- }
- return res;
-}
-
-void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t time_spent) {
- std::unique_lock<std::mutex> lock(_rs_mutex);
- auto* entity = task->get_task_group_entity();
- auto find_entity = _group_entities.find(entity);
- bool is_in_queue = find_entity != _group_entities.end();
- VLOG_DEBUG << "update_statistics " << entity->debug_string() << ", in
queue:" << is_in_queue;
- if (is_in_queue) {
- _group_entities.erase(entity);
- }
- entity->incr_runtime_ns(time_spent);
- if (is_in_queue) {
- _group_entities.emplace(entity);
- _update_min_tg();
- }
-}
-
-void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
- taskgroup::TGPTEntityPtr entity) {
- std::unique_lock<std::mutex> lock(_rs_mutex);
- bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
- if (is_in_queue) {
- _group_entities.erase(entity);
- _total_cpu_share -= entity->cpu_share();
- }
- entity->check_and_update_cpu_share(task_group_info);
- if (is_in_queue) {
- _group_entities.emplace(entity);
- _total_cpu_share += entity->cpu_share();
- }
-}
-
} // namespace pipeline
} // namespace doris
\ No newline at end of file
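For readers skimming the removal above: the deleted TaskGroupTaskQueue was a CFS-style fair queue. The following is a minimal self-contained sketch of its ordering and time-slice math, not Doris code; all names in it are illustrative.

#include <cstdint>
#include <set>

// One scheduling entity per task group, as in the removed TaskGroupEntity.
struct GroupEntity {
    uint64_t id;          // task group id, used as the final tie-breaker
    uint64_t cpu_share;   // weight, analogous to a CFS nice-to-weight value
    uint64_t vruntime_ns; // virtual runtime consumed so far
};

// Same three-level ordering as the removed TaskGroupSchedEntityComparator:
// smallest vruntime first, then smaller cpu_share, then smaller id, so the
// set never treats two distinct groups as equivalent.
struct ByVruntime {
    bool operator()(const GroupEntity* a, const GroupEntity* b) const {
        if (a->vruntime_ns != b->vruntime_ns) return a->vruntime_ns < b->vruntime_ns;
        if (a->cpu_share != b->cpu_share) return a->cpu_share < b->cpu_share;
        return a->id < b->id;
    }
};

// Runnable groups ordered like the CFS red-black tree; *begin() is the group
// with minimum vruntime, which take() would service next.
using GroupSet = std::set<GroupEntity*, ByVruntime>;

// Mirrors _ideal_runtime_ns: the slice a group deserves per round is the
// total slice budget scaled by its share. The caller must keep total_share
// positive, as the removed code did by maintaining _total_cpu_share over the
// enqueued groups.
uint64_t ideal_runtime_ns(uint64_t slice_ns, size_t cores, uint64_t share,
                          uint64_t total_share) {
    return slice_ns * cores * share / total_share;
}

Worked through: with a 100ms slice on 8 cores, a group holding share 512 out of a total 1024 gets 100'000'000 * 8 * 512 / 1024 = 400'000'000 ns. On re-enqueue, _enqueue_task_group raised a long-idle group's vruntime to no more than half an ideal slice below the current minimum, so a returning group could not monopolize the executor.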
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index d693cbe2168..9440fc18d83 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -52,9 +52,6 @@ public:
virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
- virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
- taskgroup::TGPTEntityPtr entity) = 0;
-
int cores() const { return _core_size; }
protected:
@@ -154,11 +151,6 @@ public:
time_spent);
}
- void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
- taskgroup::TGPTEntityPtr entity) override {
- LOG(FATAL) << "update_tg_cpu_share not implemented";
- }
-
private:
PipelineTask* _steal_take(size_t core_id);
@@ -167,49 +159,5 @@ private:
std::atomic<bool> _closed;
};
-class TaskGroupTaskQueue : public TaskQueue {
-public:
- explicit TaskGroupTaskQueue(size_t);
- ~TaskGroupTaskQueue() override;
-
- void close() override;
-
- PipelineTask* take(size_t core_id) override;
-
- // from TaskScheduler or BlockedTaskScheduler
- Status push_back(PipelineTask* task) override;
-
- // from worker
- Status push_back(PipelineTask* task, size_t core_id) override;
-
- void update_statistics(PipelineTask* task, int64_t time_spent) override;
-
- void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
- taskgroup::TGPTEntityPtr entity) override;
-
-private:
- template <bool from_executor>
- Status _push_back(PipelineTask* task);
- template <bool from_worker>
- void _enqueue_task_group(taskgroup::TGPTEntityPtr);
- void _dequeue_task_group(taskgroup::TGPTEntityPtr);
- taskgroup::TGPTEntityPtr _next_tg_entity();
- uint64_t _ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const;
- void _update_min_tg();
-
- // Like cfs rb tree in sched_entity
- struct TaskGroupSchedEntityComparator {
- bool operator()(const taskgroup::TGPTEntityPtr&, const taskgroup::TGPTEntityPtr&) const;
- };
- using ResouceGroupSet = std::set<taskgroup::TGPTEntityPtr, TaskGroupSchedEntityComparator>;
- ResouceGroupSet _group_entities;
- std::condition_variable _wait_task;
- std::mutex _rs_mutex;
- bool _closed = false;
- int _total_cpu_share = 0;
- std::atomic<taskgroup::TGPTEntityPtr> _min_tg_entity = nullptr;
- uint64_t _min_tg_v_runtime_ns = 0;
-};
-
} // namespace pipeline
} // namespace doris
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index b8120f1c731..24316065397 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -147,7 +147,6 @@ public:
ClientCache<TPaloBrokerServiceClient>* broker_client_cache() { return _broker_client_cache; }
pipeline::TaskScheduler* pipeline_task_scheduler() { return _without_group_task_scheduler; }
- pipeline::TaskScheduler* pipeline_task_group_scheduler() { return _with_group_task_scheduler; }
taskgroup::TaskGroupManager* task_group_manager() { return _task_group_manager; }
WorkloadSchedPolicyMgr* workload_sched_policy_mgr() { return _workload_sched_mgr; }
RuntimeQueryStatiticsMgr* runtime_query_statistics_mgr() {
@@ -320,7 +319,6 @@ private:
FragmentMgr* _fragment_mgr = nullptr;
pipeline::TaskScheduler* _without_group_task_scheduler = nullptr;
- pipeline::TaskScheduler* _with_group_task_scheduler = nullptr;
taskgroup::TaskGroupManager* _task_group_manager = nullptr;
ResultCache* _result_cache = nullptr;
@@ -374,8 +372,6 @@ private:
std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _global_block_scheduler;
// used for query without workload group
std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _without_group_block_scheduler;
- // used for query with workload group cpu soft limit
- std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _with_group_block_scheduler;
doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 9942262f9c9..1883a6f2dbe 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -301,13 +301,6 @@ Status ExecEnv::init_pipeline_task_scheduler() {
RETURN_IF_ERROR(_without_group_task_scheduler->start());
RETURN_IF_ERROR(_without_group_block_scheduler->start());
- auto tg_queue = std::make_shared<pipeline::TaskGroupTaskQueue>(executors_size);
- _with_group_block_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>("PipeGSchePool");
- _with_group_task_scheduler = new pipeline::TaskScheduler(this, _with_group_block_scheduler,
- tg_queue, "PipeGSchePool", nullptr);
- RETURN_IF_ERROR(_with_group_task_scheduler->start());
- RETURN_IF_ERROR(_with_group_block_scheduler->start());
-
_global_block_scheduler = std::make_shared<pipeline::BlockedTaskScheduler>("PipeGBlockSche");
RETURN_IF_ERROR(_global_block_scheduler->start());
_runtime_filter_timer_queue = new doris::pipeline::RuntimeFilterTimerQueue();
@@ -542,8 +535,6 @@ void ExecEnv::destroy() {
// stop pipline step 1, non-cgroup execution
SAFE_SHUTDOWN(_without_group_block_scheduler.get());
SAFE_STOP(_without_group_task_scheduler);
- SAFE_SHUTDOWN(_with_group_block_scheduler.get());
- SAFE_STOP(_with_group_task_scheduler);
// stop pipline step 2, cgroup execution
SAFE_SHUTDOWN(_global_block_scheduler.get());
SAFE_STOP(_task_group_manager);
@@ -611,7 +602,6 @@ void ExecEnv::destroy() {
SAFE_DELETE(_fragment_mgr);
SAFE_DELETE(_workload_sched_mgr);
SAFE_DELETE(_task_group_manager);
- SAFE_DELETE(_with_group_task_scheduler);
SAFE_DELETE(_without_group_task_scheduler);
SAFE_DELETE(_file_cache_factory);
SAFE_DELETE(_runtime_filter_timer_queue);
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 5b2de639d47..c5f3ece16f8 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -162,9 +162,7 @@ void QueryContext::set_query_scheduler(uint64_t tg_id) {
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
if (_task_group) {
- if (!config::enable_cgroup_cpu_soft_limit) {
- return _exec_env->pipeline_task_group_scheduler();
- } else if (_task_scheduler) {
+ if (_task_scheduler) {
return _task_scheduler;
}
}
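A minimal sketch (illustrative types, not the Doris classes) of the control flow left in get_pipe_exec_scheduler after this hunk: the per-group scheduler is consulted only when the query carries a task group that attached one; everything else falls back to the default scheduler.

struct TaskScheduler {};
struct QueryContextSketch {
    bool has_task_group = false;
    TaskScheduler* task_scheduler = nullptr;    // set by set_query_scheduler(tg_id)
    TaskScheduler* default_scheduler = nullptr; // ExecEnv's without-group scheduler

    TaskScheduler* get_pipe_exec_scheduler() {
        if (has_task_group && task_scheduler != nullptr) {
            return task_scheduler;
        }
        return default_scheduler;
    }
};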
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index d0dcd02452b..948b2427ddc 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -152,10 +152,6 @@ public:
void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; }
- taskgroup::TaskGroup* get_task_group() const {
- return _task_group == nullptr ? nullptr : _task_group.get();
- }
-
int execution_timeout() const {
return _query_options.__isset.execution_timeout ? _query_options.execution_timeout
: _query_options.query_timeout;
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index 9e86f8b831b..e0b0dc1fb5e 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -43,65 +43,6 @@ const static std::string MEMORY_LIMIT_DEFAULT_VALUE = "0%";
const static bool ENABLE_MEMORY_OVERCOMMIT_DEFAULT_VALUE = true;
const static int CPU_HARD_LIMIT_DEFAULT_VALUE = -1;
-template <typename QueueType>
-TaskGroupEntity<QueueType>::TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type)
- : _tg(tg), _type(type), _version(tg->version()), _cpu_share(tg->cpu_share()) {
- _task_queue = new QueueType();
-}
-
-template <typename QueueType>
-TaskGroupEntity<QueueType>::~TaskGroupEntity() {
- delete _task_queue;
-}
-
-template <typename QueueType>
-QueueType* TaskGroupEntity<QueueType>::task_queue() {
- return _task_queue;
-}
-
-template <typename QueueType>
-void TaskGroupEntity<QueueType>::incr_runtime_ns(uint64_t runtime_ns) {
- auto v_time = runtime_ns / _cpu_share;
- _vruntime_ns += v_time;
-}
-
-template <typename QueueType>
-void TaskGroupEntity<QueueType>::adjust_vruntime_ns(uint64_t vruntime_ns) {
- VLOG_DEBUG << "adjust " << debug_string() << "vtime to " << vruntime_ns;
- _vruntime_ns = vruntime_ns;
-}
-
-template <typename QueueType>
-size_t TaskGroupEntity<QueueType>::task_size() const {
- return _task_queue->size();
-}
-
-template <typename QueueType>
-uint64_t TaskGroupEntity<QueueType>::cpu_share() const {
- return _cpu_share;
-}
-
-template <typename QueueType>
-uint64_t TaskGroupEntity<QueueType>::task_group_id() const {
- return _tg->id();
-}
-
-template <typename QueueType>
-void TaskGroupEntity<QueueType>::check_and_update_cpu_share(const TaskGroupInfo& tg_info) {
- if (tg_info.version > _version) {
- _cpu_share = tg_info.cpu_share;
- _version = tg_info.version;
- }
-}
-
-template <typename QueueType>
-std::string TaskGroupEntity<QueueType>::debug_string() const {
- return fmt::format("TGE[id = {}, name = {}-{}, cpu_share = {}, task size: {}, v_time:{} ns]",
- _tg->id(), _tg->name(), _type, cpu_share(), task_size(), _vruntime_ns);
-}
-
-template class TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
-
TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
: _id(tg_info.id),
_name(tg_info.name),
@@ -109,7 +50,6 @@ TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
_memory_limit(tg_info.memory_limit),
_enable_memory_overcommit(tg_info.enable_memory_overcommit),
_cpu_share(tg_info.cpu_share),
- _task_entity(this, "pipeline task entity"),
_mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM),
_cpu_hard_limit(tg_info.cpu_hard_limit) {}
@@ -145,8 +85,6 @@ void TaskGroup::check_and_update(const TaskGroupInfo& tg_info) {
return;
}
}
- ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share(
- tg_info, &_task_entity);
}
int64_t TaskGroup::memory_used() {
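The removed TaskGroupEntity accounting is easy to restate: real runtime is divided by cpu_share before being added to vruntime, so higher-share groups age more slowly in the queue. A minimal sketch with illustrative names:

#include <cstdint>

struct Entity {
    uint64_t cpu_share = 1; // positive weight; the removed code assumed share >= 1
    uint64_t vruntime_ns = 0;

    // Mirrors the removed incr_runtime_ns: weight elapsed time by the share.
    void incr_runtime_ns(uint64_t runtime_ns) { vruntime_ns += runtime_ns / cpu_share; }
};

Two groups that each consume 1ms of CPU advance by 1'000'000 ns at share 1 but only 250'000 ns at share 4, so the share-4 group sorts to the front roughly four times as often.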
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index 3440bead9df..cf82f35fa20 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -44,50 +44,6 @@ namespace taskgroup {
class TaskGroup;
struct TaskGroupInfo;
-
-template <typename QueueType>
-class TaskGroupEntity {
-public:
- explicit TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type);
- ~TaskGroupEntity();
-
- uint64_t vruntime_ns() const { return _vruntime_ns; }
-
- QueueType* task_queue();
-
- void incr_runtime_ns(uint64_t runtime_ns);
-
- void adjust_vruntime_ns(uint64_t vruntime_ns);
-
- size_t task_size() const;
-
- uint64_t cpu_share() const;
-
- std::string debug_string() const;
-
- uint64_t task_group_id() const;
-
- void check_and_update_cpu_share(const TaskGroupInfo& tg_info);
-
-private:
- QueueType* _task_queue = nullptr;
-
- uint64_t _vruntime_ns = 0;
- taskgroup::TaskGroup* _tg = nullptr;
-
- std::string _type;
-
- // Because updating cpu share of entity requires locking the task queue(pipeline task queue or
- // scan task queue) contains that entity, we kept version and cpu share in entity for
- // independent updates.
- int64_t _version;
- uint64_t _cpu_share;
-};
-
-// TODO llj tg use PriorityTaskQueue to replace std::queue
-using TaskGroupPipelineTaskEntity = TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
-using TGPTEntityPtr = TaskGroupPipelineTaskEntity*;
-
struct TgTrackerLimiterGroup {
std::unordered_set<std::shared_ptr<MemTrackerLimiter>> trackers;
std::mutex group_lock;
@@ -97,8 +53,6 @@ class TaskGroup : public std::enable_shared_from_this<TaskGroup> {
public:
explicit TaskGroup(const TaskGroupInfo& tg_info);
- TaskGroupPipelineTaskEntity* task_entity() { return &_task_entity; }
-
int64_t version() const { return _version; }
uint64_t cpu_share() const { return _cpu_share.load(); }
@@ -160,7 +114,6 @@ private:
int64_t _memory_limit; // bytes
bool _enable_memory_overcommit;
std::atomic<uint64_t> _cpu_share;
- TaskGroupPipelineTaskEntity _task_entity;
std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
std::atomic<int> _cpu_hard_limit;
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index 068f5eced37..9a2bca4ff6a 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -49,8 +49,9 @@ TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo& tas
return new_task_group;
}
-void TaskGroupManager::get_resource_groups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
- std::vector<TaskGroupPtr>* task_groups) {
+void TaskGroupManager::get_related_taskgroups(
+ const std::function<bool(const TaskGroupPtr& ptr)>& pred,
+ std::vector<TaskGroupPtr>* task_groups) {
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
for (const auto& [id, task_group] : _task_groups) {
if (pred(task_group)) {
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index 78610b4efc3..19e056d5258 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -48,8 +48,8 @@ public:
TaskGroupPtr get_or_create_task_group(const TaskGroupInfo& task_group_info);
- void get_resource_groups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
- std::vector<TaskGroupPtr>* task_groups);
+ void get_related_taskgroups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
+ std::vector<TaskGroupPtr>* task_groups);
Status upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index ec79a8f1cc3..1eaa3eacaf5 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -245,7 +245,7 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
std::unique_ptr<RuntimeProfile> tg_profile = std::make_unique<RuntimeProfile>("WorkloadGroup");
int64_t total_free_memory = 0;
- ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
+ ExecEnv::GetInstance()->task_group_manager()->get_related_taskgroups(
[](const taskgroup::TaskGroupPtr& task_group) {
return task_group->is_mem_limit_valid() &&
!task_group->enable_memory_overcommit();
},
@@ -301,7 +301,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t request_free_memory,
MonotonicStopWatch watch;
watch.start();
std::vector<taskgroup::TaskGroupPtr> task_groups;
- ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
+ ExecEnv::GetInstance()->task_group_manager()->get_related_taskgroups(
[](const taskgroup::TaskGroupPtr& task_group) {
return task_group->is_mem_limit_valid() &&
task_group->enable_memory_overcommit();
},
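The rename above is mechanical, but the predicate-based lookup it exposes is worth restating. Below is a self-contained model of get_related_taskgroups as implemented in the task_group_manager.cpp hunk; the mini types are illustrative stand-ins for the Doris ones.

#include <functional>
#include <memory>
#include <shared_mutex>
#include <unordered_map>
#include <vector>

struct TaskGroup { int64_t id; };
using TaskGroupPtr = std::shared_ptr<TaskGroup>;

class TaskGroupManager {
public:
    // Collect every group matching pred while holding the read lock, the
    // same shape as the renamed method above.
    void get_related_taskgroups(const std::function<bool(const TaskGroupPtr&)>& pred,
                                std::vector<TaskGroupPtr>* task_groups) {
        std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
        for (const auto& [id, task_group] : _task_groups) {
            if (pred(task_group)) {
                task_groups->push_back(task_group);
            }
        }
    }

private:
    std::shared_mutex _group_mutex;
    std::unordered_map<int64_t, TaskGroupPtr> _task_groups;
};

Callers such as tg_not_enable_overcommit_group_gc pass a lambda over memory-limit state, as the two mem_info.cpp hunks above show.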
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]