yiguolei commented on code in PR #50817:
URL: https://github.com/apache/doris/pull/50817#discussion_r2115446406
##########
be/src/runtime/workload_group/workload_group.cpp:
##########
@@ -486,99 +506,94 @@ void WorkloadGroup::create_cgroup_cpu_ctl_no_lock() {
}
}
-void WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
- std::shared_ptr<CgroupCpuCtl>
cg_cpu_ctl_ptr) {
+Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
+ std::shared_ptr<CgroupCpuCtl>
cg_cpu_ctl_ptr) {
+ Status upsert_ret = Status::OK();
uint64_t wg_id = wg_info->id;
std::string wg_name = wg_info->name;
+ int exec_thread_num = wg_info->exec_thread_num;
int scan_thread_num = wg_info->scan_thread_num;
int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num;
int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num;
+ int max_flush_thread_num = wg_info->max_flush_thread_num;
+ int min_flush_thread_num = wg_info->min_flush_thread_num;
+
+ // 1 create thread pool
if (_task_sched == nullptr) {
- int32_t executors_size = config::pipeline_executor_size;
- if (executors_size <= 0) {
- executors_size = CpuInfo::num_cores();
- }
std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
- std::make_unique<pipeline::TaskScheduler>(executors_size,
"Pipe_" + wg_name,
+ std::make_unique<pipeline::TaskScheduler>(exec_thread_num,
"pipe_" + wg_name,
cg_cpu_ctl_ptr);
Status ret = pipeline_task_scheduler->start();
if (ret.ok()) {
_task_sched = std::move(pipeline_task_scheduler);
} else {
+ upsert_ret = ret;
LOG(INFO) << "[upsert wg thread pool] task scheduler start failed,
gid= " << wg_id;
}
}
if (_scan_task_sched == nullptr) {
std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
- std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_"
+ wg_name,
+ std::make_unique<vectorized::SimplifiedScanScheduler>("lscan_"
+ wg_name,
cg_cpu_ctl_ptr, wg_name);
- Status ret =
scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
-
config::doris_scanner_thread_pool_thread_num,
+ Status ret = scan_scheduler->start(scan_thread_num, scan_thread_num,
config::doris_scanner_thread_pool_queue_size);
if (ret.ok()) {
_scan_task_sched = std::move(scan_scheduler);
} else {
+ upsert_ret = ret;
LOG(INFO) << "[upsert wg thread pool] scan scheduler start failed,
gid=" << wg_id;
}
}
- if (scan_thread_num > 0 && _scan_task_sched) {
- _scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num);
- }
if (_remote_scan_task_sched == nullptr) {
- int remote_max_thread_num =
vectorized::ScannerScheduler::get_remote_scan_thread_num();
int remote_scan_thread_queue_size =
vectorized::ScannerScheduler::get_remote_scan_thread_queue_size();
std::unique_ptr<vectorized::SimplifiedScanScheduler>
remote_scan_scheduler =
- std::make_unique<vectorized::SimplifiedScanScheduler>("RScan_"
+ wg_name,
+ std::make_unique<vectorized::SimplifiedScanScheduler>("rscan_"
+ wg_name,
cg_cpu_ctl_ptr, wg_name);
- Status ret = remote_scan_scheduler->start(remote_max_thread_num,
-
config::doris_scanner_min_thread_pool_thread_num,
-
remote_scan_thread_queue_size);
+ Status ret =
+ remote_scan_scheduler->start(max_remote_scan_thread_num,
min_remote_scan_thread_num,
+ remote_scan_thread_queue_size);
if (ret.ok()) {
_remote_scan_task_sched = std::move(remote_scan_scheduler);
} else {
+ upsert_ret = ret;
LOG(INFO) << "[upsert wg thread pool] remote scan scheduler start
failed, gid="
<< wg_id;
}
}
+
+ if (_memtable_flush_pool == nullptr) {
+ std::unique_ptr<ThreadPool> thread_pool = nullptr;
+ std::string pool_name = "flush_" + wg_name;
Review Comment:
名字太长了,top 的时候看不全了。
(English: The name is too long; it cannot be seen in full when viewed in `top`.)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org