This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new e50161775d6 branch-4.0: [bugfix](be) should not use cpu init in 
config's validator #58669 (#58693)
e50161775d6 is described below

commit e50161775d640120b6e5554b5ae0307a1716e5db
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Thu Dec 4 13:59:32 2025 +0800

    branch-4.0: [bugfix](be) should not use cpu init in config's validator 
#58669 (#58693)
    
    Cherry-picked from #58669
    
    Co-authored-by: yiguolei <[email protected]>
---
 be/src/common/config.cpp                           | 38 ++------------------
 .../segment_v2/ann_index/faiss_ann_index.cpp       | 13 +++++--
 be/src/pipeline/exec/file_scan_operator.cpp        |  6 ++--
 be/src/runtime/workload_group/workload_group.cpp   | 24 +++++++------
 be/src/util/async_io.h                             |  9 +++--
 be/src/vec/exec/scan/scanner_context.cpp           |  4 ++-
 be/src/vec/exec/scan/scanner_scheduler.cpp         | 31 +++++++++++-----
 be/src/vec/exec/scan/scanner_scheduler.h           | 18 +++++++---
 be/test/common/config_validator_test.cpp           | 42 ----------------------
 9 files changed, 75 insertions(+), 110 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 666676e7077..d5fdcc4d12b 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -301,21 +301,10 @@ DEFINE_mInt32(pipeline_task_exec_time_slice, "100");
 DEFINE_Int32(task_executor_min_concurrency_per_task, "1");
 // task executor max concurrency per task
 DEFINE_Int32(task_executor_max_concurrency_per_task, "-1");
-DEFINE_Validator(task_executor_max_concurrency_per_task, [](const int config) 
-> bool {
-    if (config == -1) {
-        task_executor_max_concurrency_per_task = 
std::numeric_limits<int>::max();
-    }
-    return true;
-});
+
 // task task executor inital split max concurrency per task, later concurrency 
may be adjusted dynamically
 DEFINE_Int32(task_executor_initial_max_concurrency_per_task, "-1");
-DEFINE_Validator(task_executor_initial_max_concurrency_per_task, [](const int 
config) -> bool {
-    if (config == -1) {
-        CpuInfo::init();
-        task_executor_initial_max_concurrency_per_task = std::max(48, 
CpuInfo::num_cores() * 2);
-    }
-    return true;
-});
+
 // Enable task executor in internal table scan.
 DEFINE_Bool(enable_task_executor_in_internal_table, "false");
 // Enable task executor in external table scan.
@@ -324,13 +313,7 @@ DEFINE_Bool(enable_task_executor_in_external_table, 
"true");
 // number of scanner thread pool size for olap table
 // and the min thread num of remote scanner thread pool
 DEFINE_Int32(doris_scanner_thread_pool_thread_num, "-1");
-DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) -> 
bool {
-    if (config == -1) {
-        CpuInfo::init();
-        doris_scanner_thread_pool_thread_num = std::max(48, 
CpuInfo::num_cores() * 2);
-    }
-    return true;
-});
+
 DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
 DEFINE_Int32(remote_split_source_batch_size, "1000");
 DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
@@ -372,21 +355,6 @@ DEFINE_mString(broken_storage_path, "");
 DEFINE_Int32(min_active_scan_threads, "-1");
 DEFINE_Int32(min_active_file_scan_threads, "-1");
 
-DEFINE_Validator(min_active_scan_threads, [](const int config) -> bool {
-    if (config == -1) {
-        CpuInfo::init();
-        min_active_scan_threads = CpuInfo::num_cores() * 2;
-    }
-    return true;
-});
-DEFINE_Validator(min_active_file_scan_threads, [](const int config) -> bool {
-    if (config == -1) {
-        CpuInfo::init();
-        min_active_file_scan_threads = CpuInfo::num_cores() * 8;
-    }
-    return true;
-});
-
 // Config is used to check incompatible old format hdr_ format
 // whether doris uses strict way. When config is true, process will log fatal
 // and exit. When config is false, process will only log warning.
