This is an automated email from the ASF dual-hosted git repository.
wangbo pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 002d3e63e41 [improvement](pipeline) task group scan entity (#19924) (#27040)
002d3e63e41 is described below
commit 002d3e63e41736c95c0cbad8787a688cb86a5a66
Author: wangbo <[email protected]>
AuthorDate: Thu Nov 16 09:42:44 2023 +0800
[improvement](pipeline) task group scan entity (#19924) (#27040)
Co-authored-by: Lijia Liu <[email protected]>
---
be/src/common/config.cpp | 2 +-
be/src/common/config.h | 2 +-
be/src/pipeline/pipeline_fragment_context.cpp | 5 +-
be/src/pipeline/pipeline_fragment_context.h | 10 +-
be/src/pipeline/pipeline_task.cpp | 10 +-
be/src/pipeline/pipeline_task.h | 9 +-
be/src/pipeline/task_queue.cpp | 59 +++---
be/src/pipeline/task_queue.h | 28 +--
be/src/pipeline/task_scheduler.cpp | 6 +-
be/src/pipeline/task_scheduler.h | 3 +-
be/src/runtime/exec_env.h | 5 +
be/src/runtime/exec_env_init.cpp | 3 +
be/src/runtime/fragment_mgr.cpp | 2 +-
be/src/runtime/task_group/task_group.cpp | 98 ++++++----
be/src/runtime/task_group/task_group.h | 59 ++++--
be/src/runtime/task_group/task_group_manager.cpp | 8 +-
be/src/runtime/task_group/task_group_manager.h | 8 +-
be/src/util/mem_info.cpp | 4 +-
be/src/vec/exec/scan/scan_task_queue.cpp | 213 +++++++++++++++++++++
be/src/vec/exec/scan/scan_task_queue.h | 98 ++++++++++
be/src/vec/exec/scan/scanner_context.cpp | 19 ++
be/src/vec/exec/scan/scanner_context.h | 6 +
be/src/vec/exec/scan/scanner_scheduler.cpp | 54 +++++-
be/src/vec/exec/scan/scanner_scheduler.h | 13 ++
be/src/vec/exec/scan/vscanner.h | 2 +
.../main/java/org/apache/doris/common/Config.java | 5 +-
26 files changed, 592 insertions(+), 139 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 22ec143c6da..b09e77f7eb3 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -955,7 +955,7 @@ DEFINE_Bool(enable_fuzzy_mode, "false");
DEFINE_Bool(enable_debug_points, "false");
DEFINE_Int32(pipeline_executor_size, "0");
-DEFINE_mInt16(pipeline_short_query_timeout_s, "20");
+DEFINE_mBool(enable_workload_group_for_scan, "false");
// Temp config. True to use optimization for bitmap_index apply predicate
except leaf node of the and node.
// Will remove after fully test.
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0a11a4b46d7..de372deaf95 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -997,7 +997,7 @@ DECLARE_Bool(enable_fuzzy_mode);
DECLARE_Bool(enable_debug_points);
DECLARE_Int32(pipeline_executor_size);
-DECLARE_mInt16(pipeline_short_query_timeout_s);
+DECLARE_mBool(enable_workload_group_for_scan);
// Temp config. True to use optimization for bitmap_index apply predicate
except leaf node of the and node.
// Will remove after fully test.
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index d718e1c6eee..1b7dffae1c1 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -128,6 +128,9 @@ PipelineFragmentContext::PipelineFragmentContext(
_report_thread_active(false),
_report_status_cb(report_status_cb),
_is_report_on_cancel(true) {
+ if (_query_ctx->get_task_group()) {
+ _task_group_entity = _query_ctx->get_task_group()->task_entity();
+ }
_report_thread_future = _report_thread_promise.get_future();
_fragment_watcher.start();
}
@@ -689,7 +692,7 @@ Status PipelineFragmentContext::submit() {
int submit_tasks = 0;
Status st;
auto* scheduler = _exec_env->pipeline_task_scheduler();
- if (get_task_group()) {
+ if (_task_group_entity) {
scheduler = _exec_env->pipeline_task_group_scheduler();
}
for (auto& task : _tasks) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index ed79800ec60..1b44894b842 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -48,10 +48,6 @@ class RuntimeFilterMergeControllerEntity;
class TDataSink;
class TPipelineFragmentParams;
-namespace taskgroup {
-class TaskGroup;
-} // namespace taskgroup
-
namespace pipeline {
class PipelineFragmentContext : public
std::enable_shared_from_this<PipelineFragmentContext> {
@@ -121,7 +117,9 @@ public:
return _exec_status;
}
- taskgroup::TaskGroup* get_task_group() const { return
_query_ctx->get_task_group(); }
+ taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const {
+ return _task_group_entity;
+ }
private:
Status _create_sink(int sender_id, const TDataSink& t_data_sink,
RuntimeState* state);
@@ -176,6 +174,8 @@ private:
std::shared_ptr<QueryContext> _query_ctx;
+ taskgroup::TaskGroupPipelineTaskEntity* _task_group_entity = nullptr;
+
std::shared_ptr<RuntimeFilterMergeControllerEntity>
_merge_controller_handler;
MonotonicStopWatch _fragment_watcher;
diff --git a/be/src/pipeline/pipeline_task.cpp b/be/src/pipeline/pipeline_task.cpp
index 9e40b85f917..0d3cb413bf7 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -37,9 +37,6 @@
namespace doris {
class RuntimeState;
-namespace taskgroup {
-class TaskGroup;
-} // namespace taskgroup
} // namespace doris
namespace doris::pipeline {
@@ -402,7 +399,8 @@ std::string PipelineTask::debug_string() {
_task_profile->pretty_print(&profile_ss, "");
fmt::format_to(debug_string_buffer, "Profile: {}\n", profile_ss.str());
}
- fmt::format_to(debug_string_buffer, "PipelineTask[id = {}, state =
{}]\noperators: ", _index,
+ fmt::format_to(debug_string_buffer,
+ "PipelineTask[this = {}, state = {}]\noperators: ",
(void*)this,
get_state_name(_cur_state));
for (size_t i = 0; i < _operators.size(); i++) {
fmt::format_to(debug_string_buffer, "\n{}{}", std::string(i * 2, ' '),
@@ -422,8 +420,8 @@ std::string PipelineTask::debug_string() {
return fmt::to_string(debug_string_buffer);
}
-taskgroup::TaskGroup* PipelineTask::get_task_group() const {
- return _fragment_context->get_task_group();
+taskgroup::TaskGroupPipelineTaskEntity* PipelineTask::get_task_group_entity()
const {
+ return _fragment_context->get_task_group_entity();
}
} // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 9fdf3c82e05..ea78d795006 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -26,6 +26,7 @@
#include "common/status.h"
#include "exec/operator.h"
#include "pipeline.h"
+#include "runtime/task_group/task_group.h"
#include "util/runtime_profile.h"
#include "util/stopwatch.hpp"
#include "vec/core/block.h"
@@ -36,9 +37,6 @@ class RuntimeState;
namespace pipeline {
class PipelineFragmentContext;
} // namespace pipeline
-namespace taskgroup {
-class TaskGroup;
-} // namespace taskgroup
} // namespace doris
namespace doris::pipeline {
@@ -107,6 +105,7 @@ inline const char* get_state_name(PipelineTaskState idx) {
}
class TaskQueue;
+class PriorityTaskQueue;
// The class do the pipeline task. Minest schdule union by task scheduler
class PipelineTask {
@@ -185,11 +184,11 @@ public:
std::string debug_string();
- taskgroup::TaskGroup* get_task_group() const;
+ taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const;
void set_task_queue(TaskQueue* task_queue);
- static constexpr auto THREAD_TIME_SLICE = 100'000'000L;
+ static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;
// 1 used for update priority queue
// note(wb) an ugly implementation, need refactor later
diff --git a/be/src/pipeline/task_queue.cpp b/be/src/pipeline/task_queue.cpp
index 715c9601cdf..a68a2ba4a7f 100644
--- a/be/src/pipeline/task_queue.cpp
+++ b/be/src/pipeline/task_queue.cpp
@@ -23,7 +23,6 @@
#include "common/logging.h"
#include "pipeline/pipeline_task.h"
-#include "runtime/task_group/task_group.h"
namespace doris {
namespace pipeline {
@@ -55,7 +54,7 @@ void PriorityTaskQueue::close() {
_wait_task.notify_all();
}
-PipelineTask* PriorityTaskQueue::try_take_unprotected(bool is_steal) {
+PipelineTask* PriorityTaskQueue::_try_take_unprotected(bool is_steal) {
if (_total_task_size == 0 || _closed) {
return nullptr;
}
@@ -94,12 +93,12 @@ int PriorityTaskQueue::_compute_level(uint64_t runtime) {
PipelineTask* PriorityTaskQueue::try_take(bool is_steal) {
// TODO other efficient lock? e.g. if get lock fail, return null_ptr
std::unique_lock<std::mutex> lock(_work_size_mutex);
- return try_take_unprotected(is_steal);
+ return _try_take_unprotected(is_steal);
}
PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
std::unique_lock<std::mutex> lock(_work_size_mutex);
- auto task = try_take_unprotected(false);
+ auto task = _try_take_unprotected(false);
if (task) {
return task;
} else {
@@ -108,7 +107,7 @@ PipelineTask* PriorityTaskQueue::take(uint32_t timeout_ms) {
} else {
_wait_task.wait(lock);
}
- return try_take_unprotected(false);
+ return _try_take_unprotected(false);
}
}
@@ -131,6 +130,11 @@ Status PriorityTaskQueue::push(PipelineTask* task) {
return Status::OK();
}
+int PriorityTaskQueue::task_size() {
+ std::unique_lock<std::mutex> lock(_work_size_mutex);
+ return _total_task_size;
+}
+
MultiCoreTaskQueue::~MultiCoreTaskQueue() = default;
MultiCoreTaskQueue::MultiCoreTaskQueue(size_t core_size) :
TaskQueue(core_size), _closed(false) {
@@ -201,9 +205,9 @@ Status MultiCoreTaskQueue::push_back(PipelineTask* task,
size_t core_id) {
}
bool TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
- const taskgroup::TGEntityPtr& lhs_ptr, const taskgroup::TGEntityPtr&
rhs_ptr) const {
- int64_t lhs_val = lhs_ptr->vruntime_ns();
- int64_t rhs_val = rhs_ptr->vruntime_ns();
+ const taskgroup::TGPTEntityPtr& lhs_ptr, const
taskgroup::TGPTEntityPtr& rhs_ptr) const {
+ auto lhs_val = lhs_ptr->vruntime_ns();
+ auto rhs_val = rhs_ptr->vruntime_ns();
if (lhs_val != rhs_val) {
return lhs_val < rhs_val;
} else {
@@ -217,7 +221,8 @@ bool
TaskGroupTaskQueue::TaskGroupSchedEntityComparator::operator()(
}
}
-TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size) :
TaskQueue(core_size) {}
+TaskGroupTaskQueue::TaskGroupTaskQueue(size_t core_size)
+ : TaskQueue(core_size), _min_tg_entity(nullptr) {}
TaskGroupTaskQueue::~TaskGroupTaskQueue() = default;
@@ -237,9 +242,10 @@ Status TaskGroupTaskQueue::push_back(PipelineTask* task,
size_t core_id) {
template <bool from_executor>
Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
- auto* entity = task->get_task_group()->task_entity();
+ task->put_in_runnable_queue();
+ auto* entity = task->get_task_group_entity();
std::unique_lock<std::mutex> lock(_rs_mutex);
- entity->push_back(task);
+ entity->task_queue()->emplace(task);
if (_group_entities.find(entity) == _group_entities.end()) {
_enqueue_task_group<from_executor>(entity);
}
@@ -250,7 +256,7 @@ Status TaskGroupTaskQueue::_push_back(PipelineTask* task) {
// TODO pipeline support steal
PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
std::unique_lock<std::mutex> lock(_rs_mutex);
- taskgroup::TGEntityPtr entity = nullptr;
+ taskgroup::TGPTEntityPtr entity = nullptr;
while (entity == nullptr) {
if (_closed) {
return nullptr;
@@ -268,11 +274,16 @@ PipelineTask* TaskGroupTaskQueue::take(size_t core_id) {
if (entity->task_size() == 1) {
_dequeue_task_group(entity);
}
- return entity->take();
+ auto task = entity->task_queue()->front();
+ if (task) {
+ entity->task_queue()->pop();
+ task->pop_out_runnable_queue();
+ }
+ return task;
}
template <bool from_worker>
-void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity)
{
+void TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGPTEntityPtr
tg_entity) {
_total_cpu_share += tg_entity->cpu_share();
if constexpr (!from_worker) {
/**
@@ -283,7 +294,9 @@ void
TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) {
auto old_v_ns = tg_entity->vruntime_ns();
auto* min_entity = _min_tg_entity.load();
if (min_entity) {
- int64_t new_vruntime_ns = min_entity->vruntime_ns() -
_ideal_runtime_ns(tg_entity) / 2;
+ auto min_tg_v = min_entity->vruntime_ns();
+ auto ideal_r = _ideal_runtime_ns(tg_entity) / 2;
+ uint64_t new_vruntime_ns = min_tg_v > ideal_r ? min_tg_v - ideal_r
: min_tg_v;
if (new_vruntime_ns > old_v_ns) {
tg_entity->adjust_vruntime_ns(new_vruntime_ns);
}
@@ -297,7 +310,7 @@ void
TaskGroupTaskQueue::_enqueue_task_group(taskgroup::TGEntityPtr tg_entity) {
_update_min_tg();
}
-void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGEntityPtr tg_entity)
{
+void TaskGroupTaskQueue::_dequeue_task_group(taskgroup::TGPTEntityPtr
tg_entity) {
_total_cpu_share -= tg_entity->cpu_share();
_group_entities.erase(tg_entity);
VLOG_DEBUG << "dequeue tg " << tg_entity->debug_string()
@@ -317,12 +330,12 @@ void TaskGroupTaskQueue::_update_min_tg() {
}
// like sched_fair.c calc_delta_fair, THREAD_TIME_SLICE maybe a dynamic value.
-int64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGEntityPtr
tg_entity) const {
+uint64_t TaskGroupTaskQueue::_ideal_runtime_ns(taskgroup::TGPTEntityPtr
tg_entity) const {
return PipelineTask::THREAD_TIME_SLICE * _core_size *
tg_entity->cpu_share() / _total_cpu_share;
}
-taskgroup::TGEntityPtr TaskGroupTaskQueue::_next_tg_entity() {
- taskgroup::TGEntityPtr res = nullptr;
+taskgroup::TGPTEntityPtr TaskGroupTaskQueue::_next_tg_entity() {
+ taskgroup::TGPTEntityPtr res = nullptr;
for (auto* entity : _group_entities) {
res = entity;
break;
@@ -332,8 +345,7 @@ taskgroup::TGEntityPtr
TaskGroupTaskQueue::_next_tg_entity() {
void TaskGroupTaskQueue::update_statistics(PipelineTask* task, int64_t
time_spent) {
std::unique_lock<std::mutex> lock(_rs_mutex);
- auto* group = task->get_task_group();
- auto* entity = group->task_entity();
+ auto* entity = task->get_task_group_entity();
auto find_entity = _group_entities.find(entity);
bool is_in_queue = find_entity != _group_entities.end();
VLOG_DEBUG << "update_statistics " << entity->debug_string() << ", in
queue:" << is_in_queue;
@@ -348,15 +360,14 @@ void TaskGroupTaskQueue::update_statistics(PipelineTask*
task, int64_t time_spen
}
void TaskGroupTaskQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo&
task_group_info,
- taskgroup::TaskGroupPtr
task_group) {
+ taskgroup::TGPTEntityPtr entity) {
std::unique_lock<std::mutex> lock(_rs_mutex);
- auto* entity = task_group->task_entity();
bool is_in_queue = _group_entities.find(entity) != _group_entities.end();
if (is_in_queue) {
_group_entities.erase(entity);
_total_cpu_share -= entity->cpu_share();
}
- task_group->update_cpu_share_unlock(task_group_info);
+ entity->check_and_update_cpu_share(task_group_info);
if (is_in_queue) {
_group_entities.emplace(entity);
_total_cpu_share += entity->cpu_share();
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 9956ba3cb98..d693cbe2168 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -33,7 +33,6 @@
#include "runtime/task_group/task_group.h"
namespace doris {
-
namespace pipeline {
class TaskQueue {
@@ -54,7 +53,7 @@ public:
virtual void update_statistics(PipelineTask* task, int64_t time_spent) {}
virtual void update_tg_cpu_share(const taskgroup::TaskGroupInfo&
task_group_info,
- taskgroup::TaskGroupPtr task_group) = 0;
+ taskgroup::TGPTEntityPtr entity) = 0;
int cores() const { return _core_size; }
@@ -95,12 +94,10 @@ private:
// A Multilevel Feedback Queue
class PriorityTaskQueue {
public:
- explicit PriorityTaskQueue();
+ PriorityTaskQueue();
void close();
- PipelineTask* try_take_unprotected(bool is_steal);
-
PipelineTask* try_take(bool is_steal);
PipelineTask* take(uint32_t timeout_ms = 0);
@@ -111,7 +108,10 @@ public:
_sub_queues[level].inc_runtime(runtime);
}
+ int task_size();
+
private:
+ PipelineTask* _try_take_unprotected(bool is_steal);
static constexpr auto LEVEL_QUEUE_TIME_FACTOR = 2;
static constexpr size_t SUB_QUEUE_LEVEL = 6;
SubTaskQueue _sub_queues[SUB_QUEUE_LEVEL];
@@ -155,7 +155,7 @@ public:
}
void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
- taskgroup::TaskGroupPtr task_group) override {
+ taskgroup::TGPTEntityPtr entity) override {
LOG(FATAL) << "update_tg_cpu_share not implemented";
}
@@ -185,29 +185,29 @@ public:
void update_statistics(PipelineTask* task, int64_t time_spent) override;
void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
- taskgroup::TaskGroupPtr task_group) override;
+ taskgroup::TGPTEntityPtr entity) override;
private:
template <bool from_executor>
Status _push_back(PipelineTask* task);
template <bool from_worker>
- void _enqueue_task_group(taskgroup::TGEntityPtr);
- void _dequeue_task_group(taskgroup::TGEntityPtr);
- taskgroup::TGEntityPtr _next_tg_entity();
- int64_t _ideal_runtime_ns(taskgroup::TGEntityPtr tg_entity) const;
+ void _enqueue_task_group(taskgroup::TGPTEntityPtr);
+ void _dequeue_task_group(taskgroup::TGPTEntityPtr);
+ taskgroup::TGPTEntityPtr _next_tg_entity();
+ uint64_t _ideal_runtime_ns(taskgroup::TGPTEntityPtr tg_entity) const;
void _update_min_tg();
// Like cfs rb tree in sched_entity
struct TaskGroupSchedEntityComparator {
- bool operator()(const taskgroup::TGEntityPtr&, const
taskgroup::TGEntityPtr&) const;
+ bool operator()(const taskgroup::TGPTEntityPtr&, const
taskgroup::TGPTEntityPtr&) const;
};
- using ResouceGroupSet = std::set<taskgroup::TGEntityPtr,
TaskGroupSchedEntityComparator>;
+ using ResouceGroupSet = std::set<taskgroup::TGPTEntityPtr,
TaskGroupSchedEntityComparator>;
ResouceGroupSet _group_entities;
std::condition_variable _wait_task;
std::mutex _rs_mutex;
bool _closed = false;
int _total_cpu_share = 0;
- std::atomic<taskgroup::TGEntityPtr> _min_tg_entity = nullptr;
+ std::atomic<taskgroup::TGPTEntityPtr> _min_tg_entity = nullptr;
uint64_t _min_tg_v_runtime_ns = 0;
};
diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp
index b83a51ecde2..4298efd01db 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -112,6 +112,7 @@ void BlockedTaskScheduler::_schedule() {
if (state == PipelineTaskState::PENDING_FINISH) {
// should cancel or should finish
if (task->is_pending_finish()) {
+ VLOG_DEBUG << "Task pending" << task->debug_string();
iter++;
} else {
_make_task_run(local_blocked_tasks, iter,
PipelineTaskState::PENDING_FINISH);
@@ -365,9 +366,4 @@ void TaskScheduler::shutdown() {
}
}
-void TaskScheduler::update_tg_cpu_share(const taskgroup::TaskGroupInfo&
task_group_info,
- taskgroup::TaskGroupPtr task_group) {
- _task_queue->update_tg_cpu_share(task_group_info, task_group);
-}
-
} // namespace doris::pipeline
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index ad69e10d8b0..ac9389c0887 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -91,8 +91,7 @@ public:
void shutdown();
- void update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
- taskgroup::TaskGroupPtr task_group);
+ TaskQueue* task_queue() const { return _task_queue.get(); }
private:
std::unique_ptr<ThreadPool> _fix_thread_pool;
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index afc64f65f4b..2032d788392 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -39,6 +39,9 @@ class ScannerScheduler;
namespace pipeline {
class TaskScheduler;
}
+namespace taskgroup {
+class TaskGroupManager;
+}
class BfdParser;
class BrokerMgr;
template <class T>
@@ -108,6 +111,7 @@ public:
pipeline::TaskScheduler* pipeline_task_group_scheduler() {
return _pipeline_task_group_scheduler;
}
+ taskgroup::TaskGroupManager* task_group_manager() { return
_task_group_manager; }
// using template to simplify client cache management
template <typename T>
@@ -233,6 +237,7 @@ private:
FragmentMgr* _fragment_mgr = nullptr;
pipeline::TaskScheduler* _pipeline_task_scheduler = nullptr;
pipeline::TaskScheduler* _pipeline_task_group_scheduler = nullptr;
+ taskgroup::TaskGroupManager* _task_group_manager = nullptr;
ResultCache* _result_cache = nullptr;
TMasterInfo* _master_info = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index a4e1479fc4e..d0fab65a75f 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -65,6 +65,7 @@
#include "runtime/small_file_mgr.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_executor.h"
+#include "runtime/task_group/task_group_manager.h"
#include "runtime/thread_context.h"
#include "service/point_query_executor.h"
#include "util/bfd_parser.h"
@@ -147,6 +148,7 @@ Status ExecEnv::_init(const std::vector<StorePath>&
store_paths) {
.build(&_join_node_thread_pool);
RETURN_IF_ERROR(init_pipeline_task_scheduler());
+ _task_group_manager = new taskgroup::TaskGroupManager();
_scanner_scheduler = new doris::vectorized::ScannerScheduler();
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
@@ -389,6 +391,7 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_load_path_mgr);
SAFE_DELETE(_pipeline_task_scheduler);
SAFE_DELETE(_pipeline_task_group_scheduler);
+ SAFE_DELETE(_task_group_manager);
SAFE_DELETE(_fragment_mgr);
SAFE_DELETE(_broker_client_cache);
SAFE_DELETE(_frontend_client_cache);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 37588f36616..7e7330a43ec 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -736,7 +736,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
auto status =
taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0],
&task_group_info);
if (status.ok()) {
- auto tg =
taskgroup::TaskGroupManager::instance()->get_or_create_task_group(
+ auto tg =
_exec_env->task_group_manager()->get_or_create_task_group(
task_group_info);
tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
query_ctx->set_task_group(tg);
diff --git a/be/src/runtime/task_group/task_group.cpp b/be/src/runtime/task_group/task_group.cpp
index 37149ddfd8b..7c3d8ff42b0 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -27,11 +27,14 @@
#include <utility>
#include "common/logging.h"
+#include "pipeline/task_queue.h"
#include "pipeline/task_scheduler.h"
#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "util/mem_info.h"
#include "util/parse_util.h"
+#include "vec/exec/scan/scan_task_queue.h"
+#include "vec/exec/scan/scanner_scheduler.h"
namespace doris {
namespace taskgroup {
@@ -40,50 +43,75 @@ const static std::string CPU_SHARE = "cpu_share";
const static std::string MEMORY_LIMIT = "memory_limit";
const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
-pipeline::PipelineTask* TaskGroupEntity::take() {
- if (_queue.empty()) {
- return nullptr;
- }
- auto task = _queue.front();
- _queue.pop();
- return task;
+template <typename QueueType>
+TaskGroupEntity<QueueType>::TaskGroupEntity(taskgroup::TaskGroup* tg,
std::string type)
+ : _tg(tg), _type(type), _version(tg->version()),
_cpu_share(tg->cpu_share()) {
+ _task_queue = new QueueType();
+}
+
+template <typename QueueType>
+TaskGroupEntity<QueueType>::~TaskGroupEntity() {
+ delete _task_queue;
}
-void TaskGroupEntity::incr_runtime_ns(uint64_t runtime_ns) {
- auto v_time = runtime_ns / _tg->cpu_share();
+template <typename QueueType>
+QueueType* TaskGroupEntity<QueueType>::task_queue() {
+ return _task_queue;
+}
+
+template <typename QueueType>
+void TaskGroupEntity<QueueType>::incr_runtime_ns(uint64_t runtime_ns) {
+ auto v_time = runtime_ns / _cpu_share;
_vruntime_ns += v_time;
}
-void TaskGroupEntity::adjust_vruntime_ns(uint64_t vruntime_ns) {
+template <typename QueueType>
+void TaskGroupEntity<QueueType>::adjust_vruntime_ns(uint64_t vruntime_ns) {
VLOG_DEBUG << "adjust " << debug_string() << "vtime to " << vruntime_ns;
_vruntime_ns = vruntime_ns;
}
-void TaskGroupEntity::push_back(pipeline::PipelineTask* task) {
- _queue.emplace(task);
+template <typename QueueType>
+size_t TaskGroupEntity<QueueType>::task_size() const {
+ return _task_queue->size();
}
-uint64_t TaskGroupEntity::cpu_share() const {
- return _tg->cpu_share();
+template <typename QueueType>
+uint64_t TaskGroupEntity<QueueType>::cpu_share() const {
+ return _cpu_share;
}
-uint64_t TaskGroupEntity::task_group_id() const {
+template <typename QueueType>
+uint64_t TaskGroupEntity<QueueType>::task_group_id() const {
return _tg->id();
}
-std::string TaskGroupEntity::debug_string() const {
- return fmt::format("TGE[id = {}, cpu_share = {}, task size: {},
v_time:{}ns]", _tg->id(),
- cpu_share(), _queue.size(), _vruntime_ns);
+template <typename QueueType>
+void TaskGroupEntity<QueueType>::check_and_update_cpu_share(const
TaskGroupInfo& tg_info) {
+ if (tg_info.version > _version) {
+ _cpu_share = tg_info.cpu_share;
+ _version = tg_info.version;
+ }
}
+template <typename QueueType>
+std::string TaskGroupEntity<QueueType>::debug_string() const {
+ return fmt::format("TGE[id = {}, name = {}-{}, cpu_share = {}, task size:
{}, v_time:{} ns]",
+ _tg->id(), _tg->name(), _type, cpu_share(),
task_size(), _vruntime_ns);
+}
+
+template class TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
+template class TaskGroupEntity<ScanTaskQueue>;
+
TaskGroup::TaskGroup(const TaskGroupInfo& tg_info)
: _id(tg_info.id),
_name(tg_info.name),
- _cpu_share(tg_info.cpu_share),
+ _version(tg_info.version),
_memory_limit(tg_info.memory_limit),
_enable_memory_overcommit(tg_info.enable_memory_overcommit),
- _version(tg_info.version),
- _task_entity(this),
+ _cpu_share(tg_info.cpu_share),
+ _task_entity(this, "pipeline task entity"),
+ _local_scan_entity(this, "local scan entity"),
_mem_tracker_limiter_pool(MEM_TRACKER_GROUP_NUM) {}
std::string TaskGroup::debug_string() const {
@@ -105,22 +133,22 @@ void TaskGroup::check_and_update(const TaskGroupInfo&
tg_info) {
return;
}
}
-
- std::lock_guard<std::shared_mutex> wl {_mutex};
- if (tg_info.version > _version) {
- _name = tg_info.name;
- _version = tg_info.version;
- _memory_limit = tg_info.memory_limit;
- _enable_memory_overcommit = tg_info.enable_memory_overcommit;
- if (_cpu_share != tg_info.cpu_share) {
-
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->update_tg_cpu_share(
- tg_info, shared_from_this());
+ {
+ std::lock_guard<std::shared_mutex> wl {_mutex};
+ if (tg_info.version > _version) {
+ _name = tg_info.name;
+ _version = tg_info.version;
+ _memory_limit = tg_info.memory_limit;
+ _enable_memory_overcommit = tg_info.enable_memory_overcommit;
+ _cpu_share = tg_info.cpu_share;
+ } else {
+ return;
}
}
-}
-
-void TaskGroup::update_cpu_share_unlock(const TaskGroupInfo& tg_info) {
- _cpu_share = tg_info.cpu_share;
+
ExecEnv::GetInstance()->pipeline_task_group_scheduler()->task_queue()->update_tg_cpu_share(
+ tg_info, &_task_entity);
+
ExecEnv::GetInstance()->scanner_scheduler()->local_scan_task_queue()->update_tg_cpu_share(
+ tg_info, &_local_scan_entity);
}
int64_t TaskGroup::memory_used() {
diff --git a/be/src/runtime/task_group/task_group.h b/be/src/runtime/task_group/task_group.h
index c96664fc532..cd547c9c7e9 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -31,31 +31,34 @@
namespace doris {
-namespace pipeline {
-class PipelineTask;
-}
-
class TPipelineWorkloadGroup;
class MemTrackerLimiter;
+namespace pipeline {
+class PipelineTask;
+} // namespace pipeline
+
namespace taskgroup {
class TaskGroup;
struct TaskGroupInfo;
+class ScanTaskQueue;
+template <typename QueueType>
class TaskGroupEntity {
public:
- explicit TaskGroupEntity(taskgroup::TaskGroup* ts) : _tg(ts) {}
- void push_back(pipeline::PipelineTask* task);
+ explicit TaskGroupEntity(taskgroup::TaskGroup* tg, std::string type);
+ ~TaskGroupEntity();
+
uint64_t vruntime_ns() const { return _vruntime_ns; }
- pipeline::PipelineTask* take();
+ QueueType* task_queue();
void incr_runtime_ns(uint64_t runtime_ns);
void adjust_vruntime_ns(uint64_t vruntime_ns);
- size_t task_size() const { return _queue.size(); }
+ size_t task_size() const;
uint64_t cpu_share() const;
@@ -63,14 +66,29 @@ public:
uint64_t task_group_id() const;
+ void check_and_update_cpu_share(const TaskGroupInfo& tg_info);
+
private:
- // TODO pipeline use MLFQ
- std::queue<pipeline::PipelineTask*> _queue;
- taskgroup::TaskGroup* _tg;
+ QueueType* _task_queue;
+
uint64_t _vruntime_ns = 0;
+ taskgroup::TaskGroup* _tg;
+
+ std::string _type;
+
+ // Because updating cpu share of entity requires locking the task
queue(pipeline task queue or
+ // scan task queue) contains that entity, we kept version and cpu share in
entity for
+ // independent updates.
+ int64_t _version;
+ uint64_t _cpu_share;
};
-using TGEntityPtr = TaskGroupEntity*;
+// TODO llj tg use PriorityTaskQueue to replace std::queue
+using TaskGroupPipelineTaskEntity =
TaskGroupEntity<std::queue<pipeline::PipelineTask*>>;
+using TGPTEntityPtr = TaskGroupPipelineTaskEntity*;
+
+using TaskGroupScanTaskEntity = TaskGroupEntity<ScanTaskQueue>;
+using TGSTEntityPtr = TaskGroupScanTaskEntity*;
struct TgTrackerLimiterGroup {
std::unordered_set<std::shared_ptr<MemTrackerLimiter>> trackers;
@@ -81,12 +99,17 @@ class TaskGroup : public
std::enable_shared_from_this<TaskGroup> {
public:
explicit TaskGroup(const TaskGroupInfo& tg_info);
- TaskGroupEntity* task_entity() { return &_task_entity; }
+ TaskGroupPipelineTaskEntity* task_entity() { return &_task_entity; }
+ TGSTEntityPtr local_scan_task_entity() { return &_local_scan_entity; }
+
+ int64_t version() const { return _version; }
uint64_t cpu_share() const { return _cpu_share.load(); }
uint64_t id() const { return _id; }
+ std::string name() const { return _name; };
+
bool enable_memory_overcommit() const {
std::shared_lock<std::shared_mutex> r_lock(_mutex);
return _enable_memory_overcommit;
@@ -103,8 +126,6 @@ public:
void check_and_update(const TaskGroupInfo& tg_info);
- void update_cpu_share_unlock(const TaskGroupInfo& tg_info);
-
void add_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr);
void remove_mem_tracker_limiter(std::shared_ptr<MemTrackerLimiter>
mem_tracker_ptr);
@@ -119,12 +140,12 @@ private:
mutable std::shared_mutex _mutex; // lock _name, _version, _cpu_share,
_memory_limit
const uint64_t _id;
std::string _name;
- std::atomic<uint64_t> _cpu_share;
+ int64_t _version;
int64_t _memory_limit; // bytes
bool _enable_memory_overcommit;
- int64_t _version;
- TaskGroupEntity _task_entity;
-
+ std::atomic<uint64_t> _cpu_share;
+ TaskGroupPipelineTaskEntity _task_entity;
+ TaskGroupScanTaskEntity _local_scan_entity;
std::vector<TgTrackerLimiterGroup> _mem_tracker_limiter_pool;
};
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index 552ab2c0a92..6ce6d316048 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -20,19 +20,17 @@
#include <memory>
#include <mutex>
+#include "pipeline/task_scheduler.h"
+#include "runtime/exec_env.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/task_group/task_group.h"
+#include "vec/exec/scan/scanner_scheduler.h"
namespace doris::taskgroup {
TaskGroupManager::TaskGroupManager() = default;
TaskGroupManager::~TaskGroupManager() = default;
-TaskGroupManager* TaskGroupManager::instance() {
- static TaskGroupManager tgm;
- return &tgm;
-}
-
TaskGroupPtr TaskGroupManager::get_or_create_task_group(const TaskGroupInfo&
task_group_info) {
{
std::shared_lock<std::shared_mutex> r_lock(_group_mutex);
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index 0b7472438c3..375208dc6e5 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -23,13 +23,14 @@
#include "task_group.h"
-namespace doris::taskgroup {
+namespace doris {
+class ExecEnv;
+namespace taskgroup {
class TaskGroupManager {
public:
TaskGroupManager();
~TaskGroupManager();
- static TaskGroupManager* instance();
TaskGroupPtr get_or_create_task_group(const TaskGroupInfo&
task_group_info);
@@ -41,4 +42,5 @@ private:
std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
};
-} // namespace doris::taskgroup
+} // namespace taskgroup
+} // namespace doris
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 9250101e67d..38f9039816c 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -235,7 +235,7 @@ int64_t MemInfo::tg_not_enable_overcommit_group_gc() {
std::unique_ptr<RuntimeProfile> tg_profile =
std::make_unique<RuntimeProfile>("WorkloadGroup");
int64_t total_free_memory = 0;
- taskgroup::TaskGroupManager::instance()->get_resource_groups(
+ ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
[](const taskgroup::TaskGroupPtr& task_group) {
return !task_group->enable_memory_overcommit();
},
@@ -277,7 +277,7 @@ int64_t MemInfo::tg_enable_overcommit_group_gc(int64_t
request_free_memory,
MonotonicStopWatch watch;
watch.start();
std::vector<taskgroup::TaskGroupPtr> task_groups;
- taskgroup::TaskGroupManager::instance()->get_resource_groups(
+ ExecEnv::GetInstance()->task_group_manager()->get_resource_groups(
[](const taskgroup::TaskGroupPtr& task_group) {
return task_group->enable_memory_overcommit();
},
diff --git a/be/src/vec/exec/scan/scan_task_queue.cpp
b/be/src/vec/exec/scan/scan_task_queue.cpp
new file mode 100644
index 00000000000..538f77211c3
--- /dev/null
+++ b/be/src/vec/exec/scan/scan_task_queue.cpp
@@ -0,0 +1,213 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "scan_task_queue.h"
+
+#include "pipeline/pipeline_task.h"
+#include "runtime/task_group/task_group.h"
+#include "vec/exec/scan/scanner_context.h"
+
+namespace doris {
+namespace taskgroup {
+// No-op work function used by the default-constructed ScanTask.
+static void empty_function() {}
+
+// Default task: no-op work function, no scanner context, priority 1.
+ScanTask::ScanTask() : scan_func(empty_function), scanner_context(nullptr), priority(1) {}
+
+// Builds a task that runs `scan_func` on behalf of `scanner_context` with the given priority.
+ScanTask::ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context,
+                   int priority)
+        : scan_func(std::move(scan_func)),
+          scanner_context(scanner_context),
+          priority(priority) {}
+
+// The underlying queue is constructed from config::doris_scanner_thread_pool_queue_size.
+ScanTaskQueue::ScanTaskQueue()
+        : _queue(config::doris_scanner_thread_pool_queue_size) {}
+
+// Offers `scan_task` to the underlying priority queue via try_put().
+// Returns Status::OK() when the queue accepts the task and an InternalError
+// when try_put() rejects it.
+Status ScanTaskQueue::try_push_back(ScanTask scan_task) {
+    // Read the fields needed for logging before the task is moved into the queue,
+    // so the log statement never touches a moved-from object.
+    const auto* ctx = scan_task.scanner_context;
+    const int priority = scan_task.priority;
+    if (!_queue.try_put(std::move(scan_task))) {
+        return Status::InternalError("failed to submit scan task to ScanTaskQueue");
+    }
+    VLOG_DEBUG << "try_push_back scan task " << ctx->ctx_id << " " << priority;
+    return Status::OK();
+}
+// Fetches one task into `*scan_task` through blocking_get() with `timeout_ms`.
+// Returns whatever blocking_get() reports; the fetched task is logged on success.
+bool ScanTaskQueue::try_get(ScanTask* scan_task, uint32_t timeout_ms) {
+    const auto fetched = _queue.blocking_get(scan_task, timeout_ms);
+    if (!fetched) {
+        return fetched;
+    }
+    VLOG_DEBUG << "try get scan task " << scan_task->scanner_context->ctx_id << " "
+               << scan_task->priority;
+    return fetched;
+}
+
+// `core_size` is stored and later used by _ideal_runtime_ns().
+ScanTaskTaskGroupQueue::ScanTaskTaskGroupQueue(size_t core_size)
+        : _core_size(core_size) {}
+
+ScanTaskTaskGroupQueue::~ScanTaskTaskGroupQueue() = default;
+
+// Marks the queue closed and wakes every thread waiting in take(), which then returns false.
+void ScanTaskTaskGroupQueue::close() {
+    std::lock_guard<std::mutex> guard(_rs_mutex);
+    _closed = true;
+    _wait_task.notify_all();
+}
+
+// Picks the first task group entity in `_group_entities` and fetches one task from it.
+// Waits (in bounded slices) while no entity is queued. Returns false once the queue has
+// been closed; otherwise returns the result of the entity queue's try_get().
+bool ScanTaskTaskGroupQueue::take(ScanTask* scan_task) {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    taskgroup::TGSTEntityPtr picked = nullptr;
+    for (;;) {
+        if (_closed) {
+            return false;
+        }
+        if (_group_entities.empty()) {
+            // Nothing queued at all: sleep for the longer interval, then re-check.
+            _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS * 5));
+            continue;
+        }
+        picked = _next_tg_entity();
+        if (picked != nullptr) {
+            break;
+        }
+        _wait_task.wait_for(lock, std::chrono::milliseconds(WAIT_CORE_TASK_TIMEOUT_MS));
+    }
+    DCHECK(picked->task_size() > 0);
+    // The entity's last task is about to be taken, so remove the entity from the set.
+    if (picked->task_size() == 1) {
+        _dequeue_task_group(picked);
+    }
+    return picked->task_queue()->try_get(scan_task, WAIT_CORE_TASK_TIMEOUT_MS /* timeout_ms */);
+}
+
+// Submits `scan_task` to the scan queue of the task group its scanner context belongs to.
+// Returns false (and logs a warning) when the per-group queue rejects the task; otherwise
+// makes sure the group entity is scheduled, wakes one waiting worker and returns true.
+bool ScanTaskTaskGroupQueue::push_back(ScanTask scan_task) {
+    // Resolve the entity first: `scan_task` is moved into the queue below.
+    auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity();
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    // Move rather than copy, so the task's std::function is not duplicated a second time.
+    auto status = entity->task_queue()->try_push_back(std::move(scan_task));
+    if (!status.ok()) {
+        LOG(WARNING) << "try_push_back scan task fail: " << status;
+        return false;
+    }
+    if (_group_entities.find(entity) == _group_entities.end()) {
+        _enqueue_task_group(entity);
+    }
+    _wait_task.notify_one();
+    return true;
+}
+
+// Adds `time_spent` to the runtime of the task group entity that owns `scan_task`.
+// If the entity is currently in `_group_entities` it is removed before the update and
+// re-inserted afterwards, because the set is ordered by values the update changes.
+void ScanTaskTaskGroupQueue::update_statistics(ScanTask scan_task, int64_t time_spent) {
+    auto* entity = scan_task.scanner_context->get_task_group()->local_scan_task_entity();
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    const auto pos = _group_entities.find(entity);
+    const bool is_in_queue = (pos != _group_entities.end());
+    VLOG_DEBUG << "scan task task group queue update_statistics " << entity->debug_string()
+               << ", in queue:" << is_in_queue << ", time_spent: " << time_spent;
+    if (!is_in_queue) {
+        entity->incr_runtime_ns(time_spent);
+        return;
+    }
+    _group_entities.erase(pos);
+    entity->incr_runtime_ns(time_spent);
+    _group_entities.emplace(entity);
+    _update_min_tg();
+}
+
+// Applies `task_group_info` to `entity` through check_and_update_cpu_share().
+// A queued entity is taken out of `_group_entities` around the update, and
+// `_total_cpu_share` is adjusted by the entity's share before and after.
+void ScanTaskTaskGroupQueue::update_tg_cpu_share(const taskgroup::TaskGroupInfo& task_group_info,
+                                                 taskgroup::TGSTEntityPtr entity) {
+    std::unique_lock<std::mutex> lock(_rs_mutex);
+    const bool queued = _group_entities.find(entity) != _group_entities.end();
+    if (!queued) {
+        entity->check_and_update_cpu_share(task_group_info);
+        return;
+    }
+    _group_entities.erase(entity);
+    _total_cpu_share -= entity->cpu_share();
+    entity->check_and_update_cpu_share(task_group_info);
+    _group_entities.emplace(entity);
+    _total_cpu_share += entity->cpu_share();
+}
+
+// Adds `tg_entity` to the scheduling set and accounts for its cpu share.
+// Callers in this file invoke it with `_rs_mutex` held.
+void ScanTaskTaskGroupQueue::_enqueue_task_group(TGSTEntityPtr tg_entity) {
+    _total_cpu_share += tg_entity->cpu_share();
+    // TODO llj tg If submitted back to this queue from the scanner thread, `adjust_vruntime_ns`
+    // should be avoided.
+    /**
+     * An entity that has been out of the queue for a long time has a very small v runtime
+     * and could take too much execution time once it comes back. To avoid that, its
+     * v runtime is raised before it is inserted.
+     * */
+    const auto current_v_ns = tg_entity->vruntime_ns();
+    auto* front = _min_tg_entity.load();
+    if (front == nullptr) {
+        // No entity is queued: fall back to the highest minimum v runtime seen so far.
+        if (current_v_ns < _min_tg_v_runtime_ns) {
+            VLOG_DEBUG << tg_entity->debug_string() << ", adjust to " << _min_tg_v_runtime_ns;
+            tg_entity->adjust_vruntime_ns(_min_tg_v_runtime_ns);
+        }
+    } else {
+        const auto front_v_ns = front->vruntime_ns();
+        const auto half_ideal = _ideal_runtime_ns(tg_entity) / 2;
+        // Half an ideal slice below the current minimum, without underflowing.
+        uint64_t new_vruntime_ns = front_v_ns;
+        if (front_v_ns > half_ideal) {
+            new_vruntime_ns = front_v_ns - half_ideal;
+        }
+        if (new_vruntime_ns > current_v_ns) {
+            VLOG_DEBUG << tg_entity->debug_string() << ", adjust to new " << new_vruntime_ns;
+            tg_entity->adjust_vruntime_ns(new_vruntime_ns);
+        }
+    }
+    _group_entities.emplace(tg_entity);
+    VLOG_DEBUG << "scan enqueue tg " << tg_entity->debug_string()
+               << ", group entity size: " << _group_entities.size();
+    _update_min_tg();
+}
+
+// Removes `tg_entity` from the scheduling set, gives back its cpu share and refreshes
+// the cached minimum entity.
+void ScanTaskTaskGroupQueue::_dequeue_task_group(TGSTEntityPtr tg_entity) {
+    _group_entities.erase(tg_entity);
+    _total_cpu_share -= tg_entity->cpu_share();
+    VLOG_DEBUG << "scan task group queue dequeue tg " << tg_entity->debug_string()
+               << ", group entity size: " << _group_entities.size();
+    _update_min_tg();
+}
+
+// Returns the first entity of `_group_entities` (the smallest one under the set's
+// comparator), or nullptr when the set is empty.
+TGSTEntityPtr ScanTaskTaskGroupQueue::_next_tg_entity() {
+    if (_group_entities.empty()) {
+        return nullptr;
+    }
+    return *_group_entities.begin();
+}
+
+// Share of (time slice * core count) that corresponds to `tg_entity`'s portion of
+// `_total_cpu_share`.
+uint64_t ScanTaskTaskGroupQueue::_ideal_runtime_ns(TGSTEntityPtr tg_entity) const {
+    // Scan task does not have time slice, so we use pipeline task's instead.
+    const auto weighted_slice =
+            pipeline::PipelineTask::THREAD_TIME_SLICE * _core_size * tg_entity->cpu_share();
+    return weighted_slice / _total_cpu_share;
+}
+
+// Caches the current first entity in `_min_tg_entity` and raises
+// `_min_tg_v_runtime_ns` if that entity's v runtime is larger (the value never decreases).
+void ScanTaskTaskGroupQueue::_update_min_tg() {
+    auto* front = _next_tg_entity();
+    _min_tg_entity = front;
+    if (front == nullptr) {
+        return;
+    }
+    const auto front_v_runtime = front->vruntime_ns();
+    if (front_v_runtime > _min_tg_v_runtime_ns) {
+        _min_tg_v_runtime_ns = front_v_runtime;
+    }
+}
+
+// Strict ordering of entities: by v runtime first, then by cpu share, then by task group id.
+bool ScanTaskTaskGroupQueue::TaskGroupSchedEntityComparator::operator()(
+        const taskgroup::TGSTEntityPtr& lhs_ptr, const taskgroup::TGSTEntityPtr& rhs_ptr) const {
+    const auto lhs_v_runtime = lhs_ptr->vruntime_ns();
+    const auto rhs_v_runtime = rhs_ptr->vruntime_ns();
+    if (lhs_v_runtime != rhs_v_runtime) {
+        return lhs_v_runtime < rhs_v_runtime;
+    }
+    const auto lhs_share = lhs_ptr->cpu_share();
+    const auto rhs_share = rhs_ptr->cpu_share();
+    if (lhs_share != rhs_share) {
+        return lhs_share < rhs_share;
+    }
+    return lhs_ptr->task_group_id() < rhs_ptr->task_group_id();
+}
+
+} // namespace taskgroup
+} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/exec/scan/scan_task_queue.h
b/be/src/vec/exec/scan/scan_task_queue.h
new file mode 100644
index 00000000000..f3c3b792a48
--- /dev/null
+++ b/be/src/vec/exec/scan/scan_task_queue.h
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#pragma once
+
+#include "olap/tablet.h"
+#include "runtime/task_group/task_group.h"
+#include "util/blocking_priority_queue.hpp"
+
+namespace doris {
+namespace vectorized {
+class ScannerContext;
+};
+
+namespace taskgroup {
+
+using WorkFunction = std::function<void()>;
+static constexpr auto WAIT_CORE_TASK_TIMEOUT_MS = 100;
+
+// A unit of scan work plus the scanner context it belongs to and its priority.
+// Like PriorityThreadPool::Task
+struct ScanTask {
+    ScanTask();
+    ScanTask(WorkFunction scan_func, vectorized::ScannerContext* scanner_context, int priority);
+
+    // Tasks compare by `priority` only.
+    bool operator<(const ScanTask& other) const { return other.priority > priority; }
+
+    // Pre-increment raises `priority` by 2 and returns this task.
+    ScanTask& operator++() {
+        priority = priority + 2;
+        return *this;
+    }
+
+    WorkFunction scan_func;
+    vectorized::ScannerContext* scanner_context;
+    int priority;
+};
+
+// Queue of ScanTask backed by a BlockingPriorityQueue.
+// Like pipeline::PriorityTaskQueue use BlockingPriorityQueue directly?
+class ScanTaskQueue {
+public:
+    ScanTaskQueue();
+
+    // Offers a task without blocking; the returned Status reports whether it was accepted.
+    Status try_push_back(ScanTask scan_task);
+
+    // Fetches one task into `*scan_task`, passing `timeout_ms` to the underlying queue.
+    bool try_get(ScanTask* scan_task, uint32_t timeout_ms);
+
+    // Size reported by the underlying queue.
+    int size() { return static_cast<int>(_queue.get_size()); }
+
+private:
+    BlockingPriorityQueue<ScanTask> _queue;
+};
+
+// Schedules scan tasks across task groups: it keeps an ordered set of task group scan
+// entities and always serves the first one.
+// Like TaskGroupTaskQueue
+class ScanTaskTaskGroupQueue {
+public:
+    explicit ScanTaskTaskGroupQueue(size_t core_size);
+    ~ScanTaskTaskGroupQueue();
+
+    // Marks the queue closed and wakes all threads waiting in take().
+    void close();
+    // Waits for a runnable task; returns false once the queue is closed.
+    bool take(ScanTask* scan_task);
+    // Submits a task to its task group's queue; returns false if the queue rejects it.
+    bool push_back(ScanTask);
+
+    // Adds `time_spent` to the runtime of the entity that owns `task`.
+    void update_statistics(ScanTask task, int64_t time_spent);
+
+    void update_tg_cpu_share(const taskgroup::TaskGroupInfo&, taskgroup::TGSTEntityPtr);
+
+private:
+    // The helpers below touch the shared state directly; the public methods call them
+    // while holding `_rs_mutex`.
+    void _enqueue_task_group(TGSTEntityPtr);
+    void _dequeue_task_group(TGSTEntityPtr);
+    TGSTEntityPtr _next_tg_entity();
+    uint64_t _ideal_runtime_ns(TGSTEntityPtr tg_entity) const;
+    void _update_min_tg();
+
+    // Like cfs rb tree in sched_entity
+    struct TaskGroupSchedEntityComparator {
+        bool operator()(const taskgroup::TGSTEntityPtr&, const taskgroup::TGSTEntityPtr&) const;
+    };
+    using ResourceGroupSet = std::set<taskgroup::TGSTEntityPtr, TaskGroupSchedEntityComparator>;
+    ResourceGroupSet _group_entities;
+    std::condition_variable _wait_task;
+    std::mutex _rs_mutex;
+    bool _closed = false;
+    int _total_cpu_share = 0;
+    std::atomic<taskgroup::TGSTEntityPtr> _min_tg_entity = nullptr;
+    // Largest v runtime observed on a first entity so far.
+    uint64_t _min_tg_v_runtime_ns = 0;
+    size_t _core_size;
+};
+
+} // namespace taskgroup
+} // namespace doris
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 478d9fb4cb7..27ed7509987 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -261,8 +261,10 @@ Status
ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState*
if (state->enable_profile()) {
std::stringstream scanner_statistics;
std::stringstream scanner_rows_read;
+ std::stringstream scanner_wait_worker_time;
scanner_statistics << "[";
scanner_rows_read << "[";
+ scanner_wait_worker_time << "[";
for (auto finished_scanner_time : _finished_scanner_runtime) {
scanner_statistics << PrettyPrinter::print(finished_scanner_time,
TUnit::TIME_NS)
<< ", ";
@@ -270,6 +272,10 @@ Status
ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState*
for (auto finished_scanner_rows : _finished_scanner_rows_read) {
scanner_rows_read << PrettyPrinter::print(finished_scanner_rows,
TUnit::UNIT) << ", ";
}
+ for (auto finished_scanner_wait_time :
_finished_scanner_wait_worker_time) {
+ scanner_wait_worker_time
+ << PrettyPrinter::print(finished_scanner_wait_time,
TUnit::TIME_NS) << ", ";
+ }
// Only unfinished scanners here
for (auto& scanner : _scanners) {
// Scanners are in ObjPool in ScanNode,
@@ -279,11 +285,18 @@ Status
ScannerContext::_close_and_clear_scanners(VScanNode* node, RuntimeState*
<< ", ";
scanner_rows_read <<
PrettyPrinter::print(scanner->get_rows_read(), TUnit::UNIT)
<< ", ";
+ scanner_wait_worker_time
+ <<
PrettyPrinter::print(scanner->get_scanner_wait_worker_timer(),
+ TUnit::TIME_NS)
+ << ", ";
}
scanner_statistics << "]";
scanner_rows_read << "]";
+ scanner_wait_worker_time << "]";
node->_scanner_profile->add_info_string("PerScannerRunningTime",
scanner_statistics.str());
node->_scanner_profile->add_info_string("PerScannerRowsRead",
scanner_rows_read.str());
+ node->_scanner_profile->add_info_string("PerScannerWaitTime",
+
scanner_wait_worker_time.str());
}
// Only unfinished scanners here
for (auto& scanner : _scanners) {
@@ -397,6 +410,8 @@ void
ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
if (scanner->need_to_close()) {
_finished_scanner_runtime.push_back(scanner->get_time_cost_ns());
_finished_scanner_rows_read.push_back(scanner->get_rows_read());
+ _finished_scanner_wait_worker_time.push_back(
+ scanner->get_scanner_wait_worker_timer());
scanner->close(_state);
} else {
current_run->push_back(scanner);
@@ -406,4 +421,8 @@ void
ScannerContext::get_next_batch_of_scanners(std::list<VScannerSPtr>* current
}
}
+// Returns the task group reported by the query context of this scanner context's runtime state.
+taskgroup::TaskGroup* ScannerContext::get_task_group() const {
+    const auto& query_ctx = _state->get_query_ctx();
+    return query_ctx->get_task_group();
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 3aad0d6263f..c2c8612f861 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -41,6 +41,10 @@ class ThreadPoolToken;
class RuntimeState;
class TupleDescriptor;
+namespace taskgroup {
+class TaskGroup;
+} // namespace taskgroup
+
namespace vectorized {
class VScanner;
@@ -149,6 +153,7 @@ public:
}
return thread_slot_num;
}
+ taskgroup::TaskGroup* get_task_group() const;
void reschedule_scanner_ctx();
@@ -241,6 +246,7 @@ protected:
std::list<VScannerSPtr> _scanners;
std::vector<int64_t> _finished_scanner_runtime;
std::vector<int64_t> _finished_scanner_rows_read;
+ std::vector<int64_t> _finished_scanner_wait_worker_time;
const int _num_parallel_instances;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index f63b04692fd..b1c783a8520 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -36,6 +36,7 @@
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
+#include "scan_task_queue.h"
#include "util/async_io.h" // IWYU pragma: keep
#include "util/blocking_queue.hpp"
#include "util/cpu_info.h"
@@ -80,6 +81,7 @@ ScannerScheduler::~ScannerScheduler() {
_local_scan_thread_pool->shutdown();
_remote_scan_thread_pool->shutdown();
_limited_scan_thread_pool->shutdown();
+ _group_local_scan_thread_pool->shutdown();
_scheduler_pool->wait();
_local_scan_thread_pool->join();
@@ -88,6 +90,9 @@ ScannerScheduler::~ScannerScheduler() {
delete _pending_queues[i];
}
delete[] _pending_queues;
+
+ _task_group_local_scan_queue->close();
+ _group_local_scan_thread_pool->wait();
}
Status ScannerScheduler::init(ExecEnv* env) {
@@ -127,6 +132,19 @@ Status ScannerScheduler::init(ExecEnv* env) {
_register_metrics();
+ // 5. task group local scan
+ _task_group_local_scan_queue =
std::make_unique<taskgroup::ScanTaskTaskGroupQueue>(
+ config::doris_scanner_thread_pool_thread_num);
+ ThreadPoolBuilder("local_scan_group")
+ .set_min_threads(config::doris_scanner_thread_pool_thread_num)
+ .set_max_threads(config::doris_scanner_thread_pool_thread_num)
+ .build(&_group_local_scan_thread_pool);
+ for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
+ _group_local_scan_thread_pool->submit_func([this] {
+ this->_task_group_scanner_scan(this,
_task_group_local_scan_queue.get());
+ });
+ }
+
_is_init = true;
return Status::OK();
}
@@ -219,12 +237,20 @@ void ScannerScheduler::_schedule_scanners(ScannerContext*
ctx) {
TabletStorageType type = (*iter)->get_storage_type();
bool ret = false;
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
- PriorityThreadPool::Task task;
- task.work_function = [this, scanner = *iter, ctx] {
- this->_scanner_scan(this, ctx, scanner);
- };
- task.priority = nice;
- ret = _local_scan_thread_pool->offer(task);
+ if (ctx->get_task_group() &&
config::enable_workload_group_for_scan) {
+ auto work_func = [this, scanner = *iter, ctx] {
+ this->_scanner_scan(this, ctx, scanner);
+ };
+ taskgroup::ScanTask scan_task = {work_func, ctx, nice};
+ ret =
_task_group_local_scan_queue->push_back(scan_task);
+ } else {
+ PriorityThreadPool::Task task;
+ task.work_function = [this, scanner = *iter, ctx] {
+ this->_scanner_scan(this, ctx, scanner);
+ };
+ task.priority = nice;
+ ret = _local_scan_thread_pool->offer(task);
+ }
} else {
ret = _remote_scan_thread_pool->submit_func([this, scanner
= *iter, ctx] {
this->_scanner_scan(this, ctx, scanner);
@@ -438,4 +464,20 @@ void ScannerScheduler::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(limited_scan_thread_pool_thread_num);
}
+// Worker loop: until the scheduler is closed, take a task from `scan_queue`, run it, and
+// report the time recorded by the scoped timer back to the queue.
+void ScannerScheduler::_task_group_scanner_scan(ScannerScheduler* scheduler,
+                                                taskgroup::ScanTaskTaskGroupQueue* scan_queue) {
+    while (!_is_closed) {
+        taskgroup::ScanTask scan_task;
+        if (!scan_queue->take(&scan_task)) {
+            // Nothing was taken; loop back to the closed check.
+            continue;
+        }
+        int64_t elapsed = 0;
+        {
+            // The timer scope covers only the execution of the scan function.
+            SCOPED_RAW_TIMER(&elapsed);
+            scan_task.scan_func();
+        }
+        scan_queue->update_statistics(scan_task, elapsed);
+    }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index df81fcf8b47..87a9498e77c 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -21,6 +21,7 @@
#include <memory>
#include "common/status.h"
+#include "scan_task_queue.h"
#include "util/threadpool.h"
#include "vec/exec/scan/vscanner.h"
@@ -30,6 +31,9 @@ class ExecEnv;
namespace vectorized {
class VScanner;
} // namespace vectorized
+namespace taskgroup {
+class ScanTaskTaskGroupQueue;
+}
template <typename T>
class BlockingQueue;
} // namespace doris
@@ -66,6 +70,9 @@ public:
std::unique_ptr<ThreadPoolToken>
new_limited_scan_pool_token(ThreadPool::ExecutionMode mode,
int
max_concurrency);
+    // Non-owning pointer to the task-group local scan queue held by this scheduler.
+    taskgroup::ScanTaskTaskGroupQueue* local_scan_task_queue() {
+        auto* queue = _task_group_local_scan_queue.get();
+        return queue;
+    }
int remote_thread_pool_max_size() const { return
_remote_thread_pool_max_size; }
@@ -81,6 +88,9 @@ private:
static void _deregister_metrics();
+ void _task_group_scanner_scan(ScannerScheduler* scheduler,
+ taskgroup::ScanTaskTaskGroupQueue*
scan_queue);
+
// Scheduling queue number.
// TODO: make it configurable.
static const int QUEUE_NUM = 4;
@@ -103,6 +113,9 @@ private:
std::unique_ptr<ThreadPool> _remote_scan_thread_pool;
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
+ std::unique_ptr<taskgroup::ScanTaskTaskGroupQueue>
_task_group_local_scan_queue;
+ std::unique_ptr<ThreadPool> _group_local_scan_thread_pool;
+
// true is the scheduler is closed.
std::atomic_bool _is_closed = {false};
bool _is_init = false;
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 2ee39979563..c29aafafc04 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -108,6 +108,8 @@ public:
void update_wait_worker_timer() { _scanner_wait_worker_timer +=
_watch.elapsed_time(); }
+    // Current value of the counter accumulated by update_wait_worker_timer().
+    int64_t get_scanner_wait_worker_timer() {
+        return _scanner_wait_worker_timer;
+    }
+
void update_scan_cpu_timer() { _scan_cpu_timer +=
_cpu_watch.elapsed_time(); }
RuntimeState* runtime_state() { return _state; }
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index 142616f2a0a..f81545f3b90 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -1581,7 +1581,7 @@ public class Config extends ConfigBase {
public static boolean enable_pipeline_load = false;
// enable_workload_group should be immutable and temporarily set to
mutable during the development test phase
- @ConfField(mutable = true, masterOnly = true, expType =
ExperimentalType.EXPERIMENTAL)
+ @ConfField(mutable = true, expType = ExperimentalType.EXPERIMENTAL)
public static boolean enable_workload_group = false;
@ConfField(mutable = true)
@@ -1590,9 +1590,6 @@ public class Config extends ConfigBase {
@ConfField(mutable = true)
public static boolean disable_shared_scan = false;
- @ConfField(mutable = true, expType = ExperimentalType.EXPERIMENTAL)
- public static boolean enable_cpu_hard_limit = false;
-
@ConfField(mutable = false, masterOnly = true)
public static int backend_rpc_timeout_ms = 60000; // 1 min
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]