This is an automated email from the ASF dual-hosted git repository.

924060929 pushed a commit to branch fe_local_shuffle_optimize
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 7dd0b12a16401a2be48592a9b57361f6635e732e
Author: 924060929 <[email protected]>
AuthorDate: Thu Jun 4 19:43:13 2026 +0800

    [opt](nereids) make bucket shuffle downgrade threshold tunable
    
    The bucket-shuffle-join downgrade (give up bucket shuffle and fall back to
    shuffle join when the base table side's totalBucketNum is less than
    totalInstanceNum * 0.8) used a hardcoded 0.8 threshold. Make it a session
    variable, bucket_shuffle_downgrade_ratio (the default of 0.8 keeps the
    original behavior; values <= 0 never downgrade).
    
    Rationale: the downgrade and the FE local shuffle planner's bucket -> local
    hash upgrade (local_shuffle_bucket_upgrade_ratio) solve the same problem —
    few-bucket bucket shuffle runs at bucket-count parallelism — from opposite
    ends. The downgrade pays a full re-shuffle of BOTH sides for parallelism;
    the upgrade keeps the anchored side in place and restores parallelism with a
    local exchange. With the upgrade available the downgrade window can be
    narrowed or disabled; measured on a 4-bucket 20M x 20M single bucket-shuffle
    join (200M-row output, 16 instances): bucket 4-way 0.445s, upgraded 16-way
    0.40s, downgraded shuffle 0.395s (single-BE, where shuffle pays no real
    network cost — multi-BE favors bucket + upgrade further).
---
 .../properties/ChildrenPropertiesRegulator.java       | 10 +++++++++-
 .../java/org/apache/doris/qe/SessionVariable.java     | 19 +++++++++++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 845c87eea9c..e5e0d9d1bd0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -311,7 +311,15 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<List<List<PhysicalP
                 int bucketNum = 
candidate.getTable().getDefaultDistributionInfo().getBucketNum();
                 int totalBucketNum = prunedPartNum * bucketNum;
                 ConnectContext connectContext = ConnectContext.get();
-                return totalBucketNum < connectContext.getTotalInstanceNum() * 
0.8;
+                // <= 0 disables the downgrade entirely: with the FE local 
shuffle planner's
+                // bucket -> local-hash upgrade 
(local_shuffle_bucket_upgrade_ratio), few-bucket
+                // bucket shuffle no longer funnels, so keeping bucket shuffle 
(anchored side
+                // needs no re-shuffle) can beat downgrading to shuffle join.
+                double downgradeRatio = 
connectContext.getSessionVariable().getBucketShuffleDowngradeRatio();
+                if (downgradeRatio <= 0) {
+                    return false;
+                }
+                return totalBucketNum < connectContext.getTotalInstanceNum() * 
downgradeRatio;
             }
         }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 66f80718ee4..2a5e895acf6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -341,6 +341,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String LOCAL_SHUFFLE_BUCKET_UPGRADE_RATIO = 
"local_shuffle_bucket_upgrade_ratio";
 
+    public static final String BUCKET_SHUFFLE_DOWNGRADE_RATIO = 
"bucket_shuffle_downgrade_ratio";
+
     public static final String FORCE_TO_LOCAL_SHUFFLE = 
"force_to_local_shuffle";
 
     public static final String ENABLE_LOCAL_MERGE_SORT = 
"enable_local_merge_sort";
@@ -1654,6 +1656,15 @@ public class SessionVariable implements Serializable, 
Writable {
                     + " and negatives) disable the upgrade."}, needForward = 
true)
     private double localShuffleBucketUpgradeRatio = 1.5;
 
+    @VarAttrDef.VarAttr(
+            name = BUCKET_SHUFFLE_DOWNGRADE_RATIO, fuzzy = false, varType = 
VariableAnnotation.EXPERIMENTAL,
+            description = {"当一侧基表总桶数小于总实例数的该倍数时, 放弃bucket shuffle 
join降级为shuffle join。"
+                    + "小于等于0时永不降级。默认0.8保持原有行为",
+                    "Downgrade bucket shuffle join to shuffle join when the 
base table side's total"
+                    + " bucket count is less than total instance count times 
this ratio. Values <= 0"
+                    + " never downgrade. Default 0.8 keeps the original 
behavior."}, needForward = true)
+    private double bucketShuffleDowngradeRatio = 0.8;
+
     @VarAttrDef.VarAttr(name = ENABLE_LOCAL_MERGE_SORT)
     private boolean enableLocalMergeSort = true;
 
@@ -4818,6 +4829,14 @@ public class SessionVariable implements Serializable, 
Writable {
         this.localShuffleBucketUpgradeRatio = localShuffleBucketUpgradeRatio;
     }
 
+    public double getBucketShuffleDowngradeRatio() {
+        return bucketShuffleDowngradeRatio;
+    }
+
+    public void setBucketShuffleDowngradeRatio(double 
bucketShuffleDowngradeRatio) {
+        this.bucketShuffleDowngradeRatio = bucketShuffleDowngradeRatio;
+    }
+
     public boolean enablePushDownNoGroupAgg() {
         return enablePushDownNoGroupAgg;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to