This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch local-exchange-planner
in repository https://gitbox.apache.org/repos/asf/doris.git

commit b8faee6a900f7459f4eeb86e65c36c7d8dad878e
Author: 924060929 <[email protected]>
AuthorDate: Tue Jan 13 17:42:18 2026 +0800

    local exchange planner
---
 .../doris/datasource/tvf/source/TVFScanNode.java   |  11 +
 .../org/apache/doris/nereids/NereidsPlanner.java   |   7 +-
 .../glue/translator/PhysicalPlanTranslator.java    |  71 +++++-
 .../apache/doris/nereids/trees/plans/PlanType.java |   1 +
 .../org/apache/doris/planner/AddLocalExchange.java |  67 +++++
 .../org/apache/doris/planner/AggregationNode.java  |  55 +++++
 .../org/apache/doris/planner/AnalyticEvalNode.java |  37 +++
 .../apache/doris/planner/AssertNumRowsNode.java    |  27 ++
 .../java/org/apache/doris/planner/DataSink.java    |   5 +
 .../org/apache/doris/planner/DataStreamSink.java   |  14 ++
 .../org/apache/doris/planner/ExchangeNode.java     |  16 ++
 .../org/apache/doris/planner/HashJoinNode.java     |  62 +++++
 .../apache/doris/planner/LocalExchangeNode.java    | 271 +++++++++++++++++++++
 .../apache/doris/planner/NestedLoopJoinNode.java   |  57 +++++
 .../org/apache/doris/planner/OlapScanNode.java     |  25 ++
 .../java/org/apache/doris/planner/PlanNode.java    |  58 ++++-
 .../org/apache/doris/planner/SetOperationNode.java |  61 +++++
 .../java/org/apache/doris/planner/SortNode.java    |  37 +++
 .../java/org/apache/doris/qe/SessionVariable.java  |  18 ++
 gensrc/thrift/PaloInternalService.thrift           |   3 +
 gensrc/thrift/Partitions.thrift                    |  33 +++
 gensrc/thrift/PlanNodes.thrift                     |  22 +-
 22 files changed, 942 insertions(+), 16 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index d3c891c1b4c..a47006b6467 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -23,6 +23,7 @@ import org.apache.doris.catalog.FunctionGenTable;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.MetaNotFoundException;
+import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.LocationPath;
 import org.apache.doris.common.util.Util;
@@ -31,6 +32,10 @@ import org.apache.doris.datasource.FileSplit;
 import org.apache.doris.datasource.FileSplit.FileSplitCreator;
 import org.apache.doris.datasource.FileSplitter;
 import org.apache.doris.datasource.TableFormatType;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
+import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
@@ -199,4 +204,10 @@ public class TVFScanNode extends FileQueryScanNode {
     public int getNumInstances() {
         return scanRangeLocations.size();
     }
+
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
+            PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        return Pair.of(this, LocalExchangeType.PASSTHROUGH); // node kept unchanged; derives PASSTHROUGH
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
index 364de658d12..c02f2935d64 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java
@@ -75,6 +75,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache;
 import org.apache.doris.nereids.trees.plans.physical.TopnFilter;
+import org.apache.doris.planner.AddLocalExchange;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.Planner;
@@ -623,7 +624,11 @@ public class NereidsPlanner extends Planner {
         scanNodeList.addAll(planTranslatorContext.getScanNodes());
         physicalRelations.addAll(planTranslatorContext.getPhysicalRelations());
         descTable = planTranslatorContext.getDescTable();
-        fragments = new ArrayList<>(planTranslatorContext.getPlanFragments());
+        List<PlanFragment> planFragments = 
planTranslatorContext.getPlanFragments();
+        if (sessionVariable.isEnableLocalShufflePlanner()) {
+            new AddLocalExchange().addLocalExchange(planFragments, 
planTranslatorContext);
+        }
+        fragments = new ArrayList<>(planFragments);
 
         boolean enableQueryCache = sessionVariable.getEnableQueryCache();
         for (int seq = 0; seq < fragments.size(); seq++) {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 3f87d1a856b..9ea5fcdf637 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -343,7 +343,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             PlanTranslatorContext context) {
         Plan upstream = distribute.child(); // now they're in one fragment but 
will be split by ExchangeNode.
         PlanFragment upstreamFragment = upstream.accept(this, context);
-        List<List<Expr>> upstreamDistributeExprs = 
getDistributeExprs(upstream);
+        List<List<Expr>> upstreamDistributeExprs = 
getChildrenDistributeExprs(upstream);
 
         DistributionSpec targetDistribution = distribute.getDistributionSpec();
 
@@ -391,6 +391,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         // target data partition
         DataPartition targetDataPartition = 
toDataPartition(targetDistribution, validOutputIds, context);
         exchangeNode.setPartitionType(targetDataPartition.getType());
+        exchangeNode.setDistributeExprLists(getDistributeExpr(distribute));
         exchangeNode.setChildrenDistributeExprLists(upstreamDistributeExprs);
         // its source partition is targetDataPartition. and outputPartition is 
UNPARTITIONED now, will be set when
         // visit its SinkNode
@@ -695,6 +696,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             
fileScan.getTableSnapshot().ifPresent(fileQueryScanNode::setQueryTableSnapshot);
             
fileScan.getScanParams().ifPresent(fileQueryScanNode::setScanParams);
         }
+        scanNode.setDistributeExprLists(getDistributeExpr(fileScan));
         return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, 
table, tupleDescriptor);
     }
 
@@ -715,6 +717,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         PlanFragment planFragment = createPlanFragment(emptySetNode,
                 DataPartition.UNPARTITIONED, emptyRelation);
         context.addPlanFragment(planFragment);
+        emptySetNode.setDistributeExprLists(getDistributeExpr(emptyRelation));
         updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), 
emptyRelation);
         return planFragment;
     }
@@ -748,6 +751,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
esScanNode, dataPartition);
         context.addPlanFragment(planFragment);
         updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), esScan);
+        esScanNode.setDistributeExprLists(getDistributeExpr(esScan));
         return planFragment;
     }
 
@@ -776,6 +780,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         }
         HudiScanNode hudiScanNode = (HudiScanNode) scanNode;
         hudiScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
+        hudiScanNode.setDistributeExprLists(getDistributeExpr(hudiScan));
         return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, 
table, tupleDescriptor);
     }
 
@@ -801,6 +806,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         DataPartition dataPartition = DataPartition.RANDOM;
         PlanFragment planFragment = createPlanFragment(scanNode, 
dataPartition, fileScan);
         context.addPlanFragment(planFragment);
+        scanNode.setDistributeExprLists(getDistributeExpr(fileScan));
         updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan);
         return planFragment;
     }
@@ -820,6 +826,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         DataPartition dataPartition = DataPartition.RANDOM;
         PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
jdbcScanNode, dataPartition);
         context.addPlanFragment(planFragment);
+        jdbcScanNode.setDistributeExprLists(getDistributeExpr(jdbcScan));
         updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), jdbcScan);
         return planFragment;
     }
@@ -839,6 +846,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         DataPartition dataPartition = DataPartition.RANDOM;
         PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), 
odbcScanNode, dataPartition);
         context.addPlanFragment(planFragment);
+        odbcScanNode.setDistributeExprLists(getDistributeExpr(odbcScan));
         updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), odbcScan);
         return planFragment;
     }
@@ -874,6 +882,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), 
tupleDescriptor, "OlapScanNode");
         olapScanNode.setNereidsId(olapScan.getId());
         context.getNereidsIdToPlanNodeIdMap().put(olapScan.getId(), 
olapScanNode.getId());
+        olapScanNode.setDistributeExprLists(getDistributeExpr(olapScan));
 
         // translate score topn info
         if (!olapScan.getScoreOrderKeys().isEmpty()) {
@@ -1007,6 +1016,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, 
PlanTranslatorContext context) {
         PlanFragment planFragment = 
visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context);
         OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot();
+        
olapScanNode.setDistributeExprLists(getDistributeExpr(deferMaterializeOlapScan));
         TupleDescriptor tupleDescriptor = 
context.getTupleDesc(olapScanNode.getTupleId());
         context.createSlotDesc(tupleDescriptor, 
deferMaterializeOlapScan.getColumnIdSlot());
         
context.getTopnFilterContext().translateTarget(deferMaterializeOlapScan, 
olapScanNode, context);
@@ -1035,6 +1045,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         PlanFragment planFragment = createPlanFragment(unionNode, 
DataPartition.UNPARTITIONED, oneRowRelation);
         context.addPlanFragment(planFragment);
         updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), 
oneRowRelation);
+
+        unionNode.setDistributeExprLists(getDistributeExpr(oneRowRelation));
         return planFragment;
     }
 
@@ -1067,6 +1079,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         context.addScanNode(scanNode, schemaScan);
         PlanFragment planFragment = createPlanFragment(scanNode, 
DataPartition.RANDOM, schemaScan);
         context.addPlanFragment(planFragment);
+        scanNode.setDistributeExprLists(getDistributeExpr(schemaScan));
         updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), 
schemaScan);
         return planFragment;
     }
@@ -1083,6 +1096,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
 
         PlanFragment planFragment = createPlanFragment(scanNode, 
DataPartition.RANDOM, workTableReference);
         context.addPlanFragment(planFragment);
+        scanNode.setDistributeExprLists(getDistributeExpr(workTableReference));
         updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), 
workTableReference);
         return planFragment;
     }
@@ -1103,6 +1117,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         TableValuedFunctionIf catalogFunction = 
tvfRelation.getFunction().getCatalogFunction();
         SessionVariable sv = ConnectContext.get().getSessionVariable();
         ScanNode scanNode = 
catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor, sv);
+        scanNode.setDistributeExprLists(getDistributeExpr(tvfRelation));
         scanNode.setNereidsId(tvfRelation.getId());
         context.getNereidsIdToPlanNodeIdMap().put(tvfRelation.getId(), 
scanNode.getId());
         Utils.execWithUncheckedException(scanNode::init);
@@ -1141,7 +1156,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             PlanTranslatorContext context) {
 
         PlanFragment inputPlanFragment = aggregate.child(0).accept(this, 
context);
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(aggregate.child(0));
+        List<List<Expr>> distributeExprLists = 
getChildrenDistributeExprs(aggregate.child(0));
 
         List<Expression> groupByExpressions = 
aggregate.getGroupByExpressions();
         List<NamedExpression> outputExpressions = 
aggregate.getOutputExpressions();
@@ -1222,6 +1237,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         AggregationNode aggregationNode = new 
AggregationNode(context.nextPlanNodeId(),
                 inputPlanFragment.getPlanRoot(), aggInfo);
 
+        aggregationNode.setDistributeExprLists(getDistributeExpr(aggregate));
         aggregationNode.setChildrenDistributeExprLists(distributeExprLists);
 
         aggregationNode.setNereidsId(aggregate.getId());
@@ -1339,7 +1355,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     public PlanFragment visitPhysicalAssertNumRows(PhysicalAssertNumRows<? 
extends Plan> assertNumRows,
             PlanTranslatorContext context) {
         PlanFragment currentFragment = assertNumRows.child().accept(this, 
context);
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(assertNumRows.child());
+        List<List<Expr>> distributeExprLists = 
getChildrenDistributeExprs(assertNumRows.child());
 
         // we need convert all columns to nullable in AssertNumRows node
         // create a tuple for AssertNumRowsNode
@@ -1349,6 +1365,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         TupleDescriptor tupleDescriptor = 
generateTupleDesc(assertNumRows.getOutput(), null, context);
         AssertNumRowsNode assertNumRowsNode = new 
AssertNumRowsNode(context.nextPlanNodeId(),
                 currentFragment.getPlanRoot(), assertion, tupleDescriptor);
+        
assertNumRowsNode.setDistributeExprLists(getDistributeExpr(assertNumRows));
         assertNumRowsNode.setChildrenDistributeExprLists(distributeExprLists);
         assertNumRowsNode.setNereidsId(assertNumRows.getId());
         context.getNereidsIdToPlanNodeIdMap().put(assertNumRows.getId(), 
assertNumRowsNode.getId());
@@ -1400,6 +1417,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             }
         }
         CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor);
+        cteScanNode.setDistributeExprLists(getDistributeExpr(cteConsumer));
         translateRuntimeFilter(cteConsumer, cteScanNode, context);
         context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(), 
cteScanNode);
 
@@ -1511,6 +1529,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 .collect(Collectors.toCollection(ArrayList::new));
         TableFunctionNode tableFunctionNode = new 
TableFunctionNode(context.nextPlanNodeId(),
                 currentFragment.getPlanRoot(), tupleDescriptor.getId(), 
functionCalls, outputSlotIds, conjuncts);
+        tableFunctionNode.setDistributeExprLists(getDistributeExpr(generate));
         tableFunctionNode.setNereidsId(generate.getId());
         context.getNereidsIdToPlanNodeIdMap().put(generate.getId(), 
tableFunctionNode.getId());
         addPlanRoot(currentFragment, tableFunctionNode, generate);
@@ -1574,7 +1593,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         // NOTICE: We must visit from right to left, to ensure the last 
fragment is root fragment
         PlanFragment rightFragment = hashJoin.child(1).accept(this, context);
         PlanFragment leftFragment = hashJoin.child(0).accept(this, context);
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(physicalHashJoin.left(), physicalHashJoin.right());
+        List<List<Expr>> distributeExprLists
+                = getChildrenDistributeExprs(physicalHashJoin.left(), 
physicalHashJoin.right());
 
         if (JoinUtils.shouldNestedLoopJoin(hashJoin)) {
             throw new RuntimeException("Physical hash join could not execute 
without equal join condition.");
@@ -1619,6 +1639,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 markConjuncts, hashJoin.isMarkJoin());
         hashJoinNode.setNereidsId(hashJoin.getId());
         context.getNereidsIdToPlanNodeIdMap().put(hashJoin.getId(), 
hashJoinNode.getId());
+        hashJoinNode.setDistributeExprLists(getDistributeExpr(hashJoin));
         hashJoinNode.setChildrenDistributeExprLists(distributeExprLists);
         PlanFragment currentFragment = connectJoinNode(hashJoinNode, 
leftFragment, rightFragment, context, hashJoin);
 
@@ -1833,7 +1854,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         //       PhysicalPlan plan, PlanVisitor visitor, Context context).
         PlanFragment rightFragment = nestedLoopJoin.child(1).accept(this, 
context);
         PlanFragment leftFragment = nestedLoopJoin.child(0).accept(this, 
context);
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(nestedLoopJoin.child(0), nestedLoopJoin.child(1));
+        List<List<Expr>> distributeExprLists
+                = getChildrenDistributeExprs(nestedLoopJoin.child(0), 
nestedLoopJoin.child(1));
         PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot();
         PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot();
 
@@ -1854,6 +1876,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 nestedLoopJoin.isMarkJoin());
         nestedLoopJoinNode.setNereidsId(nestedLoopJoin.getId());
         context.getNereidsIdToPlanNodeIdMap().put(nestedLoopJoin.getId(), 
nestedLoopJoinNode.getId());
+        
nestedLoopJoinNode.setDistributeExprLists(getDistributeExpr(nestedLoopJoin));
         nestedLoopJoinNode.setChildrenDistributeExprLists(distributeExprLists);
         if (nestedLoopJoin.getStats() != null) {
             nestedLoopJoinNode.setCardinality((long) 
nestedLoopJoin.getStats().getRowCount());
@@ -2053,9 +2076,10 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     public PlanFragment visitPhysicalPartitionTopN(PhysicalPartitionTopN<? 
extends Plan> partitionTopN,
             PlanTranslatorContext context) {
         PlanFragment inputFragment = partitionTopN.child(0).accept(this, 
context);
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(partitionTopN.child(0));
+        List<List<Expr>> distributeExprLists = 
getChildrenDistributeExprs(partitionTopN.child(0));
         PartitionSortNode partitionSortNode = translatePartitionSortNode(
                 partitionTopN, inputFragment.getPlanRoot(), context);
+        
partitionSortNode.setDistributeExprLists(getDistributeExpr(partitionTopN));
         partitionSortNode.setChildrenDistributeExprLists(distributeExprLists);
         addPlanRoot(inputFragment, partitionSortNode, partitionTopN);
         // in pipeline engine, we use parallel scan by default, but it broke 
the rule of data distribution
@@ -2249,12 +2273,13 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         for (Plan plan : recursiveCte.children()) {
             childrenFragments.add(plan.accept(this, context));
         }
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(recursiveCte.children().toArray(new Plan[0]));
+        List<List<Expr>> distributeExprLists = 
getChildrenDistributeExprs(recursiveCte.children().toArray(new Plan[0]));
         TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), 
null, context);
 
         RecursiveCteNode recursiveCteNode = new 
RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(),
                 recursiveCte.getCteName(), recursiveCte.isUnionAll());
 
+        
recursiveCteNode.setDistributeExprLists(getDistributeExpr(recursiveCte));
         recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists);
         recursiveCteNode.setNereidsId(recursiveCte.getId());
         context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), 
recursiveCteNode.getId());
@@ -2322,7 +2347,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         for (Plan plan : setOperation.children()) {
             childrenFragments.add(plan.accept(this, context));
         }
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(setOperation.children().toArray(new Plan[0]));
+        List<List<Expr>> distributeExprLists = 
getChildrenDistributeExprs(setOperation.children().toArray(new Plan[0]));
         TupleDescriptor setTuple = generateTupleDesc(setOperation.getOutput(), 
null, context);
 
         SetOperationNode setOperationNode;
@@ -2336,6 +2361,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         } else {
             throw new RuntimeException("not support set operation type " + 
setOperation);
         }
+        
setOperationNode.setDistributeExprLists(getDistributeExpr(setOperation));
         setOperationNode.setChildrenDistributeExprLists(distributeExprLists);
         setOperationNode.setNereidsId(setOperation.getId());
         context.getNereidsIdToPlanNodeIdMap().put(setOperation.getId(), 
setOperationNode.getId());
@@ -2448,12 +2474,13 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<? extends 
Plan> sort,
             PlanTranslatorContext context) {
         PlanFragment inputFragment = sort.child(0).accept(this, context);
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(sort.child(0));
+        List<List<Expr>> distributeExprLists = 
getChildrenDistributeExprs(sort.child(0));
 
         // 2. According to the type of sort, generate physical plan
         if (!sort.getSortPhase().isMerge()) {
             // For localSort or Gather->Sort, we just need to add sortNode
             SortNode sortNode = translateSortNode(sort, 
inputFragment.getPlanRoot(), context);
+            sortNode.setDistributeExprLists(getDistributeExpr(sort));
             sortNode.setChildrenDistributeExprLists(distributeExprLists);
             addPlanRoot(inputFragment, sortNode, sort);
         } else {
@@ -2470,6 +2497,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 inputFragment.getChild(0).getSink().setMerge(true);
             }
             sortNode.setMergeByExchange();
+            sortNode.setDistributeExprLists(getDistributeExpr(sort));
             sortNode.setChildrenDistributeExprLists(distributeExprLists);
         }
         return inputFragment;
