This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new baaee9f7ff4 [bugfix](config) do not use cpu info in validator and
using a new method to get config value (#58673)
baaee9f7ff4 is described below
commit baaee9f7ff4ab7b6df21518b047e5dce37180b1d
Author: yiguolei <[email protected]>
AuthorDate: Fri Dec 5 10:54:56 2025 +0800
[bugfix](config) do not use cpu info in validator and using a new method to
get config value (#58673)
---
be/src/common/config.cpp | 17 ++---------------
be/src/exec/olap_common.cpp | 10 ++++++++++
be/src/exec/olap_common.h | 6 ++++++
be/src/pipeline/exec/olap_scan_operator.cpp | 2 +-
be/src/runtime/workload_group/workload_group.cpp | 6 +++---
be/src/util/async_io.h | 7 ++++---
be/src/util/s3_util.cpp | 3 ++-
be/src/vec/exec/scan/scanner_context.cpp | 10 +++++-----
be/src/vec/exec/scan/scanner_scheduler.cpp | 20 +++++++++++---------
9 files changed, 44 insertions(+), 37 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 571b1879c80..164a853a54a 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -303,13 +303,7 @@
DEFINE_mInt64(doris_blocking_priority_queue_wait_timeout_ms, "500");
// number of scanner thread pool size for olap table
// and the min thread num of remote scanner thread pool
DEFINE_Int32(doris_scanner_thread_pool_thread_num, "-1");
-DEFINE_Validator(doris_scanner_thread_pool_thread_num, [](const int config) ->
bool {
- if (config == -1) {
- CpuInfo::init();
- doris_scanner_thread_pool_thread_num = std::max(48,
CpuInfo::num_cores() * 2);
- }
- return true;
-});
+
DEFINE_Int32(doris_scanner_min_thread_pool_thread_num, "8");
DEFINE_Int32(remote_split_source_batch_size, "1000");
DEFINE_Int32(doris_max_remote_scanner_thread_pool_thread_num, "-1");
@@ -1377,14 +1371,7 @@ DEFINE_String(spill_storage_root_path, "");
DEFINE_String(spill_storage_limit, "20%"); // 20%
DEFINE_mInt32(spill_gc_interval_ms, "2000"); // 2s
DEFINE_mInt32(spill_gc_work_time_ms, "2000"); // 2s
-DEFINE_Int32(spill_io_thread_pool_thread_num, "-1");
-DEFINE_Validator(spill_io_thread_pool_thread_num, [](const int config) -> bool
{
- if (config == -1) {
- CpuInfo::init();
- spill_io_thread_pool_thread_num = std::max(48, CpuInfo::num_cores() *
2);
- }
- return true;
-});
+DEFINE_Int32(spill_io_thread_pool_thread_num, "32");
DEFINE_Int32(spill_io_thread_pool_queue_size, "102400");
DEFINE_mBool(check_segment_when_build_rowset_meta, "false");
diff --git a/be/src/exec/olap_common.cpp b/be/src/exec/olap_common.cpp
index 04ca8e5efcb..33b1100eff6 100644
--- a/be/src/exec/olap_common.cpp
+++ b/be/src/exec/olap_common.cpp
@@ -24,6 +24,16 @@
namespace doris {
+namespace config {
+namespace dynamic {
+int doris_scanner_thread_pool_thread_num() {
+ if (::doris::config::doris_scanner_thread_pool_thread_num > 0) {
+ return ::doris::config::doris_scanner_thread_pool_thread_num;
+ }
+ return std::max(48, ::doris::CpuInfo::num_cores() * 2);
+}
+} // namespace dynamic
+} // namespace config
Status
OlapScanKeys::get_key_range(std::vector<std::unique_ptr<OlapScanRange>>*
key_range) {
key_range->clear();
diff --git a/be/src/exec/olap_common.h b/be/src/exec/olap_common.h
index c30adf7d2fb..8629d0e3fd3 100644
--- a/be/src/exec/olap_common.h
+++ b/be/src/exec/olap_common.h
@@ -40,6 +40,7 @@
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
#include "runtime/type_limit.h"
+#include "util/cpu_info.h"
#include "vec/core/types.h"
#include "vec/io/io_helper.h"
#include "vec/runtime/ipv4_value.h"
@@ -48,6 +49,11 @@
#include "vec/runtime/vdatetime_value.h"
namespace doris {
+namespace config {
+namespace dynamic {
+int doris_scanner_thread_pool_thread_num();
+} // namespace dynamic
+} // namespace config
template <PrimitiveType primitive_type, class T>
std::string cast_to_string(T value, int scale) {
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index f9535ef0c8d..cda01d09866 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -402,7 +402,7 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
// If the `max_scanners_count` was not set,
// use `config::doris_scanner_thread_pool_thread_num` as the default
value.
if (max_scanners_count <= 0) {
- max_scanners_count = config::doris_scanner_thread_pool_thread_num;
+ max_scanners_count =
config::dynamic::doris_scanner_thread_pool_thread_num();
}
// Too small value of `min_rows_per_scanner` is meaningless.
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index ba84e5786ce..e9c7c41c8f7 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -368,7 +368,7 @@ WorkloadGroupInfo WorkloadGroupInfo::parse_topic_info(
}
// 9 scan thread num
- int scan_thread_num = config::doris_scanner_thread_pool_thread_num;
+ int scan_thread_num =
config::dynamic::doris_scanner_thread_pool_thread_num();
if (tworkload_group_info.__isset.scan_thread_num &&
tworkload_group_info.scan_thread_num > 0) {
scan_thread_num = tworkload_group_info.scan_thread_num;
}
@@ -478,8 +478,8 @@ void
WorkloadGroup::upsert_task_scheduler(WorkloadGroupInfo* tg_info, ExecEnv* e
std::unique_ptr<vectorized::SimplifiedScanScheduler> scan_scheduler =
std::make_unique<vectorized::SimplifiedScanScheduler>("Scan_"
+ tg_name,
cg_cpu_ctl_ptr, tg_name);
- Status ret =
scan_scheduler->start(config::doris_scanner_thread_pool_thread_num,
-
config::doris_scanner_thread_pool_thread_num,
+ Status ret =
scan_scheduler->start(config::dynamic::doris_scanner_thread_pool_thread_num(),
+
config::dynamic::doris_scanner_thread_pool_thread_num(),
config::doris_scanner_thread_pool_queue_size);
if (ret.ok()) {
_scan_task_sched = std::move(scan_scheduler);
diff --git a/be/src/util/async_io.h b/be/src/util/async_io.h
index a848a628d25..d8ff08b3e69 100644
--- a/be/src/util/async_io.h
+++ b/be/src/util/async_io.h
@@ -19,6 +19,7 @@
#include <bthread/bthread.h>
+#include "exec/olap_common.h"
#include "io/fs/file_system.h"
#include "olap/olap_define.h"
#include "work_thread_pool.hpp"
@@ -35,9 +36,9 @@ struct AsyncIOCtx {
class AsyncIO {
public:
AsyncIO() {
- _io_thread_pool = new
PriorityThreadPool(config::doris_scanner_thread_pool_thread_num,
-
config::doris_scanner_thread_pool_queue_size,
- "async_io_thread_pool");
+ _io_thread_pool = new PriorityThreadPool(
+ config::dynamic::doris_scanner_thread_pool_thread_num(),
+ config::doris_scanner_thread_pool_queue_size,
"async_io_thread_pool");
_remote_thread_pool = new PriorityThreadPool(
config::doris_remote_scanner_thread_pool_thread_num,
config::doris_remote_scanner_thread_pool_queue_size,
"async_remote_thread_pool");
diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp
index ad895948a2f..8eb40c401d0 100644
--- a/be/src/util/s3_util.cpp
+++ b/be/src/util/s3_util.cpp
@@ -53,6 +53,7 @@
#ifdef USE_AZURE
#include "io/fs/azure_obj_storage_client.h"
#endif
+#include "exec/olap_common.h"
#include "io/fs/obj_storage_client.h"
#include "io/fs/s3_obj_storage_client.h"
#include "runtime/exec_env.h"
@@ -372,7 +373,7 @@ std::shared_ptr<io::ObjStorageClient>
S3ClientFactory::_create_s3_client(
#ifdef BE_TEST
// the S3Client may shared by many threads.
// So need to set the number of connections large enough.
- aws_config.maxConnections =
config::doris_scanner_thread_pool_thread_num;
+ aws_config.maxConnections =
config::dynamic::doris_scanner_thread_pool_thread_num();
#else
aws_config.maxConnections =
ExecEnv::GetInstance()->scanner_scheduler()->remote_thread_pool_max_thread_num();
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 5fb564e4ab8..738a919f744 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -171,15 +171,15 @@ Status ScannerContext::init() {
// That will make the number of scan tasks that can be submitted to the
scheduler
// a very large value. This logic is kept from the older
implementation.
if (submit_many_scan_tasks_for_potential_performance_issue ||
_ignore_data_distribution) {
- _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
+ _max_thread_num =
config::dynamic::doris_scanner_thread_pool_thread_num() / 1;
} else {
const size_t factor = _is_file_scan_operator ? 1 : 4;
- _max_thread_num = factor *
(config::doris_scanner_thread_pool_thread_num /
+ _max_thread_num = factor *
(config::dynamic::doris_scanner_thread_pool_thread_num() /
_num_parallel_instances);
// In some rare cases, the user may set num_parallel_instances to 1
manually so that many queries can be executed
// in parallel. We need to make sure the _max_thread_num is
smaller than the previous value.
- _max_thread_num =
- std::min(_max_thread_num,
config::doris_scanner_thread_pool_thread_num);
+ _max_thread_num = std::min(_max_thread_num,
+
config::dynamic::doris_scanner_thread_pool_thread_num());
}
}
@@ -200,7 +200,7 @@ Status ScannerContext::init() {
<< ", _ignore_data_distribution: " <<
_ignore_data_distribution
<< ", _is_file_scan_operator: " << _is_file_scan_operator
<< ", doris_scanner_thread_pool_thread_num: "
- << config::doris_scanner_thread_pool_thread_num
+ << config::dynamic::doris_scanner_thread_pool_thread_num()
<< ", _num_parallel_instances: " << _num_parallel_instances
<< ", _all_scanners.size: " << _all_scanners.size()
<< ", should_run_serial: " <<
_local_state->should_run_serial()
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp
b/be/src/vec/exec/scan/scanner_scheduler.cpp
index db767cf9228..e7b9350e052 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -80,9 +80,10 @@ Status ScannerScheduler::init(ExecEnv* env) {
// 1. local scan thread pool
_local_scan_thread_pool =
std::make_unique<vectorized::SimplifiedScanScheduler>("local_scan", nullptr);
- Status ret1 =
_local_scan_thread_pool->start(config::doris_scanner_thread_pool_thread_num,
-
config::doris_scanner_thread_pool_thread_num,
-
config::doris_scanner_thread_pool_queue_size);
+ Status ret1 =
+
_local_scan_thread_pool->start(config::dynamic::doris_scanner_thread_pool_thread_num(),
+
config::dynamic::doris_scanner_thread_pool_thread_num(),
+
config::doris_scanner_thread_pool_queue_size);
RETURN_IF_ERROR(ret1);
// 2. remote scan thread pool
@@ -96,11 +97,12 @@ Status ScannerScheduler::init(ExecEnv* env) {
RETURN_IF_ERROR(ret2);
// 3. limited scan thread pool
- RETURN_IF_ERROR(ThreadPoolBuilder("LimitedScanThreadPool")
-
.set_min_threads(config::doris_scanner_thread_pool_thread_num)
-
.set_max_threads(config::doris_scanner_thread_pool_thread_num)
-
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
- .build(&_limited_scan_thread_pool));
+ RETURN_IF_ERROR(
+ ThreadPoolBuilder("LimitedScanThreadPool")
+
.set_min_threads(config::dynamic::doris_scanner_thread_pool_thread_num())
+
.set_max_threads(config::dynamic::doris_scanner_thread_pool_thread_num())
+
.set_max_queue_size(config::doris_scanner_thread_pool_queue_size)
+ .build(&_limited_scan_thread_pool));
_is_init = true;
return Status::OK();
}
@@ -332,7 +334,7 @@ int ScannerScheduler::get_remote_scan_thread_num() {
int num = config::doris_max_remote_scanner_thread_pool_thread_num != -1
?
config::doris_max_remote_scanner_thread_pool_thread_num
: std::max(512, CpuInfo::num_cores() * 10);
- return std::max(num, config::doris_scanner_thread_pool_thread_num);
+ return std::max(num,
config::dynamic::doris_scanner_thread_pool_thread_num());
}();
return remote_max_thread_num;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]