This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 421ab56c3e0 [pipelineX](improvement) Support local shuffle for join and agg (#27852)
421ab56c3e0 is described below

commit 421ab56c3e00799f6b8259f1adc9f5dd1a7c2be8
Author: Gabriel <[email protected]>
AuthorDate: Sat Dec 2 20:17:18 2023 +0800

    [pipelineX](improvement) Support local shuffle for join and agg (#27852)
---
 be/src/pipeline/exec/aggregation_sink_operator.cpp |   3 +-
 be/src/pipeline/exec/aggregation_sink_operator.h   |  10 ++
 .../distinct_streaming_aggregation_sink_operator.h |   1 +
 be/src/pipeline/exec/exchange_sink_operator.cpp    |   2 +-
 be/src/pipeline/exec/exchange_sink_operator.h      |   2 +-
 be/src/pipeline/exec/exchange_source_operator.cpp  |   5 +
 be/src/pipeline/exec/exchange_source_operator.h    |   6 +-
 be/src/pipeline/exec/file_scan_operator.cpp        |   5 +
 be/src/pipeline/exec/file_scan_operator.h          |   7 +-
 be/src/pipeline/exec/hashjoin_build_sink.cpp       |   1 +
 be/src/pipeline/exec/hashjoin_build_sink.h         |   9 +
 be/src/pipeline/exec/hashjoin_probe_operator.cpp   |   3 +
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   9 +
 be/src/pipeline/exec/jdbc_scan_operator.cpp        |   5 +
 be/src/pipeline/exec/jdbc_scan_operator.h          |   2 +
 be/src/pipeline/exec/join_probe_operator.h         |   4 +-
 .../pipeline/exec/multi_cast_data_stream_sink.cpp  |   2 +-
 be/src/pipeline/exec/multi_cast_data_stream_sink.h |   2 +-
 be/src/pipeline/exec/olap_scan_operator.cpp        |   2 +-
 be/src/pipeline/exec/olap_scan_operator.h          |   7 +-
 be/src/pipeline/exec/operator.h                    |   2 +-
 .../exec/streaming_aggregation_sink_operator.h     |   1 +
 be/src/pipeline/pipeline.h                         |   1 +
 be/src/pipeline/pipeline_fragment_context.cpp      |   9 +
 be/src/pipeline/pipeline_fragment_context.h        |   2 +
 be/src/pipeline/pipeline_x/dependency.h            |  19 +++
 .../local_exchange/local_exchange_sink_operator.h  |  14 +-
 .../local_exchange_source_operator.cpp             |   3 -
 .../local_exchange_source_operator.h               |  20 +--
 .../pipeline_x/local_exchange/local_exchanger.cpp  |  10 +-
 .../pipeline_x/local_exchange/local_exchanger.h    |   7 +-
 be/src/pipeline/pipeline_x/operator.cpp            |   9 +-
 be/src/pipeline/pipeline_x/operator.h              |  21 ++-
 .../pipeline_x/pipeline_x_fragment_context.cpp     | 184 +++++++++++++--------
 .../pipeline_x/pipeline_x_fragment_context.h       |   6 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.cpp     |  13 +-
 be/src/pipeline/pipeline_x/pipeline_x_task.h       |   5 +-
 be/src/runtime/fragment_mgr.cpp                    |   2 +
 .../glue/translator/PhysicalPlanTranslator.java    |   1 +
 .../org/apache/doris/planner/ExchangeNode.java     |   7 +
 gensrc/thrift/PlanNodes.thrift                     |   2 +
 41 files changed, 281 insertions(+), 144 deletions(-)

diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 49cbe30a3b2..4d6f9636de7 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -730,7 +730,8 @@ 
AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int operato
           _pool(pool),
           _limit(tnode.limit),
           _have_conjuncts(tnode.__isset.vconjunct && 
!tnode.vconjunct.nodes.empty()),
-          _is_streaming(is_streaming) {
+          _is_streaming(is_streaming),
+          _partition_exprs(tnode.agg_node.grouping_exprs) {
     _is_first_phase = tnode.agg_node.__isset.is_first_phase && 
tnode.agg_node.is_first_phase;
 }
 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 7c5483e1e17..232a61d50f1 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -366,6 +366,14 @@ public:
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
 
+    std::vector<TExpr> get_local_shuffle_exprs() const override { return 
_partition_exprs; }
+    ExchangeType get_local_exchange_type() const override {
+        if (_probe_expr_ctxs.empty()) {
+            return _needs_finalize ? ExchangeType::PASSTHROUGH : 
ExchangeType::NOOP;
+        }
+        return ExchangeType::SHUFFLE;
+    }
+
     using DataSinkOperatorX<LocalStateType>::id;
     using DataSinkOperatorX<LocalStateType>::operator_id;
     using DataSinkOperatorX<LocalStateType>::get_local_state;
@@ -405,6 +413,8 @@ protected:
     int64_t _limit; // -1: no limit
     bool _have_conjuncts;
     const bool _is_streaming;
+
+    const std::vector<TExpr> _partition_exprs;
 };
 
 } // namespace pipeline
diff --git 
a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
index b30a8298727..c04e35448d1 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.h
@@ -110,6 +110,7 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
+    ExchangeType get_local_exchange_type() const override { return 
ExchangeType::PASSTHROUGH; }
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 20d612d5785..8065ae3082a 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -242,7 +242,7 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
     return Status::OK();
 }
 
