This is an automated email from the ASF dual-hosted git repository.

englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 1faf5734bc3 [feat](nereids)using salt-join automatically when 
encountering skew join (#54207)
1faf5734bc3 is described below

commit 1faf5734bc3b92ee44282d3213fad7f5cc834933
Author: minghong <[email protected]>
AuthorDate: Fri Aug 29 15:21:50 2025 +0800

    [feat](nereids)using salt-join automatically when encountering skew join 
(#54207)
    
    ### What problem does this PR solve?
    When encountering a data-skewed join, there are currently two
    optimization methods: using salt-join or using broadcast join.
    We detect data skew during the RBO phase. If the right table is
    relatively large, we automatically add salt; otherwise, we add a
    broadcast hint.
---
 .../doris/nereids/jobs/executor/Rewriter.java      |   6 +-
 .../doris/nereids/rules/rewrite/SaltJoin.java      |  33 ++++--
 .../doris/nereids/rules/rewrite/SkewJoin.java      | 117 +++++++++++++++++++++
 .../doris/nereids/rules/rewrite/StatsDerive.java   |   1 +
 .../java/org/apache/doris/qe/SessionVariable.java  |   2 +-
 .../data/nereids_rules_p0/skew_join/skew_join.out  | Bin 0 -> 761 bytes
 .../nereids_rules_p0/skew_join/skew_join.groovy    |  53 ++++++++++
 7 files changed, 199 insertions(+), 13 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index 7f63e8d73f8..447c58311a9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -151,6 +151,7 @@ import org.apache.doris.nereids.rules.rewrite.SaltJoin;
 import org.apache.doris.nereids.rules.rewrite.SetPreAggStatus;
 import org.apache.doris.nereids.rules.rewrite.SimplifyEncodeDecode;
 import org.apache.doris.nereids.rules.rewrite.SimplifyWindowExpression;
+import org.apache.doris.nereids.rules.rewrite.SkewJoin;
 import org.apache.doris.nereids.rules.rewrite.SplitLimit;
 import org.apache.doris.nereids.rules.rewrite.SplitMultiDistinct;
 import org.apache.doris.nereids.rules.rewrite.SumLiteralRewrite;
@@ -564,8 +565,6 @@ public class Rewriter extends AbstractBatchJobExecutor {
                         bottomUp(new EliminateNotNull()),
                         topDown(new ConvertInnerOrCrossJoin())
                 ),
-                topic("set initial join order",
-                        bottomUp(ImmutableList.of(new InitJoinOrder()))),
                 topic("Set operation optimization",
                         topic("",
                                 cascadesContext -> 
cascadesContext.rewritePlanContainsTypes(SetOperation.class),
@@ -731,6 +730,9 @@ public class Rewriter extends AbstractBatchJobExecutor {
                                 new PushDownProjectThroughLimit(),
                                 new MergeProjectable())
                 ),
+                topic("set initial join order",
+                        bottomUp(ImmutableList.of(new InitJoinOrder())),
+                        topDown(new SkewJoin())), // SkewJoin relies on InitJoinOrder running first
                 topic("agg rewrite",
                     // these rules should be put after mv optimization to 
avoid mv matching fail
                     topDown(new SumLiteralRewrite(),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
index 00421ac0a6c..1227d63ebc8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
@@ -60,6 +60,7 @@ import org.apache.doris.nereids.types.IntegerType;
 import org.apache.doris.nereids.types.TinyIntType;
 import org.apache.doris.nereids.util.TypeCoercionUtils;
 import org.apache.doris.nereids.util.Utils;
+import org.apache.doris.qe.ConnectContext;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
@@ -138,7 +139,17 @@ public class SaltJoin extends OneRewriteRuleFactory {
     }
 
     private static Plan transform(MatchingContext<LogicalJoin<Plan, Plan>> 
ctx) {
-        LogicalJoin<Plan, Plan> join = ctx.root;
+        return transform(ctx.root);
+    }
+
+    /**
+     * Salts a join whose distribute hint is SHUFFLE_RIGHT; returns null when the join is not rewritten.
+     */
+    public static Plan transform(LogicalJoin<Plan, Plan> join) {
+        if (ConnectContext.get() == null) {
+            return null;
+        }
+
         DistributeHint hint = join.getDistributeHint();
         if (hint.distributeType != DistributeType.SHUFFLE_RIGHT) {
             return null;
@@ -152,7 +163,7 @@ public class SaltJoin extends OneRewriteRuleFactory {
                 || join.getJoinType().isRightOuterJoin() && 
!join.right().getOutput().contains((Slot) skewExpr)) {
             return null;
         }
-        int factor = getSaltFactor(ctx);
+        int factor = getSaltFactor();
         Optional<Expression> literalType = 
TypeCoercionUtils.characterLiteralTypeCoercion(String.valueOf(factor),
                 TinyIntType.INSTANCE);
         if (!literalType.isPresent()) {
@@ -190,19 +201,20 @@ public class SaltJoin extends OneRewriteRuleFactory {
         DataType type = literalType.get().getDataType();
         LogicalProject<Plan> rightProject;
         LogicalProject<Plan> leftProject;
+        StatementContext statementContext = 
ConnectContext.get().getStatementContext();
         switch (join.getJoinType()) {
             case INNER_JOIN:
             case LEFT_OUTER_JOIN:
                 leftProject = addRandomSlot(leftSkewExpr, skewSideValues, 
join.left(), factor, type,
-                        ctx.statementContext);
+                        statementContext);
                 rightProject = expandSkewValueRows(rightSkewExpr, 
expandSideValues, join.right(), factor, type,
-                        ctx.statementContext);
+                        statementContext);
                 break;
             case RIGHT_OUTER_JOIN:
                 leftProject = expandSkewValueRows(leftSkewExpr, 
expandSideValues, join.left(), factor, type,
-                        ctx.statementContext);
+                        statementContext);
                 rightProject = addRandomSlot(rightSkewExpr, skewSideValues, 
join.right(), factor, type,
-                        ctx.statementContext);
+                        statementContext);
                 break;
             default:
                 return null;
@@ -322,12 +334,13 @@ public class SaltJoin extends OneRewriteRuleFactory {
         return new LogicalProject<>(namedExpressionsBuilder.build(), 
rightJoin);
     }
 
-    private static int getSaltFactor(MatchingContext<LogicalJoin<Plan, Plan>> 
ctx) {
-        int factor = 
ctx.connectContext.getStatementContext().getConnectContext()
+    private static int getSaltFactor() {
+        ConnectContext connectContext = ConnectContext.get();
+        int factor = connectContext.getStatementContext().getConnectContext()
                 .getSessionVariable().skewRewriteJoinSaltExplodeFactor;
         if (factor == 0) {
-            int beNumber = Math.max(1, 
ctx.connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
-            int parallelInstance = Math.max(1, 
ctx.connectContext.getSessionVariable().getParallelExecInstanceNum());
+            int beNumber = Math.max(1, 
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
+            int parallelInstance = Math.max(1, 
connectContext.getSessionVariable().getParallelExecInstanceNum());
             factor = (int) Math.min((long) beNumber * parallelInstance * 
SALT_FACTOR, Integer.MAX_VALUE);
         }
         return Math.max(factor, 1);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SkewJoin.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SkewJoin.java
new file mode 100644
index 00000000000..dc75dd9f362
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SkewJoin.java
@@ -0,0 +1,155 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.hint.DistributeHint;
+import org.apache.doris.nereids.hint.JoinSkewInfo;
+import org.apache.doris.nereids.pattern.MatchingContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.StatsDerive.DeriveContext;
+import org.apache.doris.nereids.trees.expressions.EqualPredicate;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.AbstractPlan;
+import org.apache.doris.nereids.trees.plans.DistributeType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+import org.apache.doris.statistics.ColumnStatistic;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Handles a data-skewed join automatically during the RBO phase.
+ *
+ * <p>When the skewed side of a join with a single hash condition has hot values in its column statistics,
+ * there are two possible remedies: a broadcast join or a salt join.
+ * For inner and left outer joins (skew checked on the left side), a relatively small right table gets a
+ * BROADCAST_RIGHT hint and a larger one is salted via {@link SaltJoin#transform(LogicalJoin)}.
+ * Right outer joins (skew checked on the right side) are always salted, because the right side of a
+ * right outer join cannot be broadcast.
+ *
+ * <p>Depends on InitJoinOrder rule.
+ */
+public class SkewJoin extends OneRewriteRuleFactory {
+
+    // The right side is broadcast only when its row count is below
+    // broadcast_row_count_limit / BROADCAST_RIGHT_ROW_COUNT_DIVISOR.
+    private static final double BROADCAST_RIGHT_ROW_COUNT_DIVISOR = 100;
+
+    @Override
+    public Rule build() {
+        // NOTE(review): reuses RuleType.SALT_JOIN rather than a dedicated rule type; confirm this is intended.
+        return logicalJoin()
+                .when(join -> join.getJoinType().isOneSideOuterJoin()
+                        || join.getJoinType().isInnerJoin())
+                .when(join -> join.getDistributeHint().distributeType == DistributeType.NONE)
+                .whenNot(LogicalJoin::isMarkJoin)
+                .thenApply(SkewJoin::transform).toRule(RuleType.SALT_JOIN);
+    }
+
+    /**
+     * Broadcasts or salts the matched skewed join; returns null when no rewrite applies.
+     */
+    private static Plan transform(MatchingContext<LogicalJoin<Plan, Plan>> ctx) {
+        ConnectContext connectContext = ConnectContext.get();
+        if (connectContext == null) {
+            return null;
+        }
+        LogicalJoin<Plan, Plan> join = ctx.root;
+        if (join.getHashJoinConjuncts().size() != 1) {
+            return null;
+        }
+
+        StatsDerive derive = new StatsDerive(false);
+        AbstractPlan left = (AbstractPlan) join.left();
+        deriveStatsIfAbsent(left, derive);
+        AbstractPlan right = (AbstractPlan) join.right();
+        deriveStatsIfAbsent(right, derive);
+        if (left.getStats() == null || right.getStats() == null) {
+            return null;
+        }
+
+        EqualPredicate equal = (EqualPredicate) join.getHashJoinConjuncts().get(0);
+        // make sure child(0) is the operand produced by the left side
+        if (join.left().getOutputSet().contains(equal.right())) {
+            equal = equal.commute();
+        }
+
+        boolean rightOuterJoin = join.getJoinType().isRightOuterJoin();
+        Expression skewExpr;
+        AbstractPlan skewSide;
+        if (join.getJoinType().isInnerJoin() || join.getJoinType().isLeftOuterJoin()) {
+            skewExpr = equal.child(0);
+            skewSide = left;
+        } else if (rightOuterJoin) {
+            skewExpr = equal.child(1);
+            skewSide = right;
+        } else {
+            return null;
+        }
+        List<Expression> hotValues = findHotValues(skewSide, skewExpr);
+        if (hotValues.isEmpty()) {
+            return null;
+        }
+
+        SessionVariable sessionVariable = connectContext.getSessionVariable();
+        double broadcastRowCountThreshold =
+                sessionVariable.getBroadcastRowCountLimit() / BROADCAST_RIGHT_ROW_COUNT_DIVISOR;
+        // broadcast join for a small right table, but never for a right outer join:
+        // the right side of a right outer join cannot be broadcast
+        if (!rightOuterJoin && right.getStats().getRowCount() < broadcastRowCountThreshold) {
+            join.setHint(new DistributeHint(DistributeType.BROADCAST_RIGHT));
+            return join;
+        }
+        // salt join for a large right table and for every skewed right outer join
+        return salt(join, skewExpr, hotValues);
+    }
+
+    // derives statistics for plan only when it has none yet
+    private static void deriveStatsIfAbsent(AbstractPlan plan, StatsDerive derive) {
+        if (plan.getStats() == null) {
+            plan.accept(derive, new DeriveContext());
+        }
+    }
+
+    // returns the hot values recorded for expr in plan's column statistics; empty if there are none
+    private static List<Expression> findHotValues(AbstractPlan plan, Expression expr) {
+        List<Expression> hotValues = new ArrayList<>();
+        ColumnStatistic columnStatistic = plan.getStats().findColumnStatistics(expr);
+        if (columnStatistic != null && columnStatistic.getHotValues() != null) {
+            hotValues.addAll(columnStatistic.getHotValues().keySet());
+        }
+        return hotValues;
+    }
+
+    // sets a SHUFFLE_RIGHT skew hint on join and salts it; if SaltJoin does not rewrite the join,
+    // its original hint is restored so the unchanged join does not keep the SHUFFLE_RIGHT hint
+    private static Plan salt(LogicalJoin<Plan, Plan> join, Expression skewExpr, List<Expression> hotValues) {
+        DistributeHint originalHint = join.getDistributeHint();
+        join.setHint(new DistributeHint(DistributeType.SHUFFLE_RIGHT,
+                new JoinSkewInfo(skewExpr, hotValues, false)));
+        Plan salted = SaltJoin.transform(join);
+        if (salted == null) {
+            join.setHint(originalHint);
+        }
+        return salted;
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/StatsDerive.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/StatsDerive.java
index 362dcc8f8a5..e3e06c23e6b 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/StatsDerive.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/StatsDerive.java
@@ -68,6 +68,7 @@ import java.util.List;
   */
  public class StatsDerive extends PlanVisitor<Statistics, 
StatsDerive.DeriveContext> implements CustomRewriter {
 
+    // when deepDerive is true, stats are re-derived even for nodes that already have them
     private final boolean deepDerive;
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 55d130e945e..ea03ea64b9c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -1707,7 +1707,7 @@ public class SessionVariable implements Serializable, 
Writable {
     private double leftSemiOrAntiProbeFactor = 0.05;
 
     @VariableMgr.VarAttr(name = BROADCAST_ROW_COUNT_LIMIT, needForward = true)
-    private double broadcastRowCountLimit = 30000000;
+    private double broadcastRowCountLimit = 30_000_000; // SkewJoin broadcasts the right side only below limit / 100
 
     @VariableMgr.VarAttr(name = BROADCAST_HASHTABLE_MEM_LIMIT_PERCENTAGE, 
needForward = true)
     private double broadcastHashtableMemLimitPercentage = 0.2;
diff --git a/regression-test/data/nereids_rules_p0/skew_join/skew_join.out 
b/regression-test/data/nereids_rules_p0/skew_join/skew_join.out
new file mode 100644
index 00000000000..27251a5d860
Binary files /dev/null and 
b/regression-test/data/nereids_rules_p0/skew_join/skew_join.out differ
diff --git a/regression-test/suites/nereids_rules_p0/skew_join/skew_join.groovy 
b/regression-test/suites/nereids_rules_p0/skew_join/skew_join.groovy
new file mode 100644
index 00000000000..a395c3e546f
--- /dev/null
+++ b/regression-test/suites/nereids_rules_p0/skew_join/skew_join.groovy
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("skew_join") {
+    sql """
+        drop table if exists t1;
+        create table t1 (
+            id int,
+            value int
+        )
+        properties("replication_num"="1");
+
+        insert into t1 values (1, 1);
+
+        drop table if exists t2;
+        create table t2 (
+            id int,
+            value int
+        )
+        properties("replication_num"="1");
+
+        insert into t2 values (1, 2);
+
+        alter table t1 modify column id set stats ('row_count' = '1000000000', 
'hot_values'='1 :80');
+
+        set runtime_filter_mode='OFF';
+    """
+
+
+    // big right side (t2 row_count = 1000000): expect salt join
+    sql "alter table t2 modify column id set stats ('row_count' = '1000000');"
+    qt_salt_shape "explain shape plan select * from t2 join t1 on t1.id=t2.id;"
+    qt_salt_exe "select * from t2 join t1 on t1.id=t2.id;"
+
+    // small right side (t2 row_count = 10000): expect broadcast join
+    sql "alter table t2 modify column id set stats ('row_count' = '10000');"
+    qt_bc_shape "explain shape plan select * from t2 join t1 on t1.id=t2.id;"
+    qt_bc_exe "select * from t2 join t1 on t1.id=t2.id;"
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to