This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new e50161775d6 branch-4.0: [bugfix](be) should not use cpu init in config's validator #58669 (#58693)
e50161775d6 is described below
commit e50161775d640120b6e5554b5ae0307a1716e5db
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Dec 4 13:59:32 2025 +0800
branch-4.0: [bugfix](be) should not use cpu init in config's validator #58669 (#58693)
Cherry-picked from #58669
Co-authored-by: yiguolei <[email protected]>
---
be/src/common/config.cpp | 38 ++------------------
.../segment_v2/ann_index/faiss_ann_index.cpp | 13 +++++--
be/src/pipeline/exec/file_scan_operator.cpp | 6 ++--
be/src/runtime/workload_group/workload_group.cpp | 24 +++++++------
be/src/util/async_io.h | 9 +++--
be/src/vec/exec/scan/scanner_context.cpp | 4 ++-
be/src/vec/exec/scan/scanner_scheduler.cpp | 31 +++++++++++-----
be/src/vec/exec/scan/scanner_scheduler.h | 18 +++++++---
be/test/common/config_validator_test.cpp | 42 ----------------------
9 files changed, 75 insertions(+), 110 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 666676e7077..d5fdcc4d12b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -301,21 +301,10 @@ DEFINE_mInt32(pipeline_task_exec_time_slice, "100");
DEFINE_Int32(task_executor_min_concurrency_per_task, "1");
// task executor max concurrency per task
DEFINE_Int32(task_executor_max_concurrency_per_task, "-1");
-DEFINE_Validator(task_executor_max_concurrency_per_task, [](const int config)
-> bool {
- if (config == -1) {
- task_executor_max_concurrency_per_task =
std::numeric_limits<int>::max();
- }
- return true;
-});
+
// task task executor inital split max concurrency per task, later concurrency
may be adjusted dynamically
DEFINE_Int32(task_executor_initial_max_concurrency_per_task, "-1");
-DEFINE_Validator(task_executor_initial_max_concurrency_per_task, [](const int
config) -> bool {
- if (config == -1) {
- CpuInfo::init();
- task_executor_initial_max_concurrency_per_task = std::max(48,
CpuInfo::num_cores() * 2);
- }
- return true;
-});
+
// Enable task executor in internal table scan.
DEFINE_Bool(enable_task_executor_in_internal_table, "false");
// Enable task executor in external table scan.
@@ -324,13 +313,7 @@ DEFINE_Bool(enable_task_executor_in_external_table,
"true");
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DEFINE_Int32(doris_scanner_thread_pool_thread_num, "-1");
-DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) ->
bool {
- if (config == -1) {
- CpuInfo::init();
- doris_scanner_thread_pool_thread_num = std::max(48,
CpuInfo::num_cores() * 2);
- }
- return true;
-});
+
DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
DEFINE_Int32(remote_split_source_batch_size, "1000");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
@@ -372,21 +355,6 @@ DEFINE_mString(broken_storage_path, "");
DEFINE_Int32(min_active_scan_threads, "-1");
DEFINE_Int32(min_active_file_scan_threads, "-1");
-DEFINE_Validator(min_active_scan_threads, [](const int config) -> bool {
- if (config == -1) {
- CpuInfo::init();
- min_active_scan_threads = CpuInfo::num_cores() * 2;
- }
- return true;
-});
-DEFINE_Validator(min_active_file_scan_threads, [](const int config) -> bool {
- if (config == -1) {
- CpuInfo::init();
- min_active_file_scan_threads = CpuInfo::num_cores() * 8;
- }
- return true;
-});
-
// Config is used to check incompatible old format hdr_ format
// whether doris uses strict way. When config is true, process will log fatal
// and exit. When config is false, process will only log warning.
diff --git a/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
b/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
index f01a9c23c78..cf954e0ffc0 100644
--- a/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
+++ b/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
@@ -78,7 +78,7 @@ public:
omp_set_num_threads(_reserved_threads);
VLOG_DEBUG << fmt::format(
"ScopedOmpThreadBudget reserve threads reserved={}, in_use={},
limit={}",
- _reserved_threads, g_index_threads_in_use,
config::omp_threads_limit);
+ _reserved_threads, g_index_threads_in_use,
get_omp_threads_limit());
}
~ScopedOmpThreadBudget() {
@@ -90,7 +90,16 @@ public:
}
VLOG_DEBUG << fmt::format(
"ScopedOmpThreadBudget release threads reserved={},
remaining_in_use={}, limit={}",
- _reserved_threads, g_index_threads_in_use,
config::omp_threads_limit);
+ _reserved_threads, g_index_threads_in_use,
get_omp_threads_limit());
+ }
+
+ static int get_omp_threads_limit() {
+ if (config::omp_threads_limit > 0) {
+ return config::omp_threads_limit;
+ }
+ int core_cap = std::max(1, CpuInfo::num_cores());
+ // Use at most 80% of the available CPU cores.
+ return std::max(1, core_cap * 4 / 5);
}
private:
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index 6f943d464aa..dd8ad3587d3 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -83,7 +83,7 @@ Status
FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
auto& p = _parent->cast<FileScanOperatorX>();
// There's only one scan range for each backend in batch split mode. Each
backend only starts up one ScanNode instance.
uint32_t shard_num = std::min(
- vectorized::ScannerScheduler::get_remote_scan_thread_num() /
p.parallelism(state()),
+ vectorized::ScannerScheduler::default_remote_scan_thread_num() /
p.parallelism(state()),
_max_scanners);
shard_num = std::max(shard_num, 1U);
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
@@ -108,8 +108,8 @@ void FileScanLocalState::set_scan_ranges(RuntimeState*
state,
auto& p = _parent->cast<FileScanOperatorX>();
auto calc_max_scanners = [&](int parallel_instance_num) -> int {
- int max_scanners =
- vectorized::ScannerScheduler::get_remote_scan_thread_num() /
parallel_instance_num;
+ int max_scanners =
vectorized::ScannerScheduler::default_remote_scan_thread_num() /
+ parallel_instance_num;
if (should_run_serial()) {
max_scanners = 1;
}
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index 6c9c2255908..6ab5c82806b 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -389,13 +389,13 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
}
// 9 scan thread num
- int scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+ int scan_thread_num =
vectorized::ScannerScheduler::default_local_scan_thread_num();
if (tworkload_group_info.__isset.scan_thread_num &&
tworkload_group_info.scan_thread_num > 0) {
scan_thread_num = tworkload_group_info.scan_thread_num;
}
// 10 max remote scan thread num
- int max_remote_scan_thread_num =
vectorized::ScannerScheduler::get_remote_scan_thread_num();
+ int max_remote_scan_thread_num =
vectorized::ScannerScheduler::default_remote_scan_thread_num();
if (tworkload_group_info.__isset.max_remote_scan_thread_num &&
tworkload_group_info.max_remote_scan_thread_num > 0) {
max_remote_scan_thread_num =
tworkload_group_info.max_remote_scan_thread_num;
@@ -559,9 +559,9 @@ Status
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
"ls_" + wg_name, cg_cpu_ctl_ptr, wg_name);
}
- Status ret = scan_scheduler->start(scan_thread_num, scan_thread_num,
-
config::doris_scanner_thread_pool_queue_size,
- config::min_active_scan_threads);
+ Status ret = scan_scheduler->start(
+ scan_thread_num, scan_thread_num,
config::doris_scanner_thread_pool_queue_size,
+
vectorized::ScannerScheduler::default_min_active_scan_threads());
if (ret.ok()) {
_scan_task_sched = std::move(scan_scheduler);
} else {
@@ -584,7 +584,8 @@ Status
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
}
Status ret = remote_scan_scheduler->start(
max_remote_scan_thread_num, min_remote_scan_thread_num,
- remote_scan_thread_queue_size,
config::min_active_file_scan_threads);
+ remote_scan_thread_queue_size,
+
vectorized::ScannerScheduler::default_min_active_file_scan_threads());
if (ret.ok()) {
_remote_scan_task_sched = std::move(remote_scan_scheduler);
} else {
@@ -615,14 +616,15 @@ Status
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
// 2 update thread pool
if (scan_thread_num > 0 && _scan_task_sched) {
- _scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num,
- config::min_active_scan_threads);
+ _scan_task_sched->reset_thread_num(
+ scan_thread_num, scan_thread_num,
+
vectorized::ScannerScheduler::default_min_active_scan_threads());
}
if (max_remote_scan_thread_num >= min_remote_scan_thread_num &&
_remote_scan_task_sched) {
- _remote_scan_task_sched->reset_thread_num(max_remote_scan_thread_num,
- min_remote_scan_thread_num,
-
config::min_active_file_scan_threads);
+ _remote_scan_task_sched->reset_thread_num(
+ max_remote_scan_thread_num, min_remote_scan_thread_num,
+
vectorized::ScannerScheduler::default_min_active_file_scan_threads());
}
return upsert_ret;
diff --git a/be/src/util/async_io.h b/be/src/util/async_io.h
index a848a628d25..624686716db 100644
--- a/be/src/util/async_io.h
+++ b/be/src/util/async_io.h
@@ -21,6 +21,7 @@
#include "io/fs/file_system.h"
#include "olap/olap_define.h"
+#include "util/cpu_info.h"
#include "work_thread_pool.hpp"
namespace doris {
@@ -35,9 +36,11 @@ struct AsyncIOCtx {
class AsyncIO {
public:
AsyncIO() {
- _io_thread_pool = new
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
-
config::doris_scanner_thread_pool_queue_size,
- "async_io_thread_pool");
+ _io_thread_pool = new PriorityThreadPool(
+ config::doris_scanner_thread_pool_thread_num > 0
+ ? config::doris_scanner_thread_pool_thread_num
+ : std::max(48, CpuInfo::num_cores() * 2),
+ config::doris_scanner_thread_pool_queue_size,
"async_io_thread_pool");
_remote_thread_pool = new PriorityThreadPool(
config::doris_remote_scanner_thread_pool_thread_num,
config::doris_remote_scanner_thread_pool_queue_size,
"async_remote_thread_pool");
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index c6c46223894..4aabf67c261 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -126,7 +126,9 @@ Status ScannerContext::init() {
vectorized::TaskId task_id(fmt::format("{}-{}",
print_id(_state->query_id()), ctx_id));
_task_handle = DORIS_TRY(task_executor->create_task(
task_id, []() { return 0.0; },
- config::task_executor_initial_max_concurrency_per_task,
+ config::task_executor_initial_max_concurrency_per_task > 0
+ ?
config::task_executor_initial_max_concurrency_per_task
+ : std::max(48, CpuInfo::num_cores() * 2),
std::chrono::milliseconds(100), std::nullopt));
}
#endif
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 0bd27114d72..f932ea420c6 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -334,21 +334,34 @@ void
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
ctx->push_back_scan_task(scan_task);
}
-
-int ScannerScheduler::get_remote_scan_thread_num() {
- static int remote_max_thread_num = []() {
- int num = config::doris_max_remote_scanner_thread_pool_thread_num != -1
- ?
config::doris_max_remote_scanner_thread_pool_thread_num
- : std::max(512, CpuInfo::num_cores() * 10);
- return std::max(num, config::doris_scanner_thread_pool_thread_num);
- }();
- return remote_max_thread_num;
+int ScannerScheduler::default_local_scan_thread_num() {
+ return config::doris_scanner_thread_pool_thread_num > 0
+ ? config::doris_scanner_thread_pool_thread_num
+ : std::max(48, CpuInfo::num_cores() * 2);
+}
+int ScannerScheduler::default_remote_scan_thread_num() {
+ int num = config::doris_max_remote_scanner_thread_pool_thread_num > 0
+ ? config::doris_max_remote_scanner_thread_pool_thread_num
+ : std::max(512, CpuInfo::num_cores() * 10);
+ return std::max(num, default_local_scan_thread_num());
}
int ScannerScheduler::get_remote_scan_thread_queue_size() {
return config::doris_remote_scanner_thread_pool_queue_size;
}
+int ScannerScheduler::default_min_active_scan_threads() {
+ return config::min_active_scan_threads > 0
+ ? config::min_active_scan_threads
+ : config::min_active_scan_threads = CpuInfo::num_cores() *
2;
+}
+
+int ScannerScheduler::default_min_active_file_scan_threads() {
+ return config::min_active_file_scan_threads > 0
+ ? config::min_active_file_scan_threads
+ : config::min_active_file_scan_threads =
CpuInfo::num_cores() * 8;
+}
+
void ScannerScheduler::_make_sure_virtual_col_is_materialized(
const std::shared_ptr<Scanner>& scanner, vectorized::Block*
free_block) {
#ifndef NDEBUG
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 26ed7f52811..21fa4aefa5c 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -108,10 +108,16 @@ public:
Status submit(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task);
- static int get_remote_scan_thread_num();
+ static int default_local_scan_thread_num();
+
+ static int default_remote_scan_thread_num();
static int get_remote_scan_thread_queue_size();
+ static int default_min_active_scan_threads();
+
+ static int default_min_active_file_scan_threads();
+
virtual Status start(int max_thread_num, int min_thread_num, int
queue_size,
int min_active_scan_threads) = 0;
virtual void stop() = 0;
@@ -276,8 +282,10 @@ public:
thread_config.cgroup_cpu_ctl = _cgroup_cpu_ctl;
_task_executor = TimeSharingTaskExecutor::create_shared(
thread_config, max_thread_num * 2,
config::task_executor_min_concurrency_per_task,
- config::task_executor_max_concurrency_per_task,
std::make_shared<SystemTicker>(),
- nullptr, false);
+ config::task_executor_max_concurrency_per_task > 0
+ ? config::task_executor_max_concurrency_per_task
+ : std::numeric_limits<int>::max(),
+ std::make_shared<SystemTicker>(), nullptr, false);
RETURN_IF_ERROR(_task_executor->init());
RETURN_IF_ERROR(_task_executor->start());
return Status::OK();
@@ -321,7 +329,9 @@ public:
vectorized::TaskId task_id(task_id_string);
std::shared_ptr<TaskHandle> task_handle =
DORIS_TRY(_task_executor->create_task(
task_id, []() { return 0.0; },
- config::task_executor_initial_max_concurrency_per_task,
+ config::task_executor_initial_max_concurrency_per_task > 0
+ ?
config::task_executor_initial_max_concurrency_per_task
+ : std::max(48, CpuInfo::num_cores() * 2),
std::chrono::milliseconds(100), std::nullopt));
auto wrapped_scan_func = [this, task_handle, scan_func =
scan_task.scan_func]() {
diff --git a/be/test/common/config_validator_test.cpp
b/be/test/common/config_validator_test.cpp
index ac105655c20..9f6e849ed3a 100644
--- a/be/test/common/config_validator_test.cpp
+++ b/be/test/common/config_validator_test.cpp
@@ -50,46 +50,4 @@ TEST(ConfigValidatorTest, Validator) {
EXPECT_EQ(cfg_validator_2, 8);
}
-// When a positive value is specified, validator should use it directly.
-TEST(ConfigValidatorTest, OmpThreadsLimitExplicitValue) {
- int32_t original_limit = config::omp_threads_limit;
-
- Status st = config::set_config("omp_threads_limit", "7", false, true);
- ASSERT_TRUE(st.ok());
- EXPECT_EQ(7, config::omp_threads_limit);
-
- config::omp_threads_limit = original_limit;
-}
-
-// When value is -1 and config::num_cores is set,
-// validator should pick 80% of that core count.
-TEST(ConfigValidatorTest, OmpThreadsLimitAutoUsesConfiguredNumCores) {
- int32_t original_limit = config::omp_threads_limit;
- int32_t original_num_cores = config::num_cores;
-
- config::num_cores = 20;
- Status st = config::set_config("omp_threads_limit", "-1", false, true);
- ASSERT_TRUE(st.ok());
- EXPECT_EQ(16, config::omp_threads_limit);
-
- config::num_cores = original_num_cores;
- config::omp_threads_limit = original_limit;
-}
-
-// When value is -1 and config::num_cores is 0,
-// validator should fall back to CpuInfo::num_cores().
-TEST(ConfigValidatorTest, OmpThreadsLimitAutoFallsBackToCpuInfo) {
- int32_t original_limit = config::omp_threads_limit;
- int32_t original_num_cores = config::num_cores;
-
- config::num_cores = 0;
- Status st = config::set_config("omp_threads_limit", "-1", false, true);
- ASSERT_TRUE(st.ok());
- CpuInfo::init();
- int expected = std::max(1, CpuInfo::num_cores() * 4 / 5);
- EXPECT_EQ(expected, config::omp_threads_limit);
-
- config::num_cores = original_num_cores;
- config::omp_threads_limit = original_limit;
-}
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]