This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 7769c68d682 [branch-3.0](pick) Pick #41789 #42482 #41210 #42460 (#42914)
7769c68d682 is described below

commit 7769c68d6829170b560a77f3e536b0e6978d4a58
Author: Gabriel <[email protected]>
AuthorDate: Thu Oct 31 17:31:41 2024 +0800

    [branch-3.0](pick) Pick #41789 #42482 #41210 #42460 (#42914)
---
 be/src/pipeline/dependency.h                       |   8 +-
 be/src/pipeline/exec/aggregation_sink_operator.cpp |   5 +-
 be/src/pipeline/exec/aggregation_sink_operator.h   |   8 +-
 .../pipeline/exec/aggregation_source_operator.cpp  |   4 +-
 be/src/pipeline/exec/analytic_sink_operator.cpp    |   4 +-
 be/src/pipeline/exec/analytic_sink_operator.h      |   3 -
 be/src/pipeline/exec/analytic_source_operator.cpp  |   1 +
 be/src/pipeline/exec/assert_num_rows_operator.cpp  |   1 +
 be/src/pipeline/exec/datagen_operator.cpp          |   8 +-
 .../distinct_streaming_aggregation_operator.cpp    |   4 +-
 .../exec/distinct_streaming_aggregation_operator.h |   7 +-
 be/src/pipeline/exec/exchange_sink_operator.h      |   1 +
 be/src/pipeline/exec/exchange_source_operator.cpp  |   4 +-
 be/src/pipeline/exec/exchange_source_operator.h    |   2 +-
 be/src/pipeline/exec/hashjoin_build_sink.h         |   7 +-
 be/src/pipeline/exec/hashjoin_probe_operator.h     |   3 -
 be/src/pipeline/exec/join_build_sink_operator.cpp  |   2 +
 be/src/pipeline/exec/join_probe_operator.cpp       |   1 +
 .../exec/nested_loop_join_build_operator.h         |   4 +-
 .../exec/nested_loop_join_probe_operator.h         |   4 +-
 be/src/pipeline/exec/operator.cpp                  |  19 +-
 be/src/pipeline/exec/operator.h                    |  27 +-
 .../exec/partitioned_aggregation_sink_operator.h   |   3 -
 .../partitioned_aggregation_source_operator.cpp    |   4 +
 .../exec/partitioned_aggregation_source_operator.h |   2 +
 .../exec/partitioned_hash_join_probe_operator.h    |   3 -
 .../exec/partitioned_hash_join_sink_operator.h     |   3 -
 be/src/pipeline/exec/scan_operator.cpp             |   6 +-
 be/src/pipeline/exec/scan_operator.h               |   4 +-
 be/src/pipeline/exec/set_probe_sink_operator.h     |   2 -
 be/src/pipeline/exec/set_sink_operator.h           |   1 -
 be/src/pipeline/exec/sort_sink_operator.cpp        |   4 +-
 be/src/pipeline/exec/sort_sink_operator.h          |   4 +-
 be/src/pipeline/exec/sort_source_operator.cpp      |   4 +-
 be/src/pipeline/exec/union_source_operator.h       |   4 +-
 .../local_exchange_sink_operator.cpp               |   6 +-
 .../local_exchange/local_exchange_sink_operator.h  |   2 +-
 .../local_exchange_source_operator.h               |   3 -
 be/src/pipeline/local_exchange/local_exchanger.cpp |   2 +-
 be/src/pipeline/local_exchange/local_exchanger.h   |  11 +-
 be/src/pipeline/pipeline.cpp                       |  44 ++-
 be/src/pipeline/pipeline.h                         |  46 ++-
 be/src/pipeline/pipeline_fragment_context.cpp      | 322 ++++++++++-----------
 be/src/pipeline/pipeline_fragment_context.h        |  38 ++-
 .../aggregate_function_window.h                    |   7 +-
 .../org/apache/doris/planner/AggregationNode.java  |   6 +
 .../org/apache/doris/planner/AnalyticEvalNode.java |  11 +
 .../apache/doris/planner/AssertNumRowsNode.java    |   5 +
 .../org/apache/doris/planner/DataPartition.java    |   4 +
 .../org/apache/doris/planner/EmptySetNode.java     |   1 -
 .../org/apache/doris/planner/ExchangeNode.java     |  29 ++
 .../org/apache/doris/planner/JoinNodeBase.java     |   1 -
 .../apache/doris/planner/NestedLoopJoinNode.java   |  15 +
 .../org/apache/doris/planner/PlanFragment.java     |  20 ++
 .../java/org/apache/doris/planner/PlanNode.java    |  14 +
 .../java/org/apache/doris/planner/RepeatNode.java  |   6 +
 .../java/org/apache/doris/planner/ScanNode.java    |   7 +
 .../java/org/apache/doris/planner/SelectNode.java  |   6 +
 .../java/org/apache/doris/planner/SortNode.java    |   6 +
 .../java/org/apache/doris/planner/UnionNode.java   |   7 +
 .../main/java/org/apache/doris/qe/Coordinator.java |  37 ++-
 .../java/org/apache/doris/qe/SessionVariable.java  |   4 +-
 gensrc/thrift/PlanNodes.thrift                     |   1 +
 .../insert_into_table/complex_insert.groovy        |   6 +-
 .../distribute/local_shuffle.groovy                |  14 +-
 65 files changed, 505 insertions(+), 347 deletions(-)

diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index 9364170898d..619dd2d2aa3 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -110,19 +110,19 @@ public:
     // Notify downstream pipeline tasks this dependency is ready.
     void set_ready();
     void set_ready_to_read() {
-        DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+        DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
         _shared_state->source_deps.front()->set_ready();
     }
     void set_block_to_read() {
-        DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
+        DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
         _shared_state->source_deps.front()->block();
     }
     void set_ready_to_write() {
-        DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+        DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
         _shared_state->sink_deps.front()->set_ready();
     }
     void set_block_to_write() {
-        DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
+        DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
         _shared_state->sink_deps.front()->block();
     }
 
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp 
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index 260a599a947..f7585e50422 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -725,7 +725,10 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int 
operator_id, const TPla
                                    : tnode.agg_node.grouping_exprs),
           _is_colocate(tnode.agg_node.__isset.is_colocate && 
tnode.agg_node.is_colocate),
           _require_bucket_distribution(require_bucket_distribution),
-          _agg_fn_output_row_descriptor(descs, tnode.row_tuples, 
tnode.nullable_tuples) {}
+          _agg_fn_output_row_descriptor(descs, tnode.row_tuples, 
tnode.nullable_tuples),
+          _without_key(tnode.agg_node.grouping_exprs.empty()) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::init(tnode, state));
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h 
b/be/src/pipeline/exec/aggregation_sink_operator.h
index 7c146c38a2b..0b78ab15d2a 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/aggregation_sink_operator.h
@@ -143,9 +143,8 @@ public:
 
     DataDistribution required_data_distribution() const override {
         if (_probe_expr_ctxs.empty()) {
-            return _needs_finalize || 
DataSinkOperatorX<AggSinkLocalState>::_child
-                                              ->ignore_data_distribution()
-                           ? DataDistribution(ExchangeType::PASSTHROUGH)
+            return _needs_finalize
+                           ? DataDistribution(ExchangeType::NOOP)
                            : 
DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
         }
         return _is_colocate && _require_bucket_distribution && 
!_followed_by_shuffled_operator
@@ -153,7 +152,6 @@ public:
                        : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
     }
     bool require_data_distribution() const override { return _is_colocate; }
-    bool require_shuffled_data_distribution() const override { return 
!_probe_expr_ctxs.empty(); }
     size_t get_revocable_mem_size(RuntimeState* state) const;
 
     AggregatedDataVariants* get_agg_data(RuntimeState* state) {
@@ -204,8 +202,8 @@ protected:
     const std::vector<TExpr> _partition_exprs;
     const bool _is_colocate;
     const bool _require_bucket_distribution;
-
     RowDescriptor _agg_fn_output_row_descriptor;
+    const bool _without_key;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp 
b/be/src/pipeline/exec/aggregation_source_operator.cpp
index fe03eba4102..9cc35923d08 100644
--- a/be/src/pipeline/exec/aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -440,7 +440,9 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, 
const TPlanNode& tnode,
                                        const DescriptorTbl& descs)
         : Base(pool, tnode, operator_id, descs),
           _needs_finalize(tnode.agg_node.need_finalize),
-          _without_key(tnode.agg_node.grouping_exprs.empty()) {}
+          _without_key(tnode.agg_node.grouping_exprs.empty()) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* 
block, bool* eos) {
     auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/analytic_sink_operator.cpp 
b/be/src/pipeline/exec/analytic_sink_operator.cpp
index 85d7773bdbd..11652db0a85 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.cpp
+++ b/be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -201,7 +201,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* 
pool, int operator_id,
           _require_bucket_distribution(require_bucket_distribution),
           _partition_exprs(tnode.__isset.distribute_expr_lists && 
require_bucket_distribution
                                    ? tnode.distribute_expr_lists[0]
-                                   : tnode.analytic_node.partition_exprs) {}
+                                   : tnode.analytic_node.partition_exprs) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/analytic_sink_operator.h 
b/be/src/pipeline/exec/analytic_sink_operator.h
index ee305a877f5..7ef650c4383 100644
--- a/be/src/pipeline/exec/analytic_sink_operator.h
+++ b/be/src/pipeline/exec/analytic_sink_operator.h
@@ -88,9 +88,6 @@ public:
     }
 
     bool require_data_distribution() const override { return true; }
