This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5cfb985ad5a [Enhancement](scan) enable parallel scan when preagg is on (#35810)
5cfb985ad5a is described below
commit 5cfb985ad5ad2b6570080e44c7244fca513bdc8e
Author: Pxl <[email protected]>
AuthorDate: Tue Jun 4 11:09:17 2024 +0800
[Enhancement](scan) enable parallel scan when preagg is on (#35810)
## Proposed changes
Enable parallel scan when preaggregation is enabled.
---
be/src/pipeline/exec/olap_scan_operator.cpp | 69 ++++++++++++-----------------
1 file changed, 29 insertions(+), 40 deletions(-)
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index eb0a0be726b..6e3b99f6d1d 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -290,53 +290,42 @@ Status OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
}
if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
- p._push_down_agg_type == TPushAggOp::NONE) {
- bool is_dup_mow_key = true;
- for (auto&& [tablet, _] : tablets) {
- is_dup_mow_key =
- tablet->keys_type() == DUP_KEYS || (tablet->keys_type() ==
UNIQUE_KEYS &&
-
tablet->enable_unique_key_merge_on_write());
- if (!is_dup_mow_key) {
- break;
+ p._push_down_agg_type == TPushAggOp::NONE &&
+ (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) {
+ std::vector<OlapScanRange*> key_ranges;
+ for (auto& range : _cond_ranges) {
+ if (range->begin_scan_range.size() == 1 &&
+ range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) {
+ continue;
}
+ key_ranges.emplace_back(range.get());
}
- if (is_dup_mow_key) {
- std::vector<OlapScanRange*> key_ranges;
- for (auto& range : _cond_ranges) {
- if (range->begin_scan_range.size() == 1 &&
- range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY)
{
- continue;
- }
- key_ranges.emplace_back(range.get());
- }
+ ParallelScannerBuilder<OlapScanLocalState> scanner_builder(
+ this, tablets, _scanner_profile, key_ranges, state(),
p._limit_per_scanner, true,
+ p._olap_scan_node.is_preaggregation);
- ParallelScannerBuilder<OlapScanLocalState> scanner_builder(
- this, tablets, _scanner_profile, key_ranges, state(),
p._limit_per_scanner,
- is_dup_mow_key, p._olap_scan_node.is_preaggregation);
+ int max_scanners_count = state()->parallel_scan_max_scanners_count();
- int max_scanners_count =
state()->parallel_scan_max_scanners_count();
-
- // If the `max_scanners_count` was not set,
- // use `config::doris_scanner_thread_pool_thread_num` as the
default value.
- if (max_scanners_count <= 0) {
- max_scanners_count =
config::doris_scanner_thread_pool_thread_num;
- }
+ // If the `max_scanners_count` was not set,
+ // use `config::doris_scanner_thread_pool_thread_num` as the default
value.
+ if (max_scanners_count <= 0) {
+ max_scanners_count = config::doris_scanner_thread_pool_thread_num;
+ }
- // Too small value of `min_rows_per_scanner` is meaningless.
- auto min_rows_per_scanner =
- std::max<int64_t>(1024,
state()->parallel_scan_min_rows_per_scanner());
- scanner_builder.set_max_scanners_count(max_scanners_count);
- scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner);
-
- RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners));
- for (auto& scanner : *scanners) {
- auto* olap_scanner =
assert_cast<vectorized::NewOlapScanner*>(scanner.get());
- RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts));
- olap_scanner->set_compound_filters(_compound_filters);
- }
- return Status::OK();
+ // Too small value of `min_rows_per_scanner` is meaningless.
+ auto min_rows_per_scanner =
+ std::max<int64_t>(1024,
state()->parallel_scan_min_rows_per_scanner());
+ scanner_builder.set_max_scanners_count(max_scanners_count);
+ scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner);
+
+ RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners));
+ for (auto& scanner : *scanners) {
+ auto* olap_scanner =
assert_cast<vectorized::NewOlapScanner*>(scanner.get());
+ RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts));
+ olap_scanner->set_compound_filters(_compound_filters);
}
+ return Status::OK();
}
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]