This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 192729a4442b63b057cda8feb374097f7ad8f756
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 20:19:13 2026 +0800

    Revert "[opt](local shuffle) parallel pooled bucket scan: spread ranges to bucket owners (DORIS-24902)"
    
    This reverts commit 1ebdfd2ab30.
    
    The scan-spread pair (ranges to owners + non-serial scan flag) breaks runtime
    filter correctness: BE engages the local RF merge path based on the target
    scan's is_serial_operator flag (RuntimeFilter.toThrift hasSerialTargets ->
    needShuffle). Flipping the scan flag silently turns the merge off while the
    upgraded join's build side is hash-sliced, so per-instance partial IN/MIN_MAX
    filters are applied unmerged and drop legitimate rows. In a 3-BE repro with
    the RF types forced to 'IN,MIN_MAX', a small join lost 96% of its rows and a
    big one lost 3.9K of 200M rows; the loss scales with how early the wrong
    filter arrives.
    
    Bring it back together with a truthful needShuffle (walk the plan from the RF
    builder to the target looking for an intervening LocalExchangeNode, instead of
    inferring it from serial flags); this also requires verifying the BE's
    merge-engagement semantics.
---
 .../job/UnassignedScanBucketOlapTableJob.java      | 15 ++---------
 .../java/org/apache/doris/planner/ScanNode.java    | 29 ----------------------
 2 files changed, 2 insertions(+), 42 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index 24c96fd6960..b7f654cd023 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -217,24 +217,13 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
         if (nonSerialScanSource.isEmpty()) {
             List<BucketScanSource> assignedJoinBuckets
                     = (List) serialScanSource.parallelize(scanNodes, 
instanceNum);
-            // With the FE local shuffle planner's bucket upgrade enabled, 
give every
-            // bucket-owner instance its own buckets' scan ranges so the scan 
runs in
-            // parallel across owners, instead of funneling all ranges into 
instance 0
-            // (the serial-scan constant that dominates pooled bucket 
fragments). The
-            // planner=false dest path (sortDestinationInstancesByBuckets) 
depends on the
-            // first instance holding all buckets, so this must stay gated.
-            boolean spreadScanRanges = 
context.getSessionVariable().isEnableLocalShuffle()
-                    && 
context.getSessionVariable().isEnableLocalShufflePlanner()
-                    && 
context.getSessionVariable().getLocalShuffleBucketUpgradeRatio() > 1.0;
             for (int i = 0; i < assignedJoinBuckets.size(); i++) {
                 BucketScanSource assignedJoinBucket = 
assignedJoinBuckets.get(i);
                 LocalShuffleBucketJoinAssignedJob instance = new 
LocalShuffleBucketJoinAssignedJob(
                         instances.size(), shareScanId, 
context.nextInstanceId(),
                         this, worker,
-                        // spread mode: each instance scans its assigned join 
buckets;
-                        // legacy mode: only first instance to scan all data
-                        spreadScanRanges ? assignedJoinBucket
-                                : (i == 0 ? serialScanSource : 
emptyShareScanSource),
+                        // only first instance to scan all data
+                        i == 0 ? serialScanSource : emptyShareScanSource,
                         // but join can assign to multiple instances
                         
Utils.fastToImmutableSet(assignedJoinBucket.bucketIndexToScanNodeToTablets.keySet())
                 );
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index 9b57474639f..e69d3101548 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -749,35 +749,6 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
                 || context.getSessionVariable().isForceToLocalShuffle();
     }
 
-    /**
-     * Spread-scan mode (mirrors the gate in
-     * {@code UnassignedScanBucketOlapTableJob.assignLocalShuffleJobs}): when 
the FE local
-     * shuffle planner's bucket upgrade is enabled, a pooled bucket-assigned 
fragment gives
-     * every bucket-owner instance its own buckets' scan ranges. The scan must 
then NOT be
-     * reported serial — BE runs one scan task per instance, and a serial flag 
would leave
-     * only instance 0's ranges scanned (silently dropping the other buckets).
-     *
-     * Plain pooled fragments (non-bucket assignment) keep all ranges on 
instance 0 and
-     * still rely on the serial flag + PASSTHROUGH fan-out, so the 
bucket-fragment shape
-     * check ({@code hasColocatePlanNode || hasBucketShuffleNode}, the same 
predicate
-     * UnassignedJobBuilder uses to pick the bucket job) is essential.
-     *
-     * Note this does NOT touch {@link #isSerialNode()}: pooling decisions
-     * (fragment.useSerialSource via hasSerialScanNode) stay unchanged.
-     */
-    @Override
-    public boolean isSerialOperatorOnBe(ConnectContext context) {
-        if (context != null
-                && context.getSessionVariable().isEnableLocalShuffle()
-                && context.getSessionVariable().isEnableLocalShufflePlanner()
-                && 
context.getSessionVariable().getLocalShuffleBucketUpgradeRatio() > 1.0
-                && fragment != null
-                && (fragment.hasColocatePlanNode() || 
fragment.hasBucketShuffleNode())) {
-            return false;
-        }
-        return super.isSerialOperatorOnBe(context);
-    }
-
     @Override
     public boolean hasSerialScanChildren() {
         return isSerialNode();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to