-std::string ExchangeSinkLocalState::id_name() {
+std::string ExchangeSinkLocalState::name_suffix() {
     std::string name = " (id=" + std::to_string(_parent->node_id());
     auto& p = _parent->cast<ExchangeSinkOperatorX>();
     name += ",dest_id=" + std::to_string(p._dest_node_id);
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index 048c8d3910c..32a61eda334 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -184,7 +184,7 @@ public:
 
     [[nodiscard]] int sender_id() const { return _sender_id; }
 
-    std::string id_name() override;
+    std::string name_suffix() override;
     segment_v2::CompressionTypePB& compression_type();
     std::string debug_string(int indentation_level) const override;
 
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index 1c766f06a82..f64ebcddcac 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -110,6 +110,11 @@ 
ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
         : OperatorX<ExchangeLocalState>(pool, tnode, operator_id, descs),
           _num_senders(num_senders),
           _is_merging(tnode.exchange_node.__isset.sort_info),
+          _is_hash_partition(
+                  tnode.exchange_node.__isset.partition_type &&
+                  (tnode.exchange_node.partition_type == 
TPartitionType::HASH_PARTITIONED ||
+                   tnode.exchange_node.partition_type ==
+                           TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED)),
           _input_row_desc(descs, tnode.exchange_node.input_row_tuples,
                           std::vector<bool>(tnode.nullable_tuples.begin(),
                                             tnode.nullable_tuples.begin() +
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index 479a8799058..633a9c3e29a 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -104,15 +104,13 @@ public:
         return _sub_plan_query_statistics_recvr;
     }
 
-    bool need_to_local_shuffle() const override {
-        // TODO(gabriel):
-        return false;
-    }
+    bool need_to_local_shuffle() const override { return !_is_hash_partition; }
 
 private:
     friend class ExchangeLocalState;
     const int _num_senders;
     const bool _is_merging;
+    const bool _is_hash_partition;
     RowDescriptor _input_row_desc;
     std::shared_ptr<QueryStatisticsRecvr> _sub_plan_query_statistics_recvr;
 
diff --git a/be/src/pipeline/exec/file_scan_operator.cpp 
b/be/src/pipeline/exec/file_scan_operator.cpp
index 819575211c9..f34346c9063 100644
--- a/be/src/pipeline/exec/file_scan_operator.cpp
+++ b/be/src/pipeline/exec/file_scan_operator.cpp
@@ -53,6 +53,11 @@ Status 
FileScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     return Status::OK();
 }
 
+std::string FileScanLocalState::name_suffix() const {
+    return fmt::format(" (id={}. table name = {})", 
std::to_string(_parent->node_id()),
+                       _parent->cast<FileScanOperatorX>()._table_name);
+}
+
 void FileScanLocalState::set_scan_ranges(RuntimeState* state,
                                          const std::vector<TScanRangeParams>& 
scan_ranges) {
     int max_scanners =
diff --git a/be/src/pipeline/exec/file_scan_operator.h 
b/be/src/pipeline/exec/file_scan_operator.h
index 4648ed716a0..6ae3344ed71 100644
--- a/be/src/pipeline/exec/file_scan_operator.h
+++ b/be/src/pipeline/exec/file_scan_operator.h
@@ -54,6 +54,7 @@ public:
     void set_scan_ranges(RuntimeState* state,
                          const std::vector<TScanRangeParams>& scan_ranges) 
override;
     int parent_id() { return _parent->node_id(); }
+    std::string name_suffix() const override;
 
 private:
     std::vector<TScanRangeParams> _scan_ranges;
@@ -70,7 +71,9 @@ class FileScanOperatorX final : public 
ScanOperatorX<FileScanLocalState> {
 public:
     FileScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                       const DescriptorTbl& descs)
-            : ScanOperatorX<FileScanLocalState>(pool, tnode, operator_id, 
descs) {
+            : ScanOperatorX<FileScanLocalState>(pool, tnode, operator_id, 
descs),
+              _table_name(tnode.file_scan_node.__isset.table_name ? 
tnode.file_scan_node.table_name
+                                                                  : "") {
         _output_tuple_id = tnode.file_scan_node.tuple_id;
     }
 
@@ -78,6 +81,8 @@ public:
 
 private:
     friend class FileScanLocalState;
+
+    const std::string _table_name;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp 
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 1e31014512e..92c61882e2a 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -414,6 +414,7 @@ Status HashJoinBuildSinkOperatorX::init(const TPlanNode& 
tnode, RuntimeState* st
     for (const auto& eq_join_conjunct : eq_join_conjuncts) {
         vectorized::VExprContextSPtr ctx;
         
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.right, 
ctx));
+        _partition_exprs.push_back(eq_join_conjunct.right);
         _build_expr_ctxs.push_back(ctx);
 
         const auto vexpr = _build_expr_ctxs.back()->root();
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index b45c2eed752..cc34f46d4dd 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -155,6 +155,14 @@ public:
                                               ._should_build_hash_table;
     }
 
+    std::vector<TExpr> get_local_shuffle_exprs() const override { return 
_partition_exprs; }
+    ExchangeType get_local_exchange_type() const override {
+        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN || 
_is_broadcast_join) {
+            return ExchangeType::NOOP;
+        }
+        return ExchangeType::SHUFFLE;
+    }
+
 private:
     friend class HashJoinBuildSinkLocalState;
 
@@ -171,6 +179,7 @@ private:
 
     vectorized::SharedHashTableContextPtr _shared_hash_table_context = nullptr;
     std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
+    std::vector<TExpr> _partition_exprs;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp 
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 174d102993d..74304bf5351 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -222,6 +222,8 @@ void HashJoinProbeLocalState::_prepare_probe_block() {
 HashJoinProbeOperatorX::HashJoinProbeOperatorX(ObjectPool* pool, const 
TPlanNode& tnode,
                                                int operator_id, const 
DescriptorTbl& descs)
         : JoinProbeOperatorX<HashJoinProbeLocalState>(pool, tnode, 
operator_id, descs),
+          _is_broadcast_join(tnode.hash_join_node.__isset.is_broadcast_join &&
+                             tnode.hash_join_node.is_broadcast_join),
           
_hash_output_slot_ids(tnode.hash_join_node.__isset.hash_output_slot_ids
                                         ? 
tnode.hash_join_node.hash_output_slot_ids
                                         : std::vector<SlotId> {}) {}
@@ -549,6 +551,7 @@ Status HashJoinProbeOperatorX::init(const TPlanNode& tnode, 
RuntimeState* state)
     for (const auto& eq_join_conjunct : eq_join_conjuncts) {
         vectorized::VExprContextSPtr ctx;
         
RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(eq_join_conjunct.left, 
ctx));
+        _partition_exprs.push_back(eq_join_conjunct.left);
         _probe_expr_ctxs.push_back(ctx);
         bool null_aware = eq_join_conjunct.__isset.opcode &&
                           eq_join_conjunct.opcode == TExprOpcode::EQ_FOR_NULL;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 4de50474bfb..263b4d4252b 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -163,6 +163,13 @@ public:
                 SourceState& source_state) const override;
 
     bool need_more_input_data(RuntimeState* state) const override;
+    std::vector<TExpr> get_local_shuffle_exprs() const override { return 
_partition_exprs; }
+    ExchangeType get_local_exchange_type() const override {
+        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+            return ExchangeType::NOOP;
+        }
+        return _is_broadcast_join ? ExchangeType::PASSTHROUGH : 
ExchangeType::SHUFFLE;
+    }
 
 private:
     Status _do_evaluate(vectorized::Block& block, 
vectorized::VExprContextSPtrs& exprs,
@@ -170,6 +177,7 @@ private:
                         std::vector<int>& res_col_ids) const;
     friend class HashJoinProbeLocalState;
 
+    const bool _is_broadcast_join;
     // other expr
     vectorized::VExprContextSPtrs _other_join_conjuncts;
     // probe expr
@@ -182,6 +190,7 @@ private:
     std::vector<bool> _left_output_slot_flags;
     std::vector<bool> _right_output_slot_flags;
     std::vector<std::string> _right_table_column_names;
+    std::vector<TExpr> _partition_exprs;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/jdbc_scan_operator.cpp 
b/be/src/pipeline/exec/jdbc_scan_operator.cpp
index 1f03939df15..74890f647fc 100644
--- a/be/src/pipeline/exec/jdbc_scan_operator.cpp
+++ b/be/src/pipeline/exec/jdbc_scan_operator.cpp
@@ -22,6 +22,11 @@
 
 namespace doris::pipeline {
 
+std::string JDBCScanLocalState::name_suffix() const {
+    return fmt::format(" (id={}. table name = {})", 
std::to_string(_parent->node_id()),
+                       _parent->cast<JDBCScanOperatorX>()._table_name);
+}
+
 Status JDBCScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* 
scanners) {
     auto& p = _parent->cast<JDBCScanOperatorX>();
     std::unique_ptr<vectorized::NewJdbcScanner> scanner = 
vectorized::NewJdbcScanner::create_unique(
diff --git a/be/src/pipeline/exec/jdbc_scan_operator.h 
b/be/src/pipeline/exec/jdbc_scan_operator.h
index bf954e25dfa..2acf5b5ec9d 100644
--- a/be/src/pipeline/exec/jdbc_scan_operator.h
+++ b/be/src/pipeline/exec/jdbc_scan_operator.h
@@ -45,6 +45,8 @@ public:
             : ScanLocalState<JDBCScanLocalState>(state, parent) {}
     Status _init_scanners(std::list<vectorized::VScannerSPtr>* scanners) 
override;
 
+    std::string name_suffix() const override;
+
 private:
     friend class vectorized::NewJdbcScanner;
 };
diff --git a/be/src/pipeline/exec/join_probe_operator.h 
b/be/src/pipeline/exec/join_probe_operator.h
index 12d89c1049e..6a947c5f6b1 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -84,7 +84,7 @@ public:
     }
 
     Status set_child(OperatorXPtr child) override {
-        if (OperatorX<LocalStateType>::_child_x) {
+        if (OperatorX<LocalStateType>::_child_x && _build_side_child == 
nullptr) {
             // when there already (probe) child, others is build child.
             set_build_side_child(child);
         } else {
@@ -113,7 +113,7 @@ protected:
     std::unique_ptr<RowDescriptor> _intermediate_row_desc;
     // output expr
     vectorized::VExprContextSPtrs _output_expr_ctxs;
-    OperatorXPtr _build_side_child;
+    OperatorXPtr _build_side_child = nullptr;
     const bool _short_circuit_for_null_in_build_side;
 };
 
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp 
b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
index ad6b3c8cdda..8a45634027f 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.cpp
@@ -25,7 +25,7 @@ OperatorPtr 
MultiCastDataStreamSinkOperatorBuilder::build_operator() {
     return std::make_shared<MultiCastDataStreamSinkOperator>(this, _sink);
 }
 
-std::string MultiCastDataStreamSinkLocalState::id_name() {
+std::string MultiCastDataStreamSinkLocalState::name_suffix() {
     auto& sinks = 
static_cast<MultiCastDataStreamSinkOperatorX*>(_parent)->sink_node().sinks;
     std::string id_name = " (dst id : ";
     for (auto& sink : sinks) {
diff --git a/be/src/pipeline/exec/multi_cast_data_stream_sink.h 
b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
index a2ad07e5297..a30361ebe14 100644
--- a/be/src/pipeline/exec/multi_cast_data_stream_sink.h
+++ b/be/src/pipeline/exec/multi_cast_data_stream_sink.h
@@ -60,7 +60,7 @@ class MultiCastDataStreamSinkLocalState final
     using Base = PipelineXSinkLocalState<MultiCastSinkDependency>;
     using Parent = MultiCastDataStreamSinkOperatorX;
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
-    std::string id_name() override;
+    std::string name_suffix() override;
 
 private:
     std::shared_ptr<pipeline::MultiCastDataStreamer> _multi_cast_data_streamer;
diff --git a/be/src/pipeline/exec/olap_scan_operator.cpp 
b/be/src/pipeline/exec/olap_scan_operator.cpp
index 2f22daa5190..3f0cc369427 100644
--- a/be/src/pipeline/exec/olap_scan_operator.cpp
+++ b/be/src/pipeline/exec/olap_scan_operator.cpp
@@ -298,7 +298,7 @@ Status 
OlapScanLocalState::_init_scanners(std::list<vectorized::VScannerSPtr>* s
     return Status::OK();
 }
 
-TOlapScanNode& OlapScanLocalState::olap_scan_node() {
+TOlapScanNode& OlapScanLocalState::olap_scan_node() const {
     return _parent->cast<OlapScanOperatorX>()._olap_scan_node;
 }
 
diff --git a/be/src/pipeline/exec/olap_scan_operator.h 
b/be/src/pipeline/exec/olap_scan_operator.h
index 1f0fac55a43..868d3efe555 100644
--- a/be/src/pipeline/exec/olap_scan_operator.h
+++ b/be/src/pipeline/exec/olap_scan_operator.h
@@ -45,7 +45,12 @@ public:
     OlapScanLocalState(RuntimeState* state, OperatorXBase* parent)
             : ScanLocalState(state, parent) {}
 
-    TOlapScanNode& olap_scan_node();
+    TOlapScanNode& olap_scan_node() const;
+
+    std::string name_suffix() const override {
+        return fmt::format(" (id={}. table name = {})", 
std::to_string(_parent->node_id()),
+                           olap_scan_node().table_name);
+    }
 
 private:
     friend class vectorized::NewOlapScanner;
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index a006cda9436..2beea932a8b 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -269,7 +269,7 @@ protected:
     OperatorPtr _child;
 
     // Used on pipeline X
-    OperatorXPtr _child_x;
+    OperatorXPtr _child_x = nullptr;
 
     bool _is_closed;
 };
diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h 
b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
index 182946188eb..b0327e71cd1 100644
--- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.h
@@ -120,6 +120,7 @@ public:
     Status init(const TPlanNode& tnode, RuntimeState* state) override;
     Status sink(RuntimeState* state, vectorized::Block* in_block,
                 SourceState source_state) override;
+    ExchangeType get_local_exchange_type() const override { return 
ExchangeType::PASSTHROUGH; }
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index a0b6de5c62e..68213fe9c27 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -136,6 +136,7 @@ public:
 
     std::vector<std::shared_ptr<Pipeline>>& children() { return _children; }
     void set_children(std::shared_ptr<Pipeline> child) { 
_children.push_back(child); }
+    void set_children(std::vector<std::shared_ptr<Pipeline>> children) { 
_children = children; }
 
 private:
     void _init_profile();
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index ce3d943af70..5812da5b7f5 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -194,6 +194,15 @@ PipelinePtr PipelineFragmentContext::add_pipeline() {
     return pipeline;
 }
 
+PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent) {
+    // Pipelines must not be added once _prepared, _submitted or _canceled is set
+    PipelineId id = _next_pipeline_id++;
+    auto pipeline = std::make_shared<Pipeline>(id, weak_from_this());
+    _pipelines.emplace_back(pipeline);
+    parent->set_children(pipeline);
+    return pipeline;
+}
+
 Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& 
request,
                                         const size_t idx) {
     if (_prepared) {
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index f38996a161f..480e4332d44 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -71,6 +71,8 @@ public:
 
     PipelinePtr add_pipeline();
 
+    PipelinePtr add_pipeline(PipelinePtr parent);
+
     TUniqueId get_fragment_instance_id() const { return _fragment_instance_id; 
}
 
     virtual RuntimeState* get_runtime_state(UniqueId /*fragment_instance_id*/) 
{
diff --git a/be/src/pipeline/pipeline_x/dependency.h 
b/be/src/pipeline/pipeline_x/dependency.h
index 49dc327c796..23ce13a1db6 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -574,6 +574,25 @@ public:
     }
 };
 
+enum class ExchangeType : uint8_t {
+    NOOP = 0,
+    SHUFFLE = 1,
+    PASSTHROUGH = 2,
+};
+
+inline std::string get_exchange_type_name(ExchangeType idx) {
+    switch (idx) {
+    case ExchangeType::NOOP:
+        return "NOOP";
+    case ExchangeType::SHUFFLE:
+        return "SHUFFLE";
+    case ExchangeType::PASSTHROUGH:
+        return "PASSTHROUGH";
+    }
+    LOG(FATAL) << "__builtin_unreachable";
+    __builtin_unreachable();
+}
+
 class Exchanger;
 
 struct LocalExchangeSharedState : public BasicSharedState {
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
index c5fe9dc7dc4..f5f1d0f48a7 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h
@@ -89,10 +89,10 @@ public:
         return Status::InternalError("{} should not init with TPlanNode", 
Base::_name);
     }
 
-    Status init(bool need_partitioner) override {
-        _name = "LOCAL_EXCHANGE_SINK_OPERATOR";
-        _need_partitioner = need_partitioner;
-        if (_need_partitioner) {
+    Status init(ExchangeType type) override {
+        _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + 
get_exchange_type_name(type) + ")";
+        _type = type;
+        if (_type == ExchangeType::SHUFFLE) {
             _partitioner.reset(
                     new 
vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(_num_partitions));
             RETURN_IF_ERROR(_partitioner->init(_texprs));
@@ -102,7 +102,7 @@ public:
     }
 
     Status prepare(RuntimeState* state) override {
-        if (_need_partitioner) {
+        if (_type == ExchangeType::SHUFFLE) {
             RETURN_IF_ERROR(_partitioner->prepare(state, 
_child_x->row_desc()));
         }
 
@@ -110,7 +110,7 @@ public:
     }
 
     Status open(RuntimeState* state) override {
-        if (_need_partitioner) {
+        if (_type == ExchangeType::SHUFFLE) {
             RETURN_IF_ERROR(_partitioner->open(state));
         }
 
@@ -122,7 +122,7 @@ public:
 
 private:
     friend class LocalExchangeSinkLocalState;
-    bool _need_partitioner;
+    ExchangeType _type;
     const int _num_partitions;
     const std::vector<TExpr>& _texprs;
     std::unique_ptr<vectorized::PartitionerBase> _partitioner;
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
index dd64852891f..4467b32db4e 100644
--- 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
+++ 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.cpp
@@ -36,9 +36,6 @@ Status LocalExchangeSourceLocalState::init(RuntimeState* 
state, LocalStateInfo&
     RETURN_IF_ERROR(Base::init(state, info));
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_open_timer);
-    _dependency->set_shared_state(info.local_exchange_state);
-    _shared_state = 
(LocalExchangeSharedState*)_dependency->shared_state().get();
-    DCHECK(_shared_state != nullptr);
     _channel_id = info.task_idx;
     _shared_state->set_dep_by_channel_id(_dependency, _channel_id);
     _exchanger = _shared_state->exchanger.get();
diff --git 
a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
index d94b9041fce..9a44ce9678c 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h
@@ -60,10 +60,9 @@ private:
 class LocalExchangeSourceOperatorX final : public 
OperatorX<LocalExchangeSourceLocalState> {
 public:
     using Base = OperatorX<LocalExchangeSourceLocalState>;
-    LocalExchangeSourceOperatorX(ObjectPool* pool, int id, OperatorXBase* 
parent)
-            : Base(pool, -1, id), _parent(parent) {}
-    Status init(const TPlanNode& tnode, RuntimeState* state) override {
-        _op_name = "LOCAL_EXCHANGE_OPERATOR";
+    LocalExchangeSourceOperatorX(ObjectPool* pool, int id) : Base(pool, -1, 
id) {}
+    Status init(ExchangeType type) override {
+        _op_name = "LOCAL_EXCHANGE_OPERATOR (" + get_exchange_type_name(type) 
+ ")";
         return Status::OK();
     }
     Status prepare(RuntimeState* state) override { return Status::OK(); }
@@ -79,21 +78,8 @@ public:
 
     bool is_source() const override { return true; }
 
-    Status set_child(OperatorXPtr child) override {
-        if (_child_x) {
-            // Set build side child for join probe operator
-            DCHECK(_parent != nullptr);
-            RETURN_IF_ERROR(_parent->set_child(child));
-        } else {
-            _child_x = std::move(child);
-        }
-        return Status::OK();
-    }
-
 private:
     friend class LocalExchangeSourceLocalState;
-
-    OperatorXBase* _parent = nullptr;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
index 616b469a99f..13a3f232222 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
@@ -115,8 +115,8 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
 Status PassthroughExchanger::sink(RuntimeState* state, vectorized::Block* 
in_block,
                                   SourceState source_state,
                                   LocalExchangeSinkLocalState& local_state) {
-    auto new_block = vectorized::Block::create_unique(in_block->clone_empty());
-    new_block->swap(*in_block);
+    vectorized::Block new_block(in_block->clone_empty());
+    new_block.swap(*in_block);
     auto channel_id = (local_state._channel_id++) % _num_instances;
     _data_queue[channel_id].enqueue(std::move(new_block));
     local_state._shared_state->set_ready_for_read(channel_id);
@@ -127,16 +127,16 @@ Status PassthroughExchanger::sink(RuntimeState* state, 
vectorized::Block* in_blo
 Status PassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* 
block,
                                        SourceState& source_state,
                                        LocalExchangeSourceLocalState& 
local_state) {
-    std::unique_ptr<vectorized::Block> next_block;
+    vectorized::Block next_block;
     if (running_sink_operators == 0) {
         if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-            *block = *next_block.release();
+            *block = std::move(next_block);
         } else {
             COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
             source_state = SourceState::FINISHED;
         }
     } else if (_data_queue[local_state._channel_id].try_dequeue(next_block)) {
-        *block = *next_block.release();
+        *block = std::move(next_block);
     } else {
         COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
         local_state._dependency->block();
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h 
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index 13e3fe931e7..b7acff688f6 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -22,11 +22,6 @@
 
 namespace doris::pipeline {
 
-enum class ExchangeType : uint8_t {
-    SHUFFLE = 0,
-    PASSTHROUGH = 1,
-};
-
 class LocalExchangeSourceLocalState;
 class LocalExchangeSinkLocalState;
 
@@ -92,7 +87,7 @@ public:
     ExchangeType get_type() const override { return ExchangeType::PASSTHROUGH; 
}
 
 private:
-    
std::vector<moodycamel::ConcurrentQueue<std::unique_ptr<vectorized::Block>>> 
_data_queue;
+    std::vector<moodycamel::ConcurrentQueue<vectorized::Block>> _data_queue;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_x/operator.cpp 
b/be/src/pipeline/pipeline_x/operator.cpp
index 04f7cea0314..76e4d42823a 100644
--- a/be/src/pipeline/pipeline_x/operator.cpp
+++ b/be/src/pipeline/pipeline_x/operator.cpp
@@ -330,8 +330,7 @@ 
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXB
 
 template <typename DependencyType>
 Status PipelineXLocalState<DependencyType>::init(RuntimeState* state, 
LocalStateInfo& info) {
-    _runtime_profile.reset(new RuntimeProfile(_parent->get_name() +
-                                              " (id=" + 
std::to_string(_parent->node_id()) + ")"));
+    _runtime_profile.reset(new RuntimeProfile(_parent->get_name() + 
name_suffix()));
     _runtime_profile->set_metadata(_parent->node_id());
     _runtime_profile->set_is_sink(false);
     info.parent_profile->add_child(_runtime_profile.get(), true, nullptr);
@@ -341,7 +340,7 @@ Status 
PipelineXLocalState<DependencyType>::init(RuntimeState* state, LocalState
     if constexpr (!std::is_same_v<FakeDependency, DependencyType>) {
         auto& deps = info.upstream_dependencies;
         if constexpr (std::is_same_v<LocalExchangeSourceDependency, 
DependencyType>) {
-            _dependency->set_shared_state(info.local_exchange_state);
+            
_dependency->set_shared_state(info.le_state_map[_parent->operator_id()]);
         } else {
             _dependency->set_shared_state(deps.front()->shared_state());
         }
@@ -402,7 +401,7 @@ template <typename DependencyType>
 Status PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
                                                      LocalSinkStateInfo& info) 
{
     // create profile
-    _profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name() + 
id_name()));
+    _profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name() + 
name_suffix()));
     _profile->set_metadata(_parent->node_id());
     _profile->set_is_sink(true);
     _wait_for_finish_dependency_timer = ADD_TIMER(_profile, 
"PendingFinishDependency");
@@ -410,7 +409,7 @@ Status 
PipelineXSinkLocalState<DependencyType>::init(RuntimeState* state,
         auto& deps = info.dependencys;
         _dependency = (DependencyType*)deps.front().get();
         if constexpr (std::is_same_v<LocalExchangeSinkDependency, 
DependencyType>) {
-            _dependency->set_shared_state(info.local_exchange_state);
+            
_dependency->set_shared_state(info.le_state_map[_parent->dests_id().front()]);
         }
         if (_dependency) {
             _shared_state =
diff --git a/be/src/pipeline/pipeline_x/operator.h 
b/be/src/pipeline/pipeline_x/operator.h
index 58c18db7038..69d6a6cbc37 100644
--- a/be/src/pipeline/pipeline_x/operator.h
+++ b/be/src/pipeline/pipeline_x/operator.h
@@ -20,6 +20,7 @@
 #include "common/logging.h"
 #include "pipeline/exec/operator.h"
 #include "pipeline/pipeline_x/dependency.h"
+#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
 
 namespace doris::pipeline {
 
@@ -28,7 +29,7 @@ struct LocalStateInfo {
     RuntimeProfile* parent_profile = nullptr;
     const std::vector<TScanRangeParams> scan_ranges;
     std::vector<DependencySPtr>& upstream_dependencies;
-    std::shared_ptr<LocalExchangeSharedState> local_exchange_state;
+    std::map<int, std::shared_ptr<LocalExchangeSharedState>> le_state_map;
     int task_idx;
 
     DependencySPtr dependency;
@@ -39,7 +40,7 @@ struct LocalSinkStateInfo {
     RuntimeProfile* parent_profile = nullptr;
     const int sender_id;
     std::vector<DependencySPtr>& dependencys;
-    std::shared_ptr<LocalExchangeSharedState> local_exchange_state;
+    std::map<int, std::shared_ptr<LocalExchangeSharedState>> le_state_map;
     const TDataSink& tsink;
 };
 
@@ -160,6 +161,10 @@ public:
         LOG(FATAL) << "should not reach here!";
         return Status::OK();
     }
+    virtual Status init(ExchangeType type) {
+        LOG(FATAL) << "should not reach here!";
+        return Status::OK();
+    }
     [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                                "Runtime Profile is not owned by operator");
@@ -170,6 +175,8 @@ public:
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
     virtual DependencySPtr get_dependency(QueryContext* ctx) = 0;
+    virtual std::vector<TExpr> get_local_shuffle_exprs() const { return {}; }
+    virtual ExchangeType get_local_exchange_type() const { return 
ExchangeType::NOOP; }
 
     Status prepare(RuntimeState* state) override;
 
@@ -313,6 +320,10 @@ public:
 
     Status init(RuntimeState* state, LocalStateInfo& info) override;
 
+    virtual std::string name_suffix() const {
+        return " (id=" + std::to_string(_parent->node_id()) + ")";
+    }
+
     Status close(RuntimeState* state) override;
 
     [[nodiscard]] std::string debug_string(int indentation_level = 0) const 
override;
@@ -427,7 +438,7 @@ public:
     virtual Status init(const TPlanNode& tnode, RuntimeState* state);
 
     Status init(const TDataSink& tsink) override;
-    virtual Status init(bool need_partitioner) {
+    virtual Status init(ExchangeType type) {
         return Status::InternalError("init() is only implemented in local 
exchange!");
     }
 
@@ -452,6 +463,8 @@ public:
     }
 
     virtual void get_dependency(std::vector<DependencySPtr>& dependency, 
QueryContext* ctx) = 0;
+    virtual std::vector<TExpr> get_local_shuffle_exprs() const { return {}; }
+    virtual ExchangeType get_local_exchange_type() const { return 
ExchangeType::NOOP; }
 
     Status close(RuntimeState* state) override {
         return Status::InternalError("Should not reach here!");
@@ -570,7 +583,7 @@ public:
 
     [[nodiscard]] std::string debug_string(int indentation_level) const 
override;
 
-    virtual std::string id_name() { return " (id=" + 
std::to_string(_parent->node_id()) + ")"; }
+    virtual std::string name_suffix() { return " (id=" + 
std::to_string(_parent->node_id()) + ")"; }
 
     Dependency* dependency() override { return _dependency; }
 
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
index 636d4e235d7..71b45ad79c3 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp
@@ -231,7 +231,7 @@ Status PipelineXFragmentContext::prepare(const 
doris::TPipelineFragmentParams& r
     RETURN_IF_ERROR(_sink->init(request.fragment.output_sink));
     static_cast<void>(root_pipeline->set_sink(_sink));
 
-    //    RETURN_IF_ERROR(_plan_local_shuffle());
+    RETURN_IF_ERROR(_plan_local_shuffle());
 
     // 4. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
@@ -254,7 +254,34 @@ Status PipelineXFragmentContext::_plan_local_shuffle() {
         auto& children = _pipelines[pip_idx]->children();
         if (children.empty()) {
             _pipelines[pip_idx]->init_need_to_local_shuffle_by_source();
-        } else {
+        } else if (children.size() == 1) {
+            
_pipelines[pip_idx]->set_need_to_local_shuffle(children[0]->need_to_local_shuffle());
+        }
+
+        int idx = 0;
+        bool do_local_exchange = false;
+        do {
+            auto& ops = _pipelines[pip_idx]->operator_xs();
+            do_local_exchange = false;
+            for (; idx < ops.size();) {
+                if (ops[idx]->get_local_exchange_type() != ExchangeType::NOOP) 
{
+                    RETURN_IF_ERROR(_add_local_exchange(
+                            idx, ops[idx]->node_id(), 
_runtime_state->obj_pool(),
+                            _pipelines[pip_idx], 
ops[idx]->get_local_shuffle_exprs(),
+                            ops[idx]->get_local_exchange_type(), 
&do_local_exchange));
+                }
+                if (do_local_exchange) {
+                    idx = 2;
+                    break;
+                }
+                idx++;
+            }
+        } while (do_local_exchange);
+        if (_pipelines[pip_idx]->sink_x()->get_local_exchange_type() != 
ExchangeType::NOOP) {
+            RETURN_IF_ERROR(_add_local_exchange(
+                    idx, _pipelines[pip_idx]->sink_x()->node_id(), 
_runtime_state->obj_pool(),
+                    _pipelines[pip_idx], 
_pipelines[pip_idx]->sink_x()->get_local_shuffle_exprs(),
+                    _pipelines[pip_idx]->sink_x()->get_local_exchange_type(), 
&do_local_exchange));
         }
     }
     return Status::OK();
@@ -443,19 +470,20 @@ Status PipelineXFragmentContext::_build_pipeline_tasks(
         _runtime_states[i]->set_total_load_streams(request.total_load_streams);
         _runtime_states[i]->set_num_local_sink(request.num_local_sink);
         std::map<PipelineId, PipelineXTask*> pipeline_id_to_task;
-        auto get_local_exchange_state =
-                [&](PipelinePtr pipeline) -> 
std::shared_ptr<LocalExchangeSharedState> {
+        auto get_local_exchange_state = [&](PipelinePtr pipeline)
+                -> std::map<int, std::shared_ptr<LocalExchangeSharedState>> {
+            std::map<int, std::shared_ptr<LocalExchangeSharedState>> 
le_state_map;
             auto source_id = pipeline->operator_xs().front()->operator_id();
             if (auto iter = _op_id_to_le_state.find(source_id); iter != 
_op_id_to_le_state.end()) {
-                return iter->second;
+                le_state_map.insert({source_id, iter->second});
             }
             for (auto sink_to_source_id : pipeline->sink_x()->dests_id()) {
                 if (auto iter = _op_id_to_le_state.find(sink_to_source_id);
                     iter != _op_id_to_le_state.end()) {
-                    return iter->second;
+                    le_state_map.insert({sink_to_source_id, iter->second});
                 }
             }
-            return nullptr;
+            return le_state_map;
         };
         for (auto& pipeline : _pipelines) {
             auto task = std::make_unique<PipelineXTask>(
@@ -606,49 +634,100 @@ Status 
PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool,
     return Status::OK();
 }
 
-Status PipelineXFragmentContext::_add_local_exchange(ObjectPool* pool, 
OperatorXPtr& op,
-                                                     PipelinePtr& cur_pipe, 
const TPlanNode& tnode,
+Status PipelineXFragmentContext::_add_local_exchange(int idx, int node_id, 
ObjectPool* pool,
+                                                     PipelinePtr cur_pipe,
                                                      const std::vector<TExpr>& 
texprs,
-                                                     ExchangeType 
exchange_type) {
+                                                     ExchangeType 
exchange_type,
+                                                     bool* do_local_exchange) {
     if (!_runtime_state->enable_local_shuffle() || _num_instances <= 1) {
         return Status::OK();
     }
-    auto parent = op;
-    RETURN_IF_ERROR(parent->init(tnode, _runtime_state.get()));
-    auto local_exchange_id = next_operator_id();
-    op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id, 
parent.get()));
-    RETURN_IF_ERROR(cur_pipe->add_operator(op));
-    RETURN_IF_ERROR(parent->set_child(op));
 
-    const auto downstream_pipeline_id = cur_pipe->id();
-    if (_dag.find(downstream_pipeline_id) == _dag.end()) {
-        _dag.insert({downstream_pipeline_id, {}});
+    if (!cur_pipe->need_to_local_shuffle() && exchange_type == 
ExchangeType::SHUFFLE) {
+        return Status::OK();
     }
-    cur_pipe = add_pipeline();
-    _dag[downstream_pipeline_id].push_back(cur_pipe->id());
+    *do_local_exchange = true;
+
+    auto& operator_xs = cur_pipe->operator_xs();
+    auto total_op_num = operator_xs.size();
+    const auto downstream_pipeline_id = cur_pipe->id();
+    auto local_exchange_id = next_operator_id();
+    // 1. Create a new pipeline with local exchange sink.
+    auto new_pip = add_pipeline();
 
     DataSinkOperatorXPtr sink;
     sink.reset(new LocalExchangeSinkOperatorX(next_sink_operator_id(), 
local_exchange_id,
                                               _num_instances, texprs));
-    RETURN_IF_ERROR(cur_pipe->set_sink(sink));
+    RETURN_IF_ERROR(new_pip->set_sink(sink));
 
-    bool need_partitioner = false;
     auto shared_state = LocalExchangeSharedState::create_shared();
     shared_state->source_dependencies.resize(_num_instances, nullptr);
     switch (exchange_type) {
     case ExchangeType::SHUFFLE:
         shared_state->exchanger = 
ShuffleExchanger::create_unique(_num_instances);
-        need_partitioner = true;
+        new_pip->set_need_to_local_shuffle(false);
+        cur_pipe->set_need_to_local_shuffle(false);
         break;
     case ExchangeType::PASSTHROUGH:
         shared_state->exchanger = 
PassthroughExchanger::create_unique(_num_instances);
+        new_pip->set_need_to_local_shuffle(cur_pipe->need_to_local_shuffle());
+        cur_pipe->set_need_to_local_shuffle(true);
         break;
     default:
         return Status::InternalError("Unsupported local exchange type : " +
                                      std::to_string((int)exchange_type));
     }
-    RETURN_IF_ERROR(cur_pipe->sink_x()->init(need_partitioner));
+    RETURN_IF_ERROR(new_pip->sink_x()->init(exchange_type));
     _op_id_to_le_state.insert({local_exchange_id, shared_state});
+
+    // 2. Initialize operators list.
+    std::copy(operator_xs.begin(), operator_xs.begin() + idx,
+              std::inserter(new_pip->operator_xs(), 
new_pip->operator_xs().end()));
+
+    // 3. Erase operators in new pipeline.
+    OperatorXPtr source_op;
+    source_op.reset(new LocalExchangeSourceOperatorX(pool, local_exchange_id));
+    RETURN_IF_ERROR(source_op->init(exchange_type));
+    operator_xs.erase(operator_xs.begin(), operator_xs.begin() + idx);
+    if (operator_xs.size() > 0) {
+        RETURN_IF_ERROR(operator_xs.front()->set_child(source_op));
+    }
+
+    operator_xs.insert(operator_xs.begin(), source_op);
+    RETURN_IF_ERROR(source_op->set_child(new_pip->operator_xs().back()));
+
+    std::vector<std::shared_ptr<Pipeline>> new_children;
+    std::vector<PipelineId> edges_with_source;
+    for (auto child : cur_pipe->children()) {
+        bool found = false;
+        for (auto op : new_pip->operator_xs()) {
+            if (child->sink_x()->node_id() == op->node_id()) {
+                new_pip->set_children(child);
+                found = true;
+            };
+        }
+        if (!found) {
+            new_children.push_back(child);
+            edges_with_source.push_back(child->id());
+        }
+    }
+    new_children.push_back(new_pip);
+    edges_with_source.push_back(new_pip->id());
+
+    if (!new_pip->children().empty()) {
+        std::vector<PipelineId> edges_with_sink;
+        for (auto child : new_pip->children()) {
+            edges_with_sink.push_back(child->id());
+        }
+        _dag.insert({new_pip->id(), edges_with_sink});
+    }
+    cur_pipe->set_children(new_children);
+    _dag[downstream_pipeline_id] = edges_with_source;
+
+    CHECK(total_op_num + 1 == cur_pipe->operator_xs().size() + 
new_pip->operator_xs().size())
+            << "total_op_num: " << total_op_num
+            << " cur_pipe->operator_xs().size(): " << 
cur_pipe->operator_xs().size()
+            << " new_pip->operator_xs().size(): " << 
new_pip->operator_xs().size();
     return Status::OK();
 }
 
@@ -704,7 +783,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             if (_dag.find(downstream_pipeline_id) == _dag.end()) {
                 _dag.insert({downstream_pipeline_id, {}});
             }
-            cur_pipe = add_pipeline();
+            cur_pipe = add_pipeline(cur_pipe);
             _dag[downstream_pipeline_id].push_back(cur_pipe->id());
             DataSinkOperatorXPtr sink;
             sink.reset(new DistinctStreamingAggSinkOperatorX(pool, 
next_sink_operator_id(), tnode,
@@ -712,10 +791,6 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
-
-            //            RETURN_IF_ERROR(_add_local_exchange(pool, op, 
cur_pipe, tnode,
-            //                                                
tnode.agg_node.grouping_exprs,
-            //                                                
ExchangeType::PASSTHROUGH));
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
                    tnode.agg_node.use_streaming_preaggregation) {
             op.reset(new StreamingAggSourceOperatorX(pool, tnode, 
next_operator_id(), descs));
@@ -725,7 +800,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             if (_dag.find(downstream_pipeline_id) == _dag.end()) {
                 _dag.insert({downstream_pipeline_id, {}});
             }
-            cur_pipe = add_pipeline();
+            cur_pipe = add_pipeline(cur_pipe);
             _dag[downstream_pipeline_id].push_back(cur_pipe->id());
             DataSinkOperatorXPtr sink;
             sink.reset(new StreamingAggSinkOperatorX(pool, 
next_sink_operator_id(), tnode, descs));
@@ -733,9 +808,6 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
 
-            //            RETURN_IF_ERROR(_add_local_exchange(pool, op, 
cur_pipe, tnode,
-            //                                                
tnode.agg_node.grouping_exprs,
-            //                                                
ExchangeType::PASSTHROUGH));
         } else {
             op.reset(new AggSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
             RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -744,7 +816,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             if (_dag.find(downstream_pipeline_id) == _dag.end()) {
                 _dag.insert({downstream_pipeline_id, {}});
             }
-            cur_pipe = add_pipeline();
+            cur_pipe = add_pipeline(cur_pipe);
             _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
             DataSinkOperatorXPtr sink;
@@ -752,20 +824,6 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             sink->set_dests_id({op->operator_id()});
             RETURN_IF_ERROR(cur_pipe->set_sink(sink));
             RETURN_IF_ERROR(cur_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
-
-            //            if (tnode.agg_node.grouping_exprs.empty()) {
-            //                if (tnode.agg_node.need_finalize) {
-            //                    RETURN_IF_ERROR(_add_local_exchange(pool, 
op, cur_pipe, tnode,
-            //                                                        
tnode.agg_node.grouping_exprs,
-            //                                                        
ExchangeType::PASSTHROUGH));
-            //                } else {
-            //                    // TODO(gabriel): maybe use local shuffle
-            //                }
-            //            } else if (cur_pipe->need_to_local_shuffle()) {
-            //                RETURN_IF_ERROR(_add_local_exchange(pool, op, 
cur_pipe, tnode,
-            //                                                    
tnode.agg_node.grouping_exprs,
-            //                                                    
ExchangeType::SHUFFLE));
-            //            }
         }
         break;
     }
@@ -777,7 +835,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
             _dag.insert({downstream_pipeline_id, {}});
         }
-        PipelinePtr build_side_pipe = add_pipeline();
+        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
 
         DataSinkOperatorXPtr sink;
@@ -786,20 +844,6 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
         RETURN_IF_ERROR(build_side_pipe->sink_x()->init(tnode, 
_runtime_state.get()));
 
-        std::vector<TExpr> probe_exprs;
-        const std::vector<TEqJoinCondition>& eq_join_conjuncts =
-                tnode.hash_join_node.eq_join_conjuncts;
-        for (const auto& eq_join_conjunct : eq_join_conjuncts) {
-            probe_exprs.push_back(eq_join_conjunct.left);
-        }
-        if (tnode.hash_join_node.__isset.is_broadcast_join &&
-            tnode.hash_join_node.is_broadcast_join) {
-            RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, 
probe_exprs,
-                                                ExchangeType::PASSTHROUGH));
-        } else if (cur_pipe->need_to_local_shuffle()) {
-            RETURN_IF_ERROR(_add_local_exchange(pool, op, cur_pipe, tnode, 
probe_exprs,
-                                                ExchangeType::SHUFFLE));
-        }
         _pipeline_parent_map.push(op->node_id(), cur_pipe);
         _pipeline_parent_map.push(op->node_id(), build_side_pipe);
         break;
@@ -812,7 +856,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
             _dag.insert({downstream_pipeline_id, {}});
         }
-        PipelinePtr build_side_pipe = add_pipeline();
+        PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
 
         DataSinkOperatorXPtr sink;
@@ -835,7 +879,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
             _dag.insert({downstream_pipeline_id, {}});
         }
         for (int i = 0; i < child_count; i++) {
-            PipelinePtr build_side_pipe = add_pipeline();
+            PipelinePtr build_side_pipe = add_pipeline(cur_pipe);
             _dag[downstream_pipeline_id].push_back(build_side_pipe->id());
             DataSinkOperatorXPtr sink;
             sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), 
pool, tnode, descs));
@@ -855,7 +899,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
             _dag.insert({downstream_pipeline_id, {}});
         }
-        cur_pipe = add_pipeline();
+        cur_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
         DataSinkOperatorXPtr sink;
@@ -873,7 +917,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
             _dag.insert({downstream_pipeline_id, {}});
         }
-        cur_pipe = add_pipeline();
+        cur_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
         DataSinkOperatorXPtr sink;
@@ -891,7 +935,7 @@ Status 
PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
             _dag.insert({downstream_pipeline_id, {}});
         }
-        cur_pipe = add_pipeline();
+        cur_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(cur_pipe->id());
 
         DataSinkOperatorXPtr sink;
@@ -971,7 +1015,7 @@ Status 
PipelineXFragmentContext::_build_operators_for_set_operation_node(
     }
 
     for (int child_id = 0; child_id < tnode.num_children; child_id++) {
-        PipelinePtr probe_side_pipe = add_pipeline();
+        PipelinePtr probe_side_pipe = add_pipeline(cur_pipe);
         _dag[downstream_pipeline_id].push_back(probe_side_pipe->id());
 
         DataSinkOperatorXPtr sink;
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h 
b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
index 1390f2a9544..b2239c49910 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h
@@ -125,9 +125,9 @@ public:
 private:
     void _close_fragment_instance() override;
     Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& 
request) override;
-    Status _add_local_exchange(ObjectPool* pool, OperatorXPtr& op, 
PipelinePtr& cur_pipe,
-                               const TPlanNode& tnode, const 
std::vector<TExpr>& texprs,
-                               ExchangeType exchange_type);
+    Status _add_local_exchange(int idx, int node_id, ObjectPool* pool, 
PipelinePtr cur_pipe,
+                               const std::vector<TExpr>& texprs, ExchangeType 
exchange_type,
+                               bool* do_local_exchange);
 
     [[nodiscard]] Status _build_pipelines(ObjectPool* pool,
                                           const 
doris::TPipelineFragmentParams& request,
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp 
b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
index 3495d8eb273..559a76b8e65 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.cpp
@@ -47,14 +47,14 @@ namespace doris::pipeline {
 PipelineXTask::PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, 
RuntimeState* state,
                              PipelineFragmentContext* fragment_context,
                              RuntimeProfile* parent_profile,
-                             std::shared_ptr<LocalExchangeSharedState> 
local_exchange_state,
+                             std::map<int, 
std::shared_ptr<LocalExchangeSharedState>> le_state_map,
                              int task_idx)
         : PipelineTask(pipeline, task_id, state, fragment_context, 
parent_profile),
           _operators(pipeline->operator_xs()),
           _source(_operators.front()),
           _root(_operators.back()),
           _sink(pipeline->sink_shared_pointer()),
-          _local_exchange_state(local_exchange_state),
+          _le_state_map(le_state_map),
           _task_idx(task_idx),
           _execution_dep(state->get_query_ctx()->get_execution_dependency()) {
     _pipeline_task_watcher.start();
@@ -76,7 +76,7 @@ Status PipelineXTask::prepare(RuntimeState* state, const 
TPipelineInstanceParams
     {
         // set sink local state
         LocalSinkStateInfo info {_parent_profile, local_params.sender_id,
-                                 get_downstream_dependency(), 
_local_exchange_state, tsink};
+                                 get_downstream_dependency(), _le_state_map, 
tsink};
         RETURN_IF_ERROR(_sink->setup_local_state(state, info));
     }
 
@@ -87,9 +87,8 @@ Status PipelineXTask::prepare(RuntimeState* state, const 
TPipelineInstanceParams
     for (int op_idx = _operators.size() - 1; op_idx >= 0; op_idx--) {
         auto& op = _operators[op_idx];
         auto& deps = get_upstream_dependency(op->operator_id());
-        LocalStateInfo info {parent_profile, scan_ranges,
-                             deps,           _local_exchange_state,
-                             _task_idx,      
_source_dependency[op->operator_id()]};
+        LocalStateInfo info {parent_profile, scan_ranges, deps,
+                             _le_state_map,  _task_idx,   
_source_dependency[op->operator_id()]};
         RETURN_IF_ERROR(op->setup_local_state(state, info));
         parent_profile = state->get_local_state(op->operator_id())->profile();
     }
@@ -292,7 +291,7 @@ void PipelineXTask::finalize() {
     std::vector<DependencySPtr> {}.swap(_downstream_dependency);
     DependencyMap {}.swap(_upstream_dependency);
 
-    _local_exchange_state = nullptr;
+    _le_state_map.clear();
 }
 
 Status PipelineXTask::try_close(Status exec_status) {
diff --git a/be/src/pipeline/pipeline_x/pipeline_x_task.h 
b/be/src/pipeline/pipeline_x/pipeline_x_task.h
index e4ee71914b5..2a5193d3c4b 100644
--- a/be/src/pipeline/pipeline_x/pipeline_x_task.h
+++ b/be/src/pipeline/pipeline_x/pipeline_x_task.h
@@ -52,7 +52,8 @@ class PipelineXTask : public PipelineTask {
 public:
     PipelineXTask(PipelinePtr& pipeline, uint32_t task_id, RuntimeState* state,
                   PipelineFragmentContext* fragment_context, RuntimeProfile* 
parent_profile,
-                  std::shared_ptr<LocalExchangeSharedState> 
local_exchange_state, int task_idx);
+                  std::map<int, std::shared_ptr<LocalExchangeSharedState>> 
le_state_map,
+                  int task_idx);
 
     Status prepare(RuntimeState* state) override {
         return Status::InternalError("Should not reach here!");
@@ -209,7 +210,7 @@ private:
     DependencyMap _upstream_dependency;
     std::map<int, DependencySPtr> _source_dependency;
     std::vector<DependencySPtr> _downstream_dependency;
-    std::shared_ptr<LocalExchangeSharedState> _local_exchange_state;
+    std::map<int, std::shared_ptr<LocalExchangeSharedState>> _le_state_map;
     int _task_idx;
     bool _dry_run = false;
 
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index d4f11d25e2f..40404423e4c 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -844,6 +844,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
             SCOPED_RAW_TIMER(&duration_ns);
             auto prepare_st = context->prepare(params);
             if (!prepare_st.ok()) {
+                LOG(WARNING) << "Prepare failed: " << prepare_st.to_string();
                 context->close_if_prepare_failed();
                 return prepare_st;
             }
@@ -923,6 +924,7 @@ Status FragmentMgr::exec_plan_fragment(const 
TPipelineFragmentParams& params,
                 SCOPED_RAW_TIMER(&duration_ns);
                 auto prepare_st = context->prepare(params, i);
                 if (!prepare_st.ok()) {
+                    LOG(WARNING) << "Prepare failed: " << 
prepare_st.to_string();
                     context->close_if_prepare_failed();
                     static_cast<void>(context->update_status(prepare_st));
                     return prepare_st;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 6eb9f2d75e9..72e56d09c4e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -310,6 +310,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             }
         }
         DataPartition dataPartition = 
toDataPartition(distribute.getDistributionSpec(), validOutputIds, context);
+        exchangeNode.setPartitionType(dataPartition.getType());
         PlanFragment parentFragment = new 
PlanFragment(context.nextFragmentId(), exchangeNode, dataPartition);
         
exchangeNode.setNumInstances(inputFragment.getPlanRoot().getNumInstances());
         if (distribute.getDistributionSpec() instanceof 
DistributionSpecGather) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index bde891a8351..65a366fde16 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -29,6 +29,7 @@ import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.StatsRecursiveDerive;
 import org.apache.doris.thrift.TExchangeNode;
 import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TPartitionType;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 
@@ -64,6 +65,7 @@ public class ExchangeNode extends PlanNode {
     private SortInfo mergeInfo;
 
     private boolean isRightChildOfBroadcastHashJoin = false;
+    private TPartitionType partitionType;
 
     /**
      * use for Nereids only.
@@ -77,6 +79,10 @@ public class ExchangeNode extends PlanNode {
         computeTupleIds();
     }
 
+    public void setPartitionType(TPartitionType partitionType) {
+        this.partitionType = partitionType;
+    }
+
     /**
      * Create ExchangeNode that consumes output of inputNode.
      * An ExchangeNode doesn't have an input node as a child, which is why we
@@ -171,6 +177,7 @@ public class ExchangeNode extends PlanNode {
             msg.exchange_node.setSortInfo(mergeInfo.toThrift());
         }
         msg.exchange_node.setOffset(offset);
+        msg.exchange_node.setPartitionType(partitionType);
     }
 
     @Override
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 778604ce7d5..09e3f4788d6 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1028,6 +1028,8 @@ struct TExchangeNode {
   2: optional TSortInfo sort_info
   // This is tHe number of rows to skip before returning results
   3: optional i64 offset
+  // Shuffle partition type
+  4: optional Partitions.TPartitionType partition_type
 }
 
 struct TOlapRewriteNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to