This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new e20cab64f4d [improvement](scan) avoid too many scanners for file scan node (#25727) e20cab64f4d is described below commit e20cab64f4dba11aac8b2acbcbc69f57a7f3606f Author: Mingyu Chen <morning...@163.com> AuthorDate: Sun Oct 29 17:41:31 2023 +0800 [improvement](scan) avoid too many scanners for file scan node (#25727) Previously, when using a file scan node (e.g., querying a Hive table), the max number of scanners for each scan node was `doris_scanner_thread_pool_thread_num` (default is 48). If the query parallelism is N, the total number of scanners would be 48 * N, which is too many. This PR changes the logic so that the max number of scanners for each scan node is `doris_scanner_thread_pool_thread_num / query parallelism`, so the total number of scanners will be up to `doris_scanner_thread_pool_thread_num`. Reducing the number of scanners can significantly reduce the memory usage of a query. --- be/src/exec/exec_node.cpp | 5 +++-- be/src/exec/scan_node.h | 3 ++- be/src/io/fs/multi_table_pipe.cpp | 4 ++-- be/src/pipeline/exec/es_scan_operator.cpp | 3 ++- be/src/pipeline/exec/es_scan_operator.h | 3 ++- be/src/pipeline/exec/file_scan_operator.cpp | 7 +++++-- be/src/pipeline/exec/file_scan_operator.h | 3 ++- be/src/pipeline/exec/meta_scan_operator.cpp | 3 ++- be/src/pipeline/exec/meta_scan_operator.h | 3 ++- be/src/pipeline/exec/olap_scan_operator.cpp | 3 ++- be/src/pipeline/exec/olap_scan_operator.h | 3 ++- be/src/pipeline/exec/scan_operator.cpp | 2 +- be/src/pipeline/exec/scan_operator.h | 6 ++++-- be/src/pipeline/pipeline_fragment_context.cpp | 6 +++--- be/src/runtime/group_commit_mgr.cpp | 4 ++-- be/src/runtime/plan_fragment_executor.cpp | 4 ++-- be/src/service/backend_service.cpp | 4 ++-- be/src/vec/exec/scan/group_commit_scan_node.cpp | 3 ++- be/src/vec/exec/scan/group_commit_scan_node.h | 3 ++- be/src/vec/exec/scan/new_es_scan_node.cpp | 3 ++- 
be/src/vec/exec/scan/new_es_scan_node.h | 3 ++- be/src/vec/exec/scan/new_file_scan_node.cpp | 8 +++++--- be/src/vec/exec/scan/new_file_scan_node.h | 3 ++- be/src/vec/exec/scan/new_odbc_scan_node.h | 3 ++- be/src/vec/exec/scan/new_olap_scan_node.cpp | 3 ++- be/src/vec/exec/scan/new_olap_scan_node.h | 3 ++- be/src/vec/exec/scan/vmeta_scan_node.cpp | 3 ++- be/src/vec/exec/scan/vmeta_scan_node.h | 5 +++-- be/src/vec/exec/scan/vscan_node.h | 3 ++- be/src/vec/exec/vdata_gen_scan_node.cpp | 3 ++- be/src/vec/exec/vdata_gen_scan_node.h | 3 ++- be/src/vec/exec/vmysql_scan_node.cpp | 5 +++-- be/src/vec/exec/vmysql_scan_node.h | 5 +++-- be/src/vec/exec/vschema_scan_node.cpp | 3 ++- be/src/vec/exec/vschema_scan_node.h | 5 +++-- 35 files changed, 83 insertions(+), 50 deletions(-) diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp index 16964264edf..ed2c54a0704 100644 --- a/be/src/exec/exec_node.cpp +++ b/be/src/exec/exec_node.cpp @@ -213,8 +213,6 @@ Status ExecNode::close(RuntimeState* state) { << " already closed"; return Status::OK(); } - LOG(INFO) << "query= " << print_id(state->query_id()) - << " fragment_instance_id=" << print_id(state->fragment_instance_id()) << " closed"; _is_closed = true; Status result; @@ -228,6 +226,9 @@ Status ExecNode::close(RuntimeState* state) { _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); } release_resource(state); + LOG(INFO) << "query= " << print_id(state->query_id()) + << ", fragment_instance_id=" << print_id(state->fragment_instance_id()) + << ", id=" << _id << " type=" << print_plan_node_type(_type) << " closed"; return result; } diff --git a/be/src/exec/scan_node.h b/be/src/exec/scan_node.h index 2514b0d2320..c127a7c7b17 100644 --- a/be/src/exec/scan_node.h +++ b/be/src/exec/scan_node.h @@ -83,7 +83,8 @@ public: // Convert scan_ranges into node-specific scan restrictions. 
This should be // called after prepare() - virtual Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) = 0; + virtual Status set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) = 0; bool is_scan_node() const override { return true; } diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index 3f2f1d19914..1036f7a30df 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -216,7 +216,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) { RETURN_IF_ERROR( putPipe(plan.params.fragment_instance_id, _planned_pipes[plan.table_name])); - LOG(INFO) << "fragment_instance_id=" << plan.params.fragment_instance_id + LOG(INFO) << "fragment_instance_id=" << print_id(plan.params.fragment_instance_id) << " table=" << plan.table_name; } else if constexpr (std::is_same_v<ExecParam, TPipelineFragmentParams>) { auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id); @@ -334,4 +334,4 @@ template Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<TPipelineFragmentParams> params); } // namespace io -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/pipeline/exec/es_scan_operator.cpp b/be/src/pipeline/exec/es_scan_operator.cpp index 901daa9a437..9b41155a22b 100644 --- a/be/src/pipeline/exec/es_scan_operator.cpp +++ b/be/src/pipeline/exec/es_scan_operator.cpp @@ -106,7 +106,8 @@ Status EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca return Status::OK(); } -void EsScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { +void EsScanLocalState::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) { for (auto& es_scan_range : scan_ranges) { DCHECK(es_scan_range.scan_range.__isset.es_scan_range); 
_scan_ranges.emplace_back(new TEsScanRange(es_scan_range.scan_range.es_scan_range)); diff --git a/be/src/pipeline/exec/es_scan_operator.h b/be/src/pipeline/exec/es_scan_operator.h index 7c8a5ceae54..6f02008383b 100644 --- a/be/src/pipeline/exec/es_scan_operator.h +++ b/be/src/pipeline/exec/es_scan_operator.h @@ -48,7 +48,8 @@ public: private: friend class vectorized::NewEsScanner; - void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override; Status _init_profile() override; Status _process_conjuncts() override; Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override; diff --git a/be/src/pipeline/exec/file_scan_operator.cpp b/be/src/pipeline/exec/file_scan_operator.cpp index 9fdafc734c2..019f5813a42 100644 --- a/be/src/pipeline/exec/file_scan_operator.cpp +++ b/be/src/pipeline/exec/file_scan_operator.cpp @@ -52,8 +52,11 @@ Status FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s return Status::OK(); } -void FileScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { - int max_scanners = config::doris_scanner_thread_pool_thread_num; +void FileScanLocalState::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) { + int max_scanners = + config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num(); + max_scanners = max_scanners == 0 ? 
1 : max_scanners; if (scan_ranges.size() <= max_scanners) { _scan_ranges = scan_ranges; } else { diff --git a/be/src/pipeline/exec/file_scan_operator.h b/be/src/pipeline/exec/file_scan_operator.h index df54a22e611..4648ed716a0 100644 --- a/be/src/pipeline/exec/file_scan_operator.h +++ b/be/src/pipeline/exec/file_scan_operator.h @@ -51,7 +51,8 @@ public: Status _process_conjuncts() override; Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override; - void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override; int parent_id() { return _parent->node_id(); } private: diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp b/be/src/pipeline/exec/meta_scan_operator.cpp index 326b9f9e258..c2f9bc7e428 100644 --- a/be/src/pipeline/exec/meta_scan_operator.cpp +++ b/be/src/pipeline/exec/meta_scan_operator.cpp @@ -39,7 +39,8 @@ Status MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s return Status::OK(); } -void MetaScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { +void MetaScanLocalState::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) { _scan_ranges = scan_ranges; } diff --git a/be/src/pipeline/exec/meta_scan_operator.h b/be/src/pipeline/exec/meta_scan_operator.h index ed371847a1b..1bfda6c9b83 100644 --- a/be/src/pipeline/exec/meta_scan_operator.h +++ b/be/src/pipeline/exec/meta_scan_operator.h @@ -48,7 +48,8 @@ public: private: friend class vectorized::NewOlapScanner; - void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override; Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) override; Status _process_conjuncts() override; diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp index b4e61af2051..5ea76ddd0d6 100644 --- a/be/src/pipeline/exec/olap_scan_operator.cpp +++ b/be/src/pipeline/exec/olap_scan_operator.cpp @@ -293,7 +293,8 @@ TOlapScanNode& OlapScanLocalState::olap_scan_node() { return _parent->cast<OlapScanOperatorX>()._olap_scan_node; } -void OlapScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { +void OlapScanLocalState::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) { for (auto& scan_range : scan_ranges) { DCHECK(scan_range.scan_range.__isset.palo_scan_range); _scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range)); diff --git a/be/src/pipeline/exec/olap_scan_operator.h b/be/src/pipeline/exec/olap_scan_operator.h index 5f7c86c9c3f..0527fa6f44d 100644 --- a/be/src/pipeline/exec/olap_scan_operator.h +++ b/be/src/pipeline/exec/olap_scan_operator.h @@ -50,7 +50,8 @@ public: private: friend class vectorized::NewOlapScanner; - void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override; Status _init_profile() override; Status _process_conjuncts() override; bool _is_key_column(const std::string& col_name) override; diff --git a/be/src/pipeline/exec/scan_operator.cpp b/be/src/pipeline/exec/scan_operator.cpp index 2df1755c811..71876045257 100644 --- a/be/src/pipeline/exec/scan_operator.cpp +++ b/be/src/pipeline/exec/scan_operator.cpp @@ -131,7 +131,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, LocalStateInfo& info) _filter_dependency->set_filter_blocked_by_fn( [this]() { return this->runtime_filters_are_ready_or_timeout(); }); auto& p = _parent->cast<typename Derived::Parent>(); - set_scan_ranges(info.scan_ranges); + set_scan_ranges(state, info.scan_ranges); _common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size()); for (size_t i = 0; 
i < _common_expr_ctxs_push_down.size(); i++) { RETURN_IF_ERROR( diff --git a/be/src/pipeline/exec/scan_operator.h b/be/src/pipeline/exec/scan_operator.h index 022b57ee9bf..c71811c831d 100644 --- a/be/src/pipeline/exec/scan_operator.h +++ b/be/src/pipeline/exec/scan_operator.h @@ -136,7 +136,8 @@ public: [[nodiscard]] virtual int runtime_filter_num() const = 0; virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) = 0; - virtual void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) = 0; + virtual void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) = 0; virtual TPushAggOp::type get_push_down_agg_type() = 0; @@ -219,7 +220,8 @@ class ScanLocalState : public ScanLocalStateBase { } Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts) override; - virtual void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override {} + virtual void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override {} TPushAggOp::type get_push_down_agg_type() override; diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 686ff0fe0ad..4616ff2fb49 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -300,15 +300,15 @@ Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& re no_scan_ranges); const bool shared_scan = find_with_default(local_params.per_node_shared_scans, scan_node->id(), false); - scan_node->set_scan_ranges(scan_ranges); + scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges); scan_node->set_shared_scan(_runtime_state.get(), shared_scan); } else { ScanNode* scan_node = static_cast<ScanNode*>(node); auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); - static_cast<void>(scan_node->set_scan_ranges(scan_ranges)); + 
static_cast<void>(scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges)); VLOG_CRITICAL << "query " << print_id(get_query_id()) - << "scan_node_Id=" << scan_node->id() + << " scan_node_id=" << scan_node->id() << " size=" << scan_ranges.get().size(); } } diff --git a/be/src/runtime/group_commit_mgr.cpp b/be/src/runtime/group_commit_mgr.cpp index be49cca5258..1bdb5b586f0 100644 --- a/be/src/runtime/group_commit_mgr.cpp +++ b/be/src/runtime/group_commit_mgr.cpp @@ -464,7 +464,7 @@ Status GroupCommitMgr::group_commit_insert(int64_t table_id, const TPlan& plan, RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get())); std::vector<TScanRangeParams> params_vector; params_vector.emplace_back(scan_range_params); - file_scan_node.set_scan_ranges(params_vector); + file_scan_node.set_scan_ranges(runtime_state.get(), params_vector); RETURN_IF_ERROR(file_scan_node.open(runtime_state.get())); // 3. Put the block into block queue. @@ -557,4 +557,4 @@ Status GroupCommitMgr::get_load_block_queue(int64_t table_id, const TUniqueId& i } return group_commit_table->get_load_block_queue(instance_id, load_block_queue); } -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 78d48327abf..a58744a5692 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -211,12 +211,12 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request) { vectorized::VScanNode* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]); auto scan_ranges = find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); - scan_node->set_scan_ranges(scan_ranges); + scan_node->set_scan_ranges(runtime_state(), scan_ranges); } else { ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]); auto scan_ranges = find_with_default(params.per_node_scan_ranges, scan_node->id(), no_scan_ranges); - 
static_cast<void>(scan_node->set_scan_ranges(scan_ranges)); + static_cast<void>(scan_node->set_scan_ranges(runtime_state(), scan_ranges)); VLOG_CRITICAL << "scan_node_Id=" << scan_node->id() << " size=" << scan_ranges.get().size(); } diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 4374a789caf..a312996162c 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -108,7 +108,7 @@ Status BackendService::create_service(ExecEnv* exec_env, int port, void BackendService::exec_plan_fragment(TExecPlanFragmentResult& return_val, const TExecPlanFragmentParams& params) { - LOG(INFO) << "exec_plan_fragment() instance_id=" << params.params.fragment_instance_id + LOG(INFO) << "exec_plan_fragment() instance_id=" << print_id(params.params.fragment_instance_id) << " coord=" << params.coord << " backend#=" << params.backend_num; start_plan_fragment_execution(params).set_t_status(&return_val); } @@ -122,7 +122,7 @@ Status BackendService::start_plan_fragment_execution(const TExecPlanFragmentPara void BackendService::cancel_plan_fragment(TCancelPlanFragmentResult& return_val, const TCancelPlanFragmentParams& params) { - LOG(INFO) << "cancel_plan_fragment(): instance_id=" << params.fragment_instance_id; + LOG(INFO) << "cancel_plan_fragment(): instance_id=" << print_id(params.fragment_instance_id); _exec_env->fragment_mgr()->cancel_instance(params.fragment_instance_id, PPlanFragmentCancelReason::INTERNAL_ERROR); } diff --git a/be/src/vec/exec/scan/group_commit_scan_node.cpp b/be/src/vec/exec/scan/group_commit_scan_node.cpp index d5bc4565772..d78c165a3dc 100644 --- a/be/src/vec/exec/scan/group_commit_scan_node.cpp +++ b/be/src/vec/exec/scan/group_commit_scan_node.cpp @@ -50,7 +50,8 @@ Status GroupCommitScanNode::prepare(RuntimeState* state) { return VScanNode::prepare(state); } -void GroupCommitScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {} +void 
GroupCommitScanNode::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) {} Status GroupCommitScanNode::_init_profile() { return VScanNode::_init_profile(); diff --git a/be/src/vec/exec/scan/group_commit_scan_node.h b/be/src/vec/exec/scan/group_commit_scan_node.h index 86c504d5819..36a29c0ae66 100644 --- a/be/src/vec/exec/scan/group_commit_scan_node.h +++ b/be/src/vec/exec/scan/group_commit_scan_node.h @@ -31,7 +31,8 @@ public: Status prepare(RuntimeState* state) override; - void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override; std::string get_name() override; diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp b/be/src/vec/exec/scan/new_es_scan_node.cpp index 6a3ec3a7389..4704f5eb8c6 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.cpp +++ b/be/src/vec/exec/scan/new_es_scan_node.cpp @@ -109,7 +109,8 @@ Status NewEsScanNode::prepare(RuntimeState* state) { return Status::OK(); } -void NewEsScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { +void NewEsScanNode::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) { for (auto& es_scan_range : scan_ranges) { DCHECK(es_scan_range.scan_range.__isset.es_scan_range); _scan_ranges.emplace_back(new TEsScanRange(es_scan_range.scan_range.es_scan_range)); diff --git a/be/src/vec/exec/scan/new_es_scan_node.h b/be/src/vec/exec/scan/new_es_scan_node.h index 2cb6193d644..5185ead56bc 100644 --- a/be/src/vec/exec/scan/new_es_scan_node.h +++ b/be/src/vec/exec/scan/new_es_scan_node.h @@ -55,7 +55,8 @@ public: std::string get_name() override; Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; Status prepare(RuntimeState* state) override; - void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + void set_scan_ranges(RuntimeState* state, + const 
std::vector<TScanRangeParams>& scan_ranges) override; protected: Status _init_profile() override; diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp b/be/src/vec/exec/scan/new_file_scan_node.cpp index 71c6c79b61b..43f43722e9c 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.cpp +++ b/be/src/vec/exec/scan/new_file_scan_node.cpp @@ -58,8 +58,11 @@ Status NewFileScanNode::prepare(RuntimeState* state) { return Status::OK(); } -void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { - int max_scanners = config::doris_scanner_thread_pool_thread_num; +void NewFileScanNode::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) { + int max_scanners = + config::doris_scanner_thread_pool_thread_num / state->query_parallel_instance_num(); + max_scanners = max_scanners == 0 ? 1 : max_scanners; if (scan_ranges.size() <= max_scanners) { _scan_ranges = scan_ranges; } else { @@ -122,7 +125,6 @@ Status NewFileScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) { scanner->prepare(_conjuncts, &_colname_to_value_range, &_colname_to_slot_id)); scanners->push_back(std::move(scanner)); } - return Status::OK(); } diff --git a/be/src/vec/exec/scan/new_file_scan_node.h b/be/src/vec/exec/scan/new_file_scan_node.h index 740e354db5f..e6d9fc1e621 100644 --- a/be/src/vec/exec/scan/new_file_scan_node.h +++ b/be/src/vec/exec/scan/new_file_scan_node.h @@ -47,7 +47,8 @@ public: Status prepare(RuntimeState* state) override; - void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override; std::string get_name() override; diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.h b/be/src/vec/exec/scan/new_odbc_scan_node.h index f76d2b645be..c6dde0ab3dc 100644 --- a/be/src/vec/exec/scan/new_odbc_scan_node.h +++ b/be/src/vec/exec/scan/new_odbc_scan_node.h @@ -46,7 +46,8 @@ public: std::string 
get_name() override; // no use - void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override {} + void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override {} protected: Status _init_profile() override; diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp b/be/src/vec/exec/scan/new_olap_scan_node.cpp index 3bbff6c20a6..8c95391d48f 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.cpp +++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp @@ -402,7 +402,8 @@ Status NewOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_c // 9: optional string table_name //} // every doris_scan_range is related with one tablet so that one olap scan node contains multiple tablet -void NewOlapScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { +void NewOlapScanNode::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) { for (auto& scan_range : scan_ranges) { DCHECK(scan_range.scan_range.__isset.palo_scan_range); _scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range)); diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h b/be/src/vec/exec/scan/new_olap_scan_node.h index a26c300ffc4..93039c6182a 100644 --- a/be/src/vec/exec/scan/new_olap_scan_node.h +++ b/be/src/vec/exec/scan/new_olap_scan_node.h @@ -66,7 +66,8 @@ public: Status collect_query_statistics(QueryStatistics* statistics) override; Status collect_query_statistics(QueryStatistics* statistics, int sender_id) override; - void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override; std::string get_name() override; diff --git a/be/src/vec/exec/scan/vmeta_scan_node.cpp b/be/src/vec/exec/scan/vmeta_scan_node.cpp index 3bdcbfbaaee..5ba559466ef 100644 --- a/be/src/vec/exec/scan/vmeta_scan_node.cpp +++ 
b/be/src/vec/exec/scan/vmeta_scan_node.cpp @@ -52,7 +52,8 @@ Status VMetaScanNode::prepare(RuntimeState* state) { return Status::OK(); } -void VMetaScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { +void VMetaScanNode::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) { _scan_ranges = scan_ranges; } diff --git a/be/src/vec/exec/scan/vmeta_scan_node.h b/be/src/vec/exec/scan/vmeta_scan_node.h index caad8b1b7f9..73da9e85e37 100644 --- a/be/src/vec/exec/scan/vmeta_scan_node.h +++ b/be/src/vec/exec/scan/vmeta_scan_node.h @@ -47,7 +47,8 @@ public: Status init(const TPlanNode& tnode, RuntimeState* state = nullptr) override; Status prepare(RuntimeState* state) override; - void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override; const TMetaScanNode& scan_params() { return _scan_params; } private: @@ -61,4 +62,4 @@ private: std::vector<TScanRangeParams> _scan_ranges; }; -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/exec/scan/vscan_node.h b/be/src/vec/exec/scan/vscan_node.h index be72c7ca1f1..73961f59133 100644 --- a/be/src/vec/exec/scan/vscan_node.h +++ b/be/src/vec/exec/scan/vscan_node.h @@ -116,7 +116,8 @@ public: Status open(RuntimeState* state) override; - virtual void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {} + virtual void set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) {} void set_shared_scan(RuntimeState* state, bool shared_scan) { _shared_scan_opt = shared_scan; diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp b/be/src/vec/exec/vdata_gen_scan_node.cpp index 999a88a4931..6c5db2c0161 100644 --- a/be/src/vec/exec/vdata_gen_scan_node.cpp +++ b/be/src/vec/exec/vdata_gen_scan_node.cpp @@ -137,7 +137,8 @@ Status 
VDataGenFunctionScanNode::close(RuntimeState* state) { return ExecNode::close(state); } -Status VDataGenFunctionScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { +Status VDataGenFunctionScanNode::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) { return _table_func->set_scan_ranges(scan_ranges); } diff --git a/be/src/vec/exec/vdata_gen_scan_node.h b/be/src/vec/exec/vdata_gen_scan_node.h index 13e4e6408ad..0faa5d0fe3d 100644 --- a/be/src/vec/exec/vdata_gen_scan_node.h +++ b/be/src/vec/exec/vdata_gen_scan_node.h @@ -51,7 +51,8 @@ public: Status close(RuntimeState* state) override; // No use - Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + Status set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override; protected: std::shared_ptr<VDataGenFunctionInf> _table_func; diff --git a/be/src/vec/exec/vmysql_scan_node.cpp b/be/src/vec/exec/vmysql_scan_node.cpp index d744a54470c..c0720c5580b 100644 --- a/be/src/vec/exec/vmysql_scan_node.cpp +++ b/be/src/vec/exec/vmysql_scan_node.cpp @@ -227,7 +227,8 @@ void VMysqlScanNode::debug_string(int indentation_level, std::stringstream* out) } } -Status VMysqlScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { +Status VMysqlScanNode::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) { return Status::OK(); } -} // namespace doris::vectorized \ No newline at end of file +} // namespace doris::vectorized diff --git a/be/src/vec/exec/vmysql_scan_node.h b/be/src/vec/exec/vmysql_scan_node.h index 742b94eaf50..85ecca784e4 100644 --- a/be/src/vec/exec/vmysql_scan_node.h +++ b/be/src/vec/exec/vmysql_scan_node.h @@ -49,7 +49,8 @@ public: Status close(RuntimeState* state) override; // No use - Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + Status set_scan_ranges(RuntimeState* state, + const 
std::vector<TScanRangeParams>& scan_ranges) override; private: // Write debug string of this into out. @@ -79,4 +80,4 @@ private: DataTypeSerDe::FormatOptions _text_formatOptions; }; } // namespace vectorized -} // namespace doris \ No newline at end of file +} // namespace doris diff --git a/be/src/vec/exec/vschema_scan_node.cpp b/be/src/vec/exec/vschema_scan_node.cpp index cf5659bf48b..57e0de2cca1 100644 --- a/be/src/vec/exec/vschema_scan_node.cpp +++ b/be/src/vec/exec/vschema_scan_node.cpp @@ -302,7 +302,8 @@ void VSchemaScanNode::debug_string(int indentation_level, std::stringstream* out } } -Status VSchemaScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) { +Status VSchemaScanNode::set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) { return Status::OK(); } diff --git a/be/src/vec/exec/vschema_scan_node.h b/be/src/vec/exec/vschema_scan_node.h index 589de552eec..1b551704fb7 100644 --- a/be/src/vec/exec/vschema_scan_node.h +++ b/be/src/vec/exec/vschema_scan_node.h @@ -56,7 +56,8 @@ public: private: // this is no use in this class - Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) override; + Status set_scan_ranges(RuntimeState* state, + const std::vector<TScanRangeParams>& scan_ranges) override; // Write debug string of this into out. void debug_string(int indentation_level, std::stringstream* out) const override; @@ -77,4 +78,4 @@ private: std::unique_ptr<SchemaScanner> _schema_scanner; }; } // namespace vectorized -} // namespace doris \ No newline at end of file +} // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org