This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new c992307c331 branch-4.0: [fix](pipeline)Fix set_operation not correctly setting shuffled_operator #59293 (#59702)
c992307c331 is described below

commit c992307c331aa5d7be34b33fa18957850b89561c
Author: github-actions[bot] 
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Mon Jan 12 14:05:26 2026 +0800

    branch-4.0: [fix](pipeline)Fix set_operation not correctly setting shuffled_operator #59293 (#59702)
    
    Cherry-picked from #59293
    
    Co-authored-by: Mryange <[email protected]>
---
 be/src/pipeline/exec/set_probe_sink_operator.h |  2 ++
 be/src/pipeline/exec/set_sink_operator.h       |  2 ++
 be/src/pipeline/pipeline_fragment_context.cpp  | 12 +++++++-----
 be/src/pipeline/pipeline_fragment_context.h    |  3 ++-
 4 files changed, 13 insertions(+), 6 deletions(-)

diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h b/be/src/pipeline/exec/set_probe_sink_operator.h
index 77e78c9ae6b..b92dcb1b387 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -113,6 +113,8 @@ public:
 
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
+    bool is_shuffled_operator() const override { return true; }
+
 private:
     void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
     Status _extract_probe_column(SetProbeSinkLocalState<is_intersect>& 
local_state,
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index 2df817f6ab4..cccaff69361 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -117,6 +117,8 @@ public:
 
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
+    bool is_shuffled_operator() const override { return true; }
+
 private:
     template <class HashTableContext, bool is_intersected>
     friend struct HashTableBuild;
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 29ef3daad0b..4f047b8fb5b 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -1598,14 +1598,14 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case TPlanNodeType::INTERSECT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
-        op->set_followed_by_shuffled_operator(_require_bucket_distribution);
+                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
+                followed_by_shuffled_operator));
         break;
     }
     case TPlanNodeType::EXCEPT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
-        op->set_followed_by_shuffled_operator(_require_bucket_distribution);
+                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
+                followed_by_shuffled_operator));
         break;
     }
     case TPlanNodeType::REPEAT_NODE: {
@@ -1666,8 +1666,9 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
 template <bool is_intersect>
 Status PipelineFragmentContext::_build_operators_for_set_operation_node(
         ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, 
OperatorPtr& op,
-        PipelinePtr& cur_pipe, int parent_idx, int child_idx) {
+        PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool 
followed_by_shuffled_operator) {
     op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, 
next_operator_id(), descs));
+    op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
     RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
 
     const auto downstream_pipeline_id = cur_pipe->id();
@@ -1687,6 +1688,7 @@ Status PipelineFragmentContext::_build_operators_for_set_operation_node(
             sink.reset(new SetProbeSinkOperatorX<is_intersect>(
                     child_id, next_sink_operator_id(), op->operator_id(), 
pool, tnode, descs));
         }
+        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
         RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
         // prepare children pipelines. if any pipeline found this as its 
father, will use the prepared pipeline to build.
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 81b3f57b01f..3578572ae52 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -140,7 +140,8 @@ private:
     Status _build_operators_for_set_operation_node(ObjectPool* pool, const 
TPlanNode& tnode,
                                                    const DescriptorTbl& descs, 
OperatorPtr& op,
                                                    PipelinePtr& cur_pipe, int 
parent_idx,
-                                                   int child_idx);
+                                                   int child_idx,
+                                                   bool 
followed_by_shuffled_operator);
 
     Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                              const std::vector<TExpr>& output_exprs,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to