This is an automated email from the ASF dual-hosted git repository. 924060929 pushed a commit to branch fe_local_shuffle_optimize in repository https://gitbox.apache.org/repos/asf/doris.git
commit 192729a4442b63b057cda8feb374097f7ad8f756 Author: 924060929 <[email protected]> AuthorDate: Thu Jun 4 20:19:13 2026 +0800 Revert "[opt](local shuffle) parallel pooled bucket scan: spread ranges to bucket owners (DORIS-24902)" This reverts commit 1ebdfd2ab30. The scan-spread pair (ranges to owners + non-serial scan flag) breaks runtime filter correctness: BE engages the local RF merge path based on the target scan's is_serial_operator flag (RuntimeFilter.toThrift hasSerialTargets -> needShuffle). Flipping the scan flag to non-serial silently turns the merge off, even though the upgraded join's build side is still hash-sliced across instances, so per-instance partial IN/MIN_MAX filters are applied unmerged and drop legitimate rows (3-BE repro: forcing 'IN,MIN_MAX' lost 96% of rows on a small join and 3.9K of 200M rows on a large one; the loss scales with how early the wrong filter arrives). Re-land the change only together with a truthful needShuffle: walk the plan from the RF builder to the target looking for an intervening LocalExchangeNode, instead of inferring it from serial flags. That work also requires verifying the BE merge-engagement semantics. 
--- .../job/UnassignedScanBucketOlapTableJob.java | 15 ++--------- .../java/org/apache/doris/planner/ScanNode.java | 29 ---------------------- 2 files changed, 2 insertions(+), 42 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java index 24c96fd6960..b7f654cd023 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java @@ -217,24 +217,13 @@ public class UnassignedScanBucketOlapTableJob extends AbstractUnassignedScanJob if (nonSerialScanSource.isEmpty()) { List<BucketScanSource> assignedJoinBuckets = (List) serialScanSource.parallelize(scanNodes, instanceNum); - // With the FE local shuffle planner's bucket upgrade enabled, give every - // bucket-owner instance its own buckets' scan ranges so the scan runs in - // parallel across owners, instead of funneling all ranges into instance 0 - // (the serial-scan constant that dominates pooled bucket fragments). The - // planner=false dest path (sortDestinationInstancesByBuckets) depends on the - // first instance holding all buckets, so this must stay gated. 
- boolean spreadScanRanges = context.getSessionVariable().isEnableLocalShuffle() - && context.getSessionVariable().isEnableLocalShufflePlanner() - && context.getSessionVariable().getLocalShuffleBucketUpgradeRatio() > 1.0; for (int i = 0; i < assignedJoinBuckets.size(); i++) { BucketScanSource assignedJoinBucket = assignedJoinBuckets.get(i); LocalShuffleBucketJoinAssignedJob instance = new LocalShuffleBucketJoinAssignedJob( instances.size(), shareScanId, context.nextInstanceId(), this, worker, - // spread mode: each instance scans its assigned join buckets; - // legacy mode: only first instance to scan all data - spreadScanRanges ? assignedJoinBucket - : (i == 0 ? serialScanSource : emptyShareScanSource), + // only first instance to scan all data + i == 0 ? serialScanSource : emptyShareScanSource, // but join can assign to multiple instances Utils.fastToImmutableSet(assignedJoinBucket.bucketIndexToScanNodeToTablets.keySet()) ); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java index 9b57474639f..e69d3101548 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java @@ -749,35 +749,6 @@ public abstract class ScanNode extends PlanNode implements SplitGenerator { || context.getSessionVariable().isForceToLocalShuffle(); } - /** - * Spread-scan mode (mirrors the gate in - * {@code UnassignedScanBucketOlapTableJob.assignLocalShuffleJobs}): when the FE local - * shuffle planner's bucket upgrade is enabled, a pooled bucket-assigned fragment gives - * every bucket-owner instance its own buckets' scan ranges. The scan must then NOT be - * reported serial — BE runs one scan task per instance, and a serial flag would leave - * only instance 0's ranges scanned (silently dropping the other buckets). 
- * - * Plain pooled fragments (non-bucket assignment) keep all ranges on instance 0 and - * still rely on the serial flag + PASSTHROUGH fan-out, so the bucket-fragment shape - * check ({@code hasColocatePlanNode || hasBucketShuffleNode}, the same predicate - * UnassignedJobBuilder uses to pick the bucket job) is essential. - * - * Note this does NOT touch {@link #isSerialNode()}: pooling decisions - * (fragment.useSerialSource via hasSerialScanNode) stay unchanged. - */ - @Override - public boolean isSerialOperatorOnBe(ConnectContext context) { - if (context != null - && context.getSessionVariable().isEnableLocalShuffle() - && context.getSessionVariable().isEnableLocalShufflePlanner() - && context.getSessionVariable().getLocalShuffleBucketUpgradeRatio() > 1.0 - && fragment != null - && (fragment.hasColocatePlanNode() || fragment.hasBucketShuffleNode())) { - return false; - } - return super.isSerialOperatorOnBe(context); - } - @Override public boolean hasSerialScanChildren() { return isSerialNode(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
