This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 001548d9f81a5a1f4a3a27b404691f2c710d25be
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 17:30:49 2026 +0800

    [opt](local shuffle) upgrade bucket LE to local hash when instances exceed buckets (DORIS-24902 part 2)
    
    A pooled bucket-join fragment runs bucket joins at bucket-count parallelism:
    only instances owning join buckets do work (e.g. 8 buckets/BE, 16 instances/BE
    leaves half idle). When nothing above the join needs bucket alignment and
    per-BE instances > buckets-with-data x local_shuffle_bucket_upgrade_ratio
    (double, default 1.5; <=1 disables), HashJoinNode re-distributes both sides
    with LOCAL_EXECUTION_HASH_SHUFFLE keyed by childrenDistributeExprLists so the
    join runs at full instance parallelism. Same-key local rehash keeps each key
    on one instance (groups intact, both sides pairwise aligned) at zero network
    cost; coordinator destinations and the BE are untouched.
    
    Safety gates:
    - parentRequire == BUCKET (upper bucket join) blocks the upgrade
    - hasBucketUpgradedAncestor: in stacked bucket joins only the topmost
      upgrades; a lower join's LOCAL hash output (different keys) would
      type-satisfy the upper requireSpecific(LOCAL) and skip the re-align LE
    - both sides must have non-empty distribute exprs
    - numeric gate computed from real assignment data
      (LocalShuffleBucketJoinAssignedJob.assignedJoinBucketIndexes)
---
 .../glue/translator/PlanTranslatorContext.java     |  31 ++++
 .../org/apache/doris/planner/AddLocalExchange.java |  63 ++++++++
 .../org/apache/doris/planner/HashJoinNode.java     |  83 +++++++++--
 .../java/org/apache/doris/qe/SessionVariable.java  |  22 +++
 .../planner/LocalShuffleNodeCoverageTest.java      | 152 ++++++++++++++++++++
 .../test_local_shuffle_bucket_upgrade.groovy       | 159 +++++++++++++++++++++
 6 files changed, 497 insertions(+), 13 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index 223d84888bd..92c724124dc 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -127,6 +127,21 @@ public class PlanTranslatorContext {
     // needs shuffle for correctness, not just for performance like 
StreamingAgg pre-agg).
     private final Map<PlanNodeId, Boolean> shuffledAncestorMap = 
Maps.newHashMap();
 
