This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7c39882f66a753f4cee77b5addc347942b5e078e
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 20:29:31 2026 +0800

    [opt](local shuffle) whole-chain bucket upgrade: stacked lower joins upgrade too
    
    v1 upgraded only the topmost bucket join of a stacked chain (lower joins
    were vetoed via hasBucketUpgradedAncestor to avoid their LOCAL hash claim
    type-satisfying the upper requireSpecific(LOCAL) with different keys), which
    left the lower joins bucket-capped while still paying for the upper re-align
    LE (local exchange) — measured -10% on a colocate-anchored stacked shape.
    
    Now a stacked lower join upgrades as well, but reports NOOP output instead
    of LOCAL hash: the upper join's re-align LE is then always inserted, so key
    alignment between levels is enforced structurally. That LE existed in the
    bucket world too (a BUCKET claim never satisfied the LOCAL require), so the
    chain upgrade is pure parallelism gain.
    
    3-BE verification (anchor4 colocate dim4, bucket-shuffle probe_big, 16
    instances, 4-bucket domain): plan shows all chain levels on
    LOCAL_EXECUTION_HASH_SHUFFLE; results match the local-shuffle-off baseline
    including forced runtime_filter_type='IN,MIN_MAX'; the stacked shape flips
    from -10% to +9% (0.47-0.48s -> 0.43-0.44s, 4/4 interleaved rounds), and the
    single-join three-arm regression stays green.
---
 .../java/org/apache/doris/planner/HashJoinNode.java    | 18 +++++++++++++-----
 .../doris/planner/LocalShuffleNodeCoverageTest.java    | 14 +++++++-------
 .../test_local_shuffle_bucket_upgrade.groovy           |  7 +++++++
 3 files changed, 27 insertions(+), 12 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index 017792483ab..aea9a05d614 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -355,7 +355,16 @@ public class HashJoinNode extends JoinNodeBase {
                         LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
                 buildSideRequire = LocalExchangeTypeRequire.requireSpecific(
                         LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
-                outputType = null; // derived from probeResult.second below
+                // Whole-chain upgrade: a stacked bucket join below an 
upgraded one also
+                // upgrades (16-way instead of bucket-capped), but must NOT 
let its LOCAL
+                // hash claim type-satisfy the upper join's 
requireSpecific(LOCAL) — the
+                // keys may differ (each level hashes its own distribute 
exprs). Claim NOOP
+                // so the upper join always inserts its own re-align LE; that 
LE existed in
+                // the bucket world too (bucket claim never satisfied LOCAL 
require), so
+                // the chain upgrade is pure parallelism gain.
+                outputType = translatorContext.hasBucketUpgradedAncestor(this)
+                        ? LocalExchangeType.NOOP
+                        : null; // null: derived from probeResult.second below
             } else {
                 probeSideRequire = 
LocalExchangeTypeRequire.requireBucketHash();
                 // For BUCKET_SHUFFLE with serial build child: use 
requireBucketHash() (not
@@ -413,9 +422,9 @@ public class HashJoinNode extends JoinNodeBase {
      * <ul>
      *   <li>the fragment passed the numeric gate (instances vs 
buckets-with-data × ratio),
      *       computed once per fragment in {@code AddLocalExchange};</li>
-     *   <li>no bucket join above already upgraded (stacked joins must keep 
bucket
-     *       alignment below the single upgraded ancestor — see
-     *       {@code PlanTranslatorContext#hasBucketUpgradedAncestor});</li>
+     *   <li>stacked bucket joins below an upgraded one also upgrade, but 
report NOOP
+     *       output so the upper join's re-align LE is always inserted — see 
the
+     *       whole-chain note in {@code enforceAndDeriveLocalExchange};</li>
      *   <li>the parent does not require bucket distribution of our output (an 
upper
      *       bucket join's probe/build require — upgrading here would break 
the bucket
      *       alignment it depends on);</li>
@@ -426,7 +435,6 @@ public class HashJoinNode extends JoinNodeBase {
     private boolean canUpgradeBucketToLocalHash(PlanTranslatorContext 
translatorContext,
             LocalExchangeTypeRequire parentRequire) {
         if (!translatorContext.isCurrentFragmentBucketUpgradeEligible()
-                || translatorContext.hasBucketUpgradedAncestor(this)
                 || parentRequire.preferType() == 
LocalExchangeType.BUCKET_HASH_SHUFFLE) {
             return false;
         }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
index c4615be4974..71e67a87418 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java
@@ -390,10 +390,10 @@ public class LocalShuffleNodeCoverageTest {
         assertChildLocalExchangeType(ineligibleJoin, 0, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
         assertChildLocalExchangeType(ineligibleJoin, 1, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
 
-        // 5. Stacked bucket joins: only the topmost upgrades. The inner join 
(direct probe
-        //    child of the upgraded one) is marked via 
hasBucketUpgradedAncestor and keeps
-        //    BUCKET requires; the outer join re-aligns the inner's BUCKET 
output to its own
-        //    keys with a LOCAL hash LE.
+        // 5. Stacked bucket joins: the whole chain upgrades. The inner join 
(direct probe
+        //    child of the upgraded one) also upgrades its children to LOCAL 
hash, but
+        //    reports NOOP output so the outer join always inserts its own 
re-align LE
+        //    (keys may differ between levels).
         PlanTranslatorContext stackedCtx = new PlanTranslatorContext();
         stackedCtx.setCurrentFragmentBucketUpgradeEligible(true);
         TrackingPlanNode innerProbe = new TrackingPlanNode(nextPlanNodeId(), 
LocalExchangeType.NOOP);
@@ -413,9 +413,9 @@ public class LocalShuffleNodeCoverageTest {
         // outer upgraded: probe side wrapped with LOCAL hash LE (re-aligning 
inner's output)
         assertChildLocalExchangeType(outerJoin, 0, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
         assertChildLocalExchangeType(outerJoin, 1, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
-        // inner stayed bucket: its own children keep BUCKET_HASH_SHUFFLE LEs
-        assertChildLocalExchangeType(innerJoin, 0, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
-        assertChildLocalExchangeType(innerJoin, 1, 
LocalExchangeType.BUCKET_HASH_SHUFFLE);
+        // inner upgraded too (whole-chain): its children get LOCAL hash LEs
+        assertChildLocalExchangeType(innerJoin, 0, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
+        assertChildLocalExchangeType(innerJoin, 1, 
LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE);
 
         // 6. Colocate join takes the same upgrade path.
         PlanTranslatorContext colocateCtx = new PlanTranslatorContext();
diff --git 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
index 2f6e549b0f9..e6710f0027e 100644
--- 
a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
+++ 
b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy
@@ -146,6 +146,13 @@ suite("test_local_shuffle_bucket_upgrade") {
            GROUP BY g ORDER BY g"""
     }
 
+    // whole-chain shape: at an eligible ratio every level of the stacked 
bucket chain
+    // upgrades (the lower join reports NOOP so the upper re-align LE is kept).
+    def stackedUpgradedPlan = sql "EXPLAIN DISTRIBUTED PLAN 
${stackedJoin(hints('true', '1.1'))}"
+    def stackedUpgradedText = stackedUpgradedPlan.toString()
+    assertTrue(stackedUpgradedText.contains("LOCAL_EXECUTION_HASH_SHUFFLE"),
+        "ratio=1.1 must upgrade the stacked bucket chain to LOCAL hash")
+
     def stacked_baseline = sql stackedJoin(hints('false', '0'))
     def stacked_bucket = sql stackedJoin(hints('true', '0'))
     def stacked_upgraded = sql stackedJoin(hints('true', '1.1'))


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to