This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new e20cab64f4d [improvement](scan) avoid too many scanners for file scan node (#25727)
e20cab64f4d is described below

commit e20cab64f4dba11aac8b2acbcbc69f57a7f3606f
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Sun Oct 29 17:41:31 2023 +0800

    [improvement](scan) avoid too many scanners for file scan node (#25727)
    
    Previously, when using the file scan node (e.g., querying a Hive table), the max
    number of scanners for each scan node was `doris_scanner_thread_pool_thread_num`
    (default 48). If the query parallelism is N, the total number of scanners could
    be 48 * N, which is too many.
    
    This PR changes the logic so that the max number of scanners for each file scan
    node is `doris_scanner_thread_pool_thread_num / query parallelism` (but at least 1),
    so that the total number of scanners is bounded by roughly
    `doris_scanner_thread_pool_thread_num`.
    
    Reducing the number of scanners can significantly reduce the memory usage of queries.
---
 be/src/exec/exec_node.cpp                       | 5 +++--
 be/src/exec/scan_node.h                         | 3 ++-
 be/src/io/fs/multi_table_pipe.cpp               | 4 ++--
 be/src/pipeline/exec/es_scan_operator.cpp       | 3 ++-
 be/src/pipeline/exec/es_scan_operator.h         | 3 ++-
 be/src/pipeline/exec/file_scan_operator.cpp     | 7 +++++--
 be/src/pipeline/exec/file_scan_operator.h       | 3 ++-
 be/src/pipeline/exec/meta_scan_operator.cpp     | 3 ++-
 be/src/pipeline/exec/meta_scan_operator.h       | 3 ++-
 be/src/pipeline/exec/olap_scan_operator.cpp     | 3 ++-
 be/src/pipeline/exec/olap_scan_operator.h       | 3 ++-
 be/src/pipeline/exec/scan_operator.cpp          | 2 +-
 be/src/pipeline/exec/scan_operator.h            | 6 ++++--
 be/src/pipeline/pipeline_fragment_context.cpp   | 6 +++---
 be/src/runtime/group_commit_mgr.cpp             | 4 ++--
 be/src/runtime/plan_fragment_executor.cpp       | 4 ++--
 be/src/service/backend_service.cpp              | 4 ++--
 be/src/vec/exec/scan/group_commit_scan_node.cpp | 3 ++-
 be/src/vec/exec/scan/group_commit_scan_node.h   | 3 ++-
 be/src/vec/exec/scan/new_es_scan_node.cpp       | 3 ++-
 be/src/vec/exec/scan/new_es_scan_node.h         | 3 ++-
 be/src/vec/exec/scan/new_file_scan_node.cpp     | 8 +++++---
 be/src/vec/exec/scan/new_file_scan_node.h       | 3 ++-
 be/src/vec/exec/scan/new_odbc_scan_node.h       | 3 ++-
 be/src/vec/exec/scan/new_olap_scan_node.cpp     | 3 ++-
 be/src/vec/exec/scan/new_olap_scan_node.h       | 3 ++-
 be/src/vec/exec/scan/vmeta_scan_node.cpp        | 3 ++-
 be/src/vec/exec/scan/vmeta_scan_node.h          | 5 +++--
 be/src/vec/exec/scan/vscan_node.h               | 3 ++-
 be/src/vec/exec/vdata_gen_scan_node.cpp         | 3 ++-
 be/src/vec/exec/vdata_gen_scan_node.h           | 3 ++-
 be/src/vec/exec/vmysql_scan_node.cpp            | 5 +++--
 be/src/vec/exec/vmysql_scan_node.h              | 5 +++--
 be/src/vec/exec/vschema_scan_node.cpp           | 3 ++-
 be/src/vec/exec/vschema_scan_node.h             | 5 +++--
 35 files changed, 83 insertions(+), 50 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 16964264edf..ed2c54a0704 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -213,8 +213,6 @@ Status ExecNode::close(RuntimeState* state) {
                   << " already closed";
         return Status::OK();
     }
-    LOG(INFO) << "query= " << print_id(state->query_id())
-              << " fragment_instance_id=" << 
print_id(state->fragment_instance_id()) << " closed";
     _is_closed = true;
 
     Status result;
@@ -228,6 +226,9 @@ Status ExecNode::close(RuntimeState* state) {
         _peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
     }
     release_resource(state);
+    LOG(INFO) << "query= " << print_id(state->query_id())
+              << ", fragment_instance_id=" << 
print_id(state->fragment_instance_id())
+              << ", id=" << _id << " type=" << print_plan_node_type(_type) << 
" closed";
     return result;
 }
 
diff --git a/be/src/exec/scan_node.h b/be/src/exec/scan_node.h
index 2514b0d2320..c127a7c7b17 100644
--- a/be/src/exec/scan_node.h
+++ b/be/src/exec/scan_node.h
@@ -83,7 +83,8 @@ public:
 
     // Convert scan_ranges into node-specific scan restrictions.  This should 
be
     // called after prepare()
-    virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) = 0;
+    virtual Status set_scan_ranges(RuntimeState* state,
+                                   const std::vector<TScanRangeParams>& 
scan_ranges) = 0;
 
     bool is_scan_node() const override { return true; }
 