-    bool require_shuffled_data_distribution() const override {
-        return !_partition_by_eq_expr_ctxs.empty();
-    }
 
 private:
     Status _insert_range_column(vectorized::Block* block, const 
vectorized::VExprContextSPtr& expr,
diff --git a/be/src/pipeline/exec/analytic_source_operator.cpp 
b/be/src/pipeline/exec/analytic_source_operator.cpp
index b9e48727656..866e5b8119e 100644
--- a/be/src/pipeline/exec/analytic_source_operator.cpp
+++ b/be/src/pipeline/exec/analytic_source_operator.cpp
@@ -475,6 +475,7 @@ 
AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNo
           _has_range_window(tnode.analytic_node.window.type == 
TAnalyticWindowType::RANGE),
           _has_window_start(tnode.analytic_node.window.__isset.window_start),
           _has_window_end(tnode.analytic_node.window.__isset.window_end) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
     _fn_scope = AnalyticFnScope::PARTITION;
     if (tnode.analytic_node.__isset.window &&
         tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {
diff --git a/be/src/pipeline/exec/assert_num_rows_operator.cpp 
b/be/src/pipeline/exec/assert_num_rows_operator.cpp
index 5aa27b51c45..563c4bf49ca 100644
--- a/be/src/pipeline/exec/assert_num_rows_operator.cpp
+++ b/be/src/pipeline/exec/assert_num_rows_operator.cpp
@@ -27,6 +27,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* 
pool, const TPlanNode
         : StreamingOperatorX<AssertNumRowsLocalState>(pool, tnode, 
operator_id, descs),
           _desired_num_rows(tnode.assert_num_rows_node.desired_num_rows),
           _subquery_string(tnode.assert_num_rows_node.subquery_string) {
+    _is_serial_operator = true;
     if (tnode.assert_num_rows_node.__isset.assertion) {
         _assertion = tnode.assert_num_rows_node.assertion;
     } else {
diff --git a/be/src/pipeline/exec/datagen_operator.cpp 
b/be/src/pipeline/exec/datagen_operator.cpp
index 93b3d058154..ac86db03d19 100644
--- a/be/src/pipeline/exec/datagen_operator.cpp
+++ b/be/src/pipeline/exec/datagen_operator.cpp
@@ -36,7 +36,9 @@ DataGenSourceOperatorX::DataGenSourceOperatorX(ObjectPool* 
pool, const TPlanNode
         : OperatorX<DataGenLocalState>(pool, tnode, operator_id, descs),
           _tuple_id(tnode.data_gen_scan_node.tuple_id),
           _tuple_desc(nullptr),
-          _runtime_filter_descs(tnode.runtime_filters) {}
+          _runtime_filter_descs(tnode.runtime_filters) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
     RETURN_IF_ERROR(OperatorX<DataGenLocalState>::init(tnode, state));
@@ -87,8 +89,8 @@ Status DataGenLocalState::init(RuntimeState* state, 
LocalStateInfo& info) {
     // TODO: use runtime filter to filte result block, maybe this node need 
derive from vscan_node.
     for (const auto& filter_desc : p._runtime_filter_descs) {
         std::shared_ptr<IRuntimeFilter> runtime_filter;
-        RETURN_IF_ERROR(state->register_consumer_runtime_filter(
-                filter_desc, p.ignore_data_distribution(), p.node_id(), 
&runtime_filter));
+        RETURN_IF_ERROR(state->register_consumer_runtime_filter(filter_desc, 
p.is_serial_operator(),
+                                                                p.node_id(), 
&runtime_filter));
         runtime_filter->init_profile(_runtime_profile.get());
     }
     return Status::OK();
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
index 70b73225f06..f61602f576b 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp
@@ -334,7 +334,9 @@ 
DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
                                    ? tnode.distribute_expr_lists[0]
                                    : tnode.agg_node.grouping_exprs),
           _is_colocate(tnode.agg_node.__isset.is_colocate && 
tnode.agg_node.is_colocate),
-          _require_bucket_distribution(require_bucket_distribution) {
+          _require_bucket_distribution(require_bucket_distribution),
+          _without_key(tnode.agg_node.grouping_exprs.empty()) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
     if (tnode.agg_node.__isset.use_streaming_preaggregation) {
         _is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
         if (_is_streaming_preagg) {
diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h 
b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
index edeb4321763..97df1a6fcbe 100644
--- a/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
+++ b/be/src/pipeline/exec/distinct_streaming_aggregation_operator.h
@@ -104,6 +104,9 @@ public:
     bool need_more_input_data(RuntimeState* state) const override;
 
     DataDistribution required_data_distribution() const override {
+        if (_needs_finalize && _probe_expr_ctxs.empty()) {
+            return {ExchangeType::NOOP};
+        }
         if (_needs_finalize || (!_probe_expr_ctxs.empty() && 
!_is_streaming_preagg)) {
             return _is_colocate && _require_bucket_distribution && 
!_followed_by_shuffled_operator
                            ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
@@ -113,9 +116,6 @@ public:
     }
 
     bool require_data_distribution() const override { return _is_colocate; }
-    bool require_shuffled_data_distribution() const override {
-        return _needs_finalize || (!_probe_expr_ctxs.empty() && 
!_is_streaming_preagg);
-    }
 
 private:
     friend class DistinctStreamingAggLocalState;
@@ -136,6 +136,7 @@ private:
     /// The total size of the row from the aggregate functions.
     size_t _total_size_of_aggregate_states = 0;
     bool _is_streaming_preagg = false;
+    const bool _without_key;
 };
 
 } // namespace pipeline
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h 
b/be/src/pipeline/exec/exchange_sink_operator.h
index c60cefabfa8..3bd905fdde6 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -221,6 +221,7 @@ public:
     Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* 
src, PBlock* dest,
                            int num_receivers = 1);
     DataDistribution required_data_distribution() const override;
+    bool is_serial_operator() const override { return true; }
 
 private:
     friend class ExchangeSinkLocalState;
diff --git a/be/src/pipeline/exec/exchange_source_operator.cpp 
b/be/src/pipeline/exec/exchange_source_operator.cpp
index cf2055ec47b..0f72b197a51 100644
--- a/be/src/pipeline/exec/exchange_source_operator.cpp
+++ b/be/src/pipeline/exec/exchange_source_operator.cpp
@@ -104,7 +104,9 @@ 
ExchangeSourceOperatorX::ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNo
                           std::vector<bool>(tnode.nullable_tuples.begin(),
                                             tnode.nullable_tuples.begin() +
                                                     
tnode.exchange_node.input_row_tuples.size())),
-          _offset(tnode.exchange_node.__isset.offset ? 
tnode.exchange_node.offset : 0) {}
+          _offset(tnode.exchange_node.__isset.offset ? 
tnode.exchange_node.offset : 0) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* 
state) {
     RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::init(tnode, state));
diff --git a/be/src/pipeline/exec/exchange_source_operator.h 
b/be/src/pipeline/exec/exchange_source_operator.h
index 0fe3dcbb590..c8ef674d269 100644
--- a/be/src/pipeline/exec/exchange_source_operator.h
+++ b/be/src/pipeline/exec/exchange_source_operator.h
@@ -81,7 +81,7 @@ public:
     [[nodiscard]] bool is_merging() const { return _is_merging; }
 
     DataDistribution required_data_distribution() const override {
-        if (OperatorX<ExchangeLocalState>::ignore_data_distribution()) {
+        if (OperatorX<ExchangeLocalState>::is_serial_operator()) {
             return {ExchangeType::NOOP};
         }
         return _partition_type == TPartitionType::HASH_PARTITIONED
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h 
b/be/src/pipeline/exec/hashjoin_build_sink.h
index 1ae9d5ae1a7..c56fb9bc9b1 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -132,8 +132,8 @@ public:
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
         } else if (_is_broadcast_join) {
-            return _child->ignore_data_distribution() ? 
DataDistribution(ExchangeType::PASS_TO_ONE)
-                                                      : 
DataDistribution(ExchangeType::NOOP);
+            return _child->is_serial_operator() ? 
DataDistribution(ExchangeType::PASS_TO_ONE)
+                                                : 
DataDistribution(ExchangeType::NOOP);
         }
         return _join_distribution == TJoinDistributionType::BUCKET_SHUFFLE ||
                                _join_distribution == 
TJoinDistributionType::COLOCATE
@@ -141,9 +141,6 @@ public:
                        : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
     }
 
-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && 
!_is_broadcast_join;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h 
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index dde9c00dfe4..ad7e0d284cb 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -152,9 +152,6 @@ public:
                                   : 
DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs));
     }
 
-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && 
!_is_broadcast_join;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/join_build_sink_operator.cpp 
b/be/src/pipeline/exec/join_build_sink_operator.cpp
index 2439dbc8fe1..a6f5f0f650a 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.cpp
+++ b/be/src/pipeline/exec/join_build_sink_operator.cpp
@@ -82,6 +82,8 @@ 
JoinBuildSinkOperatorX<LocalStateType>::JoinBuildSinkOperatorX(ObjectPool* pool,
           _short_circuit_for_null_in_build_side(_join_op == 
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
                                                 !_is_mark_join),
           _runtime_filter_descs(tnode.runtime_filters) {
+    DataSinkOperatorX<LocalStateType>::_is_serial_operator =
+            tnode.__isset.is_serial_operator && tnode.is_serial_operator;
     _init_join_op();
     if (_is_mark_join) {
         DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == 
TJoinOp::LEFT_SEMI_JOIN ||
diff --git a/be/src/pipeline/exec/join_probe_operator.cpp 
b/be/src/pipeline/exec/join_probe_operator.cpp
index 05c62544d2b..53dcb2c4cfc 100644
--- a/be/src/pipeline/exec/join_probe_operator.cpp
+++ b/be/src/pipeline/exec/join_probe_operator.cpp
@@ -220,6 +220,7 @@ 
JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T
                                      : true)
 
           ) {
+    Base::_is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
     if (tnode.__isset.hash_join_node) {
         _intermediate_row_desc.reset(new RowDescriptor(
                 descs, tnode.hash_join_node.vintermediate_tuple_id_list,
diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.h 
b/be/src/pipeline/exec/nested_loop_join_build_operator.h
index f2ca259754b..d6e72799f97 100644
--- a/be/src/pipeline/exec/nested_loop_join_build_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_build_operator.h
@@ -76,8 +76,8 @@ public:
         if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
             return {ExchangeType::NOOP};
         }
-        return _child->ignore_data_distribution() ? 
DataDistribution(ExchangeType::BROADCAST)
-                                                  : 
DataDistribution(ExchangeType::NOOP);
+        return _child->is_serial_operator() ? 
DataDistribution(ExchangeType::BROADCAST)
+                                            : 
DataDistribution(ExchangeType::NOOP);
     }
 
 private:
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.h 
b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
index f46a99306a5..982498c7e2e 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.h
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.h
@@ -197,7 +197,9 @@ public:
     }
 
     DataDistribution required_data_distribution() const override {
-        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+        if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+            _join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == 
TJoinOp::RIGHT_ANTI_JOIN ||
+            _join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == 
TJoinOp::FULL_OUTER_JOIN) {
             return {ExchangeType::NOOP};
         }
         return {ExchangeType::ADAPTIVE_PASSTHROUGH};
diff --git a/be/src/pipeline/exec/operator.cpp 
b/be/src/pipeline/exec/operator.cpp
index 4a93bac67fe..68ddd250019 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -74,6 +74,7 @@
 #include "pipeline/exec/union_source_operator.h"
 #include "pipeline/local_exchange/local_exchange_sink_operator.h"
 #include "pipeline/local_exchange/local_exchange_source_operator.h"
+#include "pipeline/pipeline.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
 #include "util/string_util.h"
@@ -116,11 +117,16 @@ std::string 
PipelineXSinkLocalState<SharedStateArg>::name_suffix() {
     }() + ")";
 }
 
-DataDistribution DataSinkOperatorXBase::required_data_distribution() const {
-    return _child && _child->ignore_data_distribution()
+DataDistribution OperatorBase::required_data_distribution() const {
+    return _child && _child->is_serial_operator() && !is_source()
                    ? DataDistribution(ExchangeType::PASSTHROUGH)
                    : DataDistribution(ExchangeType::NOOP);
 }
+
+bool OperatorBase::require_shuffled_data_distribution() const {
+    return 
Pipeline::is_hash_exchange(required_data_distribution().distribution_type);
+}
+
 const RowDescriptor& OperatorBase::row_desc() const {
     return _child->row_desc();
 }
@@ -141,8 +147,9 @@ std::string 
PipelineXSinkLocalState<SharedStateArg>::debug_string(int indentatio
 
 std::string OperatorXBase::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
-    fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}",
-                   std::string(indentation_level * 2, ' '), _op_name, 
node_id(), _parallel_tasks);
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}, 
_is_serial_operator={}",
+                   std::string(indentation_level * 2, ' '), _op_name, 
node_id(), _parallel_tasks,
+                   _is_serial_operator);
     return fmt::to_string(debug_string_buffer);
 }
 
@@ -354,8 +361,8 @@ void 
PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos)
 std::string DataSinkOperatorXBase::debug_string(int indentation_level) const {
     fmt::memory_buffer debug_string_buffer;
 
-    fmt::format_to(debug_string_buffer, "{}{}: id={}", 
std::string(indentation_level * 2, ' '),
-                   _name, node_id());
+    fmt::format_to(debug_string_buffer, "{}{}: id={}, _is_serial_operator={}",
+                   std::string(indentation_level * 2, ' '), _name, node_id(), 
_is_serial_operator);
     return fmt::to_string(debug_string_buffer);
 }
 
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index b848aea6e1e..38cee083e2e 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -101,6 +101,9 @@ public:
         return Status::OK();
     }
 
+    // Operators need to be executed serially. (e.g. finalized agg without key)
+    [[nodiscard]] virtual bool is_serial_operator() const { return 
_is_serial_operator; }
+
     [[nodiscard]] bool is_closed() const { return _is_closed; }
 
     virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
