This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new e20cab64f4d [improvement](scan) avoid too many scanners for file scan
node (#25727)
e20cab64f4d is described below
commit e20cab64f4dba11aac8b2acbcbc69f57a7f3606f
Author: Mingyu Chen <[email protected]>
AuthorDate: Sun Oct 29 17:41:31 2023 +0800
[improvement](scan) avoid too many scanners for file scan node (#25727)
Previously, when using the file scan node (e.g., when querying a Hive table), the max
number of scanners for each scan node
was `doris_scanner_thread_pool_thread_num` (default is 48).
If the query parallelism is N, the total number of scanners could reach 48 * N,
which is too many.
This PR changes the logic so that the max number of scanners for each scan node
is `doris_scanner_thread_pool_thread_num / query parallelism` (but at least 1).
As a result, the total number of scanners is bounded by roughly
`doris_scanner_thread_pool_thread_num`.
Reducing the number of scanners can significantly reduce the memory usage of
queries.
---
be/src/exec/exec_node.cpp | 5 +++--
be/src/exec/scan_node.h | 3 ++-
be/src/io/fs/multi_table_pipe.cpp | 4 ++--
be/src/pipeline/exec/es_scan_operator.cpp | 3 ++-
be/src/pipeline/exec/es_scan_operator.h | 3 ++-
be/src/pipeline/exec/file_scan_operator.cpp | 7 +++++--
be/src/pipeline/exec/file_scan_operator.h | 3 ++-
be/src/pipeline/exec/meta_scan_operator.cpp | 3 ++-
be/src/pipeline/exec/meta_scan_operator.h | 3 ++-
be/src/pipeline/exec/olap_scan_operator.cpp | 3 ++-
be/src/pipeline/exec/olap_scan_operator.h | 3 ++-
be/src/pipeline/exec/scan_operator.cpp | 2 +-
be/src/pipeline/exec/scan_operator.h | 6 ++++--
be/src/pipeline/pipeline_fragment_context.cpp | 6 +++---
be/src/runtime/group_commit_mgr.cpp | 4 ++--
be/src/runtime/plan_fragment_executor.cpp | 4 ++--
be/src/service/backend_service.cpp | 4 ++--
be/src/vec/exec/scan/group_commit_scan_node.cpp | 3 ++-
be/src/vec/exec/scan/group_commit_scan_node.h | 3 ++-
be/src/vec/exec/scan/new_es_scan_node.cpp | 3 ++-
be/src/vec/exec/scan/new_es_scan_node.h | 3 ++-
be/src/vec/exec/scan/new_file_scan_node.cpp | 8 +++++---
be/src/vec/exec/scan/new_file_scan_node.h | 3 ++-
be/src/vec/exec/scan/new_odbc_scan_node.h | 3 ++-
be/src/vec/exec/scan/new_olap_scan_node.cpp | 3 ++-
be/src/vec/exec/scan/new_olap_scan_node.h | 3 ++-
be/src/vec/exec/scan/vmeta_scan_node.cpp | 3 ++-
be/src/vec/exec/scan/vmeta_scan_node.h | 5 +++--
be/src/vec/exec/scan/vscan_node.h | 3 ++-
be/src/vec/exec/vdata_gen_scan_node.cpp | 3 ++-
be/src/vec/exec/vdata_gen_scan_node.h | 3 ++-
be/src/vec/exec/vmysql_scan_node.cpp | 5 +++--
be/src/vec/exec/vmysql_scan_node.h | 5 +++--
be/src/vec/exec/vschema_scan_node.cpp | 3 ++-
be/src/vec/exec/vschema_scan_node.h | 5 +++--
35 files changed, 83 insertions(+), 50 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 16964264edf..ed2c54a0704 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -213,8 +213,6 @@ Status ExecNode::close(RuntimeState* state) {
<< " already closed";
return Status::OK();
}
- LOG(INFO) << "query= " << print_id(state->query_id())
- << " fragment_instance_id=" <<
print_id(state->fragment_instance_id()) << " closed";
_is_closed = true;
Status result;
@@ -228,6 +226,9 @@ Status ExecNode::close(RuntimeState* state) {
_peak_memory_usage_counter->set(_mem_tracker->peak_consumption());
}
release_resource(state);
+ LOG(INFO) << "query= " << print_id(state->query_id())
+ << ", fragment_instance_id=" <<
print_id(state->fragment_instance_id())
+ << ", id=" << _id << " type=" << print_plan_node_type(_type) <<
" closed";
return result;
}
diff --git a/be/src/exec/scan_node.h b/be/src/exec/scan_node.h
index 2514b0d2320..c127a7c7b17 100644
--- a/be/src/exec/scan_node.h
+++ b/be/src/exec/scan_node.h
@@ -83,7 +83,8 @@ public:
// Convert scan_ranges into node-specific scan restrictions. This should
be
// called after prepare()
- virtual Status set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) = 0;
+ virtual Status set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) = 0;
bool is_scan_node() const override { return true; }
diff --git a/be/src/io/fs/multi_table_pipe.cpp
b/be/src/io/fs/multi_table_pipe.cpp
index 3f2f1d19914..1036f7a30df 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -216,7 +216,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env,
std::vector<ExecParam> para
if constexpr (std::is_same_v<ExecParam, TExecPlanFragmentParams>) {
RETURN_IF_ERROR(
putPipe(plan.params.fragment_instance_id,
_planned_pipes[plan.table_name]));
- LOG(INFO) << "fragment_instance_id=" <<
plan.params.fragment_instance_id
+ LOG(INFO) << "fragment_instance_id=" <<
print_id(plan.params.fragment_instance_id)
<< " table=" << plan.table_name;
} else if constexpr (std::is_same_v<ExecParam,
TPipelineFragmentParams>) {
auto pipe_id = calculate_pipe_id(plan.query_id, plan.fragment_id);
@@ -334,4 +334,4 @@ template Status MultiTablePipe::exec_plans(ExecEnv*
exec_env,
std::vector<TPipelineFragmentParams> params);
} // namespace io
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/pipeline/exec/es_scan_operator.cpp
b/be/src/pipeline/exec/es_scan_operator.cpp
index 901daa9a437..9b41155a22b 100644
--- a/be/src/pipeline/exec/es_scan_operator.cpp
+++ b/be/src/pipeline/exec/es_scan_operator.cpp
@@ -106,7 +106,8 @@ Status
EsScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* sca
return Status::OK();
}
-void EsScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {
+void EsScanLocalState::set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {
for (auto& es_scan_range : scan_ranges) {
DCHECK(es_scan_range.scan_range.__isset.es_scan_range);
_scan_ranges.emplace_back(new
TEsScanRange(es_scan_range.scan_range.es_scan_range));
diff --git a/be/src/pipeline/exec/es_scan_operator.h
b/be/src/pipeline/exec/es_scan_operator.h
index 7c8a5ceae54..6f02008383b 100644
--- a/be/src/pipeline/exec/es_scan_operator.h
+++ b/be/src/pipeline/exec/es_scan_operator.h
@@ -48,7 +48,8 @@ public:
private:
friend class vectorized::NewEsScanner;
- void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
Status _init_profile() override;
Status _process_conjuncts() override;
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners)
override;
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp
b/be/src/pipeline/exec/file_scan_operator.cpp
index 9fdafc734c2..019f5813a42 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -52,8 +52,11 @@ Status
FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
return Status::OK();
}
-void FileScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {
- int max_scanners = config::doris_scanner_thread_pool_thread_num;
+void FileScanLocalState::set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {
+ int max_scanners =
+ config::doris_scanner_thread_pool_thread_num /
state->query_parallel_instance_num();
+ max_scanners = max_scanners == 0 ? 1 : max_scanners;
if (scan_ranges.size() <= max_scanners) {
_scan_ranges = scan_ranges;
} else {
diff --git a/be/src/pipeline/exec/file_scan_operator.h
b/be/src/pipeline/exec/file_scan_operator.h
index df54a22e611..4648ed716a0 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -51,7 +51,8 @@ public:
Status _process_conjuncts() override;
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners)
override;
- void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
int parent_id() { return _parent->node_id(); }
private:
diff --git a/be/src/pipeline/exec/meta_scan_operator.cpp
b/be/src/pipeline/exec/meta_scan_operator.cpp
index 326b9f9e258..c2f9bc7e428 100644
--- a/be/src/pipeline/exec/meta_scan_operator.cpp
+++ b/be/src/pipeline/exec/meta_scan_operator.cpp
@@ -39,7 +39,8 @@ Status
MetaScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
return Status::OK();
}
-void MetaScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {
+void MetaScanLocalState::set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {
_scan_ranges = scan_ranges;
}
diff --git a/be/src/pipeline/exec/meta_scan_operator.h
b/be/src/pipeline/exec/meta_scan_operator.h
index ed371847a1b..1bfda6c9b83 100644
--- a/be/src/pipeline/exec/meta_scan_operator.h
+++ b/be/src/pipeline/exec/meta_scan_operator.h
@@ -48,7 +48,8 @@ public:
private:
friend class vectorized::NewOlapScanner;
- void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners)
override;
Status _process_conjuncts() override;
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp
b/be/src/pipeline/exec/olap_scan_operator.cpp
index b4e61af2051..5ea76ddd0d6 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -293,7 +293,8 @@ TOlapScanNode& OlapScanLocalState::olap_scan_node() {
return _parent->cast<OlapScanOperatorX>()._olap_scan_node;
}
-void OlapScanLocalState::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {
+void OlapScanLocalState::set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {
for (auto& scan_range : scan_ranges) {
DCHECK(scan_range.scan_range.__isset.palo_scan_range);
_scan_ranges.emplace_back(new
TPaloScanRange(scan_range.scan_range.palo_scan_range));
diff --git a/be/src/pipeline/exec/olap_scan_operator.h
b/be/src/pipeline/exec/olap_scan_operator.h
index 5f7c86c9c3f..0527fa6f44d 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -50,7 +50,8 @@ public:
private:
friend class vectorized::NewOlapScanner;
- void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
Status _init_profile() override;
Status _process_conjuncts() override;
bool _is_key_column(const std::string& col_name) override;
diff --git a/be/src/pipeline/exec/scan_operator.cpp
b/be/src/pipeline/exec/scan_operator.cpp
index 2df1755c811..71876045257 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -131,7 +131,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state,
LocalStateInfo& info)
_filter_dependency->set_filter_blocked_by_fn(
[this]() { return this->runtime_filters_are_ready_or_timeout(); });
auto& p = _parent->cast<typename Derived::Parent>();
- set_scan_ranges(info.scan_ranges);
+ set_scan_ranges(state, info.scan_ranges);
_common_expr_ctxs_push_down.resize(p._common_expr_ctxs_push_down.size());
for (size_t i = 0; i < _common_expr_ctxs_push_down.size(); i++) {
RETURN_IF_ERROR(
diff --git a/be/src/pipeline/exec/scan_operator.h
b/be/src/pipeline/exec/scan_operator.h
index 022b57ee9bf..c71811c831d 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -136,7 +136,8 @@ public:
[[nodiscard]] virtual int runtime_filter_num() const = 0;
virtual Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs&
conjuncts) = 0;
- virtual void set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) = 0;
+ virtual void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) = 0;
virtual TPushAggOp::type get_push_down_agg_type() = 0;
@@ -219,7 +220,8 @@ class ScanLocalState : public ScanLocalStateBase {
}
Status clone_conjunct_ctxs(vectorized::VExprContextSPtrs& conjuncts)
override;
- virtual void set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) override {}
+ virtual void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) override {}
TPushAggOp::type get_push_down_agg_type() override;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 686ff0fe0ad..4616ff2fb49 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -300,15 +300,15 @@ Status PipelineFragmentContext::prepare(const
doris::TPipelineFragmentParams& re
no_scan_ranges);
const bool shared_scan =
find_with_default(local_params.per_node_shared_scans,
scan_node->id(), false);
- scan_node->set_scan_ranges(scan_ranges);
+ scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges);
scan_node->set_shared_scan(_runtime_state.get(), shared_scan);
} else {
ScanNode* scan_node = static_cast<ScanNode*>(node);
auto scan_ranges =
find_with_default(local_params.per_node_scan_ranges, scan_node->id(),
no_scan_ranges);
- static_cast<void>(scan_node->set_scan_ranges(scan_ranges));
+ static_cast<void>(scan_node->set_scan_ranges(_runtime_state.get(),
scan_ranges));
VLOG_CRITICAL << "query " << print_id(get_query_id())
- << "scan_node_Id=" << scan_node->id()
+ << " scan_node_id=" << scan_node->id()
<< " size=" << scan_ranges.get().size();
}
}
diff --git a/be/src/runtime/group_commit_mgr.cpp
b/be/src/runtime/group_commit_mgr.cpp
index be49cca5258..1bdb5b586f0 100644
--- a/be/src/runtime/group_commit_mgr.cpp
+++ b/be/src/runtime/group_commit_mgr.cpp
@@ -464,7 +464,7 @@ Status GroupCommitMgr::group_commit_insert(int64_t
table_id, const TPlan& plan,
RETURN_IF_ERROR(file_scan_node.prepare(runtime_state.get()));
std::vector<TScanRangeParams> params_vector;
params_vector.emplace_back(scan_range_params);
- file_scan_node.set_scan_ranges(params_vector);
+ file_scan_node.set_scan_ranges(runtime_state.get(), params_vector);
RETURN_IF_ERROR(file_scan_node.open(runtime_state.get()));
// 3. Put the block into block queue.
@@ -557,4 +557,4 @@ Status GroupCommitMgr::get_load_block_queue(int64_t
table_id, const TUniqueId& i
}
return group_commit_table->get_load_block_queue(instance_id,
load_block_queue);
}
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/runtime/plan_fragment_executor.cpp
b/be/src/runtime/plan_fragment_executor.cpp
index 78d48327abf..a58744a5692 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -211,12 +211,12 @@ Status PlanFragmentExecutor::prepare(const
TExecPlanFragmentParams& request) {
vectorized::VScanNode* scan_node =
static_cast<vectorized::VScanNode*>(scan_nodes[i]);
auto scan_ranges =
find_with_default(params.per_node_scan_ranges,
scan_node->id(), no_scan_ranges);
- scan_node->set_scan_ranges(scan_ranges);
+ scan_node->set_scan_ranges(runtime_state(), scan_ranges);
} else {
ScanNode* scan_node = static_cast<ScanNode*>(scan_nodes[i]);
auto scan_ranges =
find_with_default(params.per_node_scan_ranges,
scan_node->id(), no_scan_ranges);
- static_cast<void>(scan_node->set_scan_ranges(scan_ranges));
+ static_cast<void>(scan_node->set_scan_ranges(runtime_state(),
scan_ranges));
VLOG_CRITICAL << "scan_node_Id=" << scan_node->id()
<< " size=" << scan_ranges.get().size();
}
diff --git a/be/src/service/backend_service.cpp
b/be/src/service/backend_service.cpp
index 4374a789caf..a312996162c 100644
--- a/be/src/service/backend_service.cpp
+++ b/be/src/service/backend_service.cpp
@@ -108,7 +108,7 @@ Status BackendService::create_service(ExecEnv* exec_env,
int port,
void BackendService::exec_plan_fragment(TExecPlanFragmentResult& return_val,
const TExecPlanFragmentParams& params)
{
- LOG(INFO) << "exec_plan_fragment() instance_id=" <<
params.params.fragment_instance_id
+ LOG(INFO) << "exec_plan_fragment() instance_id=" <<
print_id(params.params.fragment_instance_id)
<< " coord=" << params.coord << " backend#=" <<
params.backend_num;
start_plan_fragment_execution(params).set_t_status(&return_val);
}
@@ -122,7 +122,7 @@ Status BackendService::start_plan_fragment_execution(const
TExecPlanFragmentPara
void BackendService::cancel_plan_fragment(TCancelPlanFragmentResult&
return_val,
const TCancelPlanFragmentParams&
params) {
- LOG(INFO) << "cancel_plan_fragment(): instance_id=" <<
params.fragment_instance_id;
+ LOG(INFO) << "cancel_plan_fragment(): instance_id=" <<
print_id(params.fragment_instance_id);
_exec_env->fragment_mgr()->cancel_instance(params.fragment_instance_id,
PPlanFragmentCancelReason::INTERNAL_ERROR);
}
diff --git a/be/src/vec/exec/scan/group_commit_scan_node.cpp
b/be/src/vec/exec/scan/group_commit_scan_node.cpp
index d5bc4565772..d78c165a3dc 100644
--- a/be/src/vec/exec/scan/group_commit_scan_node.cpp
+++ b/be/src/vec/exec/scan/group_commit_scan_node.cpp
@@ -50,7 +50,8 @@ Status GroupCommitScanNode::prepare(RuntimeState* state) {
return VScanNode::prepare(state);
}
-void GroupCommitScanNode::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {}
+void GroupCommitScanNode::set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {}
Status GroupCommitScanNode::_init_profile() {
return VScanNode::_init_profile();
diff --git a/be/src/vec/exec/scan/group_commit_scan_node.h
b/be/src/vec/exec/scan/group_commit_scan_node.h
index 86c504d5819..36a29c0ae66 100644
--- a/be/src/vec/exec/scan/group_commit_scan_node.h
+++ b/be/src/vec/exec/scan/group_commit_scan_node.h
@@ -31,7 +31,8 @@ public:
Status prepare(RuntimeState* state) override;
- void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
std::string get_name() override;
diff --git a/be/src/vec/exec/scan/new_es_scan_node.cpp
b/be/src/vec/exec/scan/new_es_scan_node.cpp
index 6a3ec3a7389..4704f5eb8c6 100644
--- a/be/src/vec/exec/scan/new_es_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_es_scan_node.cpp
@@ -109,7 +109,8 @@ Status NewEsScanNode::prepare(RuntimeState* state) {
return Status::OK();
}
-void NewEsScanNode::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {
+void NewEsScanNode::set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {
for (auto& es_scan_range : scan_ranges) {
DCHECK(es_scan_range.scan_range.__isset.es_scan_range);
_scan_ranges.emplace_back(new
TEsScanRange(es_scan_range.scan_range.es_scan_range));
diff --git a/be/src/vec/exec/scan/new_es_scan_node.h
b/be/src/vec/exec/scan/new_es_scan_node.h
index 2cb6193d644..5185ead56bc 100644
--- a/be/src/vec/exec/scan/new_es_scan_node.h
+++ b/be/src/vec/exec/scan/new_es_scan_node.h
@@ -55,7 +55,8 @@ public:
std::string get_name() override;
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr)
override;
Status prepare(RuntimeState* state) override;
- void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
protected:
Status _init_profile() override;
diff --git a/be/src/vec/exec/scan/new_file_scan_node.cpp
b/be/src/vec/exec/scan/new_file_scan_node.cpp
index 71c6c79b61b..43f43722e9c 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_file_scan_node.cpp
@@ -58,8 +58,11 @@ Status NewFileScanNode::prepare(RuntimeState* state) {
return Status::OK();
}
-void NewFileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {
- int max_scanners = config::doris_scanner_thread_pool_thread_num;
+void NewFileScanNode::set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {
+ int max_scanners =
+ config::doris_scanner_thread_pool_thread_num /
state->query_parallel_instance_num();
+ max_scanners = max_scanners == 0 ? 1 : max_scanners;
if (scan_ranges.size() <= max_scanners) {
_scan_ranges = scan_ranges;
} else {
@@ -122,7 +125,6 @@ Status
NewFileScanNode::_init_scanners(std::list<VScannerSPtr>* scanners) {
scanner->prepare(_conjuncts, &_colname_to_value_range,
&_colname_to_slot_id));
scanners->push_back(std::move(scanner));
}
-
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/new_file_scan_node.h
b/be/src/vec/exec/scan/new_file_scan_node.h
index 740e354db5f..e6d9fc1e621 100644
--- a/be/src/vec/exec/scan/new_file_scan_node.h
+++ b/be/src/vec/exec/scan/new_file_scan_node.h
@@ -47,7 +47,8 @@ public:
Status prepare(RuntimeState* state) override;
- void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
std::string get_name() override;
diff --git a/be/src/vec/exec/scan/new_odbc_scan_node.h
b/be/src/vec/exec/scan/new_odbc_scan_node.h
index f76d2b645be..c6dde0ab3dc 100644
--- a/be/src/vec/exec/scan/new_odbc_scan_node.h
+++ b/be/src/vec/exec/scan/new_odbc_scan_node.h
@@ -46,7 +46,8 @@ public:
std::string get_name() override;
// no use
- void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override {}
+ void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override {}
protected:
Status _init_profile() override;
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.cpp
b/be/src/vec/exec/scan/new_olap_scan_node.cpp
index 3bbff6c20a6..8c95391d48f 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.cpp
+++ b/be/src/vec/exec/scan/new_olap_scan_node.cpp
@@ -402,7 +402,8 @@ Status
NewOlapScanNode::_should_push_down_function_filter(VectorizedFnCall* fn_c
// 9: optional string table_name
//}
// every doris_scan_range is related with one tablet so that one olap scan
node contains multiple tablet
-void NewOlapScanNode::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {
+void NewOlapScanNode::set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {
for (auto& scan_range : scan_ranges) {
DCHECK(scan_range.scan_range.__isset.palo_scan_range);
_scan_ranges.emplace_back(new
TPaloScanRange(scan_range.scan_range.palo_scan_range));
diff --git a/be/src/vec/exec/scan/new_olap_scan_node.h
b/be/src/vec/exec/scan/new_olap_scan_node.h
index a26c300ffc4..93039c6182a 100644
--- a/be/src/vec/exec/scan/new_olap_scan_node.h
+++ b/be/src/vec/exec/scan/new_olap_scan_node.h
@@ -66,7 +66,8 @@ public:
Status collect_query_statistics(QueryStatistics* statistics) override;
Status collect_query_statistics(QueryStatistics* statistics, int
sender_id) override;
- void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
std::string get_name() override;
diff --git a/be/src/vec/exec/scan/vmeta_scan_node.cpp
b/be/src/vec/exec/scan/vmeta_scan_node.cpp
index 3bdcbfbaaee..5ba559466ef 100644
--- a/be/src/vec/exec/scan/vmeta_scan_node.cpp
+++ b/be/src/vec/exec/scan/vmeta_scan_node.cpp
@@ -52,7 +52,8 @@ Status VMetaScanNode::prepare(RuntimeState* state) {
return Status::OK();
}
-void VMetaScanNode::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {
+void VMetaScanNode::set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {
_scan_ranges = scan_ranges;
}
diff --git a/be/src/vec/exec/scan/vmeta_scan_node.h
b/be/src/vec/exec/scan/vmeta_scan_node.h
index caad8b1b7f9..73da9e85e37 100644
--- a/be/src/vec/exec/scan/vmeta_scan_node.h
+++ b/be/src/vec/exec/scan/vmeta_scan_node.h
@@ -47,7 +47,8 @@ public:
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr)
override;
Status prepare(RuntimeState* state) override;
- void set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
const TMetaScanNode& scan_params() { return _scan_params; }
private:
@@ -61,4 +62,4 @@ private:
std::vector<TScanRangeParams> _scan_ranges;
};
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/scan/vscan_node.h
b/be/src/vec/exec/scan/vscan_node.h
index be72c7ca1f1..73961f59133 100644
--- a/be/src/vec/exec/scan/vscan_node.h
+++ b/be/src/vec/exec/scan/vscan_node.h
@@ -116,7 +116,8 @@ public:
Status open(RuntimeState* state) override;
- virtual void set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {}
+ virtual void set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {}
void set_shared_scan(RuntimeState* state, bool shared_scan) {
_shared_scan_opt = shared_scan;
diff --git a/be/src/vec/exec/vdata_gen_scan_node.cpp
b/be/src/vec/exec/vdata_gen_scan_node.cpp
index 999a88a4931..6c5db2c0161 100644
--- a/be/src/vec/exec/vdata_gen_scan_node.cpp
+++ b/be/src/vec/exec/vdata_gen_scan_node.cpp
@@ -137,7 +137,8 @@ Status VDataGenFunctionScanNode::close(RuntimeState* state)
{
return ExecNode::close(state);
}
-Status VDataGenFunctionScanNode::set_scan_ranges(const
std::vector<TScanRangeParams>& scan_ranges) {
+Status VDataGenFunctionScanNode::set_scan_ranges(RuntimeState* state,
+ const
std::vector<TScanRangeParams>& scan_ranges) {
return _table_func->set_scan_ranges(scan_ranges);
}
diff --git a/be/src/vec/exec/vdata_gen_scan_node.h
b/be/src/vec/exec/vdata_gen_scan_node.h
index 13e4e6408ad..0faa5d0fe3d 100644
--- a/be/src/vec/exec/vdata_gen_scan_node.h
+++ b/be/src/vec/exec/vdata_gen_scan_node.h
@@ -51,7 +51,8 @@ public:
Status close(RuntimeState* state) override;
// No use
- Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ Status set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
protected:
std::shared_ptr<VDataGenFunctionInf> _table_func;
diff --git a/be/src/vec/exec/vmysql_scan_node.cpp
b/be/src/vec/exec/vmysql_scan_node.cpp
index d744a54470c..c0720c5580b 100644
--- a/be/src/vec/exec/vmysql_scan_node.cpp
+++ b/be/src/vec/exec/vmysql_scan_node.cpp
@@ -227,7 +227,8 @@ void VMysqlScanNode::debug_string(int indentation_level,
std::stringstream* out)
}
}
-Status VMysqlScanNode::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {
+Status VMysqlScanNode::set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {
return Status::OK();
}
-} // namespace doris::vectorized
\ No newline at end of file
+} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vmysql_scan_node.h
b/be/src/vec/exec/vmysql_scan_node.h
index 742b94eaf50..85ecca784e4 100644
--- a/be/src/vec/exec/vmysql_scan_node.h
+++ b/be/src/vec/exec/vmysql_scan_node.h
@@ -49,7 +49,8 @@ public:
Status close(RuntimeState* state) override;
// No use
- Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ Status set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
private:
// Write debug string of this into out.
@@ -79,4 +80,4 @@ private:
DataTypeSerDe::FormatOptions _text_formatOptions;
};
} // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/exec/vschema_scan_node.cpp
b/be/src/vec/exec/vschema_scan_node.cpp
index cf5659bf48b..57e0de2cca1 100644
--- a/be/src/vec/exec/vschema_scan_node.cpp
+++ b/be/src/vec/exec/vschema_scan_node.cpp
@@ -302,7 +302,8 @@ void VSchemaScanNode::debug_string(int indentation_level,
std::stringstream* out
}
}
-Status VSchemaScanNode::set_scan_ranges(const std::vector<TScanRangeParams>&
scan_ranges) {
+Status VSchemaScanNode::set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>&
scan_ranges) {
return Status::OK();
}
diff --git a/be/src/vec/exec/vschema_scan_node.h
b/be/src/vec/exec/vschema_scan_node.h
index 589de552eec..1b551704fb7 100644
--- a/be/src/vec/exec/vschema_scan_node.h
+++ b/be/src/vec/exec/vschema_scan_node.h
@@ -56,7 +56,8 @@ public:
private:
// this is no use in this class
- Status set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges)
override;
+ Status set_scan_ranges(RuntimeState* state,
+ const std::vector<TScanRangeParams>& scan_ranges)
override;
// Write debug string of this into out.
void debug_string(int indentation_level, std::stringstream* out) const
override;
@@ -77,4 +78,4 @@ private:
std::unique_ptr<SchemaScanner> _schema_scanner;
};
} // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]