This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize in repository https://gitbox.apache.org/repos/asf/doris.git
commit 7c39882f66a753f4cee77b5addc347942b5e078e Author: 924060929 <[email protected]> AuthorDate: Thu Jun 4 20:29:31 2026 +0800 [opt](local shuffle) whole-chain bucket upgrade: stacked lower joins upgrade too v1 upgraded only the topmost bucket join of a stacked chain. Lower joins were vetoed via hasBucketUpgradedAncestor so that their LOCAL hash claim, hashed on different keys, could not type-satisfy the upper join's requireSpecific(LOCAL). That left the lower joins bucket-capped while still paying for the upper join's re-align local exchange (LE), measured at -10% on a colocate-anchored stacked shape. Now a stacked lower join upgrades as well, but reports NOOP output instead of LOCAL hash, so the upper join's re-align LE is always inserted and key alignment between levels is enforced structurally. That LE already existed in the bucket world (a BUCKET claim never satisfied the LOCAL require), so the chain upgrade is a pure parallelism gain. 3-BE verification (anchor4 colocate dim4, bucket-shuffle probe_big, 16 instances, 4-bucket domain): the plan shows all chain levels on LOCAL_EXECUTION_HASH_SHUFFLE; results match the local-shuffle-off baseline, including with forced runtime_filter_type='IN,MIN_MAX'; the stacked shape flips from -10% to +9% (0.47-0.48s -> 0.43-0.44s, 4/4 interleaved rounds); and the single-join three-arm regression stays green. 
--- .../java/org/apache/doris/planner/HashJoinNode.java | 18 +++++++++++++----- .../doris/planner/LocalShuffleNodeCoverageTest.java | 14 +++++++------- .../test_local_shuffle_bucket_upgrade.groovy | 7 +++++++ 3 files changed, 27 insertions(+), 12 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 017792483ab..aea9a05d614 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -355,7 +355,16 @@ public class HashJoinNode extends JoinNodeBase { LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); buildSideRequire = LocalExchangeTypeRequire.requireSpecific( LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); - outputType = null; // derived from probeResult.second below + // Whole-chain upgrade: a stacked bucket join below an upgraded one also + // upgrades (16-way instead of bucket-capped), but must NOT let its LOCAL + // hash claim type-satisfy the upper join's requireSpecific(LOCAL) — the + // keys may differ (each level hashes its own distribute exprs). Claim NOOP + // so the upper join always inserts its own re-align LE; that LE existed in + // the bucket world too (bucket claim never satisfied LOCAL require), so + // the chain upgrade is pure parallelism gain. + outputType = translatorContext.hasBucketUpgradedAncestor(this) + ? 
LocalExchangeType.NOOP + : null; // null: derived from probeResult.second below } else { probeSideRequire = LocalExchangeTypeRequire.requireBucketHash(); // For BUCKET_SHUFFLE with serial build child: use requireBucketHash() (not @@ -413,9 +422,9 @@ public class HashJoinNode extends JoinNodeBase { * <ul> * <li>the fragment passed the numeric gate (instances vs buckets-with-data × ratio), * computed once per fragment in {@code AddLocalExchange};</li> - * <li>no bucket join above already upgraded (stacked joins must keep bucket - * alignment below the single upgraded ancestor — see - * {@code PlanTranslatorContext#hasBucketUpgradedAncestor});</li> + * <li>stacked bucket joins below an upgraded one also upgrade, but report NOOP + * output so the upper join's re-align LE is always inserted — see the + * whole-chain note in {@code enforceAndDeriveLocalExchange};</li> * <li>the parent does not require bucket distribution of our output (an upper * bucket join's probe/build require — upgrading here would break the bucket * alignment it depends on);</li> @@ -426,7 +435,6 @@ public class HashJoinNode extends JoinNodeBase { private boolean canUpgradeBucketToLocalHash(PlanTranslatorContext translatorContext, LocalExchangeTypeRequire parentRequire) { if (!translatorContext.isCurrentFragmentBucketUpgradeEligible() - || translatorContext.hasBucketUpgradedAncestor(this) || parentRequire.preferType() == LocalExchangeType.BUCKET_HASH_SHUFFLE) { return false; } diff --git a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java index c4615be4974..71e67a87418 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java @@ -390,10 +390,10 @@ public class LocalShuffleNodeCoverageTest { assertChildLocalExchangeType(ineligibleJoin, 0, 
LocalExchangeType.BUCKET_HASH_SHUFFLE); assertChildLocalExchangeType(ineligibleJoin, 1, LocalExchangeType.BUCKET_HASH_SHUFFLE); - // 5. Stacked bucket joins: only the topmost upgrades. The inner join (direct probe - // child of the upgraded one) is marked via hasBucketUpgradedAncestor and keeps - // BUCKET requires; the outer join re-aligns the inner's BUCKET output to its own - // keys with a LOCAL hash LE. + // 5. Stacked bucket joins: the whole chain upgrades. The inner join (direct probe + // child of the upgraded one) also upgrades its children to LOCAL hash, but + // reports NOOP output so the outer join always inserts its own re-align LE + // (keys may differ between levels). PlanTranslatorContext stackedCtx = new PlanTranslatorContext(); stackedCtx.setCurrentFragmentBucketUpgradeEligible(true); TrackingPlanNode innerProbe = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); @@ -413,9 +413,9 @@ public class LocalShuffleNodeCoverageTest { // outer upgraded: probe side wrapped with LOCAL hash LE (re-aligning inner's output) assertChildLocalExchangeType(outerJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); assertChildLocalExchangeType(outerJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); - // inner stayed bucket: its own children keep BUCKET_HASH_SHUFFLE LEs - assertChildLocalExchangeType(innerJoin, 0, LocalExchangeType.BUCKET_HASH_SHUFFLE); - assertChildLocalExchangeType(innerJoin, 1, LocalExchangeType.BUCKET_HASH_SHUFFLE); + // inner upgraded too (whole-chain): its children get LOCAL hash LEs + assertChildLocalExchangeType(innerJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + assertChildLocalExchangeType(innerJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); // 6. Colocate join takes the same upgrade path. 
PlanTranslatorContext colocateCtx = new PlanTranslatorContext(); diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy index 2f6e549b0f9..e6710f0027e 100644 --- a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy +++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy @@ -146,6 +146,13 @@ suite("test_local_shuffle_bucket_upgrade") { GROUP BY g ORDER BY g""" } + // whole-chain shape: at an eligible ratio every level of the stacked bucket chain + // upgrades (the lower join reports NOOP so the upper re-align LE is kept). + def stackedUpgradedPlan = sql "EXPLAIN DISTRIBUTED PLAN ${stackedJoin(hints('true', '1.1'))}" + def stackedUpgradedText = stackedUpgradedPlan.toString() + assertTrue(stackedUpgradedText.contains("LOCAL_EXECUTION_HASH_SHUFFLE"), + "ratio=1.1 must upgrade the stacked bucket chain to LOCAL hash") + def stacked_baseline = sql stackedJoin(hints('false', '0')) def stacked_bucket = sql stackedJoin(hints('true', '0')) def stacked_upgraded = sql stackedJoin(hints('true', '1.1')) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
