This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git

commit eb8044e89fb3a67b8e95a6fccc78001a0f7d221d
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 21:09:34 2026 +0800

    Reapply "[opt](local shuffle) parallel pooled bucket scan: spread ranges to bucket owners"
    
    This reverts commit 192729a4442, which was itself the revert of the original
    change. The runtime-filter interaction that forced the rollback is resolved
    by the truthful force_local_merge signal added in the previous commit: the
    scan's serial flag no longer carries the RF merge-engagement side channel.
    
    3-BE verification: the single-join and stacked four-arm batteries (upgrade
    and bucket, each with and without forced runtime_filter_type='IN,MIN_MAX')
    all match the local-shuffle-off baseline. This includes the small-table
    forced-RF case that previously lost 96% of rows. Upgraded+parallel-scan
    runs in 0.37-0.41s vs 0.46-0.50s for bucket (+17%).
---
 .../job/UnassignedScanBucketOlapTableJob.java      | 15 +++++++++--
 .../java/org/apache/doris/planner/ScanNode.java    | 29 ++++++++++++++++++++++
 2 files changed, 42 insertions(+), 2 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index b7f654cd023..24c96fd6960 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -217,13 +217,24 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
         if (nonSerialScanSource.isEmpty()) {
             List<BucketScanSource> assignedJoinBuckets
                     = (List) serialScanSource.parallelize(scanNodes, 
instanceNum);
+            // With the FE local shuffle planner's bucket upgrade enabled, 
give every
+            // bucket-owner instance its own buckets' scan ranges so the scan 
runs in
+            // parallel across owners, instead of funneling all ranges into 
instance 0
+            // (the serial-scan constant that dominates pooled bucket 
fragments). The
+            // planner=false dest path (sortDestinationInstancesByBuckets) 
depends on the
+            // first instance holding all buckets, so this must stay gated.
+            boolean spreadScanRanges = 
context.getSessionVariable().isEnableLocalShuffle()
+                    && 
context.getSessionVariable().isEnableLocalShufflePlanner()
+                    && 
context.getSessionVariable().getLocalShuffleBucketUpgradeRatio() > 1.0;
             for (int i = 0; i < assignedJoinBuckets.size(); i++) {
                 BucketScanSource assignedJoinBucket = 
assignedJoinBuckets.get(i);
                 LocalShuffleBucketJoinAssignedJob instance = new 
LocalShuffleBucketJoinAssignedJob(
                         instances.size(), shareScanId, 
context.nextInstanceId(),
                         this, worker,
-                        // only first instance to scan all data
-                        i == 0 ? serialScanSource : emptyShareScanSource,
+                        // spread mode: each instance scans its assigned join 
buckets;
+                        // legacy mode: only first instance to scan all data
+                        spreadScanRanges ? assignedJoinBucket
+                                : (i == 0 ? serialScanSource : 
emptyShareScanSource),
                         // but join can assign to multiple instances
                         
Utils.fastToImmutableSet(assignedJoinBucket.bucketIndexToScanNodeToTablets.keySet())
                 );
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index e69d3101548..9b57474639f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -749,6 +749,35 @@ public abstract class ScanNode extends PlanNode implements 
SplitGenerator {
                 || context.getSessionVariable().isForceToLocalShuffle();
     }
 
+    /**
+     * Decides whether BE treats this scan as serial; a {@code null} context always defers to the superclass.
+     *
+     * <p>Spread-scan mode (mirrors the gate in {@code UnassignedScanBucketOlapTableJob.assignLocalShuffleJobs}):
+     * when the FE local shuffle planner's bucket upgrade is enabled, a pooled bucket-assigned fragment gives
+     * every bucket-owner instance its own buckets' scan ranges. The scan must then NOT be reported serial:
+     * BE runs one scan task per instance, and a serial flag would leave only instance 0's ranges scanned
+     * (silently dropping the other buckets).
+     *
+     * <p>Plain pooled fragments (non-bucket assignment) keep all ranges on instance 0 and still rely on the
+     * serial flag + PASSTHROUGH fan-out, so the bucket-fragment shape check
+     * ({@code hasColocatePlanNode || hasBucketShuffleNode}, the same predicate UnassignedJobBuilder uses to
+     * pick the bucket job) is essential. Note this does NOT touch {@link #isSerialNode()}: pooling decisions
+     * (fragment.useSerialSource via hasSerialScanNode) stay unchanged.
+     * @return {@code false} for a bucket fragment in spread-scan mode, otherwise the superclass result
+     */
+    @Override
+    public boolean isSerialOperatorOnBe(ConnectContext context) {
+        if (context != null
+                && context.getSessionVariable().isEnableLocalShuffle()
+                && context.getSessionVariable().isEnableLocalShufflePlanner()
+                && 
context.getSessionVariable().getLocalShuffleBucketUpgradeRatio() > 1.0
+                && fragment != null
+                && (fragment.hasColocatePlanNode() || 
fragment.hasBucketShuffleNode())) {
+            return false;
+        }
+        return super.isSerialOperatorOnBe(context);
+    }
+
     @Override
     public boolean hasSerialScanChildren() {
         return isSerialNode();


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to