This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3595f214058 [improvement](executor)clear unused cgroup path (#27798)
3595f214058 is described below
commit 3595f2140589f0d92da6be007d2dd2d6fcb2285d
Author: wangbo <[email protected]>
AuthorDate: Tue Dec 5 14:18:23 2023 +0800
[improvement](executor)clear unused cgroup path (#27798)
* clear unused cgroup path
* use C++ api
* add gcc header
---
be/src/agent/cgroup_cpu_ctl.cpp | 76 ++++++++++++++++++-
be/src/agent/cgroup_cpu_ctl.h | 8 +-
be/src/agent/workload_group_listener.cpp | 2 +-
be/src/runtime/task_group/task_group_manager.cpp | 94 ++++++++++++++++--------
be/src/runtime/task_group/task_group_manager.h | 6 +-
5 files changed, 150 insertions(+), 36 deletions(-)
diff --git a/be/src/agent/cgroup_cpu_ctl.cpp b/be/src/agent/cgroup_cpu_ctl.cpp
index 6fa234e6ee2..a494681d082 100644
--- a/be/src/agent/cgroup_cpu_ctl.cpp
+++ b/be/src/agent/cgroup_cpu_ctl.cpp
@@ -18,6 +18,9 @@
#include "agent/cgroup_cpu_ctl.h"
#include <fmt/format.h>
+#include <sys/stat.h>
+
+#include <filesystem>
namespace doris {
@@ -100,11 +103,34 @@ Status CgroupV1CpuCtl::init() {
int ret = mkdir(_cgroup_v1_cpu_query_path.c_str(), S_IRWXU);
if (ret != 0) {
LOG(ERROR) << "cgroup v1 mkdir query failed, path=" << _cgroup_v1_cpu_query_path;
- return Status::InternalError<false>("cgroup v1 mkdir query failed, path=",
+ return Status::InternalError<false>("cgroup v1 mkdir query failed, path={}",
_cgroup_v1_cpu_query_path);
}
}
+ // check whether current user specified path is a valid cgroup path
+ std::string query_path_tasks = _cgroup_v1_cpu_query_path + "/tasks";
+ std::string query_path_cpu_shares = _cgroup_v1_cpu_query_path + "/cpu.shares";
+ std::string query_path_quota = _cgroup_v1_cpu_query_path + "/cpu.cfs_quota_us";
+ if (access(query_path_tasks.c_str(), F_OK) != 0) {
+ return Status::InternalError<false>("invalid cgroup path, not find task file");
+ }
+ if (access(query_path_cpu_shares.c_str(), F_OK) != 0) {
+ return Status::InternalError<false>("invalid cgroup path, not find cpu share file");
+ }
+ if (access(query_path_quota.c_str(), F_OK) != 0) {
+ return Status::InternalError<false>("invalid cgroup path, not find cpu quota file");
+ }
+
+ if (_tg_id == -1) {
+ // means current cgroup cpu ctl is just used to clear dir,
+ // it does not contains task group.
+ // todo(wb) rethinking whether need to refactor cgroup_cpu_ctl
+ _init_succ = true;
+ LOG(INFO) << "init cgroup cpu query path succ, path=" << _cgroup_v1_cpu_query_path;
+ return Status::OK();
+ }
+
// workload group path
_cgroup_v1_cpu_tg_path = _cgroup_v1_cpu_query_path + "/" + std::to_string(_tg_id);
if (access(_cgroup_v1_cpu_tg_path.c_str(), F_OK) != 0) {
@@ -157,4 +183,52 @@ Status CgroupV1CpuCtl::add_thread_to_cgroup() {
return CgroupCpuCtl::write_cg_sys_file(_cgroup_v1_cpu_tg_task_file, tid, msg, true);
#endif
}
+
+Status CgroupV1CpuCtl::delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) {
+ if (!_init_succ) {
+ return Status::InternalError<false>("cgroup cpu ctl init failed, delete can not be executed");
+ }
+ // 1 get unused wg id
+ std::set<std::string> unused_wg_ids;
+ for (const auto& entry : std::filesystem::directory_iterator(_cgroup_v1_cpu_query_path)) {
+ const std::string dir_name = entry.path().string();
+ struct stat st;
+ // == 0 means exists
+ if (stat(dir_name.c_str(), &st) == 0 && (st.st_mode & S_IFDIR)) {
+ int pos = dir_name.rfind("/");
+ std::string wg_dir_name = dir_name.substr(pos + 1, dir_name.length());
+ if (wg_dir_name.empty()) {
+ return Status::InternalError<false>("find an empty workload group path, path={}",
+ dir_name);
+ }
+ if (std::all_of(wg_dir_name.begin(), wg_dir_name.end(), ::isdigit)) {
+ uint64_t id_in_path = std::stoll(wg_dir_name);
+ if (used_wg_ids.find(id_in_path) == used_wg_ids.end()) {
+ unused_wg_ids.insert(wg_dir_name);
+ }
+ }
+ }
+ }
+
+ // 2 delete unused cgroup path
+ int failed_count = 0;
+ std::string query_path = _cgroup_v1_cpu_query_path.back() != '/'
+ ? _cgroup_v1_cpu_query_path + "/"
+ : _cgroup_v1_cpu_query_path;
+ for (const std::string& unused_wg_id : unused_wg_ids) {
+ std::string wg_path = query_path + unused_wg_id;
+ int ret = rmdir(wg_path.c_str());
+ if (ret < 0) {
+ LOG(WARNING) << "rmdir failed, path=" << wg_path;
+ failed_count++;
+ }
+ }
+ if (failed_count != 0) {
+ return Status::InternalError<false>("error happens when delete unused path, count={}",
+ failed_count);
+ }
+ return Status::OK();
+}
+
} // namespace doris
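
For reference, a minimal standalone sketch of the directory-scan idea used by delete_unused_cgroup_path above (the "use C++ api" item in the commit message): list the numeric sub-directories of a cgroup query path with std::filesystem and keep only the ids that are not in the set of used workload groups. The query path and ids below are hypothetical examples, not values taken from this commit.

#include <algorithm>
#include <cctype>
#include <cstdint>
#include <filesystem>
#include <iostream>
#include <set>
#include <string>
#include <system_error>

int main() {
    const std::string query_path = "/sys/fs/cgroup/cpu/doris";  // hypothetical query path
    const std::set<uint64_t> used_wg_ids = {10001, 10002};      // hypothetical workload group ids

    std::set<std::string> unused_dirs;
    std::error_code ec;
    for (const auto& entry : std::filesystem::directory_iterator(query_path, ec)) {
        if (!entry.is_directory()) {
            continue;
        }
        // keep only purely numeric directory names, e.g. "10003"
        std::string name = entry.path().filename().string();
        bool numeric = !name.empty() &&
                       std::all_of(name.begin(), name.end(),
                                   [](unsigned char c) { return std::isdigit(c) != 0; });
        if (numeric && used_wg_ids.count(std::stoull(name)) == 0) {
            unused_dirs.insert(name);
        }
    }
    for (const auto& name : unused_dirs) {
        std::cout << "unused cgroup dir: " << query_path << "/" << name << "\n";
    }
    return 0;
}

Using path().filename() is one way to avoid the manual rfind("/") handling; the committed code keeps the rfind/stat variant shown in the diff.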
diff --git a/be/src/agent/cgroup_cpu_ctl.h b/be/src/agent/cgroup_cpu_ctl.h
index 2a7cdc5719b..94514c8e2e0 100644
--- a/be/src/agent/cgroup_cpu_ctl.h
+++ b/be/src/agent/cgroup_cpu_ctl.h
@@ -37,6 +37,7 @@ const static uint64_t CPU_SOFT_LIMIT_DEFAULT_VALUE = 1024;
class CgroupCpuCtl {
public:
virtual ~CgroupCpuCtl() = default;
+ CgroupCpuCtl() = default;
CgroupCpuCtl(uint64_t tg_id) { _tg_id = tg_id; }
virtual Status init();
@@ -50,6 +51,8 @@ public:
// for log
void get_cgroup_cpu_info(uint64_t* cpu_shares, int* cpu_hard_limit);
+ virtual Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) = 0;
+
protected:
Status write_cg_sys_file(std::string file_path, int value, std::string msg, bool is_append);
@@ -63,7 +66,7 @@ protected:
int _cpu_hard_limit = 0;
std::shared_mutex _lock_mutex;
bool _init_succ = false;
- uint64_t _tg_id; // workload group id
+ uint64_t _tg_id = -1; // workload group id
uint64_t _cpu_shares = 0;
};
@@ -96,11 +99,14 @@ protected:
class CgroupV1CpuCtl : public CgroupCpuCtl {
public:
CgroupV1CpuCtl(uint64_t tg_id) : CgroupCpuCtl(tg_id) {}
+ CgroupV1CpuCtl() = default;
Status init() override;
Status modify_cg_cpu_hard_limit_no_lock(int cpu_hard_limit) override;
Status modify_cg_cpu_soft_limit_no_lock(int cpu_shares) override;
Status add_thread_to_cgroup() override;
+ Status delete_unused_cgroup_path(std::set<uint64_t>& used_wg_ids) override;
+
private:
std::string _cgroup_v1_cpu_query_path;
std::string _cgroup_v1_cpu_tg_path; // workload group path
diff --git a/be/src/agent/workload_group_listener.cpp b/be/src/agent/workload_group_listener.cpp
index 6d7dfb9a3a0..f2770e8e7c4 100644
--- a/be/src/agent/workload_group_listener.cpp
+++ b/be/src/agent/workload_group_listener.cpp
@@ -57,7 +57,7 @@ void WorkloadGroupListener::handle_topic_info(const std::vector<TopicInfo>& topi
<< ", reason=" << ret2.to_string();
}
- LOG(INFO) << "update task group success, tg info=" << tg->debug_string()
+ LOG(INFO) << "update task group finish, tg info=" << tg->debug_string()
<< ", enable_cpu_hard_limit="
<< (_exec_env->task_group_manager()->enable_cpu_hard_limit() ? "true" : "false")
<< ", cgroup cpu_shares=" << task_group_info.cgroup_cpu_shares
diff --git a/be/src/runtime/task_group/task_group_manager.cpp b/be/src/runtime/task_group/task_group_manager.cpp
index da6294045f8..98043c6395a 100644
--- a/be/src/runtime/task_group/task_group_manager.cpp
+++ b/be/src/runtime/task_group/task_group_manager.cpp
@@ -68,7 +68,7 @@ TaskGroupPtr TaskGroupManager::get_task_group_by_id(uint64_t tg_id) {
}
bool TaskGroupManager::set_cg_task_sche_for_query_ctx(uint64_t tg_id, QueryContext* query_ctx_ptr) {
- std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+ std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
if (_tg_sche_map.find(tg_id) != _tg_sche_map.end()) {
query_ctx_ptr->set_task_scheduler(_tg_sche_map.at(tg_id).get());
} else {
@@ -91,7 +91,7 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
uint64_t cpu_shares = tg_info->cpu_share;
bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
- std::lock_guard<std::mutex> lock(_task_scheduler_lock);
+ std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
// step 1: init cgroup cpu controller
CgroupCpuCtl* cg_cu_ctl_ptr = nullptr;
if (_cgroup_ctl_map.find(tg_id) == _cgroup_ctl_map.end()) {
@@ -101,7 +101,8 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
cg_cu_ctl_ptr = cgroup_cpu_ctl.get();
_cgroup_ctl_map.emplace(tg_id, std::move(cgroup_cpu_ctl));
} else {
- return Status::InternalError<false>("cgroup init failed, gid={}", tg_id);
+ return Status::InternalError<false>("cgroup init failed, gid={}, reason={}", tg_id,
+ ret.to_string());
}
}
@@ -157,54 +158,83 @@ Status TaskGroupManager::upsert_cg_task_scheduler(taskgroup::TaskGroupInfo* tg_i
return Status::OK();
}
-void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> id_set) {
+void TaskGroupManager::delete_task_group_by_ids(std::set<uint64_t> used_wg_id) {
+ // stop task sche may cost some time, so it should not be locked
+ std::set<doris::pipeline::TaskScheduler*> task_sche_to_del;
+ std::set<vectorized::SimplifiedScanScheduler*> scan_task_sche_to_del;
+ std::set<uint64_t> deleted_tg_ids;
{
- std::lock_guard<std::shared_mutex> w_lock(_group_mutex);
- for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
+ std::shared_lock<std::shared_mutex> read_lock(_task_scheduler_lock);
+ for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end(); iter++) {
uint64_t tg_id = iter->first;
- if (id_set.find(tg_id) == id_set.end()) {
- iter = _task_groups.erase(iter);
- } else {
- iter++;
+ if (used_wg_id.find(tg_id) == used_wg_id.end()) {
+ task_sche_to_del.insert(_tg_sche_map[tg_id].get());
+ deleted_tg_ids.insert(tg_id);
}
}
- }
- // stop task sche may cost some time, so it should not be locked
- // task scheduler is stoped in task scheduler's destructor
- std::set<std::unique_ptr<doris::pipeline::TaskScheduler>> task_sche_to_del;
- std::set<std::unique_ptr<vectorized::SimplifiedScanScheduler>> scan_task_sche_to_del;
- {
- std::lock_guard<std::mutex> lock(_task_scheduler_lock);
- for (auto iter = _tg_sche_map.begin(); iter != _tg_sche_map.end();) {
+ for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end(); iter++) {
uint64_t tg_id = iter->first;
- if (id_set.find(tg_id) == id_set.end()) {
- task_sche_to_del.insert(std::move(_tg_sche_map[tg_id]));
- iter = _tg_sche_map.erase(iter);
- } else {
- iter++;
+ if (used_wg_id.find(tg_id) == used_wg_id.end()) {
+ scan_task_sche_to_del.insert(_tg_scan_sche_map[tg_id].get());
}
}
+ }
+ // 1 stop all threads
+ for (auto* ptr1 : task_sche_to_del) {
+ ptr1->stop();
+ }
+ for (auto* ptr2 : scan_task_sche_to_del) {
+ ptr2->stop();
+ }
+ // 2 release resource in memory
+ {
+ std::lock_guard<std::shared_mutex> write_lock(_task_scheduler_lock);
+ for (uint64_t tg_id : deleted_tg_ids) {
+ _tg_sche_map.erase(tg_id);
+ _tg_scan_sche_map.erase(tg_id);
+ _cgroup_ctl_map.erase(tg_id);
+ }
+ }
- for (auto iter = _tg_scan_sche_map.begin(); iter != _tg_scan_sche_map.end();) {
+ {
+ std::lock_guard<std::shared_mutex> write_lock(_group_mutex);
+ for (auto iter = _task_groups.begin(); iter != _task_groups.end();) {
uint64_t tg_id = iter->first;
- if (id_set.find(tg_id) == id_set.end()) {
- scan_task_sche_to_del.insert(std::move(_tg_scan_sche_map[tg_id]));
- iter = _tg_scan_sche_map.erase(iter);
+ if (used_wg_id.find(tg_id) == used_wg_id.end()) {
+ iter = _task_groups.erase(iter);
} else {
iter++;
}
}
+ }
- for (auto iter = _cgroup_ctl_map.begin(); iter != _cgroup_ctl_map.end();) {
- uint64_t tg_id = iter->first;
- if (id_set.find(tg_id) == id_set.end()) {
- iter = _cgroup_ctl_map.erase(iter);
+ // 3 clear cgroup dir
+ // NOTE(wb) currently we use rmdir to delete cgroup path,
+ // this action may be failed until task file is cleared which means all thread are stopped.
+ // So the first time to rmdir a cgroup path may failed.
+ // Using cgdelete has no such issue.
+ {
+ std::lock_guard<std::shared_mutex> write_lock(_init_cg_ctl_lock);
+ if (!_cg_cpu_ctl) {
+ _cg_cpu_ctl = std::make_unique<CgroupV1CpuCtl>();
+ }
+ if (!_is_init_succ) {
+ Status ret = _cg_cpu_ctl->init();
+ if (ret.ok()) {
+ _is_init_succ = true;
} else {
- iter++;
+ LOG(INFO) << "init task group mgr cpu ctl failed, " <<
ret.to_string();
+ }
+ }
+ if (_is_init_succ) {
+ Status ret = _cg_cpu_ctl->delete_unused_cgroup_path(used_wg_id);
+ if (!ret.ok()) {
+ LOG(WARNING) << ret.to_string();
}
}
}
+ LOG(INFO) << "finish clear unused cgroup path";
}
void TaskGroupManager::stop() {
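
As an aside, a minimal sketch of the locking pattern this file now relies on: _task_scheduler_lock becomes a std::shared_mutex, so the scan phase that only reads the maps can hold a shared (read) lock, schedulers are stopped outside any lock, and the erase phase takes an exclusive (write) lock. The class and member names below are illustrative only, not the actual TaskGroupManager interface.

#include <cstdint>
#include <iterator>
#include <map>
#include <memory>
#include <mutex>
#include <set>
#include <shared_mutex>

struct Scheduler {
    void stop() {}
};

class Manager {
public:
    // phase 1: collect raw pointers under a shared lock (other readers may run concurrently)
    std::set<Scheduler*> collect_unused(const std::set<uint64_t>& used_ids) {
        std::shared_lock<std::shared_mutex> read_lock(_lock);
        std::set<Scheduler*> to_stop;
        for (const auto& [id, sche] : _sche_map) {
            if (used_ids.count(id) == 0) {
                to_stop.insert(sche.get());
            }
        }
        return to_stop;
    }

    // phase 2: after stopping the collected schedulers outside any lock,
    // erase them under an exclusive lock
    void erase_unused(const std::set<uint64_t>& used_ids) {
        std::lock_guard<std::shared_mutex> write_lock(_lock);
        for (auto it = _sche_map.begin(); it != _sche_map.end();) {
            it = used_ids.count(it->first) == 0 ? _sche_map.erase(it) : std::next(it);
        }
    }

private:
    std::shared_mutex _lock;
    std::map<uint64_t, std::unique_ptr<Scheduler>> _sche_map;
};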
diff --git a/be/src/runtime/task_group/task_group_manager.h b/be/src/runtime/task_group/task_group_manager.h
index 91156237f40..08968b6fe99 100644
--- a/be/src/runtime/task_group/task_group_manager.h
+++ b/be/src/runtime/task_group/task_group_manager.h
@@ -77,10 +77,14 @@ private:
// map for workload group id and task scheduler pool
// used for cpu hard limit
- std::mutex _task_scheduler_lock;
+ std::shared_mutex _task_scheduler_lock;
std::map<uint64_t, std::unique_ptr<doris::pipeline::TaskScheduler>> _tg_sche_map;
std::map<uint64_t, std::unique_ptr<vectorized::SimplifiedScanScheduler>> _tg_scan_sche_map;
std::map<uint64_t, std::unique_ptr<CgroupCpuCtl>> _cgroup_ctl_map;
+
+ std::shared_mutex _init_cg_ctl_lock;
+ std::unique_ptr<CgroupCpuCtl> _cg_cpu_ctl;
+ bool _is_init_succ = false;
};
} // namespace taskgroup
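
Regarding the NOTE(wb) above about rmdir: on cgroup v1 a group directory can only be removed once its tasks file no longer lists any thread id, otherwise rmdir fails (typically with EBUSY) and has to be retried on a later cleanup pass, which is what this commit relies on. A minimal sketch of that caveat, with a hypothetical path and helper that are not part of this commit:

#include <unistd.h>

#include <cstdio>
#include <fstream>
#include <iostream>
#include <string>

// Returns true if the cgroup's "tasks" file exists and lists no thread ids.
// (hypothetical helper for illustration)
static bool cgroup_tasks_empty(const std::string& cg_dir) {
    std::ifstream tasks(cg_dir + "/tasks");
    std::string line;
    return tasks.is_open() && !std::getline(tasks, line);
}

int main() {
    const std::string wg_path = "/sys/fs/cgroup/cpu/doris/10003";  // hypothetical unused wg dir
    if (!cgroup_tasks_empty(wg_path)) {
        std::cout << "threads still attached; rmdir would fail, retry on a later pass\n";
        return 0;
    }
    if (rmdir(wg_path.c_str()) != 0) {
        std::perror("rmdir");  // e.g. EBUSY if a task was attached concurrently
        return 1;
    }
    std::cout << "removed " << wg_path << "\n";
    return 0;
}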
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]