This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 9e386c02ae7 [feat](Nereids) Reject Commutativity Swap for Nested Loop
Joins Affecting Parallelism (#34639) (#34996)
9e386c02ae7 is described below
commit 9e386c02ae726a7a806b954a3e3bd2f7e85b9dc2
Author: 谢健 <[email protected]>
AuthorDate: Tue May 21 10:17:19 2024 +0800
[feat](Nereids) Reject Commutativity Swap for Nested Loop Joins Affecting
Parallelism (#34639) (#34996)
Cherry-picked from master: #34639
This PR introduces a safeguard in the JoinCommute exploration rule: for a join
that would be executed as a nested loop join, a commutativity swap is rejected
when it would convert a parallelizable join into a non-parallelizable one.
For example, a LEFT OUTER nested loop join can be executed in parallel, but its
swapped form, RIGHT OUTER, cannot, so that swap is not explored. The check uses
a new static helper, NestedLoopJoinNode.canParallelize(JoinOperator), to assess
the join type before and after the swap. Rejecting such swaps preserves query
execution efficiency: these joins continue to be executed in parallel, fully
utilizing system resources and maintaining high operational throughput.
---
.../rules/exploration/join/JoinCommute.java | 8 ++++++++
.../apache/doris/planner/NestedLoopJoinNode.java | 6 +++++-
.../rules/exploration/join/JoinCommuteTest.java | 22 +++++++++++++++++++++-
3 files changed, 34 insertions(+), 2 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
index 8dd3e6abe71..e9656a1b62d 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/join/JoinCommute.java
@@ -25,8 +25,11 @@ import org.apache.doris.nereids.trees.expressions.Not;
import org.apache.doris.nereids.trees.expressions.Slot;
import
org.apache.doris.nereids.trees.expressions.functions.scalar.BitmapContains;
import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.util.JoinUtils;
+import org.apache.doris.planner.NestedLoopJoinNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.thrift.TRuntimeFilterType;
@@ -59,6 +62,11 @@ public class JoinCommute extends OneExplorationRuleFactory {
.whenNot(LogicalJoin::hasJoinHint)
.whenNot(join -> joinOrderMatchBitmapRuntimeFilterOrder(join))
.whenNot(LogicalJoin::isMarkJoin)
+ // For a nested loop join, if commutativity causes a join that
could originally be executed
+ // in parallel to become non-parallelizable, then we reject
this swap.
+ .whenNot(join -> JoinUtils.shouldNestedLoopJoin(join)
+ &&
NestedLoopJoinNode.canParallelize(JoinType.toJoinOperator(join.getJoinType()))
+ &&
!NestedLoopJoinNode.canParallelize(JoinType.toJoinOperator(join.getJoinType().swap())))
.then(join -> {
LogicalJoin<Plan, Plan> newJoin =
join.withTypeChildren(join.getJoinType().swap(),
join.right(), join.left());
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index 05eb34a7815..c1ad5933492 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -71,12 +71,16 @@ public class NestedLoopJoinNode extends JoinNodeBase {
tupleIds.addAll(inner.getOutputTupleIds());
}
- public boolean canParallelize() {
+ public static boolean canParallelize(JoinOperator joinOp) {
return joinOp == JoinOperator.CROSS_JOIN || joinOp ==
JoinOperator.INNER_JOIN
|| joinOp == JoinOperator.LEFT_OUTER_JOIN || joinOp ==
JoinOperator.LEFT_SEMI_JOIN
|| joinOp == JoinOperator.LEFT_ANTI_JOIN || joinOp ==
JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
}
+ public boolean canParallelize() {
+ return canParallelize(joinOp);
+ }
+
public void setJoinConjuncts(List<Expr> joinConjuncts) {
this.joinConjuncts = joinConjuncts;
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java
index 20323d108e6..022256225b6 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/exploration/join/JoinCommuteTest.java
@@ -18,7 +18,9 @@
package org.apache.doris.nereids.rules.exploration.join;
import org.apache.doris.common.Pair;
+import org.apache.doris.nereids.trees.expressions.GreaterThan;
import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.util.LogicalPlanBuilder;
@@ -27,11 +29,12 @@ import org.apache.doris.nereids.util.MemoTestUtils;
import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.nereids.util.PlanConstructor;
+import com.google.common.collect.ImmutableList;
import org.junit.jupiter.api.Test;
public class JoinCommuteTest implements MemoPatternMatchSupported {
@Test
- public void testInnerJoinCommute() {
+ void testInnerJoinCommute() {
LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0);
@@ -51,4 +54,21 @@ public class JoinCommuteTest implements
MemoPatternMatchSupported {
)
;
}
+
+ @Test
+ void testParallelJoinCommute() {
+ LogicalOlapScan scan1 = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
+ LogicalOlapScan scan2 = PlanConstructor.newLogicalOlapScan(1, "t2", 0);
+
+ LogicalJoin<?, ?> join = (LogicalJoin<?, ?>) new
LogicalPlanBuilder(scan1)
+ .join(scan2, JoinType.LEFT_OUTER_JOIN, Pair.of(0, 0))
+ .build();
+ join = join.withJoinConjuncts(
+ ImmutableList.of(),
+ ImmutableList.of(new GreaterThan(scan1.getOutput().get(0),
scan2.getOutput().get(0))));
+
+ PlanChecker.from(MemoTestUtils.createConnectContext(), join)
+ .applyExploration(JoinCommute.BUSHY.build())
+ .printlnTree();
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]