This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 54780c62e0d [improvement](executor)Using cgroup to implement cpu hard
limit (#25489)
54780c62e0d is described below
commit 54780c62e0d0e8a03cc9a45d1a7edd1386fb5ede
Author: wangbo <[email protected]>
AuthorDate: Thu Oct 19 18:56:26 2023 +0800
[improvement](executor)Using cgroup to implement cpu hard limit (#25489)
* Using cgroup to implement cpu hard limit
* code style
---
be/src/agent/cgroup_cpu_ctl.cpp | 25 +++++--
be/src/agent/cgroup_cpu_ctl.h | 23 ++++---
be/src/pipeline/pipeline_fragment_context.cpp | 4 +-
be/src/pipeline/pipeline_task.h | 1 +
be/src/pipeline/task_scheduler.cpp | 12 ++--
be/src/pipeline/task_scheduler.h | 6 +-
be/src/runtime/exec_env.h | 14 +++-
be/src/runtime/exec_env_init.cpp | 29 ++++----
be/src/runtime/fragment_mgr.cpp | 24 ++++---
be/src/runtime/query_context.h | 15 +++++
be/src/runtime/task_group/task_group.cpp | 11 +--
be/src/runtime/task_group/task_group.h | 3 +-
be/src/runtime/task_group/task_group_manager.cpp | 72 ++++++++++++++++++++
be/src/runtime/task_group/task_group_manager.h | 26 ++++++++
be/src/vec/exec/scan/scanner_context.cpp | 4 ++
be/src/vec/exec/scan/scanner_context.h | 5 ++
be/src/vec/exec/scan/scanner_scheduler.cpp | 16 +++--
be/src/vec/exec/scan/scanner_scheduler.h | 60 +++++++++++++++++
.../resource/workloadgroup/WorkloadGroupMgr.java | 78 +++-------------------
19 files changed, 297 insertions(+), 131 deletions(-)
diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index d8a18c8c130..4a9aed2bb0f 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -86,14 +86,25 @@ Status CgroupV1CpuCtl::init() {
}
}
+ // workload group path
+ _cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" +
std::to_string(_tg_id);
+ if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
+ int ret = mkdir(_cgroup_v1_cpu_tg_path.c_str(), S_IRWXU);
+ if (ret != 0) {
+ LOG(ERROR) << "cgroup v1 mkdir workload group failed, path=" <<
_cgroup_v1_cpu_tg_path;
+ return Status::InternalError("cgroup v1 mkdir workload group
failed, path=",
+ _cgroup_v1_cpu_tg_path);
+ }
+ }
+
// quota path
- _cgroup_v1_cpu_query_quota_path = _cgroup_v1_cpu_query_path +
"/cpu.cfs_quota_us";
+ _cgroup_v1_cpu_tg_quota_file = _cgroup_v1_cpu_tg_path +
"/cpu.cfs_quota_us";
// task path
- _cgroup_v1_cpu_query_task_path = _cgroup_v1_cpu_query_path + "/tasks";
+ _cgroup_v1_cpu_tg_task_file = _cgroup_v1_cpu_tg_path + "/tasks";
LOG(INFO) << "cgroup v1 cpu path init success"
- << ", query path=" << _cgroup_v1_cpu_query_path
- << ", query quota path=" << _cgroup_v1_cpu_query_quota_path
- << ", query tasks path=" << _cgroup_v1_cpu_query_task_path
+ << ", query tg path=" << _cgroup_v1_cpu_tg_path
+ << ", query tg quota file path=" << _cgroup_v1_cpu_tg_quota_file
+ << ", query tg tasks file path=" << _cgroup_v1_cpu_tg_task_file
<< ", core num=" << _cpu_core_num;
_init_succ = true;
return Status::OK();
@@ -102,7 +113,7 @@ Status CgroupV1CpuCtl::init() {
Status CgroupV1CpuCtl::modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) {
int val = _cpu_cfs_period_us * _cpu_core_num * cpu_hard_limit / 100;
std::string msg = "modify cpu quota value to " + std::to_string(val);
- return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_quota_path,
val, msg, false);
+ return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_quota_file, val,
msg, false);
}
Status CgroupV1CpuCtl::add_thread_to_cgroup() {
@@ -112,6 +123,6 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
int tid = static_cast<int>(syscall(SYS_gettid));
std::string msg = "add thread " + std::to_string(tid) + " to group";
std::lock_guard<std::shared_mutex> w_lock(_lock_mutex);
- return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_query_task_path,
tid, msg, true);
+ return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid,
msg, true);
}
} // namespace doris
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 21eb367e444..c3a30660147 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -30,8 +30,8 @@ namespace doris {
class CgroupCpuCtl {
public:
- CgroupCpuCtl() {}
- virtual ~CgroupCpuCtl() {}
+ virtual ~CgroupCpuCtl() = default;
+ CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; }
virtual Status init();
@@ -50,6 +50,7 @@ protected:
uint64_t _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
bool _init_succ = false;
+ uint64_t _tg_id; // workload group id
};
/*
@@ -66,23 +67,27 @@ protected:
4 doris query path
/sys/fs/cgroup/cpu/{doris_home}/query
- 5 doris query quota file:
- /sys/fs/cgroup/cpu/{doris_home}/query/cpu.cfs_quota_us
+ 5 workload group path
+ /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}
- 6 doris query tasks file:
- /sys/fs/cgroup/cpu/{doris_home}/query/tasks
+ 6 workload group quota file:
+ /sys/fs/cgroup/cpu/{doris_home}/query/{workload group
id}/cpu.cfs_quota_us
+
+ 7 workload group tasks file:
+ /sys/fs/cgroup/cpu/{doris_home}/query/{workload group id}/tasks
*/
class CgroupV1CpuCtl : public CgroupCpuCtl {
public:
+ CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
Status init() override;
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
Status add_thread_to_cgroup() override;
private:
- // todo(wb) support load/compaction path
std::string _cgroup_v1_cpu_query_path;
- std::string _cgroup_v1_cpu_query_quota_path;
- std::string _cgroup_v1_cpu_query_task_path;
+ std::string _cgroup_v1_cpu_tg_path; // workload group path
+ std::string _cgroup_v1_cpu_tg_quota_file;
+ std::string _cgroup_v1_cpu_tg_task_file;
};
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index a9eedb3a489..22a533c233e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -687,7 +687,9 @@ Status PipelineFragmentContext::submit() {
int submit_tasks = 0;
Status st;
auto* scheduler = _exec_env->pipeline_task_scheduler();
- if (_task_group_entity) {
+ if (_query_ctx->get_task_scheduler()) {
+ scheduler = _query_ctx->get_task_scheduler();
+ } else if (_task_group_entity) {
scheduler = _exec_env->pipeline_task_group_scheduler();
}
for (auto& task : _tasks) {
diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h
index 1f517d6c94f..417fab35b97 100644
--- a/be/src/pipeline/pipeline_task.h
+++ b/be/src/pipeline/pipeline_task.h
@@ -193,6 +193,7 @@ public:
taskgroup::TaskGroupPipelineTaskEntity* get_task_group_entity() const;
void set_task_queue(TaskQueue* task_queue);
+ TaskQueue* get_task_queue() { return _task_queue; }
static constexpr auto THREAD_TIME_SLICE = 100'000'000ULL;
static constexpr auto THREAD_TIME_SLICE_US = 100000L; // 100ms
diff --git a/be/src/pipeline/task_scheduler.cpp
b/be/src/pipeline/task_scheduler.cpp
index 1a44c57a74f..c58e4c5cf07 100644
--- a/be/src/pipeline/task_scheduler.cpp
+++ b/be/src/pipeline/task_scheduler.cpp
@@ -45,14 +45,12 @@
namespace doris::pipeline {
-BlockedTaskScheduler::BlockedTaskScheduler(std::shared_ptr<TaskQueue>
task_queue)
- : _task_queue(std::move(task_queue)), _started(false),
_shutdown(false) {}
+BlockedTaskScheduler::BlockedTaskScheduler() : _started(false),
_shutdown(false) {}
-Status BlockedTaskScheduler::start() {
+Status BlockedTaskScheduler::start(std::string sche_name) {
LOG(INFO) << "BlockedTaskScheduler start";
RETURN_IF_ERROR(Thread::create(
- "BlockedTaskScheduler", "schedule_blocked_pipeline", [this]() {
this->_schedule(); },
- &_thread));
+ "BlockedTaskScheduler", sche_name, [this]() { this->_schedule();
}, &_thread));
while (!this->_started.load()) {
std::this_thread::sleep_for(std::chrono::milliseconds(5));
}
@@ -185,7 +183,7 @@ void
BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks,
auto task = *task_itr;
task->set_state(t_state);
local_tasks.erase(task_itr++);
- static_cast<void>(_task_queue->push_back(task));
+ static_cast<void>(task->get_task_queue()->push_back(task));
}
TaskScheduler::~TaskScheduler() {
@@ -207,7 +205,7 @@ Status TaskScheduler::start() {
RETURN_IF_ERROR(
_fix_thread_pool->submit_func(std::bind(&TaskScheduler::_do_work, this, i)));
}
- return _blocked_task_scheduler->start();
+ return Status::OK();
}
Status TaskScheduler::schedule_task(PipelineTask* task) {
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 943b11a8a21..9b85ec420ee 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -46,17 +46,15 @@ namespace doris::pipeline {
class BlockedTaskScheduler {
public:
- explicit BlockedTaskScheduler(std::shared_ptr<TaskQueue> task_queue);
+ explicit BlockedTaskScheduler();
~BlockedTaskScheduler() = default;
- Status start();
+ Status start(std::string sche_name);
void shutdown();
Status add_blocked_task(PipelineTask* task);
private:
- std::shared_ptr<TaskQueue> _task_queue;
-
std::mutex _task_mutex;
std::condition_variable _task_cond;
std::list<PipelineTask*> _blocked_tasks;
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index c1e1e77f87b..c6aa8216d90 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -45,7 +45,8 @@ class DeltaWriterV2Pool;
} // namespace vectorized
namespace pipeline {
class TaskScheduler;
-}
+class BlockedTaskScheduler;
+} // namespace pipeline
namespace taskgroup {
class TaskGroupManager;
}
@@ -268,7 +269,9 @@ public:
return _inverted_index_query_cache;
}
- CgroupCpuCtl* get_cgroup_cpu_ctl() { return _cgroup_cpu_ctl.get(); }
+ std::shared_ptr<doris::pipeline::BlockedTaskScheduler>
get_global_block_scheduler() {
+ return _global_block_scheduler;
+ }
private:
ExecEnv();
@@ -375,7 +378,12 @@ private:
segment_v2::InvertedIndexSearcherCache* _inverted_index_searcher_cache =
nullptr;
segment_v2::InvertedIndexQueryCache* _inverted_index_query_cache = nullptr;
- std::unique_ptr<CgroupCpuCtl> _cgroup_cpu_ctl = nullptr;
+ // used for query with group cpu hard limit
+ std::shared_ptr<doris::pipeline::BlockedTaskScheduler>
_global_block_scheduler;
+ // used for query without workload group
+ std::shared_ptr<doris::pipeline::BlockedTaskScheduler>
_without_group_block_scheduler;
+ // used for query with workload group cpu soft limit
+ std::shared_ptr<doris::pipeline::BlockedTaskScheduler>
_with_group_block_scheduler;
};
template <>
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index d9f7f31f7d2..c526c836ef5 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -280,29 +280,23 @@ Status ExecEnv::init_pipeline_task_scheduler() {
executors_size = CpuInfo::num_cores();
}
- if (!config::doris_cgroup_cpu_path.empty()) {
- _cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
- Status ret = _cgroup_cpu_ctl->init();
- if (!ret.ok()) {
- LOG(ERROR) << "init cgroup cpu controller failed";
- }
- } else {
- LOG(INFO) << "cgroup cpu controller is not inited";
- }
-
// TODO pipeline task group combine two blocked schedulers.
auto t_queue =
std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
- auto b_scheduler =
std::make_shared<pipeline::BlockedTaskScheduler>(t_queue);
- _pipeline_task_scheduler = new pipeline::TaskScheduler(this, b_scheduler,
t_queue,
-
"WithoutGroupTaskSchePool", nullptr);
+ _without_group_block_scheduler =
std::make_shared<pipeline::BlockedTaskScheduler>();
+ _pipeline_task_scheduler = new pipeline::TaskScheduler(
+ this, _without_group_block_scheduler, t_queue,
"WithoutGroupTaskSchePool", nullptr);
RETURN_IF_ERROR(_pipeline_task_scheduler->start());
+
RETURN_IF_ERROR(_without_group_block_scheduler->start("WithoutGroupBlockSche"));
auto tg_queue =
std::make_shared<pipeline::TaskGroupTaskQueue>(executors_size);
- auto tg_b_scheduler =
std::make_shared<pipeline::BlockedTaskScheduler>(tg_queue);
+ _with_group_block_scheduler =
std::make_shared<pipeline::BlockedTaskScheduler>();
_pipeline_task_group_scheduler = new pipeline::TaskScheduler(
- this, tg_b_scheduler, tg_queue, "WithGroupTaskSchePool",
_cgroup_cpu_ctl.get());
+ this, _with_group_block_scheduler, tg_queue,
"WithGroupTaskSchePool", nullptr);
RETURN_IF_ERROR(_pipeline_task_group_scheduler->start());
+ RETURN_IF_ERROR(_with_group_block_scheduler->start("WithGroupBlockSche"));
+ _global_block_scheduler =
std::make_shared<pipeline::BlockedTaskScheduler>();
+ RETURN_IF_ERROR(_global_block_scheduler->start("GlobalBlockSche"));
return Status::OK();
}
@@ -547,6 +541,7 @@ void ExecEnv::destroy() {
SAFE_STOP(_routine_load_task_executor);
SAFE_STOP(_pipeline_task_scheduler);
SAFE_STOP(_pipeline_task_group_scheduler);
+ SAFE_STOP(_task_group_manager);
SAFE_STOP(_external_scan_context_mgr);
SAFE_STOP(_fragment_mgr);
// NewLoadStreamMgr should be destoried before storage_engine & after
fragment_mgr stopped.
@@ -644,6 +639,10 @@ void ExecEnv::destroy() {
// info is deconstructed then BE process will core at coordinator back
method in fragment mgr.
SAFE_DELETE(_master_info);
+ SAFE_SHUTDOWN(_global_block_scheduler.get());
+ SAFE_SHUTDOWN(_without_group_block_scheduler.get());
+ SAFE_SHUTDOWN(_with_group_block_scheduler.get());
+
LOG(INFO) << "Doris exec envorinment is destoried.";
}
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index e3be7b94839..71297e0d4c3 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -640,21 +640,29 @@ Status FragmentMgr::_get_query_ctx(const Params& params,
TUniqueId query_id, boo
if constexpr (std::is_same_v<TPipelineFragmentParams, Params>) {
if (params.__isset.workload_groups &&
!params.workload_groups.empty()) {
taskgroup::TaskGroupInfo task_group_info;
- int query_cpu_hard_limit = -1;
- auto status = taskgroup::TaskGroupInfo::parse_group_info(
- params.workload_groups[0], &task_group_info,
&query_cpu_hard_limit);
+ auto status =
taskgroup::TaskGroupInfo::parse_group_info(params.workload_groups[0],
+
&task_group_info);
if (status.ok()) {
auto tg =
_exec_env->task_group_manager()->get_or_create_task_group(
task_group_info);
tg->add_mem_tracker_limiter(query_ctx->query_mem_tracker);
- query_ctx->set_task_group(tg);
+ uint64_t tg_id = tg->id();
+ std::string tg_name = tg->name();
LOG(INFO) << "Query/load id: " <<
print_id(query_ctx->query_id())
<< " use task group: " << tg->debug_string()
- << " query_cpu_hard_limit: " <<
query_cpu_hard_limit
+ << " cpu_hard_limit: " <<
task_group_info.cpu_hard_limit
<< " cpu_share:" << task_group_info.cpu_share;
- if (query_cpu_hard_limit > 0 &&
_exec_env->get_cgroup_cpu_ctl() != nullptr) {
- _exec_env->get_cgroup_cpu_ctl()->update_cpu_hard_limit(
- query_cpu_hard_limit);
+ if (task_group_info.cpu_hard_limit > 0) {
+ Status ret =
_exec_env->task_group_manager()->create_and_get_task_scheduler(
+ tg_id, tg_name,
task_group_info.cpu_hard_limit, _exec_env,
+ query_ctx.get());
+ if (!ret.ok()) {
+ LOG(INFO) << "workload group init failed "
+ << ", name=" << tg_name << ", id=" <<
tg_id
+ << ", reason=" << ret.to_string();
+ }
+ } else {
+ query_ctx->set_task_group(tg);
}
}
} else {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index 2791291bf4d..24bb52b1634 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -225,6 +225,18 @@ public:
TUniqueId query_id() const { return _query_id; }
+ void set_task_scheduler(pipeline::TaskScheduler* task_scheduler) {
+ _task_scheduler = task_scheduler;
+ }
+
+ pipeline::TaskScheduler* get_task_scheduler() { return _task_scheduler; }
+
+ void set_scan_task_scheduler(vectorized::SimplifiedScanScheduler*
scan_task_scheduler) {
+ _scan_task_scheduler = scan_task_scheduler;
+ }
+
+ vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return
_scan_task_scheduler; }
+
public:
DescriptorTbl* desc_tbl;
bool set_rsc_info = false;
@@ -283,6 +295,9 @@ private:
// All pipeline tasks use the same query context to report status. So we
need a `_exec_status`
// to report the real message if failed.
Status _exec_status = Status::OK();
+
+ pipeline::TaskScheduler* _task_scheduler = nullptr;
+ vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr;
};
} // namespace doris
diff --git a/be/src/runtime/task_group/task_group.cpp
b/be/src/runtime/task_group/task_group.cpp
index 758e543b034..7abc08d3c27 100644
--- a/be/src/runtime/task_group/task_group.cpp
+++ b/be/src/runtime/task_group/task_group.cpp
@@ -42,8 +42,7 @@ namespace taskgroup {
const static std::string CPU_SHARE = "cpu_share";
const static std::string MEMORY_LIMIT = "memory_limit";
const static std::string ENABLE_MEMORY_OVERCOMMIT = "enable_memory_overcommit";
-const static std::string QUERY_CPU_HARD_LIMIT =
- "query_cpu_hard_limit"; // sum of all query's cpu_hard_limit
+const static std::string CPU_HARD_LIMIT = "cpu_hard_limit";
template <typename QueueType>
TaskGroupEntity<QueueType>::TaskGroupEntity(taskgroup::TaskGroup* tg,
std::string type)
@@ -202,7 +201,7 @@ void TaskGroup::task_group_info(TaskGroupInfo* tg_info)
const {
}
Status TaskGroupInfo::parse_group_info(const TPipelineWorkloadGroup&
resource_group,
- TaskGroupInfo* task_group_info, int*
query_cpu_hard_limit) {
+ TaskGroupInfo* task_group_info) {
if (UNLIKELY(!check_group_info(resource_group))) {
std::stringstream ss;
ss << "incomplete resource group parameters: ";
@@ -215,14 +214,16 @@ Status TaskGroupInfo::parse_group_info(const
TPipelineWorkloadGroup& resource_gr
uint64_t share = 0;
std::from_chars(iter->second.c_str(), iter->second.c_str() +
iter->second.size(), share);
- auto iter2 = resource_group.properties.find(QUERY_CPU_HARD_LIMIT);
+ int cpu_hard_limit = 0;
+ auto iter2 = resource_group.properties.find(CPU_HARD_LIMIT);
std::from_chars(iter2->second.c_str(), iter2->second.c_str() +
iter2->second.size(),
- *query_cpu_hard_limit);
+ cpu_hard_limit);
task_group_info->id = resource_group.id;
task_group_info->name = resource_group.name;
task_group_info->version = resource_group.version;
task_group_info->cpu_share = share;
+ task_group_info->cpu_hard_limit = cpu_hard_limit;
bool is_percent = true;
auto mem_limit_str = resource_group.properties.find(MEMORY_LIMIT)->second;
diff --git a/be/src/runtime/task_group/task_group.h
b/be/src/runtime/task_group/task_group.h
index 8dd8b75fd10..1dc634469ee 100644
--- a/be/src/runtime/task_group/task_group.h
+++ b/be/src/runtime/task_group/task_group.h
@@ -168,9 +168,10 @@ struct TaskGroupInfo {
int64_t memory_limit;
bool enable_memory_overcommit;
int64_t version;
+ int cpu_hard_limit;
static Status parse_group_info(const TPipelineWorkloadGroup&
resource_group,
- TaskGroupInfo* task_group_info, int*
query_cpu_hard_limit);
+ TaskGroupInfo* task_group_info);
private:
static bool check_group_info(const TPipelineWorkloadGroup& resource_group);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp
b/be/src/runtime/task_group/task_group_manager.cpp
index 179bf8911a2..e6ed60148e0 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -59,4 +59,76 @@ void TaskGroupManager::get_resource_groups(const
std::function<bool(const TaskGr
}
}
+Status TaskGroupManager::create_and_get_task_scheduler(uint64_t tg_id,
std::string tg_name,
+ int cpu_hard_limit,
ExecEnv* exec_env,
+ QueryContext*
query_ctx_ptr) {
+ std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+ // step 1: init cgroup cpu controller
+ CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
+ if (_cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
+ std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl =
std::make_unique<CgroupV1CpuCtl>(tg_id);
+ Status ret = cgroup_cpu_ctl->init();
+ if (ret.ok()) {
+ cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
+ _cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
+ } else {
+ return Status::Error<INTERNAL_ERROR, false>("cgroup init failed,
gid={}", tg_id);
+ }
+ }
+
+ // step 2: init task scheduler
+ if (_tg_sche_map.find(tg_id) == _tg_sche_map.end()) {
+ int32_t executors_size = config::pipeline_executor_size;
+ if (executors_size <= 0) {
+ executors_size = CpuInfo::num_cores();
+ }
+ auto task_queue =
std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
+
+ auto pipeline_task_scheduler =
std::make_unique<pipeline::TaskScheduler>(
+ exec_env, exec_env->get_global_block_scheduler(),
std::move(task_queue),
+ "Exec_" + tg_name, cg_cu_ctl_ptr);
+ Status ret = pipeline_task_scheduler->start();
+ if (ret.ok()) {
+ _tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
+ } else {
+ return Status::Error<INTERNAL_ERROR, false>("task scheduler start
failed, gid={}",
+ tg_id);
+ }
+ }
+
+ // step 3: init scan scheduler
+ if (_tg_scan_sche_map.find(tg_id) == _tg_scan_sche_map.end()) {
+ auto scan_scheduler =
+ std::make_unique<vectorized::SimplifiedScanScheduler>(tg_name,
cg_cu_ctl_ptr);
+ Status ret = scan_scheduler->start();
+ if (ret.ok()) {
+ _tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
+ } else {
+ return Status::Error<INTERNAL_ERROR, false>("scan scheduler start
failed, gid={}",
+ tg_id);
+ }
+ }
+
+ // step 4 set exec/scan task scheduler to query ctx
+ pipeline::TaskScheduler* task_sche = _tg_sche_map.at(tg_id).get();
+ query_ctx_ptr->set_task_scheduler(task_sche);
+
+ vectorized::SimplifiedScanScheduler* scan_task_sche =
_tg_scan_sche_map.at(tg_id).get();
+ query_ctx_ptr->set_scan_task_scheduler(scan_task_sche);
+
+ // step 5 update cgroup cpu if needed
+ _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
+
+ return Status::OK();
+}
+
+void TaskGroupManager::stop() {
+ for (auto& task_sche : _tg_sche_map) {
+ task_sche.second->stop();
+ }
+ for (auto& task_sche : _tg_scan_sche_map) {
+ task_sche.second->stop();
+ }
+}
+
} // namespace doris::taskgroup
diff --git a/be/src/runtime/task_group/task_group_manager.h
b/be/src/runtime/task_group/task_group_manager.h
index baa6579b15d..e45cdeca7ea 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -21,10 +21,24 @@
#include <shared_mutex>
#include <unordered_map>
+#include "pipeline/task_queue.h"
+#include "pipeline/task_scheduler.h"
#include "task_group.h"
namespace doris {
class ExecEnv;
+class QueryContext;
+class CgroupCpuCtl;
+
+namespace vectorized {
+class SimplifiedScanScheduler;
+}
+
+namespace pipeline {
+class TaskScheduler;
+class MultiCoreTaskQueue;
+} // namespace pipeline
+
namespace taskgroup {
class TaskGroupManager {
@@ -37,9 +51,21 @@ public:
void get_resource_groups(const std::function<bool(const TaskGroupPtr&
ptr)>& pred,
std::vector<TaskGroupPtr>* task_groups);
+ Status create_and_get_task_scheduler(uint64_t wg_id, std::string wg_name,
int cpu_hard_limit,
+ ExecEnv* exec_env, QueryContext*
query_ctx_ptr);
+
+ void stop();
+
private:
std::shared_mutex _group_mutex;
std::unordered_map<uint64_t, TaskGroupPtr> _task_groups;
+
+ // map for workload group id and task scheduler pool
+ // used for cpu hard limit
+ std::mutex _task_scheduler_lock;
+ std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>>
_tg_sche_map;
+ std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>>
_tg_scan_sche_map;
+ std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;
};
} // namespace taskgroup
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 93637599411..e1c29d569aa 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -123,6 +123,10 @@ Status ScannerContext::init() {
// 3. get thread token
if (_state->get_query_ctx()) {
thread_token = _state->get_query_ctx()->get_token();
+ _simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
+ if (_simple_scan_scheduler) {
+ _should_reset_thread_name = false;
+ }
}
#endif
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 07f9f055517..932fd294ff9 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -57,6 +57,7 @@ namespace vectorized {
class VScanner;
class VScanNode;
class ScannerScheduler;
+class SimplifiedScanScheduler;
// ScannerContext is responsible for recording the execution status
// of a group of Scanners corresponding to a ScanNode.
@@ -164,6 +165,7 @@ public:
}
taskgroup::TaskGroup* get_task_group() const;
+ SimplifiedScanScheduler* get_simple_scan_scheduler() { return
_simple_scan_scheduler; }
void reschedule_scanner_ctx();
@@ -173,6 +175,8 @@ public:
ThreadPoolToken* thread_token = nullptr;
std::vector<bthread_t> _btids;
+ bool _should_reset_thread_name = true;
+
private:
template <typename Parent>
Status _close_and_clear_scanners(Parent* parent, RuntimeState* state);
@@ -252,6 +256,7 @@ protected:
const int64_t _max_bytes_in_queue;
doris::vectorized::ScannerScheduler* _scanner_scheduler;
+ SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; // used for cpu
hard limit
// List "scanners" saves all "unfinished" scanners.
// The scanner scheduler will pop scanners from this list, run scanner,
// and then if the scanner is not finished, will be pushed back to this
list.
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index b7f05f0ec94..7c0064a1f4c 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -136,7 +136,6 @@ Status ScannerScheduler::init(ExecEnv* env) {
static_cast<void>(ThreadPoolBuilder("local_scan_group")
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
- .set_cgroup_cpu_ctl(env->get_cgroup_cpu_ctl())
.build(&_group_local_scan_thread_pool));
for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++) {
static_cast<void>(_group_local_scan_thread_pool->submit_func([this] {
@@ -237,7 +236,13 @@ void ScannerScheduler::_schedule_scanners(ScannerContext*
ctx) {
TabletStorageType type = (*iter)->get_storage_type();
bool ret = false;
if (type == TabletStorageType::STORAGE_TYPE_LOCAL) {
- if (ctx->get_task_group() &&
config::enable_workload_group_for_scan) {
+ if (auto* scan_sche = ctx->get_simple_scan_scheduler()) {
+ auto work_func = [this, scanner = *iter, ctx] {
+ this->_scanner_scan(this, ctx, scanner);
+ };
+ SimplifiedScanTask simple_scan_task = {work_func, ctx};
+ ret =
scan_sche->get_scan_queue()->try_put(simple_scan_task);
+ } else if (ctx->get_task_group() &&
config::enable_workload_group_for_scan) {
auto work_func = [this, scanner = *iter, ctx] {
this->_scanner_scan(this, ctx, scanner);
};
@@ -312,10 +317,13 @@ void ScannerScheduler::_schedule_scanners(ScannerContext*
ctx) {
void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler,
ScannerContext* ctx,
VScannerSPtr scanner) {
SCOPED_ATTACH_TASK(scanner->runtime_state());
+ // for cpu hard limit, thread name should not be reset
#if !defined(USE_BTHREAD_SCANNER)
- Thread::set_self_name("_scanner_scan");
+ if (ctx->_should_reset_thread_name) {
+ Thread::set_self_name("_scanner_scan");
+ }
#else
- if (dynamic_cast<NewOlapScanner*>(scanner) == nullptr) {
+ if (dynamic_cast<NewOlapScanner*>(scanner) == nullptr &&
ctx->_should_reset_thread_name) {
Thread::set_self_name("_scanner_scan");
}
#endif
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 3a198b6c746..ad6f86c4f18 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -121,4 +121,64 @@ private:
config::doris_scanner_thread_pool_thread_num +
config::pipeline_executor_size;
};
+struct SimplifiedScanTask {
+ SimplifiedScanTask() = default;
+ SimplifiedScanTask(std::function<void()> scan_func,
+ vectorized::ScannerContext* scanner_context) {
+ this->scan_func = scan_func;
+ this->scanner_context = scanner_context;
+ }
+
+ std::function<void()> scan_func;
+ vectorized::ScannerContext* scanner_context;
+};
+
+// used for cpu hard limit
+class SimplifiedScanScheduler {
+public:
+ SimplifiedScanScheduler(std::string wg_name, CgroupCpuCtl* cgroup_cpu_ctl)
{
+ _scan_task_queue = std::make_unique<BlockingQueue<SimplifiedScanTask>>(
+ config::doris_scanner_thread_pool_queue_size);
+ _is_stop.store(false);
+ _cgroup_cpu_ctl = cgroup_cpu_ctl;
+ _wg_name = wg_name;
+ }
+
+ void stop() {
+ _is_stop.store(true);
+ _scan_thread_pool->shutdown();
+ _scan_thread_pool->wait();
+ }
+
+ Status start() {
+ RETURN_IF_ERROR(ThreadPoolBuilder("Scan_" + _wg_name)
+
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
+
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
+ .set_cgroup_cpu_ctl(_cgroup_cpu_ctl)
+ .build(&_scan_thread_pool));
+
+ for (int i = 0; i < config::doris_scanner_thread_pool_thread_num; i++)
{
+ RETURN_IF_ERROR(_scan_thread_pool->submit_func([this] {
this->_work(); }));
+ }
+ return Status::OK();
+ }
+
+ BlockingQueue<SimplifiedScanTask>* get_scan_queue() { return
_scan_task_queue.get(); }
+
+private:
+ void _work() {
+ while (!_is_stop.load()) {
+ SimplifiedScanTask scan_task;
+ _scan_task_queue->blocking_get(&scan_task);
+ scan_task.scan_func();
+ }
+ }
+
+ std::unique_ptr<ThreadPool> _scan_thread_pool;
+ std::unique_ptr<BlockingQueue<SimplifiedScanTask>> _scan_task_queue;
+ std::atomic<bool> _is_stop;
+ CgroupCpuCtl* _cgroup_cpu_ctl;
+ std::string _wg_name;
+};
+
} // namespace doris::vectorized
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
index 3f0053237a9..1ec47d053fe 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/resource/workloadgroup/WorkloadGroupMgr.java
@@ -72,11 +72,6 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
private final ResourceProcNode procNode = new ResourceProcNode();
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- public static final String QUERY_CPU_HARD_LIMIT = "query_cpu_hard_limit";
- private int queryCPUHardLimit = 0;
- // works when user not set cpu hard limit, we fill a default value
- private int cpuHardLimitDefaultVal = 0;
-
public WorkloadGroupMgr() {
}
@@ -124,19 +119,13 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
throw new UserException("Workload group " + groupName + " does
not exist");
}
workloadGroups.add(workloadGroup.toThrift());
- // note(wb) -1 to tell be no need to update cgroup
- int thriftVal = -1;
- if (Config.enable_cpu_hard_limit) {
- // reset cpu_share according to cpu hard limit
- int cpuHardLimitShare = workloadGroup.getCpuHardLimit() == 0
- ? this.cpuHardLimitDefaultVal :
workloadGroup.getCpuHardLimit();
- workloadGroups.get(0).getProperties()
- .put(WorkloadGroup.CPU_SHARE,
String.valueOf(cpuHardLimitShare));
-
- // reset sum of all groups cpu hard limit
- thriftVal = this.queryCPUHardLimit;
+ // note(wb) -1 to tell be not to use cpu hard limit
+ int cpuHardLimitThriftVal = -1;
+ if (Config.enable_cpu_hard_limit &&
workloadGroup.getCpuHardLimit() > 0) {
+ cpuHardLimitThriftVal = workloadGroup.getCpuHardLimit();
}
- workloadGroups.get(0).getProperties().put(QUERY_CPU_HARD_LIMIT,
String.valueOf(thriftVal));
+
workloadGroups.get(0).getProperties().put(WorkloadGroup.CPU_HARD_LIMIT,
+ String.valueOf(cpuHardLimitThriftVal));
context.setWorkloadGroupName(groupName);
} finally {
readUnlock();
@@ -213,7 +202,6 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
checkGlobalUnlock(workloadGroup, null);
nameToWorkloadGroup.put(workloadGroupName, workloadGroup);
idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
- calQueryCPUHardLimit();
Env.getCurrentEnv().getEditLog().logCreateWorkloadGroup(workloadGroup);
} finally {
writeUnlock();
@@ -240,44 +228,20 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
+ " value can not be greater than 100% or less
than or equal 0%");
}
- // 2, calculate new query hard cpu limit
- int tmpCpuHardLimit = 0;
- int zeroCpuHardLimitCount = 0;
+ // 2, check sum of all cpu hard limit
+ int sumOfAllCpuHardLimit = 0;
for (Map.Entry<Long, WorkloadGroup> entry :
idToWorkloadGroup.entrySet()) {
if (old != null && entry.getKey() == old.getId()) {
continue;
}
- int cpuHardLimit = entry.getValue().getCpuHardLimit();
- if (cpuHardLimit == 0) {
- zeroCpuHardLimitCount++;
- }
- tmpCpuHardLimit += cpuHardLimit;
+ sumOfAllCpuHardLimit += entry.getValue().getCpuHardLimit();
}
- if (newGroupCpuHardLimit == 0) {
- zeroCpuHardLimitCount++;
- }
- tmpCpuHardLimit += newGroupCpuHardLimit;
+ sumOfAllCpuHardLimit += newGroupCpuHardLimit;
- if (tmpCpuHardLimit > 100) {
+ if (sumOfAllCpuHardLimit > 100) {
throw new DdlException("sum of all workload group " +
WorkloadGroup.CPU_HARD_LIMIT
+ " can not be greater than 100% ");
}
-
- if (tmpCpuHardLimit == 100 && zeroCpuHardLimitCount > 0) {
- throw new DdlException("some workload group may not be assigned "
- + "cpu hard limit but all query cpu hard limit exceeds
100%");
- }
-
- int leftCpuHardLimitVal = 100 - tmpCpuHardLimit;
- if (zeroCpuHardLimitCount != 0) {
- int tmpCpuHardLimitDefaultVal = leftCpuHardLimitVal /
zeroCpuHardLimitCount;
- if (tmpCpuHardLimitDefaultVal == 0) {
- throw new DdlException("remaining cpu can not be assigned to
the "
- + "workload group without cpu hard limit value; "
- + leftCpuHardLimitVal + "%," + newGroupCpuHardLimit
- + "%," + zeroCpuHardLimitCount);
- }
- }
}
public void alterWorkloadGroup(AlterWorkloadGroupStmt stmt) throws
DdlException {
@@ -296,7 +260,6 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
checkGlobalUnlock(newWorkloadGroup, workloadGroup);
nameToWorkloadGroup.put(workloadGroupName, newWorkloadGroup);
idToWorkloadGroup.put(newWorkloadGroup.getId(), newWorkloadGroup);
- calQueryCPUHardLimit();
Env.getCurrentEnv().getEditLog().logAlterWorkloadGroup(newWorkloadGroup);
} finally {
writeUnlock();
@@ -331,7 +294,6 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
long groupId = workloadGroup.getId();
idToWorkloadGroup.remove(groupId);
nameToWorkloadGroup.remove(workloadGroupName);
- calQueryCPUHardLimit();
Env.getCurrentEnv().getEditLog().logDropWorkloadGroup(new
DropWorkloadGroupOperationLog(groupId));
} finally {
writeUnlock();
@@ -344,7 +306,6 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
try {
nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup);
idToWorkloadGroup.put(workloadGroup.getId(), workloadGroup);
- calQueryCPUHardLimit();
} finally {
writeUnlock();
}
@@ -377,7 +338,6 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
WorkloadGroup workloadGroup = idToWorkloadGroup.get(id);
nameToWorkloadGroup.remove(workloadGroup.getName());
idToWorkloadGroup.remove(id);
- calQueryCPUHardLimit();
} finally {
writeUnlock();
}
@@ -403,21 +363,6 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
return idToWorkloadGroup;
}
- private void calQueryCPUHardLimit() {
- int zeroCpuHardLimitCount = 0;
- int ret = 0;
- for (Map.Entry<Long, WorkloadGroup> entry :
idToWorkloadGroup.entrySet()) {
- if (entry.getValue().getCpuHardLimit() == 0) {
- zeroCpuHardLimitCount++;
- }
- ret += entry.getValue().getCpuHardLimit();
- }
- this.queryCPUHardLimit = ret;
- if (zeroCpuHardLimitCount != 0) {
- this.cpuHardLimitDefaultVal = (100 - this.queryCPUHardLimit) /
zeroCpuHardLimitCount;
- }
- }
-
@Override
public void write(DataOutput out) throws IOException {
String json = GsonUtils.GSON.toJson(this);
@@ -428,7 +373,6 @@ public class WorkloadGroupMgr implements Writable,
GsonPostProcessable {
public void gsonPostProcess() throws IOException {
idToWorkloadGroup.forEach(
(id, workloadGroup) ->
nameToWorkloadGroup.put(workloadGroup.getName(), workloadGroup));
- calQueryCPUHardLimit();
}
public class ResourceProcNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]