diff --git a/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp 
b/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
index f01a9c23c78..cf954e0ffc0 100644
--- a/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
+++ b/be/src/olap/rowset/segment_v2/ann_index/faiss_ann_index.cpp
@@ -78,7 +78,7 @@ public:
         omp_set_num_threads(_reserved_threads);
         VLOG_DEBUG << fmt::format(
                 "ScopedOmpThreadBudget reserve threads reserved={}, in_use={}, 
limit={}",
-                _reserved_threads, g_index_threads_in_use, 
config::omp_threads_limit);
+                _reserved_threads, g_index_threads_in_use, 
get_omp_threads_limit());
     }
 
     ~ScopedOmpThreadBudget() {
@@ -90,7 +90,16 @@ public:
         }
         VLOG_DEBUG << fmt::format(
                 "ScopedOmpThreadBudget release threads reserved={}, 
remaining_in_use={}, limit={}",
-                _reserved_threads, g_index_threads_in_use, 
config::omp_threads_limit);
+                _reserved_threads, g_index_threads_in_use, 
get_omp_threads_limit());
+    }
+
+    static int get_omp_threads_limit() {
+        if (config::omp_threads_limit > 0) {
+            return config::omp_threads_limit;
+        }
+        int core_cap = std::max(1, CpuInfo::num_cores());
+        // Use at most 80% of the available CPU cores.
+        return std::max(1, core_cap * 4 / 5);
     }
 
 private:
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 6f943d464aa..dd8ad3587d3 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -83,7 +83,7 @@ Status 
FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
     auto& p = _parent->cast<FileScanOperatorX>();
     // There's only one scan range for each backend in batch split mode. Each 
backend only starts up one ScanNode instance.
     uint32_t shard_num = std::min(
-            vectorized::ScannerScheduler::get_remote_scan_thread_num() / 
p.parallelism(state()),
+            vectorized::ScannerScheduler::default_remote_scan_thread_num() / 
p.parallelism(state()),
             _max_scanners);
     shard_num = std::max(shard_num, 1U);
     _kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
@@ -108,8 +108,8 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* 
state,
     auto& p = _parent->cast<FileScanOperatorX>();
 
     auto calc_max_scanners = [&](int parallel_instance_num) -> int {
-        int max_scanners =
-                vectorized::ScannerScheduler::get_remote_scan_thread_num() / 
parallel_instance_num;
+        int max_scanners = 
vectorized::ScannerScheduler::default_remote_scan_thread_num() /
+                           parallel_instance_num;
         if (should_run_serial()) {
             max_scanners = 1;
         }
diff --git a/be/src/runtime/workload_group/workload_group.cpp 
b/be/src/runtime/workload_group/workload_group.cpp
index 6c9c2255908..6ab5c82806b 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -389,13 +389,13 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
     }
 
     // 9 scan thread num
-    int scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+    int scan_thread_num = 
vectorized::ScannerScheduler::default_local_scan_thread_num();
     if (tworkload_group_info.__isset.scan_thread_num && 
tworkload_group_info.scan_thread_num > 0) {
         scan_thread_num = tworkload_group_info.scan_thread_num;
     }
 
     // 10 max remote scan thread num
-    int max_remote_scan_thread_num = 
vectorized::ScannerScheduler::get_remote_scan_thread_num();
+    int max_remote_scan_thread_num = 
vectorized::ScannerScheduler::default_remote_scan_thread_num();
     if (tworkload_group_info.__isset.max_remote_scan_thread_num &&
         tworkload_group_info.max_remote_scan_thread_num > 0) {
         max_remote_scan_thread_num = 
tworkload_group_info.max_remote_scan_thread_num;
@@ -559,9 +559,9 @@ Status 
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
                     "ls_" + wg_name, cg_cpu_ctl_ptr, wg_name);
         }
 
-        Status ret = scan_scheduler->start(scan_thread_num, scan_thread_num,
-                                           
config::doris_scanner_thread_pool_queue_size,
-                                           config::min_active_scan_threads);
+        Status ret = scan_scheduler->start(
+                scan_thread_num, scan_thread_num, 
config::doris_scanner_thread_pool_queue_size,
+                
vectorized::ScannerScheduler::default_min_active_scan_threads());
         if (ret.ok()) {
             _scan_task_sched = std::move(scan_scheduler);
         } else {
@@ -584,7 +584,8 @@ Status 
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
         }
         Status ret = remote_scan_scheduler->start(
                 max_remote_scan_thread_num, min_remote_scan_thread_num,
-                remote_scan_thread_queue_size, 
config::min_active_file_scan_threads);
+                remote_scan_thread_queue_size,
+                
vectorized::ScannerScheduler::default_min_active_file_scan_threads());
         if (ret.ok()) {
             _remote_scan_task_sched = std::move(remote_scan_scheduler);
         } else {
@@ -615,14 +616,15 @@ Status 
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
 
     // 2 update thread pool
     if (scan_thread_num > 0 && _scan_task_sched) {
-        _scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num,
-                                           config::min_active_scan_threads);
+        _scan_task_sched->reset_thread_num(
+                scan_thread_num, scan_thread_num,
+                
vectorized::ScannerScheduler::default_min_active_scan_threads());
     }
 
     if (max_remote_scan_thread_num >= min_remote_scan_thread_num && 
_remote_scan_task_sched) {
-        _remote_scan_task_sched->reset_thread_num(max_remote_scan_thread_num,
-                                                  min_remote_scan_thread_num,
-                                                  
config::min_active_file_scan_threads);
+        _remote_scan_task_sched->reset_thread_num(
+                max_remote_scan_thread_num, min_remote_scan_thread_num,
+                
vectorized::ScannerScheduler::default_min_active_file_scan_threads());
     }
 
     return upsert_ret;
