This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ffd178f5ff1 [feat](pipelinex) support parallel scan on pipeline x
engine (#29070)
ffd178f5ff1 is described below
commit ffd178f5ff16370e4d7440821a1927648019e672
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Dec 28 21:29:07 2023 +0800
[feat](pipelinex) support parallel scan on pipeline x engine (#29070)
* [feat](pipelinex) support parallel scan on pipeline x engine
* make parallel scan be independent of shared scan
---
be/src/olap/parallel_scanner_builder.cpp | 2 +
be/src/pipeline/exec/olap_scan_operator.cpp | 67 +++++++++++++++++++++-
be/src/vec/exec/scan/new_olap_scan_node.cpp | 43 +++++++-------
.../main/java/org/apache/doris/qe/Coordinator.java | 4 ++
4 files changed, 94 insertions(+), 22 deletions(-)
diff --git a/be/src/olap/parallel_scanner_builder.cpp
b/be/src/olap/parallel_scanner_builder.cpp
index 32b163b7a1a..4e64551dc3b 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -18,6 +18,7 @@
#include "parallel_scanner_builder.h"
#include "olap/rowset/beta_rowset.h"
+#include "pipeline/exec/olap_scan_operator.h"
#include "vec/exec/scan/new_olap_scanner.h"
namespace doris {
@@ -197,5 +198,6 @@ std::shared_ptr<NewOlapScanner>
ParallelScannerBuilder<ParentType>::_build_scann
}
template class ParallelScannerBuilder<NewOlapScanNode>;
+template class ParallelScannerBuilder<pipeline::OlapScanLocalState>;
} // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 3e40e173585..d1575075b4c 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -21,6 +21,7 @@
#include <memory>
+#include "olap/parallel_scanner_builder.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
#include "pipeline/exec/scan_operator.h"
@@ -28,8 +29,6 @@
#include "util/to_string.h"
#include "vec/exec/runtime_filter_consumer.h"
#include "vec/exec/scan/new_olap_scanner.h"
-#include "vec/exec/scan/pip_scanner_context.h"
-#include "vec/exec/scan/scanner_context.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exprs/vcompound_pred.h"
#include "vec/exprs/vectorized_fn_call.h"
@@ -256,6 +255,70 @@ Status
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
if (_cond_ranges.empty()) {
_cond_ranges.emplace_back(new doris::OlapScanRange());
}
+
+ bool enable_parallel_scan = state()->enable_parallel_scan();
+ bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
+
state()->query_options().resource_limit.__isset.cpu_limit;
+
+ if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
+ p._push_down_agg_type == TPushAggOp::NONE) {
+ std::vector<TabletWithVersion> tablets;
+ bool is_dup_mow_key = true;
+ for (auto&& scan_range : _scan_ranges) {
+ auto tablet =
DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
+ is_dup_mow_key =
+ tablet->keys_type() == DUP_KEYS || (tablet->keys_type() ==
UNIQUE_KEYS &&
+
tablet->enable_unique_key_merge_on_write());
+ if (!is_dup_mow_key) {
+ break;
+ }
+
+ int64_t version = 0;
+ std::from_chars(scan_range->version.data(),
+ scan_range->version.data() +
scan_range->version.size(), version);
+ tablets.emplace_back(
+ TabletWithVersion
{std::dynamic_pointer_cast<Tablet>(tablet), version});
+ }
+
+ if (is_dup_mow_key) {
+ std::vector<OlapScanRange*> key_ranges;
+ for (auto& range : _cond_ranges) {
+ if (range->begin_scan_range.size() == 1 &&
+ range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY)
{
+ continue;
+ }
+ key_ranges.emplace_back(range.get());
+ }
+
+ ParallelScannerBuilder<OlapScanLocalState> scanner_builder(
+ this, tablets, _scanner_profile, key_ranges, state(),
p._limit_per_scanner,
+ is_dup_mow_key, p._olap_scan_node.is_preaggregation);
+
+ int max_scanners_count =
state()->parallel_scan_max_scanners_count();
+
+ // If the `max_scanners_count` was not set,
+ // use `config::doris_scanner_thread_pool_thread_num` as the
default value.
+ if (max_scanners_count <= 0) {
+ max_scanners_count =
config::doris_scanner_thread_pool_thread_num;
+ }
+
+ // Too small value of `min_rows_per_scanner` is meaningless.
+ auto min_rows_per_scanner =
+ std::max<int64_t>(1024,
state()->parallel_scan_min_rows_per_scanner());
+ scanner_builder.set_max_scanners_count(max_scanners_count);
+ scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner);
+
+ RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners));
+ for (auto& scanner : *scanners) {
+ auto* olap_scanner =
assert_cast<vectorized::NewOlapScanner*>(scanner.get());
+ RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts));
+ olap_scanner->set_compound_filters(_compound_filters);
+ }
+ LOG(INFO) << "parallel scanners count: " << scanners->size();
+ return Status::OK();
+ }
+ }
+
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
auto build_new_scanner = [&](BaseTabletSPtr tablet, int64_t version,
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index b33c656a8df..8ac1af004d9 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -532,7 +532,7 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
// Split tablet segment by scanner, only use in pipeline in duplicate key
// 1. if tablet count lower than scanner thread num, count segment num of
all tablet ready for scan
// TODO: some tablet may do not have segment, may need split segment all
case
- if (_shared_scan_opt && _scan_ranges.size() <
config::doris_scanner_thread_pool_thread_num) {
+ if (_scan_ranges.size() < config::doris_scanner_thread_pool_thread_num) {
for (auto&& [tablet, version] : tablets_to_scan) {
is_dup_mow_key =
tablet->keys_type() == DUP_KEYS || (tablet->keys_type() ==
UNIQUE_KEYS &&
@@ -541,26 +541,29 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
break;
}
- auto& read_source = tablets_read_source.emplace_back();
- {
- std::shared_lock rdlock(tablet->get_header_lock());
- auto st = tablet->capture_rs_readers({0, version},
&read_source.rs_splits, false);
- if (!st.ok()) {
- LOG(WARNING) << "fail to init reader.res=" << st;
- return Status::InternalError(
- "failed to initialize storage reader. tablet_id={}
: {}",
- tablet->tablet_id(), st.to_string());
+ if (_shared_scan_opt) {
+ auto& read_source = tablets_read_source.emplace_back();
+ {
+ std::shared_lock rdlock(tablet->get_header_lock());
+ auto st =
+ tablet->capture_rs_readers({0, version},
&read_source.rs_splits, false);
+ if (!st.ok()) {
+ LOG(WARNING) << "fail to init reader.res=" << st;
+ return Status::InternalError(
+ "failed to initialize storage reader.
tablet_id={} : {}",
+ tablet->tablet_id(), st.to_string());
+ }
+ }
+ if (!_state->skip_delete_predicate()) {
+ read_source.fill_delete_predicates();
}
- }
- if (!_state->skip_delete_predicate()) {
- read_source.fill_delete_predicates();
- }
- auto& rs_seg_count = tablet_rs_seg_count.emplace_back();
- for (const auto& rowset_splits : read_source.rs_splits) {
- auto num_segments =
rowset_splits.rs_reader->rowset()->num_segments();
- rs_seg_count.emplace_back(num_segments);
- segment_count += num_segments;
+ auto& rs_seg_count = tablet_rs_seg_count.emplace_back();
+ for (const auto& rowset_splits : read_source.rs_splits) {
+ auto num_segments =
rowset_splits.rs_reader->rowset()->num_segments();
+ rs_seg_count.emplace_back(num_segments);
+ segment_count += num_segments;
+ }
}
}
}
@@ -627,7 +630,7 @@ Status
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
return Status::OK();
};
- if (is_dup_mow_key) {
+ if (_shared_scan_opt && is_dup_mow_key) {
// 2. Split segment evenly to each scanner (e.g. each scanner need to
scan `avg_segment_count_per_scanner` segments)
const auto avg_segment_count_by_scanner =
std::max(segment_count /
config::doris_scanner_thread_pool_thread_num, (size_t)1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 2d4e5764aad..92bb329b98c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1099,6 +1099,10 @@ public class Coordinator implements CoordInterface {
try {
PExecPlanFragmentResult result =
triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS);
code =
TStatusCode.findByValue(result.getStatus().getStatusCode());
+ if (code == null) {
+ code = TStatusCode.INTERNAL_ERROR;
+ }
+
if (code != TStatusCode.OK) {
if (!result.getStatus().getErrorMsgsList().isEmpty()) {
errMsg = result.getStatus().getErrorMsgsList().get(0);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]