@@ -115,13 +118,15 @@ public:
         _followed_by_shuffled_operator = followed_by_shuffled_operator;
     }
     [[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
-    [[nodiscard]] virtual bool require_shuffled_data_distribution() const { 
return false; }
+    [[nodiscard]] virtual DataDistribution required_data_distribution() const;
+    [[nodiscard]] virtual bool require_shuffled_data_distribution() const;
 
 protected:
     OperatorPtr _child = nullptr;
 
     bool _is_closed;
     bool _followed_by_shuffled_operator = false;
+    bool _is_serial_operator = false;
 };
 
 class PipelineXLocalStateBase {
@@ -443,7 +448,7 @@ public:
 
     Status init(const TDataSink& tsink) override;
     [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
-                                      const bool is_shuffled_hash_join,
+                                      const bool use_global_hash_shuffle,
                                       const std::map<int, int>& 
shuffle_idx_to_instance_idx) {
         return Status::InternalError("init() is only implemented in local 
exchange!");
     }
@@ -478,7 +483,6 @@ public:
     }
 
     [[nodiscard]] virtual std::shared_ptr<BasicSharedState> 
create_shared_state() const = 0;
-    [[nodiscard]] virtual DataDistribution required_data_distribution() const;
 
     Status close(RuntimeState* state) override {
         return Status::InternalError("Should not reach here!");
@@ -491,8 +495,6 @@ public:
 
     [[nodiscard]] bool is_sink() const override { return true; }
 
-    [[nodiscard]] bool is_source() const override { return false; }
-
     static Status close(RuntimeState* state, Status exec_status) {
         auto result = state->get_sink_local_state_result();
         if (!result) {
@@ -647,19 +649,7 @@ public:
         throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
     }
     [[nodiscard]] std::string get_name() const override { return _op_name; }
-    [[nodiscard]] virtual DataDistribution required_data_distribution() const {
-        return _child && _child->ignore_data_distribution() && !is_source()
-                       ? DataDistribution(ExchangeType::PASSTHROUGH)
-                       : DataDistribution(ExchangeType::NOOP);
-    }
-    [[nodiscard]] virtual bool ignore_data_distribution() const {
-        return _child ? _child->ignore_data_distribution() : 
_ignore_data_distribution;
-    }
-    [[nodiscard]] bool ignore_data_hash_distribution() const {
-        return _child ? _child->ignore_data_hash_distribution() : 
_ignore_data_distribution;
-    }
     [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const 
{ return true; }
-    void set_ignore_data_distribution() { _ignore_data_distribution = true; }
 
     Status open(RuntimeState* state) override;
 
@@ -730,8 +720,6 @@ public:
 
     bool has_output_row_desc() const { return _output_row_descriptor != 
nullptr; }
 
-    [[nodiscard]] bool is_source() const override { return false; }
-
     [[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state,
                                                           vectorized::Block* 
block, bool* eos);
 
@@ -774,7 +762,6 @@ protected:
     uint32_t _debug_point_count = 0;
 
     std::string _op_name;
-    bool _ignore_data_distribution = false;
     int _parallel_tasks = 0;
 
     //_keep_origin is used to avoid copying during projection,
diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
index 6b3a74c83df..15f6b22387a 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h
@@ -309,9 +309,6 @@ public:
     bool require_data_distribution() const override {
         return _agg_sink_operator->require_data_distribution();
     }
-    bool require_shuffled_data_distribution() const override {
-        return _agg_sink_operator->require_shuffled_data_distribution();
-    }
 
     Status set_child(OperatorPtr child) override {
         
RETURN_IF_ERROR(DataSinkOperatorX<PartitionedAggSinkLocalState>::set_child(child));
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
index 48df5587198..655a6e19725 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp
@@ -118,6 +118,10 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* 
state) {
     return _agg_source_operator->close(state);
 }
 
+bool PartitionedAggSourceOperatorX::is_serial_operator() const {
+    return _agg_source_operator->is_serial_operator();
+}
+
 Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, 
vectorized::Block* block,
                                                 bool* eos) {
     auto& local_state = get_local_state(state);
diff --git a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h 
b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
index edae99c716a..7e73241745e 100644
--- a/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
+++ b/be/src/pipeline/exec/partitioned_aggregation_source_operator.h
@@ -91,6 +91,8 @@ public:
 
     bool is_source() const override { return true; }
 
+    bool is_serial_operator() const override;
+
 private:
     friend class PartitionedAggLocalState;
 
diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
index 3aab11f62d8..f8fc0780b6f 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
@@ -165,9 +165,6 @@ public:
                                            _distribution_partition_exprs));
     }
 
-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h 
b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
index c768d7518b9..8e89763b50a 100644
--- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
+++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
@@ -115,9 +115,6 @@ public:
                                           _distribution_partition_exprs);
     }
 
-    bool require_shuffled_data_distribution() const override {
-        return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
-    }
     bool is_shuffled_operator() const override {
         return _join_distribution == TJoinDistributionType::PARTITIONED;
     }
diff --git a/be/src/pipeline/exec/scan_operator.cpp 
b/be/src/pipeline/exec/scan_operator.cpp
index 88ac16f9740..cf723cd8732 100644
--- a/be/src/pipeline/exec/scan_operator.cpp
+++ b/be/src/pipeline/exec/scan_operator.cpp
@@ -70,7 +70,7 @@ Status ScanLocalState<Derived>::init(RuntimeState* state, 
LocalStateInfo& info)
     SCOPED_TIMER(exec_time_counter());
     SCOPED_TIMER(_init_timer);
     auto& p = _parent->cast<typename Derived::Parent>();
-    RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, 
p.ignore_data_distribution()));
+    RETURN_IF_ERROR(RuntimeFilterConsumer::init(state, 
p.is_serial_operator()));
     // init profile for runtime filter
     RuntimeFilterConsumer::_init_profile(profile());
     init_runtime_filter_dependency(_filter_dependencies, p.operator_id(), 
p.node_id(),
@@ -994,7 +994,7 @@ Status ScanLocalState<Derived>::_start_scanners(
     auto& p = _parent->cast<typename Derived::Parent>();
     _scanner_ctx = vectorized::ScannerContext::create_shared(
             state(), this, p._output_tuple_desc, p.output_row_descriptor(), 
scanners, p.limit(),
-            _scan_dependency, p.ignore_data_distribution());
+            _scan_dependency, p.is_serial_operator());
     return Status::OK();
 }
 
@@ -1155,6 +1155,8 @@ ScanOperatorX<LocalStateType>::ScanOperatorX(ObjectPool* 
pool, const TPlanNode&
             _should_run_serial = true;
         }
     }
+    OperatorX<LocalStateType>::_is_serial_operator =
+            tnode.__isset.is_serial_operator && tnode.is_serial_operator;
     if (tnode.__isset.push_down_count) {
         _push_down_count = tnode.push_down_count;
     }
diff --git a/be/src/pipeline/exec/scan_operator.h 
b/be/src/pipeline/exec/scan_operator.h
index 2168ff5cb19..56234d9ea4e 100644
--- a/be/src/pipeline/exec/scan_operator.h
+++ b/be/src/pipeline/exec/scan_operator.h
@@ -375,8 +375,8 @@ public:
     TPushAggOp::type get_push_down_agg_type() { return _push_down_agg_type; }
 
     DataDistribution required_data_distribution() const override {
-        if (OperatorX<LocalStateType>::ignore_data_distribution()) {
-            // `ignore_data_distribution()` returns true means we ignore the 
distribution.
+        if (OperatorX<LocalStateType>::is_serial_operator()) {
+            // When `is_serial_operator()` returns true, we ignore the 
distribution.
             return {ExchangeType::NOOP};
         }
         return {ExchangeType::BUCKET_HASH_SHUFFLE};
diff --git a/be/src/pipeline/exec/set_probe_sink_operator.h 
b/be/src/pipeline/exec/set_probe_sink_operator.h
index ab53f5358c2..f320c8e89cd 100644
--- a/be/src/pipeline/exec/set_probe_sink_operator.h
+++ b/be/src/pipeline/exec/set_probe_sink_operator.h
@@ -96,8 +96,6 @@ public:
                             : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
     }
 
-    bool require_shuffled_data_distribution() const override { return true; }
-
     std::shared_ptr<BasicSharedState> create_shared_state() const override { 
return nullptr; }
 
 private:
diff --git a/be/src/pipeline/exec/set_sink_operator.h 
b/be/src/pipeline/exec/set_sink_operator.h
index 1c08eddc141..0e76867b31f 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -93,7 +93,6 @@ public:
         return _is_colocate ? 
DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
                             : DataDistribution(ExchangeType::HASH_SHUFFLE, 
_partition_exprs);
     }
-    bool require_shuffled_data_distribution() const override { return true; }
 
 private:
     template <class HashTableContext, bool is_intersected>
diff --git a/be/src/pipeline/exec/sort_sink_operator.cpp 
b/be/src/pipeline/exec/sort_sink_operator.cpp
index b07942b9ab1..e20b21a0bf2 100644
--- a/be/src/pipeline/exec/sort_sink_operator.cpp
+++ b/be/src/pipeline/exec/sort_sink_operator.cpp
@@ -90,7 +90,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int 
operator_id, const TP
                                                                : 
std::vector<TExpr> {}),
           _algorithm(tnode.sort_node.__isset.algorithm ? 
tnode.sort_node.algorithm
                                                        : 
TSortAlgorithm::FULL_SORT),
-          _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {}
+          _reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
diff --git a/be/src/pipeline/exec/sort_sink_operator.h 
b/be/src/pipeline/exec/sort_sink_operator.h
index 8462472dd02..a5a24e37163 100644
--- a/be/src/pipeline/exec/sort_sink_operator.h
+++ b/be/src/pipeline/exec/sort_sink_operator.h
@@ -69,10 +69,10 @@ public:
         } else if (_merge_by_exchange) {
             // The current sort node is used for the ORDER BY
             return {ExchangeType::PASSTHROUGH};
+        } else {
+            return {ExchangeType::NOOP};
         }
-        return 
DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
     }
-    bool require_shuffled_data_distribution() const override { return 
_is_analytic_sort; }
     bool require_data_distribution() const override { return _is_colocate; }
 
     size_t get_revocable_mem_size(RuntimeState* state) const;
diff --git a/be/src/pipeline/exec/sort_source_operator.cpp 
b/be/src/pipeline/exec/sort_source_operator.cpp
index 02a99e183c8..7f801b79c0b 100644
--- a/be/src/pipeline/exec/sort_source_operator.cpp
+++ b/be/src/pipeline/exec/sort_source_operator.cpp
@@ -30,7 +30,9 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, 
const TPlanNode& tnod
                                          const DescriptorTbl& descs)
         : OperatorX<SortLocalState>(pool, tnode, operator_id, descs),
           _merge_by_exchange(tnode.sort_node.merge_by_exchange),
-          _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) 
{}
+          _offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) 
{
+    _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+}
 
 Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
     RETURN_IF_ERROR(Base::init(tnode, state));
diff --git a/be/src/pipeline/exec/union_source_operator.h 
b/be/src/pipeline/exec/union_source_operator.h
index 2d112ebf2df..200e7de8597 100644
--- a/be/src/pipeline/exec/union_source_operator.h
+++ b/be/src/pipeline/exec/union_source_operator.h
@@ -63,7 +63,9 @@ public:
     using Base = OperatorX<UnionSourceLocalState>;
     UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int 
operator_id,
                          const DescriptorTbl& descs)
-            : Base(pool, tnode, operator_id, descs), 
_child_size(tnode.num_children) {};
+            : Base(pool, tnode, operator_id, descs), 
_child_size(tnode.num_children) {
+        _is_serial_operator = tnode.__isset.is_serial_operator && 
tnode.is_serial_operator;
+    }
     ~UnionSourceOperatorX() override = default;
     Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) 
override;
 
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
index d87113ca80a..ff243186c47 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp
@@ -36,17 +36,17 @@ std::vector<Dependency*> 
LocalExchangeSinkLocalState::dependencies() const {
 }
 
 Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int 
num_buckets,
-                                        const bool 
should_disable_bucket_shuffle,
+                                        const bool use_global_hash_shuffle,
                                         const std::map<int, int>& 