diff --git a/be/src/io/fs/multi_table_pipe.cpp 
b/be/src/io/fs/multi_table_pipe.cpp
index 3f2f1d19914..1036f7a30df 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -216,7 +216,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, 
std::vector<ExecParam> para
         if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) {
             RETURN_IF_ERROR(
                     putPipe(plan.params.fragment_instance_id, 
_planned_pipes[plan.table_name]));
-            LOG(INFO) << "fragment_instance_id=" << 
plan.params.fragment_instance_id
+            LOG(INFO) << "fragment_instance_id=" << 
print_id(plan.params.fragment_instance_id)
                       << " table=" << plan.table_name;
         } else if constexpr (std::is_same_v<ExecParam, 
TPipelineFragmentParams>) {
             auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id);
@@ -334,4 +334,4 @@ template Status MultiTablePipe::exec_plans(ExecEnv* 
exec_env,
                                            
std::vector<TPipelineFragmentParams> params);
 
 } // namespace io
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp 
b/be/src/pipeline/exec/es_scan_operator.cpp
index 901daa9a437..9b41155a22b 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -106,7 +106,8 @@ Status 
EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca
     return Status::OK();
 }
 
-void EsScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {
+void EsScanLocalState::set_scan_ranges(RuntimeState* state,
+                                       const std::vector<TScanRangeParams>& 
scan_ranges) {
     for (auto& es_scan_range : scan_ranges) {
         DCHECK(es_scan_range.scan_range.__isset.es_scan_range);
         _scan_ranges.emplace_back(new 
TEsScanRange(es_scan_range.scan_range.es_scan_range));
diff --git a/be/src/pipeline/exec/es_scan_operator.h 
b/be/src/pipeline/exec/es_scan_operator.h
index 7c8a5ceae54..6f02008383b 100644
--- a/be/src/pipeline/exec/es_scan_operator.h
+++ b/be/src/pipeline/exec/es_scan_operator.h
@@ -48,7 +48,8 @@ public:
 private:
     friend class vectorized::NewEsScanner;
 
-    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    void set_scan_ranges(RuntimeState* state,
+                         const std::vector<TScanRangeParams>& scan_ranges) 
override;
     Status _init_profile() override;
     Status _process_conjuncts() override;
     Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) 
override;
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 9fdafc734c2..019f5813a42 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -52,8 +52,11 @@ Status 
FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     return Status::OK();
 }
 
-void FileScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {
-    int max_scanners = config::doris_scanner_thread_pool_thread_num;
+void FileScanLocalState::set_scan_ranges(RuntimeState* state,
+                                         const std::vector<TScanRangeParams>& 
scan_ranges) {
+    int max_scanners =
+            config::doris_scanner_thread_pool_thread_num / 
state->query_parallel_instance_num();
+    max_scanners = max_scanners == 0 ? 1 : max_scanners;
     if (scan_ranges.size() <= max_scanners) {
         _scan_ranges = scan_ranges;
     } else {
diff --git a/be/src/pipeline/exec/file_scan_operator.h 
b/be/src/pipeline/exec/file_scan_operator.h
index df54a22e611..4648ed716a0 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -51,7 +51,8 @@ public:
 
     Status _process_conjuncts() override;
     Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) 
override;
-    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    void set_scan_ranges(RuntimeState* state,
+                         const std::vector<TScanRangeParams>& scan_ranges) 
override;
     int parent_id() { return _parent->node_id(); }
 
 private:
diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp 
b/be/src/pipeline/exec/meta_scan_operator.cpp
index 326b9f9e258..c2f9bc7e428 100644
--- a/be/src/pipeline/exec/meta_scan_operator.cpp
+++ b/be/src/pipeline/exec/meta_scan_operator.cpp
@@ -39,7 +39,8 @@ Status 
MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     return Status::OK();
 }
 
-void MetaScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {
+void MetaScanLocalState::set_scan_ranges(RuntimeState* state,
+                                         const std::vector<TScanRangeParams>& 
scan_ranges) {
     _scan_ranges = scan_ranges;
 }
 
diff --git a/be/src/pipeline/exec/meta_scan_operator.h 
b/be/src/pipeline/exec/meta_scan_operator.h
index ed371847a1b..1bfda6c9b83 100644
--- a/be/src/pipeline/exec/meta_scan_operator.h
+++ b/be/src/pipeline/exec/meta_scan_operator.h
@@ -48,7 +48,8 @@ public:
 private:
     friend class vectorized::NewOlapScanner;
 
-    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    void set_scan_ranges(RuntimeState* state,
+                         const std::vector<TScanRangeParams>& scan_ranges) 
override;
     Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) 
override;
     Status _process_conjuncts() override;
 
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index b4e61af2051..5ea76ddd0d6 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -293,7 +293,8 @@ TOlapScanNode& OlapScanLocalState::olap_scan_node() {
     return _parent->cast<OlapScanOperatorX>()._olap_scan_node;
 }
 
-void OlapScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {
+void OlapScanLocalState::set_scan_ranges(RuntimeState* state,
+                                         const std::vector<TScanRangeParams>& 
scan_ranges) {
     for (auto& scan_range : scan_ranges) {
         DCHECK(scan_range.scan_range.__isset.palo_scan_range);
         _scan_ranges.emplace_back(new 
TPaloScanRange(scan_range.scan_range.palo_scan_range));
diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index 5f7c86c9c3f..0527fa6f44d 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -50,7 +50,8 @@ public:
 private:
     friend class vectorized::NewOlapScanner;
 
-    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    void set_scan_ranges(RuntimeState* state,
+                         const std::vector<TScanRangeParams>& scan_ranges) 
override;
     Status _init_profile() override;
     Status _process_conjuncts() override;
     bool _is_key_column(const std::string& col_name) override;
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 2df1755c811..71876045257 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -131,7 +131,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
     _filter_dependency->set_filter_blocked_by_fn(
             [this]() { return this->runtime_filters_are_ready_or_timeout(); });
     auto& p = _parent->cast<typename Derived::Parent>();
-    set_scan_ranges(info.scan_ranges);
+    set_scan_ranges(state, info.scan_ranges);
     _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
     for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) {
         RETURN_IF_ERROR(
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 022b57ee9bf..c71811c831d 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -136,7 +136,8 @@ public:
     [[nodiscard]] virtual int runtime_filter_num() const = 0;
 
     virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& 
conjuncts) = 0;
-    virtual void set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) = 0;
+    virtual void set_scan_ranges(RuntimeState* state,
+                                 const std::vector<TScanRangeParams>& 
scan_ranges) = 0;
 
     virtual TPushAggOp::type get_push_down_agg_type() = 0;
 
@@ -219,7 +220,8 @@ class ScanLocalState : public ScanLocalStateBase {
     }
 
     Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) 
override;
-    virtual void set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) override {}
+    virtual void set_scan_ranges(RuntimeState* state,
+                                 const std::vector<TScanRangeParams>& 
scan_ranges) override {}
 
     TPushAggOp::type get_push_down_agg_type() override;
 
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 686ff0fe0ad..4616ff2fb49 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -300,15 +300,15 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
                                                  no_scan_ranges);
             const bool shared_scan =
                     find_with_default(local_params.per_node_shared_scans, 
scan_node->id(), false);
-            scan_node->set_scan_ranges(scan_ranges);
+            scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges);
             scan_node->set_shared_scan(_runtime_state.get(), shared_scan);
         } else {
             ScanNode* scan_node = static_cast<ScanNode*>(node);
             auto scan_ranges = 
find_with_default(local_params.per_node_scan_ranges, scan_node->id(),
                                                  no_scan_ranges);
-            static_cast<void>(scan_node->set_scan_ranges(scan_ranges));
+            static_cast<void>(scan_node->set_scan_ranges(_runtime_state.get(), 
scan_ranges));
             VLOG_CRITICAL << "query " << print_id(get_query_id())
-                          << "scan_node_Id=" << scan_node->id()
+                          << " scan_node_id=" << scan_node->id()
                           << " size=" << scan_ranges.get().size();
         }
     }
