This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new fd5dd9a391 [Opt](Pipeline) opt pipeline code in mult tablet (#17999)
fd5dd9a391 is described below

commit fd5dd9a3910fa63a0b1ca2d2e120c2a7a7dfbbc0
Author: HappenLee <[email protected]>
AuthorDate: Mon Mar 27 10:02:48 2023 +0800

    [Opt](Pipeline) opt pipeline code in mult tablet (#17999)
---
 be/src/vec/exec/scan/new_olap_scan_node.cpp | 24 +++++----------
 be/src/vec/exec/scan/new_olap_scan_node.h   |  1 +
 be/src/vec/exec/scan/new_olap_scanner.cpp   | 46 +++++++++++++++--------------
 be/src/vec/exec/scan/new_olap_scanner.h     | 15 +++++-----
 be/src/vec/exec/scan/scanner_scheduler.cpp  |  9 +++++-
 be/src/vec/exec/scan/vscan_node.cpp         |  1 -
 be/src/vec/exec/scan/vscanner.h             |  8 ++++-
 7 files changed, 54 insertions(+), 50 deletions(-)

diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index c4ba3d611c..d884845ab1 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -55,7 +55,6 @@ Status NewOlapScanNode::prepare(RuntimeState* state) {
 Status NewOlapScanNode::_init_profile() {
     RETURN_IF_ERROR(VScanNode::_init_profile());
 
-    _num_disks_accessed_counter = ADD_COUNTER(_runtime_profile, "NumDiskAccess", TUnit::UNIT);
     _tablet_counter = ADD_COUNTER(_runtime_profile, "TabletNum", TUnit::UNIT);
 
     // 1. init segment profile
@@ -405,11 +404,10 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
     }
 
     // ranges constructed from scan keys
-    std::vector<std::unique_ptr<doris::OlapScanRange>> cond_ranges;
-    RETURN_IF_ERROR(_scan_keys.get_key_range(&cond_ranges));
+    RETURN_IF_ERROR(_scan_keys.get_key_range(&_cond_ranges));
     // if we can't get ranges from conditions, we give it a total range
-    if (cond_ranges.empty()) {
-        cond_ranges.emplace_back(new doris::OlapScanRange());
+    if (_cond_ranges.empty()) {
+        _cond_ranges.emplace_back(new doris::OlapScanRange());
     }
     int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
 
@@ -462,25 +460,20 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
         }
     }
 
-    std::unordered_set<std::string> disk_set;
     auto build_new_scanner = [&](const TPaloScanRange& scan_range,
                                  const std::vector<OlapScanRange*>& key_ranges,
                                  const std::vector<RowsetReaderSharedPtr>& rs_readers,
                                  const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) {
         NewOlapScanner* scanner = new NewOlapScanner(_state, this, _limit_per_scanner,
-                                                     _olap_scan_node.is_preaggregation,
+                                                     _olap_scan_node.is_preaggregation, scan_range,
+                                                     key_ranges, rs_readers, rs_reader_seg_offsets,
                                                      _need_agg_finalize, _scanner_profile.get());
 
         scanner->set_compound_filters(_compound_filters);
         // add scanner to pool before doing prepare.
         // so that scanner can be automatically deconstructed if prepare 
failed.
         _scanner_pool.add(scanner);
-        RETURN_IF_ERROR(scanner->prepare(scan_range, key_ranges, _vconjunct_ctx_ptr.get(),
-                                         _olap_filters, _filter_predicates, _push_down_functions,
-                                         _common_vexpr_ctxs_pushdown.get(), rs_readers,
-                                         rs_reader_seg_offsets));
         scanners->push_back((VScanner*)scanner);
-        disk_set.insert(scanner->scan_disk());
         return Status::OK();
     };
     if (is_duplicate_key) {
@@ -489,7 +482,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
                 std::max(segment_count / config::doris_scanner_thread_pool_thread_num, 1);
         for (int i = 0; i < _scan_ranges.size(); ++i) {
             auto& scan_range = _scan_ranges[i];
-            std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges;
+            std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges;
             int num_ranges = ranges->size();
             std::vector<doris::OlapScanRange*> scanner_ranges(num_ranges);
             for (int j = 0; j < num_ranges; ++j) {
@@ -553,7 +546,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
                                                                                
        true);
             RETURN_IF_ERROR(status);
 
-            std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &cond_ranges;
+            std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges;
             int size_based_scanners_per_tablet = 1;
 
             if (config::doris_scan_range_max_mb > 0) {
@@ -577,10 +570,7 @@ Status NewOlapScanNode::_init_scanners(std::list<VScanner*>* scanners) {
                 RETURN_IF_ERROR(build_new_scanner(*scan_range, scanner_ranges, {}, {}));
             }
         }
-        COUNTER_SET(_num_disks_accessed_counter, static_cast<int64_t>(disk_set.size()));
     }
-    // telemetry::set_span_attribute(span, _num_disks_accessed_counter);
-    // telemetry::set_span_attribute(span, _num_scanners);
 
     return Status::OK();
 }
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h
index b1c95804b7..48fde8436b 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -65,6 +65,7 @@ private:
 private:
     TOlapScanNode _olap_scan_node;
     std::vector<std::unique_ptr<TPaloScanRange>> _scan_ranges;
+    std::vector<std::unique_ptr<doris::OlapScanRange>> _cond_ranges;
     OlapScanKeys _scan_keys;
     std::vector<TCondition> _olap_filters;
     // _compound_filters store conditions in the one compound relationship in conjunct expr tree except leaf node of `and` node,
diff --git a/be/src/vec/exec/scan/new_olap_scanner.cpp b/be/src/vec/exec/scan/new_olap_scanner.cpp
index 3aaaf6add4..59b72eea8b 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.cpp
+++ b/be/src/vec/exec/scan/new_olap_scanner.cpp
@@ -24,12 +24,21 @@
 namespace doris::vectorized {
 
 NewOlapScanner::NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit,
-                               bool aggregation, bool need_agg_finalize, RuntimeProfile* profile)
+                               bool aggregation, const TPaloScanRange& scan_range,
+                               const std::vector<OlapScanRange*>& key_ranges,
+                               const std::vector<RowsetReaderSharedPtr>& rs_readers,
+                               const std::vector<std::pair<int, int>>& rs_reader_seg_offsets,
+                               bool need_agg_finalize, RuntimeProfile* profile)
         : VScanner(state, static_cast<VScanNode*>(parent), limit, profile),
           _aggregation(aggregation),
           _need_agg_finalize(need_agg_finalize),
-          _version(-1) {
+          _version(-1),
+          _scan_range(scan_range),
+          _key_ranges(key_ranges) {
+    _tablet_reader_params.rs_readers = rs_readers;
+    _tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets;
     _tablet_schema = std::make_shared<TabletSchema>();
+    _is_init = false;
 }
 
 static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
@@ -46,19 +55,14 @@ static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema,
     return read_columns_string;
 }
 
-Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
-                               const std::vector<OlapScanRange*>& key_ranges,
-                               VExprContext** vconjunct_ctx_ptr,
-                               const std::vector<TCondition>& filters,
-                               const FilterPredicates& filter_predicates,
-                               const std::vector<FunctionFilter>& function_filters,
-                               VExprContext** common_vexpr_ctxs_pushdown,
-                               const std::vector<RowsetReaderSharedPtr>& rs_readers,
-                               const std::vector<std::pair<int, int>>& rs_reader_seg_offsets) {
-    RETURN_IF_ERROR(VScanner::prepare(_state, vconjunct_ctx_ptr));
-    if (common_vexpr_ctxs_pushdown != nullptr) {
+Status NewOlapScanner::init() {
+    _is_init = true;
+    auto parent = static_cast<NewOlapScanNode*>(_parent);
+    RETURN_IF_ERROR(VScanner::prepare(_state, parent->_vconjunct_ctx_ptr.get()));
+    if (parent->_common_vexpr_ctxs_pushdown != nullptr) {
         // Copy common_vexpr_ctxs_pushdown from scan node to this scanner's _common_vexpr_ctxs_pushdown, just necessary.
-        RETURN_IF_ERROR((*common_vexpr_ctxs_pushdown)->clone(_state, &_common_vexpr_ctxs_pushdown));
+        RETURN_IF_ERROR((*parent->_common_vexpr_ctxs_pushdown)
+                                ->clone(_state, &_common_vexpr_ctxs_pushdown));
     }
 
     // set limit to reduce end of rowset and segment mem use
@@ -69,8 +73,8 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
                     : std::min(static_cast<int64_t>(_state->batch_size()), 
_parent->limit()));
 
     // Get olap table
-    TTabletId tablet_id = scan_range.tablet_id;
-    _version = strtoul(scan_range.version.c_str(), nullptr, 10);
+    TTabletId tablet_id = _scan_range.tablet_id;
+    _version = strtoul(_scan_range.version.c_str(), nullptr, 10);
     {
         auto [tablet, status] =
                 
StorageEngine::instance()->tablet_manager()->get_tablet_and_status(tablet_id, 
true);
@@ -114,7 +118,7 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
 
         {
             std::shared_lock rdlock(_tablet->get_header_lock());
-            if (rs_readers.empty()) {
+            if (_tablet_reader_params.rs_readers.empty()) {
                 const RowsetSharedPtr rowset = _tablet->rowset_with_max_version();
                 if (rowset == nullptr) {
                     std::stringstream ss;
@@ -137,14 +141,12 @@ Status NewOlapScanner::prepare(const TPaloScanRange& scan_range,
                        << ", backend=" << BackendOptions::get_localhost();
                     return Status::InternalError(ss.str());
                 }
-            } else {
-                _tablet_reader_params.rs_readers = rs_readers;
-                _tablet_reader_params.rs_readers_segment_offsets = rs_reader_seg_offsets;
             }
 
             // Initialize tablet_reader_params
-            RETURN_IF_ERROR(_init_tablet_reader_params(key_ranges, filters, filter_predicates,
-                                                       function_filters));
+            RETURN_IF_ERROR(_init_tablet_reader_params(_key_ranges, parent->_olap_filters,
+                                                       parent->_filter_predicates,
+                                                       parent->_push_down_functions));
         }
     }
 
diff --git a/be/src/vec/exec/scan/new_olap_scanner.h b/be/src/vec/exec/scan/new_olap_scanner.h
index 2a04e021db..4b05880bcd 100644
--- a/be/src/vec/exec/scan/new_olap_scanner.h
+++ b/be/src/vec/exec/scan/new_olap_scanner.h
@@ -35,20 +35,17 @@ struct FilterPredicates;
 class NewOlapScanner : public VScanner {
 public:
     NewOlapScanner(RuntimeState* state, NewOlapScanNode* parent, int64_t limit, bool aggregation,
+                   const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
+                   const std::vector<RowsetReaderSharedPtr>& rs_readers,
+                   const std::vector<std::pair<int, int>>& rs_reader_seg_offsets,
                    bool need_agg_finalize, RuntimeProfile* profile);
 
+    Status init() override;
+
     Status open(RuntimeState* state) override;
 
     Status close(RuntimeState* state) override;
 
-    Status prepare(const TPaloScanRange& scan_range, const std::vector<OlapScanRange*>& key_ranges,
-                   VExprContext** vconjunct_ctx_ptr, const std::vector<TCondition>& filters,
-                   const FilterPredicates& filter_predicates,
-                   const std::vector<FunctionFilter>& function_filters,
-                   VExprContext** common_vexpr_ctxs_pushdown,
-                   const std::vector<RowsetReaderSharedPtr>& rs_readers = {},
-                   const std::vector<std::pair<int, int>>& rs_reader_seg_offsets = {});
-
     const std::string& scan_disk() const { return _tablet->data_dir()->path(); }
 
     void set_compound_filters(const std::vector<TCondition>& compound_filters);
@@ -75,6 +72,8 @@ private:
     TabletSchemaSPtr _tablet_schema;
     TabletSharedPtr _tablet;
     int64_t _version;
+    const TPaloScanRange& _scan_range;
+    std::vector<OlapScanRange*> _key_ranges;
 
     TabletReader::ReaderParams _tablet_reader_params;
     std::unique_ptr<TabletReader> _tablet_reader;
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index 38121538f7..a6a76e57d7 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -260,7 +260,14 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
     bool eos = false;
     RuntimeState* state = ctx->state();
     DCHECK(nullptr != state);
-    if (!scanner->is_open()) {
+    if (!scanner->is_init()) {
+        status = scanner->init();
+        if (!status.ok()) {
+            ctx->set_status_on_error(status);
+            eos = true;
+        }
+    }
+    if (!eos && !scanner->is_open()) {
         status = scanner->open(state);
         if (!status.ok()) {
             ctx->set_status_on_error(status);
diff --git a/be/src/vec/exec/scan/vscan_node.cpp b/be/src/vec/exec/scan/vscan_node.cpp
index 3e9459071b..1286baa573 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -1363,7 +1363,6 @@ Status VScanNode::_prepare_scanners() {
         COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size()));
         RETURN_IF_ERROR(_start_scanners(scanners));
     }
-
     return Status::OK();
 }
 } // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscanner.h b/be/src/vec/exec/scan/vscanner.h
index 88cac3db42..26248d9b54 100644
--- a/be/src/vec/exec/scan/vscanner.h
+++ b/be/src/vec/exec/scan/vscanner.h
@@ -39,7 +39,9 @@ class VScanner {
 public:
     VScanner(RuntimeState* state, VScanNode* parent, int64_t limit, RuntimeProfile* profile);
 
-    virtual ~VScanner() {}
+    virtual ~VScanner() = default;
+
+    virtual Status init() { return Status::OK(); }
 
     virtual Status open(RuntimeState* state) { return Status::OK(); }
 
@@ -67,6 +69,8 @@ public:
 
     int64_t get_rows_read() const { return _num_rows_read; }
 
+    bool is_init() const { return _is_init; }
+
     Status try_append_late_arrival_runtime_filter();
 
     // Call start_wait_worker_timer() when submit the scanner to the thread pool.
@@ -179,6 +183,8 @@ protected:
     // set to true after decrease the "_num_unfinished_scanners" in scanner context
     bool _is_counted_down = false;
 
+    bool _is_init = true;
+
     ScannerCounter _counter;
     int64_t _per_scanner_timer = 0;
 };


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to