shuffle_idx_to_instance_idx) {
     _name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + 
")";
     _type = type;
     if (_type == ExchangeType::HASH_SHUFFLE) {
-        _use_global_shuffle = should_disable_bucket_shuffle;
+        _use_global_shuffle = use_global_hash_shuffle;
         // For shuffle join, if data distribution has been broken by previous 
operator, we
         // should use a HASH_SHUFFLE local exchanger to shuffle data again. To 
be mentioned,
         // we should use map shuffle idx to instance idx because all instances 
will be
         // distributed to all BEs. Otherwise, we should use shuffle idx 
directly.
-        if (should_disable_bucket_shuffle) {
+        if (use_global_hash_shuffle) {
             std::for_each(shuffle_idx_to_instance_idx.begin(), 
shuffle_idx_to_instance_idx.end(),
                           [&](const auto& item) {
                               DCHECK(item.first != -1);
diff --git a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
index 1cd9736d429..09b1f2cc310 100644
--- a/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_sink_operator.h
@@ -100,7 +100,7 @@ public:
         return Status::InternalError("{} should not init with TPlanNode", 
Base::_name);
     }
 
-    Status init(ExchangeType type, const int num_buckets, const bool 
should_disable_bucket_shuffle,
+    Status init(ExchangeType type, const int num_buckets, const bool 
use_global_hash_shuffle,
                 const std::map<int, int>& shuffle_idx_to_instance_idx) 
override;
 
     Status open(RuntimeState* state) override;
diff --git a/be/src/pipeline/local_exchange/local_exchange_source_operator.h 
b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
index c0da5c8120c..3c706d50182 100644
--- a/be/src/pipeline/local_exchange/local_exchange_source_operator.h
+++ b/be/src/pipeline/local_exchange/local_exchange_source_operator.h
@@ -81,9 +81,6 @@ public:
 
     bool is_source() const override { return true; }
 
-    // If input data distribution is ignored by this fragment, this first 
local exchange source in this fragment will re-assign all data.
-    bool ignore_data_distribution() const override { return false; }
-
 private:
     friend class LocalExchangeSourceLocalState;
 
diff --git a/be/src/pipeline/local_exchange/local_exchanger.cpp 
b/be/src/pipeline/local_exchange/local_exchanger.cpp
index f4630f328bb..c9f98db26a9 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.cpp
+++ b/be/src/pipeline/local_exchange/local_exchanger.cpp
@@ -226,7 +226,7 @@ Status ShuffleExchanger::_split_rows(RuntimeState* state, 
const uint32_t* __rest
                 new_block_wrapper->unref(local_state._shared_state, 
local_state._channel_id);
             }
         }
-    } else if (_num_senders != _num_sources || 
_ignore_source_data_distribution) {
+    } else if (_num_senders != _num_sources) {
         // In this branch, data just should be distributed equally into all 
instances.
         new_block_wrapper->ref(_num_partitions);
         for (size_t i = 0; i < _num_partitions; i++) {
diff --git a/be/src/pipeline/local_exchange/local_exchanger.h 
b/be/src/pipeline/local_exchange/local_exchanger.h
index 01b55816ba8..cc33efbb934 100644
--- a/be/src/pipeline/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/local_exchange/local_exchanger.h
@@ -217,24 +217,21 @@ public:
 
 protected:
     ShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
-                     bool ignore_source_data_distribution, int 
free_block_limit)
+                     int free_block_limit)
             : Exchanger<PartitionedBlock>(running_sink_operators, num_sources, 
num_partitions,
-                                          free_block_limit),
-              
_ignore_source_data_distribution(ignore_source_data_distribution) {
+                                          free_block_limit) {
         _data_queue.resize(num_partitions);
     }
     Status _split_rows(RuntimeState* state, const uint32_t* __restrict 
channel_ids,
                        vectorized::Block* block, LocalExchangeSinkLocalState& 
local_state);
-
-    const bool _ignore_source_data_distribution = false;
 };
 
 class BucketShuffleExchanger final : public ShuffleExchanger {
     ENABLE_FACTORY_CREATOR(BucketShuffleExchanger);
     BucketShuffleExchanger(int running_sink_operators, int num_sources, int 
num_partitions,
-                           bool ignore_source_data_distribution, int 
free_block_limit)
+                           int free_block_limit)
             : ShuffleExchanger(running_sink_operators, num_sources, 
num_partitions,
-                               ignore_source_data_distribution, 
free_block_limit) {}
+                               free_block_limit) {}
     ~BucketShuffleExchanger() override = default;
     ExchangeType get_type() const override { return 
ExchangeType::BUCKET_HASH_SHUFFLE; }
 };
diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp
index 6e83c7805e4..96da754daa5 100644
--- a/be/src/pipeline/pipeline.cpp
+++ b/be/src/pipeline/pipeline.cpp
@@ -22,6 +22,7 @@
 #include <utility>
 
 #include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_fragment_context.h"
 #include "pipeline/pipeline_task.h"
 
 namespace doris::pipeline {
@@ -31,7 +32,48 @@ void Pipeline::_init_profile() {
     _pipeline_profile = std::make_unique<RuntimeProfile>(std::move(s));
 }
 
-Status Pipeline::add_operator(OperatorPtr& op) {
+bool Pipeline::need_to_local_exchange(const DataDistribution 
target_data_distribution,
+                                      const int idx) const {
+    // If a serial operator exists at or after the `idx`-th operator, we must 
not increase parallelism.
+    if (std::any_of(_operators.begin() + idx, _operators.end(),
+                    [&](OperatorPtr op) -> bool { return 
op->is_serial_operator(); })) {
+        return false;
+    }
+    // If all operators are serial but the sink is not, we should increase 
parallelism for the sink.
+    if (std::all_of(_operators.begin(), _operators.end(),
+                    [&](OperatorPtr op) -> bool { return 
op->is_serial_operator(); })) {
+        if (!_sink->is_serial_operator()) {
+            return true;
+        }
+    } else if (std::any_of(_operators.begin(), _operators.end(),
+                           [&](OperatorPtr op) -> bool { return 
op->is_serial_operator(); })) {
+        // Serial and non-serial operators are mixed here, so we should increase 
parallelism for the non-serial ones.
+        return true;
+    }
+
+    if (target_data_distribution.distribution_type != 
ExchangeType::BUCKET_HASH_SHUFFLE &&
+        target_data_distribution.distribution_type != 
ExchangeType::HASH_SHUFFLE) {
+        // Always do local exchange if non-hash-partition exchanger is 
required.
+        // For example, `PASSTHROUGH` exchanger is always required to 
distribute data evenly.
+        return true;
+    } else if (_operators.front()->is_serial_operator()) {
+        DCHECK(std::all_of(_operators.begin(), _operators.end(),
+                           [&](OperatorPtr op) -> bool { return 
op->is_serial_operator(); }) &&
+               _sink->is_serial_operator())
+                << debug_string();
+        // All operators and sink are serial in this path.
+        return false;
+    } else {
+        return _data_distribution.distribution_type != 
target_data_distribution.distribution_type &&
+               !(is_hash_exchange(_data_distribution.distribution_type) &&
+                 is_hash_exchange(target_data_distribution.distribution_type));
+    }
+}
+
+Status Pipeline::add_operator(OperatorPtr& op, const int parallelism) {
+    if (parallelism > 0 && op->is_serial_operator()) {
+        set_num_tasks(parallelism);
+    }
     op->set_parallel_tasks(num_tasks());
     _operators.emplace_back(op);
     if (op->is_source()) {
diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h
index 8a20ccb631c..619848110d4 100644
--- a/be/src/pipeline/pipeline.h
+++ b/be/src/pipeline/pipeline.h
@@ -44,14 +44,16 @@ class Pipeline : public 
std::enable_shared_from_this<Pipeline> {
 
 public:
     explicit Pipeline(PipelineId pipeline_id, int num_tasks,
-                      std::weak_ptr<PipelineFragmentContext> context)
-            : _pipeline_id(pipeline_id), _num_tasks(num_tasks) {
+                      std::weak_ptr<PipelineFragmentContext> context, int 
num_tasks_of_parent)
+            : _pipeline_id(pipeline_id),
+              _num_tasks(num_tasks),
+              _num_tasks_of_parent(num_tasks_of_parent) {
         _init_profile();
         _tasks.resize(_num_tasks, nullptr);
     }
 
     // Add operators for pipelineX
-    Status add_operator(OperatorPtr& op);
+    Status add_operator(OperatorPtr& op, const int parallelism);
     // prepare operators for pipelineX
     Status prepare(RuntimeState* state);
 
@@ -71,28 +73,8 @@ public:
         return idx == ExchangeType::HASH_SHUFFLE || idx == 
ExchangeType::BUCKET_HASH_SHUFFLE;
     }
 
-    bool need_to_local_exchange(const DataDistribution 
target_data_distribution) const {
-        if (target_data_distribution.distribution_type != 
ExchangeType::BUCKET_HASH_SHUFFLE &&
-            target_data_distribution.distribution_type != 
ExchangeType::HASH_SHUFFLE) {
-            return true;
-        } else if (_operators.front()->ignore_data_hash_distribution()) {
-            if (_data_distribution.distribution_type ==
-                        target_data_distribution.distribution_type &&
-                (_data_distribution.partition_exprs.empty() ||
-                 target_data_distribution.partition_exprs.empty())) {
-                return true;
-            }
-            return _data_distribution.distribution_type !=
-                           target_data_distribution.distribution_type &&
-                   !(is_hash_exchange(_data_distribution.distribution_type) &&
-                     
is_hash_exchange(target_data_distribution.distribution_type));
-        } else {
-            return _data_distribution.distribution_type !=
-                           target_data_distribution.distribution_type &&
-                   !(is_hash_exchange(_data_distribution.distribution_type) &&
-                     
is_hash_exchange(target_data_distribution.distribution_type));
-        }
-    }
+    bool need_to_local_exchange(const DataDistribution 
target_data_distribution,
+                                const int idx) const;
     void init_data_distribution() {
         
set_data_distribution(_operators.front()->required_data_distribution());
     }
@@ -120,11 +102,19 @@ public:
         for (auto& op : _operators) {
             op->set_parallel_tasks(_num_tasks);
         }
+
+#ifndef NDEBUG
+        if (num_tasks > 1 &&
+            std::any_of(_operators.begin(), _operators.end(),
+                        [&](OperatorPtr op) -> bool { return 
op->is_serial_operator(); })) {
+            DCHECK(false) << debug_string();
+        }
+#endif
     }
     int num_tasks() const { return _num_tasks; }
     bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }
 
-    std::string debug_string() {
+    std::string debug_string() const {
         fmt::memory_buffer debug_string_buffer;
         fmt::format_to(debug_string_buffer,
                        "Pipeline [id: {}, _num_tasks: {}, _num_tasks_created: 
{}]", _pipeline_id,
@@ -136,6 +126,8 @@ public:
         return fmt::to_string(debug_string_buffer);
     }
 
+    int num_tasks_of_parent() const { return _num_tasks_of_parent; }
+
 private:
     void _init_profile();
 
@@ -173,6 +165,8 @@ private:
     std::atomic<int> _num_tasks_running = 0;
     // Tasks in this pipeline.
     std::vector<PipelineTask*> _tasks;
+    // Parallelism of parent pipeline.
+    const int _num_tasks_of_parent;
 };
 
 } // namespace doris::pipeline
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp 
b/be/src/pipeline/pipeline_fragment_context.cpp
index 7f3fa348237..a74f0818cb3 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -112,7 +112,6 @@
 #include "vec/runtime/vdata_stream_mgr.h"
 
 namespace doris::pipeline {
-bvar::Adder<int64_t> g_pipeline_tasks_count("doris_pipeline_tasks_count");
 
 PipelineFragmentContext::PipelineFragmentContext(
         const TUniqueId& query_id, const int fragment_id, 
std::shared_ptr<QueryContext> query_ctx,
@@ -183,9 +182,10 @@ void PipelineFragmentContext::cancel(const Status reason) {
         LOG(WARNING) << "PipelineFragmentContext is cancelled due to timeout : 
" << debug_string();
     }
 
+    // `ILLEGAL_STATE` means the query this fragment belongs to was not found in 
FE (it may have already finished)
     if (reason.is<ErrorCode::ILLEGAL_STATE>()) {
         LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state 
: {}",
-                    this->debug_string());
+                    debug_string());
     }
 
     _query_ctx->cancel(reason, _fragment_id);
@@ -210,28 +210,20 @@ void PipelineFragmentContext::cancel(const Status reason) 
{
     }
 }
 
-PipelinePtr PipelineFragmentContext::add_pipeline() {
-    // _prepared、_submitted, _canceled should do not add pipeline
-    PipelineId id = _next_pipeline_id++;
-    auto pipeline = std::make_shared<Pipeline>(
-            id, _num_instances,
-            
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
-    _pipelines.emplace_back(pipeline);
-    return pipeline;
-}
-
 PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) 
{
-    // _prepared、_submitted, _canceled should do not add pipeline
     PipelineId id = _next_pipeline_id++;
     auto pipeline = std::make_shared<Pipeline>(
-            id, _num_instances,
-            
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
+            id, parent ? std::min(parent->num_tasks(), _num_instances) : 
_num_instances,
+            
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()),
+            parent ? parent->num_tasks() : _num_instances);
     if (idx >= 0) {
         _pipelines.insert(_pipelines.begin() + idx, pipeline);
     } else {
         _pipelines.emplace_back(pipeline);
     }
-    parent->set_children(pipeline);
+    if (parent) {
+        parent->set_children(pipeline);
+    }
     return pipeline;
 }
 
@@ -249,7 +241,7 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
     SCOPED_TIMER(_prepare_timer);
     _build_pipelines_timer = ADD_TIMER(_runtime_profile, "BuildPipelinesTime");
     _init_context_timer = ADD_TIMER(_runtime_profile, "InitContextTime");
-    _plan_local_shuffle_timer = ADD_TIMER(_runtime_profile, 
"PlanLocalShuffleTime");
+    _plan_local_exchanger_timer = ADD_TIMER(_runtime_profile, 
"PlanLocalLocalExchangerTime");
     _build_tasks_timer = ADD_TIMER(_runtime_profile, "BuildTasksTime");
     _prepare_all_pipelines_timer = ADD_TIMER(_runtime_profile, 
"PrepareAllPipelinesTime");
     {
@@ -336,14 +328,15 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
             
RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back()));
         }
     }