diff --git a/be/src/runtime/group_commit_mgr.cpp 
b/be/src/runtime/group_commit_mgr.cpp
index be49cca5258..1bdb5b586f0 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -464,7 +464,7 @@ Status GroupCommitMgr::group_commit_insert(int64_t 
table_id, const TPlan& plan,
         RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get()));
         std::vector<TScanRangeParams> params_vector;
         params_vector.emplace_back(scan_range_params);
-        file_scan_node.set_scan_ranges(params_vector);
+        file_scan_node.set_scan_ranges(runtime_state.get(), params_vector);
         RETURN_IF_ERROR(file_scan_node.open(runtime_state.get()));
 
         // 3. Put the block into block queue.
@@ -557,4 +557,4 @@ Status GroupCommitMgr::get_load_block_queue(int64_t 
table_id, const TUniqueId& i
     }
     return group_commit_table->get_load_block_queue(instance_id, 
load_block_queue);
 }
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/runtime/plan_fragment_executor.cpp 
b/be/src/runtime/plan_fragment_executor.cpp
index 78d48327abf..a58744a5692 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -211,12 +211,12 @@ Status PlanFragmentExecutor::prepare(const 
TExecPlanFragmentParams& request) {
             vectorized::VScanNode* scan_node = 
static_cast<vectorized::VScanNode*>(scan_nodes[i]);
             auto scan_ranges =
                     find_with_default(params.per_node_scan_ranges, 
scan_node->id(), no_scan_ranges);
-            scan_node->set_scan_ranges(scan_ranges);
+            scan_node->set_scan_ranges(runtime_state(), scan_ranges);
         } else {
             ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
             auto scan_ranges =
                     find_with_default(params.per_node_scan_ranges, 
scan_node->id(), no_scan_ranges);
-            static_cast<void>(scan_node->set_scan_ranges(scan_ranges));
+            static_cast<void>(scan_node->set_scan_ranges(runtime_state(), 
scan_ranges));
             VLOG_CRITICAL << "scan_node_Id=" << scan_node->id()
                           << " size=" << scan_ranges.get().size();
         }
