This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fe251a92b1f [Chore](runtime-filter) adjust need_local_merge setting
conditions (#33886)
fe251a92b1f is described below
commit fe251a92b1fc24ccb7fedde5b72fb62e4f226cc4
Author: Pxl <[email protected]>
AuthorDate: Fri Apr 19 23:49:09 2024 +0800
[Chore](runtime-filter) adjust need_local_merge setting conditions (#33886)
---
.../pipeline_x/pipeline_x_fragment_context.cpp | 30 ++++++++--------------
be/src/runtime/runtime_state.cpp | 3 ---
2 files changed, 11 insertions(+), 22 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 1a189cb88f6..5fd59d2955a 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -228,9 +228,8 @@ Status PipelineXFragmentContext::prepare(const
doris::TPipelineFragmentParams& r
_query_ctx->init_runtime_predicates({0});
}
- _need_local_merge =
- request.__isset.parallel_instances &&
- (request.__isset.per_node_shared_scans &&
!request.per_node_shared_scans.empty());
+ _need_local_merge = request.__isset.parallel_instances;
+
// 2. Build pipelines with operators in this fragment.
auto root_pipeline = add_pipeline();
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(_build_pipelines(
@@ -934,14 +933,13 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
// Therefore, here we need to use a stack-like structure.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
std::stringstream error_msg;
+
switch (tnode.node_type) {
case TPlanNodeType::OLAP_SCAN_NODE: {
op.reset(new OlapScanOperatorX(pool, tnode, next_operator_id(), descs,
_num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
- if (find_with_default(request.per_node_shared_scans, op->node_id(),
false)) {
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- }
+ if (request.__isset.parallel_instances) {
+ cur_pipe->set_num_tasks(request.parallel_instances);
op->set_ignore_data_distribution();
}
break;
@@ -955,10 +953,8 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
"Jdbc scan node is disabled, you can change be config
enable_java_support "
"to true and restart be.");
}
- if (find_with_default(request.per_node_shared_scans, op->node_id(),
false)) {
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- }
+ if (request.__isset.parallel_instances) {
+ cur_pipe->set_num_tasks(request.parallel_instances);
op->set_ignore_data_distribution();
}
break;
@@ -966,10 +962,8 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
case doris::TPlanNodeType::FILE_SCAN_NODE: {
op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs,
_num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
- if (find_with_default(request.per_node_shared_scans, op->node_id(),
false)) {
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- }
+ if (request.__isset.parallel_instances) {
+ cur_pipe->set_num_tasks(request.parallel_instances);
op->set_ignore_data_distribution();
}
break;
@@ -978,10 +972,8 @@ Status
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
case TPlanNodeType::ES_HTTP_SCAN_NODE: {
op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs,
_num_instances));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
- if (find_with_default(request.per_node_shared_scans, op->node_id(),
false)) {
- if (request.__isset.parallel_instances) {
- cur_pipe->set_num_tasks(request.parallel_instances);
- }
+ if (request.__isset.parallel_instances) {
+ cur_pipe->set_num_tasks(request.parallel_instances);
op->set_ignore_data_distribution();
}
break;
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index aab31fa02ea..2713ee441dd 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -531,9 +531,6 @@ Status RuntimeState::register_producer_runtime_filter(const
doris::TRuntimeFilte
bool need_local_merge,
doris::IRuntimeFilter**
producer_filter,
bool build_bf_exactly) {
- // If runtime filter need to be local merged, `build_bf_exactly` will lead
to bloom filters with
- // different size need to be merged which is not allowed.
- // So if `need_local_merge` is true, we will disable `build_bf_exactly`.
if (desc.has_remote_targets || need_local_merge) {
return
global_runtime_filter_mgr()->register_local_merge_producer_filter(
desc, query_options(), producer_filter, build_bf_exactly);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]