This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 49d0e72177d [Opt] (multi-catalog) opt max scanner thread number in 
batch split mode. (#50767)
49d0e72177d is described below

commit 49d0e72177d954f0e51d2c4c44f37d486d6f5e33
Author: Qi Chen <[email protected]>
AuthorDate: Thu May 15 15:51:32 2025 +0800

    [Opt] (multi-catalog) opt max scanner thread number in batch split mode. 
(#50767)
    
    Cherry-pick #44635
---
 be/src/pipeline/exec/file_scan_operator.cpp | 41 ++++++++++++++++++++---------
 be/src/pipeline/exec/file_scan_operator.h   |  6 +++++
 be/src/pipeline/exec/scan_operator.cpp      |  5 +++-
 be/src/pipeline/exec/scan_operator.h        |  6 +++++
 be/src/vec/exec/scan/scanner_context.cpp    |  9 +++----
 be/src/vec/exec/scan/scanner_context.h      |  4 ++-
 6 files changed, 52 insertions(+), 19 deletions(-)

diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 6fa7401e278..62994bc6db4 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -26,6 +26,7 @@
 #include "pipeline/exec/olap_scan_operator.h"
 #include "pipeline/exec/scan_operator.h"
 #include "vec/exec/format/format_common.h"
+#include "vec/exec/scan/scanner_context.h"
 #include "vec/exec/scan/vfile_scanner.h"
 
 namespace doris::pipeline {
@@ -37,8 +38,9 @@ Status 
FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     }
 
     auto& p = _parent->cast<FileScanOperatorX>();
+    // There's only one scan range for each backend in batch split mode. Each 
backend only starts up one ScanNode instance.
     size_t shard_num = std::min<size_t>(
-            config::doris_scanner_thread_pool_thread_num / 
state()->query_parallel_instance_num(),
+            config::doris_scanner_thread_pool_thread_num / 
p.query_parallel_instance_num(),
             _max_scanners);
     shard_num = std::max(shard_num, (size_t)1);
     _kv_cache.reset(new vectorized::ShardedKVCache(shard_num));
@@ -60,28 +62,43 @@ std::string FileScanLocalState::name_suffix() const {
 
 void FileScanLocalState::set_scan_ranges(RuntimeState* state,
                                          const std::vector<TScanRangeParams>& 
scan_ranges) {
-    _max_scanners =
-            config::doris_scanner_thread_pool_thread_num / 
state->query_parallel_instance_num();
-    _max_scanners = std::max(std::max(_max_scanners, 
state->parallel_scan_max_scanners_count()), 1);
-    // For select * from table limit 10; should just use one thread.
-    if (should_run_serial()) {
-        _max_scanners = 1;
-    }
+    auto& p = _parent->cast<FileScanOperatorX>();
+
+    auto calc_max_scanners = [&](int parallel_instance_num) -> int {
+        int max_scanners = config::doris_scanner_thread_pool_thread_num / 
parallel_instance_num;
+        max_scanners =
+                std::max(std::max(max_scanners, 
state->parallel_scan_max_scanners_count()), 1);
+        if (should_run_serial()) {
+            max_scanners = 1;
+        }
+        return max_scanners;
+    };
+
     if (scan_ranges.size() == 1) {
         auto scan_range = 
scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
         if (scan_range.__isset.split_source) {
+            p._batch_split_mode = true;
             auto split_source = scan_range.split_source;
             RuntimeProfile::Counter* get_split_timer = 
ADD_TIMER(_runtime_profile, "GetSplitTime");
+
+            _max_scanners = calc_max_scanners(p.query_parallel_instance_num());
             _split_source = 
std::make_shared<vectorized::RemoteSplitSourceConnector>(
                     state, get_split_timer, split_source.split_source_id, 
split_source.num_splits,
                     _max_scanners);
         }
     }
-    if (_split_source == nullptr) {
-        _split_source =
-                
std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges, 
_max_scanners);
+
+    if (!p._batch_split_mode) {
+        _max_scanners = calc_max_scanners(p.query_parallel_instance_num());
+        if (_split_source == nullptr) {
+            _split_source = 
std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges,
+                                                                               
     _max_scanners);
+        }
+        // currently the total number of splits in the batch split mode cannot 
be accurately obtained,
+        // so we don't do it in the batch split mode.
+        _max_scanners = std::min(_max_scanners, 
_split_source->num_scan_ranges());
     }
-    _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());
+
     if (scan_ranges.size() > 0 &&
         
scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
         // for compatibility.
diff --git a/be/src/pipeline/exec/file_scan_operator.h 
b/be/src/pipeline/exec/file_scan_operator.h
index 2777a013d62..25635dcdd62 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -80,10 +80,16 @@ public:
 
     bool is_file_scan_operator() const override { return true; }
 
+    // There's only one scan range for each backend in batch split mode. Each 
backend only starts up one ScanNode instance.
+    int query_parallel_instance_num() const override {
+        return _batch_split_mode ? 1 : _query_parallel_instance_num;
+    }
+
 private:
     friend class FileScanLocalState;
 
     const std::string _table_name;
+    bool _batch_split_mode = false;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 850bb81d2c5..686fe6d2960 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -1004,7 +1004,8 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), 
scanners, p.limit(),
-            _scan_dependency, p.is_serial_operator(), 
p.is_file_scan_operator());
+            _scan_dependency, p.is_serial_operator(), 
p.is_file_scan_operator(),
+            p.query_parallel_instance_num());
     return Status::OK();
 }
 