diff --git a/be/src/service/backend_service.cpp 
b/be/src/service/backend_service.cpp
index 4374a789caf..a312996162c 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -108,7 +108,7 @@ Status BackendService::create_service(ExecEnv* exec_env, 
int port,
 
 void BackendService::exec_plan_fragment(TExecPlanFragmentResult& return_val,
                                         const TExecPlanFragmentParams& params) 
{
-    LOG(INFO) << "exec_plan_fragment() instance_id=" << 
params.params.fragment_instance_id
+    LOG(INFO) << "exec_plan_fragment() instance_id=" << 
print_id(params.params.fragment_instance_id)
               << " coord=" << params.coord << " backend#=" << 
params.backend_num;
     start_plan_fragment_execution(params).set_t_status(&return_val);
 }
@@ -122,7 +122,7 @@ Status BackendService::start_plan_fragment_execution(const 
TExecPlanFragmentPara
 
 void BackendService::cancel_plan_fragment(TCancelPlanFragmentResult& 
return_val,
                                           const TCancelPlanFragmentParams& 
params) {
-    LOG(INFO) << "cancel_plan_fragment(): instance_id=" << 
params.fragment_instance_id;
+    LOG(INFO) << "cancel_plan_fragment(): instance_id=" << 
print_id(params.fragment_instance_id);
     _exec_env->fragment_mgr()->cancel_instance(params.fragment_instance_id,
                                                
PPlanFragmentCancelReason::INTERNAL_ERROR);
 }
diff --git a/be/src/vec/exec/scan/group_commit_scan_node.cpp 
b/be/src/vec/exec/scan/group_commit_scan_node.cpp
index d5bc4565772..d78c165a3dc 100644
--- a/be/src/vec/exec/scan/group_commit_scan_node.cpp
+++ b/be/src/vec/exec/scan/group_commit_scan_node.cpp
@@ -50,7 +50,8 @@ Status GroupCommitScanNode::prepare(RuntimeState* state) {
     return VScanNode::prepare(state);
 }
 
-void GroupCommitScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {}
+void GroupCommitScanNode::set_scan_ranges(RuntimeState* state,
+                                          const std::vector<TScanRangeParams>& 
scan_ranges) {}
 
 Status GroupCommitScanNode::_init_profile() {
     return VScanNode::_init_profile();
diff --git a/be/src/vec/exec/scan/group_commit_scan_node.h 
b/be/src/vec/exec/scan/group_commit_scan_node.h
index 86c504d5819..36a29c0ae66 100644
--- a/be/src/vec/exec/scan/group_commit_scan_node.h
+++ b/be/src/vec/exec/scan/group_commit_scan_node.h
@@ -31,7 +31,8 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    void set_scan_ranges(RuntimeState* state,
+                         const std::vector<TScanRangeParams>& scan_ranges) 
override;
 
     std::string get_name() override;
 
diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp 
b/be/src/vec/exec/scan/new_es_scan_node.cpp
index 6a3ec3a7389..4704f5eb8c6 100644
--- a/be/src/vec/exec/scan/new_es_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_es_scan_node.cpp
@@ -109,7 +109,8 @@ Status NewEsScanNode::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-void NewEsScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {
+void NewEsScanNode::set_scan_ranges(RuntimeState* state,
+                                    const std::vector<TScanRangeParams>& 
scan_ranges) {
     for (auto& es_scan_range : scan_ranges) {
         DCHECK(es_scan_range.scan_range.__isset.es_scan_range);
         _scan_ranges.emplace_back(new 
TEsScanRange(es_scan_range.scan_range.es_scan_range));
diff --git a/be/src/vec/exec/scan/new_es_scan_node.h 
b/be/src/vec/exec/scan/new_es_scan_node.h
index 2cb6193d644..5185ead56bc 100644
--- a/be/src/vec/exec/scan/new_es_scan_node.h
+++ b/be/src/vec/exec/scan/new_es_scan_node.h
@@ -55,7 +55,8 @@ public:
     std::string get_name() override;
     Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) 
override;
     Status prepare(RuntimeState* state) override;
-    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    void set_scan_ranges(RuntimeState* state,
+                         const std::vector<TScanRangeParams>& scan_ranges) 
override;
 
 protected:
     Status _init_profile() override;
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp 
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 71c6c79b61b..43f43722e9c 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -58,8 +58,11 @@ Status NewFileScanNode::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {
-    int max_scanners = config::doris_scanner_thread_pool_thread_num;
+void NewFileScanNode::set_scan_ranges(RuntimeState* state,
+                                      const std::vector<TScanRangeParams>& 
scan_ranges) {
+    int max_scanners =
+            config::doris_scanner_thread_pool_thread_num / 
state->query_parallel_instance_num();
+    max_scanners = max_scanners == 0 ? 1 : max_scanners;
     if (scan_ranges.size() <= max_scanners) {
         _scan_ranges = scan_ranges;
     } else {
@@ -122,7 +125,6 @@ Status 
NewFileScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
                 scanner->prepare(_conjuncts, &_colname_to_value_range, 
&_colname_to_slot_id));
         scanners->push_back(std::move(scanner));
     }
-
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/scan/new_file_scan_node.h 
b/be/src/vec/exec/scan/new_file_scan_node.h
index 740e354db5f..e6d9fc1e621 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.h
+++ b/be/src/vec/exec/scan/new_file_scan_node.h
@@ -47,7 +47,8 @@ public:
 
     Status prepare(RuntimeState* state) override;
 
-    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    void set_scan_ranges(RuntimeState* state,
+                         const std::vector<TScanRangeParams>& scan_ranges) 
override;
 
     std::string get_name() override;
 
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.h 
b/be/src/vec/exec/scan/new_odbc_scan_node.h
index f76d2b645be..c6dde0ab3dc 100644
--- a/be/src/vec/exec/scan/new_odbc_scan_node.h
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.h
@@ -46,7 +46,8 @@ public:
     std::string get_name() override;
 
     // no use
-    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override {}
+    void set_scan_ranges(RuntimeState* state,
+                         const std::vector<TScanRangeParams>& scan_ranges) 
override {}
 
 protected:
     Status _init_profile() override;
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp 
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 3bbff6c20a6..8c95391d48f 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -402,7 +402,8 @@ Status 
NewOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_c
 //  9: optional string table_name
 //}
 // every doris_scan_range is related with one tablet so that one olap scan 
node contains multiple tablet
-void NewOlapScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {
+void NewOlapScanNode::set_scan_ranges(RuntimeState* state,
+                                      const std::vector<TScanRangeParams>& 
scan_ranges) {
     for (auto& scan_range : scan_ranges) {
         DCHECK(scan_range.scan_range.__isset.palo_scan_range);
         _scan_ranges.emplace_back(new 
TPaloScanRange(scan_range.scan_range.palo_scan_range));
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h 
b/be/src/vec/exec/scan/new_olap_scan_node.h
index a26c300ffc4..93039c6182a 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -66,7 +66,8 @@ public:
     Status collect_query_statistics(QueryStatistics* statistics) override;
     Status collect_query_statistics(QueryStatistics* statistics, int 
sender_id) override;
 
-    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    void set_scan_ranges(RuntimeState* state,
+                         const std::vector<TScanRangeParams>& scan_ranges) 
override;
 
     std::string get_name() override;
 
diff --git a/be/src/vec/exec/scan/vmeta_scan_node.cpp 
b/be/src/vec/exec/scan/vmeta_scan_node.cpp
index 3bdcbfbaaee..5ba559466ef 100644
--- a/be/src/vec/exec/scan/vmeta_scan_node.cpp
+++ b/be/src/vec/exec/scan/vmeta_scan_node.cpp
@@ -52,7 +52,8 @@ Status VMetaScanNode::prepare(RuntimeState* state) {
     return Status::OK();
 }
 
-void VMetaScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {
+void VMetaScanNode::set_scan_ranges(RuntimeState* state,
+                                    const std::vector<TScanRangeParams>& 
scan_ranges) {
     _scan_ranges = scan_ranges;
 }
 
diff --git a/be/src/vec/exec/scan/vmeta_scan_node.h 
b/be/src/vec/exec/scan/vmeta_scan_node.h
index caad8b1b7f9..73da9e85e37 100644
--- a/be/src/vec/exec/scan/vmeta_scan_node.h
+++ b/be/src/vec/exec/scan/vmeta_scan_node.h
@@ -47,7 +47,8 @@ public:
 
     Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) 
override;
     Status prepare(RuntimeState* state) override;
-    void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    void set_scan_ranges(RuntimeState* state,
+                         const std::vector<TScanRangeParams>& scan_ranges) 
override;
     const TMetaScanNode& scan_params() { return _scan_params; }
 
 private:
@@ -61,4 +62,4 @@ private:
     std::vector<TScanRangeParams> _scan_ranges;
 };
 
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscan_node.h 
b/be/src/vec/exec/scan/vscan_node.h
index be72c7ca1f1..73961f59133 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -116,7 +116,8 @@ public:
 
     Status open(RuntimeState* state) override;
 
-    virtual void set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {}
+    virtual void set_scan_ranges(RuntimeState* state,
+                                 const std::vector<TScanRangeParams>& 
scan_ranges) {}
 
     void set_shared_scan(RuntimeState* state, bool shared_scan) {
         _shared_scan_opt = shared_scan;
diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp 
b/be/src/vec/exec/vdata_gen_scan_node.cpp
index 999a88a4931..6c5db2c0161 100644
--- a/be/src/vec/exec/vdata_gen_scan_node.cpp
+++ b/be/src/vec/exec/vdata_gen_scan_node.cpp
@@ -137,7 +137,8 @@ Status VDataGenFunctionScanNode::close(RuntimeState* state) 
{
     return ExecNode::close(state);
 }
 
-Status VDataGenFunctionScanNode::set_scan_ranges(const 
std::vector<TScanRangeParams>& scan_ranges) {
+Status VDataGenFunctionScanNode::set_scan_ranges(RuntimeState* state,
+                                                 const 
std::vector<TScanRangeParams>& scan_ranges) {
     return _table_func->set_scan_ranges(scan_ranges);
 }
 
diff --git a/be/src/vec/exec/vdata_gen_scan_node.h 
b/be/src/vec/exec/vdata_gen_scan_node.h
index 13e4e6408ad..0faa5d0fe3d 100644
--- a/be/src/vec/exec/vdata_gen_scan_node.h
+++ b/be/src/vec/exec/vdata_gen_scan_node.h
@@ -51,7 +51,8 @@ public:
     Status close(RuntimeState* state) override;
 
     // No use
-    Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    Status set_scan_ranges(RuntimeState* state,
+                           const std::vector<TScanRangeParams>& scan_ranges) 
override;
 
 protected:
     std::shared_ptr<VDataGenFunctionInf> _table_func;
diff --git a/be/src/vec/exec/vmysql_scan_node.cpp 
b/be/src/vec/exec/vmysql_scan_node.cpp
index d744a54470c..c0720c5580b 100644
--- a/be/src/vec/exec/vmysql_scan_node.cpp
+++ b/be/src/vec/exec/vmysql_scan_node.cpp
@@ -227,7 +227,8 @@ void VMysqlScanNode::debug_string(int indentation_level, 
std::stringstream* out)
     }
 }
 
-Status VMysqlScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {
+Status VMysqlScanNode::set_scan_ranges(RuntimeState* state,
+                                       const std::vector<TScanRangeParams>& 
scan_ranges) {
     return Status::OK();
 }
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vmysql_scan_node.h 
b/be/src/vec/exec/vmysql_scan_node.h
index 742b94eaf50..85ecca784e4 100644
--- a/be/src/vec/exec/vmysql_scan_node.h
+++ b/be/src/vec/exec/vmysql_scan_node.h
@@ -49,7 +49,8 @@ public:
     Status close(RuntimeState* state) override;
 
     // No use
-    Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    Status set_scan_ranges(RuntimeState* state,
+                           const std::vector<TScanRangeParams>& scan_ranges) 
override;
 
 private:
     // Write debug string of this into out.
@@ -79,4 +80,4 @@ private:
     DataTypeSerDe::FormatOptions _text_formatOptions;
 };
 } // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/exec/vschema_scan_node.cpp 
b/be/src/vec/exec/vschema_scan_node.cpp
index cf5659bf48b..57e0de2cca1 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -302,7 +302,8 @@ void VSchemaScanNode::debug_string(int indentation_level, 
std::stringstream* out
     }
 }
 
-Status VSchemaScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& 
scan_ranges) {
+Status VSchemaScanNode::set_scan_ranges(RuntimeState* state,
+                                        const std::vector<TScanRangeParams>& 
scan_ranges) {
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/vschema_scan_node.h 
b/be/src/vec/exec/vschema_scan_node.h
index 589de552eec..1b551704fb7 100644
--- a/be/src/vec/exec/vschema_scan_node.h
+++ b/be/src/vec/exec/vschema_scan_node.h
@@ -56,7 +56,8 @@ public:
 
 private:
     // this is no use in this class
-    Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) 
override;
+    Status set_scan_ranges(RuntimeState* state,
+                           const std::vector<TScanRangeParams>& scan_ranges) 
override;
 
     // Write debug string of this into out.
     void debug_string(int indentation_level, std::stringstream* out) const 
override;
@@ -77,4 +78,4 @@ private:
     std::unique_ptr<SchemaScanner> _schema_scanner;
 };
 } // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to