This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 45a97ef739c [Improvement](external) Increase scanner concurrency (#58073) (#58678)
45a97ef739c is described below
commit 45a97ef739c18827be0e9051e0c374284335699f
Author: Gabriel <[email protected]>
AuthorDate: Wed Dec 3 22:21:44 2025 +0800
[Improvement](external) Increase scanner concurrency (#58073) (#58678)
Scanner concurrency is controlled by session variables that are not suitable
for every case: external tables need more scanners than internal tables. In
this PR, the concurrency variables for external tables are split from the
original ones (see the sketch below).
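As a rough, standalone sketch (not the actual Doris classes) of how the split is intended to behave, the snippet below resolves the two new maximum-concurrency session variables with the defaults introduced by this patch (4 for internal scans, 16 for external file scans); the struct and function names are illustrative only:

```cpp
// Hypothetical helper mirroring the defaults in this patch; 0 means "not set by the user".
#include <iostream>

struct SessionVars {
    int max_scanners_concurrency = 0;       // internal (OLAP) tables
    int max_file_scanners_concurrency = 0;  // external (file) tables
};

int resolve_max_scanners(const SessionVars& vars, bool external_table) {
    if (external_table) {
        return vars.max_file_scanners_concurrency > 0 ? vars.max_file_scanners_concurrency : 16;
    }
    return vars.max_scanners_concurrency > 0 ? vars.max_scanners_concurrency : 4;
}

int main() {
    SessionVars vars;  // nothing set by the user
    std::cout << resolve_max_scanners(vars, false) << "\n";  // 4  (internal default)
    std::cout << resolve_max_scanners(vars, true) << "\n";   // 16 (external default)
    vars.max_file_scanners_concurrency = 32;                 // raise only the external limit
    std::cout << resolve_max_scanners(vars, true) << "\n";   // 32
    return 0;
}
```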
### What problem does this PR solve?
Issue Number: close #xxx
Related PR: #xxx
Problem Summary:
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg: https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR should merge into -->
---
be/src/common/config.cpp | 17 +++
be/src/common/config.h | 2 +
be/src/pipeline/exec/file_scan_operator.cpp | 49 +++++++-
be/src/pipeline/exec/file_scan_operator.h | 7 +-
be/src/pipeline/exec/operator.h | 3 +
be/src/pipeline/exec/scan_operator.cpp | 58 ++++++---
be/src/pipeline/exec/scan_operator.h | 17 +--
be/src/runtime/runtime_state.h | 26 ++--
be/src/runtime/workload_group/workload_group.cpp | 15 ++-
be/src/vec/exec/scan/scanner_context.cpp | 71 +++--------
be/src/vec/exec/scan/scanner_context.h | 20 +--
be/src/vec/exec/scan/scanner_scheduler.h | 135 +++++++--------------
be/test/scan/mock_simplified_scan_scheduler.h | 3 +-
be/test/scan/scanner_context_test.cpp | 19 ++-
.../java/org/apache/doris/qe/SessionVariable.java | 43 ++++---
gensrc/thrift/PaloInternalService.thrift | 8 +-
.../hive/test_hive_topn_rf_null.groovy | 2 +-
17 files changed, 268 insertions(+), 227 deletions(-)
diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index e9fa7c5f6a9..666676e7077 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -369,6 +369,23 @@ DEFINE_mInt32(unused_rowset_monitor_interval, "30");
DEFINE_mInt32(quering_rowsets_evict_interval, "30");
DEFINE_String(storage_root_path, "${DORIS_HOME}/storage");
DEFINE_mString(broken_storage_path, "");
+DEFINE_Int32(min_active_scan_threads, "-1");
+DEFINE_Int32(min_active_file_scan_threads, "-1");
+
+DEFINE_Validator(min_active_scan_threads, [](const int config) -> bool {
+ if (config == -1) {
+ CpuInfo::init();
+ min_active_scan_threads = CpuInfo::num_cores() * 2;
+ }
+ return true;
+});
+DEFINE_Validator(min_active_file_scan_threads, [](const int config) -> bool {
+ if (config == -1) {
+ CpuInfo::init();
+ min_active_file_scan_threads = CpuInfo::num_cores() * 8;
+ }
+ return true;
+});
// Config is used to check incompatible old format hdr_ format
// whether doris uses strict way. When config is true, process will log fatal
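For reference, a minimal sketch of the "-1 means auto" behavior of the two new config values above, resolved once at startup from the core count; `std::thread::hardware_concurrency()` stands in here for `CpuInfo::num_cores()`, and the factors 2 and 8 come from the validators in this hunk:

```cpp
// Illustrative resolution of min_active_scan_threads / min_active_file_scan_threads.
#include <iostream>
#include <thread>

int resolve_min_active_threads(int configured, int per_core_factor) {
    if (configured == -1) {
        // The real code calls CpuInfo::num_cores(); hardware_concurrency() is a stand-in.
        return static_cast<int>(std::thread::hardware_concurrency()) * per_core_factor;
    }
    return configured;
}

int main() {
    std::cout << resolve_min_active_threads(-1, 2) << "\n";  // min_active_scan_threads: cores * 2
    std::cout << resolve_min_active_threads(-1, 8) << "\n";  // min_active_file_scan_threads: cores * 8
    std::cout << resolve_min_active_threads(64, 8) << "\n";  // an explicit value is kept as-is: 64
    return 0;
}
```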
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 584f8b66abd..d6c83b87138 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -398,6 +398,8 @@ DECLARE_mInt32(unused_rowset_monitor_interval);
DECLARE_mInt32(quering_rowsets_evict_interval);
DECLARE_String(storage_root_path);
DECLARE_mString(broken_storage_path);
+DECLARE_Int32(min_active_scan_threads);
+DECLARE_Int32(min_active_file_scan_threads);
// Config is used to check incompatible old format hdr_ format
// whether doris uses strict way. When config is true, process will log fatal
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index e83a7c3ef94..6f943d464aa 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -31,6 +31,44 @@
namespace doris::pipeline {
#include "common/compile_check_begin.h"
+
+int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) const {
+ // For select * from table limit 10; should just use one thread.
+ if (should_run_serial()) {
+ return 1;
+ }
+ /*
+     * The max concurrency of file scanners for each FileScanLocalState is determined by:
+     * 1. The user-specified max_file_scanners_concurrency session variable.
+     * 2. Default: 16
+     *
+     * If this is a serial operator, the max concurrency is multiplied by the number of parallel instances of the operator.
+ */
+ return (state->max_file_scanners_concurrency() > 0 ?
state->max_file_scanners_concurrency()
+ : 16) *
+ (state->query_parallel_instance_num() /
_parent->parallelism(state));
+}
+
+int FileScanLocalState::min_scanners_concurrency(RuntimeState* state) const {
+ if (should_run_serial()) {
+ return 1;
+ }
+ /*
+     * The min concurrency of scanners for each FileScanLocalState is determined by:
+     * 1. The user-specified min_file_scanners_concurrency session variable.
+     * 2. Default: 1
+     *
+     * If this is a serial operator, the min concurrency is multiplied by the number of parallel instances of the operator.
+ */
+ return (state->min_file_scanners_concurrency() > 0 ?
state->min_file_scanners_concurrency()
+ : 1) *
+ (state->query_parallel_instance_num() /
_parent->parallelism(state));
+}
+
+vectorized::ScannerScheduler* FileScanLocalState::scan_scheduler(RuntimeState*
state) const {
+ return state->get_query_ctx()->get_remote_scan_scheduler();
+}
+
Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>*
scanners) {
if (_split_source->num_scan_ranges() == 0) {
_eos = true;
@@ -44,9 +82,9 @@ Status
FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* sc
auto& p = _parent->cast<FileScanOperatorX>();
// There's only one scan range for each backend in batch split mode. Each
backend only starts up one ScanNode instance.
- uint32_t shard_num =
std::min(vectorized::ScannerScheduler::get_remote_scan_thread_num() /
- p.query_parallel_instance_num(),
- _max_scanners);
+ uint32_t shard_num = std::min(
+ vectorized::ScannerScheduler::get_remote_scan_thread_num() /
p.parallelism(state()),
+ _max_scanners);
shard_num = std::max(shard_num, 1U);
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
for (int i = 0; i < _max_scanners; ++i) {
@@ -82,10 +120,11 @@ void FileScanLocalState::set_scan_ranges(RuntimeState*
state,
auto scan_range =
scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
if (scan_range.__isset.split_source) {
p._batch_split_mode = true;
+ custom_profile()->add_info_string("BatchSplitMode", "true");
auto split_source = scan_range.split_source;
RuntimeProfile::Counter* get_split_timer =
ADD_TIMER(custom_profile(), "GetSplitTime");
- _max_scanners = calc_max_scanners(p.query_parallel_instance_num());
+ _max_scanners = calc_max_scanners(p.parallelism(state));
_split_source =
std::make_shared<vectorized::RemoteSplitSourceConnector>(
state, get_split_timer, split_source.split_source_id,
split_source.num_splits,
_max_scanners);
@@ -93,7 +132,7 @@ void FileScanLocalState::set_scan_ranges(RuntimeState* state,
}
if (!p._batch_split_mode) {
- _max_scanners = calc_max_scanners(p.query_parallel_instance_num());
+ _max_scanners = calc_max_scanners(p.parallelism(state));
if (_split_source == nullptr) {
_split_source =
std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
_max_scanners);
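To restate the per-operator budget computed by `max_scanners_concurrency()` above, here is a hedged sketch with plain integers standing in for `RuntimeState` and the parent operator; the helper name and the sample numbers are assumptions for illustration only:

```cpp
#include <iostream>

// Per-operator scanner budget for file scans: (session value or default 16),
// scaled by how many pipeline instances collapse onto this operator.
int file_scanner_budget(bool run_serial, int session_max /* 0 = unset */,
                        int query_parallel_instance_num, int operator_parallelism) {
    if (run_serial) {
        return 1;  // e.g. SELECT * FROM t LIMIT 10 only needs one scanner
    }
    int base = session_max > 0 ? session_max : 16;  // default for external file scans
    return base * (query_parallel_instance_num / operator_parallelism);
}

int main() {
    // Operator runs with the full pipeline parallelism (8 of 8): each instance gets 16.
    std::cout << file_scanner_budget(false, 0, 8, 8) << "\n";  // 16
    // Batch split mode keeps a single instance, so it receives the whole budget.
    std::cout << file_scanner_budget(false, 0, 8, 1) << "\n";  // 128
    return 0;
}
```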
diff --git a/be/src/pipeline/exec/file_scan_operator.h
b/be/src/pipeline/exec/file_scan_operator.h
index d3c77b4e000..12b303a02c9 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -54,6 +54,9 @@ public:
const std::vector<TScanRangeParams>& scan_ranges)
override;
int parent_id() { return _parent->node_id(); }
std::string name_suffix() const override;
+ int max_scanners_concurrency(RuntimeState* state) const override;
+ int min_scanners_concurrency(RuntimeState* state) const override;
+ vectorized::ScannerScheduler* scan_scheduler(RuntimeState* state) const
override;
private:
friend class vectorized::FileScanner;
@@ -83,8 +86,8 @@ public:
bool is_file_scan_operator() const override { return true; }
// There's only one scan range for each backend in batch split mode. Each
backend only starts up one ScanNode instance.
- int query_parallel_instance_num() const override {
- return _batch_split_mode ? 1 : _query_parallel_instance_num;
+ int parallelism(RuntimeState* state) const override {
+ return _batch_split_mode ? 1 :
ScanOperatorX<FileScanLocalState>::parallelism(state);
}
private:
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 7ea094f517c..baf0c48cd48 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -113,6 +113,9 @@ public:
[[nodiscard]] virtual Status terminate(RuntimeState* state) = 0;
[[nodiscard]] virtual Status close(RuntimeState* state);
[[nodiscard]] virtual int node_id() const = 0;
+ [[nodiscard]] virtual int parallelism(RuntimeState* state) const {
+ return _is_serial_operator ? 1 : state->query_parallel_instance_num();
+ }
[[nodiscard]] virtual Status set_child(OperatorPtr child) {
if (_child && child != nullptr) {
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 05f127ba9df..31ce83989e6 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -72,6 +72,41 @@ bool ScanLocalState<Derived>::should_run_serial() const {
return _parent->cast<typename Derived::Parent>()._should_run_serial;
}
+int ScanLocalStateBase::max_scanners_concurrency(RuntimeState* state) const {
+ // For select * from table limit 10; should just use one thread.
+ if (should_run_serial()) {
+ return 1;
+ }
+ /*
+     * The max concurrency of scanners for each ScanLocalStateBase is determined by:
+     * 1. The user-specified max_scanners_concurrency session variable.
+     * 2. Default: 4
+     *
+     * If this is a serial operator, the max concurrency is multiplied by the number of parallel instances of the operator.
+ */
+ return (state->max_scanners_concurrency() > 0 ?
state->max_scanners_concurrency() : 4) *
+ (state->query_parallel_instance_num() /
_parent->parallelism(state));
+}
+
+int ScanLocalStateBase::min_scanners_concurrency(RuntimeState* state) const {
+ if (should_run_serial()) {
+ return 1;
+ }
+ /*
+     * The min concurrency of scanners for each ScanLocalStateBase is determined by:
+     * 1. The user-specified min_scanners_concurrency session variable.
+     * 2. Default: 1
+     *
+     * If this is a serial operator, the min concurrency is multiplied by the number of parallel instances of the operator.
+ */
+ return (state->min_scanners_concurrency() > 0 ?
state->min_scanners_concurrency() : 1) *
+ (state->query_parallel_instance_num() /
_parent->parallelism(state));
+}
+
+vectorized::ScannerScheduler* ScanLocalStateBase::scan_scheduler(RuntimeState*
state) const {
+ return state->get_query_ctx()->get_scan_scheduler();
+}
+
template <typename Derived>
Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo&
info) {
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
@@ -1046,19 +1081,14 @@ template <typename Derived>
Status ScanLocalState<Derived>::_start_scanners(
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>&
scanners) {
auto& p = _parent->cast<typename Derived::Parent>();
- // If scan operator is serial operator(like topn), its real parallelism is
1.
- // Otherwise, its real parallelism is query_parallel_instance_num.
- // query_parallel_instance_num of olap table is usually equal to session
var parallel_pipeline_task_num.
- // for file scan operator, its real parallelism will be 1 if it is in
batch mode.
- // Related pr:
- // https://github.com/apache/doris/pull/42460
- // https://github.com/apache/doris/pull/44635
- const int parallism_of_scan_operator =
- p.is_serial_operator() ? 1 : p.query_parallel_instance_num();
-
- _scanner_ctx = vectorized::ScannerContext::create_shared(
- state(), this, p._output_tuple_desc, p.output_row_descriptor(),
scanners, p.limit(),
- _scan_dependency, parallism_of_scan_operator);
+ _scanner_ctx = vectorized::ScannerContext::create_shared(state(), this,
p._output_tuple_desc,
+
p.output_row_descriptor(), scanners,
+ p.limit(),
_scan_dependency
+#ifdef BE_TEST
+ ,
+
max_scanners_concurrency(state())
+#endif
+ );
return Status::OK();
}
@@ -1267,8 +1297,6 @@ Status ScanOperatorX<LocalStateType>::init(const
TPlanNode& tnode, RuntimeState*
}
}
- _query_parallel_instance_num = state->query_parallel_instance_num();
-
return Status::OK();
}
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 92362921b78..00c39269c25 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -83,6 +83,16 @@ public:
virtual TPushAggOp::type get_push_down_agg_type() = 0;
virtual int64_t get_push_down_count() = 0;
+    // If the scan operator is a serial operator (like topn), its real parallelism is 1.
+    // Otherwise, its real parallelism is query_parallel_instance_num. For OLAP tables,
+    // that is usually equal to the session variable parallel_pipeline_task_num.
+    // For the file scan operator, the real parallelism is 1 in batch split mode.
+    // Related PRs:
+    // https://github.com/apache/doris/pull/42460
+    // https://github.com/apache/doris/pull/44635
+ [[nodiscard]] virtual int max_scanners_concurrency(RuntimeState* state)
const;
+ [[nodiscard]] virtual int min_scanners_concurrency(RuntimeState* state)
const;
+ [[nodiscard]] virtual vectorized::ScannerScheduler*
scan_scheduler(RuntimeState* state) const;
[[nodiscard]] std::string get_name() { return _parent->get_name(); }
@@ -362,10 +372,6 @@ public:
[[nodiscard]] virtual bool is_file_scan_operator() const { return false; }
- [[nodiscard]] virtual int query_parallel_instance_num() const {
- return _query_parallel_instance_num;
- }
-
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state) override;
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
@@ -441,9 +447,6 @@ protected:
// Record the value of the aggregate function 'count' from doris's be
int64_t _push_down_count = -1;
const int _parallel_tasks = 0;
-
- int _query_parallel_instance_num = 0;
-
std::vector<int> topn_filter_source_node_ids;
};
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index d4fed370806..b67b9004961 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -142,18 +142,26 @@ public:
return _query_options.__isset.execution_timeout ?
_query_options.execution_timeout
:
_query_options.query_timeout;
}
- int num_scanner_threads() const {
- return _query_options.__isset.num_scanner_threads ?
_query_options.num_scanner_threads : 0;
- }
- int min_scan_concurrency_of_scan_scheduler() const {
- return _query_options.__isset.min_scan_scheduler_concurrency
- ? _query_options.min_scan_scheduler_concurrency
+ int max_scanners_concurrency() const {
+ return _query_options.__isset.max_scanners_concurrency
+ ? _query_options.max_scanners_concurrency
: 0;
}
+ int max_file_scanners_concurrency() const {
+ return _query_options.__isset.max_file_scanners_concurrency
+ ? _query_options.max_file_scanners_concurrency
+ : max_scanners_concurrency();
+ }
+
+ int min_scanners_concurrency() const {
+ return _query_options.__isset.min_scanners_concurrency
+ ? _query_options.min_scanners_concurrency
+ : 1;
+ }
- int min_scan_concurrency_of_scanner() const {
- return _query_options.__isset.min_scanner_concurrency
- ? _query_options.min_scanner_concurrency
+ int min_file_scanners_concurrency() const {
+ return _query_options.__isset.min_file_scanners_concurrency
+ ? _query_options.min_file_scanners_concurrency
: 1;
}
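A small sketch of the fallback chain implemented by the new getters above, with `std::optional` standing in for the Thrift `__isset` flags (an assumption made purely for illustration):

```cpp
#include <iostream>
#include <optional>

struct QueryOptions {
    std::optional<int> max_scanners_concurrency;
    std::optional<int> max_file_scanners_concurrency;
};

int max_scanners_concurrency(const QueryOptions& opts) {
    return opts.max_scanners_concurrency.value_or(0);
}

// File (external) scans fall back to the generic limit when their own
// variable was never forwarded by the FE.
int max_file_scanners_concurrency(const QueryOptions& opts) {
    return opts.max_file_scanners_concurrency.value_or(max_scanners_concurrency(opts));
}

int main() {
    QueryOptions opts;
    opts.max_scanners_concurrency = 8;
    std::cout << max_file_scanners_concurrency(opts) << "\n";  // 8: inherits the generic limit
    opts.max_file_scanners_concurrency = 32;
    std::cout << max_file_scanners_concurrency(opts) << "\n";  // 32: its own variable wins
    return 0;
}
```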
diff --git a/be/src/runtime/workload_group/workload_group.cpp
b/be/src/runtime/workload_group/workload_group.cpp
index 406554c211f..6c9c2255908 100644
--- a/be/src/runtime/workload_group/workload_group.cpp
+++ b/be/src/runtime/workload_group/workload_group.cpp
@@ -560,7 +560,8 @@ Status
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
}
Status ret = scan_scheduler->start(scan_thread_num, scan_thread_num,
-
config::doris_scanner_thread_pool_queue_size);
+
config::doris_scanner_thread_pool_queue_size,
+ config::min_active_scan_threads);
if (ret.ok()) {
_scan_task_sched = std::move(scan_scheduler);
} else {
@@ -581,9 +582,9 @@ Status
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
remote_scan_scheduler =
std::make_unique<vectorized::ThreadPoolSimplifiedScanScheduler>(
"rs_" + wg_name, cg_cpu_ctl_ptr, wg_name);
}
- Status ret =
- remote_scan_scheduler->start(max_remote_scan_thread_num,
min_remote_scan_thread_num,
- remote_scan_thread_queue_size);
+ Status ret = remote_scan_scheduler->start(
+ max_remote_scan_thread_num, min_remote_scan_thread_num,
+ remote_scan_thread_queue_size,
config::min_active_file_scan_threads);
if (ret.ok()) {
_remote_scan_task_sched = std::move(remote_scan_scheduler);
} else {
@@ -614,12 +615,14 @@ Status
WorkloadGroup::upsert_thread_pool_no_lock(WorkloadGroupInfo* wg_info,
// 2 update thread pool
if (scan_thread_num > 0 && _scan_task_sched) {
- _scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num);
+ _scan_task_sched->reset_thread_num(scan_thread_num, scan_thread_num,
+ config::min_active_scan_threads);
}
if (max_remote_scan_thread_num >= min_remote_scan_thread_num &&
_remote_scan_task_sched) {
_remote_scan_task_sched->reset_thread_num(max_remote_scan_thread_num,
- min_remote_scan_thread_num);
+ min_remote_scan_thread_num,
+
config::min_active_file_scan_threads);
}
return upsert_ret;
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 916e14ff808..c6c46223894 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -56,7 +56,12 @@ ScannerContext::ScannerContext(
RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc, const RowDescriptor*
output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>&
scanners, int64_t limit_,
- std::shared_ptr<pipeline::Dependency> dependency, int
parallism_of_scan_operator)
+ std::shared_ptr<pipeline::Dependency> dependency
+#ifdef BE_TEST
+ ,
+ int num_parallel_instances
+#endif
+ )
: HasTaskExecutionCtx(state),
_state(state),
_local_state(local_state),
@@ -67,9 +72,18 @@ ScannerContext::ScannerContext(
_batch_size(state->batch_size()),
limit(limit_),
_all_scanners(scanners.begin(), scanners.end()),
- _parallism_of_scan_operator(parallism_of_scan_operator),
-
_min_scan_concurrency_of_scan_scheduler(_state->min_scan_concurrency_of_scan_scheduler()),
- _min_scan_concurrency(_state->min_scan_concurrency_of_scanner()) {
+#ifndef BE_TEST
+ _scanner_scheduler(local_state->scan_scheduler(state)),
+ _min_scan_concurrency_of_scan_scheduler(
+ _scanner_scheduler->get_min_active_scan_threads()),
+
_max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state),
+ cast_set<int>(scanners.size()))),
+#else
+ _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
+ _min_scan_concurrency_of_scan_scheduler(0),
+ _max_scan_concurrency(num_parallel_instances),
+#endif
+ _min_scan_concurrency(local_state->min_scanners_concurrency(state)) {
DCHECK(_state != nullptr);
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
@@ -106,14 +120,6 @@ Status ScannerContext::init() {
auto scanner = _all_scanners.front().lock();
DCHECK(scanner != nullptr);
- // TODO: Maybe need refactor.
- // A query could have remote scan task and local scan task at the same
time.
- // So we need to compute the _scanner_scheduler in each scan operator
instead of query context.
- if (scanner->_scanner->get_storage_type() ==
TabletStorageType::STORAGE_TYPE_LOCAL) {
- _scanner_scheduler = _state->get_query_ctx()->get_scan_scheduler();
- } else {
- _scanner_scheduler =
_state->get_query_ctx()->get_remote_scan_scheduler();
- }
if (auto* task_executor_scheduler =
dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
std::shared_ptr<TaskExecutor> task_executor =
task_executor_scheduler->task_executor();
@@ -132,43 +138,11 @@ Status ScannerContext::init() {
// Provide more memory for wide tables, increase proportionally by
multiples of 300
_max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
- if (_min_scan_concurrency_of_scan_scheduler == 0) {
- // _scanner_scheduler->get_max_threads() is setted by workload group.
- _min_scan_concurrency_of_scan_scheduler = 2 *
_scanner_scheduler->get_max_threads();
- }
-
if (_all_scanners.empty()) {
_is_finished = true;
_set_scanner_done();
}
- // The overall target of our system is to make full utilization of the
resources.
- // At the same time, we dont want too many tasks are queued by scheduler,
that is not necessary.
- // Each scan operator can submit _max_scan_concurrency scanner to
scheduelr if scheduler has enough resource.
- // So that for a single query, we can make sure it could make full
utilization of the resource.
- _max_scan_concurrency = _state->num_scanner_threads();
- if (_max_scan_concurrency == 0) {
- // Why this is safe:
- /*
- 1. If num cpu cores is less than or equal to 24:
- _max_concurrency_of_scan_scheduler will be 96.
_parallism_of_scan_operator will be 1 or C/2.
- so _max_scan_concurrency will be 96 or (96 * 2 / C).
- For a single scan node, most scanner it can submit will be 96
or (96 * 2 / C) * (C / 2) which is 96 too.
- So a single scan node could make full utilization of the
resource without sumbiting all its tasks.
- 2. If num cpu cores greater than 24:
- _max_concurrency_of_scan_scheduler will be 4 * C.
_parallism_of_scan_operator will be 1 or C/2.
- so _max_scan_concurrency will be 4 * C or (4 * C * 2 / C).
- For a single scan node, most scanner it can submit will be 4 *
C or (4 * C * 2 / C) * (C / 2) which is 4 * C too.
-
- So, in all situations, when there is only one scan node, it could
make full utilization of the resource.
- */
- _max_scan_concurrency =
- _min_scan_concurrency_of_scan_scheduler /
_parallism_of_scan_operator;
- _max_scan_concurrency = _max_scan_concurrency == 0 ? 1 :
_max_scan_concurrency;
- }
-
- _max_scan_concurrency = std::min(_max_scan_concurrency,
(int32_t)_pending_scanners.size());
-
// when user not specify scan_thread_num, so we can try downgrade
_max_thread_num.
// becaue we found in a table with 5k columns, column reader may ocuppy
too much memory.
// you can refer https://github.com/apache/doris/issues/35340 for details.
@@ -191,15 +165,6 @@ Status ScannerContext::init() {
}
}
- // For select * from table limit 10; should just use one thread.
- if (_local_state->should_run_serial()) {
- _max_scan_concurrency = 1;
- _min_scan_concurrency = 1;
- }
-
- // Avoid corner case.
- _min_scan_concurrency = std::min(_min_scan_concurrency,
_max_scan_concurrency);
-
COUNTER_SET(_local_state->_max_scan_concurrency,
(int64_t)_max_scan_concurrency);
COUNTER_SET(_local_state->_min_scan_concurrency,
(int64_t)_min_scan_concurrency);
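The rewritten constructor above now takes its concurrency bounds directly from the local state and the scheduler instead of recomputing them in `init()`. A hedged sketch of that selection, with illustrative names and sample numbers:

```cpp
#include <algorithm>
#include <cstddef>
#include <iostream>

struct ConcurrencyBounds {
    int scheduler_min_active;  // from ScannerScheduler::get_min_active_scan_threads()
    int max_per_operator;      // per scan operator
    int min_per_operator;
};

ConcurrencyBounds pick_bounds(int scheduler_min_active, int operator_max, int operator_min,
                              std::size_t num_scanners) {
    ConcurrencyBounds b{};
    b.scheduler_min_active = scheduler_min_active;
    // Never ask for more concurrent scanners than there are scanners to run.
    b.max_per_operator = std::min(operator_max, static_cast<int>(num_scanners));
    b.min_per_operator = operator_min;
    return b;
}

int main() {
    // 16 scanners allowed, but only 5 splits exist: the context caps itself at 5.
    ConcurrencyBounds b = pick_bounds(96, 16, 1, 5);
    std::cout << b.max_per_operator << "\n";  // 5
    return 0;
}
```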
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 143a4f12dfd..d6c3793ff09 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -123,8 +123,12 @@ public:
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
- int64_t limit_, std::shared_ptr<pipeline::Dependency>
dependency,
- int num_parallel_instances);
+ int64_t limit_, std::shared_ptr<pipeline::Dependency>
dependency
+#ifdef BE_TEST
+ ,
+ int num_parallel_instances
+#endif
+ );
~ScannerContext() override;
Status init();
@@ -204,7 +208,6 @@ protected:
/// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough
memory to scale up
/// 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
void _set_scanner_done();
- Status _try_to_scale_up();
RuntimeState* _state = nullptr;
pipeline::ScanLocalStateBase* _local_state = nullptr;
@@ -228,7 +231,6 @@ protected:
int64_t limit;
int64_t _max_bytes_in_queue = 0;
- ScannerScheduler* _scanner_scheduler = nullptr;
// Using stack so that we can resubmit scanner in a LIFO order, maybe more
cache friendly
std::stack<std::shared_ptr<ScanTask>> _pending_scanners;
// Scanner that is submitted to the scheduler.
@@ -244,16 +246,20 @@ protected:
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
std::shared_ptr<ResourceContext> _resource_ctx;
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
- const int _parallism_of_scan_operator;
std::shared_ptr<doris::vectorized::TaskHandle> _task_handle;
std::atomic<int64_t> _block_memory_usage = 0;
// adaptive scan concurrency related
- int32_t _min_scan_concurrency_of_scan_scheduler = 0;
- int32_t _min_scan_concurrency = 1;
+ ScannerScheduler* _scanner_scheduler = nullptr;
+ MOCK_REMOVE(const) int32_t _min_scan_concurrency_of_scan_scheduler = 0;
+    // The overall target of our system is to make full utilization of the resources.
+    // At the same time, we don't want too many tasks queued by the scheduler; that is not necessary.
+    // Each scan operator can submit up to _max_scan_concurrency scanners to the scheduler if it has enough resources.
+    // So for a single query, we can make sure it makes full utilization of the resources.
int32_t _max_scan_concurrency = 0;
+ MOCK_REMOVE(const) int32_t _min_scan_concurrency = 1;
std::shared_ptr<ScanTask> _pull_next_scan_task(std::shared_ptr<ScanTask>
current_scan_task,
int32_t
current_concurrency);
diff --git a/be/src/vec/exec/scan/scanner_scheduler.h
b/be/src/vec/exec/scan/scanner_scheduler.h
index 7dc374a9168..26ed7f52811 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.h
+++ b/be/src/vec/exec/scan/scanner_scheduler.h
@@ -112,16 +112,16 @@ public:
static int get_remote_scan_thread_queue_size();
- virtual Status start(int max_thread_num, int min_thread_num, int
queue_size) = 0;
+ virtual Status start(int max_thread_num, int min_thread_num, int
queue_size,
+ int min_active_scan_threads) = 0;
virtual void stop() = 0;
virtual Status submit_scan_task(SimplifiedScanTask scan_task) = 0;
virtual Status submit_scan_task(SimplifiedScanTask scan_task,
const std::string& task_id_string) = 0;
- virtual void reset_thread_num(int new_max_thread_num, int
new_min_thread_num) = 0;
- virtual void reset_max_thread_num(int thread_num) = 0;
- virtual void reset_min_thread_num(int thread_num) = 0;
- virtual int get_max_threads() = 0;
+ virtual void reset_thread_num(int new_max_thread_num, int
new_min_thread_num,
+ int min_active_scan_threads) = 0;
+ int get_min_active_scan_threads() const { return _min_active_scan_threads;
}
virtual int get_queue_size() = 0;
virtual int get_active_threads() = 0;
@@ -131,6 +131,9 @@ public:
std::shared_ptr<ScanTask>
current_scan_task,
std::unique_lock<std::mutex>&
transfer_lock) = 0;
+protected:
+ int _min_active_scan_threads;
+
private:
static void _scanner_scan(std::shared_ptr<ScannerContext> ctx,
std::shared_ptr<ScanTask> scan_task);
@@ -139,7 +142,7 @@ private:
vectorized::Block*
block);
};
-class ThreadPoolSimplifiedScanScheduler : public ScannerScheduler {
+class ThreadPoolSimplifiedScanScheduler MOCK_REMOVE(final) : public
ScannerScheduler {
public:
ThreadPoolSimplifiedScanScheduler(std::string sched_name,
std::shared_ptr<CgroupCpuCtl>
cgroup_cpu_ctl,
@@ -149,20 +152,22 @@ public:
_sched_name(sched_name),
_workload_group(workload_group) {}
- virtual ~ThreadPoolSimplifiedScanScheduler() override {
+ ~ThreadPoolSimplifiedScanScheduler() override {
#ifndef BE_TEST
stop();
#endif
LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
}
- virtual void stop() override {
+ void stop() override {
_is_stop.store(true);
_scan_thread_pool->shutdown();
_scan_thread_pool->wait();
}
- virtual Status start(int max_thread_num, int min_thread_num, int
queue_size) override {
+ Status start(int max_thread_num, int min_thread_num, int queue_size,
+ int min_active_scan_threads) override {
+ _min_active_scan_threads = min_active_scan_threads;
RETURN_IF_ERROR(ThreadPoolBuilder(_sched_name, _workload_group)
.set_min_threads(min_thread_num)
.set_max_threads(max_thread_num)
@@ -172,7 +177,7 @@ public:
return Status::OK();
}
- virtual Status submit_scan_task(SimplifiedScanTask scan_task) override {
+ Status submit_scan_task(SimplifiedScanTask scan_task) override {
if (!_is_stop) {
return _scan_thread_pool->submit_func([scan_task] {
scan_task.scan_func(); });
} else {
@@ -180,12 +185,14 @@ public:
}
}
- virtual Status submit_scan_task(SimplifiedScanTask scan_task,
- const std::string& task_id_string)
override {
+ Status submit_scan_task(SimplifiedScanTask scan_task,
+ const std::string& task_id_string) override {
return submit_scan_task(scan_task);
}
- virtual void reset_thread_num(int new_max_thread_num, int
new_min_thread_num) override {
+ void reset_thread_num(int new_max_thread_num, int new_min_thread_num,
+ int min_active_scan_threads) override {
+ _min_active_scan_threads = min_active_scan_threads;
int cur_max_thread_num = _scan_thread_pool->max_threads();
int cur_min_thread_num = _scan_thread_pool->min_threads();
if (cur_max_thread_num == new_max_thread_num && cur_min_thread_num ==
new_min_thread_num) {
@@ -216,41 +223,15 @@ public:
}
}
- virtual void reset_max_thread_num(int thread_num) override {
- int max_thread_num = _scan_thread_pool->max_threads();
-
- if (max_thread_num != thread_num) {
- Status st = _scan_thread_pool->set_max_threads(thread_num);
- if (!st.ok()) {
- LOG(INFO) << "reset max thread num failed, sche name=" <<
_sched_name;
- }
- }
- }
-
- virtual void reset_min_thread_num(int thread_num) override {
- int min_thread_num = _scan_thread_pool->min_threads();
+ int get_queue_size() override { return
_scan_thread_pool->get_queue_size(); }
- if (min_thread_num != thread_num) {
- Status st = _scan_thread_pool->set_min_threads(thread_num);
- if (!st.ok()) {
- LOG(INFO) << "reset min thread num failed, sche name=" <<
_sched_name;
- }
- }
- }
+ int get_active_threads() override { return
_scan_thread_pool->num_active_threads(); }
- virtual int get_queue_size() override { return
_scan_thread_pool->get_queue_size(); }
+ std::vector<int> thread_debug_info() override { return
_scan_thread_pool->debug_info(); }
- virtual int get_active_threads() override { return
_scan_thread_pool->num_active_threads(); }
-
- virtual int get_max_threads() override { return
_scan_thread_pool->max_threads(); }
-
- virtual std::vector<int> thread_debug_info() override {
- return _scan_thread_pool->debug_info();
- }
-
- virtual Status schedule_scan_task(std::shared_ptr<ScannerContext>
scanner_ctx,
- std::shared_ptr<ScanTask>
current_scan_task,
- std::unique_lock<std::mutex>&
transfer_lock) override;
+ Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
+ std::shared_ptr<ScanTask> current_scan_task,
+ std::unique_lock<std::mutex>& transfer_lock)
override;
private:
std::unique_ptr<ThreadPool> _scan_thread_pool;
@@ -261,7 +242,7 @@ private:
std::shared_mutex _lock;
};
-class TaskExecutorSimplifiedScanScheduler : public ScannerScheduler {
+class TaskExecutorSimplifiedScanScheduler final : public ScannerScheduler {
public:
TaskExecutorSimplifiedScanScheduler(std::string sched_name,
std::shared_ptr<CgroupCpuCtl>
cgroup_cpu_ctl,
@@ -271,19 +252,21 @@ public:
_sched_name(sched_name),
_workload_group(workload_group) {}
- virtual ~TaskExecutorSimplifiedScanScheduler() override {
+ ~TaskExecutorSimplifiedScanScheduler() override {
#ifndef BE_TEST
stop();
#endif
LOG(INFO) << "Scanner sche " << _sched_name << " shutdown";
}
- virtual void stop() override {
+ void stop() override {
_is_stop.store(true);
_task_executor->stop();
}
- virtual Status start(int max_thread_num, int min_thread_num, int
queue_size) override {
+ Status start(int max_thread_num, int min_thread_num, int queue_size,
+ int min_active_scan_threads) override {
+ _min_active_scan_threads = min_active_scan_threads;
TimeSharingTaskExecutor::ThreadConfig thread_config;
thread_config.thread_name = _sched_name;
thread_config.workload_group = _workload_group;
@@ -300,7 +283,7 @@ public:
return Status::OK();
}
- virtual Status submit_scan_task(SimplifiedScanTask scan_task) override {
+ Status submit_scan_task(SimplifiedScanTask scan_task) override {
if (!_is_stop) {
std::shared_ptr<SplitRunner> split_runner;
if (scan_task.scan_task->is_first_schedule) {
@@ -332,8 +315,8 @@ public:
// A task has only one split. When the split is created, the task is
created according to the task_id,
// and the task is automatically removed when the split ends.
// Now it is only for PInternalService::multiget_data_v2 used by TopN
materialization.
- virtual Status submit_scan_task(SimplifiedScanTask scan_task,
- const std::string& task_id_string)
override {
+ Status submit_scan_task(SimplifiedScanTask scan_task,
+ const std::string& task_id_string) override {
if (!_is_stop) {
vectorized::TaskId task_id(task_id_string);
std::shared_ptr<TaskHandle> task_handle =
DORIS_TRY(_task_executor->create_task(
@@ -364,7 +347,9 @@ public:
}
}
- virtual void reset_thread_num(int new_max_thread_num, int
new_min_thread_num) override {
+ void reset_thread_num(int new_max_thread_num, int new_min_thread_num,
+ int min_active_scan_threads) override {
+ _min_active_scan_threads = min_active_scan_threads;
auto task_executor =
std::dynamic_pointer_cast<doris::vectorized::TimeSharingTaskExecutor>(
_task_executor);
int cur_max_thread_num = task_executor->max_threads();
@@ -397,51 +382,19 @@ public:
}
}
- virtual void reset_max_thread_num(int thread_num) override {
- auto task_executor =
std::dynamic_pointer_cast<doris::vectorized::TimeSharingTaskExecutor>(
- _task_executor);
- int max_thread_num = task_executor->max_threads();
-
- if (max_thread_num != thread_num) {
- Status st = task_executor->set_max_threads(thread_num);
- if (!st.ok()) {
- LOG(INFO) << "reset max thread num failed, sche name=" <<
_sched_name;
- }
- }
- }
-
- virtual void reset_min_thread_num(int thread_num) override {
- auto task_executor =
std::dynamic_pointer_cast<doris::vectorized::TimeSharingTaskExecutor>(
- _task_executor);
- int min_thread_num = task_executor->min_threads();
-
- if (min_thread_num != thread_num) {
- Status st = task_executor->set_min_threads(thread_num);
- if (!st.ok()) {
- LOG(INFO) << "reset min thread num failed, sche name=" <<
_sched_name;
- }
- }
- }
-
- virtual int get_queue_size() override {
+ int get_queue_size() override {
auto task_executor =
std::dynamic_pointer_cast<doris::vectorized::TimeSharingTaskExecutor>(
_task_executor);
return task_executor->get_queue_size();
}
- virtual int get_active_threads() override {
+ int get_active_threads() override {
auto task_executor =
std::dynamic_pointer_cast<doris::vectorized::TimeSharingTaskExecutor>(
_task_executor);
return task_executor->num_active_threads();
}
- virtual int get_max_threads() override {
- auto task_executor =
std::dynamic_pointer_cast<doris::vectorized::TimeSharingTaskExecutor>(
- _task_executor);
- return task_executor->max_threads();
- }
-
- virtual std::vector<int> thread_debug_info() override {
+ std::vector<int> thread_debug_info() override {
auto task_executor =
std::dynamic_pointer_cast<doris::vectorized::TimeSharingTaskExecutor>(
_task_executor);
return task_executor->debug_info();
@@ -449,9 +402,9 @@ public:
std::shared_ptr<TaskExecutor> task_executor() const { return
_task_executor; }
- virtual Status schedule_scan_task(std::shared_ptr<ScannerContext>
scanner_ctx,
- std::shared_ptr<ScanTask>
current_scan_task,
- std::unique_lock<std::mutex>&
transfer_lock) override;
+ Status schedule_scan_task(std::shared_ptr<ScannerContext> scanner_ctx,
+ std::shared_ptr<ScanTask> current_scan_task,
+ std::unique_lock<std::mutex>& transfer_lock)
override;
private:
std::atomic<bool> _is_stop;
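A simplified sketch of the scheduler surface after this change: `start()` and `reset_thread_num()` now carry the minimum number of active scan threads, and `ScannerContext` reads it back through `get_min_active_scan_threads()` instead of deriving it from `get_max_threads()`. The class below is illustrative only, with the thread-pool wiring elided:

```cpp
#include <iostream>

class SketchScanScheduler {
public:
    void start(int max_threads, int min_threads, int queue_size, int min_active_scan_threads) {
        _min_active_scan_threads = min_active_scan_threads;
        // ... build the underlying pool from max_threads/min_threads/queue_size ...
        (void)max_threads; (void)min_threads; (void)queue_size;
    }

    void reset_thread_num(int new_max_thread_num, int new_min_thread_num,
                          int min_active_scan_threads) {
        _min_active_scan_threads = min_active_scan_threads;
        // ... resize the pool ...
        (void)new_max_thread_num; (void)new_min_thread_num;
    }

    int get_min_active_scan_threads() const { return _min_active_scan_threads; }

private:
    int _min_active_scan_threads = 0;
};

int main() {
    SketchScanScheduler sched;
    // Hypothetical values: a remote scan pool on a 16-core box would pass cores * 8 = 128.
    sched.start(/*max=*/512, /*min=*/8, /*queue_size=*/10240, /*min_active=*/128);
    std::cout << sched.get_min_active_scan_threads() << "\n";  // 128
    return 0;
}
```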
diff --git a/be/test/scan/mock_simplified_scan_scheduler.h
b/be/test/scan/mock_simplified_scan_scheduler.h
index 43561eaf55c..36cadcc413c 100644
--- a/be/test/scan/mock_simplified_scan_scheduler.h
+++ b/be/test/scan/mock_simplified_scan_scheduler.h
@@ -20,10 +20,11 @@
#include "vec/exec/scan/scanner_scheduler.h"
namespace doris::vectorized {
-class MockSimplifiedScanScheduler : ThreadPoolSimplifiedScanScheduler {
+class MockSimplifiedScanScheduler final : ThreadPoolSimplifiedScanScheduler {
public:
MockSimplifiedScanScheduler(std::shared_ptr<CgroupCpuCtl> cgroup_cpu_ctl)
: ThreadPoolSimplifiedScanScheduler("ForTest", cgroup_cpu_ctl) {}
+ ~MockSimplifiedScanScheduler() override = default;
MOCK_METHOD0(get_active_threads, int());
MOCK_METHOD0(get_queue_size, int());
diff --git a/be/test/scan/scanner_context_test.cpp
b/be/test/scan/scanner_context_test.cpp
index 5cf127b804d..e29400dfc92 100644
--- a/be/test/scan/scanner_context_test.cpp
+++ b/be/test/scan/scanner_context_test.cpp
@@ -159,9 +159,9 @@ TEST_F(ScannerContextTest, test_init) {
olap_scan_local_state->_parent = scan_operator.get();
- // User specified num_scanner_threads is less than _max_scan_concurrency
that we calculated
+ // User specified max_scanners_concurrency is less than
_max_scan_concurrency that we calculated
TQueryOptions query_options;
- query_options.__set_num_scanner_threads(2);
+ query_options.__set_max_scanners_concurrency(2);
query_options.__set_max_column_reader_num(0);
state->set_query_options(query_options);
std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
@@ -174,17 +174,14 @@ TEST_F(ScannerContextTest, test_init) {
scanner_context->_min_scan_concurrency_of_scan_scheduler = 10;
Status st = scanner_context->init();
ASSERT_TRUE(st.ok());
- // actual max_scan_concurrency will be 2 since user specified
num_scanner_threads is 2.
- ASSERT_EQ(scanner_context->_max_scan_concurrency, 2);
+ // actual max_scan_concurrency will be 2 since user specified
max_scanners_concurrency is 2.
+ ASSERT_EQ(scanner_context->_max_scan_concurrency, 1);
- query_options.__set_num_scanner_threads(0);
+ query_options.__set_max_scanners_concurrency(0);
state->set_query_options(query_options);
st = scanner_context->init();
ASSERT_TRUE(st.ok());
-
- ASSERT_EQ(scanner_context->_max_scan_concurrency,
- scanner_context->_min_scan_concurrency_of_scan_scheduler /
parallel_tasks);
}
TEST_F(ScannerContextTest, test_serial_run) {
@@ -223,7 +220,7 @@ TEST_F(ScannerContextTest, test_serial_run) {
olap_scan_local_state->_parent = scan_operator.get();
TQueryOptions query_options;
- query_options.__set_num_scanner_threads(2);
+ query_options.__set_max_scanners_concurrency(2);
query_options.__set_max_column_reader_num(0);
state->set_query_options(query_options);
std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
@@ -237,7 +234,7 @@ TEST_F(ScannerContextTest, test_serial_run) {
ASSERT_TRUE(st.ok());
ASSERT_EQ(scanner_context->_max_scan_concurrency, 1);
- query_options.__set_num_scanner_threads(0);
+ query_options.__set_max_scanners_concurrency(0);
state->set_query_options(query_options);
st = scanner_context->init();
ASSERT_TRUE(st.ok());
@@ -281,7 +278,7 @@ TEST_F(ScannerContextTest, test_max_column_reader_num) {
olap_scan_local_state->_parent = scan_operator.get();
TQueryOptions query_options;
- query_options.__set_num_scanner_threads(20);
+ query_options.__set_max_scanners_concurrency(20);
query_options.__set_max_column_reader_num(1);
state->set_query_options(query_options);
std::unique_ptr<MockSimplifiedScanScheduler> scheduler =
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 79ee8d1d06a..d36199dcc49 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -89,8 +89,10 @@ public class SessionVariable implements Serializable,
Writable {
public static final String EXEC_MEM_LIMIT = "exec_mem_limit";
public static final String LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT =
"local_exchange_free_blocks_limit";
public static final String SCAN_QUEUE_MEM_LIMIT = "scan_queue_mem_limit";
- public static final String NUM_SCANNER_THREADS = "num_scanner_threads";
- public static final String MIN_SCANNER_CONCURRENCY =
"min_scanner_concurrnency";
+ public static final String MAX_SCANNERS_CONCURRENCY =
"max_scanners_concurrency";
+ public static final String MAX_FILE_SCANNERS_CONCURRENCY =
"max_file_scanners_concurrency";
+ public static final String MIN_SCANNERS_CONCURRENCY =
"min_scanners_concurrency";
+ public static final String MIN_FILE_SCANNERS_CONCURRENCY =
"min_file_scanners_concurrency";
public static final String MIN_SCAN_SCHEDULER_CONCURRENCY =
"min_scan_scheduler_concurrency";
public static final String QUERY_TIMEOUT = "query_timeout";
public static final String ANALYZE_TIMEOUT = "analyze_timeout";
@@ -967,25 +969,31 @@ public class SessionVariable implements Serializable,
Writable {
// 100MB
public long maxScanQueueMemByte = 2147483648L / 20;
- @VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true,
description = {
- "ScanNode 扫描数据的最大并发,默认为 0,采用 BE 的
doris_scanner_thread_pool_thread_num",
- "The max threads to read data of ScanNode, "
- + "default 0, use doris_scanner_thread_pool_thread_num in
be.conf"
- })
- public int numScannerThreads = 0;
+ @VariableMgr.VarAttr(name = MAX_SCANNERS_CONCURRENCY, needForward = true,
description = {
+ "ScanNode 扫描数据的最大并发,默认为 4", "The max threads to read data of
ScanNode, default 4"})
+ public int maxScannersConcurrency = 4;
+
+ @VariableMgr.VarAttr(name = MAX_FILE_SCANNERS_CONCURRENCY, needForward =
true, description = {
+ "FileScanNode 扫描数据的最大并发,默认为 16", "The max threads to read data of
FileScanNode, default 16"})
+ public int maxFileScannersConcurrency = 16;
@VariableMgr.VarAttr(name = LOCAL_EXCHANGE_FREE_BLOCKS_LIMIT)
public int localExchangeFreeBlocksLimit = 4;
- @VariableMgr.VarAttr(name = MIN_SCANNER_CONCURRENCY, needForward = true,
description = {
+ @VariableMgr.VarAttr(name = MIN_SCANNERS_CONCURRENCY, needForward = true,
description = {
"Scanner 的最小并发度,默认为 1", "The min concurrency of Scanner, default 1"
})
- public int minScannerConcurrency = 1;
+ public int minScannersConcurrency = 1;
+
+ @VariableMgr.VarAttr(name = MIN_FILE_SCANNERS_CONCURRENCY, needForward =
true, description = {
+ "外表Scanner 的最小并发度,默认为 1", "The min concurrency of Remote Scanner,
default 1"
+ })
+ public int minFileScannersConcurrency = 1;
@VariableMgr.VarAttr(name = MIN_SCAN_SCHEDULER_CONCURRENCY, needForward =
true, description = {
"ScanScheduler 的最小并发度,默认值 0 表示使用 Scan 线程池线程数量的两倍", "The min
concurrency of ScanScheduler, "
+ "default 0 means use twice the number of Scan thread pool
threads"
- })
+ }, varType = VariableAnnotation.DEPRECATED)
public int minScanSchedulerConcurrency = 0;
// By default, the number of Limit items after OrderBy is changed from
65535 items
@@ -3374,8 +3382,8 @@ public class SessionVariable implements Serializable,
Writable {
return maxScanQueueMemByte;
}
- public int getNumScannerThreads() {
- return numScannerThreads;
+ public int getMaxScannersConcurrency() {
+ return maxScannersConcurrency;
}
public int getQueryTimeoutS() {
@@ -3580,8 +3588,8 @@ public class SessionVariable implements Serializable,
Writable {
this.maxScanQueueMemByte = scanQueueMemByte;
}
- public void setNumScannerThreads(int numScannerThreads) {
- this.numScannerThreads = numScannerThreads;
+ public void setMaxScannersConcurrency(int maxScannersConcurrency) {
+ this.maxScannersConcurrency = maxScannersConcurrency;
}
public boolean isSqlQuoteShowCreate() {
@@ -4671,9 +4679,12 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setMemLimit(maxExecMemByte);
tResult.setLocalExchangeFreeBlocksLimit(localExchangeFreeBlocksLimit);
tResult.setScanQueueMemLimit(maxScanQueueMemByte);
- tResult.setNumScannerThreads(numScannerThreads);
+ tResult.setMaxScannersConcurrency(maxScannersConcurrency);
+ tResult.setMaxFileScannersConcurrency(maxFileScannersConcurrency);
tResult.setMaxColumnReaderNum(maxColumnReaderNum);
tResult.setParallelPrepareThreshold(parallelPrepareThreshold);
+ tResult.setMinScannersConcurrency(minScannersConcurrency);
+ tResult.setMinFileScannersConcurrency(minFileScannersConcurrency);
tResult.setQueryTimeout(queryTimeoutS);
tResult.setEnableProfile(enableProfile);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index d78f6e04ff3..aabebaa0b1b 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -95,7 +95,7 @@ struct TQueryOptions {
4: optional i32 batch_size = 0
5: optional i32 num_nodes = NUM_NODES_ALL
6: optional i64 max_scan_range_length = 0 // Deprecated
- 7: optional i32 num_scanner_threads = 0
+ 7: optional i32 max_scanners_concurrency = 0
8: optional i32 max_io_buffers = 0 // Deprecated
9: optional bool allow_unsupported_formats = 0 // Deprecated
10: optional i64 default_order_by_limit = -1
@@ -372,8 +372,8 @@ struct TQueryOptions {
147: optional i32 profile_level = 1;
- 148: optional i32 min_scanner_concurrency = 1;
- 149: optional i32 min_scan_scheduler_concurrency = 0;
+ 148: optional i32 min_scanners_concurrency = 1;
+ 149: optional i32 min_scan_scheduler_concurrency = 0; //deprecated
150: optional bool enable_runtime_filter_partition_prune = true;
// The minimum memory that an operator required to run.
@@ -415,6 +415,8 @@ struct TQueryOptions {
// Target file size in bytes for Iceberg write operations
// Default 0 means use config::iceberg_sink_max_file_size
178: optional i64 iceberg_write_target_file_size_bytes = 0;
+ 180: optional i32 max_file_scanners_concurrency = 0;
+ 181: optional i32 min_file_scanners_concurrency = 0;
182: optional i32 ivf_nprobe = 1;
diff --git
a/regression-test/suites/external_table_p0/hive/test_hive_topn_rf_null.groovy
b/regression-test/suites/external_table_p0/hive/test_hive_topn_rf_null.groovy
index abaf1118a99..c2d5960bb10 100644
---
a/regression-test/suites/external_table_p0/hive/test_hive_topn_rf_null.groovy
+++
b/regression-test/suites/external_table_p0/hive/test_hive_topn_rf_null.groovy
@@ -62,7 +62,7 @@ suite("test_hive_topn_rf_null",
"p0,external,hive,external_docker,external_docke
sql """switch ${catalog};"""
sql """ use `default`; """
- sql """ set num_scanner_threads = 1 """
+ sql """ set max_file_scanners_concurrency = 1 """
sql """ set topn_filter_ratio=1"""
runTopnRfNullTest();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]