This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 49d0e72177d [Opt] (multi-catalog) opt max scanner thread number in
batch split mode. (#50767)
49d0e72177d is described below
commit 49d0e72177d954f0e51d2c4c44f37d486d6f5e33
Author: Qi Chen <[email protected]>
AuthorDate: Thu May 15 15:51:32 2025 +0800
[Opt] (multi-catalog) opt max scanner thread number in batch split mode.
(#50767)
Cherry-pick #44635
---
be/src/pipeline/exec/file_scan_operator.cpp | 41 ++++++++++++++++++++---------
be/src/pipeline/exec/file_scan_operator.h | 6 +++++
be/src/pipeline/exec/scan_operator.cpp | 5 +++-
be/src/pipeline/exec/scan_operator.h | 6 +++++
be/src/vec/exec/scan/scanner_context.cpp | 9 +++----
be/src/vec/exec/scan/scanner_context.h | 4 ++-
6 files changed, 52 insertions(+), 19 deletions(-)
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index 6fa7401e278..62994bc6db4 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -26,6 +26,7 @@
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/scan_operator.h"
#include "vec/exec/format/format_common.h"
+#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vfile_scanner.h"
namespace doris::pipeline {
@@ -37,8 +38,9 @@ Status
FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
}
auto& p = _parent->cast<FileScanOperatorX>();
+ // There's only one scan range for each backend in batch split mode. Each
backend only starts up one ScanNode instance.
size_t shard_num = std::min<size_t>(
- config::doris_scanner_thread_pool_thread_num /
state()->query_parallel_instance_num(),
+ config::doris_scanner_thread_pool_thread_num /
p.query_parallel_instance_num(),
_max_scanners);
shard_num = std::max(shard_num, (size_t)1);
_kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
@@ -60,28 +62,43 @@ std::string FileScanLocalState::name_suffix() const {
void FileScanLocalState::set_scan_ranges(RuntimeState* state,
const std::vector<TScanRangeParams>&
scan_ranges) {
- _max_scanners =
- config::doris_scanner_thread_pool_thread_num /
state->query_parallel_instance_num();
- _max_scanners = std::max(std::max(_max_scanners,
state->parallel_scan_max_scanners_count()), 1);
- // For select * from table limit 10; should just use one thread.
- if (should_run_serial()) {
- _max_scanners = 1;
- }
+ auto& p = _parent->cast<FileScanOperatorX>();
+
+ auto calc_max_scanners = [&](int parallel_instance_num) -> int {
+ int max_scanners = config::doris_scanner_thread_pool_thread_num /
parallel_instance_num;
+ max_scanners =
+ std::max(std::max(max_scanners,
state->parallel_scan_max_scanners_count()), 1);
+ if (should_run_serial()) {
+ max_scanners = 1;
+ }
+ return max_scanners;
+ };
+
if (scan_ranges.size() == 1) {
auto scan_range =
scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
if (scan_range.__isset.split_source) {
+ p._batch_split_mode = true;
auto split_source = scan_range.split_source;
RuntimeProfile::Counter* get_split_timer =
ADD_TIMER(_runtime_profile, "GetSplitTime");
+
+ _max_scanners = calc_max_scanners(p.query_parallel_instance_num());
_split_source =
std::make_shared<vectorized::RemoteSplitSourceConnector>(
state, get_split_timer, split_source.split_source_id,
split_source.num_splits,
_max_scanners);
}
}
- if (_split_source == nullptr) {
- _split_source =
-
std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
_max_scanners);
+
+ if (!p._batch_split_mode) {
+ _max_scanners = calc_max_scanners(p.query_parallel_instance_num());
+ if (_split_source == nullptr) {
+ _split_source =
std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
+
_max_scanners);
+ }
+ // currently the total number of splits in the batch split mode cannot
be accurately obtained,
+ // so we don't do it in the batch split mode.
+ _max_scanners = std::min(_max_scanners,
_split_source->num_scan_ranges());
}
- _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());
+
if (scan_ranges.size() > 0 &&
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
// for compatibility.
diff --git a/be/src/pipeline/exec/file_scan_operator.h
b/be/src/pipeline/exec/file_scan_operator.h
index 2777a013d62..25635dcdd62 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -80,10 +80,16 @@ public:
bool is_file_scan_operator() const override { return true; }
+ // There's only one scan range for each backend in batch split mode. Each
backend only starts up one ScanNode instance.
+ int query_parallel_instance_num() const override {
+ return _batch_split_mode ? 1 : _query_parallel_instance_num;
+ }
+
private:
friend class FileScanLocalState;
const std::string _table_name;
+ bool _batch_split_mode = false;
};
} // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 850bb81d2c5..686fe6d2960 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1004,7 +1004,8 @@ Status ScanLocalState<Derived>::_start_scanners(
auto& p = _parent->cast<typename Derived::Parent>();
_scanner_ctx = vectorized::ScannerContext::create_shared(
state(), this, p._output_tuple_desc, p.output_row_descriptor(),
scanners, p.limit(),
- _scan_dependency, p.is_serial_operator(),
p.is_file_scan_operator());
+ _scan_dependency, p.is_serial_operator(),
p.is_file_scan_operator(),
+ p.query_parallel_instance_num());
return Status::OK();
}
@@ -1220,6 +1221,8 @@ Status ScanOperatorX<LocalStateType>::init(const
TPlanNode& tnode, RuntimeState*
}
}
+ _query_parallel_instance_num = state->query_parallel_instance_num();
+
return Status::OK();
}
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 4519a3ca283..ea7c965c522 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -371,6 +371,10 @@ public:
[[nodiscard]] virtual bool is_file_scan_operator() const { return false; }
+ [[nodiscard]] virtual int query_parallel_instance_num() const {
+ return _query_parallel_instance_num;
+ }
+
const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
return _runtime_filter_descs;
}
@@ -433,6 +437,8 @@ protected:
int64_t _push_down_count = -1;
const int _parallel_tasks = 0;
+ int _query_parallel_instance_num = 0;
+
std::vector<int> topn_filter_source_node_ids;
};
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index 5a33aec2de8..15afb3254ca 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -47,7 +47,7 @@ ScannerContext::ScannerContext(
const TupleDescriptor* output_tuple_desc, const RowDescriptor*
output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>&
scanners, int64_t limit_,
std::shared_ptr<pipeline::Dependency> dependency, bool
ignore_data_distribution,
- bool is_file_scan_operator)
+ bool is_file_scan_operator, int num_parallel_instances)
: HasTaskExecutionCtx(state),
_state(state),
_local_state(local_state),
@@ -60,7 +60,8 @@ ScannerContext::ScannerContext(
_scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
_all_scanners(scanners.begin(), scanners.end()),
_ignore_data_distribution(ignore_data_distribution),
- _is_file_scan_operator(is_file_scan_operator) {
+ _is_file_scan_operator(is_file_scan_operator),
+ _num_parallel_instances(num_parallel_instances) {
DCHECK(_output_row_descriptor == nullptr ||
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
@@ -105,8 +106,6 @@ Status ScannerContext::init() {
_local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
thread_token == nullptr ?
"False" : "True");
- const int num_parallel_instances = _state->query_parallel_instance_num();
-
// _max_bytes_in_queue controls the maximum memory that can be used by a
single scan instance.
// scan_queue_mem_limit on FE is 100MB by default, on backend we will make
sure its actual value
// is larger than 10MB.
@@ -176,7 +175,7 @@ Status ScannerContext::init() {
} else {
const size_t factor = _is_file_scan_operator ? 1 : 4;
_max_thread_num = factor *
(config::doris_scanner_thread_pool_thread_num /
- num_parallel_instances);
+ _num_parallel_instances);
// In some rare cases, user may set num_parallel_instances to 1
manually so that many queries can be executed
// in parallel. We need to make sure the _max_thread_num is
smaller than previous value.
_max_thread_num =
diff --git a/be/src/vec/exec/scan/scanner_context.h
b/be/src/vec/exec/scan/scanner_context.h
index 1604e1224f4..47bc5896926 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -107,7 +107,8 @@ public:
const RowDescriptor* output_row_descriptor,
const
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, std::shared_ptr<pipeline::Dependency>
dependency,
- bool ignore_data_distribution, bool is_file_scan_operator);
+ bool ignore_data_distribution, bool is_file_scan_operator,
+ int num_parallel_instances);
~ScannerContext() override {
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
@@ -213,6 +214,7 @@ protected:
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
bool _ignore_data_distribution = false;
bool _is_file_scan_operator = false;
+ int _num_parallel_instances;
// for scaling up the running scanners
size_t _estimated_block_size = 0;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]