diff --git a/be/src/util/async_io.h b/be/src/util/async_io.h
index a848a628d25..624686716db 100644
--- a/be/src/util/async_io.h
+++ b/be/src/util/async_io.h
@@ -21,6 +21,7 @@
 
 #include "io/fs/file_system.h"
 #include "olap/olap_define.h"
+#include "util/cpu_info.h"
 #include "work_thread_pool.hpp"
 
 namespace doris {
@@ -35,9 +36,11 @@ struct AsyncIOCtx {
 class AsyncIO {
 public:
     AsyncIO() {
-        _io_thread_pool = new 
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
-                                                 
config::doris_scanner_thread_pool_queue_size,
-                                                 "async_io_thread_pool");
+        _io_thread_pool = new PriorityThreadPool(
+                config::doris_scanner_thread_pool_thread_num > 0
+                        ? config::doris_scanner_thread_pool_thread_num
+                        : std::max(48, CpuInfo::num_cores() * 2),
+                config::doris_scanner_thread_pool_queue_size, 
"async_io_thread_pool");
         _remote_thread_pool = new PriorityThreadPool(
                 config::doris_remote_scanner_thread_pool_thread_num,
                 config::doris_remote_scanner_thread_pool_queue_size, 
"async_remote_thread_pool");
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index c6c46223894..4aabf67c261 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -126,7 +126,9 @@ Status ScannerContext::init() {
         vectorized::TaskId task_id(fmt::format("{}-{}", 
print_id(_state->query_id()), ctx_id));
         _task_handle = DORIS_TRY(task_executor->create_task(
                 task_id, []() { return 0.0; },
-                config::task_executor_initial_max_concurrency_per_task,
+                config::task_executor_initial_max_concurrency_per_task > 0
+                        ? 
config::task_executor_initial_max_concurrency_per_task
+                        : std::max(48, CpuInfo::num_cores() * 2),
                 std::chrono::milliseconds(100), std::nullopt));
     }
 #endif
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp 
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 0bd27114d72..f932ea420c6 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -334,21 +334,34 @@ void 
ScannerScheduler::_scanner_scan(std::shared_ptr<ScannerContext> ctx,
 
     ctx->push_back_scan_task(scan_task);
 }
-
-int ScannerScheduler::get_remote_scan_thread_num() {
-    static int remote_max_thread_num = []() {
-        int num = config::doris_max_remote_scanner_thread_pool_thread_num != -1
-                          ? 
config::doris_max_remote_scanner_thread_pool_thread_num
-                          : std::max(512, CpuInfo::num_cores() * 10);
-        return std::max(num, config::doris_scanner_thread_pool_thread_num);
-    }();
-    return remote_max_thread_num;
+int ScannerScheduler::default_local_scan_thread_num() {
+    return config::doris_scanner_thread_pool_thread_num > 0
+                   ? config::doris_scanner_thread_pool_thread_num
+                   : std::max(48, CpuInfo::num_cores() * 2);
+}
+int ScannerScheduler::default_remote_scan_thread_num() {
+    int num = config::doris_max_remote_scanner_thread_pool_thread_num > 0
+                      ? config::doris_max_remote_scanner_thread_pool_thread_num
+                      : std::max(512, CpuInfo::num_cores() * 10);
+    return std::max(num, default_local_scan_thread_num());
 }
 
 int ScannerScheduler::get_remote_scan_thread_queue_size() {
     return config::doris_remote_scanner_thread_pool_queue_size;
 }
 
+int ScannerScheduler::default_min_active_scan_threads() {
+    return config::min_active_scan_threads > 0
+                   ? config::min_active_scan_threads
+                   : config::min_active_scan_threads = CpuInfo::num_cores() * 
2;
+}
+
+int ScannerScheduler::default_min_active_file_scan_threads() {
+    return config::min_active_file_scan_threads > 0
+                   ? config::min_active_file_scan_threads
+                   : config::min_active_file_scan_threads = 
CpuInfo::num_cores() * 8;
+}
+
 void ScannerScheduler::_make_sure_virtual_col_is_materialized(
         const std::shared_ptr<Scanner>& scanner, vectorized::Block* 
free_block) {
 #ifndef NDEBUG
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h 
b/be/src/vec/exec/scan/scanner_scheduler.h
index 26ed7f52811..21fa4aefa5c 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -108,10 +108,16 @@ public:
 
     Status submit(std::shared_ptr<ScannerContext> ctx, 
std::shared_ptr<ScanTask> scan_task);
 
-    static int get_remote_scan_thread_num();
+    static int default_local_scan_thread_num();
+
+    static int default_remote_scan_thread_num();
 
     static int get_remote_scan_thread_queue_size();
 
+    static int default_min_active_scan_threads();
+
+    static int default_min_active_file_scan_threads();
+
     virtual Status start(int max_thread_num, int min_thread_num, int 
queue_size,
                          int min_active_scan_threads) = 0;
     virtual void stop() = 0;
@@ -276,8 +282,10 @@ public:
         thread_config.cgroup_cpu_ctl = _cgroup_cpu_ctl;
         _task_executor = TimeSharingTaskExecutor::create_shared(
                 thread_config, max_thread_num * 2, 
config::task_executor_min_concurrency_per_task,
-                config::task_executor_max_concurrency_per_task, 
std::make_shared<SystemTicker>(),
-                nullptr, false);
+                config::task_executor_max_concurrency_per_task > 0
+                        ? config::task_executor_max_concurrency_per_task
+                        : std::numeric_limits<int>::max(),
+                std::make_shared<SystemTicker>(), nullptr, false);
         RETURN_IF_ERROR(_task_executor->init());
         RETURN_IF_ERROR(_task_executor->start());
         return Status::OK();
@@ -321,7 +329,9 @@ public:
             vectorized::TaskId task_id(task_id_string);
             std::shared_ptr<TaskHandle> task_handle = 
DORIS_TRY(_task_executor->create_task(
                     task_id, []() { return 0.0; },
-                    config::task_executor_initial_max_concurrency_per_task,
+                    config::task_executor_initial_max_concurrency_per_task > 0
+                            ? 
config::task_executor_initial_max_concurrency_per_task
+                            : std::max(48, CpuInfo::num_cores() * 2),
                     std::chrono::milliseconds(100), std::nullopt));
 
             auto wrapped_scan_func = [this, task_handle, scan_func = 
scan_task.scan_func]() {
diff --git a/be/test/common/config_validator_test.cpp 
b/be/test/common/config_validator_test.cpp
index ac105655c20..9f6e849ed3a 100644
--- a/be/test/common/config_validator_test.cpp
+++ b/be/test/common/config_validator_test.cpp
@@ -50,46 +50,4 @@ TEST(ConfigValidatorTest, Validator) {
     EXPECT_EQ(cfg_validator_2, 8);
 }
 
-// When a positive value is specified, validator should use it directly.
-TEST(ConfigValidatorTest, OmpThreadsLimitExplicitValue) {
-    int32_t original_limit = config::omp_threads_limit;
-
-    Status st = config::set_config("omp_threads_limit", "7", false, true);
-    ASSERT_TRUE(st.ok());
-    EXPECT_EQ(7, config::omp_threads_limit);
-
-    config::omp_threads_limit = original_limit;
-}
-
-// When value is -1 and config::num_cores is set,
-// validator should pick 80% of that core count.
-TEST(ConfigValidatorTest, OmpThreadsLimitAutoUsesConfiguredNumCores) {
-    int32_t original_limit = config::omp_threads_limit;
-    int32_t original_num_cores = config::num_cores;
-
-    config::num_cores = 20;
-    Status st = config::set_config("omp_threads_limit", "-1", false, true);
-    ASSERT_TRUE(st.ok());
-    EXPECT_EQ(16, config::omp_threads_limit);
-
-    config::num_cores = original_num_cores;
-    config::omp_threads_limit = original_limit;
-}
-
-// When value is -1 and config::num_cores is 0,
-// validator should fall back to CpuInfo::num_cores().
-TEST(ConfigValidatorTest, OmpThreadsLimitAutoFallsBackToCpuInfo) {
-    int32_t original_limit = config::omp_threads_limit;
-    int32_t original_num_cores = config::num_cores;
-
-    config::num_cores = 0;
-    Status st = config::set_config("omp_threads_limit", "-1", false, true);
-    ASSERT_TRUE(st.ok());
-    CpuInfo::init();
-    int expected = std::max(1, CpuInfo::num_cores() * 4 / 5);
-    EXPECT_EQ(expected, config::omp_threads_limit);
-
-    config::num_cores = original_num_cores;
-    config::omp_threads_limit = original_limit;
-}
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to