This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/fe_local_shuffle_optimize by 
this push:
     new 2dbc6f0fbc6 [opt](local shuffle) cores-aware bucket upgrade gate
2dbc6f0fbc6 is described below

commit 2dbc6f0fbc68ba2fda8643c716611f3ae615e962
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 22:12:03 2026 +0800

    [opt](local shuffle) cores-aware bucket upgrade gate
    
    The upgrade gate compared the raw instance count with the bucket count, but the
    real question is whether the bucket parallelism already saturates the backend's
    execution threads. Local calibration (20M x 20M join, bucket gradient
    4/6/8/10/13 at 16 instances on an oversubscribed host) showed that the bucket
    baseline keeps improving as the bucket count grows, while the upgraded plan stays
    flat: once min(buckets, threads) is close to min(instances, threads), the
    extra local exchange is a pure cost (a measured 9% to 22% regression).
    
    Gate per worker on EFFECTIVE parallelism gain:
      min(instances, threads) > min(buckets, threads) * ratio
    where threads = pipelineExecutorSize from the heartbeat (fallback cpuCores;
    <= 1 means not reported yet and the capacity is treated as uncapped). Every
    worker owning buckets must clear the bar (conservative min-gain). On normal
    deployments (threads >= instances) this reduces to the previous formula applied
    per worker, which is identical to it when every worker runs the same number of
    instances.
    
    UT 16/16, nereids_p0/local_shuffle 7/7 on 3 BEs.
---
 .../org/apache/doris/planner/AddLocalExchange.java | 50 ++++++++++++++++++++--
 1 file changed, 46 insertions(+), 4 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
index 92925292f12..dcbabc856f2 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java
@@ -126,13 +126,55 @@ public class AddLocalExchange {
             return false;
         }
         Map<Long, Set<Integer>> bucketsPerWorker = new HashMap<>();
+        Map<Long, Integer> instancesPerWorker = new HashMap<>();
+        Map<Long, Integer> coresPerWorker = new HashMap<>();
         for (AssignedJob job : instanceJobs) {
-            bucketsPerWorker.computeIfAbsent(job.getAssignedWorker().id(), k 
-> new HashSet<>())
+            long workerId = job.getAssignedWorker().id();
+            bucketsPerWorker.computeIfAbsent(workerId, k -> new HashSet<>())
                     .addAll(((LocalShuffleBucketJoinAssignedJob) 
job).getAssignedJoinBucketIndexes());
+            instancesPerWorker.merge(workerId, 1, Integer::sum);
+            coresPerWorker.computeIfAbsent(workerId, k -> 
resolveWorkerCores(job.getAssignedWorker()));
         }
-        long maxBucketsPerWorker = bucketsPerWorker.values().stream()
-                .mapToLong(Set::size).max().orElse(0);
-        return shouldUpgradeBucketParallelism(ratio, maxPerBeInstances, 
maxBucketsPerWorker);
+        // Conservative: every worker that owns buckets must clear the gain 
bar. The gain is
+        // computed on EFFECTIVE parallelism (capped by the BE's executor 
threads): when the
+        // bucket count already saturates the cores, adding instances cannot 
speed the join
+        // up and the extra local exchange is a pure cost.
+        boolean anyBuckets = false;
+        for (Map.Entry<Long, Set<Integer>> entry : 
bucketsPerWorker.entrySet()) {
+            int buckets = entry.getValue().size();
+            if (buckets == 0) {
+                continue;
+            }
+            anyBuckets = true;
+            int instances = instancesPerWorker.getOrDefault(entry.getKey(), 0);
+            int cores = coresPerWorker.getOrDefault(entry.getKey(), 
Integer.MAX_VALUE);
+            if (!shouldUpgradeBucketParallelism(ratio,
+                    Math.min(instances, cores), Math.min(buckets, cores))) {
+                return false;
+            }
+        }
+        return anyBuckets;
+    }
+
+    /**
+     * Effective execution threads of the worker's backend 
(pipelineExecutorSize, falling
+     * back to cpuCores). Values <= 1 mean the heartbeat has not reported yet 
— treat the
+     * capacity as unknown/uncapped rather than blocking the upgrade.
+     */
+    private static int resolveWorkerCores(
+            
org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker 
worker) {
+        if (worker instanceof 
org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker) {
+            org.apache.doris.system.Backend backend =
+                    
((org.apache.doris.nereids.trees.plans.distribute.worker.BackendWorker) 
worker).getBackend();
+            int size = backend.getPipelineExecutorSize();
+            if (size <= 1) {
+                size = backend.getCputCores();
+            }
+            if (size > 1) {
+                return size;
+            }
+        }
+        return Integer.MAX_VALUE;
     }
 
     /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to