-    if (_enable_local_shuffle()) {
-        SCOPED_TIMER(_plan_local_shuffle_timer);
+    // 4. Build local exchanger
+    if (_runtime_state->enable_local_shuffle()) {
+        SCOPED_TIMER(_plan_local_exchanger_timer);
         RETURN_IF_ERROR(_plan_local_exchange(request.num_buckets,
                                              
request.bucket_seq_to_instance_idx,
                                              
request.shuffle_idx_to_instance_idx));
     }
 
-    // 4. Initialize global states in pipelines.
+    // 5. Initialize global states in pipelines.
     for (PipelinePtr& pipeline : _pipelines) {
         SCOPED_TIMER(_prepare_all_pipelines_timer);
         pipeline->children().clear();
@@ -352,7 +345,7 @@ Status PipelineFragmentContext::prepare(const 
doris::TPipelineFragmentParams& re
 
     {
         SCOPED_TIMER(_build_tasks_timer);
-        // 5. Build pipeline tasks and initialize local state.
+        // 6. Build pipeline tasks and initialize local state.
         RETURN_IF_ERROR(_build_pipeline_tasks(request, thread_pool));
     }
 
@@ -380,40 +373,6 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
         const auto& local_params = request.local_params[i];
         auto fragment_instance_id = local_params.fragment_instance_id;
         _fragment_instance_ids[i] = fragment_instance_id;
-        std::unique_ptr<RuntimeFilterMgr> runtime_filter_mgr;
-        auto init_runtime_state = [&](std::unique_ptr<RuntimeState>& 
runtime_state) {
-            
runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
-
-            runtime_state->set_task_execution_context(shared_from_this());
-            runtime_state->set_be_number(local_params.backend_num);
-
-            if (request.__isset.backend_id) {
-                runtime_state->set_backend_id(request.backend_id);
-            }
-            if (request.__isset.import_label) {
-                runtime_state->set_import_label(request.import_label);
-            }
-            if (request.__isset.db_name) {
-                runtime_state->set_db_name(request.db_name);
-            }
-            if (request.__isset.load_job_id) {
-                runtime_state->set_load_job_id(request.load_job_id);
-            }
-            if (request.__isset.wal_id) {
-                runtime_state->set_wal_id(request.wal_id);
-            }
-
-            runtime_state->set_desc_tbl(_desc_tbl);
-            
runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
-            runtime_state->set_num_per_fragment_instances(request.num_senders);
-            runtime_state->resize_op_id_to_local_state(max_operator_id());
-            runtime_state->set_max_operator_id(max_operator_id());
-            
runtime_state->set_load_stream_per_node(request.load_stream_per_node);
-            runtime_state->set_total_load_streams(request.total_load_streams);
-            runtime_state->set_num_local_sink(request.num_local_sink);
-            DCHECK(runtime_filter_mgr);
-            runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get());
-        };
 
         auto filterparams = std::make_unique<RuntimeFilterParamsContext>();
 
@@ -432,8 +391,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const 
doris::TPipelineFrag
             filterparams->query_ctx = _query_ctx.get();
         }
 
-        // build local_runtime_filter_mgr for each instance
-        runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
+        auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
                 request.query_id, filterparams.get(), 
_query_ctx->query_mem_tracker);
 
         filterparams->runtime_filter_mgr = runtime_filter_mgr.get();
@@ -470,7 +428,41 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
                         request.fragment_id, request.query_options, 
_query_ctx->query_globals,
                         _exec_env, _query_ctx.get());
                 auto& task_runtime_state = _task_runtime_states[pip_idx][i];
-                init_runtime_state(task_runtime_state);
+                {
+                    // Initialize runtime state for this task
+                    
task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
+
+                    
task_runtime_state->set_task_execution_context(shared_from_this());
+                    
task_runtime_state->set_be_number(local_params.backend_num);
+
+                    if (request.__isset.backend_id) {
+                        task_runtime_state->set_backend_id(request.backend_id);
+                    }
+                    if (request.__isset.import_label) {
+                        
task_runtime_state->set_import_label(request.import_label);
+                    }
+                    if (request.__isset.db_name) {
+                        task_runtime_state->set_db_name(request.db_name);
+                    }
+                    if (request.__isset.load_job_id) {
+                        
task_runtime_state->set_load_job_id(request.load_job_id);
+                    }
+                    if (request.__isset.wal_id) {
+                        task_runtime_state->set_wal_id(request.wal_id);
+                    }
+
+                    task_runtime_state->set_desc_tbl(_desc_tbl);
+                    
task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
+                    
task_runtime_state->set_num_per_fragment_instances(request.num_senders);
+                    
task_runtime_state->resize_op_id_to_local_state(max_operator_id());
+                    task_runtime_state->set_max_operator_id(max_operator_id());
+                    
task_runtime_state->set_load_stream_per_node(request.load_stream_per_node);
+                    
task_runtime_state->set_total_load_streams(request.total_load_streams);
+                    
task_runtime_state->set_num_local_sink(request.num_local_sink);
+                    DCHECK(_runtime_filter_states[i]->runtime_filter_mgr);
+                    task_runtime_state->set_runtime_filter_mgr(
+                            _runtime_filter_states[i]->runtime_filter_mgr);
+                }
                 auto cur_task_id = _total_tasks++;
                 task_runtime_state->set_task_id(cur_task_id);
                 task_runtime_state->set_task_num(pipeline->num_tasks());
@@ -502,22 +494,12 @@ Status 
PipelineFragmentContext::_build_pipeline_tasks(const doris::TPipelineFrag
          * Finally, we have two upstream dependencies in Pipeline1 
corresponding to JoinProbeOperator1
          * and JoinProbeOperator2.
          */
-
-        // First, set up the parent profile,task runtime state
-
-        auto prepare_and_set_parent_profile = [&](PipelineTask* task, size_t 
pip_idx) {
-            DCHECK(pipeline_id_to_profile[pip_idx]);
-            RETURN_IF_ERROR(
-                    task->prepare(local_params, request.fragment.output_sink, 
_query_ctx.get()));
-            return Status::OK();
-        };
-
         for (auto& _pipeline : _pipelines) {
             if (pipeline_id_to_task.contains(_pipeline->id())) {
                 auto* task = pipeline_id_to_task[_pipeline->id()];
                 DCHECK(task != nullptr);
 
-                // if this task has upstream dependency, then record them.
+                // If this task has upstream dependency, then inject it into 
this task.
                 if (_dag.find(_pipeline->id()) != _dag.end()) {
                     auto& deps = _dag[_pipeline->id()];
                     for (auto& dep : deps) {
@@ -537,7 +519,9 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const 
doris::TPipelineFrag
         for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
             if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) {
                 auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()];
-                RETURN_IF_ERROR(prepare_and_set_parent_profile(task, pip_idx));
+                DCHECK(pipeline_id_to_profile[pip_idx]);
+                RETURN_IF_ERROR(task->prepare(local_params, 
request.fragment.output_sink,
+                                              _query_ctx.get()));
             }
         }
         {
@@ -549,6 +533,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const 
doris::TPipelineFrag
     if (target_size > 1 &&
         (_runtime_state->query_options().__isset.parallel_prepare_threshold &&
          target_size > 
_runtime_state->query_options().parallel_prepare_threshold)) {
+        // If the instance parallelism is large enough ( > 
parallel_prepare_threshold), all tasks are prepared concurrently by multiple threads
         std::vector<Status> prepare_status(target_size);
         std::mutex m;
         std::condition_variable cv;
@@ -636,8 +621,8 @@ void PipelineFragmentContext::trigger_report_if_necessary() 
{
                 _runtime_state->load_channel_profile()->pretty_print(&ss);
             }
 
-            VLOG_FILE << "Query " << print_id(this->get_query_id()) << " 
fragment "
-                      << this->get_fragment_id() << " profile:\n"
+            VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " 
<< get_fragment_id()
+                      << " profile:\n"
                       << ss.str();
         }
         auto st = send_report(false);
@@ -716,6 +701,9 @@ Status 
PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
             (followed_by_shuffled_operator || op->is_shuffled_operator()) &&
             require_shuffled_data_distribution;
 
+    if (num_children == 0) {
+        _use_serial_source = op->is_serial_operator();
+    }
     // rely on that tnodes is preorder of the plan
     for (int i = 0; i < num_children; i++) {
         ++*node_idx;
@@ -748,9 +736,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
         int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip,
         DataDistribution data_distribution, bool* do_local_exchange, int 
num_buckets,
         const std::map<int, int>& bucket_seq_to_instance_idx,
-        const std::map<int, int>& shuffle_idx_to_instance_idx,
-        const bool ignore_data_hash_distribution) {
-    num_buckets = num_buckets != 0 ? num_buckets : _num_instances;
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
     auto& operators = cur_pipe->operators();
     const auto downstream_pipeline_id = cur_pipe->id();
     auto local_exchange_id = next_operator_id();
@@ -765,13 +751,12 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     const bool followed_by_shuffled_operator =
             operators.size() > idx ? 
operators[idx]->followed_by_shuffled_operator()
                                    : 
cur_pipe->sink()->followed_by_shuffled_operator();
-    const bool should_disable_bucket_shuffle =
+    const bool use_global_hash_shuffle =
             bucket_seq_to_instance_idx.empty() &&
             shuffle_idx_to_instance_idx.find(-1) == 
shuffle_idx_to_instance_idx.end() &&
-            followed_by_shuffled_operator;
+            followed_by_shuffled_operator && !_use_serial_source;
     sink.reset(new LocalExchangeSinkOperatorX(
-            sink_id, local_exchange_id,
-            should_disable_bucket_shuffle ? _total_instances : _num_instances,
+            sink_id, local_exchange_id, use_global_hash_shuffle ? 
_total_instances : _num_instances,
             data_distribution.partition_exprs, bucket_seq_to_instance_idx));
     if (bucket_seq_to_instance_idx.empty() &&
         data_distribution.distribution_type == 
ExchangeType::BUCKET_HASH_SHUFFLE) {
@@ -779,8 +764,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     }
     RETURN_IF_ERROR(new_pip->set_sink(sink));
     RETURN_IF_ERROR(new_pip->sink()->init(data_distribution.distribution_type, 
num_buckets,
-                                          should_disable_bucket_shuffle,
-                                          shuffle_idx_to_instance_idx));
+                                          use_global_hash_shuffle, 
shuffle_idx_to_instance_idx));
 
     // 2. Create and initialize LocalExchangeSharedState.
     std::shared_ptr<LocalExchangeSharedState> shared_state =
@@ -791,7 +775,7 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     case ExchangeType::HASH_SHUFFLE:
         shared_state->exchanger = ShuffleExchanger::create_unique(
                 std::max(cur_pipe->num_tasks(), _num_instances),
-                should_disable_bucket_shuffle ? _total_instances : 
_num_instances,
+                use_global_hash_shuffle ? _total_instances : _num_instances,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                         ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
                         : 0);
@@ -799,7 +783,6 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
     case ExchangeType::BUCKET_HASH_SHUFFLE:
         shared_state->exchanger = BucketShuffleExchanger::create_unique(
                 std::max(cur_pipe->num_tasks(), _num_instances), 
_num_instances, num_buckets,
-                ignore_data_hash_distribution,
                 
_runtime_state->query_options().__isset.local_exchange_free_blocks_limit
                         ? 
_runtime_state->query_options().local_exchange_free_blocks_limit
                         : 0);
@@ -929,14 +912,12 @@ Status PipelineFragmentContext::_add_local_exchange(
         int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr 
cur_pipe,
         DataDistribution data_distribution, bool* do_local_exchange, int 
num_buckets,
         const std::map<int, int>& bucket_seq_to_instance_idx,
-        const std::map<int, int>& shuffle_idx_to_instance_idx,
-        const bool ignore_data_distribution) {
-    DCHECK(_enable_local_shuffle());
-    if (_num_instances <= 1) {
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
+    if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) {
         return Status::OK();
     }
 
-    if (!cur_pipe->need_to_local_exchange(data_distribution)) {
+    if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) {
         return Status::OK();
     }
     *do_local_exchange = true;
@@ -946,7 +927,7 @@ Status PipelineFragmentContext::_add_local_exchange(
     auto new_pip = add_pipeline(cur_pipe, pip_idx + 1);
     RETURN_IF_ERROR(_add_local_exchange_impl(
             idx, pool, cur_pipe, new_pip, data_distribution, 
do_local_exchange, num_buckets,
-            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, 
ignore_data_distribution));
+            bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
 
     CHECK(total_op_num + 1 == cur_pipe->operators().size() + 
new_pip->operators().size())
             << "total_op_num: " << total_op_num
@@ -959,7 +940,7 @@ Status PipelineFragmentContext::_add_local_exchange(
         RETURN_IF_ERROR(_add_local_exchange_impl(
                 new_pip->operators().size(), pool, new_pip, 
add_pipeline(new_pip, pip_idx + 2),
                 DataDistribution(ExchangeType::PASSTHROUGH), 
do_local_exchange, num_buckets,
-                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx, 
ignore_data_distribution));
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
     }
     return Status::OK();
 }
@@ -985,13 +966,8 @@ Status PipelineFragmentContext::_plan_local_exchange(
         // scan node. so here use `_num_instance` to replace the `num_buckets` 
to prevent dividing 0
         // still keep colocate plan after local shuffle
         RETURN_IF_ERROR(_plan_local_exchange(
-                
_pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution() ||
-                                num_buckets == 0
-                        ? _num_instances
-                        : num_buckets,
-                pip_idx, _pipelines[pip_idx], bucket_seq_to_instance_idx,
-                shuffle_idx_to_instance_idx,
-                
_pipelines[pip_idx]->operators().front()->ignore_data_hash_distribution()));
+                _use_serial_source || num_buckets == 0 ? _num_instances : 
num_buckets, pip_idx,
+                _pipelines[pip_idx], bucket_seq_to_instance_idx, 
shuffle_idx_to_instance_idx));
     }
     return Status::OK();
 }
