This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 07ec65712d6 [opt](Nereids) optimize performance in new distribute 
planner (#44048)
07ec65712d6 is described below

commit 07ec65712d67efd072667247704b40adaf3e35a7
Author: 924060929 <[email protected]>
AuthorDate: Mon Nov 18 18:43:21 2024 +0800

    [opt](Nereids) optimize performance in new distribute planner (#44048)
    
    Optimize the new distribute planner's performance on TPC-H, because
    #41730 introduced some performance regressions.
    
    1. fix the wrong runtime filter thrift parameters
    2. do not print the distributed plan in the profile by default; run
    `set profile_level=3` to see it
    3. for a shuffle join whose two sides have NATURAL + EXECUTION_BUCKETED
    distributions, support comparing the costs of the shuffle-to-left and
    shuffle-to-right plans
---
 .../org/apache/doris/common/profile/Profile.java   |  12 +-
 .../nereids/jobs/cascades/CostAndEnforcerJob.java  |  83 +++++----
 .../properties/ChildrenPropertiesRegulator.java    | 188 +++++++++++++--------
 .../worker/job/AbstractUnassignedScanJob.java      | 112 ++++++------
 .../job/LocalShuffleBucketJoinAssignedJob.java     |  56 ++++++
 .../distribute/worker/job/StaticAssignedJob.java   |   5 +
 .../job/UnassignedScanBucketOlapTableJob.java      |  38 ++++-
 .../doris/qe/runtime/PipelineExecutionTask.java    |   2 -
 .../qe/runtime/RuntimeFiltersThriftBuilder.java    |  15 +-
 .../doris/qe/runtime/ThriftPlansBuilder.java       |  19 ++-
 .../distribute/shuffle_left_join.out               |   8 +-
 .../tpch_sf1000/nostats_rf_prune/q15.out           |   2 +-
 .../tpch_sf1000/nostats_rf_prune/q20-rewrite.out   |   2 +-
 .../tpch_sf1000/nostats_rf_prune/q20.out           |   2 +-
 .../tpch_sf1000/rf_prune/q20-rewrite.out           |   2 +-
 .../new_shapes_p0/tpch_sf1000/rf_prune/q20.out     |   2 +-
 .../tpch_sf1000/shape/q20-rewrite.out              |   2 +-
 .../data/new_shapes_p0/tpch_sf1000/shape/q20.out   |   2 +-
 .../tpch_sf1000/shape_no_stats/q15.out             |   2 +-
 .../tpch_sf1000/shape_no_stats/q20-rewrite.out     |   2 +-
 .../tpch_sf1000/shape_no_stats/q20.out             |   2 +-
 .../distribute/shuffle_left_join.groovy            |   4 +-
 22 files changed, 363 insertions(+), 199 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java 
b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
index 5381cc26bd2..71d51c32faf 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/common/profile/Profile.java
@@ -286,12 +286,14 @@ public class Profile {
                 NereidsPlanner nereidsPlanner = ((NereidsPlanner) planner);
                 physicalPlan = nereidsPlanner.getPhysicalPlan();
                 
physicalRelations.addAll(nereidsPlanner.getPhysicalRelations());
-                FragmentIdMapping<DistributedPlan> distributedPlans = 
nereidsPlanner.getDistributedPlans();
-                if (distributedPlans != null) {
-                    summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN,
-                            
DistributedPlan.toString(Lists.newArrayList(distributedPlans.values()))
+                if (profileLevel >= 3) {
+                    FragmentIdMapping<DistributedPlan> distributedPlans = 
nereidsPlanner.getDistributedPlans();
+                    if (distributedPlans != null) {
+                        summaryInfo.put(SummaryProfile.DISTRIBUTED_PLAN,
+                                
DistributedPlan.toString(Lists.newArrayList(distributedPlans.values()))
                                     .replace("\n", "\n     ")
-                    );
+                        );
+                    }
                 }
             }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
index 101b3ca6670..96960475e9a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/cascades/CostAndEnforcerJob.java
@@ -234,52 +234,61 @@ public class CostAndEnforcerJob extends Job implements 
Cloneable {
      * @return false if error occurs, the caller will return.
      */
     private boolean calculateEnforce(List<PhysicalProperties> 
requestChildrenProperties,
-            List<PhysicalProperties> outputChildrenProperties) {
+            List<PhysicalProperties> originOutputChildrenProperties) {
+
         // to ensure distributionSpec has been added sufficiently.
         // it's certain that lowestCostChildren is equals to arity().
         ChildrenPropertiesRegulator regulator = new 
ChildrenPropertiesRegulator(groupExpression,
-                lowestCostChildren, outputChildrenProperties, 
requestChildrenProperties, context);
-        boolean success = regulator.adjustChildrenProperties();
-        if (!success) {
+                lowestCostChildren, new 
ArrayList<>(originOutputChildrenProperties),
+                requestChildrenProperties, context);
+        List<List<PhysicalProperties>> childrenOutputSpace = 
regulator.adjustChildrenProperties();
+        if (childrenOutputSpace.isEmpty()) {
             // invalid enforce, return.
             return false;
         }
 
-        // Not need to do pruning here because it has been done when we get the
-        // best expr from the child group
-        ChildOutputPropertyDeriver childOutputPropertyDeriver
-                = new ChildOutputPropertyDeriver(outputChildrenProperties);
-        // the physical properties the group expression support for its parent.
-        PhysicalProperties outputProperty = 
childOutputPropertyDeriver.getOutputProperties(getConnectContext(),
-                groupExpression);
-
-        // update current group statistics and re-compute costs.
-        if (groupExpression.children().stream().anyMatch(group -> 
group.getStatistics() == null)
-                && groupExpression.getOwnerGroup().getStatistics() == null) {
-            // if we come here, mean that we have some error in stats 
calculator and should fix it.
-            LOG.warn("Nereids try to calculate cost without stats for group 
expression {}", groupExpression);
-            return false;
-        }
+        boolean hasSuccess = false;
+        for (List<PhysicalProperties> outputChildrenProperties : 
childrenOutputSpace) {
+            // Not need to do pruning here because it has been done when we 
get the
+            // best expr from the child group
+            ChildOutputPropertyDeriver childOutputPropertyDeriver
+                    = new ChildOutputPropertyDeriver(outputChildrenProperties);
+            // the physical properties the group expression support for its 
parent.
+            // some cases maybe output some possibilities, for example, 
shuffle join
+            // maybe select left shuffle to right, or right shuffle to left, 
so their
+            // are 2 output properties possibilities
+            PhysicalProperties outputProperty
+                    = 
childOutputPropertyDeriver.getOutputProperties(getConnectContext(), 
groupExpression);
+
+            // update current group statistics and re-compute costs.
+            if (groupExpression.children().stream().anyMatch(group -> 
group.getStatistics() == null)
+                    && groupExpression.getOwnerGroup().getStatistics() == 
null) {
+                // if we come here, mean that we have some error in stats 
calculator and should fix it.
+                LOG.warn("Nereids try to calculate cost without stats for 
group expression {}", groupExpression);
+            }
 
-        // recompute cost after adjusting property
-        curNodeCost = CostCalculator.calculateCost(getConnectContext(), 
groupExpression, requestChildrenProperties);
-        groupExpression.setCost(curNodeCost);
-        curTotalCost = curNodeCost;
-        for (int i = 0; i < outputChildrenProperties.size(); i++) {
-            PhysicalProperties childProperties = 
outputChildrenProperties.get(i);
-            curTotalCost = CostCalculator.addChildCost(
-                    getConnectContext(),
-                    groupExpression.getPlan(),
-                    curTotalCost,
-                    
groupExpression.child(i).getLowestCostPlan(childProperties).get().first,
-                    i);
-        }
+            // recompute cost after adjusting property
+            curNodeCost = CostCalculator.calculateCost(getConnectContext(), 
groupExpression, requestChildrenProperties);
+            groupExpression.setCost(curNodeCost);
+            curTotalCost = curNodeCost;
+            for (int i = 0; i < outputChildrenProperties.size(); i++) {
+                PhysicalProperties childProperties = 
outputChildrenProperties.get(i);
+                curTotalCost = CostCalculator.addChildCost(
+                        getConnectContext(),
+                        groupExpression.getPlan(),
+                        curTotalCost,
+                        
groupExpression.child(i).getLowestCostPlan(childProperties).get().first,
+                        i);
+            }
 
-        // record map { outputProperty -> outputProperty }, { ANY -> 
outputProperty },
-        recordPropertyAndCost(groupExpression, outputProperty, 
PhysicalProperties.ANY, outputChildrenProperties);
-        recordPropertyAndCost(groupExpression, outputProperty, outputProperty, 
outputChildrenProperties);
-        enforce(outputProperty, outputChildrenProperties);
-        return true;
+            // record map { outputProperty -> outputProperty }, { ANY -> 
outputProperty },
+            recordPropertyAndCost(groupExpression, outputProperty, 
PhysicalProperties.ANY, outputChildrenProperties);
+            recordPropertyAndCost(groupExpression, outputProperty, 
outputProperty, outputChildrenProperties);
+            enforce(outputProperty, outputChildrenProperties);
+
+            hasSuccess = true;
+        }
+        return hasSuccess;
     }
 
     /**
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 9985b9c567f..b821ff0de87 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -65,20 +65,20 @@ import java.util.stream.Collectors;
  * NOTICE: all visitor should call visit(plan, context) at proper place
  * to process must shuffle except project and filter
  */
-public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
+public class ChildrenPropertiesRegulator extends 
PlanVisitor<List<List<PhysicalProperties>>, Void> {
 
     private final GroupExpression parent;
     private final List<GroupExpression> children;
-    private final List<PhysicalProperties> childrenProperties;
+    private final List<PhysicalProperties> originChildrenProperties;
     private final List<PhysicalProperties> requiredProperties;
     private final JobContext jobContext;
 
     public ChildrenPropertiesRegulator(GroupExpression parent, 
List<GroupExpression> children,
-            List<PhysicalProperties> childrenProperties, 
List<PhysicalProperties> requiredProperties,
+            List<PhysicalProperties> originChildrenProperties, 
List<PhysicalProperties> requiredProperties,
             JobContext jobContext) {
         this.parent = parent;
         this.children = children;
-        this.childrenProperties = childrenProperties;
+        this.originChildrenProperties = originChildrenProperties;
         this.requiredProperties = requiredProperties;
         this.jobContext = jobContext;
     }
@@ -88,34 +88,35 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
      *
      * @return enforce cost.
      */
-    public boolean adjustChildrenProperties() {
+    public List<List<PhysicalProperties>> adjustChildrenProperties() {
         return parent.getPlan().accept(this, null);
     }
 
     @Override
-    public Boolean visit(Plan plan, Void context) {
+    public List<List<PhysicalProperties>> visit(Plan plan, Void context) {
         // process must shuffle
         for (int i = 0; i < children.size(); i++) {
-            DistributionSpec distributionSpec = 
childrenProperties.get(i).getDistributionSpec();
+            DistributionSpec distributionSpec = 
originChildrenProperties.get(i).getDistributionSpec();
             if (distributionSpec instanceof DistributionSpecMustShuffle) {
                 updateChildEnforceAndCost(i, PhysicalProperties.EXECUTION_ANY);
             }
         }
-        return true;
+        return ImmutableList.of(originChildrenProperties);
     }
 
     @Override
-    public Boolean visitPhysicalHashAggregate(PhysicalHashAggregate<? extends 
Plan> agg, Void context) {
+    public List<List<PhysicalProperties>> visitPhysicalHashAggregate(
+            PhysicalHashAggregate<? extends Plan> agg, Void context) {
         if (agg.getGroupByExpressions().isEmpty() && 
agg.getOutputExpressions().isEmpty()) {
-            return false;
+            return ImmutableList.of();
         }
         if (!agg.getAggregateParam().canBeBanned) {
-            return true;
+            return ImmutableList.of(originChildrenProperties);
         }
         // forbid one phase agg on distribute
         if (agg.getAggMode() == AggMode.INPUT_TO_RESULT && 
children.get(0).getPlan() instanceof PhysicalDistribute) {
             // this means one stage gather agg, usually bad pattern
-            return false;
+            return ImmutableList.of();
         }
 
         // forbid TWO_PHASE_AGGREGATE_WITH_DISTINCT after shuffle
@@ -123,7 +124,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
         if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER
                 && requiredProperties.get(0).getDistributionSpec() instanceof 
DistributionSpecHash
                 && children.get(0).getPlan() instanceof PhysicalDistribute) {
-            return false;
+            return ImmutableList.of();
         }
 
         // agg(group by x)-union all(A, B)
@@ -132,7 +133,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
         if (agg.getAggMode() == AggMode.INPUT_TO_RESULT
                 && children.get(0).getPlan() instanceof PhysicalUnion
                 && !((PhysicalUnion) children.get(0).getPlan()).isDistinct()) {
-            return false;
+            return ImmutableList.of();
         }
         // forbid multi distinct opt that bad than multi-stage version when 
multi-stage can be executed in one fragment
         if (agg.getAggMode() == AggMode.INPUT_TO_BUFFER || agg.getAggMode() == 
AggMode.INPUT_TO_RESULT) {
@@ -146,7 +147,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
                     .collect(Collectors.toList());
             if (multiDistinctions.size() == 1) {
                 Expression distinctChild = multiDistinctions.get(0).child(0);
-                DistributionSpec childDistribution = 
childrenProperties.get(0).getDistributionSpec();
+                DistributionSpec childDistribution = 
originChildrenProperties.get(0).getDistributionSpec();
                 if (distinctChild instanceof SlotReference && 
childDistribution instanceof DistributionSpecHash) {
                     SlotReference slotReference = (SlotReference) 
distinctChild;
                     DistributionSpecHash distributionSpecHash = 
(DistributionSpecHash) childDistribution;
@@ -163,7 +164,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
                     if ((!groupByColumns.isEmpty() && 
distributionSpecHash.satisfy(groupByRequire))
                             || (groupByColumns.isEmpty() && 
distributionSpecHash.satisfy(distinctChildRequire))) {
                         if (!agg.mustUseMultiDistinctAgg()) {
-                            return false;
+                            return ImmutableList.of();
                         }
                     }
                 }
@@ -171,40 +172,41 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
                 // because the second phase of multi-distinct only have one 
instance, and it is slow generally.
                 if (agg.getOutputExpressions().size() == 1 && 
agg.getGroupByExpressions().isEmpty()
                         && !agg.mustUseMultiDistinctAgg()) {
-                    return false;
+                    return ImmutableList.of();
                 }
             }
         }
         // process must shuffle
         visit(agg, context);
         // process agg
-        return true;
+        return ImmutableList.of(originChildrenProperties);
     }
 
     @Override
-    public Boolean visitPhysicalPartitionTopN(PhysicalPartitionTopN<? extends 
Plan> partitionTopN, Void context) {
+    public List<List<PhysicalProperties>> visitPhysicalPartitionTopN(
+            PhysicalPartitionTopN<? extends Plan> partitionTopN, Void context) 
{
         if (partitionTopN.getPhase().isOnePhaseGlobal() && 
children.get(0).getPlan() instanceof PhysicalDistribute) {
             // one phase partition topn, if the child is an enforced 
distribution, discard this
             // and use two phase candidate.
-            return false;
+            return ImmutableList.of();
         } else if (partitionTopN.getPhase().isTwoPhaseGlobal()
                 && !(children.get(0).getPlan() instanceof PhysicalDistribute)) 
{
             // two phase partition topn, if global's child is not 
distribution, which means
             // the local distribution has met final requirement, discard this 
candidate.
-            return false;
+            return ImmutableList.of();
         } else {
             visit(partitionTopN, context);
-            return true;
+            return ImmutableList.of(originChildrenProperties);
         }
     }
 
     @Override
-    public Boolean visitPhysicalFilter(PhysicalFilter<? extends Plan> filter, 
Void context) {
+    public List<List<PhysicalProperties>> visitPhysicalFilter(PhysicalFilter<? 
extends Plan> filter, Void context) {
         // do not process must shuffle
         if (children.get(0).getPlan() instanceof PhysicalDistribute) {
-            return false;
+            return ImmutableList.of();
         }
-        return true;
+        return ImmutableList.of(originChildrenProperties);
     }
 
     private boolean isBucketShuffleDownGrade(Plan oneSidePlan, 
DistributionSpecHash otherSideSpec) {
@@ -268,20 +270,20 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
     }
 
     @Override
-    public Boolean visitPhysicalHashJoin(
+    public List<List<PhysicalProperties>> visitPhysicalHashJoin(
             PhysicalHashJoin<? extends Plan, ? extends Plan> hashJoin, Void 
context) {
         Preconditions.checkArgument(children.size() == 2, "children.size() != 
2");
-        Preconditions.checkArgument(childrenProperties.size() == 2);
+        Preconditions.checkArgument(originChildrenProperties.size() == 2);
         Preconditions.checkArgument(requiredProperties.size() == 2);
         // process must shuffle
         visit(hashJoin, context);
         // process hash join
-        DistributionSpec leftDistributionSpec = 
childrenProperties.get(0).getDistributionSpec();
-        DistributionSpec rightDistributionSpec = 
childrenProperties.get(1).getDistributionSpec();
+        DistributionSpec leftDistributionSpec = 
originChildrenProperties.get(0).getDistributionSpec();
+        DistributionSpec rightDistributionSpec = 
originChildrenProperties.get(1).getDistributionSpec();
 
         // broadcast do not need regular
         if (rightDistributionSpec instanceof DistributionSpecReplicated) {
-            return true;
+            return ImmutableList.of(originChildrenProperties);
         }
 
         // shuffle
@@ -301,7 +303,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
 
         if (JoinUtils.couldColocateJoin(leftHashSpec, rightHashSpec, 
hashJoin.getHashJoinConjuncts())) {
             // check colocate join with scan
-            return true;
+            return ImmutableList.of(originChildrenProperties);
         } else if (couldNotRightBucketShuffleJoin(hashJoin.getJoinType(), 
leftHashSpec, rightHashSpec)) {
             // right anti, right outer, full outer join could not do bucket 
shuffle join
             // TODO remove this after we refactor coordinator
@@ -339,6 +341,24 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
                     (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec()));
         } else if (leftHashSpec.getShuffleType() == ShuffleType.NATURAL
                 && rightHashSpec.getShuffleType() == 
ShuffleType.EXECUTION_BUCKETED) {
+            if (SessionVariable.canUseNereidsDistributePlanner()) {
+                List<PhysicalProperties> shuffleToLeft = 
Lists.newArrayList(originChildrenProperties);
+                PhysicalProperties enforceShuffleRight = 
calAnotherSideRequired(
+                        ShuffleType.STORAGE_BUCKETED, leftHashSpec, 
rightHashSpec,
+                        (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
+                        (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec());
+                updateChildEnforceAndCost(1, enforceShuffleRight, 
shuffleToLeft);
+
+                List<PhysicalProperties> shuffleToRight = 
Lists.newArrayList(originChildrenProperties);
+                PhysicalProperties enforceShuffleLeft = calAnotherSideRequired(
+                        ShuffleType.EXECUTION_BUCKETED, rightHashSpec, 
leftHashSpec,
+                        (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec(),
+                        (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec()
+                );
+                updateChildEnforceAndCost(0, enforceShuffleLeft, 
shuffleToRight);
+                return ImmutableList.of(shuffleToLeft, shuffleToRight);
+            }
+
             // must add enforce because shuffle algorithm is not same between 
NATURAL and BUCKETED
             updatedForRight = Optional.of(calAnotherSideRequired(
                     ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
@@ -349,7 +369,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
             if (bothSideShuffleKeysAreSameOrder(leftHashSpec, rightHashSpec,
                     (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
                     (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec())) {
-                return true;
+                return ImmutableList.of(originChildrenProperties);
             }
             updatedForRight = Optional.of(calAnotherSideRequired(
                     ShuffleType.STORAGE_BUCKETED, leftHashSpec, rightHashSpec,
@@ -362,10 +382,25 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
                 // TODO: maybe we should check if left child is 
PhysicalDistribute.
                 //  If so add storage bucketed shuffle on left side. Other 
wise,
                 //  add execution bucketed shuffle on right side.
-                updatedForLeft = Optional.of(calAnotherSideRequired(
+                // updatedForLeft = Optional.of(calAnotherSideRequired(
+                //         ShuffleType.STORAGE_BUCKETED, rightHashSpec, 
leftHashSpec,
+                //         (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec(),
+                //         (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec()));
+                List<PhysicalProperties> shuffleToLeft = 
Lists.newArrayList(originChildrenProperties);
+                PhysicalProperties enforceShuffleRight = 
calAnotherSideRequired(
+                        ShuffleType.EXECUTION_BUCKETED, leftHashSpec, 
rightHashSpec,
+                        (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
+                        (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec());
+                updateChildEnforceAndCost(1, enforceShuffleRight, 
shuffleToLeft);
+
+                List<PhysicalProperties> shuffleToRight = 
Lists.newArrayList(originChildrenProperties);
+                PhysicalProperties enforceShuffleLeft = calAnotherSideRequired(
                         ShuffleType.STORAGE_BUCKETED, rightHashSpec, 
leftHashSpec,
                         (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec(),
-                        (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec()));
+                        (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec()
+                );
+                updateChildEnforceAndCost(0, enforceShuffleLeft, 
shuffleToRight);
+                return ImmutableList.of(shuffleToLeft, shuffleToRight);
             } else {
                 // legacy coordinator could not do right be selection in this 
case,
                 // since it always to check the left most node whether olap 
scan node.
@@ -380,7 +415,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
             if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
                     (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec(),
                     (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec())) {
-                return true;
+                return ImmutableList.of(originChildrenProperties);
             }
             updatedForRight = Optional.of(calAnotherSideRequired(
                     ShuffleType.EXECUTION_BUCKETED, leftHashSpec, 
rightHashSpec,
@@ -427,7 +462,7 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
             if (bothSideShuffleKeysAreSameOrder(rightHashSpec, leftHashSpec,
                     (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec(),
                     (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec())) {
-                return true;
+                return ImmutableList.of(originChildrenProperties);
             }
             if (children.get(0).getPlan() instanceof PhysicalDistribute) {
                 updatedForLeft = Optional.of(calAnotherSideRequired(
@@ -445,58 +480,59 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
         updatedForLeft.ifPresent(physicalProperties -> 
updateChildEnforceAndCost(0, physicalProperties));
         updatedForRight.ifPresent(physicalProperties -> 
updateChildEnforceAndCost(1, physicalProperties));
 
-        return true;
+        return ImmutableList.of(originChildrenProperties);
     }
 
     @Override
-    public Boolean visitPhysicalNestedLoopJoin(PhysicalNestedLoopJoin<? 
extends Plan, ? extends Plan> nestedLoopJoin,
-            Void context) {
+    public List<List<PhysicalProperties>> visitPhysicalNestedLoopJoin(
+            PhysicalNestedLoopJoin<? extends Plan, ? extends Plan> 
nestedLoopJoin, Void context) {
         Preconditions.checkArgument(children.size() == 2, 
String.format("children.size() is %d", children.size()));
-        Preconditions.checkArgument(childrenProperties.size() == 2);
+        Preconditions.checkArgument(originChildrenProperties.size() == 2);
         Preconditions.checkArgument(requiredProperties.size() == 2);
         // process must shuffle
         visit(nestedLoopJoin, context);
         // process nlj
-        DistributionSpec rightDistributionSpec = 
childrenProperties.get(1).getDistributionSpec();
+        DistributionSpec rightDistributionSpec = 
originChildrenProperties.get(1).getDistributionSpec();
         if (rightDistributionSpec instanceof DistributionSpecStorageGather) {
             updateChildEnforceAndCost(1, PhysicalProperties.GATHER);
         }
-        return true;
+        return ImmutableList.of(originChildrenProperties);
     }
 
     @Override
-    public Boolean visitPhysicalProject(PhysicalProject<? extends Plan> 
project, Void context) {
+    public List<List<PhysicalProperties>> 
visitPhysicalProject(PhysicalProject<? extends Plan> project, Void context) {
         // do not process must shuffle
         if (children.get(0).getPlan() instanceof PhysicalDistribute) {
-            return false;
+            return ImmutableList.of();
         }
-        return true;
+        return ImmutableList.of(originChildrenProperties);
     }
 
     @Override
-    public Boolean visitPhysicalSetOperation(PhysicalSetOperation 
setOperation, Void context) {
+    public List<List<PhysicalProperties>> 
visitPhysicalSetOperation(PhysicalSetOperation setOperation, Void context) {
         // process must shuffle
         visit(setOperation, context);
         // union with only constant exprs list
         if (children.isEmpty()) {
-            return true;
+            return ImmutableList.of(originChildrenProperties);
         }
         // process set operation
         PhysicalProperties requiredProperty = requiredProperties.get(0);
         DistributionSpec requiredDistributionSpec = 
requiredProperty.getDistributionSpec();
         if (requiredDistributionSpec instanceof DistributionSpecGather) {
-            for (int i = 0; i < childrenProperties.size(); i++) {
-                if (childrenProperties.get(i).getDistributionSpec() instanceof 
DistributionSpecStorageGather) {
+            for (int i = 0; i < originChildrenProperties.size(); i++) {
+                if (originChildrenProperties.get(i).getDistributionSpec() 
instanceof DistributionSpecStorageGather) {
                     updateChildEnforceAndCost(i, PhysicalProperties.GATHER);
                 }
             }
         } else if (requiredDistributionSpec instanceof DistributionSpecAny) {
-            for (int i = 0; i < childrenProperties.size(); i++) {
-                if (childrenProperties.get(i).getDistributionSpec() instanceof 
DistributionSpecStorageAny
-                        || childrenProperties.get(i).getDistributionSpec() 
instanceof DistributionSpecStorageGather
-                        || childrenProperties.get(i).getDistributionSpec() 
instanceof DistributionSpecGather
-                        || (childrenProperties.get(i).getDistributionSpec() 
instanceof DistributionSpecHash
-                        && ((DistributionSpecHash) 
childrenProperties.get(i).getDistributionSpec())
+            for (int i = 0; i < originChildrenProperties.size(); i++) {
+                PhysicalProperties physicalProperties = 
originChildrenProperties.get(i);
+                if (physicalProperties.getDistributionSpec() instanceof 
DistributionSpecStorageAny
+                        || physicalProperties.getDistributionSpec() instanceof 
DistributionSpecStorageGather
+                        || physicalProperties.getDistributionSpec() instanceof 
DistributionSpecGather
+                        || (physicalProperties.getDistributionSpec() 
instanceof DistributionSpecHash
+                        && ((DistributionSpecHash) 
physicalProperties.getDistributionSpec())
                         .getShuffleType() == ShuffleType.NATURAL)) {
                     updateChildEnforceAndCost(i, 
PhysicalProperties.EXECUTION_ANY);
                 }
@@ -504,8 +540,9 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
         } else if (requiredDistributionSpec instanceof DistributionSpecHash) {
             // TODO: should use the most common hash spec as basic
             DistributionSpecHash basic = (DistributionSpecHash) 
requiredDistributionSpec;
-            for (int i = 0; i < childrenProperties.size(); i++) {
-                DistributionSpecHash current = (DistributionSpecHash) 
childrenProperties.get(i).getDistributionSpec();
+            for (int i = 0; i < originChildrenProperties.size(); i++) {
+                DistributionSpecHash current
+                        = (DistributionSpecHash) 
originChildrenProperties.get(i).getDistributionSpec();
                 if (current.getShuffleType() != ShuffleType.EXECUTION_BUCKETED
                         || !bothSideShuffleKeysAreSameOrder(basic, current,
                         (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
@@ -518,41 +555,42 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
                 }
             }
         }
-        return true;
+        return ImmutableList.of(originChildrenProperties);
     }
 
     @Override
-    public Boolean visitAbstractPhysicalSort(AbstractPhysicalSort<? extends 
Plan> sort, Void context) {
+    public List<List<PhysicalProperties>> visitAbstractPhysicalSort(
+            AbstractPhysicalSort<? extends Plan> sort, Void context) {
         // process must shuffle
         visit(sort, context);
         if (sort.getSortPhase() == SortPhase.GATHER_SORT && sort.child() 
instanceof PhysicalDistribute) {
             // forbid gather sort need explicit shuffle
-            return false;
+            return ImmutableList.of();
         }
-        return true;
+        return ImmutableList.of(originChildrenProperties);
     }
 
     @Override
-    public Boolean visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, Void 
context) {
+    public List<List<PhysicalProperties>> visitPhysicalTopN(PhysicalTopN<? 
extends Plan> topN, Void context) {
         // process must shuffle
         visit(topN, context);
 
         int sortPhaseNum = 
jobContext.getCascadesContext().getConnectContext().getSessionVariable().sortPhaseNum;
         // if control sort phase, forbid nothing
         if (sortPhaseNum == 1 || sortPhaseNum == 2) {
-            return true;
+            return ImmutableList.of(originChildrenProperties);
         }
         // If child is DistributionSpecGather, topN should forbid two-phase 
topN
         if (topN.getSortPhase() == SortPhase.LOCAL_SORT
-                && 
childrenProperties.get(0).getDistributionSpec().equals(DistributionSpecGather.INSTANCE))
 {
-            return false;
+                && 
originChildrenProperties.get(0).getDistributionSpec().equals(DistributionSpecGather.INSTANCE))
 {
+            return ImmutableList.of();
         }
         // forbid one step topn with distribute as child
         if (topN.getSortPhase() == SortPhase.GATHER_SORT
                 && children.get(0).getPlan() instanceof PhysicalDistribute) {
-            return false;
+            return ImmutableList.of();
         }
-        return true;
+        return ImmutableList.of(originChildrenProperties);
     }
 
     /**
@@ -645,8 +683,14 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
     }
 
     private void updateChildEnforceAndCost(int index, PhysicalProperties 
targetProperties) {
+        updateChildEnforceAndCost(index, targetProperties, 
originChildrenProperties);
+    }
+
+    private void updateChildEnforceAndCost(
+            int index, PhysicalProperties targetProperties, 
List<PhysicalProperties> childrenProperties) {
         GroupExpression child = children.get(index);
-        Pair<Cost, List<PhysicalProperties>> lowest = 
child.getLowestCostTable().get(childrenProperties.get(index));
+        Pair<Cost, List<PhysicalProperties>> lowest
+                = 
child.getLowestCostTable().get(childrenProperties.get(index));
         PhysicalProperties output = 
child.getOutputProperties(childrenProperties.get(index));
         DistributionSpec target = targetProperties.getDistributionSpec();
         updateChildEnforceAndCost(child, output, target, lowest.first);
@@ -668,10 +712,10 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<Boolean, Void> {
         GroupExpression enforcer = target.addEnforcer(child.getOwnerGroup());
         child.getOwnerGroup().addEnforcer(enforcer);
         ConnectContext connectContext = 
jobContext.getCascadesContext().getConnectContext();
-        Cost totalCost = CostCalculator.addChildCost(connectContext, 
enforcer.getPlan(),
-                CostCalculator.calculateCost(connectContext, enforcer, 
Lists.newArrayList(childOutput)),
-                currentCost,
-                0);
+        Cost enforceCost = CostCalculator.calculateCost(connectContext, 
enforcer, Lists.newArrayList(childOutput));
+        enforcer.setCost(enforceCost);
+        Cost totalCost = CostCalculator.addChildCost(
+                connectContext, enforcer.getPlan(), enforceCost, currentCost, 
0);
 
         if (enforcer.updateLowestCostTable(newOutputProperty,
                 Lists.newArrayList(childOutput), totalCost)) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
index ef3236690f1..d2fbb9905e1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AbstractUnassignedScanJob.java
@@ -74,8 +74,7 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
             DistributeContext distributeContext) {
 
         ConnectContext context = statementContext.getConnectContext();
-        boolean useLocalShuffleToAddParallel = 
fragment.useSerialSource(ConnectContext.get());
-        int instanceIndexInFragment = 0;
+        boolean useLocalShuffleToAddParallel = useLocalShuffleToAddParallel();
         List<AssignedJob> instances = Lists.newArrayList();
         for (Entry<DistributedPlanWorker, UninstancedScanSource> entry : 
workerToScanRanges.entrySet()) {
             DistributedPlanWorker worker = entry.getKey();
@@ -94,64 +93,73 @@ public abstract class AbstractUnassignedScanJob extends 
AbstractUnassignedJob {
             // for example: two instances
             int instanceNum = degreeOfParallelism(scanSourceMaxParallel);
 
-            List<ScanSource> instanceToScanRanges;
             if (useLocalShuffleToAddParallel) {
-                // only generate one instance to scan all data, in this step
-                instanceToScanRanges = scanSource.parallelize(
-                        scanNodes, 1
-                );
-
-                // when data not big, but aggregation too slow, we will use 1 
instance to scan data,
-                // and use more instances (to ***add parallel***) to process 
aggregate.
-                // We call it `ignore data distribution` of `share scan`. 
Backend will know this instances
-                // share the same ScanSource, and will not scan same data 
multiple times.
-                //
-                // +-------------------------------- same fragment in one host 
-------------------------------------+
-                // |                instance1      instance2     instance3     
instance4                            |
-                // |                    \              \             /         
   /                                 |
-                // |                                                           
                                     |
-                // |                                     OlapScanNode          
                                     |
-                // |(share scan node, instance1 will scan all data and local 
shuffle to other local instances       |
-                // |                           to parallel compute this data)  
                                     |
-                // 
+------------------------------------------------------------------------------------------------+
-                ScanSource shareScanSource = instanceToScanRanges.get(0);
-
-                // one scan range generate multiple instances,
-                // different instances reference the same scan source
-                int shareScanId = shareScanIdGenerator.getAndIncrement();
-                ScanSource emptyShareScanSource = shareScanSource.newEmpty();
-                for (int i = 0; i < instanceNum; i++) {
-                    LocalShuffleAssignedJob instance = new 
LocalShuffleAssignedJob(
-                            instanceIndexInFragment++, shareScanId, i > 0,
-                            context.nextInstanceId(), this, worker,
-                            i == 0 ? shareScanSource : emptyShareScanSource
-                    );
-                    instances.add(instance);
-                }
+                assignLocalShuffleJobs(scanSource, instanceNum, instances, 
context, worker);
             } else {
-                // split the scanRanges to some partitions, one partition for 
one instance
-                // for example:
-                //  [
-                //     scan tbl1: [tablet_10001, tablet_10003], // instance 1
-                //     scan tbl1: [tablet_10002, tablet_10004]  // instance 2
-                //  ]
-                instanceToScanRanges = scanSource.parallelize(
-                        scanNodes, instanceNum
-                );
-
-                for (ScanSource instanceToScanRange : instanceToScanRanges) {
-                    instances.add(
-                            assignWorkerAndDataSources(
-                                instanceIndexInFragment++, 
context.nextInstanceId(), worker, instanceToScanRange
-                            )
-                    );
-                }
+                assignedDefaultJobs(scanSource, instanceNum, instances, 
context, worker);
             }
         }
 
         return instances;
     }
 
+    protected boolean useLocalShuffleToAddParallel() { // true: per worker, one shared scan local-shuffles data to the other instances (see assignLocalShuffleJobs)
+        return fragment.useSerialSource(ConnectContext.get());
+    }
+
+    protected void assignedDefaultJobs(ScanSource scanSource, int instanceNum, 
List<AssignedJob> instances,
+            ConnectContext context, DistributedPlanWorker worker) {
+        // split the scan ranges into instanceNum partitions, one partition per 
instance
+        // for example:
+        //  [
+        //     scan tbl1: [tablet_10001, tablet_10003], // instance 1
+        //     scan tbl1: [tablet_10002, tablet_10004]  // instance 2
+        //  ]
+        List<ScanSource> instanceToScanRanges = 
scanSource.parallelize(scanNodes, instanceNum);
+
+        for (ScanSource instanceToScanRange : instanceToScanRanges) { // instances.size() doubles as the next instance index across all workers
+            instances.add(
+                    assignWorkerAndDataSources(
+                        instances.size(), context.nextInstanceId(), worker, 
instanceToScanRange
+                    )
+            );
+        }
+    }
+
+    protected void assignLocalShuffleJobs(ScanSource scanSource, int 
instanceNum, List<AssignedJob> instances,
+            ConnectContext context, DistributedPlanWorker worker) {
+        // only generate one instance to scan all data, in this step
+        List<ScanSource> instanceToScanRanges = 
scanSource.parallelize(scanNodes, 1);
+
+        // when the data is small but aggregation is slow, we use 1 instance 
to scan data,
+        // and use more instances (to ***add parallel***) to process aggregate.
+        // We call it `ignore data distribution` of `share scan`. Backend will 
know this instances
+        // share the same ScanSource, and will not scan the same data multiple 
times.
+        //
+        // +-------------------------------- same fragment in one host 
-------------------------------------+
+        // |                instance1      instance2     instance3     
instance4                            |
+        // |                    \              \             /            /    
                             |
+        // |                                                                   
                             |
+        // |                                     OlapScanNode                  
                             |
+        // |(share scan node, instance1 will scan all data and local shuffle 
to other local instances       |
+        // |                           to parallel compute this data)          
                             |
+        // 
+------------------------------------------------------------------------------------------------+
+        ScanSource shareScanSource = instanceToScanRanges.get(0);
+
+        // one scan range generates multiple instances,
+        // different instances reference the same scan source
+        int shareScanId = shareScanIdGenerator.getAndIncrement();
+        ScanSource emptyShareScanSource = shareScanSource.newEmpty();
+        for (int i = 0; i < instanceNum; i++) {
+            LocalShuffleAssignedJob instance = new LocalShuffleAssignedJob(
+                    instances.size(), shareScanId, i > 0,
+                    context.nextInstanceId(), this, worker,
+                    i == 0 ? shareScanSource : emptyShareScanSource
+            );
+            instances.add(instance);
+        }
+    }
+
     protected int degreeOfParallelism(int maxParallel) {
         Preconditions.checkArgument(maxParallel > 0, "maxParallel must be 
positive");
         if (!fragment.getDataPartition().isPartitioned()) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
new file mode 100644
index 00000000000..443acb50d78
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/LocalShuffleBucketJoinAssignedJob.java
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
+import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ImmutableSet;
+
+import java.util.Set;
+
+/** A LocalShuffleAssignedJob that also records which join bucket indexes are assigned to this instance. */
+public class LocalShuffleBucketJoinAssignedJob extends LocalShuffleAssignedJob 
{
+    private volatile Set<Integer> assignedJoinBucketIndexes; // always an immutable set; replaced, never mutated in place
+
+    public LocalShuffleBucketJoinAssignedJob(
+            int indexInUnassignedJob, int shareScanId, boolean 
receiveDataFromLocal,
+            TUniqueId instanceId, UnassignedJob unassignedJob,
+            DistributedPlanWorker worker, ScanSource scanSource,
+            Set<Integer> assignedJoinBucketIndexes) {
+        super(indexInUnassignedJob, shareScanId, receiveDataFromLocal, 
instanceId, unassignedJob, worker, scanSource);
+        this.assignedJoinBucketIndexes = 
Utils.fastToImmutableSet(assignedJoinBucketIndexes);
+    }
+
+    public Set<Integer> getAssignedJoinBucketIndexes() {
+        return assignedJoinBucketIndexes;
+    }
+
+    public void addAssignedJoinBucketIndexes(Set<Integer> joinBucketIndexes) { // NOTE(review): volatile read-modify-write is not atomic; concurrent callers could lose buckets
+        this.assignedJoinBucketIndexes = ImmutableSet.<Integer>builder()
+                .addAll(assignedJoinBucketIndexes)
+                .addAll(joinBucketIndexes)
+                .build();
+    }
+
+    @Override
+    protected String formatOtherString() {
+        return ",\n  assigned join buckets: " + assignedJoinBucketIndexes;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java
index 75849ad8146..77494d621ee 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/StaticAssignedJob.java
@@ -89,6 +89,7 @@ public class StaticAssignedJob implements AssignedJob {
         }
 
         return str
+                .append(formatOtherString())
                 .append(",\n  scanSource: " + formatScanSourceString())
                 .append("\n)")
                 .toString();
@@ -108,6 +109,10 @@ public class StaticAssignedJob implements AssignedJob {
         return scanSourceString.toString();
     }
 
+    protected String formatOtherString() { // hook: subclasses may return extra fields, printed just before scanSource
+        return "";
+    }
+
     @Override
     public int hashCode() {
         return indexInUnassignedJob;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index 70a91ca2b30..f90fe7ea6e2 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -27,6 +27,7 @@ import 
org.apache.doris.nereids.trees.plans.distribute.DistributeContext;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
+import org.apache.doris.nereids.util.Utils;
 import org.apache.doris.planner.ExchangeNode;
 import org.apache.doris.planner.HashJoinNode;
 import org.apache.doris.planner.OlapScanNode;
@@ -164,6 +165,34 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
         return assignedJobs;
     }
 
+    @Override
+    protected void assignLocalShuffleJobs(ScanSource scanSource, int 
instanceNum, List<AssignedJob> instances,
+            ConnectContext context, DistributedPlanWorker worker) {
+        // split the buckets into instanceNum groups; a group only decides an instance's join buckets, not its scan
+        List<ScanSource> assignJoinBuckets = scanSource.parallelize(
+                scanNodes, instanceNum
+        );
+
+        // all instances share one scan (same shareScanId): instance 0 scans the whole source,
+        // the others get an empty source and receive data from local (receiveDataFromLocal = i > 0)
+        int shareScanId = shareScanIdGenerator.getAndIncrement();
+
+        BucketScanSource shareScanSource = (BucketScanSource) scanSource;
+        ScanSource emptyShareScanSource = shareScanSource.newEmpty();
+
+        for (int i = 0; i < assignJoinBuckets.size(); i++) {
+            Set<Integer> assignedJoinBuckets
+                    = ((BucketScanSource) 
assignJoinBuckets.get(i)).bucketIndexToScanNodeToTablets.keySet();
+            LocalShuffleBucketJoinAssignedJob instance = new 
LocalShuffleBucketJoinAssignedJob(
+                    instances.size(), shareScanId, i > 0,
+                    context.nextInstanceId(), this, worker,
+                    i == 0 ? shareScanSource : emptyShareScanSource,
+                    Utils.fastToImmutableSet(assignedJoinBuckets)
+            );
+            instances.add(instance);
+        }
+    }
+
     private boolean shouldFillUpInstances(List<HashJoinNode> hashJoinNodes) {
         for (HashJoinNode hashJoinNode : hashJoinNodes) {
             if (!hashJoinNode.isBucketShuffle()) {
@@ -198,6 +227,7 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
         List<AssignedJob> newInstances = new ArrayList<>(instances);
         for (Entry<DistributedPlanWorker, Collection<Integer>> workerToBuckets 
: missingBuckets.asMap().entrySet()) {
             Map<Integer, Map<ScanNode, ScanRanges>> scanEmptyBuckets = 
Maps.newLinkedHashMap();
+            Set<Integer> assignedJoinBuckets = 
Utils.fastToImmutableSet(workerToBuckets.getValue());
             for (Integer bucketIndex : workerToBuckets.getValue()) {
                 Map<ScanNode, ScanRanges> scanTableWithEmptyData = 
Maps.newLinkedHashMap();
                 for (ScanNode scanNode : scanNodes) {
@@ -218,12 +248,16 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
                         BucketScanSource bucketScanSource = (BucketScanSource) 
newInstance.getScanSource();
                         
bucketScanSource.bucketIndexToScanNodeToTablets.putAll(scanEmptyBuckets);
                         mergedBucketsInSameWorkerInstance = true;
+
+                        LocalShuffleBucketJoinAssignedJob instance = 
(LocalShuffleBucketJoinAssignedJob) newInstance;
+                        
instance.addAssignedJoinBucketIndexes(assignedJoinBuckets);
                     }
                 }
                 if (!mergedBucketsInSameWorkerInstance) {
-                    fillUpInstance = new LocalShuffleAssignedJob(
+                    fillUpInstance = new LocalShuffleBucketJoinAssignedJob(
                             newInstances.size(), 
shareScanIdGenerator.getAndIncrement(),
-                            false, context.nextInstanceId(), this, worker, 
scanSource
+                            false, context.nextInstanceId(), this, worker, 
scanSource,
+                            assignedJoinBuckets
                     );
                 }
             } else {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
index 1a8f2a216cd..8c1b9714c35 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/PipelineExecutionTask.java
@@ -63,7 +63,6 @@ public class PipelineExecutionTask extends 
AbstractRuntimeTask<Long, MultiFragme
     private final long timeoutDeadline;
     private final CoordinatorContext coordinatorContext;
     private final BackendServiceProxy backendServiceProxy;
-    private final Map<BackendFragmentId, SingleFragmentPipelineTask> 
backendFragmentTasks;
 
     // mutable states
     public PipelineExecutionTask(
@@ -88,7 +87,6 @@ public class PipelineExecutionTask extends 
AbstractRuntimeTask<Long, MultiFragme
                 backendFragmentTasks.put(new BackendFragmentId(backendId, 
fragmentId), fragmentTask);
             }
         }
-        this.backendFragmentTasks = backendFragmentTasks.build();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
index 42cf08fb2e3..47c01ef8eb3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/RuntimeFiltersThriftBuilder.java
@@ -82,11 +82,13 @@ public class RuntimeFiltersThriftBuilder {
                             target.address, address -> {
                                 TRuntimeFilterTargetParamsV2 params = new 
TRuntimeFilterTargetParamsV2();
                                 params.target_fragment_instance_addr = address;
+                                params.target_fragment_ids = new ArrayList<>();
+                                // required field
                                 params.target_fragment_instance_ids = new 
ArrayList<>();
                                 return params;
                             });
 
-                    
targetParams.target_fragment_instance_ids.add(target.instanceId);
+                    targetParams.target_fragment_ids.add(target.fragmentId);
                 }
 
                 runtimeFilterParams.putToRidToTargetParamv2(
@@ -95,7 +97,8 @@ public class RuntimeFiltersThriftBuilder {
             } else {
                 List<TRuntimeFilterTargetParams> targetParams = 
Lists.newArrayList();
                 for (RuntimeFilterTarget target : targets) {
-                    targetParams.add(new 
TRuntimeFilterTargetParams(target.instanceId, target.address));
+                    // Instance id makes no sense if this runtime filter 
doesn't have remote targets.
+                    targetParams.add(new TRuntimeFilterTargetParams(new 
TUniqueId(), target.address));
                 }
                 
runtimeFilterParams.putToRidToTargetParam(rf.getFilterId().asInt(), 
targetParams);
             }
@@ -135,7 +138,7 @@ public class RuntimeFiltersThriftBuilder {
                     BackendWorker backendWorker = (BackendWorker) 
instanceJob.getAssignedWorker();
                     Backend backend = backendWorker.getBackend();
                     targetFragments.add(new RuntimeFilterTarget(
-                            instanceJob.instanceId(),
+                            fragment.getFragmentId().asInt(),
                             new TNetworkAddress(backend.getHost(), 
backend.getBrpcPort())
                     ));
                 }
@@ -158,11 +161,11 @@ public class RuntimeFiltersThriftBuilder {
     }
 
     public static class RuntimeFilterTarget {
-        public final TUniqueId instanceId;
+        public final int fragmentId;
         public final TNetworkAddress address;
 
-        public RuntimeFilterTarget(TUniqueId instanceId, TNetworkAddress 
address) {
-            this.instanceId = instanceId;
+        public RuntimeFilterTarget(int fragmentId, TNetworkAddress address) {
+            this.fragmentId = fragmentId;
             this.address = address;
         }
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
index c04861cbf43..f0e3febe192 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java
@@ -26,6 +26,7 @@ import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.BucketScanSource;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.DefaultScanSource;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleAssignedJob;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanRanges;
 import org.apache.doris.nereids.trees.plans.distribute.worker.job.ScanSource;
 import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.UnassignedScanBucketOlapTableJob;
@@ -498,14 +499,18 @@ public class ThriftPlansBuilder {
             if (instanceJob.getAssignedWorker().id() != worker.id()) {
                 continue;
             }
-            if (instanceJob instanceof LocalShuffleAssignedJob
-                    && ((LocalShuffleAssignedJob) 
instanceJob).receiveDataFromLocal) {
-                continue;
-            }
+
             Integer instanceIndex = instanceToIndex.get(instanceJob);
-            BucketScanSource bucketScanSource = (BucketScanSource) 
instanceJob.getScanSource();
-            for (Integer bucketIndex : 
bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
-                bucketIdToInstanceId.put(bucketIndex, instanceIndex);
+            if (instanceJob instanceof LocalShuffleBucketJoinAssignedJob) {
+                LocalShuffleBucketJoinAssignedJob assignedJob = 
(LocalShuffleBucketJoinAssignedJob) instanceJob;
+                for (Integer bucketIndex : 
assignedJob.getAssignedJoinBucketIndexes()) {
+                    bucketIdToInstanceId.put(bucketIndex, instanceIndex);
+                }
+            } else {
+                BucketScanSource bucketScanSource = (BucketScanSource) 
instanceJob.getScanSource();
+                for (Integer bucketIndex : 
bucketScanSource.bucketIndexToScanNodeToTablets.keySet()) {
+                    bucketIdToInstanceId.put(bucketIndex, instanceIndex);
+                }
             }
         }
         return bucketIdToInstanceId;
diff --git 
a/regression-test/data/nereids_syntax_p0/distribute/shuffle_left_join.out 
b/regression-test/data/nereids_syntax_p0/distribute/shuffle_left_join.out
index 99d095d87f7..0b5a63b0295 100644
--- a/regression-test/data/nereids_syntax_p0/distribute/shuffle_left_join.out
+++ b/regression-test/data/nereids_syntax_p0/distribute/shuffle_left_join.out
@@ -1,9 +1,9 @@
 -- This file is automatically generated. You should know what you did if you 
want to edit this
 -- !shuffle_left_and_right --
-1      1       1       1
-2      2       2       2
+1      1       1
+2      2       2
 
 -- !shuffle_left --
-1      1       1       1
-2      2       2       2
+1      1       1
+2      2       2
 
diff --git 
a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q15.out 
b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q15.out
index 519a64b38aa..9e6b383230a 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q15.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q15.out
@@ -7,7 +7,7 @@ PhysicalResultSink
 --------PhysicalProject
 ----------hashJoin[INNER_JOIN broadcast] 
hashCondition=((revenue0.total_revenue = max(total_revenue))) otherCondition=()
 ------------PhysicalProject
---------------hashJoin[INNER_JOIN broadcast] 
hashCondition=((supplier.s_suppkey = revenue0.supplier_no)) otherCondition=()
+--------------hashJoin[INNER_JOIN bucketShuffle] 
hashCondition=((supplier.s_suppkey = revenue0.supplier_no)) otherCondition=()
 ----------------PhysicalProject
 ------------------hashAgg[GLOBAL]
 --------------------PhysicalDistribute[DistributionSpecHash]
diff --git 
a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20-rewrite.out
 
b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20-rewrite.out
index f1ec59e40d8..89548468b7c 100644
--- 
a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20-rewrite.out
+++ 
b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20-rewrite.out
@@ -9,7 +9,7 @@ PhysicalResultSink
 ------------PhysicalProject
 --------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] 
hashCondition=((supplier.s_suppkey = t3.ps_suppkey)) otherCondition=() build 
RFs:RF3 s_suppkey->[l_suppkey,ps_suppkey]
 ----------------PhysicalProject
-------------------hashJoin[INNER_JOIN shuffleBucket] 
hashCondition=((t2.l_partkey = t1.ps_partkey) and (t2.l_suppkey = 
t1.ps_suppkey)) otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > 
t2.l_q)) build RFs:RF1 ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
+------------------hashJoin[INNER_JOIN bucketShuffle] 
hashCondition=((t2.l_partkey = t1.ps_partkey) and (t2.l_suppkey = 
t1.ps_suppkey)) otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > 
t2.l_q)) build RFs:RF1 ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
 --------------------PhysicalProject
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
diff --git 
a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20.out 
b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20.out
index bcf875640f9..7678db3199a 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/nostats_rf_prune/q20.out
@@ -9,7 +9,7 @@ PhysicalResultSink
 ------------PhysicalProject
 --------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] 
hashCondition=((supplier.s_suppkey = partsupp.ps_suppkey)) otherCondition=() 
build RFs:RF3 s_suppkey->[l_suppkey,ps_suppkey]
 ----------------PhysicalProject
-------------------hashJoin[INNER_JOIN shuffleBucket] 
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and 
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty 
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF1 
ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
+------------------hashJoin[INNER_JOIN bucketShuffle] 
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and 
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty 
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF1 
ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
 --------------------hashAgg[GLOBAL]
 ----------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20-rewrite.out 
b/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20-rewrite.out
index 80f86bf96e4..6b7a6da490c 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20-rewrite.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20-rewrite.out
@@ -7,7 +7,7 @@ PhysicalResultSink
 --------PhysicalProject
 ----------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] 
hashCondition=((supplier.s_suppkey = t3.ps_suppkey)) otherCondition=() build 
RFs:RF4 s_suppkey->[l_suppkey,ps_suppkey]
 ------------PhysicalProject
---------------hashJoin[INNER_JOIN shuffleBucket] hashCondition=((t2.l_partkey 
= t1.ps_partkey) and (t2.l_suppkey = t1.ps_suppkey)) 
otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > t2.l_q)) build RFs:RF2 
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
+--------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((t2.l_partkey 
= t1.ps_partkey) and (t2.l_suppkey = t1.ps_suppkey)) 
otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > t2.l_q)) build RFs:RF2 
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
 ----------------PhysicalProject
 ------------------hashAgg[GLOBAL]
 --------------------PhysicalDistribute[DistributionSpecHash]
diff --git a/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20.out 
b/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20.out
index aca1634e470..6b3b115fcfe 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/rf_prune/q20.out
@@ -7,7 +7,7 @@ PhysicalResultSink
 --------PhysicalProject
 ----------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] 
hashCondition=((supplier.s_suppkey = partsupp.ps_suppkey)) otherCondition=() 
build RFs:RF4 s_suppkey->[l_suppkey,ps_suppkey]
 ------------PhysicalProject
---------------hashJoin[INNER_JOIN shuffleBucket] 
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and 
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty 
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF2 
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
+--------------hashJoin[INNER_JOIN bucketShuffle] 
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and 
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty 
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF2 
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
 ----------------hashAgg[GLOBAL]
 ------------------PhysicalDistribute[DistributionSpecHash]
 --------------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20-rewrite.out 
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20-rewrite.out
index 80f86bf96e4..6b7a6da490c 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20-rewrite.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20-rewrite.out
@@ -7,7 +7,7 @@ PhysicalResultSink
 --------PhysicalProject
 ----------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] 
hashCondition=((supplier.s_suppkey = t3.ps_suppkey)) otherCondition=() build 
RFs:RF4 s_suppkey->[l_suppkey,ps_suppkey]
 ------------PhysicalProject
---------------hashJoin[INNER_JOIN shuffleBucket] hashCondition=((t2.l_partkey 
= t1.ps_partkey) and (t2.l_suppkey = t1.ps_suppkey)) 
otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > t2.l_q)) build RFs:RF2 
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
+--------------hashJoin[INNER_JOIN bucketShuffle] hashCondition=((t2.l_partkey 
= t1.ps_partkey) and (t2.l_suppkey = t1.ps_suppkey)) 
otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > t2.l_q)) build RFs:RF2 
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
 ----------------PhysicalProject
 ------------------hashAgg[GLOBAL]
 --------------------PhysicalDistribute[DistributionSpecHash]
diff --git a/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20.out 
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20.out
index aca1634e470..6b3b115fcfe 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/shape/q20.out
@@ -7,7 +7,7 @@ PhysicalResultSink
 --------PhysicalProject
 ----------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] 
hashCondition=((supplier.s_suppkey = partsupp.ps_suppkey)) otherCondition=() 
build RFs:RF4 s_suppkey->[l_suppkey,ps_suppkey]
 ------------PhysicalProject
---------------hashJoin[INNER_JOIN shuffleBucket] 
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and 
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty 
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF2 
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
+--------------hashJoin[INNER_JOIN bucketShuffle] 
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and 
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty 
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF2 
ps_partkey->[l_partkey];RF3 ps_suppkey->[l_suppkey]
 ----------------hashAgg[GLOBAL]
 ------------------PhysicalDistribute[DistributionSpecHash]
 --------------------hashAgg[LOCAL]
diff --git 
a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q15.out 
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q15.out
index f29b77317c3..e9b45b5888c 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q15.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q15.out
@@ -7,7 +7,7 @@ PhysicalResultSink
 --------PhysicalProject
 ----------hashJoin[INNER_JOIN broadcast] 
hashCondition=((revenue0.total_revenue = max(total_revenue))) otherCondition=()
 ------------PhysicalProject
---------------hashJoin[INNER_JOIN broadcast] 
hashCondition=((supplier.s_suppkey = revenue0.supplier_no)) otherCondition=() 
build RFs:RF0 s_suppkey->[l_suppkey]
+--------------hashJoin[INNER_JOIN bucketShuffle] 
hashCondition=((supplier.s_suppkey = revenue0.supplier_no)) otherCondition=() 
build RFs:RF0 s_suppkey->[l_suppkey]
 ----------------PhysicalProject
 ------------------hashAgg[GLOBAL]
 --------------------PhysicalDistribute[DistributionSpecHash]
diff --git 
a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20-rewrite.out 
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20-rewrite.out
index f1ec59e40d8..89548468b7c 100644
--- 
a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20-rewrite.out
+++ 
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20-rewrite.out
@@ -9,7 +9,7 @@ PhysicalResultSink
 ------------PhysicalProject
 --------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] 
hashCondition=((supplier.s_suppkey = t3.ps_suppkey)) otherCondition=() build 
RFs:RF3 s_suppkey->[l_suppkey,ps_suppkey]
 ----------------PhysicalProject
-------------------hashJoin[INNER_JOIN shuffleBucket] 
hashCondition=((t2.l_partkey = t1.ps_partkey) and (t2.l_suppkey = 
t1.ps_suppkey)) otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > 
t2.l_q)) build RFs:RF1 ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
+------------------hashJoin[INNER_JOIN bucketShuffle] 
hashCondition=((t2.l_partkey = t1.ps_partkey) and (t2.l_suppkey = 
t1.ps_suppkey)) otherCondition=((cast(ps_availqty as DECIMALV3(38, 3)) > 
t2.l_q)) build RFs:RF1 ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
 --------------------PhysicalProject
 ----------------------hashAgg[GLOBAL]
 ------------------------PhysicalDistribute[DistributionSpecHash]
diff --git 
a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20.out 
b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20.out
index bcf875640f9..7678db3199a 100644
--- a/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20.out
+++ b/regression-test/data/new_shapes_p0/tpch_sf1000/shape_no_stats/q20.out
@@ -9,7 +9,7 @@ PhysicalResultSink
 ------------PhysicalProject
 --------------hashJoin[RIGHT_SEMI_JOIN shuffleBucket] 
hashCondition=((supplier.s_suppkey = partsupp.ps_suppkey)) otherCondition=() 
build RFs:RF3 s_suppkey->[l_suppkey,ps_suppkey]
 ----------------PhysicalProject
-------------------hashJoin[INNER_JOIN shuffleBucket] 
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and 
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty 
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF1 
ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
+------------------hashJoin[INNER_JOIN bucketShuffle] 
hashCondition=((lineitem.l_partkey = partsupp.ps_partkey) and 
(lineitem.l_suppkey = partsupp.ps_suppkey)) otherCondition=((cast(ps_availqty 
as DECIMALV3(38, 3)) > (0.5 * sum(l_quantity)))) build RFs:RF1 
ps_partkey->[l_partkey];RF2 ps_suppkey->[l_suppkey]
 --------------------hashAgg[GLOBAL]
 ----------------------PhysicalDistribute[DistributionSpecHash]
 ------------------------hashAgg[LOCAL]
diff --git 
a/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy 
b/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy
index 8c56c257b0e..4ee14eba481 100644
--- 
a/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy
+++ 
b/regression-test/suites/nereids_syntax_p0/distribute/shuffle_left_join.groovy
@@ -95,8 +95,8 @@ suite("shuffle_left_join") {
         .collect(Collectors.joining("\n"))
     logger.info("Variables:\n${variableString}")
 
-    extractFragment(sqlStr, "INNER JOIN(BUCKET_SHUFFLE)") { exchangeNum ->
-        assertTrue(exchangeNum == 1)
+    extractFragment(sqlStr, "INNER JOIN(PARTITIONED)") { exchangeNum ->
+        assertTrue(exchangeNum == 2)
     }
 
     explain {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to