This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new c992307c331 branch-4.0: [fix](pipeline)Fix set_operation not correctly setting shuffled_operator #59293 (#59702)
c992307c331 is described below
commit c992307c331aa5d7be34b33fa18957850b89561c
Author: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 12 14:05:26 2026 +0800
branch-4.0: [fix](pipeline)Fix set_operation not correctly setting shuffled_operator #59293 (#59702)
Cherry-picked from #59293
Co-authored-by: Mryange <[email protected]>
---
be/src/pipeline/exec/set_probe_sink_operator.h | 2 ++
be/src/pipeline/exec/set_sink_operator.h | 2 ++
be/src/pipeline/pipeline_fragment_context.cpp | 12 +++++++-----
be/src/pipeline/pipeline_fragment_context.h | 3 ++-
4 files changed, 13 insertions(+), 6 deletions(-)
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index 77e78c9ae6b..b92dcb1b387 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -113,6 +113,8 @@ public:
size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
+ bool is_shuffled_operator() const override { return true; }
+
private:
void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
Status _extract_probe_column(SetProbeSinkLocalState<is_intersect>&
local_state,
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 2df817f6ab4..cccaff69361 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -117,6 +117,8 @@ public:
size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
+ bool is_shuffled_operator() const override { return true; }
+
private:
template <class HashTableContext, bool is_intersected>
friend struct HashTableBuild;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 29ef3daad0b..4f047b8fb5b 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1598,14 +1598,14 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
}
case TPlanNodeType::INTERSECT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
- op->set_followed_by_shuffled_operator(_require_bucket_distribution);
+ pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
+ followed_by_shuffled_operator));
break;
}
case TPlanNodeType::EXCEPT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
- pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
- op->set_followed_by_shuffled_operator(_require_bucket_distribution);
+ pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
+ followed_by_shuffled_operator));
break;
}
case TPlanNodeType::REPEAT_NODE: {
@@ -1666,8 +1666,9 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
template <bool is_intersect>
Status PipelineFragmentContext::_build_operators_for_set_operation_node(
ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs,
OperatorPtr& op,
- PipelinePtr& cur_pipe, int parent_idx, int child_idx) {
+ PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool
followed_by_shuffled_operator) {
op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode,
next_operator_id(), descs));
+ op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
const auto downstream_pipeline_id = cur_pipe->id();
@@ -1687,6 +1688,7 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(
sink.reset(new SetProbeSinkOperatorX<is_intersect>(
child_id, next_sink_operator_id(), op->operator_id(),
pool, tnode, descs));
}
+ sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode,
_runtime_state.get()));
// prepare children pipelines. if any pipeline found this as its father, will use the prepared pipeline to build.
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 81b3f57b01f..3578572ae52 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -140,7 +140,8 @@ private:
Status _build_operators_for_set_operation_node(ObjectPool* pool, const
TPlanNode& tnode,
const DescriptorTbl& descs,
OperatorPtr& op,
PipelinePtr& cur_pipe, int
parent_idx,
- int child_idx);
+ int child_idx,
+ bool
followed_by_shuffled_operator);
Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
const std::vector<TExpr>& output_exprs,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]