This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ffd178f5ff1 [feat](pipelinex) support parallel scan on pipeline x engine (#29070)
ffd178f5ff1 is described below

commit ffd178f5ff16370e4d7440821a1927648019e672
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Dec 28 21:29:07 2023 +0800

    [feat](pipelinex) support parallel scan on pipeline x engine (#29070)
    
    * [feat](pipelinex) support parallel scan on pipeline x engine
    
    * make parallel scan be independent of shared scan
---
 be/src/olap/parallel_scanner_builder.cpp           |  2 +
 be/src/pipeline/exec/olap_scan_operator.cpp        | 67 +++++++++++++++++++++-
 be/src/vec/exec/scan/new_olap_scan_node.cpp        | 43 +++++++-------
 .../main/java/org/apache/doris/qe/Coordinator.java |  4 ++
 4 files changed, 94 insertions(+), 22 deletions(-)

diff --git a/be/src/olap/parallel_scanner_builder.cpp b/be/src/olap/parallel_scanner_builder.cpp
index 32b163b7a1a..4e64551dc3b 100644
--- a/be/src/olap/parallel_scanner_builder.cpp
+++ b/be/src/olap/parallel_scanner_builder.cpp
@@ -18,6 +18,7 @@
 #include "parallel_scanner_builder.h"
 
 #include "olap/rowset/beta_rowset.h"
+#include "pipeline/exec/olap_scan_operator.h"
 #include "vec/exec/scan/new_olap_scanner.h"
 
 namespace doris {
@@ -197,5 +198,6 @@ std::shared_ptr<NewOlapScanner> 
ParallelScannerBuilder<ParentType>::_build_scann
 }
 
 template class ParallelScannerBuilder<NewOlapScanNode>;
+template class ParallelScannerBuilder<pipeline::OlapScanLocalState>;
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp b/be/src/pipeline/exec/olap_scan_operator.cpp
index 3e40e173585..d1575075b4c 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -21,6 +21,7 @@
 
 #include <memory>
 
+#include "olap/parallel_scanner_builder.h"
 #include "olap/storage_engine.h"
 #include "olap/tablet_manager.h"
 #include "pipeline/exec/scan_operator.h"
@@ -28,8 +29,6 @@
 #include "util/to_string.h"
 #include "vec/exec/runtime_filter_consumer.h"
 #include "vec/exec/scan/new_olap_scanner.h"
-#include "vec/exec/scan/pip_scanner_context.h"
-#include "vec/exec/scan/scanner_context.h"
 #include "vec/exec/scan/vscan_node.h"
 #include "vec/exprs/vcompound_pred.h"
 #include "vec/exprs/vectorized_fn_call.h"
@@ -256,6 +255,70 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     if (_cond_ranges.empty()) {
         _cond_ranges.emplace_back(new doris::OlapScanRange());
     }
+
+    bool enable_parallel_scan = state()->enable_parallel_scan();
+    bool has_cpu_limit = state()->query_options().__isset.resource_limit &&
+                         
state()->query_options().resource_limit.__isset.cpu_limit;
+
+    if (enable_parallel_scan && !p._should_run_serial && !has_cpu_limit &&
+        p._push_down_agg_type == TPushAggOp::NONE) {
+        std::vector<TabletWithVersion> tablets;
+        bool is_dup_mow_key = true;
+        for (auto&& scan_range : _scan_ranges) {
+            auto tablet = 
DORIS_TRY(ExecEnv::get_tablet(scan_range->tablet_id));
+            is_dup_mow_key =
+                    tablet->keys_type() == DUP_KEYS || (tablet->keys_type() == 
UNIQUE_KEYS &&
+                                                        
tablet->enable_unique_key_merge_on_write());
+            if (!is_dup_mow_key) {
+                break;
+            }
+
+            int64_t version = 0;
+            std::from_chars(scan_range->version.data(),
+                            scan_range->version.data() + 
scan_range->version.size(), version);
+            tablets.emplace_back(
+                    TabletWithVersion 
{std::dynamic_pointer_cast<Tablet>(tablet), version});
+        }
+
+        if (is_dup_mow_key) {
+            std::vector<OlapScanRange*> key_ranges;
+            for (auto& range : _cond_ranges) {
+                if (range->begin_scan_range.size() == 1 &&
+                    range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) 
{
+                    continue;
+                }
+                key_ranges.emplace_back(range.get());
+            }
+
+            ParallelScannerBuilder<OlapScanLocalState> scanner_builder(
+                    this, tablets, _scanner_profile, key_ranges, state(), 
p._limit_per_scanner,
+                    is_dup_mow_key, p._olap_scan_node.is_preaggregation);
+
+            int max_scanners_count = 
state()->parallel_scan_max_scanners_count();
+
+            // If the `max_scanners_count` was not set,
+            // use `config::doris_scanner_thread_pool_thread_num` as the 
default value.
+            if (max_scanners_count <= 0) {
+                max_scanners_count = 
config::doris_scanner_thread_pool_thread_num;
+            }
+
+            // Too small value of `min_rows_per_scanner` is meaningless.
+            auto min_rows_per_scanner =
+                    std::max<int64_t>(1024, 
state()->parallel_scan_min_rows_per_scanner());
+            scanner_builder.set_max_scanners_count(max_scanners_count);
+            scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner);
+
+            RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners));
+            for (auto& scanner : *scanners) {
+                auto* olap_scanner = 
assert_cast<vectorized::NewOlapScanner*>(scanner.get());
+                RETURN_IF_ERROR(olap_scanner->prepare(state(), _conjuncts));
+                olap_scanner->set_compound_filters(_compound_filters);
+            }
+            LOG(INFO) << "parallel scanners count: " << scanners->size();
+            return Status::OK();
+        }
+    }
+
     int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
 
     auto build_new_scanner = [&](BaseTabletSPtr tablet, int64_t version,
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index b33c656a8df..8ac1af004d9 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -532,7 +532,7 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
     // Split tablet segment by scanner, only use in pipeline in duplicate key
     // 1. if tablet count lower than scanner thread num, count segment num of 
all tablet ready for scan
     // TODO: some tablet may do not have segment, may need split segment all 
case
-    if (_shared_scan_opt && _scan_ranges.size() < 
config::doris_scanner_thread_pool_thread_num) {
+    if (_scan_ranges.size() < config::doris_scanner_thread_pool_thread_num) {
         for (auto&& [tablet, version] : tablets_to_scan) {
             is_dup_mow_key =
                     tablet->keys_type() == DUP_KEYS || (tablet->keys_type() == 
UNIQUE_KEYS &&
@@ -541,26 +541,29 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
                 break;
             }
 
-            auto& read_source = tablets_read_source.emplace_back();
-            {
-                std::shared_lock rdlock(tablet->get_header_lock());
-                auto st = tablet->capture_rs_readers({0, version}, 
&read_source.rs_splits, false);
-                if (!st.ok()) {
-                    LOG(WARNING) << "fail to init reader.res=" << st;
-                    return Status::InternalError(
-                            "failed to initialize storage reader. tablet_id={} 
: {}",
-                            tablet->tablet_id(), st.to_string());
+            if (_shared_scan_opt) {
+                auto& read_source = tablets_read_source.emplace_back();
+                {
+                    std::shared_lock rdlock(tablet->get_header_lock());
+                    auto st =
+                            tablet->capture_rs_readers({0, version}, 
&read_source.rs_splits, false);
+                    if (!st.ok()) {
+                        LOG(WARNING) << "fail to init reader.res=" << st;
+                        return Status::InternalError(
+                                "failed to initialize storage reader. 
tablet_id={} : {}",
+                                tablet->tablet_id(), st.to_string());
+                    }
+                }
+                if (!_state->skip_delete_predicate()) {
+                    read_source.fill_delete_predicates();
                 }
-            }
-            if (!_state->skip_delete_predicate()) {
-                read_source.fill_delete_predicates();
-            }
 
-            auto& rs_seg_count = tablet_rs_seg_count.emplace_back();
-            for (const auto& rowset_splits : read_source.rs_splits) {
-                auto num_segments = 
rowset_splits.rs_reader->rowset()->num_segments();
-                rs_seg_count.emplace_back(num_segments);
-                segment_count += num_segments;
+                auto& rs_seg_count = tablet_rs_seg_count.emplace_back();
+                for (const auto& rowset_splits : read_source.rs_splits) {
+                    auto num_segments = 
rowset_splits.rs_reader->rowset()->num_segments();
+                    rs_seg_count.emplace_back(num_segments);
+                    segment_count += num_segments;
+                }
             }
         }
     }
@@ -627,7 +630,7 @@ Status 
NewOlapScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
         return Status::OK();
     };
 
-    if (is_dup_mow_key) {
+    if (_shared_scan_opt && is_dup_mow_key) {
         // 2. Split segment evenly to each scanner (e.g. each scanner need to 
scan `avg_segment_count_per_scanner` segments)
         const auto avg_segment_count_by_scanner =
                 std::max(segment_count / 
config::doris_scanner_thread_pool_thread_num, (size_t)1);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 2d4e5764aad..92bb329b98c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1099,6 +1099,10 @@ public class Coordinator implements CoordInterface {
             try {
                 PExecPlanFragmentResult result = 
triple.getRight().get(timeoutMs, TimeUnit.MILLISECONDS);
                 code = 
TStatusCode.findByValue(result.getStatus().getStatusCode());
+                if (code == null) {
+                    code = TStatusCode.INTERNAL_ERROR;
+                }
+
                 if (code != TStatusCode.OK) {
                     if (!result.getStatus().getErrorMsgsList().isEmpty()) {
                         errMsg = result.getStatus().getErrorMsgsList().get(0);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to