+    // Whether the fragment currently being processed by AddLocalExchange is 
eligible for the
+    // bucket → local-hash parallelism upgrade: a pooled bucket-join fragment 
whose per-BE
+    // instance count exceeds (buckets-with-data per BE) × 
local_shuffle_bucket_upgrade_ratio.
+    // Computed once per fragment in AddLocalExchange.addLocalExchange from 
the distributed
+    // plan's LocalShuffleBucketJoinAssignedJob assignments; read by
+    // HashJoinNode.enforceAndDeriveLocalExchange.
+    private boolean currentFragmentBucketUpgradeEligible = false;
+
+    // Per-node "a bucket join above me in this fragment already upgraded to 
local hash" flag.
+    // An upgraded join marks its direct children so a stacked bucket join 
below keeps its
+    // BUCKET_HASH_SHUFFLE requires: if it also upgraded, its LOCAL hash 
output (keyed by ITS
+    // join keys) would type-satisfy the upper join's 
requireSpecific(LOCAL_EXECUTION_HASH)
+    // and suppress the LE that re-aligns data to the upper join's keys → 
wrong results.
+    private final Map<PlanNodeId, Boolean> bucketUpgradedAncestorMap = 
Maps.newHashMap();
+
     // Whether the current fragment uses LocalShuffleAssignedJob (pooling scan 
with
     // ignoreDataDistribution → _parallel_instances=1 in BE). When true, 
serial operators
     // indicate real pipeline bottlenecks needing PASSTHROUGH fan-out 
(heavy_ops).
@@ -276,6 +291,22 @@ public class PlanTranslatorContext {
         return shuffledAncestorMap.getOrDefault(node.getId(), false);
     }
 
+    public void setCurrentFragmentBucketUpgradeEligible(boolean eligible) {
+        this.currentFragmentBucketUpgradeEligible = eligible;
+    }
+
+    public boolean isCurrentFragmentBucketUpgradeEligible() {
+        return currentFragmentBucketUpgradeEligible;
+    }
+
+    public void setHasBucketUpgradedAncestor(PlanNode node, boolean value) {
+        bucketUpgradedAncestorMap.put(node.getId(), value);
+    }
+
+    public boolean hasBucketUpgradedAncestor(PlanNode node) {
+        return bucketUpgradedAncestorMap.getOrDefault(node.getId(), false);
+    }
+
     public SlotDescriptor addSlotDesc(TupleDescriptor t) {
         return descTable.addSlotDescriptor(t);
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
index bb4a883d608..92925292f12 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
@@ -22,9 +22,18 @@ import 
org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
 import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan;
 import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping;
 import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan;
+import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob;
 import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType;
 import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire;
 import org.apache.doris.planner.LocalExchangeNode.RequireHash;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 
 /**
  * FE-side local exchange planner — inserts {@link LocalExchangeNode} into 
each fragment's
@@ -81,11 +90,65 @@ public class AddLocalExchange {
             if (maxPerBeInstances <= 1) {
                 continue;
             }
+            context.setCurrentFragmentBucketUpgradeEligible(
+                    isBucketUpgradeEligible(pipePlan, maxPerBeInstances, 
context));
             PlanFragment fragment = pipePlan.getFragmentJob().getFragment();
             addLocalExchangeForFragment(fragment, context);
         }
     }
 
+    /**
+     * Bucket → local-hash parallelism upgrade eligibility (DORIS-24902 part 
2).
+     *
+     * A pooled bucket-join fragment runs its bucket joins at bucket-count 
parallelism:
+     * each LocalShuffleBucketJoinAssignedJob owns a disjoint set of join 
buckets and only
+     * instances with buckets do join work (e.g. 8 buckets/BE but 16 
instances/BE → 8 idle).
+     * When nothing above the join needs bucket alignment, HashJoinNode can 
re-distribute
+     * both sides with LOCAL_EXECUTION_HASH_SHUFFLE to use all instances — see
+     * {@link HashJoinNode#enforceAndDeriveLocalExchange}.
+     *
+     * This method computes the per-fragment numeric condition from the actual 
instance
+     * assignment: maxPerBeInstances > maxBucketsWithDataPerWorker × ratio.  
The ratio comes
+     * from session variable {@code local_shuffle_bucket_upgrade_ratio}; 
values <= 1 disable
+     * the upgrade entirely (a required parallelism gain of at most 1x means 
no gain).
+     */
+    private boolean isBucketUpgradeEligible(PipelineDistributedPlan pipePlan,
+            long maxPerBeInstances, PlanTranslatorContext context) {
+        ConnectContext connectContext = context.getConnectContext();
+        if (connectContext == null || connectContext.getSessionVariable() == 
null) {
+            return false;
+        }
+        double ratio = 
connectContext.getSessionVariable().getLocalShuffleBucketUpgradeRatio();
+        List<AssignedJob> instanceJobs = pipePlan.getInstanceJobs();
+        if (instanceJobs.isEmpty()
+                || 
!instanceJobs.stream().allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance))
 {
+            // Only pooled bucket-join fragments have the bucket-count 
parallelism cap.
+            return false;
+        }
+        Map<Long, Set<Integer>> bucketsPerWorker = new HashMap<>();
+        for (AssignedJob job : instanceJobs) {
+            bucketsPerWorker.computeIfAbsent(job.getAssignedWorker().id(), k 
-> new HashSet<>())
+                    .addAll(((LocalShuffleBucketJoinAssignedJob) 
job).getAssignedJoinBucketIndexes());
+        }
+        long maxBucketsPerWorker = bucketsPerWorker.values().stream()
+                .mapToLong(Set::size).max().orElse(0);
+        return shouldUpgradeBucketParallelism(ratio, maxPerBeInstances, 
maxBucketsPerWorker);
+    }
+
+    /**
+     * Pure numeric gate for the bucket → local-hash upgrade.
+     * ratio <= 1 (including 0 and negatives) always disables; otherwise upgrade when the
+     * per-BE instance count exceeds buckets-with-data × ratio (i.e. the parallelism gain
+     * is strictly greater than the configured multiple).
+     */
+    static boolean shouldUpgradeBucketParallelism(double ratio, long 
maxPerBeInstances,
+            long maxBucketsPerWorker) {
+        if (ratio <= 1.0) {
+            return false;
+        }
+        return maxBucketsPerWorker > 0 && maxPerBeInstances > 
maxBucketsPerWorker * ratio;
+    }
+
     private void addLocalExchangeForFragment(PlanFragment fragment, 
PlanTranslatorContext context) {
         DataSink sink = fragment.getSink();
         LocalExchangeTypeRequire require = sink == null
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index cbf2dcd6511..017792483ab 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -330,19 +330,47 @@ public class HashJoinNode extends JoinNodeBase {
             // For non-serial probe: propagate probe side's actual 
distribution.
             outputType = probeChildSerial ? LocalExchangeType.PASSTHROUGH : 
null;
         } else if (isColocate() || isBucketShuffle()) {
-            probeSideRequire = LocalExchangeTypeRequire.requireBucketHash();
-            // For BUCKET_SHUFFLE with serial build child: use 
requireBucketHash() (not
-            // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE has 
no shared
-            // hash table mechanism — PASS_TO_ONE routes all data to task 0 
while tasks 1..N-1
-            // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE 
correctly distributes
-            // build data by bucket to match the probe side's bucket 
distribution.
-            // The serial exchange returns NOOP, so enforceRequire() will 
insert a
-            // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH fan-out 
for heavy-ops
-            // bottleneck avoidance).
-            buildSideRequire = LocalExchangeTypeRequire.requireBucketHash();
-            outputType = AddLocalExchange.resolveExchangeType(
-                    LocalExchangeTypeRequire.requireBucketHash(), 
translatorContext, this,
-                    children.get(0));
+            if (canUpgradeBucketToLocalHash(translatorContext, parentRequire)) 
{
+                // Bucket → local-hash parallelism upgrade (DORIS-24902 part 
2): the fragment
+                // has noticeably more instances than buckets-with-data (see
+                // AddLocalExchange.isBucketUpgradeEligible) and nothing above 
this join needs
+                // bucket alignment — re-distribute both sides by their 
distribute keys with
+                // LOCAL_EXECUTION_HASH_SHUFFLE so the join runs at full 
instance parallelism
+                // instead of being capped at bucket count.  The LE keys come 
from
+                // childrenDistributeExprLists (pairwise-aligned per side, a 
subset of the
+                // equi-join keys), so both sides keep hashing the same values 
and the
+                // per-instance build/probe pairing stays correct.
+                //
+                // requireSpecific (not requireHash) on purpose: the children's
+                // BUCKET_HASH_SHUFFLE output must NOT satisfy this require, 
otherwise no LE
+                // is inserted and the join stays bucket-capped.
+                //
+                // Mark direct children so a stacked bucket join below keeps 
its BUCKET
+                // requires: if it also upgraded, its LOCAL hash output (keyed 
by ITS join
+                // keys) would type-satisfy our 
requireSpecific(LOCAL_EXECUTION_HASH) and
+                // suppress the LE that re-aligns data to OUR keys → wrong 
results.
+                
translatorContext.setHasBucketUpgradedAncestor(children.get(0), true);
+                
translatorContext.setHasBucketUpgradedAncestor(children.get(1), true);
+                probeSideRequire = LocalExchangeTypeRequire.requireSpecific(
+                        LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+                buildSideRequire = LocalExchangeTypeRequire.requireSpecific(
+                        LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+                outputType = null; // derived from probeResult.second below
+            } else {
+                probeSideRequire = 
LocalExchangeTypeRequire.requireBucketHash();
+                // For BUCKET_SHUFFLE with serial build child: use 
requireBucketHash() (not
+                // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE 
has no shared
+                // hash table mechanism — PASS_TO_ONE routes all data to task 
0 while tasks 1..N-1
+                // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE 
correctly distributes
+                // build data by bucket to match the probe side's bucket 
distribution.
+                // The serial exchange returns NOOP, so enforceRequire() will 
insert a
+                // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH 
fan-out for heavy-ops
+                // bottleneck avoidance).
+                buildSideRequire = 
LocalExchangeTypeRequire.requireBucketHash();
+                outputType = AddLocalExchange.resolveExchangeType(
+                        LocalExchangeTypeRequire.requireBucketHash(), 
translatorContext, this,
+                        children.get(0));
+            }
         } else {
             // PARTITIONED (shuffle) join: both sides enter via global hash 
exchange.
             // Require GLOBAL specifically so that any inserted exchange uses 
the same
@@ -378,4 +406,33 @@ public class HashJoinNode extends JoinNodeBase {
     protected boolean shouldResetSerialFlagForChild(int childIndex) {
         return childIndex == 1;
     }
+
+    /**
+     * Whether this bucket-shuffle / colocate join may upgrade its children 
requires from
+     * BUCKET_HASH_SHUFFLE to LOCAL_EXECUTION_HASH_SHUFFLE for higher 
parallelism:
+     * <ul>
+     *   <li>the fragment passed the numeric gate (instances vs 
buckets-with-data × ratio),
+     *       computed once per fragment in {@code AddLocalExchange};</li>
+     *   <li>no bucket join above already upgraded (stacked joins must keep 
bucket
+     *       alignment below the single upgraded ancestor — see
+     *       {@code PlanTranslatorContext#hasBucketUpgradedAncestor});</li>
+     *   <li>the parent does not require bucket distribution of our output (an 
upper
+     *       bucket join's probe/build require — upgrading here would break 
the bucket
+     *       alignment it depends on);</li>
+     *   <li>both sides have non-empty distribute exprs — they become the 
LOCAL hash LE
+     *       keys, an exprs-less hash exchange would be meaningless.</li>
+     * </ul>
+     */
+    private boolean canUpgradeBucketToLocalHash(PlanTranslatorContext 
translatorContext,
+            LocalExchangeTypeRequire parentRequire) {
+        if (!translatorContext.isCurrentFragmentBucketUpgradeEligible()
+                || translatorContext.hasBucketUpgradedAncestor(this)
+                || parentRequire.preferType() == 
LocalExchangeType.BUCKET_HASH_SHUFFLE) {
+            return false;
+        }
+        List<Expr> probeExprs = getChildDistributeExprList(0);
+        List<Expr> buildExprs = getChildDistributeExprList(1);
+        return probeExprs != null && !probeExprs.isEmpty()
+                && buildExprs != null && !buildExprs.isEmpty();
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 2cddffb2ba9..66f80718ee4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -339,6 +339,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String ENABLE_LOCAL_SHUFFLE_PLANNER = 
"enable_local_shuffle_planner";
 
+    public static final String LOCAL_SHUFFLE_BUCKET_UPGRADE_RATIO = 
"local_shuffle_bucket_upgrade_ratio";
+
     public static final String FORCE_TO_LOCAL_SHUFFLE = 
"force_to_local_shuffle";
 
     public static final String ENABLE_LOCAL_MERGE_SORT = 
"enable_local_merge_sort";
@@ -1640,6 +1642,18 @@ public class SessionVariable implements Serializable, 
Writable {
                         "Whether to force to local shuffle on pipelineX 
engine."})
     private boolean forceToLocalShuffle = false;
 
+    @VarAttrDef.VarAttr(
+            name = LOCAL_SHUFFLE_BUCKET_UPGRADE_RATIO, fuzzy = false, varType 
= VariableAnnotation.EXPERIMENTAL,
+            description = {"FE规划Local Shuffle时, 当池化bucket 
join所在fragment的每BE实例数大于"
+                    + "每BE有数据分桶数的该倍数时, 将join两侧的桶分布本地重分发为hash分布以突破桶数并发上限。"
+                    + "必须大于1才生效; 小于等于1(含0和负数)时关闭该优化",
+                    "When FE plans local shuffle and a pooled bucket join 
fragment has more instances"
+                    + " per BE than (buckets-with-data per BE) * this ratio, 
re-distribute both join"
+                    + " sides with local hash instead of bucket hash so join 
parallelism is no longer"
+                    + " capped at bucket count. Only takes effect when > 1; 
values <= 1 (including 0"
+                    + " and negatives) disable the upgrade."}, needForward = 
true)
+    private double localShuffleBucketUpgradeRatio = 1.5;
+
     @VarAttrDef.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
     private boolean enableLocalMergeSort = true;
 
@@ -4796,6 +4810,14 @@ public class SessionVariable implements Serializable, 
Writable {
         this.enableLocalShufflePlanner = enableLocalShufflePlanner;
     }
 
+    public double getLocalShuffleBucketUpgradeRatio() {
+        return localShuffleBucketUpgradeRatio;
+    }
+
+    public void setLocalShuffleBucketUpgradeRatio(double 
localShuffleBucketUpgradeRatio) {
+        this.localShuffleBucketUpgradeRatio = localShuffleBucketUpgradeRatio;
+    }
+
     public boolean enablePushDownNoGroupAgg() {
         return enablePushDownNoGroupAgg;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
index 7288ba49ca2..c4615be4974 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
@@ -24,6 +24,7 @@ import org.apache.doris.analysis.GroupingInfo;
 import org.apache.doris.analysis.JoinOperator;
 import org.apache.doris.analysis.OrderByElement;
 import org.apache.doris.analysis.SlotDescriptor;
+import org.apache.doris.analysis.SlotRef;
 import org.apache.doris.analysis.SortInfo;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
@@ -312,6 +313,157 @@ public class LocalShuffleNodeCoverageTest {
         assertChildLocalExchangeType(serialBuildBroadcast, 1, 
LocalExchangeType.PASS_TO_ONE);
     }
 
+    private static List<List<Expr>> mockDistributeExprLists() {
+        return Lists.newArrayList(
+                Collections.singletonList(Mockito.mock(SlotRef.class)),
+                Collections.singletonList(Mockito.mock(SlotRef.class)));
+    }
+
+    @Test
+    public void testHashJoinBucketUpgradeToLocalHash() {
+        List<Expr> eqConjuncts = 
Collections.singletonList(Mockito.mock(BinaryPredicate.class));
+
+        // 1. Eligible fragment + parent doesn't need bucket → both sides 
re-distributed
+        //    with LOCAL_EXECUTION_HASH_SHUFFLE, output reports LOCAL hash.
+        PlanTranslatorContext upgradeCtx = new PlanTranslatorContext();
+        upgradeCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode probeBucket = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        TrackingPlanNode buildNoop = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
+        HashJoinNode upgradedJoin = new HashJoinNode(nextPlanNodeId(), 
probeBucket, buildNoop,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), 
null, null, false);
+        upgradedJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        upgradedJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> upgradedOutput = 
upgradedJoin.enforceAndDeriveLocalExchange(
+                upgradeCtx, null, LocalExchangeTypeRequire.requireHash());
+        // BUCKET claim must NOT satisfy the upgrade's 
requireSpecific(LOCAL_EXECUTION_HASH):
+        // an LE is inserted on both sides.
+        
Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, 
upgradedOutput.second);
+        assertChildLocalExchangeType(upgradedJoin, 0, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        assertChildLocalExchangeType(upgradedJoin, 1, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+
+        // 2. Child already providing LOCAL hash satisfies the upgraded 
require — no extra LE.
+        PlanTranslatorContext satisfiedCtx = new PlanTranslatorContext();
+        satisfiedCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode probeLocal = new TrackingPlanNode(nextPlanNodeId(),
+                LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        TrackingPlanNode buildLocal = new TrackingPlanNode(nextPlanNodeId(),
+                LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        HashJoinNode satisfiedJoin = new HashJoinNode(nextPlanNodeId(), 
probeLocal, buildLocal,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), 
null, null, false);
+        
satisfiedJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        satisfiedJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> satisfiedUpgrade = 
satisfiedJoin.enforceAndDeriveLocalExchange(
+                satisfiedCtx, null, LocalExchangeTypeRequire.requireHash());
+        
Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, 
satisfiedUpgrade.second);
+        Assertions.assertSame(probeLocal, satisfiedJoin.getChild(0));
+        Assertions.assertSame(buildLocal, satisfiedJoin.getChild(1));
+
+        // 3. Parent requires bucket distribution (upper bucket join) → no 
upgrade even when
+        //    the fragment is eligible: children keep BUCKET_HASH_SHUFFLE.
+        PlanTranslatorContext parentBucketCtx = new PlanTranslatorContext();
+        parentBucketCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode probeForBucketParent = new 
TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        TrackingPlanNode buildForBucketParent = new 
TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        HashJoinNode bucketParentJoin = new HashJoinNode(nextPlanNodeId(), 
probeForBucketParent,
+                buildForBucketParent, JoinOperator.INNER_JOIN, eqConjuncts, 
Collections.emptyList(),
+                null, null, false);
+        
bucketParentJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        bucketParentJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> bucketParentOutput = 
bucketParentJoin.enforceAndDeriveLocalExchange(
+                parentBucketCtx, null, 
LocalExchangeTypeRequire.requireBucketHash());
+        Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, 
bucketParentOutput.second);
+        assertChildLocalExchangeType(bucketParentJoin, 0, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        assertChildLocalExchangeType(bucketParentJoin, 1, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
+
+        // 4. Fragment not eligible (ratio gate failed / not a pooled bucket 
fragment) →
+        //    existing behavior untouched.
+        PlanTranslatorContext ineligibleCtx = new PlanTranslatorContext();
+        TrackingPlanNode probeIneligible = new 
TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        TrackingPlanNode buildIneligible = new 
TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        HashJoinNode ineligibleJoin = new HashJoinNode(nextPlanNodeId(), 
probeIneligible, buildIneligible,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), 
null, null, false);
+        
ineligibleJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        ineligibleJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> ineligibleOutput = 
ineligibleJoin.enforceAndDeriveLocalExchange(
+                ineligibleCtx, null, LocalExchangeTypeRequire.requireHash());
+        Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, 
ineligibleOutput.second);
+        assertChildLocalExchangeType(ineligibleJoin, 0, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        assertChildLocalExchangeType(ineligibleJoin, 1, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
+
+        // 5. Stacked bucket joins: only the topmost upgrades. The inner join 
(direct probe
+        //    child of the upgraded one) is marked via 
hasBucketUpgradedAncestor and keeps
+        //    BUCKET requires; the outer join re-aligns the inner's BUCKET 
output to its own
+        //    keys with a LOCAL hash LE.
+        PlanTranslatorContext stackedCtx = new PlanTranslatorContext();
+        stackedCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode innerProbe = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
+        TrackingPlanNode innerBuild = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
+        HashJoinNode innerJoin = new HashJoinNode(nextPlanNodeId(), 
innerProbe, innerBuild,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), 
null, null, false);
+        innerJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        innerJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        TrackingPlanNode outerBuild = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
+        HashJoinNode outerJoin = new HashJoinNode(nextPlanNodeId(), innerJoin, 
outerBuild,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), 
null, null, false);
+        outerJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        outerJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> stackedOutput = 
outerJoin.enforceAndDeriveLocalExchange(
+                stackedCtx, null, LocalExchangeTypeRequire.requireHash());
+        
Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, 
stackedOutput.second);
+        // outer upgraded: probe side wrapped with LOCAL hash LE (re-aligning 
inner's output)
+        assertChildLocalExchangeType(outerJoin, 0, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        assertChildLocalExchangeType(outerJoin, 1, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        // inner stayed bucket: its own children keep BUCKET_HASH_SHUFFLE LEs
+        assertChildLocalExchangeType(innerJoin, 0, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        assertChildLocalExchangeType(innerJoin, 1, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
+
+        // 6. Colocate join takes the same upgrade path.
+        PlanTranslatorContext colocateCtx = new PlanTranslatorContext();
+        colocateCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode colocateProbe = new 
TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        TrackingPlanNode colocateBuild = new 
TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
+        HashJoinNode colocateJoin = new HashJoinNode(nextPlanNodeId(), 
colocateProbe, colocateBuild,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), 
null, null, false);
+        colocateJoin.setChildrenDistributeExprLists(mockDistributeExprLists());
+        colocateJoin.setColocate(true, "test");
+        Pair<PlanNode, LocalExchangeType> colocateOutput = 
colocateJoin.enforceAndDeriveLocalExchange(
+                colocateCtx, null, LocalExchangeTypeRequire.requireHash());
+        
Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, 
colocateOutput.second);
+        assertChildLocalExchangeType(colocateJoin, 0, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        assertChildLocalExchangeType(colocateJoin, 1, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+
+        // 7. Missing distribute exprs → no upgrade (the LOCAL hash LE would 
have no keys).
+        PlanTranslatorContext noExprCtx = new PlanTranslatorContext();
+        noExprCtx.setCurrentFragmentBucketUpgradeEligible(true);
+        TrackingPlanNode probeNoExpr = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
+        TrackingPlanNode buildNoExpr = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
+        HashJoinNode noExprJoin = new HashJoinNode(nextPlanNodeId(), 
probeNoExpr, buildNoExpr,
+                JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), 
null, null, false);
+        noExprJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE);
+        Pair<PlanNode, LocalExchangeType> noExprOutput = 
noExprJoin.enforceAndDeriveLocalExchange(
+                noExprCtx, null, LocalExchangeTypeRequire.requireHash());
+        Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, 
noExprOutput.second);
+        assertChildLocalExchangeType(noExprJoin, 0, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        assertChildLocalExchangeType(noExprJoin, 1, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
+    }
+
+
+    @Test
+    public void testShouldUpgradeBucketParallelismGate() {
+        // ratio <= 1 (including 0 and negatives) always disables — the knob 
doubles as the
+        // off switch: requiring at most 1x parallelism gain means no gain.
+        
Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(0, 16, 
8));
+        
Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(-1, 16, 
8));
+        
Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.0, 16, 
8));
+        // active threshold: instances must exceed buckets-with-data × ratio
+        
Assertions.assertTrue(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 16, 
8));
+        
Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 12, 
8));
+        
Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(2.0, 16, 
8));
+        
Assertions.assertTrue(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 256, 
8));
+        // no buckets with data → nothing to upgrade
+        
Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 16, 
0));
+    }
+
     @Test
     public void testLocalExchangeNodeIsNotSerializedAsSerialOperator() {
         SerialTrackingScanNode serialScan = new 
SerialTrackingScanNode(nextPlanNodeId(), LocalExchangeType.NOOP);
diff --git 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
new file mode 100644
index 00000000000..6627008c561
--- /dev/null
+++ 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
@@ -0,0 +1,159 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+/**
+ * DORIS-24902 part 2: bucket -> local-hash parallelism upgrade.
+ *
+ * A pooled bucket-join fragment runs its bucket joins at bucket-count parallelism
+ * (only instances owning buckets do join work). When nothing above the join needs
+ * bucket alignment and per-BE instances > buckets-with-data x ratio
+ * (session var local_shuffle_bucket_upgrade_ratio, > 1 enables, <= 1 disables),
+ * the FE planner re-distributes both join sides with LOCAL_EXECUTION_HASH_SHUFFLE
+ * so the join uses all instances.
+ *
+ * Shape notes (verified against a live cluster):
+ *  - LocalExchangeNodes only appear in EXPLAIN DISTRIBUTED PLAN (plain EXPLAIN
+ *    renders the tree before AddLocalExchange runs).
+ *  - Nereids bucket-shuffle downgrade: bucket shuffle only forms when
+ *    totalBucketNum >= totalInstanceNum * 0.8, so BUCKETS 13 with
+ *    parallel_pipeline_task_num=16 on 1 BE (13 >= 12.8) keeps the bucket join,
+ *    and ratio=1.1 (16 > 13*1.1) enables the upgrade while default 1.5 does not.
+ *  - The aggregation above must NOT group by the bucket key: a colocate agg
+ *    requires bucket distribution of the join output and correctly blocks the
+ *    upgrade via the parentRequire gate.
+ */
+suite("test_local_shuffle_bucket_upgrade") {
+
+    def hints = { ls_on, ratio ->
+        """/*+SET_VAR(
+            enable_sql_cache=false, disable_join_reorder=true,
+            disable_colocate_plan=true,
+            auto_broadcast_join_threshold=-1, broadcast_row_count_limit=0,
+            experimental_force_to_local_shuffle=true,
+            experimental_enable_parallel_scan=false,
+            enable_runtime_filter_prune=false,
+            enable_runtime_filter_partition_prune=false,
+            runtime_filter_type='IN,MIN_MAX',
+            parallel_pipeline_task_num=16,
+            parallel_exchange_instance_num=8,
+            query_timeout=600,
+            local_shuffle_bucket_upgrade_ratio=${ratio},
+            enable_local_shuffle=${ls_on},
+            enable_local_shuffle_planner=${ls_on}
+        )*/"""
+    }
+
+    sql "DROP TABLE IF EXISTS lsbu_fact"
+    sql "DROP TABLE IF EXISTS lsbu_probe"
+    sql "DROP TABLE IF EXISTS lsbu_probe2"
+    sql """CREATE TABLE lsbu_fact (k INT, v BIGINT)
+           ENGINE=OLAP DUPLICATE KEY(k) DISTRIBUTED BY HASH(k) BUCKETS 13
+           PROPERTIES ("replication_num"="1")"""
+    sql """CREATE TABLE lsbu_probe (pk INT, k INT, w BIGINT)
+           ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 7
+           PROPERTIES ("replication_num"="1")"""
+    sql """CREATE TABLE lsbu_probe2 (pk INT, k INT, w BIGINT)
+           ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5
+           PROPERTIES ("replication_num"="1")"""
+    sql """INSERT INTO lsbu_fact
+           SELECT CAST(number%50 AS INT), number*10+1
+           FROM numbers("number"="200")"""
+    sql """INSERT INTO lsbu_probe
+           SELECT CAST(number AS INT), CAST(number%50 AS INT), 1000+number
+           FROM numbers("number"="300")"""
+    sql """INSERT INTO lsbu_probe2
+           SELECT CAST(number AS INT), CAST(number%50 AS INT), 2000+number
+           FROM numbers("number"="170")"""
+
+    // group key pk%10 is NOT the bucket key, so the agg above does not require
+    // bucket distribution and the upgrade is allowed.
+    def singleJoin = { h ->
+        """SELECT ${h} p.pk % 10 AS g, COUNT(*) c, SUM(f.v) sv, SUM(p.w) sw
+           FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k
+           GROUP BY g ORDER BY g"""
+    }
+
+    // ---------- 1. plan shape (EXPLAIN DISTRIBUTED PLAN: post-AddLocalExchange) ----------
+    def upgradedPlan = sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '1.1'))}"
+    def upgradedText = upgradedPlan.toString()
+    assertTrue(upgradedText.contains("BUCKET_SHUFFLE"),
+        "precondition: the join must be a bucket-shuffle join")
+    assertTrue(upgradedText.contains("LOCAL_EXECUTION_HASH_SHUFFLE"),
+        "ratio=1.1 must upgrade the bucket join's local exchanges to LOCAL hash")
+
+    def bucketPlan = sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '0'))}"
+    def bucketText = bucketPlan.toString()
+    assertTrue(bucketText.contains("BUCKET_SHUFFLE"),
+        "precondition: the join must be a bucket-shuffle join")
+    assertFalse(bucketText.contains("LOCAL_EXECUTION_HASH_SHUFFLE"),
+        "ratio=0 disables the upgrade: no LOCAL hash exchanges for the bucket join")
+
+    // ratio exactly 1 also keeps the upgrade off (<=1 disables)
+    def ratioOnePlan = sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '1'))}"
+    assertFalse(ratioOnePlan.toString().contains("LOCAL_EXECUTION_HASH_SHUFFLE"),
+        "ratio=1 must keep the upgrade off (<=1 disables)")
+
+    // default ratio 1.5 does not fire here: 16 < 13*1.5 (gate respects the threshold)
+    def ratioDefaultPlan = sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '1.5'))}"
+    assertFalse(ratioDefaultPlan.toString().contains("LOCAL_EXECUTION_HASH_SHUFFLE"),
+        "ratio=1.5 with 16 instances vs 13 buckets (16 < 19.5) must not upgrade")
+
+    // Note: whether a group-by-bucket-key agg blocks the upgrade depends on the agg
+    // shape the optimizer picks (a colocate one-phase agg requires bucket distribution
+    // and blocks it; a two-phase agg does not). That parentRequire gate is covered
+    // deterministically by LocalShuffleNodeCoverageTest; here we only pin correctness.
+    def bucketKeyAgg = { h ->
+        """SELECT ${h} f.k AS g, COUNT(*) c, SUM(p.w) sw
+           FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k
+           GROUP BY g ORDER BY g"""
+    }
+    def bka_baseline = sql bucketKeyAgg(hints('false', '0'))
+    def bka_upgraded = sql bucketKeyAgg(hints('true', '1.1'))
+    assertEquals(50, bka_baseline.size())
+    assertEquals(bka_baseline, bka_upgraded,
+        "group-by-bucket-key agg over (possibly upgraded) bucket join must stay correct")
+
+    // ---------- 2. correctness: single bucket join ----------
+    def single_baseline = sql singleJoin(hints('false', '0'))
+    def single_bucket = sql singleJoin(hints('true', '0'))
+    def single_upgraded = sql singleJoin(hints('true', '1.1'))
+
+    assertEquals(10, single_baseline.size())
+    assertEquals(single_baseline, single_bucket,
+        "bucket join (upgrade off) must match local-shuffle-off baseline")
+    assertEquals(single_baseline, single_upgraded,
+        "upgraded bucket join must match local-shuffle-off baseline")
+
+    // ---------- 3. correctness: stacked bucket joins ----------
+    def stackedJoin = { h ->
+        """SELECT ${h} p1.pk % 10 AS g, COUNT(*) c, SUM(f.v) sv, SUM(p1.w) s1, SUM(p2.w) s2
+           FROM lsbu_fact f
+           JOIN lsbu_probe p1 ON p1.k = f.k
+           JOIN lsbu_probe2 p2 ON p2.k = f.k
+           GROUP BY g ORDER BY g"""
+    }
+
+    def stacked_baseline = sql stackedJoin(hints('false', '0'))
+    def stacked_bucket = sql stackedJoin(hints('true', '0'))
+    def stacked_upgraded = sql stackedJoin(hints('true', '1.1'))
+
+    assertEquals(10, stacked_baseline.size())
+    assertEquals(stacked_baseline, stacked_bucket,
+        "stacked bucket joins (upgrade off) must match local-shuffle-off baseline")
+    assertEquals(stacked_baseline, stacked_upgraded,
+        "stacked bucket joins (upgrade on) must match local-shuffle-off baseline")
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to