github-actions[bot] commented on code in PR #31376:
URL: https://github.com/apache/doris/pull/31376#discussion_r1501751586
##########
be/src/runtime/task_group/task_group.cpp:
##########
@@ -266,8 +273,167 @@ Status TaskGroupInfo::parse_topic_info(const TWorkloadGroupInfo& workload_group_
         task_group_info->scan_thread_num = workload_group_info.scan_thread_num;
     }
+    // 10 max remote scan thread num
+    task_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.max_remote_scan_thread_num &&
+        workload_group_info.max_remote_scan_thread_num > 0) {
+        task_group_info->max_remote_scan_thread_num =
+                workload_group_info.max_remote_scan_thread_num;
+    }
+
+    // 11 min remote scan thread num
+    task_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.min_remote_scan_thread_num &&
+        workload_group_info.min_remote_scan_thread_num > 0) {
+        task_group_info->min_remote_scan_thread_num =
+                workload_group_info.min_remote_scan_thread_num;
+    }
+
     return Status::OK();
 }
+void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) {
Review Comment:
warning: method 'upsert_task_scheduler' can be made static [readability-convert-member-functions-to-static]
be/src/runtime/task_group/task_group.h:131:
```diff
-    void upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);
+    static void upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);
```
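Note that the full definition quoted further down shows `upsert_task_scheduler` reading and writing members such as `_task_sched`, `_cgroup_cpu_ctl`, and `_task_sched_lock`, so applying this `static` conversion would not compile; the warning is likely a false positive from the tool seeing the declaration in isolation. A minimal, hypothetical way to silence the check at the declaration in task_group.h (the suppression comment is an assumed fix, not part of the PR):

```cpp
// NOLINTNEXTLINE(readability-convert-member-functions-to-static)
void upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env);
```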
##########
be/src/vec/exec/scan/scanner_scheduler.h:
##########
@@ -148,11 +148,33 @@
         }
     }
+    void reset_max_thread_num(int thread_num) {
+        int max_thread_num = _scan_thread_pool->max_threads();
+
+        if (max_thread_num != thread_num) {
+            Status st = _scan_thread_pool->set_max_threads(thread_num);
+            if (!st.ok()) {
+                LOG(INFO) << "reset max thread num failed, sche name=" << _sched_name;
+            }
+        }
+    }
+
+    void reset_min_thread_num(int thread_num) {
Review Comment:
warning: method 'reset_min_thread_num' can be made static [readability-convert-member-functions-to-static]
```suggestion
    static void reset_min_thread_num(int thread_num) {
```
##########
be/src/vec/exec/scan/scanner_scheduler.h:
##########
@@ -148,11 +148,33 @@ class SimplifiedScanScheduler {
        }
    }
+    void reset_max_thread_num(int thread_num) {
Review Comment:
warning: method 'reset_max_thread_num' can be made static [readability-convert-member-functions-to-static]
```suggestion
    static void reset_max_thread_num(int thread_num) {
```
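Both `reset_max_thread_num` and `reset_min_thread_num` dereference the `_scan_thread_pool` member (see the hunk above), so applying the suggested `static` as-is would not compile. If a static form were actually wanted, the pool would have to be passed in explicitly; a hypothetical sketch (this parameterized signature is not part of the PR):

```cpp
static void reset_max_thread_num(ThreadPool* pool, int thread_num,
                                 const std::string& sched_name) {
    // Same logic as the member version above, with the pool made explicit.
    if (pool->max_threads() != thread_num) {
        Status st = pool->set_max_threads(thread_num);
        if (!st.ok()) {
            LOG(INFO) << "reset max thread num failed, sche name=" << sched_name;
        }
    }
}
```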
##########
be/src/runtime/task_group/task_group.cpp:
##########
@@ -266,8 +273,167 @@
        task_group_info->scan_thread_num = workload_group_info.scan_thread_num;
     }
+    // 10 max remote scan thread num
+    task_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.max_remote_scan_thread_num &&
+        workload_group_info.max_remote_scan_thread_num > 0) {
+        task_group_info->max_remote_scan_thread_num =
+                workload_group_info.max_remote_scan_thread_num;
+    }
+
+    // 11 min remote scan thread num
+    task_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.min_remote_scan_thread_num &&
+        workload_group_info.min_remote_scan_thread_num > 0) {
+        task_group_info->min_remote_scan_thread_num =
+                workload_group_info.min_remote_scan_thread_num;
+    }
+
     return Status::OK();
 }
+void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) {
Review Comment:
warning: function 'upsert_task_scheduler' exceeds recommended size/complexity thresholds [readability-function-size]
```cpp
void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) {
^
```
<details>
<summary>Additional context</summary>
**be/src/runtime/task_group/task_group.cpp:294:** 113 lines including whitespace and comments (threshold 80)
```cpp
void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) {
^
```
</details>
</details>
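One way to get under the 80-line threshold: each block in `upsert_task_scheduler` follows the same "create, start, move into the member on success" shape, so the steps can be extracted into private helpers. A hypothetical refactor sketch (the helper names are illustrative, not from the PR):

```cpp
void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) {
    std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
    _upsert_cgroup_cpu_ctl(tg_info);             // cgroup init block
    _upsert_exec_task_sched(tg_info, exec_env);  // pipeline task scheduler block
    _upsert_scan_scheds(tg_info);                // local + remote scan scheduler blocks
    _upsert_non_pipe_thread_pool(tg_info);       // non-pipeline thread pool block
    _update_cgroup_cpu_limits(tg_info);          // the "step 6" cgroup limit block
}
```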
##########
be/src/runtime/task_group/task_group.cpp:
##########
@@ -266,8 +273,167 @@
         task_group_info->scan_thread_num = workload_group_info.scan_thread_num;
     }
+    // 10 max remote scan thread num
+    task_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.max_remote_scan_thread_num &&
+        workload_group_info.max_remote_scan_thread_num > 0) {
+        task_group_info->max_remote_scan_thread_num =
+                workload_group_info.max_remote_scan_thread_num;
+    }
+
+    // 11 min remote scan thread num
+    task_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.min_remote_scan_thread_num &&
+        workload_group_info.min_remote_scan_thread_num > 0) {
+        task_group_info->min_remote_scan_thread_num =
+                workload_group_info.min_remote_scan_thread_num;
+    }
+
     return Status::OK();
 }
+void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) {
+    uint64_t tg_id = tg_info->id;
+    std::string tg_name = tg_info->name;
+    int cpu_hard_limit = tg_info->cpu_hard_limit;
+    uint64_t cpu_shares = tg_info->cpu_share;
+    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
+    int scan_thread_num = tg_info->scan_thread_num;
+    int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num;
+    int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num;
+
+    std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
+        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id);
+        Status ret = cgroup_cpu_ctl->init();
+        if (ret.ok()) {
+            _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
+            LOG(INFO) << "[upsert wg thread pool] cgroup init success";
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " << tg_id
+                      << ", reason=" << ret.to_string();
+        }
+    }
+
+    CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get();
+
+    if (_task_sched == nullptr) {
+        int32_t executors_size = config::pipeline_executor_size;
+        if (executors_size <= 0) {
+            executors_size = CpuInfo::num_cores();
+        }
+        auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
+        std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
+                std::make_unique<pipeline::TaskScheduler>(
+                        exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue),
+                        "Exec_" + tg_name, cg_cpu_ctl_ptr);
+        Status ret = pipeline_task_scheduler->start();
+        if (ret.ok()) {
+            _task_sched = std::move(pipeline_task_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << tg_id;
+        }
+    }
+
+    if (_scan_task_sched == nullptr) {
+        std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
+                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + tg_name,
+                                                                      cg_cpu_ctl_ptr);
+        Status ret = scan_scheduler->start();
+        if (ret.ok()) {
+            _scan_task_sched = std::move(scan_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << tg_id;
+        }
+    }
+    if (scan_thread_num > 0 && _scan_task_sched) {
+        _scan_task_sched->reset_thread_num(scan_thread_num);
+    }
+
+    if (_remote_scan_task_sched == nullptr) {
+        std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
+                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name,
+                                                                      cg_cpu_ctl_ptr);
+        Status ret = remote_scan_scheduler->start();
+        if (ret.ok()) {
+            _remote_scan_task_sched = std::move(remote_scan_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start failed, gid="
+                      << tg_id;
+        }
+    }
+    if (max_remote_scan_thread_num > 0 && _remote_scan_task_sched) {
+        _remote_scan_task_sched->reset_max_thread_num(max_remote_scan_thread_num);
+    }
+    if (min_remote_scan_thread_num > 0 && _remote_scan_task_sched) {
+        _remote_scan_task_sched->reset_min_thread_num(min_remote_scan_thread_num);
+    }
+
+    if (_non_pipe_thread_pool == nullptr) {
+        std::unique_ptr<ThreadPool> thread_pool = nullptr;
+        auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
+                           .set_min_threads(1)
+                           .set_max_threads(config::fragment_pool_thread_num_max)
+                           .set_max_queue_size(config::fragment_pool_queue_size)
+                           .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
+                           .build(&thread_pool);
+        if (!ret.ok()) {
+            LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed, gid="
+                      << tg_id;
+        } else {
+            _non_pipe_thread_pool = std::move(thread_pool);
+        }
+    }
+
+    // step 6: update cgroup cpu if needed
+    if (_cgroup_cpu_ctl) {
+        if (enable_cpu_hard_limit) {
+            if (cpu_hard_limit > 0) {
+                _cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
+                _cgroup_cpu_ctl->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+            } else {
+                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
+                          << cpu_hard_limit << ", gid=" << tg_id;
+            }
+        } else {
+            if (config::enable_cgroup_cpu_soft_limit) {
+                _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
+                _cgroup_cpu_ctl->update_cpu_hard_limit(
+                        CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+            }
+        }
+        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
+                                             &(tg_info->cgroup_cpu_hard_limit));
+    }
+}
+
+void TaskGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
Review Comment:
warning: method 'get_query_scheduler' can be made static [readability-convert-member-functions-to-static]
be/src/runtime/task_group/task_group.h:133:
```diff
-    void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
+    static void get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
```
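As with `upsert_task_scheduler`, the body quoted below reads four members under `_task_sched_lock`, so this `static` suggestion does not apply either. For reference, a hypothetical caller of the out-parameter API (the variable names are illustrative, not from the PR):

```cpp
// Fetch the group's schedulers before dispatching a query to it.
doris::pipeline::TaskScheduler* exec_sched = nullptr;
vectorized::SimplifiedScanScheduler* scan_sched = nullptr;
ThreadPool* non_pipe_pool = nullptr;
vectorized::SimplifiedScanScheduler* remote_scan_sched = nullptr;
task_group->get_query_scheduler(&exec_sched, &scan_sched, &non_pipe_pool, &remote_scan_sched);
```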
##########
be/src/runtime/task_group/task_group.cpp:
##########
@@ -266,8 +273,167 @@
         task_group_info->scan_thread_num = workload_group_info.scan_thread_num;
     }
+    // 10 max remote scan thread num
+    task_group_info->max_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.max_remote_scan_thread_num &&
+        workload_group_info.max_remote_scan_thread_num > 0) {
+        task_group_info->max_remote_scan_thread_num =
+                workload_group_info.max_remote_scan_thread_num;
+    }
+
+    // 11 min remote scan thread num
+    task_group_info->min_remote_scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    if (workload_group_info.__isset.min_remote_scan_thread_num &&
+        workload_group_info.min_remote_scan_thread_num > 0) {
+        task_group_info->min_remote_scan_thread_num =
+                workload_group_info.min_remote_scan_thread_num;
+    }
+
     return Status::OK();
 }
+void TaskGroup::upsert_task_scheduler(taskgroup::TaskGroupInfo* tg_info, ExecEnv* exec_env) {
+    uint64_t tg_id = tg_info->id;
+    std::string tg_name = tg_info->name;
+    int cpu_hard_limit = tg_info->cpu_hard_limit;
+    uint64_t cpu_shares = tg_info->cpu_share;
+    bool enable_cpu_hard_limit = tg_info->enable_cpu_hard_limit;
+    int scan_thread_num = tg_info->scan_thread_num;
+    int max_remote_scan_thread_num = tg_info->max_remote_scan_thread_num;
+    int min_remote_scan_thread_num = tg_info->min_remote_scan_thread_num;
+
+    std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
+    if (config::doris_cgroup_cpu_path != "" && _cgroup_cpu_ctl == nullptr) {
+        std::unique_ptr<CgroupCpuCtl> cgroup_cpu_ctl = std::make_unique<CgroupV1CpuCtl>(tg_id);
+        Status ret = cgroup_cpu_ctl->init();
+        if (ret.ok()) {
+            _cgroup_cpu_ctl = std::move(cgroup_cpu_ctl);
+            LOG(INFO) << "[upsert wg thread pool] cgroup init success";
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] cgroup init failed, gid= " << tg_id
+                      << ", reason=" << ret.to_string();
+        }
+    }
+
+    CgroupCpuCtl* cg_cpu_ctl_ptr = _cgroup_cpu_ctl.get();
+
+    if (_task_sched == nullptr) {
+        int32_t executors_size = config::pipeline_executor_size;
+        if (executors_size <= 0) {
+            executors_size = CpuInfo::num_cores();
+        }
+        auto task_queue = std::make_shared<pipeline::MultiCoreTaskQueue>(executors_size);
+        std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
+                std::make_unique<pipeline::TaskScheduler>(
+                        exec_env, exec_env->get_global_block_scheduler(), std::move(task_queue),
+                        "Exec_" + tg_name, cg_cpu_ctl_ptr);
+        Status ret = pipeline_task_scheduler->start();
+        if (ret.ok()) {
+            _task_sched = std::move(pipeline_task_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] task scheduler start failed, gid= " << tg_id;
+        }
+    }
+
+    if (_scan_task_sched == nullptr) {
+        std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
+                std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_" + tg_name,
+                                                                      cg_cpu_ctl_ptr);
+        Status ret = scan_scheduler->start();
+        if (ret.ok()) {
+            _scan_task_sched = std::move(scan_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed, gid=" << tg_id;
+        }
+    }
+    if (scan_thread_num > 0 && _scan_task_sched) {
+        _scan_task_sched->reset_thread_num(scan_thread_num);
+    }
+
+    if (_remote_scan_task_sched == nullptr) {
+        std::unique_ptr<vectorized::SimplifiedScanScheduler> remote_scan_scheduler =
+                std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_" + tg_name,
+                                                                      cg_cpu_ctl_ptr);
+        Status ret = remote_scan_scheduler->start();
+        if (ret.ok()) {
+            _remote_scan_task_sched = std::move(remote_scan_scheduler);
+        } else {
+            LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start failed, gid="
+                      << tg_id;
+        }
+    }
+    if (max_remote_scan_thread_num > 0 && _remote_scan_task_sched) {
+        _remote_scan_task_sched->reset_max_thread_num(max_remote_scan_thread_num);
+    }
+    if (min_remote_scan_thread_num > 0 && _remote_scan_task_sched) {
+        _remote_scan_task_sched->reset_min_thread_num(min_remote_scan_thread_num);
+    }
+
+    if (_non_pipe_thread_pool == nullptr) {
+        std::unique_ptr<ThreadPool> thread_pool = nullptr;
+        auto ret = ThreadPoolBuilder("nonPip_" + tg_name)
+                           .set_min_threads(1)
+                           .set_max_threads(config::fragment_pool_thread_num_max)
+                           .set_max_queue_size(config::fragment_pool_queue_size)
+                           .set_cgroup_cpu_ctl(cg_cpu_ctl_ptr)
+                           .build(&thread_pool);
+        if (!ret.ok()) {
+            LOG(INFO) << "[upsert wg thread pool] create non-pipline thread pool failed, gid="
+                      << tg_id;
+        } else {
+            _non_pipe_thread_pool = std::move(thread_pool);
+        }
+    }
+
+    // step 6: update cgroup cpu if needed
+    if (_cgroup_cpu_ctl) {
+        if (enable_cpu_hard_limit) {
+            if (cpu_hard_limit > 0) {
+                _cgroup_cpu_ctl->update_cpu_hard_limit(cpu_hard_limit);
+                _cgroup_cpu_ctl->update_cpu_soft_limit(CPU_SOFT_LIMIT_DEFAULT_VALUE);
+            } else {
+                LOG(INFO) << "[upsert wg thread pool] enable cpu hard limit but value is illegal: "
+                          << cpu_hard_limit << ", gid=" << tg_id;
+            }
+        } else {
+            if (config::enable_cgroup_cpu_soft_limit) {
+                _cgroup_cpu_ctl->update_cpu_soft_limit(cpu_shares);
+                _cgroup_cpu_ctl->update_cpu_hard_limit(
+                        CPU_HARD_LIMIT_DEFAULT_VALUE); // disable cpu hard limit
+            }
+        }
+        _cgroup_cpu_ctl->get_cgroup_cpu_info(&(tg_info->cgroup_cpu_shares),
+                                             &(tg_info->cgroup_cpu_hard_limit));
+    }
+}
+
+void TaskGroup::get_query_scheduler(doris::pipeline::TaskScheduler** exec_sched,
+                                    vectorized::SimplifiedScanScheduler** scan_sched,
+                                    ThreadPool** non_pipe_thread_pool,
+                                    vectorized::SimplifiedScanScheduler** remote_scan_sched) {
+    std::shared_lock<std::shared_mutex> rlock(_task_sched_lock);
+    *exec_sched = _task_sched.get();
+    *scan_sched = _scan_task_sched.get();
+    *remote_scan_sched = _remote_scan_task_sched.get();
+    *non_pipe_thread_pool = _non_pipe_thread_pool.get();
+}
+
+void TaskGroup::try_stop_task_scheduler() {
Review Comment:
warning: method 'try_stop_task_scheduler' can be made static [readability-convert-member-functions-to-static]
be/src/runtime/task_group/task_group.h:138:
```diff
-    void try_stop_task_scheduler();
+    static void try_stop_task_scheduler();
```
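The body of `try_stop_task_scheduler` is not quoted in this hunk, but by analogy with the methods above it presumably stops the member schedulers, in which case the `static` conversion would not compile here either. A hypothetical shape (the `stop()` call is an assumption, not quoted from the PR):

```cpp
void TaskGroup::try_stop_task_scheduler() {
    std::lock_guard<std::shared_mutex> wlock(_task_sched_lock);
    if (_task_sched != nullptr) {
        _task_sched->stop(); // assumed shutdown API, mirroring start() above
    }
}
```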
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]