This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a1eb46ae83f [Improvement](executor) Create workload thread pool without cgroup #31170
a1eb46ae83f is described below
commit a1eb46ae83f7611a5d89237ebb5856d68faed1de
Author: wangbo <[email protected]>
AuthorDate: Tue Feb 20 22:21:55 2024 +0800
[Improvement](executor) Create workload thread pool without cgroup #31170
---
be/src/agent/workload_group_listener.cpp | 10 +----
be/src/runtime/task_group/task_group_manager.cpp | 50 +++++++++++++-----------
be/src/runtime/task_group/task_group_manager.h | 2 +-
docs/en/docs/admin-manual/workload-group.md | 10 ++---
docs/zh-CN/docs/admin-manual/workload-group.md | 12 +++---
5 files changed, 41 insertions(+), 43 deletions(-)
diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp
index 237d6c77274..5f9da64bd2a 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -26,7 +26,6 @@ namespace doris {
void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topic_info_list) {
std::set<uint64_t> current_wg_ids;
- bool is_set_cgroup_path = config::doris_cgroup_cpu_path != "";
for (const TopicInfo& topic_info : topic_info_list) {
if (!topic_info.__isset.workload_group_info) {
continue;
@@ -51,12 +50,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
task_group_info.enable_cpu_hard_limit);
// 4 create and update task scheduler
- Status ret2 = _exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info,
- _exec_env);
- if (is_set_cgroup_path && !ret2.ok()) {
- LOG(INFO) << "upsert task sche failed, tg_id=" <<
task_group_info.id
- << ", reason=" << ret2.to_string();
- }
+ _exec_env->task_group_manager()->upsert_cg_task_scheduler(&task_group_info, _exec_env);
LOG(INFO) << "update task group finish, tg info=" << tg->debug_string()
<< ", enable_cpu_hard_limit="
@@ -65,7 +59,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
<< ", cgroup cpu_hard_limit=" <<
task_group_info.cgroup_cpu_hard_limit
<< ", enable_cgroup_cpu_soft_limit="
<< (config::enable_cgroup_cpu_soft_limit ? "true" : "false")
- << ", is set cgroup path=" << (is_set_cgroup_path ? "true" :
"flase");
+ << ", cgroup home path=" << config::doris_cgroup_cpu_path;
}
_exec_env->task_group_manager()->delete_task_group_by_ids(current_wg_ids);
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index 18e446295cb..718d69021e7 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -89,8 +89,8 @@ void TaskGroupManager::get_query_scheduler(uint64_t tg_id,
}
}
-Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info,
- ExecEnv* exec_env) {
+void TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info,
+ ExecEnv* exec_env) {
uint64_t tg_id = tg_info->id;
std::string tg_name = tg_info->name;
int cpu_hard_limit = tg_info->cpu_hard_limit;
@@ -101,15 +101,17 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
// step 1: init cgroup cpu controller
CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
- if (_cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
+ if (config::doris_cgroup_cpu_path != "" &&
+ _cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id);
Status ret = cgroup_cpu_ctl->init();
if (ret.ok()) {
cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
_cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
+ LOG(INFO) << "[upsert wg thread pool] cgroup init success";
} else {
- return Status::InternalError<false>("cgroup init failed, gid={},
reason={}", tg_id,
- ret.to_string());
+ LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= "
<< tg_id
+ << ", reason=" << ret.to_string();
}
}
@@ -128,7 +130,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
if (ret.ok()) {
_tg_sche_map.emplace(tg_id, std::move(pipeline_task_scheduler));
} else {
- return Status::InternalError<false>("task scheduler start failed,
gid={}", tg_id);
+ LOG(INFO) << "[upsert wg thread pool] task scheduler start failed,
gid= " << tg_id;
}
}
@@ -140,7 +142,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
if (ret.ok()) {
_tg_scan_sche_map.emplace(tg_id, std::move(scan_scheduler));
} else {
- return Status::InternalError<false>("scan scheduler start failed,
gid={}", tg_id);
+ LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed,
gid=" << tg_id;
}
}
if (scan_thread_num > 0 && _tg_scan_sche_map.find(tg_id) != _tg_scan_sche_map.end()) {
@@ -157,31 +159,33 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
.set_cgroup_cpu_ctl(cg_cu_ctl_ptr)
.build(&thread_pool);
if (!ret.ok()) {
- LOG(INFO) << "create non-pipline thread pool failed";
+ LOG(INFO) << "[upsert wg thread pool] create non-pipline thread
pool failed, gid="
+ << tg_id;
} else {
_non_pipe_thread_pool_map.emplace(tg_id, std::move(thread_pool));
}
}
// step 5: update cgroup cpu if needed
- if (enable_cpu_hard_limit) {
- if (cpu_hard_limit > 0) {
- _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
- _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+ if (_cgroup_ctl_map.find(tg_id) != _cgroup_ctl_map.end()) {
+ if (enable_cpu_hard_limit) {
+ if (cpu_hard_limit > 0) {
+ _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(cpu_hard_limit);
+ _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+ } else {
+ LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit
but value is illegal: "
+ << cpu_hard_limit << ", gid=" << tg_id;
+ }
} else {
- return Status::InternalError<false>("enable cpu hard limit but
value is illegal");
- }
- } else {
- if (config::enable_cgroup_cpu_soft_limit) {
- _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
- _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
- CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+ if (config::enable_cgroup_cpu_soft_limit) {
+ _cgroup_ctl_map.at(tg_id)->update_cpu_soft_limit(cpu_shares);
+ _cgroup_ctl_map.at(tg_id)->update_cpu_hard_limit(
+ CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+ }
}
+ _cgroup_ctl_map.at(tg_id)->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
+ &(tg_info->cgroup_cpu_hard_limit));
}
- _cgroup_ctl_map.at(tg_id)->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
- &(tg_info->cgroup_cpu_hard_limit));
-
- return Status::OK();
}
void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index 1a1a614d068..29e5c30a596 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -51,7 +51,7 @@ public:
void get_related_taskgroups(const std::function<bool(const TaskGroupPtr& ptr)>& pred,
std::vector<TaskGroupPtr>* task_groups);
- Status upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);
+ void upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);
void delete_task_group_by_ids(std::set<uint64_t> id_set);
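
For readers skimming the patch, the following is a minimal, self-contained C++ sketch of the control flow this change introduces. FakeCgroupCpuCtl, FakeThreadPool, upsert_wg_thread_pool, and the globals are hypothetical stand-ins, not the actual Doris classes; the sketch only illustrates the point the patch makes: cgroup setup is attempted only when doris_cgroup_cpu_path is configured, an init failure is logged instead of being returned as a Status, and the workload-group thread pools are created either way.

```
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <string>

// Hypothetical stand-in for config::doris_cgroup_cpu_path; empty means no cgroup configured.
static std::string g_cgroup_cpu_path = "";

// Hypothetical stand-in for CgroupCpuCtl / CgroupV1CpuCtl.
struct FakeCgroupCpuCtl {
    explicit FakeCgroupCpuCtl(uint64_t id) : tg_id(id) {}
    bool init() const { return !g_cgroup_cpu_path.empty(); }
    uint64_t tg_id;
};

// Hypothetical stand-in for the pipeline/scan/non-pipeline pools.
struct FakeThreadPool {};

static std::map<uint64_t, std::unique_ptr<FakeCgroupCpuCtl>> g_cgroup_ctl_map;
static std::map<uint64_t, std::unique_ptr<FakeThreadPool>> g_thread_pool_map;

// Mirrors the shape of the patched upsert_cg_task_scheduler: no Status is returned.
void upsert_wg_thread_pool(uint64_t tg_id) {
    // step 1: the cgroup controller is only attempted when a path is configured,
    // and an init failure is logged instead of aborting the whole upsert.
    if (!g_cgroup_cpu_path.empty() && g_cgroup_ctl_map.find(tg_id) == g_cgroup_ctl_map.end()) {
        auto ctl = std::make_unique<FakeCgroupCpuCtl>(tg_id);
        if (ctl->init()) {
            g_cgroup_ctl_map.emplace(tg_id, std::move(ctl));
            std::cout << "[upsert wg thread pool] cgroup init success\n";
        } else {
            std::cout << "[upsert wg thread pool] cgroup init failed, gid=" << tg_id << "\n";
        }
    }

    // steps 2-4: thread pools are created unconditionally, so the workload group
    // still works (minus CPU limiting) when no cgroup is available.
    if (g_thread_pool_map.find(tg_id) == g_thread_pool_map.end()) {
        g_thread_pool_map.emplace(tg_id, std::make_unique<FakeThreadPool>());
    }

    // step 5: CPU hard/soft limits are only touched when a controller exists.
    if (g_cgroup_ctl_map.find(tg_id) != g_cgroup_ctl_map.end()) {
        std::cout << "[upsert wg thread pool] update cpu limit, gid=" << tg_id << "\n";
    }
}

int main() {
    upsert_wg_thread_pool(10001); // no cgroup path: the pool is still created
    g_cgroup_cpu_path = "/sys/fs/cgroup/cpu/doris";
    upsert_wg_thread_pool(10002); // with a cgroup path: CPU limits are applied too
    return 0;
}
```

In other words, a BE without a cgroup path still gets per-workload-group thread pools; only the CPU limiting step is skipped, which is what the doc changes below describe.
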
diff --git a/docs/en/docs/admin-manual/workload-group.md b/docs/en/docs/admin-manual/workload-group.md
index 93216a91fb9..01ebfdf156a 100644
--- a/docs/en/docs/admin-manual/workload-group.md
+++ b/docs/en/docs/admin-manual/workload-group.md
@@ -71,7 +71,9 @@ Notes:
Doris 2.0 uses Doris's own scheduling to limit CPU resources, but since version 2.1 Doris defaults to using CGroup v1 to limit CPU resources (CGroup v2 is currently not supported). Therefore, if CPU resources are expected to be limited in version 2.1, CGroup v1 must be installed on the node where the BE is located.
-If users use the Workload Group software limit in version 2.0 and upgrade to version 2.1, they also need to configure CGroup.
+If users use the Workload Group software limit in version 2.0 and upgrade to version 2.1, they also need to configure CGroup; otherwise, the CPU soft limit may not work.
+
+Without configuring cgroup, users can use all functions of the workload group except for CPU limitations.
1 Firstly, confirm that CGroup v1 has been installed on the node where the BE is located and that the path ```/sys/fs/cgroup/cpu/``` exists.
@@ -90,13 +92,11 @@ chonw -R doris:doris /sys/fs/cgroup/cpu/doris
4 Modify the configuration of BE and specify the path to cgroup
```
-1:modify be.conf in disk
doris_cgroup_cpu_path = /sys/fs/cgroup/cpu/doris
-
-2 modify be conf in memory
-curl -X POST http://{be_ip}:{be_http_port}/api/update_config?doris_cgroup_cpu_path=/sys/fs/cgroup/cpu/doris
```
+5 Restart the BE; if the words "add thread xxx to group" appear in the log (be.INFO), the configuration is successful.
+
It should be noted that the current workload group does not support the deployment of multiple BEs on the same machine.
## Workload group usage
diff --git a/docs/zh-CN/docs/admin-manual/workload-group.md b/docs/zh-CN/docs/admin-manual/workload-group.md
index d25fd26212e..2b36089fdeb 100644
--- a/docs/zh-CN/docs/admin-manual/workload-group.md
+++ b/docs/zh-CN/docs/admin-manual/workload-group.md
@@ -69,9 +69,11 @@ Workload Group is a feature supported since version 2.0; in version 2.0, Workload Group
## Configuring the cgroup v1 environment
Doris 2.0 limits CPU resources through Doris's own scheduling, but since version 2.1 Doris defaults to using CGroup v1 to limit CPU resources (CGroup v2 is not yet supported). Therefore, if you expect to constrain CPU resources in version 2.1, CGroup v1 must already be installed on the node where the BE is located.
-If users used the Workload Group soft limit in version 2.0 and upgraded to version 2.1, they also need to configure CGroup.
+If users used the Workload Group soft limit in version 2.0 and upgraded to version 2.1, they also need to configure CGroup; otherwise, the soft limit may fail to take effect.
-1 First, confirm that CGroup v1 is installed on the node where the BE is located; it is enough to confirm that the path ```/sys/fs/cgroup/cpu/``` exists.
+Without configuring cgroup, users can use all functions of the workload group except for CPU limitations.
+
+1 First, confirm that CGroup v1 is installed on the node where the BE is located; it is enough to confirm that the path ```/sys/fs/cgroup/cpu/``` exists
2 Create a directory named doris under the cgroup cpu path; the directory name can be chosen by the user
@@ -88,13 +90,11 @@ chonw -R doris:doris /sys/fs/cgroup/cpu/doris
4 Modify the BE configuration to specify the cgroup path
```
-Method 1: modify be.conf and then restart the BE
doris_cgroup_cpu_path = /sys/fs/cgroup/cpu/doris
-
-Method 2: modify it dynamically in memory
-curl -X POST http://{be_ip}:{be_http_port}/api/update_config?doris_cgroup_cpu_path=/sys/fs/cgroup/cpu/doris
```
+5 Restart the BE; if the words "add thread xxx to group" appear in the log (be.INFO), the configuration is successful
+
Note that the current workload group does not yet support deploying multiple BEs on a single machine.
## Workload group usage
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]