This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eb1c43ef751 [fix](union) Fix union operator (#60334)
eb1c43ef751 is described below

commit eb1c43ef751528cbbed28864ac6360331291a639
Author: Gabriel <[email protected]>
AuthorDate: Mon Feb 2 16:08:06 2026 +0800

    [fix](union) Fix union operator (#60334)
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |  36 ++---
 be/src/pipeline/exec/aggregation_sink_operator.h   |  17 ++-
 be/src/pipeline/exec/analytic_sink_operator.cpp    |   7 +-
 be/src/pipeline/exec/analytic_sink_operator.h      |  19 ++-
 .../distinct_streaming_aggregation_operator.cpp    |   9 +-
 .../exec/distinct_streaming_aggregation_operator.h |  20 ++-
 be/src/pipeline/exec/hashjoin_build_sink.h         |  12 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |  12 +-
 be/src/pipeline/exec/operator.cpp                  |  14 +-
 be/src/pipeline/exec/operator.h                    |  46 +++++--
 .../exec/partitioned_aggregation_sink_operator.cpp |   7 +-
 .../exec/partitioned_aggregation_sink_operator.h   |  16 ++-
 .../partitioned_aggregation_source_operator.cpp    |  20 +++
 .../exec/partitioned_aggregation_source_operator.h |   6 +
 .../exec/partitioned_hash_join_probe_operator.h    |  17 ++-
 .../exec/partitioned_hash_join_sink_operator.h     |  17 ++-
 be/src/pipeline/exec/set_probe_sink_operator.h     |   3 +-
 be/src/pipeline/exec/set_sink_operator.h           |   3 +-
 be/src/pipeline/exec/set_source_operator.h         |  14 +-
 be/src/pipeline/exec/sort_sink_operator.cpp        |   4 +-
 be/src/pipeline/exec/sort_sink_operator.h          |  10 +-
 be/src/pipeline/exec/spill_sort_sink_operator.cpp  |   7 +-
 be/src/pipeline/exec/spill_sort_sink_operator.h    |  14 +-
 .../exec/streaming_aggregation_operator.cpp        |  39 +++---
 .../pipeline/exec/streaming_aggregation_operator.h |   6 +-
 be/src/pipeline/exec/union_sink_operator.h         |  18 +--
 be/src/pipeline/exec/union_source_operator.h       |  12 +-
 be/src/pipeline/pipeline_fragment_context.cpp      | 151 +++++++++------------
 be/src/pipeline/pipeline_fragment_context.h        |   3 +-
 .../operator/hashjoin_probe_operator_test.cpp      |   4 +-
 .../partitioned_aggregation_test_helper.cpp        |   4 +-
 .../operator/partitioned_aggregation_test_helper.h |   2 +-
 .../operator/spill_sort_sink_operator_test.cpp     |   4 +-
 .../pipeline/operator/spill_sort_test_helper.cpp   |   2 +-
 34 files changed, 331 insertions(+), 244 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index ddb4bbfbe6f..b7b05918d2a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -724,8 +724,7 @@ size_t 
AggSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) co
 
 // TODO: Tricky processing if `multi_distinct_` exists which will be re-planed 
by optimizer.
 AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, int 
dest_id,
-                                   const TPlanNode& tnode, const 
DescriptorTbl& descs,
-                                   bool require_bucket_distribution)
+                                   const TPlanNode& tnode, const 
DescriptorTbl& descs)
         : DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode, dest_id),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _output_tuple_id(tnode.agg_node.output_tuple_id),
@@ -736,24 +735,27 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int 
operator_id, int dest_i
           _limit(tnode.limit),
           _have_conjuncts((tnode.__isset.vconjunct && 
!tnode.vconjunct.nodes.empty()) ||
                           (tnode.__isset.conjuncts && 
!tnode.conjuncts.empty())),
-          _partition_exprs(
-                  tnode.__isset.distribute_expr_lists &&
-                                  (require_bucket_distribution ||
-                                   std::any_of(
-                                           tnode.agg_node.aggregate_functions.begin(),
-                                           tnode.agg_node.aggregate_functions.end(),
-                                           [](const TExpr& texpr) -> bool {
-                                               return texpr.nodes[0]
-                                                       .fn.name.function_name.starts_with(
-                                                               vectorized::
-                                                                       DISTINCT_FUNCTION_PREFIX);
-                                           }))
-                          ? tnode.distribute_expr_lists[0]
-                          : tnode.agg_node.grouping_exprs),
           _is_colocate(tnode.agg_node.__isset.is_colocate && 
tnode.agg_node.is_colocate),
-          _require_bucket_distribution(require_bucket_distribution),
           _agg_fn_output_row_descriptor(descs, tnode.row_tuples) {}
 
+void AggSinkOperatorX::update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
+                                       bool require_bucket_distribution) {
+    _followed_by_shuffled_operator = followed_by_shuffled_operator;
+    _require_bucket_distribution = require_bucket_distribution;
+    _partition_exprs =
+            tnode.__isset.distribute_expr_lists &&
+                            (_followed_by_shuffled_operator ||
+                             std::any_of(
+                                     tnode.agg_node.aggregate_functions.begin(),
+                                     tnode.agg_node.aggregate_functions.end(),
+                                     [](const TExpr& texpr) -> bool {
+                                         return texpr.nodes[0].fn.name.function_name.starts_with(
+                                                 vectorized::DISTINCT_FUNCTION_PREFIX);
+                                     }))
+                    ? tnode.distribute_expr_lists[0]
+                    : tnode.agg_node.grouping_exprs;
+}
+
 Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::init(tnode, state));
     // ignore return status for now , so we need to introduce ExecNode::init()
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 6946578109d..d85308a7cb2 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -131,14 +131,11 @@ protected:
 class AggSinkOperatorX MOCK_REMOVE(final) : public 
DataSinkOperatorX<AggSinkLocalState> {
 public:
     AggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, const 
TPlanNode& tnode,
-                     const DescriptorTbl& descs, bool 
require_bucket_distribution);
+                     const DescriptorTbl& descs);
 
 #ifdef BE_TEST
     AggSinkOperatorX()
-            : DataSinkOperatorX<AggSinkLocalState>(1, 0, 2),
-              _is_first_phase(),
-              _is_colocate(),
-              _require_bucket_distribution() {}
+            : DataSinkOperatorX<AggSinkLocalState>(1, 0, 2), 
_is_first_phase(), _is_colocate() {}
 #endif
 
     ~AggSinkOperatorX() override = default;
@@ -148,6 +145,8 @@ public:
     }
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    void update_operator(const TPlanNode& tnode, bool 
followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override;
 
     Status prepare(RuntimeState* state) override;
 
@@ -160,11 +159,12 @@ public:
                            : 
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution(
                                      state);
         }
-        return _is_colocate && _require_bucket_distribution && 
!_followed_by_shuffled_operator
+        return _is_colocate && _require_bucket_distribution
                        ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, 
_partition_exprs)
                        : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
     }
-    bool require_data_distribution() const override { return _is_colocate; }
+    bool is_colocated_operator() const override { return _is_colocate; }
+    bool is_shuffled_operator() const override { return 
!_partition_exprs.empty(); }
     size_t get_revocable_mem_size(RuntimeState* state) const;
 
     AggregatedDataVariants* get_agg_data(RuntimeState* state) {
@@ -221,9 +221,8 @@ protected:
     std::vector<int> _null_directions;
 
     bool _have_conjuncts;
-    const std::vector<TExpr> _partition_exprs;
+    std::vector<TExpr> _partition_exprs;
     const bool _is_colocate;
-    const bool _require_bucket_distribution;
     RowDescriptor _agg_fn_output_row_descriptor;
 };
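
For orientation, the ternary in required_data_distribution above recurs in several operators touched by this patch. The sketch below only restates that rule; the free function choose_distribution is a hypothetical helper for illustration, not code from this commit:

// Hypothetical helper, for illustration only: the recurring two-way choice.
// BUCKET_HASH_SHUFFLE keeps rows aligned with the table buckets (colocated
// plans); otherwise a plain HASH_SHUFFLE on the partition expressions is used.
DataDistribution choose_distribution(bool is_colocate, bool require_bucket_distribution,
                                     const std::vector<TExpr>& partition_exprs) {
    return is_colocate && require_bucket_distribution
                   ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, partition_exprs)
                   : DataDistribution(ExchangeType::HASH_SHUFFLE, partition_exprs);
}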
 
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index a2f30a30337..33ba9053103 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -632,8 +632,7 @@ int64_t 
AnalyticSinkLocalState::find_first_not_equal(vectorized::IColumn* refere
 }
 
 AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int 
operator_id, int dest_id,
-                                             const TPlanNode& tnode, const 
DescriptorTbl& descs,
-                                             bool require_bucket_distribution)
+                                             const TPlanNode& tnode, const 
DescriptorTbl& descs)
         : DataSinkOperatorX(operator_id, tnode, dest_id),
           _pool(pool),
           _intermediate_tuple_id(tnode.analytic_node.intermediate_tuple_id),
@@ -642,10 +641,6 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* 
pool, int operator_id,
                                      ? tnode.analytic_node.buffered_tuple_id
                                      : 0),
           _is_colocate(tnode.analytic_node.__isset.is_colocate && 
tnode.analytic_node.is_colocate),
-          _require_bucket_distribution(require_bucket_distribution),
-          _partition_exprs(tnode.__isset.distribute_expr_lists && 
require_bucket_distribution
-                                   ? tnode.distribute_expr_lists[0]
-                                   : tnode.analytic_node.partition_exprs),
           _window(tnode.analytic_node.window),
           _has_window(tnode.analytic_node.__isset.window),
           _has_range_window(tnode.analytic_node.window.type == 
TAnalyticWindowType::RANGE),
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index 4815102f6ff..c4168a33c4a 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -180,7 +180,7 @@ private:
 class AnalyticSinkOperatorX final : public 
DataSinkOperatorX<AnalyticSinkLocalState> {
 public:
     AnalyticSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, 
const TPlanNode& tnode,
-                          const DescriptorTbl& descs, bool 
require_bucket_distribution);
+                          const DescriptorTbl& descs);
 
 #ifdef BE_TEST
     AnalyticSinkOperatorX(ObjectPool* pool)
@@ -189,7 +189,6 @@ public:
               _output_tuple_id(0),
               _buffered_tuple_id(0),
               _is_colocate(false),
-              _require_bucket_distribution(false),
               _has_window(false),
               _has_range_window(false),
               _has_window_start(false),
@@ -202,6 +201,14 @@ public:
     }
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override {
+        _followed_by_shuffled_operator = followed_by_shuffled_operator;
+        _require_bucket_distribution = require_bucket_distribution;
+        _partition_exprs = tnode.__isset.distribute_expr_lists && _followed_by_shuffled_operator
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.analytic_node.partition_exprs;
+    }
 
     Status prepare(RuntimeState* state) override;
 
@@ -210,13 +217,14 @@ public:
         if (_partition_by_eq_expr_ctxs.empty()) {
             return {ExchangeType::PASSTHROUGH};
         } else {
-            return _is_colocate && _require_bucket_distribution && 
!_followed_by_shuffled_operator
+            return _is_colocate && _require_bucket_distribution
                            ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
         }
     }
 
-    bool require_data_distribution() const override { return true; }
+    bool is_colocated_operator() const override { return _is_colocate; }
+    bool is_shuffled_operator() const override { return 
!_partition_by_eq_expr_ctxs.empty(); }
 
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
@@ -243,8 +251,7 @@ private:
     const TTupleId _buffered_tuple_id;
 
     const bool _is_colocate;
-    const bool _require_bucket_distribution;
-    const std::vector<TExpr> _partition_exprs;
+    std::vector<TExpr> _partition_exprs;
 
     TAnalyticWindow _window;
     bool _has_window;
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 6363355db73..9cafe75ec76 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -321,17 +321,12 @@ void 
DistinctStreamingAggLocalState::_emplace_into_hash_table_to_distinct(
 
 DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, 
int operator_id,
                                                              const TPlanNode& 
tnode,
-                                                             const 
DescriptorTbl& descs,
-                                                             bool 
require_bucket_distribution)
+                                                             const 
DescriptorTbl& descs)
         : StatefulOperatorX<DistinctStreamingAggLocalState>(pool, tnode, 
operator_id, descs),
           _output_tuple_id(tnode.agg_node.output_tuple_id),
           _needs_finalize(tnode.agg_node.need_finalize),
           _is_first_phase(tnode.agg_node.__isset.is_first_phase && 
tnode.agg_node.is_first_phase),
-          _partition_exprs(tnode.__isset.distribute_expr_lists && 
require_bucket_distribution
-                                   ? tnode.distribute_expr_lists[0]
-                                   : tnode.agg_node.grouping_exprs),
-          _is_colocate(tnode.agg_node.__isset.is_colocate && 
tnode.agg_node.is_colocate),
-          _require_bucket_distribution(require_bucket_distribution) {
+          _is_colocate(tnode.agg_node.__isset.is_colocate && 
tnode.agg_node.is_colocate) {
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
         _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
         if (_is_streaming_preagg) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index b0baf64ff96..805223aa8d6 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -93,17 +93,24 @@ class DistinctStreamingAggOperatorX final
         : public StatefulOperatorX<DistinctStreamingAggLocalState> {
 public:
     DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const 
TPlanNode& tnode,
-                                  const DescriptorTbl& descs, bool 
require_bucket_distribution);
+                                  const DescriptorTbl& descs);
 #ifdef BE_TEST
     DistinctStreamingAggOperatorX()
             : _needs_finalize(false),
               _is_first_phase(true),
               _partition_exprs({}),
-              _is_colocate(false),
-              _require_bucket_distribution {false} {}
+              _is_colocate(false) {}
 #endif
 
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override {
+        _followed_by_shuffled_operator = followed_by_shuffled_operator;
+        _require_bucket_distribution = require_bucket_distribution;
+        _partition_exprs = tnode.__isset.distribute_expr_lists && _followed_by_shuffled_operator
+                                   ? tnode.distribute_expr_lists[0]
+                                   : tnode.agg_node.grouping_exprs;
+    }
     Status prepare(RuntimeState* state) override;
     Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) 
const override;
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) 
const override;
@@ -114,14 +121,14 @@ public:
             return {ExchangeType::NOOP};
         }
         if (_needs_finalize || (!_probe_expr_ctxs.empty() && 
!_is_streaming_preagg)) {
-            return _is_colocate && _require_bucket_distribution && 
!_followed_by_shuffled_operator
+            return _is_colocate && _require_bucket_distribution
                            ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
         }
         return {ExchangeType::PASSTHROUGH};
     }
 
-    bool require_data_distribution() const override { return _is_colocate; }
+    bool is_colocated_operator() const override { return _is_colocate; }
 
 private:
     friend class DistinctStreamingAggLocalState;
@@ -130,9 +137,8 @@ private:
     TupleDescriptor* _output_tuple_desc = nullptr;
     const bool _needs_finalize;
     const bool _is_first_phase;
-    const std::vector<TExpr> _partition_exprs;
+    std::vector<TExpr> _partition_exprs;
     const bool _is_colocate;
-    const bool _require_bucket_distribution;
     // group by k1,k2
     vectorized::VExprContextSPtrs _probe_expr_ctxs;
     std::vector<size_t> _make_nullable_keys;
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 45ab687a631..a08f80063e7 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -140,11 +140,13 @@ public:
     }
 
     bool is_shuffled_operator() const override {
-        return _join_distribution == TJoinDistributionType::PARTITIONED;
+        return _join_distribution == TJoinDistributionType::PARTITIONED ||
+               _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+               _join_distribution == TJoinDistributionType::COLOCATE;
     }
-    bool require_data_distribution() const override {
-        return _join_distribution != TJoinDistributionType::BROADCAST &&
-               _join_distribution != TJoinDistributionType::NONE;
+    bool is_colocated_operator() const override {
+        return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+               _join_distribution == TJoinDistributionType::COLOCATE;
     }
     std::vector<bool>& is_null_safe_eq_join() { return _is_null_safe_eq_join; }
 
@@ -161,7 +163,7 @@ private:
     std::vector<bool> _is_null_safe_eq_join;
 
     bool _is_broadcast_join = false;
-    const std::vector<TExpr> _partition_exprs;
+    std::vector<TExpr> _partition_exprs;
 
     std::vector<SlotId> _hash_output_slot_ids;
     std::vector<bool> _should_keep_column_flags;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index a0ad3e56c8b..4b455fadcab 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -147,11 +147,13 @@ public:
     bool is_broadcast_join() const { return _is_broadcast_join; }
 
     bool is_shuffled_operator() const override {
-        return _join_distribution == TJoinDistributionType::PARTITIONED;
+        return _join_distribution == TJoinDistributionType::PARTITIONED ||
+               _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+               _join_distribution == TJoinDistributionType::COLOCATE;
     }
-    bool require_data_distribution() const override {
-        return _join_distribution != TJoinDistributionType::BROADCAST &&
-               _join_distribution != TJoinDistributionType::NONE;
+    bool is_colocated_operator() const override {
+        return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
+               _join_distribution == TJoinDistributionType::COLOCATE;
     }
 
     bool need_finalize_variant_column() const { return 
_need_finalize_variant_column; }
@@ -192,7 +194,7 @@ private:
     bool _need_finalize_variant_column = false;
     std::set<int> _should_not_lazy_materialized_column_ids;
     std::vector<std::string> _right_table_column_names;
-    const std::vector<TExpr> _partition_exprs;
+    std::vector<TExpr> _partition_exprs;
 
     // Index of column(slot) from right table in the `_intermediate_row_desc`.
     size_t _right_col_idx;
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index cf6327bcf0c..7258b4ca028 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -145,10 +145,6 @@ DataDistribution 
OperatorBase::required_data_distribution(RuntimeState* /*state*
                    : DataDistribution(ExchangeType::NOOP);
 }
 
-bool OperatorBase::require_shuffled_data_distribution(RuntimeState* state) const {
-    return Pipeline::is_hash_exchange(required_data_distribution(state).distribution_type);
-}
-
 const RowDescriptor& OperatorBase::row_desc() const {
     return _child->row_desc();
 }
@@ -566,6 +562,11 @@ Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalState
     _exec_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "ExecTime", 1);
     _memory_used_counter =
             _common_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
+    _common_profile->add_info_string("IsColocate",
+                                     std::to_string(_parent->is_colocated_operator()));
+    _common_profile->add_info_string("IsShuffled", std::to_string(_parent->is_shuffled_operator()));
+    _common_profile->add_info_string("FollowedByShuffledOperator",
+                                     std::to_string(_parent->followed_by_shuffled_operator()));
     return Status::OK();
 }
 
@@ -664,6 +665,11 @@ Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSink
     _exec_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "ExecTime", 1);
     _memory_used_counter =
             _common_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
+    _common_profile->add_info_string("IsColocate",
+                                     std::to_string(_parent->is_colocated_operator()));
+    _common_profile->add_info_string("IsShuffled", std::to_string(_parent->is_shuffled_operator()));
+    _common_profile->add_info_string("FollowedByShuffledOperator",
+                                     std::to_string(_parent->followed_by_shuffled_operator()));
     return Status::OK();
 }
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 1b6afecd0e6..cc56f1d11ca 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -143,31 +143,59 @@ public:
      * pipeline task into the blocking task scheduler.
      */
     virtual bool is_blockable(RuntimeState* state) const = 0;
-
     virtual void set_low_memory_mode(RuntimeState* state) {}
 
-    [[nodiscard]] virtual bool require_data_distribution() const { return false; }
     OperatorPtr child() { return _child; }
+    virtual Status reset(RuntimeState* state) {
+        return Status::InternalError("Reset is not implemented in operator: {}", get_name());
+    }
+
+    /* -------------- Interfaces to determine the input data properties -------------- */
+    /**
+     * Return True if this operator relies on the bucket distribution (e.g. COLOCATE join, 1-phase AGG).
+     * Data input to this kind of operators must have the same distribution with the table buckets.
+     * It is also means `required_data_distribution` should be `BUCKET_HASH_SHUFFLE`.
+     * @return
+     */
+    [[nodiscard]] virtual bool is_colocated_operator() const { return false; }
+    /**
+     * Return True if this operator relies on the bucket distribution or specific hash data distribution (e.g. SHUFFLED HASH join).
+     * Data input to this kind of operators must be HASH distributed according to some rules.
+     * All colocated operators are also shuffled operators.
+     * It is also means `required_data_distribution` should be `BUCKET_HASH_SHUFFLE` or `HASH_SHUFFLE`.
+     * @return
+     */
+    [[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
+    /**
+     * Return True if this operator is followed by a shuffled operator.
+     * For example, in the plan fragment:
+     *   `UNION` -> `SHUFFLED HASH JOIN`
+     * The `SHUFFLED HASH JOIN` is a shuffled operator so the UNION operator is followed by a shuffled operator.
+     */
     [[nodiscard]] bool followed_by_shuffled_operator() const {
         return _followed_by_shuffled_operator;
     }
-    void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
+    /**
+     * Update the operator properties according to the plan node.
+     * This is called before `prepare`.
+     */
+    virtual void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
+                                 bool require_bucket_distribution) {
         _followed_by_shuffled_operator = followed_by_shuffled_operator;
+        _require_bucket_distribution = require_bucket_distribution;
     }
-    [[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
+    /**
+     * Return the required data distribution of this operator.
+     */
     [[nodiscard]] virtual DataDistribution required_data_distribution(
             RuntimeState* /*state*/) const;
-    [[nodiscard]] virtual bool require_shuffled_data_distribution(RuntimeState* /*state*/) const;
-
-    virtual Status reset(RuntimeState* state) {
-        return Status::InternalError("Reset is not implemented in operator: {}", get_name());
-    }
 
 protected:
     OperatorPtr _child = nullptr;
 
     bool _is_closed;
     bool _followed_by_shuffled_operator = false;
+    bool _require_bucket_distribution = false;
     bool _is_serial_operator = false;
 };
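
To make the contract of the new hooks concrete, the condensed sketch below shows how a shuffled, possibly colocated operator might wire them up. It is an illustration only, not code from this patch: the class ExampleShuffledOperatorX, its base class spelling, and the member _default_partition_exprs are assumptions.

// Illustrative sketch (not patch code) of a derived operator using the new hooks.
class ExampleShuffledOperatorX : public OperatorXBase {
public:
    bool is_shuffled_operator() const override { return true; }
    bool is_colocated_operator() const override { return _is_colocate; }

    // Called once by PipelineFragmentContext before prepare(); plan-dependent
    // properties are filled in here instead of in the constructor.
    void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator,
                         bool require_bucket_distribution) override {
        _followed_by_shuffled_operator = followed_by_shuffled_operator;
        _require_bucket_distribution = require_bucket_distribution;
        _partition_exprs = tnode.__isset.distribute_expr_lists && followed_by_shuffled_operator
                                   ? tnode.distribute_expr_lists[0]
                                   : _default_partition_exprs;  // hypothetical fallback member
    }

    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
        return _is_colocate && _require_bucket_distribution
                       ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                       : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
    }

private:
    bool _is_colocate = false;
    std::vector<TExpr> _partition_exprs;
    std::vector<TExpr> _default_partition_exprs;
};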
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
index 391f30d82d1..74c12352a88 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp
@@ -114,11 +114,10 @@ void 
PartitionedAggSinkLocalState::update_profile(RuntimeProfile* child_profile)
 
 PartitionedAggSinkOperatorX::PartitionedAggSinkOperatorX(ObjectPool* pool, int 
operator_id,
                                                          int dest_id, const 
TPlanNode& tnode,
-                                                         const DescriptorTbl& 
descs,
-                                                         bool 
require_bucket_distribution)
+                                                         const DescriptorTbl& 
descs)
         : DataSinkOperatorX<PartitionedAggSinkLocalState>(operator_id, 
tnode.node_id, dest_id) {
-    _agg_sink_operator = std::make_unique<AggSinkOperatorX>(pool, operator_id, 
dest_id, tnode,
-                                                            descs, 
require_bucket_distribution);
+    _agg_sink_operator =
+            std::make_unique<AggSinkOperatorX>(pool, operator_id, dest_id, 
tnode, descs);
     _spillable = true;
 }
 
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 9d5cb146c4d..827655e4c3d 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -104,8 +104,7 @@ public:
 class PartitionedAggSinkOperatorX : public 
DataSinkOperatorX<PartitionedAggSinkLocalState> {
 public:
     PartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
-                                const TPlanNode& tnode, const DescriptorTbl& 
descs,
-                                bool require_bucket_distribution);
+                                const TPlanNode& tnode, const DescriptorTbl& 
descs);
     ~PartitionedAggSinkOperatorX() override = default;
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
@@ -118,12 +117,21 @@ public:
 
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
 
+    void update_operator(const TPlanNode& tnode, bool 
followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override {
+        _agg_sink_operator->update_operator(tnode, 
followed_by_shuffled_operator,
+                                            require_bucket_distribution);
+    }
+
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
         return _agg_sink_operator->required_data_distribution(state);
     }
 
-    bool require_data_distribution() const override {
-        return _agg_sink_operator->require_data_distribution();
+    bool is_colocated_operator() const override {
+        return _agg_sink_operator->is_colocated_operator();
+    }
+    bool is_shuffled_operator() const override {
+        return _agg_sink_operator->is_shuffled_operator();
     }
 
     Status set_child(OperatorPtr child) override {
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index f9b900cba2f..a3446a28881 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -25,6 +25,7 @@
 #include "common/exception.h"
 #include "common/logging.h"
 #include "common/status.h"
+#include "pipeline/exec/aggregation_source_operator.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/exec/spill_utils.h"
 #include "pipeline/pipeline_task.h"
@@ -116,6 +117,25 @@ bool PartitionedAggSourceOperatorX::is_serial_operator() 
const {
     return _agg_source_operator->is_serial_operator();
 }
 
+void PartitionedAggSourceOperatorX::update_operator(const TPlanNode& tnode,
+                                                    bool 
followed_by_shuffled_operator,
+                                                    bool 
require_bucket_distribution) {
+    _agg_source_operator->update_operator(tnode, followed_by_shuffled_operator,
+                                          require_bucket_distribution);
+}
+
+DataDistribution PartitionedAggSourceOperatorX::required_data_distribution(
+        RuntimeState* state) const {
+    return _agg_source_operator->required_data_distribution(state);
+}
+
+bool PartitionedAggSourceOperatorX::is_colocated_operator() const {
+    return _agg_source_operator->is_colocated_operator();
+}
+bool PartitionedAggSourceOperatorX::is_shuffled_operator() const {
+    return _agg_source_operator->is_shuffled_operator();
+}
+
 Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                 bool* eos) {
     auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index c388d5fe047..d22d23f20f1 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -88,6 +88,12 @@ public:
     bool is_source() const override { return true; }
 
     bool is_serial_operator() const override;
+    void update_operator(const TPlanNode& tnode, bool 
followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override;
+
+    DataDistribution required_data_distribution(RuntimeState* state) const 
override;
+    bool is_colocated_operator() const override;
+    bool is_shuffled_operator() const override;
 
 private:
     friend class PartitionedAggLocalState;
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 22f88d6859a..b744b8f2f7d 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -152,10 +152,6 @@ public:
                                            _distribution_partition_exprs));
     }
 
-    bool is_shuffled_operator() const override {
-        return _join_distribution == TJoinDistributionType::PARTITIONED;
-    }
-
     size_t revocable_mem_size(RuntimeState* state) const override;
 
     size_t get_reserve_mem_size(RuntimeState* state) override;
@@ -165,8 +161,17 @@ public:
         _inner_sink_operator = sink_operator;
         _inner_probe_operator = probe_operator;
     }
-    bool require_data_distribution() const override {
-        return _inner_probe_operator->require_data_distribution();
+    bool is_shuffled_operator() const override {
+        return _inner_probe_operator->is_shuffled_operator();
+    }
+    bool is_colocated_operator() const override {
+        return _inner_probe_operator->is_colocated_operator();
+    }
+
+    void update_operator(const TPlanNode& tnode, bool 
followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override {
+        _inner_probe_operator->update_operator(tnode, 
followed_by_shuffled_operator,
+                                               require_bucket_distribution);
     }
 
 private:
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index f464af25af2..e098d071daa 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -138,18 +138,23 @@ public:
                                           _distribution_partition_exprs);
     }
 
-    bool is_shuffled_operator() const override {
-        return _join_distribution == TJoinDistributionType::PARTITIONED;
-    }
-
     void set_inner_operators(const 
std::shared_ptr<HashJoinBuildSinkOperatorX>& sink_operator,
                              const std::shared_ptr<HashJoinProbeOperatorX>& 
probe_operator) {
         _inner_sink_operator = sink_operator;
         _inner_probe_operator = probe_operator;
     }
 
-    bool require_data_distribution() const override {
-        return _inner_probe_operator->require_data_distribution();
+    bool is_colocated_operator() const override {
+        return _inner_sink_operator->is_colocated_operator();
+    }
+    bool is_shuffled_operator() const override {
+        return _inner_sink_operator->is_shuffled_operator();
+    }
+
+    void update_operator(const TPlanNode& tnode, bool 
followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override {
+        _inner_sink_operator->update_operator(tnode, 
followed_by_shuffled_operator,
+                                              require_bucket_distribution);
     }
 
 private:
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index 09b5ee0249b..18cc29c37ce 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -117,6 +117,7 @@ public:
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
     bool is_shuffled_operator() const override { return true; }
+    bool is_colocated_operator() const override { return _is_colocate; }
 
 private:
     void _finalize_probe(SetProbeSinkLocalState<is_intersect>& local_state);
@@ -128,7 +129,7 @@ private:
     // every child has its result expr list
     vectorized::VExprContextSPtrs _child_exprs;
     const bool _is_colocate;
-    const std::vector<TExpr> _partition_exprs;
+    std::vector<TExpr> _partition_exprs;
     using OperatorBase::_child;
 };
 
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index 1d70c8681a3..c22a09ac07e 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -121,6 +121,7 @@ public:
     size_t get_reserve_mem_size(RuntimeState* state, bool eos) override;
 
     bool is_shuffled_operator() const override { return true; }
+    bool is_colocated_operator() const override { return _is_colocate; }
 
 private:
     template <class HashTableContext, bool is_intersected>
@@ -137,7 +138,7 @@ private:
     // every child has its result expr list
     vectorized::VExprContextSPtrs _child_exprs;
     const bool _is_colocate;
-    const std::vector<TExpr> _partition_exprs;
+    std::vector<TExpr> _partition_exprs;
     using OperatorBase::_child;
 
     const std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
diff --git a/be/src/pipeline/exec/set_source_operator.h 
b/be/src/pipeline/exec/set_source_operator.h
index d06987c6613..db56da26a24 100644
--- a/be/src/pipeline/exec/set_source_operator.h
+++ b/be/src/pipeline/exec/set_source_operator.h
@@ -68,14 +68,23 @@ public:
             : Base(pool, tnode, operator_id, descs),
               _child_quantity(tnode.node_type == TPlanNodeType::type::INTERSECT_NODE
-                                      ? tnode.intersect_node.result_expr_lists.size()
-                                      : tnode.except_node.result_expr_lists.size()) {};
+                                      : tnode.except_node.result_expr_lists.size()),
+              _is_colocate(is_intersect ? tnode.intersect_node.is_colocate
+                                        : tnode.except_node.is_colocate) {}
 
 #ifdef BE_TEST
-    SetSourceOperatorX(size_t child_quantity) : _child_quantity(child_quantity) {}
+    SetSourceOperatorX(size_t child_quantity)
+            : _child_quantity(child_quantity), _is_colocate(false) {}
 #endif
     ~SetSourceOperatorX() override = default;
 
     [[nodiscard]] bool is_source() const override { return true; }
+    bool is_shuffled_operator() const override { return true; }
+    bool is_colocated_operator() const override { return _is_colocate; }
+    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
+        return _is_colocate ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE)
+                            : DataDistribution(ExchangeType::HASH_SHUFFLE);
+    }
 
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
     Status set_child(OperatorPtr child) override {
@@ -94,6 +103,7 @@ private:
                                   HashTableContext& hash_table_ctx, 
vectorized::Block* output_block,
                                   const int batch_size, bool* eos);
     const size_t _child_quantity;
+    const bool _is_colocate;
 };
 #include "common/compile_check_end.h"
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index b85373f7663..6931601bf3f 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -86,8 +86,7 @@ Status SortSinkLocalState::open(RuntimeState* state) {
 }
 
 SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, int 
dest_id,
-                                     const TPlanNode& tnode, const 
DescriptorTbl& descs,
-                                     const bool require_bucket_distribution)
+                                     const TPlanNode& tnode, const 
DescriptorTbl& descs)
         : DataSinkOperatorX(operator_id, tnode, dest_id),
           _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0),
           _pool(pool),
@@ -95,7 +94,6 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int 
operator_id, int dest
           _row_descriptor(descs, tnode.row_tuples),
           _merge_by_exchange(tnode.sort_node.merge_by_exchange),
           _is_colocate(tnode.sort_node.__isset.is_colocate && 
tnode.sort_node.is_colocate),
-          _require_bucket_distribution(require_bucket_distribution),
           _is_analytic_sort(tnode.sort_node.__isset.is_analytic_sort
                                     ? tnode.sort_node.is_analytic_sort
                                     : false),
diff --git a/be/src/pipeline/exec/sort_sink_operator.h 
b/be/src/pipeline/exec/sort_sink_operator.h
index b520dbb3111..b7aaa99b0fb 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -56,7 +56,7 @@ private:
 class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
 public:
     SortSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, const 
TPlanNode& tnode,
-                      const DescriptorTbl& descs, const bool 
require_bucket_distribution);
+                      const DescriptorTbl& descs);
 #ifdef BE_TEST
     SortSinkOperatorX(ObjectPool* pool, TSortAlgorithm::type type, int64_t 
limit, int64_t offset)
             : _offset(offset),
@@ -79,7 +79,7 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) 
override;
     DataDistribution required_data_distribution(RuntimeState* /*state*/) const 
override {
         if (_is_analytic_sort) {
-            return _is_colocate && _require_bucket_distribution && 
!_followed_by_shuffled_operator
+            return _is_colocate && _require_bucket_distribution
                            ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                            : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
         } else if (_merge_by_exchange) {
@@ -89,7 +89,8 @@ public:
             return {ExchangeType::NOOP};
         }
     }
-    bool require_data_distribution() const override { return _is_colocate; }
+    bool is_colocated_operator() const override { return _is_colocate; }
+    bool is_shuffled_operator() const override { return _is_analytic_sort; }
 
     size_t get_revocable_mem_size(RuntimeState* state) const;
 
@@ -121,9 +122,8 @@ private:
     const RowDescriptor _row_descriptor;
     const bool _merge_by_exchange;
     const bool _is_colocate = false;
-    const bool _require_bucket_distribution = false;
     const bool _is_analytic_sort = false;
-    const std::vector<TExpr> _partition_exprs;
+    std::vector<TExpr> _partition_exprs;
     const TSortAlgorithm::type _algorithm;
     const bool _reuse_mem;
     const int64_t _max_buffered_bytes;
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp 
b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
index 9669f508d7a..52e820ca62b 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp
@@ -106,12 +106,11 @@ bool SpillSortSinkLocalState::is_blockable() const {
 }
 
 SpillSortSinkOperatorX::SpillSortSinkOperatorX(ObjectPool* pool, int 
operator_id, int dest_id,
-                                               const TPlanNode& tnode, const 
DescriptorTbl& descs,
-                                               bool 
require_bucket_distribution)
+                                               const TPlanNode& tnode, const 
DescriptorTbl& descs)
         : DataSinkOperatorX(operator_id, tnode.node_id, dest_id) {
     _spillable = true;
-    _sort_sink_operator = std::make_unique<SortSinkOperatorX>(pool, 
operator_id, dest_id, tnode,
-                                                              descs, 
require_bucket_distribution);
+    _sort_sink_operator =
+            std::make_unique<SortSinkOperatorX>(pool, operator_id, dest_id, 
tnode, descs);
 }
 
 Status SpillSortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h 
b/be/src/pipeline/exec/spill_sort_sink_operator.h
index d2efcc90512..a3ddb8a24e9 100644
--- a/be/src/pipeline/exec/spill_sort_sink_operator.h
+++ b/be/src/pipeline/exec/spill_sort_sink_operator.h
@@ -68,7 +68,7 @@ class SpillSortSinkOperatorX final : public 
DataSinkOperatorX<SpillSortSinkLocal
 public:
     using LocalStateType = SpillSortSinkLocalState;
     SpillSortSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, 
const TPlanNode& tnode,
-                           const DescriptorTbl& descs, bool 
require_bucket_distribution);
+                           const DescriptorTbl& descs);
     Status init(const TDataSink& tsink) override {
         return Status::InternalError("{} should not init with TPlanNode",
                                      
DataSinkOperatorX<SpillSortSinkLocalState>::_name);
@@ -81,8 +81,16 @@ public:
     DataDistribution required_data_distribution(RuntimeState* state) const 
override {
         return _sort_sink_operator->required_data_distribution(state);
     }
-    bool require_data_distribution() const override {
-        return _sort_sink_operator->require_data_distribution();
+    void update_operator(const TPlanNode& tnode, bool 
followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override {
+        _sort_sink_operator->update_operator(tnode, 
followed_by_shuffled_operator,
+                                             require_bucket_distribution);
+    }
+    bool is_colocated_operator() const override {
+        return _sort_sink_operator->is_colocated_operator();
+    }
+    bool is_shuffled_operator() const override {
+        return _sort_sink_operator->is_shuffled_operator();
     }
     Status set_child(OperatorPtr child) override {
         
RETURN_IF_ERROR(DataSinkOperatorX<SpillSortSinkLocalState>::set_child(child));
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
index 5959ef2cf03..fc7e8fe147c 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.cpp
@@ -818,29 +818,34 @@ void 
StreamingAggLocalState::_emplace_into_hash_table(vectorized::AggregateDataP
 }
 
 StreamingAggOperatorX::StreamingAggOperatorX(ObjectPool* pool, int operator_id,
-                                             const TPlanNode& tnode, const 
DescriptorTbl& descs,
-                                             bool require_bucket_distribution)
+                                             const TPlanNode& tnode, const 
DescriptorTbl& descs)
         : StatefulOperatorX<StreamingAggLocalState>(pool, tnode, operator_id, 
descs),
           _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
           _output_tuple_id(tnode.agg_node.output_tuple_id),
           _needs_finalize(tnode.agg_node.need_finalize),
           _is_first_phase(tnode.agg_node.__isset.is_first_phase && 
tnode.agg_node.is_first_phase),
           _have_conjuncts(tnode.__isset.vconjunct && 
!tnode.vconjunct.nodes.empty()),
-          _agg_fn_output_row_descriptor(descs, tnode.row_tuples),
-          _partition_exprs(
-                  tnode.__isset.distribute_expr_lists &&
-                                  (require_bucket_distribution ||
-                                   std::any_of(
-                                           tnode.agg_node.aggregate_functions.begin(),
-                                           tnode.agg_node.aggregate_functions.end(),
-                                           [](const TExpr& texpr) -> bool {
-                                               return texpr.nodes[0]
-                                                       .fn.name.function_name.starts_with(
-                                                               vectorized::
-                                                                       DISTINCT_FUNCTION_PREFIX);
-                                           }))
-                          ? tnode.distribute_expr_lists[0]
-                          : tnode.agg_node.grouping_exprs) {}
+          _agg_fn_output_row_descriptor(descs, tnode.row_tuples) {}
+
+void StreamingAggOperatorX::update_operator(const TPlanNode& tnode,
+                                            bool followed_by_shuffled_operator,
+                                            bool require_bucket_distribution) {
+    _followed_by_shuffled_operator = followed_by_shuffled_operator;
+    _require_bucket_distribution = require_bucket_distribution;
+    _partition_exprs =
+            tnode.__isset.distribute_expr_lists &&
+                            (StatefulOperatorX<
+                                     StreamingAggLocalState>::_followed_by_shuffled_operator ||
+                             std::any_of(
+                                     tnode.agg_node.aggregate_functions.begin(),
+                                     tnode.agg_node.aggregate_functions.end(),
+                                     [](const TExpr& texpr) -> bool {
+                                         return texpr.nodes[0].fn.name.function_name.starts_with(
+                                                 vectorized::DISTINCT_FUNCTION_PREFIX);
+                                     }))
+                    ? tnode.distribute_expr_lists[0]
+                    : tnode.agg_node.grouping_exprs;
+}
 
 Status StreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
     RETURN_IF_ERROR(StatefulOperatorX<StreamingAggLocalState>::init(tnode, 
state));
diff --git a/be/src/pipeline/exec/streaming_aggregation_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_operator.h
index 71396f351e3..a6d82f569f0 100644
--- a/be/src/pipeline/exec/streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_operator.h
@@ -204,13 +204,15 @@ private:
 class StreamingAggOperatorX MOCK_REMOVE(final) : public 
StatefulOperatorX<StreamingAggLocalState> {
 public:
     StreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& 
tnode,
-                          const DescriptorTbl& descs, bool 
require_bucket_distribution);
+                          const DescriptorTbl& descs);
 #ifdef BE_TEST
     StreamingAggOperatorX() : _is_first_phase {false} {}
 #endif
 
     ~StreamingAggOperatorX() override = default;
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
+    void update_operator(const TPlanNode& tnode, bool 
followed_by_shuffled_operator,
+                         bool require_bucket_distribution) override;
     Status prepare(RuntimeState* state) override;
     Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) 
const override;
     Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) 
const override;
@@ -271,7 +273,7 @@ private:
     std::vector<int> _order_directions;
     std::vector<int> _null_directions;
 
-    const std::vector<TExpr> _partition_exprs;
+    std::vector<TExpr> _partition_exprs;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/union_sink_operator.h 
b/be/src/pipeline/exec/union_sink_operator.h
index ee575e8f8d5..2b32268ffca 100644
--- a/be/src/pipeline/exec/union_sink_operator.h
+++ b/be/src/pipeline/exec/union_sink_operator.h
@@ -117,18 +117,14 @@ public:
         }
     }
 
-    bool require_shuffled_data_distribution(RuntimeState* /*state*/) const override {
-        return _followed_by_shuffled_operator;
-    }
-
-    DataDistribution required_data_distribution(RuntimeState* /*state*/) const override {
-        if (_child->is_serial_operator() && _followed_by_shuffled_operator) {
-            return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs);
+    DataDistribution required_data_distribution(RuntimeState* state) const override {
+        if (_require_bucket_distribution) {
+            return DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _distribute_exprs);
         }
-        if (_child->is_serial_operator()) {
-            return DataDistribution(ExchangeType::PASSTHROUGH);
+        if (_followed_by_shuffled_operator) {
+            return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs);
         }
-        return DataDistribution(ExchangeType::NOOP);
+        return Base::required_data_distribution(state);
     }
 
     void set_low_memory_mode(RuntimeState* state) override {
@@ -136,8 +132,6 @@ public:
         local_state._shared_state->data_queue.set_low_memory_mode();
     }
 
-    bool is_shuffled_operator() const override { return 
_followed_by_shuffled_operator; }
-
 private:
     int _get_first_materialized_child_idx() const { return 
_first_materialized_child_idx; }
 
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 255c778f1e8..81040b9d417 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -102,8 +102,15 @@ public:
         return Status::OK();
     }
     [[nodiscard]] int get_child_count() const { return _child_size; }
-    bool require_shuffled_data_distribution(RuntimeState* /*state*/) const 
override {
-        return _followed_by_shuffled_operator;
+
+    DataDistribution required_data_distribution(RuntimeState* state) const 
override {
+        if (_require_bucket_distribution) {
+            return DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE);
+        }
+        if (_followed_by_shuffled_operator) {
+            return DataDistribution(ExchangeType::HASH_SHUFFLE);
+        }
+        return Base::required_data_distribution(state);
     }
 
     void set_low_memory_mode(RuntimeState* state) override {
@@ -113,7 +120,6 @@ public:
         }
     }
 
-    bool is_shuffled_operator() const override { return 
_followed_by_shuffled_operator; }
     Status set_child(OperatorPtr child) override {
         Base::_child = child;
         return Status::OK();
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index cbe8555ad47..e66e2326789 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -699,13 +699,15 @@ Status 
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
      * If an operator's is followed by a local exchange without shuffle (e.g. 
passthrough), a
      * shuffled local exchanger will be used before join so it is not followed 
by shuffle join.
      */
-    auto require_shuffled_data_distribution =
+    auto required_data_distribution =
             cur_pipe->operators().empty()
-                    ? 
cur_pipe->sink()->require_shuffled_data_distribution(_runtime_state.get())
-                    : 
op->require_shuffled_data_distribution(_runtime_state.get());
+                    ? 
cur_pipe->sink()->required_data_distribution(_runtime_state.get())
+                    : op->required_data_distribution(_runtime_state.get());
     current_followed_by_shuffled_operator =
-            (followed_by_shuffled_operator || op->is_shuffled_operator()) &&
-            require_shuffled_data_distribution;
+            (followed_by_shuffled_operator ||
+             (cur_pipe->operators().empty() ? 
cur_pipe->sink()->is_shuffled_operator()
+                                            : op->is_shuffled_operator())) &&
+            
Pipeline::is_hash_exchange(required_data_distribution.distribution_type);
 
     if (num_children == 0) {
         _use_serial_source = op->is_serial_operator();
@@ -1197,6 +1199,19 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                                  PipelinePtr& cur_pipe, int parent_idx,
                                                  int child_idx,
                                                  const bool followed_by_shuffled_operator) {
+    std::vector<DataSinkOperatorPtr> sink_ops;
+    Defer defer = Defer([&]() {
+        if (op) {
+            op->update_operator(tnode, followed_by_shuffled_operator, _require_bucket_distribution);
+            _require_bucket_distribution =
+                    _require_bucket_distribution || op->is_colocated_operator();
+        }
+        for (auto& s : sink_ops) {
+            s->update_operator(tnode, followed_by_shuffled_operator, _require_bucket_distribution);
+            _require_bucket_distribution =
+                    _require_bucket_distribution || s->is_colocated_operator();
+        }
+    });
     // We directly construct the operator from Thrift because the given array 
is in the order of preorder traversal.
     // Therefore, here we need to use a stack-like structure.
     _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
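
[editor's note] The Defer above centralizes the bookkeeping that the removed
per-branch calls used to do: once every operator and sink for this plan node
has been constructed, update_operator is applied and _require_bucket_distribution
is OR-ed with each object's colocation requirement. A minimal, generic
scope-exit helper in the same spirit (not Doris's own Defer utility) might look
like this:

    #include <functional>
    #include <iostream>
    #include <utility>
    #include <vector>

    // Minimal scope-exit helper: runs the stored callable at scope end.
    class ScopeDefer {
    public:
        explicit ScopeDefer(std::function<void()> fn) : _fn(std::move(fn)) {}
        ~ScopeDefer() { if (_fn) _fn(); }
        ScopeDefer(const ScopeDefer&) = delete;
        ScopeDefer& operator=(const ScopeDefer&) = delete;
    private:
        std::function<void()> _fn;
    };

    struct FakeSink {               // stand-in for a sink operator
        bool colocated = false;
        void update(bool followed, bool require_bucket) {
            std::cout << "update(" << followed << ", " << require_bucket
                      << ")\n";
        }
    };

    int main() {
        bool require_bucket_distribution = false;
        std::vector<FakeSink> sinks;
        {
            ScopeDefer defer([&]() {
                // Applied once, after all construction branches have run.
                for (auto& s : sinks) {
                    s.update(/*followed=*/true, require_bucket_distribution);
                    require_bucket_distribution =
                            require_bucket_distribution || s.colocated;
                }
            });
            sinks.push_back(FakeSink{});   // any branch may add sinks
            FakeSink colocated_sink;
            colocated_sink.colocated = true;
            sinks.push_back(colocated_sink);
        } // defer fires here
        std::cout << require_bucket_distribution << "\n"; // prints 1
        return 0;
    }

The practical benefit is that new switch branches only need to push their sinks
into sink_ops; the distribution bookkeeping cannot be forgotten.
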
@@ -1309,19 +1324,14 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 PipelinePtr new_pipe;
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
 
-                op = std::make_shared<DistinctStreamingAggOperatorX>(
-                        pool, next_operator_id(), tnode, descs, 
_require_bucket_distribution);
-                op->set_followed_by_shuffled_operator(false);
-                _require_bucket_distribution = true;
+                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, 
next_operator_id(),
+                                                                     tnode, 
descs);
                 RETURN_IF_ERROR(new_pipe->add_operator(op, 
_parallel_instances));
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
                 cur_pipe = new_pipe;
             } else {
-                op = std::make_shared<DistinctStreamingAggOperatorX>(
-                        pool, next_operator_id(), tnode, descs, 
_require_bucket_distribution);
-                
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
-                _require_bucket_distribution =
-                        _require_bucket_distribution || 
op->require_data_distribution();
+                op = std::make_shared<DistinctStreamingAggOperatorX>(pool, 
next_operator_id(),
+                                                                     tnode, 
descs);
                 RETURN_IF_ERROR(cur_pipe->add_operator(op, 
_parallel_instances));
             }
         } else if (is_streaming_agg) {
@@ -1329,14 +1339,14 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 PipelinePtr new_pipe;
                 RETURN_IF_ERROR(create_query_cache_operator(new_pipe));
 
-                op = std::make_shared<StreamingAggOperatorX>(pool, 
next_operator_id(), tnode, descs,
-                                                             
_require_bucket_distribution);
+                op = std::make_shared<StreamingAggOperatorX>(pool, 
next_operator_id(), tnode,
+                                                             descs);
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
                 RETURN_IF_ERROR(new_pipe->add_operator(op, 
_parallel_instances));
                 cur_pipe = new_pipe;
             } else {
-                op = std::make_shared<StreamingAggOperatorX>(pool, 
next_operator_id(), tnode, descs,
-                                                             
_require_bucket_distribution);
+                op = std::make_shared<StreamingAggOperatorX>(pool, 
next_operator_id(), tnode,
+                                                             descs);
                 RETURN_IF_ERROR(cur_pipe->add_operator(op, 
_parallel_instances));
             }
         } else {
@@ -1367,20 +1377,14 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             cur_pipe = add_pipeline(cur_pipe);
             _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
-            DataSinkOperatorPtr sink;
             if (enable_spill) {
-                sink = std::make_shared<PartitionedAggSinkOperatorX>(
-                        pool, next_sink_operator_id(), op->operator_id(), 
tnode, descs,
-                        _require_bucket_distribution);
+                
sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>(
+                        pool, next_sink_operator_id(), op->operator_id(), 
tnode, descs));
             } else {
-                sink = std::make_shared<AggSinkOperatorX>(pool, 
next_sink_operator_id(),
-                                                          op->operator_id(), 
tnode, descs,
-                                                          
_require_bucket_distribution);
+                sink_ops.push_back(std::make_shared<AggSinkOperatorX>(
+                        pool, next_sink_operator_id(), op->operator_id(), 
tnode, descs));
             }
-            
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
-            _require_bucket_distribution =
-                    _require_bucket_distribution || 
sink->require_data_distribution();
-            RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+            RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
             RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, 
_runtime_state.get()));
         }
         break;
@@ -1426,14 +1430,12 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             RETURN_IF_ERROR(inner_sink_operator->init(tnode, 
_runtime_state.get()));
 
             sink_operator->set_inner_operators(inner_sink_operator, 
inner_probe_operator);
-            DataSinkOperatorPtr sink = std::move(sink_operator);
-            RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
+            sink_ops.push_back(std::move(sink_operator));
+            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
             RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, 
_runtime_state.get()));
 
             _pipeline_parent_map.push(op->node_id(), cur_pipe);
             _pipeline_parent_map.push(op->node_id(), build_side_pipe);
-            
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
-            op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
         } else {
             op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, 
next_operator_id(), descs);
             RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
@@ -1445,16 +1447,13 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
             _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
 
-            DataSinkOperatorPtr sink;
-            sink = std::make_shared<HashJoinBuildSinkOperatorX>(pool, 
next_sink_operator_id(),
-                                                                
op->operator_id(), tnode, descs);
-            RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
+            sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>(
+                    pool, next_sink_operator_id(), op->operator_id(), tnode, 
descs));
+            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
             RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
 
             _pipeline_parent_map.push(op->node_id(), cur_pipe);
             _pipeline_parent_map.push(op->node_id(), build_side_pipe);
-            
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
-            op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
         }
         if (is_broadcast_join && 
_runtime_state->enable_share_hash_table_for_broadcast_join()) {
             std::shared_ptr<HashJoinSharedState> shared_state =
@@ -1470,8 +1469,6 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             _op_id_to_shared_state.insert(
                     {op->operator_id(), {shared_state, 
shared_state->sink_deps}});
         }
-        _require_bucket_distribution =
-                _require_bucket_distribution || 
op->require_data_distribution();
         break;
     }
     case TPlanNodeType::CROSS_JOIN_NODE: {
@@ -1485,10 +1482,9 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
 
-        DataSinkOperatorPtr sink;
-        sink = std::make_shared<NestedLoopJoinBuildSinkOperatorX>(pool, 
next_sink_operator_id(),
-                                                                  
op->operator_id(), tnode, descs);
-        RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
+        sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>(
+                pool, next_sink_operator_id(), op->operator_id(), tnode, 
descs));
+        RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
         RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
         _pipeline_parent_map.push(op->node_id(), cur_pipe);
         _pipeline_parent_map.push(op->node_id(), build_side_pipe);
@@ -1497,7 +1493,6 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     case TPlanNodeType::UNION_NODE: {
         int child_count = tnode.num_children;
         op = std::make_shared<UnionSourceOperatorX>(pool, tnode, 
next_operator_id(), descs);
-        op->set_followed_by_shuffled_operator(_require_bucket_distribution);
         RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
 
         const auto downstream_pipeline_id = cur_pipe->id();
@@ -1507,11 +1502,9 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         for (int i = 0; i < child_count; i++) {
             PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
             _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
-            DataSinkOperatorPtr sink;
-            sink = std::make_shared<UnionSinkOperatorX>(i, 
next_sink_operator_id(),
-                                                        op->operator_id(), 
pool, tnode, descs);
-            
sink->set_followed_by_shuffled_operator(_require_bucket_distribution);
-            RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
+            sink_ops.push_back(std::make_shared<UnionSinkOperatorX>(
+                    i, next_sink_operator_id(), op->operator_id(), pool, 
tnode, descs));
+            RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back()));
             RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
             // Preset children pipelines. If any pipeline finds this node as
             // its parent, it will use the prepared pipeline to build.
             _pipeline_parent_map.push(op->node_id(), build_side_pipe);
@@ -1540,20 +1533,14 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         cur_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
-        DataSinkOperatorPtr sink;
         if (should_spill) {
-            sink = std::make_shared<SpillSortSinkOperatorX>(pool, 
next_sink_operator_id(),
-                                                            op->operator_id(), 
tnode, descs,
-                                                            
_require_bucket_distribution);
+            sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>(
+                    pool, next_sink_operator_id(), op->operator_id(), tnode, 
descs));
         } else {
-            sink = std::make_shared<SortSinkOperatorX>(pool, 
next_sink_operator_id(),
-                                                       op->operator_id(), 
tnode, descs,
-                                                       
_require_bucket_distribution);
-        }
-        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
-        _require_bucket_distribution =
-                _require_bucket_distribution || 
sink->require_data_distribution();
-        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+            sink_ops.push_back(std::make_shared<SortSinkOperatorX>(
+                    pool, next_sink_operator_id(), op->operator_id(), tnode, 
descs));
+        }
+        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
         RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
         break;
     }
@@ -1568,10 +1555,9 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         cur_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
-        DataSinkOperatorPtr sink;
-        sink = std::make_shared<PartitionSortSinkOperatorX>(pool, 
next_sink_operator_id(),
-                                                            op->operator_id(), 
tnode, descs);
-        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+        sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>(
+                pool, next_sink_operator_id(), op->operator_id(), tnode, 
descs));
+        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
         RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
         break;
     }
@@ -1586,14 +1572,9 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         cur_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
-        DataSinkOperatorPtr sink;
-        sink = std::make_shared<AnalyticSinkOperatorX>(pool, 
next_sink_operator_id(),
-                                                       op->operator_id(), 
tnode, descs,
-                                                       
_require_bucket_distribution);
-        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
-        _require_bucket_distribution =
-                _require_bucket_distribution || 
sink->require_data_distribution();
-        RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+        sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>(
+                pool, next_sink_operator_id(), op->operator_id(), tnode, 
descs));
+        RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back()));
         RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get()));
         break;
     }
@@ -1605,15 +1586,13 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     case TPlanNodeType::INTERSECT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
                 pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
-                !tnode.intersect_node.is_colocate));
-        _require_bucket_distribution = tnode.intersect_node.is_colocate;
+                !tnode.intersect_node.is_colocate || 
followed_by_shuffled_operator, sink_ops));
         break;
     }
     case TPlanNodeType::EXCEPT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
                 pool, tnode, descs, op, cur_pipe, parent_idx, child_idx,
-                !tnode.except_node.is_colocate));
-        _require_bucket_distribution = tnode.except_node.is_colocate;
+                !tnode.except_node.is_colocate || 
followed_by_shuffled_operator, sink_ops));
         break;
     }
     case TPlanNodeType::REPEAT_NODE: {
@@ -1710,9 +1689,9 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
 template <bool is_intersect>
 Status PipelineFragmentContext::_build_operators_for_set_operation_node(
         ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, 
OperatorPtr& op,
-        PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool 
followed_by_shuffled_operator) {
+        PipelinePtr& cur_pipe, int parent_idx, int child_idx, bool 
followed_by_shuffled_operator,
+        std::vector<DataSinkOperatorPtr>& sink_ops) {
     op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, 
next_operator_id(), descs));
-    op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
     RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances));
 
     const auto downstream_pipeline_id = cur_pipe->id();
@@ -1724,16 +1703,14 @@ Status 
PipelineFragmentContext::_build_operators_for_set_operation_node(
         PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
 
-        DataSinkOperatorPtr sink;
         if (child_id == 0) {
-            sink.reset(new SetSinkOperatorX<is_intersect>(child_id, 
next_sink_operator_id(),
-                                                          op->operator_id(), 
pool, tnode, descs));
+            
sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>(
+                    child_id, next_sink_operator_id(), op->operator_id(), 
pool, tnode, descs));
         } else {
-            sink.reset(new SetProbeSinkOperatorX<is_intersect>(
+            
sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>(
                     child_id, next_sink_operator_id(), op->operator_id(), 
pool, tnode, descs));
         }
-        sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
-        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink));
+        RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back()));
         RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, 
_runtime_state.get()));
         // Prepare children pipelines. If any pipeline finds this node as its
         // parent, it will use the prepared pipeline to build.
         _pipeline_parent_map.push(op->node_id(), probe_side_pipe);
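
[editor's note] At a high level, the set-operation builder above gives the
first child a build-style sink and every later child a probe-style sink, and
collects all of them in sink_ops so the deferred update_operator pass sees them
too. A toy version of that dispatch, with hypothetical sink types standing in
for SetSinkOperatorX and SetProbeSinkOperatorX:

    #include <iostream>
    #include <memory>
    #include <vector>

    // Hypothetical stand-ins for the real sink operator classes.
    struct SinkBase {
        virtual ~SinkBase() = default;
        virtual const char* name() const = 0;
    };
    template <bool is_intersect>
    struct BuildSink : SinkBase {    // stand-in for the child-0 sink
        const char* name() const override {
            return is_intersect ? "intersect-build" : "except-build";
        }
    };
    template <bool is_intersect>
    struct ProbeSink : SinkBase {    // stand-in for the later-child sinks
        const char* name() const override {
            return is_intersect ? "intersect-probe" : "except-probe";
        }
    };

    // Child 0 feeds the build side; every other child probes against it.
    template <bool is_intersect>
    void build_set_children(int child_count,
                            std::vector<std::shared_ptr<SinkBase>>& sink_ops) {
        for (int child_id = 0; child_id < child_count; ++child_id) {
            if (child_id == 0) {
                sink_ops.push_back(
                        std::make_shared<BuildSink<is_intersect>>());
            } else {
                sink_ops.push_back(
                        std::make_shared<ProbeSink<is_intersect>>());
            }
            // The real code would now set sink_ops.back() as the sink of the
            // child pipeline and init() it.
        }
    }

    int main() {
        std::vector<std::shared_ptr<SinkBase>> sink_ops;
        build_set_children</*is_intersect=*/true>(3, sink_ops);
        for (const auto& s : sink_ops) {
            std::cout << s->name() << "\n";
        }
        return 0;
    }
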
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index abe007dadb3..f908abc6b31 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -154,7 +154,8 @@ private:
                                                    const DescriptorTbl& descs, 
OperatorPtr& op,
                                                    PipelinePtr& cur_pipe, int 
parent_idx,
                                                    int child_idx,
-                                                   bool 
followed_by_shuffled_operator);
+                                                   bool 
followed_by_shuffled_operator,
+                                                   
std::vector<DataSinkOperatorPtr>& sink_ops);
 
     Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                              const std::vector<TExpr>& output_exprs,
diff --git a/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp 
b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp
index b94a33f197d..5540c6d06f6 100644
--- a/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp
+++ b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp
@@ -202,8 +202,8 @@ public:
         ASSERT_EQ(sink_operator->should_dry_run(_helper.runtime_state.get()),
                   join_params.is_broadcast_join && !should_build_hash_table);
 
-        ASSERT_EQ(sink_operator->require_data_distribution(), false);
-        ASSERT_EQ(probe_operator->require_data_distribution(), false);
+        ASSERT_EQ(sink_operator->is_colocated_operator(), false);
+        ASSERT_EQ(probe_operator->is_colocated_operator(), false);
         ASSERT_FALSE(sink_operator->is_shuffled_operator());
         ASSERT_FALSE(probe_operator->is_shuffled_operator());
         std::cout << "sink distribution: "
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp 
b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
index 060e8f7a6a8..f3eb1214a47 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.cpp
@@ -165,8 +165,8 @@ PartitionedAggregationTestHelper::create_operators() {
 
     auto source_operator =
             std::make_shared<PartitionedAggSourceOperatorX>(obj_pool.get(), 
tnode, 0, desc_tbl);
-    auto sink_operator = 
std::make_shared<PartitionedAggSinkOperatorX>(obj_pool.get(), 0, 0, tnode,
-                                                                       
desc_tbl, false);
+    auto sink_operator =
+            std::make_shared<PartitionedAggSinkOperatorX>(obj_pool.get(), 0, 
0, tnode, desc_tbl);
 
     auto child_operator = std::make_shared<MockChildOperator>();
     auto probe_side_source_operator = std::make_shared<MockChildOperator>();
diff --git a/be/test/pipeline/operator/partitioned_aggregation_test_helper.h 
b/be/test/pipeline/operator/partitioned_aggregation_test_helper.h
index 3e65201b84c..3b434378144 100644
--- a/be/test/pipeline/operator/partitioned_aggregation_test_helper.h
+++ b/be/test/pipeline/operator/partitioned_aggregation_test_helper.h
@@ -72,7 +72,7 @@ class MockPartitionedAggSinkOperatorX : public 
PartitionedAggSinkOperatorX {
 public:
     MockPartitionedAggSinkOperatorX(ObjectPool* pool, int operator_id, int 
dest_id,
                                     const TPlanNode& tnode, const 
DescriptorTbl& descs)
-            : PartitionedAggSinkOperatorX(pool, operator_id, dest_id, tnode, 
descs, false) {}
+            : PartitionedAggSinkOperatorX(pool, operator_id, dest_id, tnode, 
descs) {}
     ~MockPartitionedAggSinkOperatorX() override = default;
 
     Status prepare(RuntimeState* state) override { return Status::OK(); }
diff --git a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp 
b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
index 2711417b3d8..c2b54ab157b 100644
--- a/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
+++ b/be/test/pipeline/operator/spill_sort_sink_operator_test.cpp
@@ -80,8 +80,8 @@ TEST_F(SpillSortSinkOperatorTest, Basic) {
             _helper.runtime_state.get());
     ASSERT_EQ(data_distribution.distribution_type, 
inner_data_distribution.distribution_type);
 
-    ASSERT_EQ(sink_operator->require_data_distribution(),
-              sink_operator->_sort_sink_operator->require_data_distribution());
+    ASSERT_EQ(sink_operator->is_colocated_operator(),
+              sink_operator->_sort_sink_operator->is_colocated_operator());
 
     st = sink_local_state->close(_helper.runtime_state.get(), st);
     ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
diff --git a/be/test/pipeline/operator/spill_sort_test_helper.cpp 
b/be/test/pipeline/operator/spill_sort_test_helper.cpp
index 8c8836a43d9..3dafb39778f 100644
--- a/be/test/pipeline/operator/spill_sort_test_helper.cpp
+++ b/be/test/pipeline/operator/spill_sort_test_helper.cpp
@@ -144,7 +144,7 @@ SpillSortTestHelper::create_operators() {
     auto source_operator =
             std::make_shared<SpillSortSourceOperatorX>(obj_pool.get(), tnode, 
0, desc_tbl);
     auto sink_operator =
-            std::make_shared<SpillSortSinkOperatorX>(obj_pool.get(), 0, 0, 
tnode, desc_tbl, false);
+            std::make_shared<SpillSortSinkOperatorX>(obj_pool.get(), 0, 0, 
tnode, desc_tbl);
 
     auto child_operator = std::make_shared<MockChildOperator>();
     auto probe_side_source_operator = std::make_shared<MockChildOperator>();

