This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a72c363bf0b8b123918f77bf05a37d6415667ffe
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 21:15:05 2026 +0800

    Revert "Reapply [opt](local shuffle) parallel pooled bucket scan"
    
    Second landing attempt, second hidden contract: the RQG-bugs regression suite
    (Bug 20: use_serial_exchange=true + RIGHT OUTER self-join + GROUP BY, 3 BEs)
    loses rows when the spread is active. The right-outer bucket fill-up path
    (fillUpInstances, which merges missing buckets into the FIRST instance's scan
    source) assumes that all pooled scan ranges live on instance 0. The
    planner-off destination mapping and the runtime-filter serial inference,
    which broke the previous attempt, made the same assumption.
    
    'All ranges on instance 0' is a load-bearing convention with at least four
    known consumers (planner-off dest mapping, RF merge inference, right-join
    bucket fill-up, serial-exchange semantics). Parallelizing the pooled scan is
    deferred to a separate project, which must first audit every consumer of that
    convention.
    The truthful RF merge signal (force_local_merge) stays — it is independently
    correct and keeps the RF path honest regardless of how the scan is laid out.
---
 .../job/UnassignedScanBucketOlapTableJob.java      | 15 ++---------
 .../java/org/apache/doris/planner/ScanNode.java    | 29 ----------------------
 2 files changed, 2 insertions(+), 42 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index 24c96fd6960..b7f654cd023 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -217,24 +217,13 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
         if (nonSerialScanSource.isEmpty()) {
             List<BucketScanSource> assignedJoinBuckets
                     = (List) serialScanSource.parallelize(scanNodes, 
instanceNum);
-            // With the FE local shuffle planner's bucket upgrade enabled, 
give every
-            // bucket-owner instance its own buckets' scan ranges so the scan 
runs in
-            // parallel across owners, instead of funneling all ranges into 
instance 0
-            // (the serial-scan constant that dominates pooled bucket 
fragments). The
-            // planner=false dest path (sortDestinationInstancesByBuckets) 
depends on the
-            // first instance holding all buckets, so this must stay gated.
-            boolean spreadScanRanges = 
context.getSessionVariable().isEnableLocalShuffle()
-                    && 
context.getSessionVariable().isEnableLocalShufflePlanner()
-                    && 
context.getSessionVariable().getLocalShuffleBucketUpgradeRatio() > 1.0;
             for (int i = 0; i < assignedJoinBuckets.size(); i++) {
                 BucketScanSource assignedJoinBucket = 
assignedJoinBuckets.get(i);
                 LocalShuffleBucketJoinAssignedJob instance = new 
LocalShuffleBucketJoinAssignedJob(
                         instances.size(), shareScanId, 
context.nextInstanceId(),
                         this, worker,
-                        // spread mode: each instance scans its assigned join 
buckets;
-                        // legacy mode: only first instance to scan all data
-                        spreadScanRanges ? assignedJoinBucket
-                                : (i == 0 ? serialScanSource : 
emptyShareScanSource),
+                        // only first instance to scan all data
+                        i == 0 ? serialScanSource : emptyShareScanSource,
                         // but join can assign to multiple instances
                         
Utils.fastToImmutableSet(assignedJoinBucket.bucketIndexToScanNodeToTablets.keySet())
                 );
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 9b57474639f..e69d3101548 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -749,35 +749,6 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
                 || context.getSessionVariable().isForceToLocalShuffle();
     }
 
-    /**
-     * Spread-scan mode (mirrors the gate in
-     * {@code UnassignedScanBucketOlapTableJob.assignLocalShuffleJobs}): when 
the FE local
-     * shuffle planner's bucket upgrade is enabled, a pooled bucket-assigned 
fragment gives
-     * every bucket-owner instance its own buckets' scan ranges. The scan must 
then NOT be
-     * reported serial — BE runs one scan task per instance, and a serial flag 
would leave
-     * only instance 0's ranges scanned (silently dropping the other buckets).
-     *
-     * Plain pooled fragments (non-bucket assignment) keep all ranges on 
instance 0 and
-     * still rely on the serial flag + PASSTHROUGH fan-out, so the 
bucket-fragment shape
-     * check ({@code hasColocatePlanNode || hasBucketShuffleNode}, the same 
predicate
-     * UnassignedJobBuilder uses to pick the bucket job) is essential.
-     *
-     * Note this does NOT touch {@link #isSerialNode()}: pooling decisions
-     * (fragment.useSerialSource via hasSerialScanNode) stay unchanged.
-     */
-    @Override
-    public boolean isSerialOperatorOnBe(ConnectContext context) {
-        if (context != null
-                && context.getSessionVariable().isEnableLocalShuffle()
-                && context.getSessionVariable().isEnableLocalShufflePlanner()
-                && 
context.getSessionVariable().getLocalShuffleBucketUpgradeRatio() > 1.0
-                && fragment != null
-                && (fragment.hasColocatePlanNode() || 
fragment.hasBucketShuffleNode())) {
-            return false;
-        }
-        return super.isSerialOperatorOnBe(context);
-    }
-
     @Override
     public boolean hasSerialScanChildren() {
         return isSerialNode();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to