This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new cb0746dbab1 branch-4.0: [pipeline](conf) make blocking scheduler configurable #57354 (#57392)
cb0746dbab1 is described below
commit cb0746dbab1f651bb8fbdaddbb99abdbb9612629
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Tue Oct 28 14:39:20 2025 +0800
branch-4.0: [pipeline](conf) make blocking scheduler configurable #57354 (#57392)
Cherry-picked from #57354
Co-authored-by: Gabriel <[email protected]>
---
be/src/common/config.cpp | 1 +
be/src/common/config.h | 1 +
be/src/pipeline/task_scheduler.h | 7 ++++---
be/src/runtime/workload_group/workload_group.cpp | 7 +++++++
be/src/runtime/workload_group/workload_group.h | 1 +
5 files changed, 14 insertions(+), 3 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 65b245dabe6..5e7871814fb 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1091,6 +1091,7 @@ DEFINE_Bool(enable_graceful_exit_check, "false");
DEFINE_Bool(enable_debug_points, "false");
DEFINE_Int32(pipeline_executor_size, "0");
+DEFINE_Int32(blocking_pipeline_executor_size, "0");
DEFINE_Bool(enable_workload_group_for_scan, "false");
DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d8aa651fc68..4d3b1a73420 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1122,6 +1122,7 @@ DECLARE_Bool(enable_graceful_exit_check);
DECLARE_Bool(enable_debug_points);
DECLARE_Int32(pipeline_executor_size);
+DECLARE_Int32(blocking_pipeline_executor_size);
// block file cache
DECLARE_Bool(enable_file_cache);
diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h
index 62b1b644c90..269116b5e4f 100644
--- a/be/src/pipeline/task_scheduler.h
+++ b/be/src/pipeline/task_scheduler.h
@@ -80,10 +80,11 @@ private:
class HybridTaskScheduler MOCK_REMOVE(final) : public TaskScheduler {
public:
- HybridTaskScheduler(int core_num, std::string name,
+ HybridTaskScheduler(int exec_thread_num, int blocking_exec_thread_num, std::string name,
std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
- : _blocking_scheduler(core_num * 2, name + "_blocking_scheduler", cgroup_cpu_ctl),
- _simple_scheduler(core_num, name + "_simple_scheduler", cgroup_cpu_ctl) {}
+ : _blocking_scheduler(blocking_exec_thread_num, name + "_blocking_scheduler",
+ cgroup_cpu_ctl),
+ _simple_scheduler(exec_thread_num, name + "_simple_scheduler", cgroup_cpu_ctl) {}
Status submit(PipelineTaskSPtr task) override;
diff --git a/be/src/runtime/workload_group/workload_group.cpp b/be/src/runtime/workload_group/workload_group.cpp
index b7bf59a80a3..bcb1daf818e 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -414,6 +414,10 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
if (exec_thread_num <= 0) {
exec_thread_num = CpuInfo::num_cores();
}
+ int blocking_exec_thread_num = config::blocking_pipeline_executor_size;
+ if (blocking_exec_thread_num <= 0) {
+ blocking_exec_thread_num = CpuInfo::num_cores() * 2;
+ }
int num_disk = 1;
int num_cpus = 1;
@@ -484,6 +488,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
.total_query_slot_count = total_query_slot_count,
.slot_mem_policy = slot_mem_policy,
.pipeline_exec_thread_num = exec_thread_num,
+ .blocking_pipeline_exec_thread_num = blocking_exec_thread_num,
.max_flush_thread_num = max_flush_thread_num,
.min_flush_thread_num = min_flush_thread_num};
}
@@ -522,6 +527,7 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
uint64_t wg_id = wg_info->id;
std::string wg_name = wg_info->name;
int pipeline_exec_thread_num = wg_info->pipeline_exec_thread_num;
+ int blocking_exec_thread_num = wg_info->blocking_pipeline_exec_thread_num;
int scan_thread_num = wg_info->scan_thread_num;
int max_remote_scan_thread_num = wg_info->max_remote_scan_thread_num;
int min_remote_scan_thread_num = wg_info->min_remote_scan_thread_num;
@@ -532,6 +538,7 @@ Status WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
if (_task_sched == nullptr) {
std::unique_ptr<pipeline::TaskScheduler> pipeline_task_scheduler =
std::make_unique<pipeline::HybridTaskScheduler>(pipeline_exec_thread_num,
+ blocking_exec_thread_num,
"p_" + wg_name, cg_cpu_ctl_ptr);
Status ret = pipeline_task_scheduler->start();
if (ret.ok()) {
diff --git a/be/src/runtime/workload_group/workload_group.h b/be/src/runtime/workload_group/workload_group.h
index 2677596b82b..cfdbaa8b4b8 100644
--- a/be/src/runtime/workload_group/workload_group.h
+++ b/be/src/runtime/workload_group/workload_group.h
@@ -285,6 +285,7 @@ struct WorkloadGroupInfo {
int cgroup_cpu_hard_limit = 0;
const bool valid = true;
const int pipeline_exec_thread_num = 0;
+ const int blocking_pipeline_exec_thread_num = 0;
const int max_flush_thread_num = 0;
const int min_flush_thread_num = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]