@@ -999,8 +975,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
 Status PipelineFragmentContext::_plan_local_exchange(
         int num_buckets, int pip_idx, PipelinePtr pip,
         const std::map<int, int>& bucket_seq_to_instance_idx,
-        const std::map<int, int>& shuffle_idx_to_instance_idx,
-        const bool ignore_data_hash_distribution) {
+        const std::map<int, int>& shuffle_idx_to_instance_idx) {
     int idx = 1;
     bool do_local_exchange = false;
     do {
@@ -1012,8 +987,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
                 RETURN_IF_ERROR(_add_local_exchange(
                         pip_idx, idx, ops[idx]->node_id(), 
_runtime_state->obj_pool(), pip,
                         ops[idx]->required_data_distribution(), 
&do_local_exchange, num_buckets,
-                        bucket_seq_to_instance_idx, 
shuffle_idx_to_instance_idx,
-                        ignore_data_hash_distribution));
+                        bucket_seq_to_instance_idx, 
shuffle_idx_to_instance_idx));
             }
             if (do_local_exchange) {
                 // If local exchange is needed for current operator, we will 
split this pipeline to
@@ -1030,8 +1004,7 @@ Status PipelineFragmentContext::_plan_local_exchange(
         RETURN_IF_ERROR(_add_local_exchange(
                 pip_idx, idx, pip->sink()->node_id(), 
_runtime_state->obj_pool(), pip,
                 pip->sink()->required_data_distribution(), &do_local_exchange, 
num_buckets,
-                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx,
-                ignore_data_hash_distribution));
+                bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx));
     }
     return Status::OK();
 }
@@ -1170,7 +1143,8 @@ Status 
PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataS
             // 1. create and set the source operator of 
multi_cast_data_stream_source for new pipeline
             source_op.reset(new MultiCastDataStreamerSourceOperatorX(
                     i, pool, thrift_sink.multi_cast_stream_sink.sinks[i], 
row_desc, source_id));
-            RETURN_IF_ERROR(new_pipeline->add_operator(source_op));
+            RETURN_IF_ERROR(new_pipeline->add_operator(
+                    source_op, params.__isset.parallel_instances ? 
params.parallel_instances : 0));
             // 2. create and set sink operator of data stream sender for new 
pipeline
 
             DataSinkOperatorPtr sink_op;