@@ -2478,7 +2506,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     @Override
     public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, 
PlanTranslatorContext context) {
         PlanFragment inputFragment = topN.child(0).accept(this, context);
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(topN.child(0));
+        List<List<Expr>> distributeExprLists = 
getChildrenDistributeExprs(topN.child(0));
         // 2. According to the type of sort, generate physical plan
         if (!topN.getSortPhase().isMerge()) {
             // For localSort or Gather->Sort, we just need to add TopNNode
@@ -2514,6 +2542,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                     }
                 }
             }
+            sortNode.setDistributeExprLists(
+                    CollectionUtils.isEmpty(distributeExprLists) ? null : 
distributeExprLists.get(0)
+            );
             sortNode.setChildrenDistributeExprLists(distributeExprLists);
             addPlanRoot(inputFragment, sortNode, topN);
         } else {
@@ -2527,6 +2558,9 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 return inputFragment;
             }
             ExchangeNode exchangeNode = (ExchangeNode) 
inputFragment.getPlanRoot();
+            exchangeNode.setDistributeExprLists(
+                    CollectionUtils.isEmpty(distributeExprLists) ? null : 
distributeExprLists.get(0)
+            );
             exchangeNode.setChildrenDistributeExprLists(distributeExprLists);
             exchangeNode.setMergeInfo(((SortNode) 
exchangeNode.getChild(0)).getSortInfo());
             if (inputFragment.hasChild(0) && 
inputFragment.getChild(0).getSink() != null) {
@@ -2558,7 +2592,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     @Override
     public PlanFragment visitPhysicalRepeat(PhysicalRepeat<? extends Plan> 
repeat, PlanTranslatorContext context) {
         PlanFragment inputPlanFragment = repeat.child(0).accept(this, context);
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(repeat.child(0));
+        List<List<Expr>> distributeExprLists = 
getChildrenDistributeExprs(repeat.child(0));
 
         List<Expression> flattenGroupingExpressions = 
repeat.getGroupByExpressions();
         Set<Slot> preRepeatExpressions = Sets.newLinkedHashSet();
@@ -2609,6 +2643,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 allSlotId, repeat.computeGroupingFunctionsValues());
         repeatNode.setNereidsId(repeat.getId());
         context.getNereidsIdToPlanNodeIdMap().put(repeat.getId(), 
repeatNode.getId());
+        repeatNode.setDistributeExprLists(getDistributeExpr(repeat));
         repeatNode.setChildrenDistributeExprLists(distributeExprLists);
         addPlanRoot(inputPlanFragment, repeatNode, repeat);
         updateLegacyPlanIdToPhysicalPlan(inputPlanFragment.getPlanRoot(), 
repeat);
@@ -2619,7 +2654,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
     public PlanFragment visitPhysicalWindow(PhysicalWindow<? extends Plan> 
physicalWindow,
             PlanTranslatorContext context) {
         PlanFragment inputPlanFragment = physicalWindow.child(0).accept(this, 
context);
-        List<List<Expr>> distributeExprLists = 
getDistributeExprs(physicalWindow.child(0));
+        List<List<Expr>> distributeExprLists = 
getChildrenDistributeExprs(physicalWindow.child(0));
 
         // 1. translate to old optimizer variable
         // variable in Nereids
@@ -2684,6 +2719,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         );
         analyticEvalNode.setNereidsId(physicalWindow.getId());
         context.getNereidsIdToPlanNodeIdMap().put(physicalWindow.getId(), 
analyticEvalNode.getId());
+        
analyticEvalNode.setDistributeExprLists(getDistributeExpr(physicalWindow));
         analyticEvalNode.setChildrenDistributeExprLists(distributeExprLists);
         PlanNode root = inputPlanFragment.getPlanRoot();
         if (root instanceof SortNode) {
@@ -2721,6 +2757,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         materializeNode.setLazyColumns(materialize.getLazyColumns());
         materializeNode.setLocations(materialize.getLazySlotLocations());
         materializeNode.setIdxs(materialize.getlazyTableIdxs());
+        materializeNode.setDistributeExprLists(getDistributeExpr(materialize));
 
         List<Boolean> rowStoreFlags = new ArrayList<>();
         for (Relation relation : materialize.getRelations()) {
@@ -2756,6 +2793,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         TableValuedFunctionIf catalogFunction = 
tvfRelation.getFunction().getCatalogFunction();
         SessionVariable sv = ConnectContext.get().getSessionVariable();
         ScanNode scanNode = 
catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor, sv);
+        scanNode.setDistributeExprLists(getDistributeExpr(tvfRelation));
         scanNode.setNereidsId(tvfRelation.getId());
         context.getNereidsIdToPlanNodeIdMap().put(tvfRelation.getId(), 
scanNode.getId());
         Utils.execWithUncheckedException(scanNode::init);
@@ -3201,7 +3239,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         return false;
     }
 
-    private List<List<Expr>> getDistributeExprs(Plan ... children) {
+    private List<List<Expr>> getChildrenDistributeExprs(Plan ... children) {
         List<List<Expr>> distributeExprLists = Lists.newArrayList();
         for (Plan child : children) {
             DistributionSpec spec = ((PhysicalPlan) 
child).getPhysicalProperties().getDistributionSpec();
@@ -3210,6 +3248,13 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         return distributeExprLists;
     }
 
+    /** Distribute exprs of {@code physicalPlan}'s own output, derived from its physical distribution spec. */
+    private List<Expr> getDistributeExpr(PhysicalPlan physicalPlan) {
+        List<ExprId> outputExprIds = physicalPlan.getOutputExprIds();
+        DistributionSpec distributionSpec = physicalPlan.getPhysicalProperties().getDistributionSpec();
+        return getDistributeExpr(outputExprIds, distributionSpec);
+    }
+
     private List<Expr> getDistributeExpr(List<ExprId> childOutputIds, 
DistributionSpec spec) {
         if (spec instanceof DistributionSpecHash) {
             DistributionSpecHash distributionSpecHash = (DistributionSpecHash) 
spec;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
index 5b89fa7b89b..376fdef3760 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java
@@ -133,6 +133,7 @@ public enum PlanType {
     PHYSICAL_RECURSIVE_CTE,
     PHYSICAL_RECURSIVE_CTE_RECURSIVE_CHILD,
     PHYSICAL_DISTRIBUTE,
+    PHYSICAL_LOCAL_DISTRIBUTE,
     PHYSICAL_EXCEPT,
     PHYSICAL_FILTER,
     PHYSICAL_GENERATE,
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
new file mode 100644
index 00000000000..2fc8ad7801b
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
+
+import java.util.List;
+
+/**
+ * Plans local exchanges fragment by fragment: the requirement of each fragment's sink is handed to the
+ * fragment root, and every {@link PlanNode} enforces the requirements of its own children.
+ */
+public class AddLocalExchange {
+    /**
+     * Enforce local exchange requirements in every fragment, replacing the fragment root when the
+     * enforcement returns a different node for it.
+     *
+     * @param fragments fragments to process
+     * @param context translator context used to allocate plan node ids and read session settings
+     */
+    public void addLocalExchange(List<PlanFragment> fragments, PlanTranslatorContext context) {
+        for (PlanFragment fragment : fragments) {
+            DataSink sink = fragment.getSink();
+            LocalExchangeTypeRequire sinkRequire;
+            if (sink == null) {
+                // no sink: nothing constrains the root's output
+                sinkRequire = LocalExchangeTypeRequire.noRequire();
+            } else {
+                sinkRequire = sink.getLocalExchangeTypeRequire();
+            }
+            PlanNode oldRoot = fragment.getPlanRoot();
+            // NOTE(review): only the returned node is used; the derived type is not checked against
+            // sinkRequire, so no local exchange is placed directly below the sink. Confirm this is intended.
+            PlanNode newRoot = oldRoot.enforceAndDeriveLocalExchange(context, null, sinkRequire).first;
+            if (newRoot != oldRoot) {
+                fragment.setPlanRoot(newRoot);
+            }
+        }
+    }
+
+    /**
+     * Whether {@code plan} forms a colocated pipeline: an {@link OlapScanNode} is colocated, a
+     * {@link SelectNode} inherits from its child, an {@link AggregationNode} must be flagged colocate and
+     * have a colocated child, and a {@link HashJoinNode} / {@link SetOperationNode} must be flagged
+     * colocate and have at least one colocated child. Any other node is not colocated.
+     */
+    public static boolean isColocated(PlanNode plan) {
+        if (plan instanceof AggregationNode) {
+            AggregationNode agg = (AggregationNode) plan;
+            return agg.isColocate() && isColocated(agg.getChild(0));
+        }
+        if (plan instanceof OlapScanNode) {
+            return true;
+        }
+        if (plan instanceof SelectNode) {
+            return isColocated(plan.getChild(0));
+        }
+        if (plan instanceof HashJoinNode) {
+            HashJoinNode join = (HashJoinNode) plan;
+            return join.isColocate() && (isColocated(join.getChild(0)) || isColocated(join.getChild(1)));
+        }
+        if (plan instanceof SetOperationNode) {
+            return ((SetOperationNode) plan).isColocate()
+                    && plan.getChildren().stream().anyMatch(AddLocalExchange::isColocated);
+        }
+        return false;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index b0e0bab2b2b..d41212819ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -26,7 +26,13 @@ import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.SlotDescriptor;
 import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.SortInfo;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.planner.normalize.Normalizer;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TAggregationNode;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TExpr;
@@ -246,6 +252,10 @@ public class AggregationNode extends PlanNode {
         isColocate = colocate;
     }
 
+    /** Returns the colocate flag of this aggregation node. */
+    public boolean isColocate() {
+        return this.isColocate;
+    }
+
     public void setSortByGroupKey(SortInfo sortByGroupKey) {
         this.sortByGroupKey = sortByGroupKey;
     }
@@ -257,4 +267,49 @@ public class AggregationNode extends PlanNode {
     public void setQueryCacheCandidate(boolean queryCacheCandidate) {
         this.queryCacheCandidate = queryCacheCandidate;
     }
+
+    /**
+     * Choose the local exchange requirement for the single child of this aggregation, enforce it, and
+     * report the child's resulting local exchange type as this node's output type.
+     */
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        LocalExchangeTypeRequire childRequire = computeChildRequire(translatorContext, parentRequire);
+        Pair<PlanNode, LocalExchangeType> childResult
+                = enforceChild(translatorContext, childRequire, children.get(0));
+        children = Lists.newArrayList(childResult.first);
+        return Pair.of(this, childResult.second);
+    }
+
+    /** Decide which local exchange type this aggregation needs from its child. */
+    private LocalExchangeTypeRequire computeChildRequire(
+            PlanTranslatorContext translatorContext, LocalExchangeTypeRequire parentRequire) {
+        ConnectContext connectContext = translatorContext.getConnectContext();
+        SessionVariable sessionVariable = connectContext.getSessionVariable();
+        boolean withoutGroupBy = aggInfo.getGroupingExprs().isEmpty();
+
+        if (needsFinalize && withoutGroupBy) {
+            // a finalizing aggregation without group by places no requirement on its input
+            return LocalExchangeTypeRequire.noRequire();
+        }
+        if (!canUseDistinctStreamingAgg(sessionVariable)) {
+            return withoutGroupBy
+                    ? LocalExchangeTypeRequire.requirePassthrough()
+                    : requireGroupByHash(parentRequire);
+        }
+
+        // distinct streaming aggregation
+        boolean childUsePoolingScan = fragment.useSerialSource(connectContext)
+                && (children.get(0) instanceof ScanNode);
+        if (needsFinalize || (aggInfo.getGroupingExprs().size() > 1 && !useStreamingPreagg)) {
+            return requireGroupByHash(parentRequire);
+        }
+        return childUsePoolingScan
+                ? LocalExchangeTypeRequire.requirePassthrough()
+                : LocalExchangeTypeRequire.noRequire();
+    }
+
+    /** Bucket hash when this aggregation is colocated, otherwise the hash requirement derived from the parent. */
+    private LocalExchangeTypeRequire requireGroupByHash(LocalExchangeTypeRequire parentRequire) {
+        if (AddLocalExchange.isColocated(this)) {
+            return LocalExchangeTypeRequire.requireBucketHash();
+        }
+        return parentRequire.autoHash();
+    }
+
+    /**
+     * Distinct streaming aggregation applies when there are no aggregate functions, no sort-by-group-key,
+     * and the session enables it.
+     */
+    private boolean canUseDistinctStreamingAgg(SessionVariable sessionVariable) {
+        boolean noAggregateFunction = aggInfo.getAggregateExprs().isEmpty();
+        return noAggregateFunction && sortByGroupKey == null && sessionVariable.enableDistinctStreamingAggregation;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
index 6992eeb001a..292df60cb26 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java
@@ -24,6 +24,10 @@ import org.apache.doris.analysis.AnalyticWindow;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.OrderByElement;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.thrift.TAnalyticNode;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TPlanNode;
@@ -152,4 +156,37 @@ public class AnalyticEvalNode extends PlanNode {
     public boolean isSerialOperator() {
         return partitionExprs.isEmpty();
     }
+
+    /**
+     * Enforce the local exchange this analytic node needs on its child and derive its output type.
+     * <ul>
+     *   <li>No partition by: passthrough.</li>
+     *   <li>Partition by and order by: passthrough over a serial-source scan child, otherwise no
+     *   requirement (output NOOP).</li>
+     *   <li>Partition by without order by: bucket hash when colocated, otherwise the hash requirement
+     *   derived from the parent (output type then comes from the enforced child).</li>
+     * </ul>
+     */
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
+            PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        LocalExchangeTypeRequire childRequire;
+        // null means "report whatever the enforced child provides"
+        LocalExchangeType fixedOutput;
+        if (partitionExprs.isEmpty()) {
+            childRequire = LocalExchangeTypeRequire.requirePassthrough();
+            fixedOutput = LocalExchangeType.PASSTHROUGH;
+        } else if (!orderByElements.isEmpty()) {
+            boolean serialScanChild = fragment.useSerialSource(translatorContext.getConnectContext())
+                    && children.get(0) instanceof ScanNode;
+            if (serialScanChild) {
+                childRequire = LocalExchangeTypeRequire.requirePassthrough();
+                fixedOutput = LocalExchangeType.PASSTHROUGH;
+            } else {
+                childRequire = LocalExchangeTypeRequire.noRequire();
+                fixedOutput = LocalExchangeType.NOOP;
+            }
+        } else if (AddLocalExchange.isColocated(this)) {
+            childRequire = LocalExchangeTypeRequire.requireBucketHash();
+            fixedOutput = LocalExchangeType.BUCKET_HASH_SHUFFLE;
+        } else {
+            childRequire = parentRequire.autoHash();
+            fixedOutput = null;
+        }
+
+        Pair<PlanNode, LocalExchangeType> childResult
+                = enforceChild(translatorContext, childRequire, children.get(0));
+        children = Lists.newArrayList(childResult.first);
+        return Pair.of(this, fixedOutput != null ? fixedOutput : childResult.second);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
index dcf683f0783..6874f4f0fad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java
@@ -19,11 +19,18 @@ package org.apache.doris.planner;
 
 import org.apache.doris.analysis.AssertNumRowsElement;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
+import org.apache.doris.planner.LocalExchangeNode.RequireSpecific;
 import org.apache.doris.thrift.TAssertNumRowsNode;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 
+import com.google.common.collect.Lists;
+
 /**
  * Assert num rows node is used to determine whether the number of rows is 
less than desired num of rows.
  * The rows are the result of subqueryString.
@@ -81,4 +88,24 @@ public class AssertNumRowsNode extends PlanNode {
     public boolean isSerialOperator() {
         return true;
     }
+
+    /**
+     * This node is a serial operator, so it requires a passthrough local exchange from its child and
+     * always reports PASSTHROUGH as its own output type.
+     */
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
+            PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        RequireSpecific requireChild = LocalExchangeTypeRequire.requirePassthrough();
+        Pair<PlanNode, LocalExchangeType> childOutput = children.get(0).enforceAndDeriveLocalExchange(
+                translatorContext, this, requireChild);
+
+        // Build on the node returned by enforcement, not the original child: the contract allows the
+        // child to hand back a different node to use in its place, and wrapping the stale child drops it.
+        PlanNode enforcedChild = childOutput.first;
+        if (!requireChild.satisfy(childOutput.second)) {
+            enforcedChild = new LocalExchangeNode(
+                    translatorContext.nextPlanNodeId(), enforcedChild, requireChild.preferType());
+        }
+        children = Lists.newArrayList(enforcedChild);
+        return Pair.of(this, LocalExchangeType.PASSTHROUGH);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
index 320fcae9eb0..199c428b561 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java
@@ -27,6 +27,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.datasource.odbc.sink.OdbcTableSink;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TExplainLevel;
 
@@ -86,4 +87,8 @@ public abstract class DataSink {
     public void setMerge(boolean merge) {
         isMerge = merge;
     }
+
+    /**
+     * The local exchange requirement this sink places on the root of its fragment. The base sink has
+     * none; subclasses override this when they need a specific distribution.
+     */
+    public LocalExchangeTypeRequire getLocalExchangeTypeRequire() {
+        return LocalExchangeNode.NoRequire.INSTANCE;
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
index 155b63d6224..22f73ed2b28 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java
@@ -22,6 +22,7 @@ package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.thrift.TDataSink;
 import org.apache.doris.thrift.TDataSinkType;
 import org.apache.doris.thrift.TDataStreamSink;
@@ -29,6 +30,7 @@ import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TOlapTableLocationParam;
 import org.apache.doris.thrift.TOlapTablePartitionParam;
 import org.apache.doris.thrift.TOlapTableSchemaParam;
+import org.apache.doris.thrift.TPartitionType;
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
@@ -239,4 +241,16 @@ public class DataStreamSink extends DataSink {
         result.setStreamSink(tStreamSink);
         return result;
     }
+
+    /**
+     * Map the output partition type to a local exchange requirement on the fragment root:
+     * HASH_PARTITIONED needs an execution hash shuffle, BUCKET_SHFFULE_HASH_PARTITIONED needs a bucket
+     * hash shuffle, and every other type needs nothing.
+     */
+    @Override
+    public LocalExchangeTypeRequire getLocalExchangeTypeRequire() {
+        TPartitionType partitionType = outputPartition.getType();
+        if (partitionType == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) {
+            return LocalExchangeTypeRequire.requireBucketHash();
+        }
+        if (partitionType == TPartitionType.HASH_PARTITIONED) {
+            return LocalExchangeTypeRequire.requireExecutionHash();
+        }
+        return LocalExchangeTypeRequire.noRequire();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
index 7274f731c5a..c24c3894b4e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java
@@ -23,6 +23,10 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.SortInfo;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TExchangeNode;
 import org.apache.doris.thrift.TExplainLevel;
@@ -166,4 +170,16 @@ public class ExchangeNode extends PlanNode {
     public boolean hasSerialScanChildren() {
         return false;
     }
+
+    /**
+     * An exchange node does not recurse into its children or enforce anything; it only reports the
+     * local exchange type implied by its partition type.
+     */
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
+            PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        LocalExchangeType received;
+        if (partitionType == TPartitionType.HASH_PARTITIONED) {
+            received = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
+        } else if (partitionType == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) {
+            received = LocalExchangeType.BUCKET_HASH_SHUFFLE;
+        } else {
+            received = LocalExchangeType.NOOP;
+        }
+        return Pair.of(this, received);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index e5e40126b13..c748809cc35 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -25,7 +25,11 @@ import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.JoinOperator;
 import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.thrift.TEqJoinCondition;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.THashJoinNode;
@@ -118,6 +122,10 @@ public class HashJoinNode extends JoinNodeBase {
         colocateReason = reason;
     }
 
+    /** Returns the colocate flag of this hash join node. */
+    public boolean isColocate() {
+        return this.isColocate;
+    }
+
     public Map<ExprId, SlotId> getHashOutputExprSlotIdMap() {
         return hashOutputExprSlotIdMap;
     }
@@ -267,4 +275,58 @@ public class HashJoinNode extends JoinNodeBase {
     public List<Expr> getMarkJoinConjuncts() {
         return markJoinConjuncts;
     }
+
+    /**
+     * Choose local exchange requirements for the probe side (child 0) and build side (child 1), enforce
+     * them, and derive this join's output type.
+     * <ul>
+     *   <li>Null-aware left anti join: no requirement on either side, output NOOP.</li>
+     *   <li>Broadcast join: passthrough on the probe side, pass-to-one on the build side, output PASSTHROUGH.</li>
+     *   <li>Colocated or bucket shuffle join: bucket hash on both sides, output BUCKET_HASH_SHUFFLE.</li>
+     *   <li>Otherwise: the hash requirement derived from the parent on both sides; the output is the type
+     *   the probe side actually provides after enforcement.</li>
+     * </ul>
+     */
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+
+        LocalExchangeTypeRequire probeSideRequire;
+        LocalExchangeTypeRequire buildSideRequire;
+        // null: the output follows what the probe side provides after enforcement
+        LocalExchangeType outputType = null;
+
+        if (joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
+            buildSideRequire = probeSideRequire = LocalExchangeTypeRequire.noRequire();
+            outputType = LocalExchangeType.NOOP;
+        } else if (distrMode == DistributionMode.BROADCAST) {
+            probeSideRequire = LocalExchangeTypeRequire.requirePassthrough();
+            buildSideRequire = LocalExchangeTypeRequire.requirePassToOne();
+            outputType = LocalExchangeType.PASSTHROUGH;
+        } else if (AddLocalExchange.isColocated(this) || isBucketShuffle()) {
+            buildSideRequire = probeSideRequire = LocalExchangeTypeRequire.requireBucketHash();
+            outputType = LocalExchangeType.BUCKET_HASH_SHUFFLE;
+        } else {
+            buildSideRequire = probeSideRequire = parentRequire.autoHash();
+        }
+
+        Pair<PlanNode, LocalExchangeType> probeSide
+                = enforceJoinSide(translatorContext, children.get(0), probeSideRequire);
+        Pair<PlanNode, LocalExchangeType> buildSide
+                = enforceJoinSide(translatorContext, children.get(1), buildSideRequire);
+        this.children = Lists.newArrayList(probeSide.first, buildSide.first);
+
+        if (outputType == null) {
+            // Use the type the probe side provides after enforcement: when a local exchange was inserted,
+            // that is the exchange's type, not the child's original (unsatisfying) type.
+            outputType = probeSide.second;
+        }
+        return Pair.of(this, outputType);
+    }
+
+    /**
+     * Enforce {@code require} on one join input. Returns the node to use as that input and the local
+     * exchange type it actually provides. When the enforced child does not satisfy {@code require}, the
+     * returned node is a new LocalExchangeNode of {@code require.preferType()} over that child.
+     */
+    private Pair<PlanNode, LocalExchangeType> enforceJoinSide(PlanTranslatorContext translatorContext,
+            PlanNode side, LocalExchangeTypeRequire require) {
+        Pair<PlanNode, LocalExchangeType> sideOutput = side.enforceAndDeriveLocalExchange(
+                translatorContext, this, require);
+        if (require.satisfy(sideOutput.second)) {
+            return sideOutput;
+        }
+        LocalExchangeType preferType = require.preferType();
+        // wrap the node returned by enforcement, not the possibly stale original child
+        PlanNode exchange = new LocalExchangeNode(
+                translatorContext.nextPlanNodeId(), sideOutput.first, preferType);
+        return Pair.of(exchange, preferType);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/LocalExchangeNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/LocalExchangeNode.java
new file mode 100644
index 00000000000..840b1b5ed4d
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LocalExchangeNode.java
@@ -0,0 +1,271 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// 
https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/ExchangeNode.java
+// and modified by Doris
+
+package org.apache.doris.planner;
+
+import org.apache.doris.analysis.Expr;
+import org.apache.doris.analysis.TupleDescriptor;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.thrift.TExplainLevel;
+import org.apache.doris.thrift.TExpr;
+import org.apache.doris.thrift.TLocalExchangeNode;
+import org.apache.doris.thrift.TLocalPartitionType;
+import org.apache.doris.thrift.TPlanNode;
+import org.apache.doris.thrift.TPlanNodeType;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * LocalExchangeNode: re-distributes the rows of its single child according to a {@link LocalExchangeType},
+ * which is serialized as a LOCAL_EXCHANGE_NODE.
+ */
+public class LocalExchangeNode extends PlanNode {
+    public static final String EXCHANGE_NODE = "LOCAL-EXCHANGE";
+
+    private LocalExchangeType exchangeType;
+
+    /**
+     * use for Nereids only.
+     *
+     * @param id id of this node
+     * @param inputNode the child whose rows are redistributed; this node joins the child's fragment
+     * @param exchangeType how the rows are redistributed
+     */
+    public LocalExchangeNode(PlanNodeId id, PlanNode inputNode, LocalExchangeType exchangeType) {
+        super(id, inputNode, EXCHANGE_NODE);
+        this.offset = 0;
+        this.limit = -1;
+        this.conjuncts = Collections.emptyList();
+        this.children.add(inputNode);
+        this.exchangeType = exchangeType;
+        this.fragment = inputNode.getFragment();
+
+        // Hash shuffles reuse the input's distribute exprs as the shuffle keys.
+        // NOTE(review): when the input carries no distribute exprs, no shuffle keys are recorded; confirm
+        // that callers only request hash shuffles over inputs that have them.
+        List<Expr> distributeExprs = inputNode.getDistributeExprLists();
+        if (exchangeType.isHashShuffle() && distributeExprs != null && !distributeExprs.isEmpty()) {
+            setDistributeExprLists(distributeExprs);
+            List<List<Expr>> distributeExprsList = new ArrayList<>();
+            distributeExprsList.add(distributeExprs);
+            setChildrenDistributeExprLists(distributeExprsList);
+        }
+        updateTupleIds(inputNode.getOutputTupleDesc());
+    }
+
+    /**
+     * Reset this node's tuple ids to the id of {@code outputTupleDesc}, or to the child's output tuple
+     * ids when {@code outputTupleDesc} is null.
+     */
+    public void updateTupleIds(TupleDescriptor outputTupleDesc) {
+        clearTupleIds();
+        if (outputTupleDesc != null) {
+            tupleIds.add(outputTupleDesc.getId());
+        } else {
+            tupleIds.addAll(getChild(0).getOutputTupleIds());
+        }
+    }
+
+    @Override
+    protected void toThrift(TPlanNode msg) {
+        msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode())
+                && fragment.useSerialSource(ConnectContext.get()));
+
+        msg.node_type = TPlanNodeType.LOCAL_EXCHANGE_NODE;
+        msg.local_exchange_node = new TLocalExchangeNode(exchangeType.toThrift());
+
+        // The constructor only sets childrenDistributeExprLists when the input carried distribute exprs,
+        // so guard against it being unset instead of failing with a NullPointerException.
+        if (exchangeType.isHashShuffle() && childrenDistributeExprLists != null) {
+            List<List<TExpr>> distributeExprLists = new ArrayList<>(childrenDistributeExprLists.size());
+            for (List<Expr> exprList : childrenDistributeExprLists) {
+                List<TExpr> distributeExprList = new ArrayList<>(exprList.size());
+                for (Expr expr : exprList) {
+                    distributeExprList.add(expr.treeToThrift());
+                }
+                distributeExprLists.add(distributeExprList);
+            }
+            msg.distribute_expr_lists = distributeExprLists;
+        }
+    }
+
+    @Override
+    public String getNodeExplainString(String prefix, TExplainLevel detailLevel) {
+        return prefix + "type: " + exchangeType.name() + "\n";
+    }
+
+    /** A requirement on the local exchange type a child must provide to its parent. */
+    public interface LocalExchangeTypeRequire {
+        /** Whether a child providing {@code provide} satisfies this requirement. */
+        boolean satisfy(LocalExchangeType provide);
+
+        /** The local exchange type to plan when the child does not satisfy this requirement. */
+        LocalExchangeType preferType();
+
+        /**
+         * The hash requirement to push down when a node needs hash distribution but has no specific hash
+         * type of its own. Hash requirements return themselves; everything else falls back to
+         * {@link RequireHash}.
+         */
+        default LocalExchangeTypeRequire autoHash() {
+            return RequireHash.INSTANCE;
+        }
+
+        static NoRequire noRequire() {
+            return NoRequire.INSTANCE;
+        }
+
+        static RequireHash requireHash() {
+            return RequireHash.INSTANCE;
+        }
+
+        static RequireSpecific requirePassthrough() {
+            return requireSpecific(LocalExchangeType.PASSTHROUGH);
+        }
+
+        static RequireSpecific requirePassToOne() {
+            return requireSpecific(LocalExchangeType.PASS_TO_ONE);
+        }
+
+        static RequireSpecific requireBroadcast() {
+            return requireSpecific(LocalExchangeType.BROADCAST);
+        }
+
+        static RequireSpecific requireAdaptivePassthrough() {
+            return requireSpecific(LocalExchangeType.ADAPTIVE_PASSTHROUGH);
+        }
+
+        static RequireSpecific requireBucketHash() {
+            return requireSpecific(LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        }
+
+        static RequireSpecific requireExecutionHash() {
+            return requireSpecific(LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE);
+        }
+
+        static RequireSpecific requireSpecific(LocalExchangeType require) {
+            return new RequireSpecific(require);
+        }
+
+        /** {@link #preferType()}, or {@code defaultType} when the preferred type is NOOP. */
+        default LocalExchangeType noopTo(LocalExchangeType defaultType) {
+            LocalExchangeType preferType = preferType();
+            return (preferType == LocalExchangeType.NOOP) ? defaultType : preferType;
+        }
+    }
+
+    /** Satisfied by any provided type; prefers NOOP, i.e. no local exchange. */
+    public static class NoRequire implements LocalExchangeTypeRequire {
+        public static final NoRequire INSTANCE = new NoRequire();
+
+        @Override
+        public boolean satisfy(LocalExchangeType provide) {
+            return true;
+        }
+
+        @Override
+        public LocalExchangeType preferType() {
+            return LocalExchangeType.NOOP;
+        }
+    }
+
+    /**
+     * Satisfied by a global execution hash shuffle or a bucket hash shuffle; prefers
+     * GLOBAL_EXECUTION_HASH_SHUFFLE.
+     */
+    public static class RequireHash implements LocalExchangeTypeRequire {
+        public static final RequireHash INSTANCE = new RequireHash();
+
+        @Override
+        public boolean satisfy(LocalExchangeType provide) {
+            switch (provide) {
+                case GLOBAL_EXECUTION_HASH_SHUFFLE:
+                case BUCKET_HASH_SHUFFLE:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        @Override
+        public LocalExchangeType preferType() {
+            return LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
+        }
+
+        @Override
+        public LocalExchangeTypeRequire autoHash() {
+            return this;
+        }
+    }
+
+    /** Satisfied only by exactly the required type, which is also the preferred type. */
+    public static class RequireSpecific implements LocalExchangeTypeRequire {
+        LocalExchangeType requireType;
+
+        public RequireSpecific(LocalExchangeType requireType) {
+            this.requireType = requireType;
+        }
+
+        @Override
+        public boolean satisfy(LocalExchangeType provide) {
+            return requireType == provide;
+        }
+
+        @Override
+        public LocalExchangeType preferType() {
+            return requireType;
+        }
+
+        @Override
+        public LocalExchangeTypeRequire autoHash() {
+            if (requireType == LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE
+                    || requireType == LocalExchangeType.BUCKET_HASH_SHUFFLE) {
+                return this;
+            }
+            return RequireHash.INSTANCE;
+        }
+    }
+
+    /** Kinds of local exchange; each maps one-to-one onto a {@link TLocalPartitionType}. */
+    public enum LocalExchangeType {
+        NOOP,
+        GLOBAL_EXECUTION_HASH_SHUFFLE,
+        LOCAL_EXECUTION_HASH_SHUFFLE,
+        BUCKET_HASH_SHUFFLE,
+        PASSTHROUGH,
+        ADAPTIVE_PASSTHROUGH,
+        BROADCAST,
+        PASS_TO_ONE,
+        LOCAL_MERGE_SORT;
+
+        /** Whether this type distributes rows by hashing distribute exprs. */
+        public boolean isHashShuffle() {
+            switch (this) {
+                case GLOBAL_EXECUTION_HASH_SHUFFLE:
+                case LOCAL_EXECUTION_HASH_SHUFFLE:
+                case BUCKET_HASH_SHUFFLE:
+                    return true;
+                default:
+                    return false;
+            }
+        }
+
+        /**
+         * Convert to the thrift enum.
+         *
+         * @throws IllegalStateException for a type without a thrift counterpart (NOOP)
+         */
+        public TLocalPartitionType toThrift() {
+            switch (this) {
+                case GLOBAL_EXECUTION_HASH_SHUFFLE:
+                    return TLocalPartitionType.GLOBAL_EXECUTION_HASH_SHUFFLE;
+                case LOCAL_EXECUTION_HASH_SHUFFLE:
+                    return TLocalPartitionType.LOCAL_EXECUTION_HASH_SHUFFLE;
+                case BUCKET_HASH_SHUFFLE:
+                    return TLocalPartitionType.BUCKET_HASH_SHUFFLE;
+                case PASSTHROUGH:
+                    return TLocalPartitionType.PASSTHROUGH;
+                case ADAPTIVE_PASSTHROUGH:
+                    return TLocalPartitionType.ADAPTIVE_PASSTHROUGH;
+                case BROADCAST:
+                    return TLocalPartitionType.BROADCAST;
+                case PASS_TO_ONE:
+                    return TLocalPartitionType.PASS_TO_ONE;
+                case LOCAL_MERGE_SORT:
+                    return TLocalPartitionType.LOCAL_MERGE_SORT;
+                default: {
+                    throw new IllegalStateException("Unsupported LocalExchangeType: " + this);
+                }
+            }
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index 520cbf937cd..f9cc1c3a1ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -22,11 +22,17 @@ import org.apache.doris.analysis.JoinOperator;
 import org.apache.doris.analysis.SlotId;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TNestedLoopJoinNode;
 import org.apache.doris.thrift.TPlanNode;
 import org.apache.doris.thrift.TPlanNodeType;
 
+import com.google.common.collect.Lists;
+
 import java.util.List;
 
 /**
@@ -165,4 +171,55 @@ public class NestedLoopJoinNode extends JoinNodeBase {
         return joinOp == JoinOperator.RIGHT_OUTER_JOIN || joinOp == 
JoinOperator.RIGHT_ANTI_JOIN
                 || joinOp == JoinOperator.RIGHT_SEMI_JOIN || joinOp == 
JoinOperator.FULL_OUTER_JOIN;
     }
+
+    /**
+     * Chooses the local distribution required on each side of the nested-loop join, enforces it on both
+     * children (inserting a LocalExchangeNode where a child's derived distribution does not satisfy it),
+     * and reports this node's own output distribution.
+     *
+     * <p>Null-aware left anti join places no requirement on either side and reports NOOP. Otherwise the probe
+     * side must be ADAPTIVE_PASSTHROUGH; the build side must additionally be BROADCAST when the fragment uses
+     * a serial (pooling) source and either child is a ScanNode.
+     *
+     * @param translatorContext supplies the connect context and new plan node ids
+     * @param parent the node above this one (unused here)
+     * @param parentRequire the parent's requirement (unused here; the join decides its own output type)
+     * @return this node paired with the distribution it outputs
+     */
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
+            PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+
+        boolean childUsePoolingScan = fragment.useSerialSource(translatorContext.getConnectContext())
+                && ((children.get(0) instanceof ScanNode) || (children.get(1) instanceof ScanNode));
+
+        LocalExchangeTypeRequire probeSideRequire;
+        LocalExchangeTypeRequire buildSideRequire;
+        LocalExchangeType outputType;
+        if (joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
+            probeSideRequire = buildSideRequire = LocalExchangeTypeRequire.noRequire();
+            outputType = LocalExchangeType.NOOP;
+        } else if (childUsePoolingScan) {
+            probeSideRequire = LocalExchangeTypeRequire.requireAdaptivePassthrough();
+            buildSideRequire = LocalExchangeTypeRequire.requireBroadcast();
+            outputType = LocalExchangeType.ADAPTIVE_PASSTHROUGH;
+        } else {
+            probeSideRequire = LocalExchangeTypeRequire.requireAdaptivePassthrough();
+            buildSideRequire = LocalExchangeTypeRequire.noRequire();
+            outputType = LocalExchangeType.ADAPTIVE_PASSTHROUGH;
+        }
+
+        // enforceChild wraps the node returned by the child's own enforcement (not the original child) when
+        // the requirement is not met, so a local exchange the child already placed above itself is kept.
+        // The probe side is enforced before the build side, so plan node ids are allocated in that order.
+        PlanNode probeSide = enforceChild(translatorContext, probeSideRequire, children.get(0)).first;
+        PlanNode buildSide = enforceChild(translatorContext, buildSideRequire, children.get(1)).first;
+
+        this.children = Lists.newArrayList(probeSide, buildSide);
+        return Pair.of(this, outputType);
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 1a20ecb3578..fb7aebbff89 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -53,6 +53,8 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.common.util.DebugUtil;
 import org.apache.doris.info.PartitionNamesInfo;
 import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.planner.normalize.Normalizer;
 import org.apache.doris.planner.normalize.PartitionRangePredicateNormalizer;
 import org.apache.doris.qe.ConnectContext;
@@ -1049,6 +1051,9 @@ public class OlapScanNode extends ScanNode {
         if (isPointQuery()) {
             output.append(prefix).append("SHORT-CIRCUIT\n");
         }
+        if (fragment.useSerialSource(ConnectContext.get())) {
+            output.append(prefix).append("POOLING-SCAN\n");
+        }
 
         printNestedColumns(output, prefix, getTupleDesc());
 
@@ -1384,4 +1389,24 @@ public class OlapScanNode extends ScanNode {
         }
         return super.getCatalogId();
     }
+
+    /**
+     * Leaf enforcement for an OLAP scan.
+     *
+     * <p>When the fragment does not use a serial (pooling) source, the scan is returned unchanged with NOOP.
+     * Otherwise a LocalExchangeNode is placed directly above the scan. Its type is derived from the parent's
+     * requirement via {@code noopTo(BUCKET_HASH_SHUFFLE)}, which presumably substitutes BUCKET_HASH_SHUFFLE
+     * when the parent has no requirement (TODO confirm). That type is also reported as the output type.
+     */
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent,
+            LocalExchangeTypeRequire parentRequire) {
+        if (!fragment.useSerialSource(translatorContext.getConnectContext())) {
+            return Pair.of(this, LocalExchangeType.NOOP);
+        }
+        LocalExchangeType exchangeType = parentRequire.noopTo(LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        PlanNode poolingExchange = new LocalExchangeNode(translatorContext.nextPlanNodeId(), this, exchangeType);
+        return Pair.of(poolingExchange, exchangeType);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
index 14923c381b8..21e31595a55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java
@@ -32,6 +32,9 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.TreeNode;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.iceberg.source.IcebergScanNode;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.planner.normalize.Normalizer;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TAccessPathType;
@@ -139,7 +142,8 @@ public abstract class PlanNode extends TreeNode<PlanNode> {
 
     protected int nereidsId = -1;
 
-    private List<List<Expr>> childrenDistributeExprLists = new ArrayList<>();
+    // Per-child distribution expressions; entry i is returned by getChildDistributeExprList(i).
+    protected List<List<Expr>> childrenDistributeExprLists = new ArrayList<>();
+    // Presumably this node's own distribution expressions (TODO confirm); setDistributeExprLists maps null to
+    // an empty list.
+    protected List<Expr> distributeExprLists = new ArrayList<>();
 
     protected PlanNode(PlanNodeId id, List<TupleId> tupleIds, String 
planNodeName) {
         this.id = id;
@@ -913,4 +917,56 @@ public abstract class PlanNode extends TreeNode<PlanNode> {
         }
         return StringUtils.join(mergeDisplayAccessPaths, ", ");
     }
+
+    /**
+     * Default local-exchange enforcement for nodes that do not override it.
+     *
+     * <p>Recursively enforces on every child, forwarding the caller's {@code parent} and {@code parentRequire}
+     * unchanged. Each child is replaced by whatever node its enforcement returns. The node itself always reports
+     * NOOP, and the children's derived types are discarded.
+     *
+     * <p>NOTE(review): children receive {@code parent}, not {@code this}, as their parent, and the derived child
+     * types are dropped. Confirm this pass-through behavior is intended.
+     */
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(
+            PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        ArrayList<PlanNode> rewrittenChildren = new ArrayList<>(children.size());
+        for (PlanNode originalChild : children) {
+            Pair<PlanNode, LocalExchangeType> derived
+                    = originalChild.enforceAndDeriveLocalExchange(translatorContext, parent, parentRequire);
+            rewrittenChildren.add(derived.first);
+        }
+        this.children = rewrittenChildren;
+        return Pair.of(this, LocalExchangeType.NOOP);
+    }
+
+    /**
+     * Enforces {@code requireChild} on one child of this node.
+     *
+     * <p>The child first performs its own enforcement, with this node as its parent. If the distribution it
+     * reports satisfies the requirement, the child's result is returned as-is. Otherwise the node the child
+     * returned is wrapped in a new LocalExchangeNode of the requirement's preferred type, and that type is
+     * reported.
+     *
+     * @param translatorContext supplies new plan node ids
+     * @param requireChild the distribution this node needs from the child
+     * @param originChild the child before enforcement
+     * @return the (possibly wrapped) child and its output distribution
+     */
+    protected Pair<PlanNode, LocalExchangeType> enforceChild(
+            PlanTranslatorContext translatorContext, LocalExchangeTypeRequire requireChild, PlanNode originChild) {
+        Pair<PlanNode, LocalExchangeType> derived
+                = originChild.enforceAndDeriveLocalExchange(translatorContext, this, requireChild);
+        if (requireChild.satisfy(derived.second)) {
+            return derived;
+        }
+        LocalExchangeType exchangeType = requireChild.preferType();
+        PlanNode wrapped = new LocalExchangeNode(translatorContext.nextPlanNodeId(), derived.first, exchangeType);
+        return Pair.of(wrapped, exchangeType);
+    }
+
+    /**
+     * Returns the distribution expressions recorded for child {@code childIndex}, or {@code null} when none
+     * were recorded (the list is null or too short).
+     */
+    protected List<Expr> getChildDistributeExprList(int childIndex) {
+        boolean recorded = childrenDistributeExprLists != null && childIndex < childrenDistributeExprLists.size();
+        return recorded ? childrenDistributeExprLists.get(childIndex) : null;
+    }
+
+    /** Returns the per-child distribution expression lists (the live field, not a copy). */
+    public List<List<Expr>> getChildrenDistributeExprLists() {
+        return this.childrenDistributeExprLists;
+    }
+
+    /** Returns this node's distribution expressions (the live field, not a copy). */
+    public List<Expr> getDistributeExprLists() {
+        return this.distributeExprLists;
+    }
+
+    /** Sets this node's distribution expressions; {@code null} is stored as an immutable empty list. */
+    public void setDistributeExprLists(List<Expr> distributeExprLists) {
+        this.distributeExprLists = distributeExprLists != null ? distributeExprLists : Collections.emptyList();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
index 2745fcaa135..acb808cef55 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java
@@ -19,6 +19,11 @@ package org.apache.doris.planner;
 
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.TupleId;
+import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
+import org.apache.doris.planner.LocalExchangeNode.NoRequire;
 import org.apache.doris.thrift.TExceptNode;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TExpr;
@@ -34,6 +39,7 @@ import org.apache.commons.collections4.CollectionUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -188,4 +194,59 @@ public abstract class SetOperationNode extends PlanNode {
     public boolean isBucketShuffle() {
         return distributionMode.equals(DistributionMode.BUCKET_SHUFFLE);
     }
+
+    /** Returns the value of this node's {@code isColocate} flag. */
+    public boolean isColocate() {
+        return this.isColocate;
+    }
+
+    /**
+     * Enforces the local distribution every input of this set operation must provide, and reports the
+     * distribution this node outputs.
+     *
+     * <ul>
+     *   <li>UnionNode: no requirement on inputs; output NOOP.</li>
+     *   <li>Other set operations, colocated (per AddLocalExchange.isColocated): inputs must be
+     *       BUCKET_HASH_SHUFFLE; output BUCKET_HASH_SHUFFLE.</li>
+     *   <li>Other set operations, not colocated: inputs must satisfy the execution-hash requirement; output
+     *       GLOBAL_EXECUTION_HASH_SHUFFLE.</li>
+     * </ul>
+     */
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
+            PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        LocalExchangeTypeRequire requireChild;
+        LocalExchangeType outputType;
+        if (this instanceof UnionNode) {
+            NoRequire noRequire = LocalExchangeTypeRequire.noRequire();
+            requireChild = noRequire;
+            outputType = LocalExchangeType.NOOP;
+        } else if (AddLocalExchange.isColocated(this)) {
+            requireChild = LocalExchangeTypeRequire.requireBucketHash();
+            outputType = LocalExchangeType.BUCKET_HASH_SHUFFLE;
+        } else {
+            requireChild = LocalExchangeTypeRequire.requireExecutionHash();
+            outputType = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE;
+        }
+
+        // enforceChild wraps the node returned by the child's own enforcement (not the original child) when
+        // the requirement is not met, so a local exchange the child already placed above itself is kept.
+        ArrayList<PlanNode> newChildren = Lists.newArrayList();
+        for (PlanNode child : children) {
+            newChildren.add(enforceChild(translatorContext, requireChild, child).first);
+        }
+
+        this.children = newChildren;
+        return Pair.of(this, outputType);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
index a0e65a38011..fe30b55f428 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
@@ -23,6 +23,9 @@ package org.apache.doris.planner;
 import org.apache.doris.analysis.Expr;
 import org.apache.doris.analysis.SortInfo;
 import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
+import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TPlanNode;
@@ -243,4 +246,38 @@ public class SortNode extends PlanNode {
             List<Pair<Integer, Integer>> topnFilterTargets) {
         this.topnFilterTargets = topnFilterTargets;
     }
+
+    /**
+     * Enforces the local distribution this sort needs from its single child and reports its own output
+     * distribution.
+     *
+     * <ul>
+     *   <li>Child is an AnalyticEvalNode:
+     *     <ul>
+     *       <li>Colocated: require BUCKET_HASH_SHUFFLE and output BUCKET_HASH_SHUFFLE.</li>
+     *       <li>Otherwise: require {@code parentRequire.autoHash()} and output whatever the enforced child
+     *           produces.</li>
+     *     </ul>
+     *   </li>
+     *   <li>{@code mergeByexchange} is set, or the fragment uses a serial source over a ScanNode child: require
+     *       and output PASSTHROUGH.</li>
+     *   <li>Otherwise: no requirement; output NOOP.</li>
+     * </ul>
+     */
+    @Override
+    public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext,
+            PlanNode parent, LocalExchangeTypeRequire parentRequire) {
+        PlanNode input = children.get(0);
+        LocalExchangeTypeRequire childRequire;
+        // null means "report the distribution produced by the enforced child"
+        LocalExchangeType fixedOutput;
+
+        if (input instanceof AnalyticEvalNode) {
+            boolean colocated = AddLocalExchange.isColocated(this);
+            if (colocated) {
+                childRequire = LocalExchangeTypeRequire.requireBucketHash();
+                fixedOutput = LocalExchangeType.BUCKET_HASH_SHUFFLE;
+            } else {
+                childRequire = parentRequire.autoHash();
+                fixedOutput = null;
+            }
+        } else if (mergeByexchange
+                || (fragment.useSerialSource(translatorContext.getConnectContext()) && input instanceof ScanNode)) {
+            childRequire = LocalExchangeTypeRequire.requirePassthrough();
+            fixedOutput = LocalExchangeType.PASSTHROUGH;
+        } else {
+            childRequire = LocalExchangeTypeRequire.noRequire();
+            fixedOutput = LocalExchangeType.NOOP;
+        }
+
+        Pair<PlanNode, LocalExchangeType> enforced = enforceChild(translatorContext, childRequire, input);
+        this.children = Lists.newArrayList(enforced.first);
+        return Pair.of(this, fixedOutput != null ? fixedOutput : enforced.second);
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 3fc7d3a7634..8e87e146ab2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -331,6 +331,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle";
 
+    public static final String ENABLE_LOCAL_SHUFFLE_PLANNER = 
"enable_local_shuffle_planner";
+
     public static final String FORCE_TO_LOCAL_SHUFFLE = 
"force_to_local_shuffle";
 
     public static final String ENABLE_LOCAL_MERGE_SORT = 
"enable_local_merge_sort";
@@ -1537,6 +1539,12 @@ public class SessionVariable implements Serializable, 
Writable {
                     "Whether to enable local shuffle on pipelineX engine."}, 
needForward = true)
     private boolean enableLocalShuffle = true;
 
+    // Session switch for planning LocalExchangeNodes in the frontend; the value is also sent to the backend
+    // as TQueryOptions.enable_local_shuffle_planner in toThrift().
+    // NOTE(review): marked EXPERIMENTAL yet defaults to true - confirm the default is intended.
+    @VariableMgr.VarAttr(
+            name = ENABLE_LOCAL_SHUFFLE_PLANNER, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL,
+            description = {"是否在FE规划Local Shuffle",
+                    "Whether to plan local shuffle in frontend"}, needForward = true)
+    private boolean enableLocalShufflePlanner = true;
+
     @VariableMgr.VarAttr(
                 name = FORCE_TO_LOCAL_SHUFFLE, fuzzy = false, varType = 
VariableAnnotation.EXPERIMENTAL,
                 description = {"是否在 pipelineX 引擎上强制开启 local shuffle 优化",
@@ -4459,6 +4467,14 @@ public class SessionVariable implements Serializable, 
Writable {
         this.enableLocalShuffle = enableLocalShuffle;
     }
 
+    /** Returns whether local exchange planning in the frontend is enabled for this session. */
+    public boolean isEnableLocalShufflePlanner() {
+        return this.enableLocalShufflePlanner;
+    }
+
+    /** Enables or disables local exchange planning in the frontend for this session. */
+    public void setEnableLocalShufflePlanner(boolean enableLocalShufflePlanner) {
+        this.enableLocalShufflePlanner = enableLocalShufflePlanner;
+    }
+
     public boolean enablePushDownNoGroupAgg() {
         return enablePushDownNoGroupAgg;
     }
@@ -5293,6 +5309,8 @@ public class SessionVariable implements Serializable, 
Writable {
         // Set Iceberg write target file size
         
tResult.setIcebergWriteTargetFileSizeBytes(icebergWriteTargetFileSizeBytes);
 
+        tResult.setEnableLocalShufflePlanner(enableLocalShufflePlanner);
+
         return tResult;
     }
 
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index e495d529ef6..ff955e972d7 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -440,6 +440,9 @@ struct TQueryOptions {
   // Use paimon-cpp to read Paimon splits on BE
   201: optional bool enable_paimon_cpp_reader = false;
 
+  // Whether the FE plans local exchange nodes (set from session variable enable_local_shuffle_planner).
+  // NOTE(review): field id must be unique within TQueryOptions; 201 is enable_paimon_cpp_reader - confirm 202 is free.
+  202: optional bool enable_local_shuffle_planner;
+
   // For cloud, to control if the content would be written into file cache
   // In write path, to control if the content would be written into file cache.
   // In read path, read from file cache or remote storage when execute query.
diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift
index 86a2d9be555..a122cae6ac7 100644
--- a/gensrc/thrift/Partitions.thrift
+++ b/gensrc/thrift/Partitions.thrift
@@ -52,6 +52,39 @@ enum TPartitionType {
   HIVE_TABLE_SINK_UNPARTITIONED = 8
 }
 
+enum TLocalPartitionType {
+  // Restores the global hash distribution after another distribution (such as PASSTHROUGH) has broken it,
+  // so that a downstream JoinNode receives data shuffled by the same hash function on both sides.
+  //
+  // For example, the second LocalExchangeNode below must restore GLOBAL_EXECUTION_HASH_SHUFFLE:
+  //
+  //   Node -> LocalExchangeNode(PASSTHROUGH) -> JoinNode -> LocalExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) -> JoinNode
+  //                   ExchangeNode(BROADCAST) --^                                                             ^
+  //                                                                        ExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE)
+  GLOBAL_EXECUTION_HASH_SHUFFLE = 0,
+  // Rebalances data across local instances and increases parallelism.
+  //
+  // For example, LOCAL_EXECUTION_HASH_SHUFFLE is needed here to rebalance the data:
+  //
+  //   Scan(hash(id)) -> LocalExchangeNode(LOCAL_EXECUTION_HASH_SHUFFLE(id, name)) -> AggregationNode(group by (id, name))
+  //
+  // The scan is hash-distributed by `id` only, but the aggregation groups by `id` and `name`. The rows are
+  // therefore re-hashed on both `id` and `name`, so that rows with the same (id, name) reach the same
+  // aggregation instance.
+  // GLOBAL_EXECUTION_HASH_SHUFFLE(id, name) cannot be used here: `TPipelineFragmentParams.shuffle_idx_to_instance_idx`
+  // maps only the global instance indexes that belong to this backend to local instance indexes. Rows hashed
+  // to another backend's instances would be discarded, causing data loss.
+  LOCAL_EXECUTION_HASH_SHUFFLE = 1,
+  // Hashes rows into `TPipelineFragmentParams.num_buckets` buckets and routes each bucket to a local instance
+  // through `TPipelineFragmentParams.bucket_seq_to_instance_idx` (see TLocalExchangeNode in PlanNodes.thrift).
+  BUCKET_HASH_SHUFFLE = 2,
+  // Round-robin distribution, used to rebalance data and increase parallelism.
+  PASSTHROUGH = 3,
+  // NOTE(review): undocumented; presumably an adaptive variant of PASSTHROUGH - confirm semantics.
+  ADAPTIVE_PASSTHROUGH = 4,
+  // NOTE(review): presumably sends every block to all local instances - confirm.
+  BROADCAST = 5,
+  // NOTE(review): presumably funnels all local instances into a single instance - confirm.
+  PASS_TO_ONE = 6,
+  // NOTE(review): presumably merges locally sorted streams - confirm.
+  LOCAL_MERGE_SORT = 7
+}
+
 enum TDistributionType {
   UNPARTITIONED = 0,
 
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 1369c6e1045..1aa3969d1e2 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -62,7 +62,8 @@ enum TPlanNodeType {
   GROUP_COMMIT_SCAN_NODE = 33,
   MATERIALIZATION_NODE = 34,
   REC_CTE_NODE = 35,
-  REC_CTE_SCAN_NODE = 36
+  REC_CTE_SCAN_NODE = 36,
+  LOCAL_EXCHANGE_NODE = 37
 }
 
 struct TKeyRange {
@@ -1287,6 +1288,24 @@ struct TExchangeNode {
   4: optional Partitions.TPartitionType partition_type
 }
 
+struct TLocalExchangeNode {
+  // How this node redistributes data among local instances; see TLocalPartitionType in Partitions.thrift.
+  1: required Partitions.TLocalPartitionType partition_type
+  // Hash expressions. Set when partition_type is GLOBAL_EXECUTION_HASH_SHUFFLE, LOCAL_EXECUTION_HASH_SHUFFLE
+  // or BUCKET_HASH_SHUFFLE. For this node they supersede the legacy `TPlanNode.distribute_expr_lists`,
+  // which is deprecated here.
+  //
+  // How the target local instance is computed:
+  // 1. BUCKET_HASH_SHUFFLE: hash distribute_expr_lists, take the value modulo `TPipelineFragmentParams.num_buckets`,
+  //    then map the bucket index to a local instance through `TPipelineFragmentParams.bucket_seq_to_instance_idx`.
+  // 2. LOCAL_EXECUTION_HASH_SHUFFLE: hash distribute_expr_lists and take the value modulo
+  //    `TPipelineFragmentParams.local_params.size`. The backend maps index i to local instance i
+  //    (for example 1 -> 1, 2 -> 2, ...).
+  // 3. GLOBAL_EXECUTION_HASH_SHUFFLE: hash distribute_expr_lists, take the value modulo
+  //    `TPipelineFragmentParams.total_instances`, then map the global instance index to a local instance
+  //    through `TPipelineFragmentParams.shuffle_idx_to_instance_idx`.
+  2: optional list<Exprs.TExpr> distribute_expr_lists
+}
+
 struct TOlapRewriteNode {
     1: required list<Exprs.TExpr> columns
     2: required list<Types.TColumnType> column_types
@@ -1503,6 +1522,7 @@ struct TPlanNode {
   50: optional list<list<Exprs.TExpr>> distribute_expr_lists
   51: optional bool is_serial_operator
   52: optional TRecCTEScanNode rec_cte_scan_node
+  53: optional TLocalExchangeNode local_exchange_node
 
   // projections is final projections, which means projecting into results and 
materializing them into the output block.
   101: optional list<Exprs.TExpr> projections


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to