This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize_pr in repository https://gitbox.apache.org/repos/asf/doris.git
commit 35c57932fd890294270c20454067b86400c8ec18 Author: 924060929 <[email protected]> AuthorDate: Wed Jun 24 17:47:12 2026 +0800 [opt](local shuffle) DORIS-24902 part2: bucket-to-hash parallelism upgrade + RF force_local_merge When a pooled-scan fragment has significantly more instances than buckets, upgrade bucket-shuffle local exchanges to LOCAL_EXECUTION_HASH_SHUFFLE so the join runs at full instance parallelism instead of being capped at bucket count. Upgrade mechanics: - AddLocalExchange.isBucketUpgradeEligible: for every worker that owns buckets, min(instances, executor_threads) / min(buckets, executor_threads) > local_shuffle_bucket_upgrade_ratio (session variable, default 1.5; values <= 1 disable the upgrade). executor_threads comes from BE heartbeat (pipelineExecutorSize / cpuCores fallback); <=1 means unreported, no cap applied. - HashJoinNode.canUpgradeBucketToLocalHash: fragment is eligible + the parent does not require BUCKET_HASH_SHUFFLE output + both sides have non-empty distribute exprs (they become the local-hash LE keys). - Whole-chain upgrade: stacked lower bucket joins also upgrade but, being marked as having a bucket-upgraded ancestor, claim NOOP output, forcing the upper join to re-align via its own LE (prevents a lower join's differently-keyed local hash from satisfying the upper join's require and losing key alignment). Tunable: bucket_shuffle_downgrade_ratio (session variable, default 0.8) — the existing Nereids threshold below which a bucket-shuffle join is downgraded to PARTITIONED. Setting it to a value <= 0 disables the downgrade, preserving bucket joins for the upgrade path. Runtime filter fix (force_local_merge): bucket upgrade flips the scan from serial to parallel, breaking the implicit RF merge signal (is_serial_operator). Added TRuntimeFilterDesc.force_local_merge (thrift field): FE walks the builder→target path after AddLocalExchange; if a LocalExchangeNode sits on the path, the target must merge partial RFs. On the BE, consumer registration ORs this bit into need_merge (together with has_remote_targets and need_local_merge). With enable_local_shuffle_planner=false the FE plan has no LE nodes, so the bit is always false (legacy inference preserved). 
--- be/src/runtime/runtime_state.cpp | 3 +- .../glue/translator/PlanTranslatorContext.java | 31 ++++ .../properties/ChildrenPropertiesRegulator.java | 10 +- .../org/apache/doris/planner/AddLocalExchange.java | 105 +++++++++++++ .../org/apache/doris/planner/HashJoinNode.java | 92 ++++++++++-- .../org/apache/doris/planner/RuntimeFilter.java | 37 +++++ .../java/org/apache/doris/qe/SessionVariable.java | 41 +++++ .../planner/LocalShuffleNodeCoverageTest.java | 152 +++++++++++++++++++ gensrc/thrift/PlanNodes.thrift | 6 + .../test_local_shuffle_bucket_upgrade.groovy | 166 +++++++++++++++++++++ 10 files changed, 626 insertions(+), 17 deletions(-) diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index 9e173acab7e..54989511eb8 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -508,7 +508,8 @@ Status RuntimeState::register_consumer_runtime_filter( const TRuntimeFilterDesc& desc, bool need_local_merge, int node_id, std::shared_ptr<RuntimeFilterConsumer>* consumer_filter) { _registered_runtime_filter_ids.insert(desc.filter_id); - bool need_merge = desc.has_remote_targets || need_local_merge; + bool need_merge = desc.has_remote_targets || need_local_merge || + (desc.__isset.force_local_merge && desc.force_local_merge); RuntimeFilterMgr* mgr = need_merge ? 
global_runtime_filter_mgr() : local_runtime_filter_mgr(); RETURN_IF_ERROR(mgr->register_consumer_filter(this, desc, node_id, consumer_filter)); // Stamp the consumer with the current recursive CTE stage so that incoming publish RPCs diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java index c4ffab8b5dc..f680936b5e5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java @@ -127,6 +127,21 @@ public class PlanTranslatorContext { // needs shuffle for correctness, not just for performance like StreamingAgg pre-agg). private final Map<PlanNodeId, Boolean> shuffledAncestorMap = Maps.newHashMap(); + // Whether the fragment currently being processed by AddLocalExchange is eligible for the + // bucket → local-hash parallelism upgrade: a pooled bucket-join fragment whose per-BE + // instance count exceeds (buckets-with-data per BE) × local_shuffle_bucket_upgrade_ratio. + // Computed once per fragment in AddLocalExchange.addLocalExchange from the distributed + // plan's LocalShuffleBucketJoinAssignedJob assignments; read by + // HashJoinNode.enforceAndDeriveLocalExchange. + private boolean currentFragmentBucketUpgradeEligible = false; + + // Per-node "a bucket join above me in this fragment already upgraded to local hash" flag. + // An upgraded join marks its direct children so a stacked bucket join below keeps its + // BUCKET_HASH_SHUFFLE requires: if it also upgraded, its LOCAL hash output (keyed by ITS + // join keys) would type-satisfy the upper join's requireSpecific(LOCAL_EXECUTION_HASH) + // and suppress the LE that re-aligns data to the upper join's keys → wrong results. 
+ private final Map<PlanNodeId, Boolean> bucketUpgradedAncestorMap = Maps.newHashMap(); + // Whether the current fragment uses LocalShuffleAssignedJob (pooling scan with // ignoreDataDistribution → _parallel_instances=1 in BE). When true, serial operators // indicate real pipeline bottlenecks needing PASSTHROUGH fan-out (heavy_ops). @@ -271,6 +286,22 @@ public class PlanTranslatorContext { return shuffledAncestorMap.getOrDefault(node.getId(), false); } + public void setCurrentFragmentBucketUpgradeEligible(boolean eligible) { + this.currentFragmentBucketUpgradeEligible = eligible; + } + + public boolean isCurrentFragmentBucketUpgradeEligible() { + return currentFragmentBucketUpgradeEligible; + } + + public void setHasBucketUpgradedAncestor(PlanNode node, boolean value) { + bucketUpgradedAncestorMap.put(node.getId(), value); + } + + public boolean hasBucketUpgradedAncestor(PlanNode node) { + return bucketUpgradedAncestorMap.getOrDefault(node.getId(), false); + } + public SlotDescriptor addSlotDesc(TupleDescriptor t) { return descTable.addSlotDescriptor(t); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java index 845c87eea9c..e5e0d9d1bd0 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java @@ -311,7 +311,15 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<List<List<PhysicalP int bucketNum = candidate.getTable().getDefaultDistributionInfo().getBucketNum(); int totalBucketNum = prunedPartNum * bucketNum; ConnectContext connectContext = ConnectContext.get(); - return totalBucketNum < connectContext.getTotalInstanceNum() * 0.8; + // <= 0 disables the downgrade entirely: with the FE local shuffle planner's + // bucket -> local-hash upgrade 
(local_shuffle_bucket_upgrade_ratio), few-bucket + // bucket shuffle no longer funnels, so keeping bucket shuffle (anchored side + // needs no re-shuffle) can beat downgrading to shuffle join. + double downgradeRatio = connectContext.getSessionVariable().getBucketShuffleDowngradeRatio(); + if (downgradeRatio <= 0) { + return false; + } + return totalBucketNum < connectContext.getTotalInstanceNum() * downgradeRatio; } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java index e1d607ea615..48a708956fc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java @@ -22,9 +22,18 @@ import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import org.apache.doris.nereids.trees.plans.distribute.DistributedPlan; import org.apache.doris.nereids.trees.plans.distribute.FragmentIdMapping; import org.apache.doris.nereids.trees.plans.distribute.PipelineDistributedPlan; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.AssignedJob; +import org.apache.doris.nereids.trees.plans.distribute.worker.job.LocalShuffleBucketJoinAssignedJob; import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; import org.apache.doris.planner.LocalExchangeNode.RequireHash; +import org.apache.doris.qe.ConnectContext; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; /** * FE-side local exchange planner — inserts {@link LocalExchangeNode} into each fragment's @@ -81,11 +90,107 @@ public class AddLocalExchange { if (maxPerBeInstances <= 1) { continue; } + context.setCurrentFragmentBucketUpgradeEligible( + isBucketUpgradeEligible(pipePlan, maxPerBeInstances, context)); PlanFragment fragment = 
pipePlan.getFragmentJob().getFragment(); addLocalExchangeForFragment(fragment, context); } } + /** + * Bucket → local-hash parallelism upgrade eligibility (DORIS-24902 part 2). + * + * A pooled bucket-join fragment runs its bucket joins at bucket-count parallelism: + * each LocalShuffleBucketJoinAssignedJob owns a disjoint set of join buckets and only + * instances with buckets do join work (e.g. 8 buckets/BE but 16 instances/BE → 8 idle). + * When nothing above the join needs bucket alignment, HashJoinNode can re-distribute + * both sides with LOCAL_EXECUTION_HASH_SHUFFLE to use all instances — see + * {@link HashJoinNode#enforceAndDeriveLocalExchange}. + * + * This method computes the per-fragment numeric condition from the actual instance + * assignment: maxPerBeInstances > maxBucketsWithDataPerWorker × ratio. The ratio comes + * from session variable {@code local_shuffle_bucket_upgrade_ratio}; values <= 1 disable + * the upgrade entirely (a required parallelism gain of at most 1x means no gain). + */ + private boolean isBucketUpgradeEligible(PipelineDistributedPlan pipePlan, + long maxPerBeInstances, PlanTranslatorContext context) { + ConnectContext connectContext = context.getConnectContext(); + if (connectContext == null || connectContext.getSessionVariable() == null) { + return false; + } + double ratio = connectContext.getSessionVariable().getLocalShuffleBucketUpgradeRatio(); + List<AssignedJob> instanceJobs = pipePlan.getInstanceJobs(); + if (instanceJobs.isEmpty() + || !instanceJobs.stream().allMatch(LocalShuffleBucketJoinAssignedJob.class::isInstance)) { + // Only pooled bucket-join fragments have the bucket-count parallelism cap. 
+ return false; + } + Map<Long, Set<Integer>> bucketsPerWorker = new HashMap<>(); + Map<Long, Integer> instancesPerWorker = new HashMap<>(); + Map<Long, Integer> coresPerWorker = new HashMap<>(); + for (AssignedJob job : instanceJobs) { + long workerId = job.getAssignedWorker().id(); + bucketsPerWorker.computeIfAbsent(workerId, k -> new HashSet<>()) + .addAll(((LocalShuffleBucketJoinAssignedJob) job).getAssignedJoinBucketIndexes()); + instancesPerWorker.merge(workerId, 1, Integer::sum); + coresPerWorker.computeIfAbsent(workerId, k -> resolveWorkerCores(job.getAssignedWorker())); + } + // Conservative: every worker that owns buckets must clear the gain bar. The gain is + // computed on EFFECTIVE parallelism (capped by the BE's executor threads): when the + // bucket count already saturates the cores, adding instances cannot speed the join + // up and the extra local exchange is a pure cost. + boolean anyBuckets = false; + for (Map.Entry<Long, Set<Integer>> entry : bucketsPerWorker.entrySet()) { + int buckets = entry.getValue().size(); + if (buckets == 0) { + continue; + } + anyBuckets = true; + int instances = instancesPerWorker.getOrDefault(entry.getKey(), 0); + int cores = coresPerWorker.getOrDefault(entry.getKey(), Integer.MAX_VALUE); + if (!shouldUpgradeBucketParallelism(ratio, + Math.min(instances, cores), Math.min(buckets, cores))) { + return false; + } + } + return anyBuckets; + } + + /** + * Effective execution threads of the worker's backend (pipelineExecutorSize, falling + * back to cpuCores). Values <= 1 mean the heartbeat has not reported yet — treat the + * capacity as unknown/uncapped rather than blocking the upgrade. 
+ */ + private static int resolveWorkerCores( + org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker worker) { + if (worker instanceof org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker) { + org.apache.doris.system.Backend backend = + ((org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker) worker).getBackend(); + int size = backend.getPipelineExecutorSize(); + if (size <= 1) { + size = backend.getCputCores(); + } + if (size > 1) { + return size; + } + } + return Integer.MAX_VALUE; + } + + /** + * Pure numeric gate for the bucket → local-hash upgrade. + * ratio <= 1 (including 0 and negatives) always disables; otherwise upgrade when the + * per-BE instance count exceeds buckets-with-data × ratio (i.e. the parallelism gain + * is at least the configured multiple). + */ + static boolean shouldUpgradeBucketParallelism(double ratio, long maxPerBeInstances, + long maxBucketsPerWorker) { + if (ratio <= 1.0) { + return false; + } + return maxBucketsPerWorker > 0 && maxPerBeInstances > maxBucketsPerWorker * ratio; + } + private void addLocalExchangeForFragment(PlanFragment fragment, PlanTranslatorContext context) { DataSink sink = fragment.getSink(); LocalExchangeTypeRequire require = sink == null diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index 567c99866fd..1734c6ed257 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -344,21 +344,55 @@ public class HashJoinNode extends JoinNodeBase { // For a non-serial probe without the flag: propagate the probe's distribution. outputType = probePassthrough ? LocalExchangeType.PASSTHROUGH : null; } else if (isColocate() || isBucketShuffle()) { - // Both probe and build sides require BUCKET_HASH_SHUFFLE: the bucket distribution - // must be preserved on both inputs. 
A serial child on either side is handled the - // same way (serial exchange returns NOOP → enforceRequire() inserts the LE). - probeSideRequire = LocalExchangeTypeRequire.requireBucketHash(); - // For BUCKET_SHUFFLE with serial build child: use requireBucketHash() (not - // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE has no shared - // hash table mechanism — PASS_TO_ONE routes all data to task 0 while tasks 1..N-1 - // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE correctly distributes - // build data by bucket to match the probe side's bucket distribution. - // The serial exchange returns NOOP, so enforceRequire() will insert a - // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH fan-out for heavy-ops - // bottleneck avoidance). - buildSideRequire = LocalExchangeTypeRequire.requireBucketHash(); - outputType = AddLocalExchange.resolveExchangeType( - LocalExchangeTypeRequire.requireBucketHash()); + if (canUpgradeBucketToLocalHash(translatorContext, parentRequire)) { + // Bucket → local-hash parallelism upgrade (DORIS-24902 part 2): the fragment + // has noticeably more instances than buckets-with-data (see + // AddLocalExchange.isBucketUpgradeEligible) and nothing above this join needs + // bucket alignment — re-distribute both sides by their distribute keys with + // LOCAL_EXECUTION_HASH_SHUFFLE so the join runs at full instance parallelism + // instead of being capped at bucket count. The LE keys come from + // childrenDistributeExprLists (pairwise-aligned per side, a subset of the + // equi-join keys), so both sides keep hashing the same values and the + // per-instance build/probe pairing stays correct. + // + // requireSpecific (not requireHash) on purpose: the children's + // BUCKET_HASH_SHUFFLE output must NOT satisfy this require, otherwise no LE + // is inserted and the join stays bucket-capped. 
+ // + // Mark direct children so a stacked bucket join below keeps its BUCKET + // requires: if it also upgraded, its LOCAL hash output (keyed by ITS join + // keys) would type-satisfy our requireSpecific(LOCAL_EXECUTION_HASH) and + // suppress the LE that re-aligns data to OUR keys → wrong results. + translatorContext.setHasBucketUpgradedAncestor(children.get(0), true); + translatorContext.setHasBucketUpgradedAncestor(children.get(1), true); + probeSideRequire = LocalExchangeTypeRequire.requireSpecific( + LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + buildSideRequire = LocalExchangeTypeRequire.requireSpecific( + LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + // Whole-chain upgrade: a stacked bucket join below an upgraded one also + // upgrades (16-way instead of bucket-capped), but must NOT let its LOCAL + // hash claim type-satisfy the upper join's requireSpecific(LOCAL) — the + // keys may differ (each level hashes its own distribute exprs). Claim NOOP + // so the upper join always inserts its own re-align LE; that LE existed in + // the bucket world too (bucket claim never satisfied LOCAL require), so + // the chain upgrade is pure parallelism gain. + outputType = translatorContext.hasBucketUpgradedAncestor(this) + ? LocalExchangeType.NOOP + : null; // null: derived from probeResult.second below + } else { + probeSideRequire = LocalExchangeTypeRequire.requireBucketHash(); + // For BUCKET_SHUFFLE with serial build child: use requireBucketHash() (not + // requirePassToOne()). Unlike BROADCAST joins, BUCKET_SHUFFLE has no shared + // hash table mechanism — PASS_TO_ONE routes all data to task 0 while tasks 1..N-1 + // build empty hash tables, losing rows. BUCKET_HASH_SHUFFLE correctly distributes + // build data by bucket to match the probe side's bucket distribution. + // The serial exchange returns NOOP, so enforceRequire() will insert a + // BUCKET_HASH_SHUFFLE local exchange (with PASSTHROUGH fan-out for heavy-ops + // bottleneck avoidance). 
+ buildSideRequire = LocalExchangeTypeRequire.requireBucketHash(); + outputType = AddLocalExchange.resolveExchangeType( + LocalExchangeTypeRequire.requireBucketHash()); + } } else { // PARTITIONED (shuffle) join: both sides enter via global hash exchange. // Require GLOBAL specifically so that any inserted exchange uses the same @@ -394,4 +428,32 @@ public class HashJoinNode extends JoinNodeBase { protected boolean shouldResetSerialFlagForChild(int childIndex) { return childIndex == 1; } + + /** + * Whether this bucket-shuffle / colocate join may upgrade its children requires from + * BUCKET_HASH_SHUFFLE to LOCAL_EXECUTION_HASH_SHUFFLE for higher parallelism: + * <ul> + * <li>the fragment passed the numeric gate (instances vs buckets-with-data × ratio), + * computed once per fragment in {@code AddLocalExchange};</li> + * <li>stacked bucket joins below an upgraded one also upgrade, but report NOOP + * output so the upper join's re-align LE is always inserted — see the + * whole-chain note in {@code enforceAndDeriveLocalExchange};</li> + * <li>the parent does not require bucket distribution of our output (an upper + * bucket join's probe/build require — upgrading here would break the bucket + * alignment it depends on);</li> + * <li>both sides have non-empty distribute exprs — they become the LOCAL hash LE + * keys, an exprs-less hash exchange would be meaningless.</li> + * </ul> + */ + private boolean canUpgradeBucketToLocalHash(PlanTranslatorContext translatorContext, + LocalExchangeTypeRequire parentRequire) { + if (!translatorContext.isCurrentFragmentBucketUpgradeEligible() + || parentRequire.preferType() == LocalExchangeType.BUCKET_HASH_SHUFFLE) { + return false; + } + List<Expr> probeExprs = getChildDistributeExprList(0); + List<Expr> buildExprs = getChildDistributeExprList(1); + return probeExprs != null && !probeExprs.isEmpty() + && buildExprs != null && !buildExprs.isEmpty(); + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java index 28062b97b7a..77bb4ffcdc1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilter.java @@ -257,6 +257,29 @@ public final class RuntimeFilter { return finalized; } + /** + * DFS from {@code node} down to {@code target} within the fragment (stopping at + * ExchangeNode boundaries). Returns null if target is not under node, otherwise + * whether the path crosses a LocalExchangeNode. + */ + private static Boolean pathCrossesLocalExchange(PlanNode node, PlanNode target) { + if (node == target) { + return false; + } + for (PlanNode child : node.getChildren()) { + if (child instanceof ExchangeNode) { + // fragment boundary: a target behind it is a remote target, handled by + // has_remote_targets + continue; + } + Boolean sub = pathCrossesLocalExchange(child, target); + if (sub != null) { + return sub || child instanceof LocalExchangeNode; + } + } + return null; + } + /** * Serializes a runtime filter to Thrift. */ @@ -270,11 +293,25 @@ public final class RuntimeFilter { tFilter.setHasRemoteTargets(hasRemoteTargets); boolean hasSerialTargets = false; + boolean forceLocalMerge = false; for (RuntimeFilterTarget target : targets) { tFilter.putToPlanIdToTargetExpr(target.node.getId().asInt(), ExprToThriftVisitor.treeToThrift(target.expr)); hasSerialTargets = hasSerialTargets || target.node.isSerialOperatorOnBe(ConnectContext.get()); + // Truthful merge signal: if a LocalExchangeNode sits between the builder join + // and a same-fragment target scan, per-instance partial filters are not aligned + // with the scan's data slice and must be merged before being applied. 
BE used to + // infer this from the target scan's is_serial_operator (scan pooled => LE + // in between), which silently breaks once the scan is parallelized; this bit is + // computed from the actual plan after FE local exchange planning. In BE-planned + // mode (planner off) the FE tree has no LocalExchangeNodes and the bit stays + // false — the serial-flag inference still covers that world. + if (!forceLocalMerge && target.isLocalTarget) { + Boolean crossed = pathCrossesLocalExchange(builderNode, target.node); + forceLocalMerge = crossed != null && crossed; + } } + tFilter.setForceLocalMerge(forceLocalMerge); boolean enableSyncFilterSize = ConnectContext.get() != null && ConnectContext.get().getSessionVariable().enableSyncRuntimeFilterSize(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index d017b921efd..6dd9b4bcf52 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -358,6 +358,10 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_LOCAL_SHUFFLE_PLANNER = "enable_local_shuffle_planner"; + public static final String LOCAL_SHUFFLE_BUCKET_UPGRADE_RATIO = "local_shuffle_bucket_upgrade_ratio"; + + public static final String BUCKET_SHUFFLE_DOWNGRADE_RATIO = "bucket_shuffle_downgrade_ratio"; + public static final String FORCE_TO_LOCAL_SHUFFLE = "force_to_local_shuffle"; public static final String ENABLE_LOCAL_MERGE_SORT = "enable_local_merge_sort"; @@ -1636,6 +1640,27 @@ public class SessionVariable implements Serializable, Writable { "Whether to force to local shuffle on pipelineX engine."}) private boolean forceToLocalShuffle = false; + @VarAttrDef.VarAttr( + name = LOCAL_SHUFFLE_BUCKET_UPGRADE_RATIO, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, + description = {"FE规划Local Shuffle时, 当池化bucket 
join所在fragment的每BE实例数大于" + + "每BE有数据分桶数的该倍数时, 将join两侧的桶分布本地重分发为hash分布以突破桶数并发上限。" + + "必须大于1才生效; 小于等于1(含0和负数)时关闭该优化", + "When FE plans local shuffle and a pooled bucket join fragment has more instances" + + " per BE than (buckets-with-data per BE) * this ratio, re-distribute both join" + + " sides with local hash instead of bucket hash so join parallelism is no longer" + + " capped at bucket count. Only takes effect when > 1; values <= 1 (including 0" + + " and negatives) disable the upgrade."}, needForward = true) + private double localShuffleBucketUpgradeRatio = 1.5; + + @VarAttrDef.VarAttr( + name = BUCKET_SHUFFLE_DOWNGRADE_RATIO, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, + description = {"当一侧基表总桶数小于总实例数的该倍数时, 放弃bucket shuffle join降级为shuffle join。" + + "小于等于0时永不降级。默认0.8保持原有行为", + "Downgrade bucket shuffle join to shuffle join when the base table side's total" + + " bucket count is less than total instance count times this ratio. Values <= 0" + + " never downgrade. Default 0.8 keeps the original behavior."}, needForward = true) + private double bucketShuffleDowngradeRatio = 0.8; + @VarAttrDef.VarAttr(name = ENABLE_LOCAL_MERGE_SORT) private boolean enableLocalMergeSort = true; @@ -4749,6 +4774,22 @@ public class SessionVariable implements Serializable, Writable { this.enableLocalShufflePlanner = enableLocalShufflePlanner; } + public double getLocalShuffleBucketUpgradeRatio() { + return localShuffleBucketUpgradeRatio; + } + + public void setLocalShuffleBucketUpgradeRatio(double localShuffleBucketUpgradeRatio) { + this.localShuffleBucketUpgradeRatio = localShuffleBucketUpgradeRatio; + } + + public double getBucketShuffleDowngradeRatio() { + return bucketShuffleDowngradeRatio; + } + + public void setBucketShuffleDowngradeRatio(double bucketShuffleDowngradeRatio) { + this.bucketShuffleDowngradeRatio = bucketShuffleDowngradeRatio; + } + public boolean enablePushDownNoGroupAgg() { return enablePushDownNoGroupAgg; } diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java index 7288ba49ca2..71e67a87418 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/planner/LocalShuffleNodeCoverageTest.java @@ -24,6 +24,7 @@ import org.apache.doris.analysis.GroupingInfo; import org.apache.doris.analysis.JoinOperator; import org.apache.doris.analysis.OrderByElement; import org.apache.doris.analysis.SlotDescriptor; +import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.SortInfo; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; @@ -312,6 +313,157 @@ public class LocalShuffleNodeCoverageTest { assertChildLocalExchangeType(serialBuildBroadcast, 1, LocalExchangeType.PASS_TO_ONE); } + private static List<List<Expr>> mockDistributeExprLists() { + return Lists.newArrayList( + Collections.singletonList(Mockito.mock(SlotRef.class)), + Collections.singletonList(Mockito.mock(SlotRef.class))); + } + + @Test + public void testHashJoinBucketUpgradeToLocalHash() { + List<Expr> eqConjuncts = Collections.singletonList(Mockito.mock(BinaryPredicate.class)); + + // 1. Eligible fragment + parent doesn't need bucket → both sides re-distributed + // with LOCAL_EXECUTION_HASH_SHUFFLE, output reports LOCAL hash. 
+ PlanTranslatorContext upgradeCtx = new PlanTranslatorContext(); + upgradeCtx.setCurrentFragmentBucketUpgradeEligible(true); + TrackingPlanNode probeBucket = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.BUCKET_HASH_SHUFFLE); + TrackingPlanNode buildNoop = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + HashJoinNode upgradedJoin = new HashJoinNode(nextPlanNodeId(), probeBucket, buildNoop, + JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false); + upgradedJoin.setChildrenDistributeExprLists(mockDistributeExprLists()); + upgradedJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); + Pair<PlanNode, LocalExchangeType> upgradedOutput = upgradedJoin.enforceAndDeriveLocalExchange( + upgradeCtx, null, LocalExchangeTypeRequire.requireHash()); + // BUCKET claim must NOT satisfy the upgrade's requireSpecific(LOCAL_EXECUTION_HASH): + // an LE is inserted on both sides. + Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, upgradedOutput.second); + assertChildLocalExchangeType(upgradedJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + assertChildLocalExchangeType(upgradedJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + + // 2. Child already providing LOCAL hash satisfies the upgraded require — no extra LE. 
+ PlanTranslatorContext satisfiedCtx = new PlanTranslatorContext(); + satisfiedCtx.setCurrentFragmentBucketUpgradeEligible(true); + TrackingPlanNode probeLocal = new TrackingPlanNode(nextPlanNodeId(), + LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + TrackingPlanNode buildLocal = new TrackingPlanNode(nextPlanNodeId(), + LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + HashJoinNode satisfiedJoin = new HashJoinNode(nextPlanNodeId(), probeLocal, buildLocal, + JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false); + satisfiedJoin.setChildrenDistributeExprLists(mockDistributeExprLists()); + satisfiedJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); + Pair<PlanNode, LocalExchangeType> satisfiedUpgrade = satisfiedJoin.enforceAndDeriveLocalExchange( + satisfiedCtx, null, LocalExchangeTypeRequire.requireHash()); + Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, satisfiedUpgrade.second); + Assertions.assertSame(probeLocal, satisfiedJoin.getChild(0)); + Assertions.assertSame(buildLocal, satisfiedJoin.getChild(1)); + + // 3. Parent requires bucket distribution (upper bucket join) → no upgrade even when + // the fragment is eligible: children keep BUCKET_HASH_SHUFFLE. 
+ PlanTranslatorContext parentBucketCtx = new PlanTranslatorContext(); + parentBucketCtx.setCurrentFragmentBucketUpgradeEligible(true); + TrackingPlanNode probeForBucketParent = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + TrackingPlanNode buildForBucketParent = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + HashJoinNode bucketParentJoin = new HashJoinNode(nextPlanNodeId(), probeForBucketParent, + buildForBucketParent, JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), + null, null, false); + bucketParentJoin.setChildrenDistributeExprLists(mockDistributeExprLists()); + bucketParentJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); + Pair<PlanNode, LocalExchangeType> bucketParentOutput = bucketParentJoin.enforceAndDeriveLocalExchange( + parentBucketCtx, null, LocalExchangeTypeRequire.requireBucketHash()); + Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, bucketParentOutput.second); + assertChildLocalExchangeType(bucketParentJoin, 0, LocalExchangeType.BUCKET_HASH_SHUFFLE); + assertChildLocalExchangeType(bucketParentJoin, 1, LocalExchangeType.BUCKET_HASH_SHUFFLE); + + // 4. Fragment not eligible (ratio gate failed / not a pooled bucket fragment) → + // existing behavior untouched. 
+ PlanTranslatorContext ineligibleCtx = new PlanTranslatorContext(); + TrackingPlanNode probeIneligible = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + TrackingPlanNode buildIneligible = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + HashJoinNode ineligibleJoin = new HashJoinNode(nextPlanNodeId(), probeIneligible, buildIneligible, + JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false); + ineligibleJoin.setChildrenDistributeExprLists(mockDistributeExprLists()); + ineligibleJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); + Pair<PlanNode, LocalExchangeType> ineligibleOutput = ineligibleJoin.enforceAndDeriveLocalExchange( + ineligibleCtx, null, LocalExchangeTypeRequire.requireHash()); + Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, ineligibleOutput.second); + assertChildLocalExchangeType(ineligibleJoin, 0, LocalExchangeType.BUCKET_HASH_SHUFFLE); + assertChildLocalExchangeType(ineligibleJoin, 1, LocalExchangeType.BUCKET_HASH_SHUFFLE); + + // 5. Stacked bucket joins: the whole chain upgrades. The inner join (direct probe + // child of the upgraded one) also upgrades its children to LOCAL hash, but + // reports NOOP output so the outer join always inserts its own re-align LE + // (keys may differ between levels). 
+ PlanTranslatorContext stackedCtx = new PlanTranslatorContext(); + stackedCtx.setCurrentFragmentBucketUpgradeEligible(true); + TrackingPlanNode innerProbe = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + TrackingPlanNode innerBuild = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + HashJoinNode innerJoin = new HashJoinNode(nextPlanNodeId(), innerProbe, innerBuild, + JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false); + innerJoin.setChildrenDistributeExprLists(mockDistributeExprLists()); + innerJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); + TrackingPlanNode outerBuild = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + HashJoinNode outerJoin = new HashJoinNode(nextPlanNodeId(), innerJoin, outerBuild, + JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false); + outerJoin.setChildrenDistributeExprLists(mockDistributeExprLists()); + outerJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); + Pair<PlanNode, LocalExchangeType> stackedOutput = outerJoin.enforceAndDeriveLocalExchange( + stackedCtx, null, LocalExchangeTypeRequire.requireHash()); + Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, stackedOutput.second); + // outer upgraded: probe side wrapped with LOCAL hash LE (re-aligning inner's output) + assertChildLocalExchangeType(outerJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + assertChildLocalExchangeType(outerJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + // inner upgraded too (whole-chain): its children get LOCAL hash LEs + assertChildLocalExchangeType(innerJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + assertChildLocalExchangeType(innerJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + + // 6. Colocate join takes the same upgrade path. 
+ PlanTranslatorContext colocateCtx = new PlanTranslatorContext(); + colocateCtx.setCurrentFragmentBucketUpgradeEligible(true); + TrackingPlanNode colocateProbe = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + TrackingPlanNode colocateBuild = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + HashJoinNode colocateJoin = new HashJoinNode(nextPlanNodeId(), colocateProbe, colocateBuild, + JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false); + colocateJoin.setChildrenDistributeExprLists(mockDistributeExprLists()); + colocateJoin.setColocate(true, "test"); + Pair<PlanNode, LocalExchangeType> colocateOutput = colocateJoin.enforceAndDeriveLocalExchange( + colocateCtx, null, LocalExchangeTypeRequire.requireHash()); + Assertions.assertEquals(LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE, colocateOutput.second); + assertChildLocalExchangeType(colocateJoin, 0, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + assertChildLocalExchangeType(colocateJoin, 1, LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE); + + // 7. Missing distribute exprs → no upgrade (the LOCAL hash LE would have no keys). 
+ PlanTranslatorContext noExprCtx = new PlanTranslatorContext(); + noExprCtx.setCurrentFragmentBucketUpgradeEligible(true); + TrackingPlanNode probeNoExpr = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + TrackingPlanNode buildNoExpr = new TrackingPlanNode(nextPlanNodeId(), LocalExchangeType.NOOP); + HashJoinNode noExprJoin = new HashJoinNode(nextPlanNodeId(), probeNoExpr, buildNoExpr, + JoinOperator.INNER_JOIN, eqConjuncts, Collections.emptyList(), null, null, false); + noExprJoin.setDistributionMode(DistributionMode.BUCKET_SHUFFLE); + Pair<PlanNode, LocalExchangeType> noExprOutput = noExprJoin.enforceAndDeriveLocalExchange( + noExprCtx, null, LocalExchangeTypeRequire.requireHash()); + Assertions.assertEquals(LocalExchangeType.BUCKET_HASH_SHUFFLE, noExprOutput.second); + assertChildLocalExchangeType(noExprJoin, 0, LocalExchangeType.BUCKET_HASH_SHUFFLE); + assertChildLocalExchangeType(noExprJoin, 1, LocalExchangeType.BUCKET_HASH_SHUFFLE); + } + + + @Test + public void testShouldUpgradeBucketParallelismGate() { + // ratio <= 1 (including 0 and negatives) always disables — the knob doubles as the + // off switch: requiring at most 1x parallelism gain means no gain. 
+ Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(0, 16, 8)); + Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(-1, 16, 8)); + Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.0, 16, 8)); + // active threshold: instances must exceed buckets-with-data × ratio + Assertions.assertTrue(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 16, 8)); + Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 12, 8)); + Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(2.0, 16, 8)); + Assertions.assertTrue(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 256, 8)); + // no buckets with data → nothing to upgrade + Assertions.assertFalse(AddLocalExchange.shouldUpgradeBucketParallelism(1.5, 16, 0)); + } + @Test public void testLocalExchangeNodeIsNotSerializedAsSerialOperator() { SerialTrackingScanNode serialScan = new SerialTrackingScanNode(nextPlanNodeId(), LocalExchangeType.NOOP); diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 4f8826bdb3f..fb8ef30150e 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -1609,6 +1609,12 @@ struct TRuntimeFilterDesc { // the listed partitions with the listed direction; absent partitions are // unsafe for this RF target and must not be pruned by it. 20: optional map<Types.TPlanNodeId, list<TPartitionTargetExprMonotonicity>> planId_to_partition_target_monotonicity; + + // True when a local exchange sits between the filter builder (join) and a same-fragment + // target scan: per-instance partial filters are then NOT aligned with the scan's data + // slice and must be merged before being applied. Computed truthfully by FE after local + // exchange planning; BE ORs it with the legacy inference from the target scan's is_serial_operator.
+ 21: optional bool force_local_merge; } diff --git a/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy new file mode 100644 index 00000000000..0d9c21fe7d9 --- /dev/null +++ b/regression-test/suites/nereids_p0/local_shuffle/test_local_shuffle_bucket_upgrade.groovy @@ -0,0 +1,166 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +/** + * DORIS-24902 part 2: bucket -> local-hash parallelism upgrade. + * + * A pooled bucket-join fragment runs its bucket joins at bucket-count parallelism + * (only instances owning buckets do join work). When nothing above the join needs + * bucket alignment and per-BE instances > buckets-with-data x ratio + * (session var local_shuffle_bucket_upgrade_ratio, > 1 enables, <= 1 disables), + * the FE planner re-distributes both join sides with LOCAL_EXECUTION_HASH_SHUFFLE + * so the join uses all instances. + * + * Shape notes (verified against a live cluster): + * - LocalExchangeNodes only appear in EXPLAIN DISTRIBUTED PLAN (plain EXPLAIN + * renders the tree before AddLocalExchange runs). 
+ * - Nereids bucket-shuffle downgrade: bucket shuffle only forms when + * totalBucketNum >= totalInstanceNum * 0.8, so BUCKETS 13 with + * parallel_pipeline_task_num=16 on 1 BE (13 >= 12.8) keeps the bucket join, + * and ratio=1.1 (16 > 13*1.1) enables the upgrade while default 1.5 does not. + * - The aggregation above must NOT group by the bucket key: a colocate agg + * requires bucket distribution of the join output and correctly blocks the + * upgrade via the parentRequire gate. + */ +suite("test_local_shuffle_bucket_upgrade") { + + def hints = { ls_on, ratio -> + """/*+SET_VAR( + enable_sql_cache=false, disable_join_reorder=true, + disable_colocate_plan=true, + auto_broadcast_join_threshold=-1, broadcast_row_count_limit=0, + experimental_force_to_local_shuffle=true, + experimental_enable_parallel_scan=false, + enable_runtime_filter_prune=false, + enable_runtime_filter_partition_prune=false, + runtime_filter_type='IN,MIN_MAX', + parallel_pipeline_task_num=16, + parallel_exchange_instance_num=8, + query_timeout=600, + local_shuffle_bucket_upgrade_ratio=${ratio}, + enable_local_shuffle=${ls_on}, + enable_local_shuffle_planner=${ls_on} + )*/""" + } + + sql "DROP TABLE IF EXISTS lsbu_fact" + sql "DROP TABLE IF EXISTS lsbu_probe" + sql "DROP TABLE IF EXISTS lsbu_probe2" + sql """CREATE TABLE lsbu_fact (k INT, v BIGINT) + ENGINE=OLAP DUPLICATE KEY(k) DISTRIBUTED BY HASH(k) BUCKETS 13 + PROPERTIES ("replication_num"="1")""" + sql """CREATE TABLE lsbu_probe (pk INT, k INT, w BIGINT) + ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 7 + PROPERTIES ("replication_num"="1")""" + sql """CREATE TABLE lsbu_probe2 (pk INT, k INT, w BIGINT) + ENGINE=OLAP DUPLICATE KEY(pk) DISTRIBUTED BY HASH(pk) BUCKETS 5 + PROPERTIES ("replication_num"="1")""" + sql """INSERT INTO lsbu_fact + SELECT CAST(number%50 AS INT), number*10+1 + FROM numbers("number"="200")""" + sql """INSERT INTO lsbu_probe + SELECT CAST(number AS INT), CAST(number%50 AS INT), 1000+number + FROM 
numbers("number"="300")""" + sql """INSERT INTO lsbu_probe2 + SELECT CAST(number AS INT), CAST(number%50 AS INT), 2000+number + FROM numbers("number"="170")""" + + // group key pk%10 is NOT the bucket key, so the agg above does not require + // bucket distribution and the upgrade is allowed. + def singleJoin = { h -> + """SELECT ${h} p.pk % 10 AS g, COUNT(*) c, SUM(f.v) sv, SUM(p.w) sw + FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k + GROUP BY g ORDER BY g""" + } + + // ---------- 1. plan shape (EXPLAIN DISTRIBUTED PLAN: post-AddLocalExchange) ---------- + def upgradedPlan = sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '1.1'))}" + def upgradedText = upgradedPlan.toString() + assertTrue(upgradedText.contains("BUCKET_SHUFFLE"), + "precondition: the join must be a bucket-shuffle join") + assertTrue(upgradedText.contains("LOCAL_EXECUTION_HASH_SHUFFLE"), + "ratio=1.1 must upgrade the bucket join's local exchanges to LOCAL hash") + + def bucketPlan = sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '0'))}" + def bucketText = bucketPlan.toString() + assertTrue(bucketText.contains("BUCKET_SHUFFLE"), + "precondition: the join must be a bucket-shuffle join") + assertFalse(bucketText.contains("LOCAL_EXECUTION_HASH_SHUFFLE"), + "ratio=0 disables the upgrade: no LOCAL hash exchanges for the bucket join") + + // ratio exactly 1 also keeps the upgrade off (<=1 disables) + def ratioOnePlan = sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '1'))}" + assertFalse(ratioOnePlan.toString().contains("LOCAL_EXECUTION_HASH_SHUFFLE"), + "ratio=1 must keep the upgrade off (<=1 disables)") + + // default ratio 1.5 does not fire here: 16 < 13*1.5 (gate respects the threshold) + def ratioDefaultPlan = sql "EXPLAIN DISTRIBUTED PLAN ${singleJoin(hints('true', '1.5'))}" + assertFalse(ratioDefaultPlan.toString().contains("LOCAL_EXECUTION_HASH_SHUFFLE"), + "ratio=1.5 with 16 instances vs 13 buckets (16 < 19.5) must not upgrade") + + // Note: whether a 
group-by-bucket-key agg blocks the upgrade depends on the agg + // shape the optimizer picks (a colocate one-phase agg requires bucket distribution + // and blocks it; a two-phase agg does not). That parentRequire gate is covered + // deterministically by LocalShuffleNodeCoverageTest; here we only pin correctness. + def bucketKeyAgg = { h -> + """SELECT ${h} f.k AS g, COUNT(*) c, SUM(p.w) sw + FROM lsbu_fact f JOIN lsbu_probe p ON p.k = f.k + GROUP BY g ORDER BY g""" + } + def bka_baseline = sql bucketKeyAgg(hints('false', '0')) + def bka_upgraded = sql bucketKeyAgg(hints('true', '1.1')) + assertEquals(50, bka_baseline.size()) + assertEquals(bka_baseline, bka_upgraded, + "group-by-bucket-key agg over (possibly upgraded) bucket join must stay correct") + + // ---------- 2. correctness: single bucket join ---------- + def single_baseline = sql singleJoin(hints('false', '0')) + def single_bucket = sql singleJoin(hints('true', '0')) + def single_upgraded = sql singleJoin(hints('true', '1.1')) + + assertEquals(10, single_baseline.size()) + assertEquals(single_baseline, single_bucket, + "bucket join (upgrade off) must match local-shuffle-off baseline") + assertEquals(single_baseline, single_upgraded, + "upgraded bucket join must match local-shuffle-off baseline") + + // ---------- 3. correctness: stacked bucket joins ---------- + def stackedJoin = { h -> + """SELECT ${h} p1.pk % 10 AS g, COUNT(*) c, SUM(f.v) sv, SUM(p1.w) s1, SUM(p2.w) s2 + FROM lsbu_fact f + JOIN lsbu_probe p1 ON p1.k = f.k + JOIN lsbu_probe2 p2 ON p2.k = f.k + GROUP BY g ORDER BY g""" + } + + // whole-chain shape: at an eligible ratio every level of the stacked bucket chain + // upgrades (the lower join reports NOOP so the upper re-align LE is kept). 
+ def stackedUpgradedPlan = sql "EXPLAIN DISTRIBUTED PLAN ${stackedJoin(hints('true', '1.1'))}" + def stackedUpgradedText = stackedUpgradedPlan.toString() + assertTrue(stackedUpgradedText.contains("LOCAL_EXECUTION_HASH_SHUFFLE"), + "ratio=1.1 must upgrade the stacked bucket chain to LOCAL hash") + + def stacked_baseline = sql stackedJoin(hints('false', '0')) + def stacked_bucket = sql stackedJoin(hints('true', '0')) + def stacked_upgraded = sql stackedJoin(hints('true', '1.1')) + + assertEquals(10, stacked_baseline.size()) + assertEquals(stacked_baseline, stacked_bucket, + "stacked bucket joins (upgrade off) must match local-shuffle-off baseline") + assertEquals(stacked_baseline, stacked_upgraded, + "stacked bucket joins (upgrade on) must match local-shuffle-off baseline") +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org