@@ -1219,11 +1193,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         op.reset(new OlapScanOperatorX(
                 pool, tnode, next_operator_id(), descs, _num_instances,
                 enable_query_cache ? request.fragment.query_cache_param : 
TQueryCacheParam {}));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: {
@@ -1232,56 +1203,41 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         _query_ctx->query_mem_tracker->is_group_commit_load = true;
 #endif
         op.reset(new GroupCommitOperatorX(pool, tnode, next_operator_id(), 
descs, _num_instances));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case doris::TPlanNodeType::JDBC_SCAN_NODE: {
         if (config::enable_java_support) {
             op.reset(new JDBCScanOperatorX(pool, tnode, next_operator_id(), 
descs, _num_instances));
-            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            RETURN_IF_ERROR(cur_pipe->add_operator(
+                    op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         } else {
             return Status::InternalError(
                     "Jdbc scan node is disabled, you can change be config 
enable_java_support "
                     "to true and restart be.");
         }
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
         break;
     }
     case doris::TPlanNodeType::FILE_SCAN_NODE: {
         op.reset(new FileScanOperatorX(pool, tnode, next_operator_id(), descs, 
_num_instances));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::ES_SCAN_NODE:
     case TPlanNodeType::ES_HTTP_SCAN_NODE: {
         op.reset(new EsScanOperatorX(pool, tnode, next_operator_id(), descs, 
_num_instances));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::EXCHANGE_NODE: {
         int num_senders = find_with_default(request.per_exch_num_senders, 
tnode.node_id, 0);
         DCHECK_GT(num_senders, 0);
         op.reset(new ExchangeSourceOperatorX(pool, tnode, next_operator_id(), 
descs, num_senders));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
-        if (request.__isset.parallel_instances) {
-            op->set_ignore_data_distribution();
-            cur_pipe->set_num_tasks(request.parallel_instances);
-        }
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::AGGREGATION_NODE: {
@@ -1296,7 +1252,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             auto cache_source_id = next_operator_id();
             op.reset(new CacheSourceOperatorX(pool, cache_node_id, 
cache_source_id,
                                               
request.fragment.query_cache_param));
-            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            RETURN_IF_ERROR(cur_pipe->add_operator(
+                    op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
             const auto downstream_pipeline_id = cur_pipe->id();
             if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1331,7 +1288,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                                                            
_require_bucket_distribution));
                 op->set_followed_by_shuffled_operator(false);
                 _require_bucket_distribution = true;
-                RETURN_IF_ERROR(new_pipe->add_operator(op));
+                RETURN_IF_ERROR(new_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
                 cur_pipe = new_pipe;
             } else {
@@ -1340,7 +1298,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                 
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
                 _require_bucket_distribution =
                         _require_bucket_distribution || 
op->require_data_distribution();
-                RETURN_IF_ERROR(cur_pipe->add_operator(op));
+                RETURN_IF_ERROR(cur_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
             }
         } else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
                    tnode.agg_node.use_streaming_preaggregation &&
@@ -1351,11 +1310,13 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
 
                 op.reset(new StreamingAggOperatorX(pool, next_operator_id(), 
tnode, descs));
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
-                RETURN_IF_ERROR(new_pipe->add_operator(op));
+                RETURN_IF_ERROR(new_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
                 cur_pipe = new_pipe;
             } else {
                 op.reset(new StreamingAggOperatorX(pool, next_operator_id(), 
tnode, descs));
-                RETURN_IF_ERROR(cur_pipe->add_operator(op));
+                RETURN_IF_ERROR(cur_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
             }
         } else {
             // create new pipeline to add query cache operator
@@ -1371,10 +1332,12 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             }
             if (enable_query_cache) {
                 RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
-                RETURN_IF_ERROR(new_pipe->add_operator(op));
+                RETURN_IF_ERROR(new_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
                 cur_pipe = new_pipe;
             } else {
-                RETURN_IF_ERROR(cur_pipe->add_operator(op));
+                RETURN_IF_ERROR(cur_pipe->add_operator(
+                        op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
             }
 
             const auto downstream_pipeline_id = cur_pipe->id();
@@ -1422,7 +1385,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
                     pool, tnode_, next_operator_id(), descs, partition_count);
             probe_operator->set_inner_operators(inner_sink_operator, 
inner_probe_operator);
             op = std::move(probe_operator);
-            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            RETURN_IF_ERROR(cur_pipe->add_operator(
+                    op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
             const auto downstream_pipeline_id = cur_pipe->id();
             if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1446,7 +1410,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
             op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
         } else {
             op.reset(new HashJoinProbeOperatorX(pool, tnode, 
next_operator_id(), descs));
-            RETURN_IF_ERROR(cur_pipe->add_operator(op));
+            RETURN_IF_ERROR(cur_pipe->add_operator(
+                    op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
             const auto downstream_pipeline_id = cur_pipe->id();
             if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1473,7 +1438,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case TPlanNodeType::CROSS_JOIN_NODE: {
         op.reset(new NestedLoopJoinProbeOperatorX(pool, tnode, 
next_operator_id(), descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
         const auto downstream_pipeline_id = cur_pipe->id();
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1496,7 +1462,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         int child_count = tnode.num_children;
         op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
         op->set_followed_by_shuffled_operator(_require_bucket_distribution);
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
         const auto downstream_pipeline_id = cur_pipe->id();
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1524,7 +1491,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
         } else {
             op.reset(new SortSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
         }
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
         const auto downstream_pipeline_id = cur_pipe->id();
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1551,7 +1519,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case doris::TPlanNodeType::PARTITION_SORT_NODE: {
         op.reset(new PartitionSortSourceOperatorX(pool, tnode, 
next_operator_id(), descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
         const auto downstream_pipeline_id = cur_pipe->id();
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1569,7 +1538,8 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case TPlanNodeType::ANALYTIC_EVAL_NODE: {
         op.reset(new AnalyticSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
         const auto downstream_pipeline_id = cur_pipe->id();
         if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1591,58 +1561,62 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
     }
     case TPlanNodeType::INTERSECT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
+                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, 
request));
         op->set_followed_by_shuffled_operator(_require_bucket_distribution);
         break;
     }
     case TPlanNodeType::EXCEPT_NODE: {
         RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
-                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
+                pool, tnode, descs, op, cur_pipe, parent_idx, child_idx, 
request));
         op->set_followed_by_shuffled_operator(_require_bucket_distribution);
         break;
     }
     case TPlanNodeType::REPEAT_NODE: {
         op.reset(new RepeatOperatorX(pool, tnode, next_operator_id(), descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::TABLE_FUNCTION_NODE: {
         op.reset(new TableFunctionOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
         op.reset(new AssertNumRowsOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::EMPTY_SET_NODE: {
         op.reset(new EmptySetSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::DATA_GEN_SCAN_NODE: {
         op.reset(new DataGenSourceOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
-        if (request.__isset.parallel_instances) {
-            cur_pipe->set_num_tasks(request.parallel_instances);
-            op->set_ignore_data_distribution();
-        }
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::SCHEMA_SCAN_NODE: {
         op.reset(new SchemaScanOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::META_SCAN_NODE: {
         op.reset(new MetaScanOperatorX(pool, tnode, next_operator_id(), 
descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     case TPlanNodeType::SELECT_NODE: {
         op.reset(new SelectOperatorX(pool, tnode, next_operator_id(), descs));
-        RETURN_IF_ERROR(cur_pipe->add_operator(op));
+        RETURN_IF_ERROR(cur_pipe->add_operator(
+                op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
         break;
     }
     default:
@@ -1658,9 +1632,11 @@ Status 
PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
 template <bool is_intersect>
 Status PipelineFragmentContext::_build_operators_for_set_operation_node(
         ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, 
OperatorPtr& op,
-        PipelinePtr& cur_pipe, int parent_idx, int child_idx) {
+        PipelinePtr& cur_pipe, int parent_idx, int child_idx,
+        const doris::TPipelineFragmentParams& request) {
     op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, 
next_operator_id(), descs));
-    RETURN_IF_ERROR(cur_pipe->add_operator(op));
+    RETURN_IF_ERROR(cur_pipe->add_operator(
+            op, request.__isset.parallel_instances ? 
request.parallel_instances : 0));
 
     const auto downstream_pipeline_id = cur_pipe->id();
     if (_dag.find(downstream_pipeline_id) == _dag.end()) {
@@ -1754,8 +1730,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
             _runtime_state->load_channel_profile()->pretty_print(&ss);
         }
 
-        LOG_INFO("Query {} fragment {} profile:\n {}", 
print_id(this->_query_id),
-                 this->_fragment_id, ss.str());
+        LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), 
_fragment_id, ss.str());
     }
 
     if (_query_ctx->enable_profile()) {
@@ -1779,7 +1754,6 @@ void PipelineFragmentContext::close_a_pipeline(PipelineId 
pipeline_id) {
         }
     }
     std::lock_guard<std::mutex> l(_task_mutex);
-    g_pipeline_tasks_count << -1;
     ++_closed_tasks;
     if (_closed_tasks == _total_tasks) {
         _close_fragment_instance();
@@ -1849,9 +1823,9 @@ PipelineFragmentContext::collect_realtime_profile() const 
{
     // we do not have mutex to protect pipeline_id_to_profile
     // so we need to make sure this function is invoked after fragment context
     // has already been prepared.
-    if (!this->_prepared) {
+    if (!_prepared) {
         std::string msg =
-                "Query " + print_id(this->_query_id) + " collecting profile, 
but its not prepared";
+                "Query " + print_id(_query_id) + " collecting profile, but its 
not prepared";
         DCHECK(false) << msg;
         LOG_ERROR(msg);
         return res;
@@ -1872,9 +1846,9 @@ 
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
     // we do not have mutex to protect pipeline_id_to_profile
     // so we need to make sure this function is invoked after fragment context
     // has already been prepared.
-    if (!this->_prepared) {
+    if (!_prepared) {
         std::string msg =
-                "Query " + print_id(this->_query_id) + " collecting profile, 
but its not prepared";
+                "Query " + print_id(_query_id) + " collecting profile, but its 
not prepared";
         DCHECK(false) << msg;
         LOG_ERROR(msg);
         return nullptr;
@@ -1882,19 +1856,19 @@ 
PipelineFragmentContext::collect_realtime_load_channel_profile() const {
 
     for (auto& runtime_states : _task_runtime_states) {
         for (auto& runtime_state : runtime_states) {
-            if (runtime_state->runtime_profile() == nullptr) {
+            if (runtime_state == nullptr || runtime_state->runtime_profile() 
== nullptr) {
                 continue;
             }
 
             auto tmp_load_channel_profile = 
std::make_shared<TRuntimeProfileTree>();
 
             
runtime_state->runtime_profile()->to_thrift(tmp_load_channel_profile.get());
-            
this->_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
+            
_runtime_state->load_channel_profile()->update(*tmp_load_channel_profile);
         }
     }
 
     auto load_channel_profile = std::make_shared<TRuntimeProfileTree>();
-    
this->_runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get());
+    
_runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get());
     return load_channel_profile;
 }
 
diff --git a/be/src/pipeline/pipeline_fragment_context.h 
b/be/src/pipeline/pipeline_fragment_context.h
index 822a23c54bd..2e75aeb414e 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -78,9 +78,7 @@ public:
 
     int timeout_second() const { return _timeout; }
 
-    PipelinePtr add_pipeline();
-
-    PipelinePtr add_pipeline(PipelinePtr parent, int idx = -1);
+    PipelinePtr add_pipeline(PipelinePtr parent = nullptr, int idx = -1);
 
     RuntimeState* get_runtime_state() { return _runtime_state.get(); }
 
@@ -123,7 +121,7 @@ public:
                 _tasks[j][i]->stop_if_finished();
             }
         }
-    };
+    }
 
 private:
     Status _build_pipelines(ObjectPool* pool, const 
doris::TPipelineFragmentParams& request,
@@ -142,7 +140,8 @@ private:
     Status _build_operators_for_set_operation_node(ObjectPool* pool, const 
TPlanNode& tnode,
                                                    const DescriptorTbl& descs, 
OperatorPtr& op,
                                                    PipelinePtr& cur_pipe, int 
parent_idx,
-                                                   int child_idx);
+                                                   int child_idx,
+                                                   const 
doris::TPipelineFragmentParams& request);
 
     Status _create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink,
                              const std::vector<TExpr>& output_exprs,
@@ -154,24 +153,19 @@ private:
                                 const std::map<int, int>& 
shuffle_idx_to_instance_idx);
     Status _plan_local_exchange(int num_buckets, int pip_idx, PipelinePtr pip,
                                 const std::map<int, int>& 
bucket_seq_to_instance_idx,
-                                const std::map<int, int>& 
shuffle_idx_to_instance_idx,
-                                const bool ignore_data_distribution);
+                                const std::map<int, int>& 
shuffle_idx_to_instance_idx);
     void _inherit_pipeline_properties(const DataDistribution& 
data_distribution,
                                       PipelinePtr pipe_with_source, 
PipelinePtr pipe_with_sink);
     Status _add_local_exchange(int pip_idx, int idx, int node_id, ObjectPool* 
pool,
                                PipelinePtr cur_pipe, DataDistribution 
data_distribution,
                                bool* do_local_exchange, int num_buckets,
                                const std::map<int, int>& 
bucket_seq_to_instance_idx,
-                               const std::map<int, int>& 
shuffle_idx_to_instance_idx,
-                               const bool ignore_data_distribution);
+                               const std::map<int, int>& 
shuffle_idx_to_instance_idx);
     Status _add_local_exchange_impl(int idx, ObjectPool* pool, PipelinePtr 
cur_pipe,
                                     PipelinePtr new_pip, DataDistribution 
data_distribution,
                                     bool* do_local_exchange, int num_buckets,
                                     const std::map<int, int>& 
bucket_seq_to_instance_idx,
-                                    const std::map<int, int>& 
shuffle_idx_to_instance_idx,
-                                    const bool ignore_data_hash_distribution);
-
-    bool _enable_local_shuffle() const { return 
_runtime_state->enable_local_shuffle(); }
+                                    const std::map<int, int>& 
shuffle_idx_to_instance_idx);
 
     Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request,
                                  ThreadPool* thread_pool);
@@ -206,7 +200,7 @@ private:
     RuntimeProfile::Counter* _prepare_timer = nullptr;
     RuntimeProfile::Counter* _init_context_timer = nullptr;
     RuntimeProfile::Counter* _build_pipelines_timer = nullptr;
-    RuntimeProfile::Counter* _plan_local_shuffle_timer = nullptr;
+    RuntimeProfile::Counter* _plan_local_exchanger_timer = nullptr;
     RuntimeProfile::Counter* _prepare_all_pipelines_timer = nullptr;
     RuntimeProfile::Counter* _build_tasks_timer = nullptr;
 
@@ -228,6 +222,7 @@ private:
     int _num_instances = 1;
 
     int _timeout = -1;
+    bool _use_serial_source = false;
 
     OperatorPtr _root_op = nullptr;
     // this is a [n * m] matrix. n is parallelism of pipeline engine and m is 
the number of pipelines.
@@ -290,7 +285,20 @@ private:
     //    - _task_runtime_states is at the task level, unique to each task.
 
     std::vector<TUniqueId> _fragment_instance_ids;
-    // Local runtime states for each task
+    /**
+     * Local runtime states for each task.
+     *
+     * 2-D matrix:
+     * +-------------------------+------------+-------+
+     * |            | Instance 0 | Instance 1 |  ...  |
+     * +------------+------------+------------+-------+
+     * | Pipeline 0 |  task 0-0  |  task 0-1  |  ...  |
+     * +------------+------------+------------+-------+
+     * | Pipeline 1 |  task 1-0  |  task 1-1  |  ...  |
+     * +------------+------------+------------+-------+
+     * | ...                                          |
+     * +--------------------------------------+-------+
+     */
     std::vector<std::vector<std::unique_ptr<RuntimeState>>> 
_task_runtime_states;
 
     std::vector<std::unique_ptr<RuntimeFilterParamsContext>> 
_runtime_filter_states;
diff --git a/be/src/vec/aggregate_functions/aggregate_function_window.h 
b/be/src/vec/aggregate_functions/aggregate_function_window.h
index 92e22c895c4..517871e2fb6 100644
--- a/be/src/vec/aggregate_functions/aggregate_function_window.h
+++ b/be/src/vec/aggregate_functions/aggregate_function_window.h
@@ -41,15 +41,10 @@
 #include "vec/core/types.h"
 #include "vec/data_types/data_type_number.h"
 
-namespace doris {
-namespace vectorized {
+namespace doris::vectorized {
 class Arena;
 class BufferReadable;
 class BufferWritable;
-} // namespace vectorized
-} // namespace doris
-
-namespace doris::vectorized {
 
 struct RowNumberData {
     int64_t count = 0;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index 4dca9384d65..446f49c3782 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -488,6 +488,12 @@ public class AggregationNode extends PlanNode {
         }
     }
 
+    // If `GroupingExprs` is empty and the agg needs to finalize, the result must 
be output by a single instance
+    @Override
+    public boolean isSerialOperator() {
+        return aggInfo.getGroupingExprs().isEmpty() && needsFinalize;
+    }
+
     public void setColocate(boolean colocate) {
         isColocate = colocate;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
index cdbf827aed9..7b5998717a2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
@@ -296,4 +296,15 @@ public class AnalyticEvalNode extends PlanNode {
 
         return output.toString();
     }
+
+    /**
+     * If `partitionExprs` is empty, the result must be output by a single 
instance.
+     *
+     * For example, for `window (colA order by colB)`,
+     * all data should be input in this node to ensure the global ordering by 
colB.
+     */
+    @Override
+    public boolean isSerialOperator() {
+        return partitionExprs.isEmpty();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
index 57d9ce8742f..a4c4aa42c65 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
@@ -116,4 +116,9 @@ public class AssertNumRowsNode extends PlanNode {
     public int getNumInstances() {
         return 1;
     }
+
+    @Override
+    public boolean isSerialOperator() {
+        return true;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
index 9c6ba83408a..ce57a57c377 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataPartition.java
@@ -90,6 +90,10 @@ public class DataPartition {
         return type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
     }
 
+    public boolean isTabletSinkShufflePartition() {
+        return type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED;
+    }
+
     public TPartitionType getType() {
         return type;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
index 867c220d9fe..e262797a4fb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/EmptySetNode.java
@@ -80,5 +80,4 @@ public class EmptySetNode extends PlanNode {
     public int getNumInstances() {
         return 1;
     }
-
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 4ada9a82f7c..97d46b109b7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -202,4 +202,33 @@ public class ExchangeNode extends PlanNode {
     public void setRightChildOfBroadcastHashJoin(boolean value) {
         isRightChildOfBroadcastHashJoin = value;
     }
+
+    /**
+     * If table `t1` has unique key `k1` and value column `v1`.
+     * Now use plan below to load data into `t1`:
+     * ```
+     * FRAGMENT 0:
+     *  Merging Exchange (id = 1)
+     *   NL Join (id = 2)
+     *  DataStreamSender (id = 3, dst_id = 3) (TABLET_SINK_SHUFFLE_PARTITIONED)
+     *
+     * FRAGMENT 1:
+     *  Exchange (id = 3)
+     *  OlapTableSink (id = 4) ```
+     *
+     * In this plan, `Exchange (id = 1)` needs to do merge sort using column 
`k1` and `v1` so parallelism
+     * of FRAGMENT 0 must be 1 and data will be shuffled to FRAGMENT 1 which 
also has only 1 instance
+     * because this loading job relies on the global ordering of column `k1` 
and `v1`.
+     *
+     * So FRAGMENT 0 should not use serial source.
+     */
+    @Override
+    public boolean isSerialOperator() {
+        return partitionType == TPartitionType.UNPARTITIONED && mergeInfo != 
null;
+    }
+
+    @Override
+    public boolean hasSerialChildren() {
+        return isSerialOperator();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
index 91a3c26e770..5dc81e29d85 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
@@ -597,7 +597,6 @@ public abstract class JoinNodeBase extends PlanNode {
         this.useSpecificProjections = useSpecificProjections;
     }
 
-
     public boolean isUseSpecificProjections() {
         return useSpecificProjections;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index 30c0a2d0394..e2a7504a98d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -281,4 +281,19 @@ public class NestedLoopJoinNode extends JoinNodeBase {
         }
         return output.toString();
     }
+
+    /**
+     * If joinOp is one of the types below:
+     * 1. RIGHT_OUTER_JOIN
+     * 2. RIGHT_ANTI_JOIN
+     * 3. RIGHT_SEMI_JOIN
+     * 4. FULL_OUTER_JOIN
+     *
+     * Probe-side must have full data so join is a serial operator.
+     */
+    @Override
+    public boolean isSerialOperator() {
+        return joinOp == JoinOperator.RIGHT_OUTER_JOIN || joinOp == 
JoinOperator.RIGHT_ANTI_JOIN
+                || joinOp == JoinOperator.RIGHT_SEMI_JOIN || joinOp == 
JoinOperator.FULL_OUTER_JOIN;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index ae1d34308a3..0ebd023ed41 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -162,6 +162,7 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     public Optional<NereidsSpecifyInstances<ScanSource>> specifyInstances = 
Optional.empty();
 
     public TQueryCacheParam queryCacheParam;
+    private int numBackends = 0;
 
     /**
      * C'tor for fragment with specific partition; the output is by default 
broadcast.
@@ -502,4 +503,23 @@ public class PlanFragment extends TreeNode<PlanFragment> {
     public boolean hasNullAwareLeftAntiJoin() {
         return planRoot.isNullAwareLeftAntiJoin();
     }
+
+    public boolean useSerialSource(ConnectContext context) {
+        return context != null
+                && 
context.getSessionVariable().isIgnoreStorageDataDistribution()
+                && queryCacheParam == null
+                && !hasNullAwareLeftAntiJoin()
+                // If planRoot is not a serial operator and has serial 
children, we can use serial source and improve
+                // parallelism of non-serial operators.
+                && sink instanceof DataStreamSink && 
!planRoot.isSerialOperator()
+                && planRoot.hasSerialChildren();
+    }
+
+    public int getNumBackends() {
+        return numBackends;
+    }
+
+    public void setNumBackends(int numBackends) {
+        this.numBackends = numBackends;
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 1e9d5646939..14bd34e93e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -41,6 +41,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.TreeNode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.normalize.Normalizer;
+import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.PlanStats;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.statistics.StatsDeriveResult;
@@ -639,6 +640,7 @@ public abstract class PlanNode extends TreeNode<PlanNode> 
implements PlanStats {
         TPlanNode msg = new TPlanNode();
         msg.node_id = id.asInt();
         msg.setNereidsId(nereidsId);
+        msg.setIsSerialOperator(isSerialOperator() && 
fragment.useSerialSource(ConnectContext.get()));
         msg.num_children = children.size();
         msg.limit = limit;
         for (TupleId tid : tupleIds) {
@@ -1374,4 +1376,16 @@ public abstract class PlanNode extends 
TreeNode<PlanNode> implements PlanStats {
             return true;
         });
     }
+
+    // Whether this operator needs to be executed serially (e.g. finalized agg without key).
+    public boolean isSerialOperator() {
+        return false;
+    }
+
+    public boolean hasSerialChildren() {
+        if (children.isEmpty()) {
+            return isSerialOperator();
+        }
+        return children.stream().allMatch(PlanNode::hasSerialChildren);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
index 3c6a88cea08..2bc4e847ac3 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RepeatNode.java
@@ -200,4 +200,10 @@ public class RepeatNode extends PlanNode {
         }
         return output.toString();
     }
+
+    // Determined by its child.
+    @Override
+    public boolean isSerialOperator() {
+        return children.get(0).isSerialOperator();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 50b0f5a0269..ea5d27bb17f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -836,4 +836,11 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
     public long getSelectedSplitNum() {
         return selectedSplitNum;
     }
+
+    @Override
+    public boolean isSerialOperator() {
+        return numScanBackends() <= 0 || getScanRangeNum()
+                < 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * 
numScanBackends()
+                || (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().isForceToLocalShuffle());
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
index 6c6b665b00a..734e9338352 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SelectNode.java
@@ -109,4 +109,10 @@ public class SelectNode extends PlanNode {
         }
         return output.toString();
     }
+
+    // Determined by its child.
+    @Override
+    public boolean isSerialOperator() {
+        return children.get(0).isSerialOperator();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index e3c405bcbab..e3eb08c3e75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -389,6 +389,12 @@ public class SortNode extends PlanNode {
         return new HashSet<>(result);
     }
 
+    // If it is neither an analytic sort nor merged by a following exchange node, it 
must output globally ordered data.
+    @Override
+    public boolean isSerialOperator() {
+        return !isAnalyticSort && !mergeByexchange;
+    }
+
     public void setColocate(boolean colocate) {
         isColocate = colocate;
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
index 40982d07e77..ac66ce718ec 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/UnionNode.java
@@ -42,4 +42,11 @@ public class UnionNode extends SetOperationNode {
     protected void toThrift(TPlanNode msg) {
         toThrift(msg, TPlanNodeType.UNION_NODE);
     }
+
+    // If it is a union without children, which means it will output some 
constant values, we should use a serial union
+    // to avoid outputting duplicated data.
+    @Override
+    public boolean isSerialOperator() {
+        return children.isEmpty();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index b9f90242a29..9829f88cf52 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -1777,6 +1777,19 @@ public class Coordinator implements CoordInterface {
                 FInstanceExecParam instanceParam = new 
FInstanceExecParam(null, execHostport,
                         0, params);
                 params.instanceExecParams.add(instanceParam);
+
+                // Using serial source means a serial source operator will be 
used in this fragment (e.g. data will be
+                // shuffled to only 1 exchange operator) and then split by the 
following local exchanger
+                int expectedInstanceNum = fragment.getParallelExecNum();
+                boolean useSerialSource = fragment.useSerialSource(context);
+                if (useSerialSource) {
+                    for (int j = 1; j < expectedInstanceNum; j++) {
+                        params.instanceExecParams.add(new FInstanceExecParam(
+                                null, execHostport, 0, params));
+                    }
+                    params.ignoreDataDistribution = true;
+                    params.parallelTasksNum = 1;
+                }
                 continue;
             }
 
@@ -1806,6 +1819,9 @@ public class Coordinator implements CoordInterface {
                 if (leftMostNode.getNumInstances() == 1) {
                     exchangeInstances = 1;
                 }
+                // Using serial source means a serial source operator will be 
used in this fragment (e.g. data will be
+                // shuffled to only 1 exchange operator) and then split by the 
following local exchanger
+                boolean useSerialSource = fragment.useSerialSource(context);
                 if (exchangeInstances > 0 && 
fragmentExecParamsMap.get(inputFragmentId)
                         .instanceExecParams.size() > exchangeInstances) {
                     // random select some instance
@@ -1823,12 +1839,16 @@ public class Coordinator implements CoordInterface {
                                 hosts.get(index % hosts.size()), 0, params);
                         params.instanceExecParams.add(instanceParam);
                     }
+                    params.ignoreDataDistribution = useSerialSource;
+                    params.parallelTasksNum = useSerialSource ? 1 : 
params.instanceExecParams.size();
                 } else {
                     for (FInstanceExecParam execParams
                             : 
fragmentExecParamsMap.get(inputFragmentId).instanceExecParams) {
                         FInstanceExecParam instanceParam = new 
FInstanceExecParam(null, execParams.host, 0, params);
                         params.instanceExecParams.add(instanceParam);
                     }
+                    params.ignoreDataDistribution = useSerialSource;
+                    params.parallelTasksNum = useSerialSource ? 1 : 
params.instanceExecParams.size();
                 }
 
                 // When group by cardinality is smaller than number of 
backend, only some backends always
@@ -1874,12 +1894,8 @@ public class Coordinator implements CoordInterface {
                         boolean sharedScan = true;
                         int expectedInstanceNum = 
Math.min(parallelExecInstanceNum,
                                 leftMostNode.getNumInstances());
-                        boolean forceToLocalShuffle = context != null
-                                && 
context.getSessionVariable().isForceToLocalShuffle()
-                                && !fragment.hasNullAwareLeftAntiJoin() && 
useNereids;
-                        boolean ignoreStorageDataDistribution = 
(forceToLocalShuffle || (node.isPresent()
-                                && 
node.get().ignoreStorageDataDistribution(context, addressToBackendID.size())
-                                && useNereids)) && fragment.queryCacheParam == 
null;
+                        boolean ignoreStorageDataDistribution = 
node.isPresent()
+                                && fragment.useSerialSource(context);
                         if (node.isPresent() && ignoreStorageDataDistribution) 
{
                             expectedInstanceNum = 
Math.max(expectedInstanceNum, 1);
                             // if have limit and no conjuncts, only need 1 
instance to save cpu and
@@ -2726,14 +2742,7 @@ public class Coordinator implements CoordInterface {
          * 1. `parallelExecInstanceNum * numBackends` is larger than scan 
ranges.
          * 2. Use Nereids planner.
          */
-        boolean forceToLocalShuffle = context != null
-                && context.getSessionVariable().isForceToLocalShuffle() && 
!hasNullAwareLeftAntiJoin && useNereids;
-        boolean ignoreStorageDataDistribution = (forceToLocalShuffle || 
(scanNodes.stream()
-                .allMatch(node -> node.ignoreStorageDataDistribution(context,
-                        addressToBackendID.size()))
-                && 
addressToScanRanges.entrySet().stream().allMatch(addressScanRange -> {
-                    return addressScanRange.getValue().size() < 
parallelExecInstanceNum;
-                }) && useNereids)) && params.fragment.queryCacheParam == null;
+        boolean ignoreStorageDataDistribution = params.fragment != null && 
params.fragment.useSerialSource(context);
 
         FragmentScanRangeAssignment assignment = params.scanRangeAssignment;
         for (Map.Entry<TNetworkAddress, List<Pair<Integer, Map<Integer, 
List<TScanRangeParams>>>>> addressScanRange
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 89a18589f35..dee675a30d5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -4291,7 +4291,7 @@ public class SessionVariable implements Serializable, 
Writable {
     }
 
     public boolean isIgnoreStorageDataDistribution() {
-        return ignoreStorageDataDistribution && enableLocalShuffle;
+        return ignoreStorageDataDistribution && enableLocalShuffle && 
enableNereidsPlanner;
     }
 
     public void setIgnoreStorageDataDistribution(boolean 
ignoreStorageDataDistribution) {
@@ -4329,7 +4329,7 @@ public class SessionVariable implements Serializable, 
Writable {
     }
 
     public boolean isForceToLocalShuffle() {
-        return enableLocalShuffle && forceToLocalShuffle;
+        return enableLocalShuffle && forceToLocalShuffle && 
enableNereidsPlanner;
     }
 
     public void setForceToLocalShuffle(boolean forceToLocalShuffle) {
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1fcf5aff254..3f557bea711 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1364,6 +1364,7 @@ struct TPlanNode {
   49: optional i64 push_down_count
 
   50: optional list<list<Exprs.TExpr>> distribute_expr_lists
+  51: optional bool is_serial_operator
   // projections is final projections, which means projecting into results and 
materializing them into the output block.
   101: optional list<Exprs.TExpr> projections
   102: optional Types.TTupleId output_tuple_id
diff --git 
a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy 
b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
index 2493a7df5de..049cbe0b4d7 100644
--- a/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
+++ b/regression-test/suites/nereids_p0/insert_into_table/complex_insert.groovy
@@ -177,15 +177,15 @@ suite('complex_insert') {
 
     sql 'insert into t1(id, c1, c2, c3) select id, c1 * 2, c2, c3 from t1'
     sql 'sync'
-    qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+    qt_sql_1 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, 
t3.id'
 
     sql 'insert into t2(id, c1, c2, c3) select id, c1, c2 * 2, c3 from t2'
     sql 'sync'
-    qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+    qt_sql_2 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, 
t3.id'
 
     sql 'insert into t2(c1, c3) select c1 + 1, c3 + 1 from (select id, c1, c3 
from t1 order by id, c1 limit 10) t1, t3'
     sql 'sync'
-    qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t3.id'
+    qt_sql_3 'select * from t1, t2, t3 order by t1.id, t1.id1, t2.id, t2.c1, 
t3.id'
 
     sql 'drop table if exists agg_have_dup_base'
 
diff --git 
a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy 
b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
index 997230b1a06..d701ad890d6 100644
--- a/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
+++ b/regression-test/suites/nereids_syntax_p0/distribute/local_shuffle.groovy
@@ -45,14 +45,14 @@ suite("local_shuffle") {
         insert into test_local_shuffle1 values (1, 1), (2, 2);
         insert into test_local_shuffle2 values (2, 2), (3, 3);
         
-        set enable_nereids_distribute_planner=true;
+        // set enable_nereids_distribute_planner=true;
         set enable_pipeline_x_engine=true;
         set disable_join_reorder=true;
         set enable_local_shuffle=true;
         set force_to_local_shuffle=true;
         """
 
-    order_qt_read_single_olap_table "select * from test_local_shuffle1"
+    order_qt_read_single_olap_table "select * from test_local_shuffle1 order 
by id, id2"
 
     order_qt_broadcast_join """
         select *
@@ -96,7 +96,7 @@ suite("local_shuffle") {
         ) a
         right outer join [shuffle]
         test_local_shuffle2
-        on a.id=test_local_shuffle2.id2
+        on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id, 
test_local_shuffle2.id2
         """
 
     order_qt_bucket_shuffle_with_prune_tablets2 """
@@ -109,7 +109,7 @@ suite("local_shuffle") {
             from test_local_shuffle1
             where id=1
         ) a
-        on a.id=test_local_shuffle2.id2
+        on a.id=test_local_shuffle2.id2 order by test_local_shuffle2.id, 
test_local_shuffle2.id2
         """
 
     order_qt_bucket_shuffle_with_prune_tablets3 """
@@ -150,11 +150,11 @@ suite("local_shuffle") {
         """
 
     order_qt_fillup_bucket """
-            SELECT cast(a.c0 as int), cast(b.c0 as int) FROM
+            SELECT cast(a.c0 as int), cast(b.c0 as int) res FROM
             (select * from test_local_shuffle3 where c0 =1)a
             RIGHT OUTER JOIN
             (select * from test_local_shuffle4)b
-            ON a.c0 = b.c0
+            ON a.c0 = b.c0 order by res
             """
 
     multi_sql """
@@ -182,6 +182,6 @@ suite("local_shuffle") {
             ) a
             inner join [shuffle]
             test_shuffle_left_with_local_shuffle b
-            on a.id2=b.id;
+            on a.id2=b.id order by a.id2;
         """
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to