@@ -1220,6 +1221,8 @@ Status ScanOperatorX<LocalStateType>::init(const 
TPlanNode& tnode, RuntimeState*
         }
     }
 
+    _query_parallel_instance_num = state->query_parallel_instance_num();
+
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 4519a3ca283..ea7c965c522 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -371,6 +371,10 @@ public:
 
     [[nodiscard]] virtual bool is_file_scan_operator() const { return false; }
 
+    [[nodiscard]] virtual int query_parallel_instance_num() const {
+        return _query_parallel_instance_num;
+    }
+
     const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() override {
         return _runtime_filter_descs;
     }
@@ -433,6 +437,8 @@ protected:
     int64_t _push_down_count = -1;
     const int _parallel_tasks = 0;
 
+    int _query_parallel_instance_num = 0;
+
     std::vector<int> topn_filter_source_node_ids;
 };
 
diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 5a33aec2de8..15afb3254ca 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -47,7 +47,7 @@ ScannerContext::ScannerContext(
         const TupleDescriptor* output_tuple_desc, const RowDescriptor* 
output_row_descriptor,
         const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& 
scanners, int64_t limit_,
         std::shared_ptr<pipeline::Dependency> dependency, bool 
ignore_data_distribution,
-        bool is_file_scan_operator)
+        bool is_file_scan_operator, int num_parallel_instances)
         : HasTaskExecutionCtx(state),
           _state(state),
           _local_state(local_state),
@@ -60,7 +60,8 @@ ScannerContext::ScannerContext(
           _scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
           _all_scanners(scanners.begin(), scanners.end()),
           _ignore_data_distribution(ignore_data_distribution),
-          _is_file_scan_operator(is_file_scan_operator) {
+          _is_file_scan_operator(is_file_scan_operator),
+          _num_parallel_instances(num_parallel_instances) {
     DCHECK(_output_row_descriptor == nullptr ||
            _output_row_descriptor->tuple_descriptors().size() == 1);
     _query_id = _state->get_query_ctx()->query_id();
@@ -105,8 +106,6 @@ Status ScannerContext::init() {
     _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
                                                     thread_token == nullptr ? 
"False" : "True");
 
-    const int num_parallel_instances = _state->query_parallel_instance_num();
-
     // _max_bytes_in_queue controls the maximum memory that can be used by a 
single scan instance.
     // scan_queue_mem_limit on FE is 100MB by default, on backend we will make 
sure its actual value
     // is larger than 10MB.
@@ -176,7 +175,7 @@ Status ScannerContext::init() {
         } else {
             const size_t factor = _is_file_scan_operator ? 1 : 4;
             _max_thread_num = factor * 
(config::doris_scanner_thread_pool_thread_num /
-                                        num_parallel_instances);
+                                        _num_parallel_instances);
             // In some rare cases, user may set num_parallel_instances to 1 
handly to make many query could be executed
             // in parallel. We need to make sure the _max_thread_num is 
smaller than previous value.
             _max_thread_num =
diff --git a/be/src/vec/exec/scan/scanner_context.h 
b/be/src/vec/exec/scan/scanner_context.h
index 1604e1224f4..47bc5896926 100644
--- a/be/src/vec/exec/scan/scanner_context.h
+++ b/be/src/vec/exec/scan/scanner_context.h
@@ -107,7 +107,8 @@ public:
                    const RowDescriptor* output_row_descriptor,
                    const 
std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
                    int64_t limit_, std::shared_ptr<pipeline::Dependency> 
dependency,
-                   bool ignore_data_distribution, bool is_file_scan_operator);
+                   bool ignore_data_distribution, bool is_file_scan_operator,
+                   int num_parallel_instances);
 
     ~ScannerContext() override {
         
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker);
@@ -213,6 +214,7 @@ protected:
     std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
     bool _ignore_data_distribution = false;
     bool _is_file_scan_operator = false;
+    int _num_parallel_instances;
 
     // for scaling up the running scanners
     size_t _estimated_block_size = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to