This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 241a2fc25c3c8f22624b58eda51b2ad4d2c28680
Author: xzj7019 <[email protected]>
AuthorDate: Thu Feb 22 17:20:19 2024 +0800

    [nereids] downgrade bucket shuffle if tablet num < instance num (#31222)
---
 .../properties/ChildrenPropertiesRegulator.java    | 58 ++++++++++++++++++++++
 .../java/org/apache/doris/qe/SessionVariable.java  |  9 ++++
 2 files changed, 67 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 3dfa2615af0..a70d72c565c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
 import org.apache.doris.nereids.trees.expressions.SlotReference;
 import 
org.apache.doris.nereids.trees.expressions.functions.agg.MultiDistinction;
 import org.apache.doris.nereids.trees.plans.AggMode;
+import org.apache.doris.nereids.trees.plans.GroupPlan;
 import org.apache.doris.nereids.trees.plans.JoinType;
 import org.apache.doris.nereids.trees.plans.Plan;
 import org.apache.doris.nereids.trees.plans.SortPhase;
@@ -39,6 +40,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalFilter;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashAggregate;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
 import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation;
@@ -189,6 +191,41 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
         return true;
     }
 
+    private boolean isBucketShuffleDownGrade(Plan oneSidePlan, 
DistributionSpecHash otherSideSpec) {
+        // improper to do bucket shuffle join:
+        // oneSide:
+        // 1. base table
+        // 2. single partition after pruning
+        // 3. tablets' number is small enough (< paraInstanceNum)
+        // otherSide: ShuffleType.EXECUTION_BUCKETED
+        boolean isBucketShuffleDownGrade = 
ConnectContext.get().getSessionVariable().isEnableBucketShuffleDownGrade();
+        if (!isBucketShuffleDownGrade) {
+            return false;
+        } else if (otherSideSpec.getShuffleType() != 
ShuffleType.EXECUTION_BUCKETED) {
+            return false;
+        } else {
+            int paraNum = Math.max(1, 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
+            if (((GroupPlan) 
oneSidePlan).getGroup().getPhysicalExpressions().isEmpty()) {
+                return false;
+            } else {
+                Plan plan = ((GroupPlan) 
oneSidePlan).getGroup().getPhysicalExpressions().get(0).getPlan();
+                while ((plan instanceof PhysicalProject || plan instanceof 
PhysicalFilter)
+                        && !((GroupPlan) 
plan.child(0)).getGroup().getPhysicalExpressions().isEmpty()) {
+                    plan = ((GroupPlan) 
plan.child(0)).getGroup().getPhysicalExpressions().get(0).getPlan();
+                }
+                if (plan != null && plan instanceof PhysicalOlapScan
+                        && ((PhysicalOlapScan) 
plan).getSelectedPartitionIds().size() <= 1
+                        && ((PhysicalOlapScan) plan).getTable() != null
+                        && ((PhysicalOlapScan) 
plan).getTable().getDefaultDistributionInfo() != null
+                        && ((PhysicalOlapScan) 
plan).getTable().getDefaultDistributionInfo().getBucketNum() < paraNum) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }
+        }
+    }
+
     private boolean couldNotRightBucketShuffleJoin(JoinType joinType) {
         return joinType == JoinType.RIGHT_ANTI_JOIN
                 || joinType == JoinType.RIGHT_OUTER_JOIN
@@ -207,6 +244,9 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
         DistributionSpec leftDistributionSpec = 
childrenProperties.get(0).getDistributionSpec();
         DistributionSpec rightDistributionSpec = 
childrenProperties.get(1).getDistributionSpec();
 
+        Plan leftChild = hashJoin.child(0);
+        Plan rightChild = hashJoin.child(1);
+
         // broadcast do not need regular
         if (rightDistributionSpec instanceof DistributionSpecReplicated) {
             return true;
@@ -238,6 +278,24 @@ public class ChildrenPropertiesRegulator extends PlanVisitor<Boolean, Void> {
                     ShuffleType.EXECUTION_BUCKETED, leftHashSpec, 
rightHashSpec,
                     (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
                     (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec()));
+        } else if (isBucketShuffleDownGrade(leftChild, rightHashSpec)) {
+            updatedForLeft = Optional.of(calAnotherSideRequired(
+                    ShuffleType.EXECUTION_BUCKETED, leftHashSpec, leftHashSpec,
+                    (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
+                    (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec()));
+            updatedForRight = Optional.of(calAnotherSideRequired(
+                    ShuffleType.EXECUTION_BUCKETED, leftHashSpec, 
rightHashSpec,
+                    (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec(),
+                    (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec()));
+        } else if (isBucketShuffleDownGrade(rightChild, leftHashSpec)) {
+            updatedForLeft = Optional.of(calAnotherSideRequired(
+                    ShuffleType.EXECUTION_BUCKETED, rightHashSpec, 
leftHashSpec,
+                    (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec(),
+                    (DistributionSpecHash) 
requiredProperties.get(0).getDistributionSpec()));
+            updatedForRight = Optional.of(calAnotherSideRequired(
+                    ShuffleType.EXECUTION_BUCKETED, rightHashSpec, 
rightHashSpec,
+                    (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec(),
+                    (DistributionSpecHash) 
requiredProperties.get(1).getDistributionSpec()));
         } else if ((leftHashSpec.getShuffleType() == ShuffleType.NATURAL
                 && rightHashSpec.getShuffleType() == ShuffleType.NATURAL)) {
             updatedForRight = Optional.of(calAnotherSideRequired(
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 4b4d108650b..0124efc4299 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -242,6 +242,8 @@ public class SessionVariable implements Serializable, Writable {
 
     public static final String ENABLE_AGG_STATE = "enable_agg_state";
 
+    public static final String ENABLE_BUCKET_SHUFFLE_DOWNGRADE = 
"enable_bucket_shuffle_downgrade";
+
     public static final String ENABLE_RPC_OPT_FOR_PIPELINE = 
"enable_rpc_opt_for_pipeline";
 
     public static final String ENABLE_SINGLE_DISTINCT_COLUMN_OPT = 
"enable_single_distinct_column_opt";
@@ -733,6 +735,9 @@ public class SessionVariable implements Serializable, Writable {
     @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_JOIN, varType = 
VariableAnnotation.EXPERIMENTAL_ONLINE)
     public boolean enableBucketShuffleJoin = true;
 
+    @VariableMgr.VarAttr(name = ENABLE_BUCKET_SHUFFLE_DOWNGRADE, needForward = 
true)
+    public boolean enableBucketShuffleDownGrade = false;
+
     @VariableMgr.VarAttr(name = PREFER_JOIN_METHOD)
     public String preferJoinMethod = "broadcast";
 
@@ -2096,6 +2101,10 @@ public class SessionVariable implements Serializable, Writable {
         return enableBucketShuffleJoin;
     }
 
+    public boolean isEnableBucketShuffleDownGrade() {
+        return enableBucketShuffleDownGrade;
+    }
+
     public boolean isEnableOdbcTransaction() {
         return enableOdbcTransaction;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to