This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 8d182a1029a [feature](Nereids): Pushdown LimitDistinct Through Join (#25113) (#27288)
8d182a1029a is described below
commit 8d182a1029a03febd91e5a37cf3b90b340ba2d6c
Author: jakevin <[email protected]>
AuthorDate: Mon Nov 20 19:59:28 2023 +0800
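[feature](Nereids): Pushdown LimitDistinct Through Join (#25113) (#27288)
Push down limit-distinct through a left/right outer join or a cross join,
e.g. select t1.c1 from t1 left join t2 on t1.c1 = t2.c1 group by t1.c1 limit 1;
TopN is pushed through such joins the same way, e.g. select t1.c1 from t1 left join t2
on t1.c1 = t2.c1 order by t1.c1 limit 1; in both cases the pushed-down copy
uses limit + offset as its limit and an offset of 0.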
---
.../doris/nereids/jobs/executor/Rewriter.java | 2 +
.../org/apache/doris/nereids/rules/RuleType.java | 3 +
.../rewrite/PushdownLimitDistinctThroughJoin.java | 109 +++++++++++++++++++++
.../rules/rewrite/PushdownTopNThroughJoin.java | 34 ++++---
.../trees/plans/logical/LogicalAggregate.java | 5 +
.../nereids/trees/plans/logical/LogicalLimit.java | 6 ++
.../nereids/trees/plans/logical/LogicalTopN.java | 7 ++
.../data/nereids_p0/join/test_limit_join.out | 23 +++++
.../suites/nereids_p0/join/test_limit_join.groovy | 105 ++++++++++++++++++++
9 files changed, 279 insertions(+), 15 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index a3f800340b9..fcbbdee6ee6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -91,6 +91,7 @@ import org.apache.doris.nereids.rules.rewrite.PushProjectIntoOneRowRelation;
import org.apache.doris.nereids.rules.rewrite.PushProjectThroughUnion;
import org.apache.doris.nereids.rules.rewrite.PushdownFilterThroughProject;
import org.apache.doris.nereids.rules.rewrite.PushdownLimit;
+import org.apache.doris.nereids.rules.rewrite.PushdownLimitDistinctThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushdownTopNThroughJoin;
import org.apache.doris.nereids.rules.rewrite.PushdownTopNThroughWindow;
import org.apache.doris.nereids.rules.rewrite.ReorderJoin;
@@ -272,6 +273,7 @@ public class Rewriter extends AbstractBatchJobExecutor {
topDown(new SplitLimit()),
topDown(new PushdownLimit(),
new PushdownTopNThroughJoin(),
+ new PushdownLimitDistinctThroughJoin(),
new PushdownTopNThroughWindow(),
new CreatePartitionTopNFromWindow()
)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index dcf13637c96..84ce5506eb0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -250,6 +250,9 @@ public enum RuleType {
PUSH_TOP_N_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
PUSH_TOP_N_THROUGH_PROJECT_WINDOW(RuleTypeClass.REWRITE),
PUSH_TOP_N_THROUGH_WINDOW(RuleTypeClass.REWRITE),
+ // limit distinct push down
+ PUSH_LIMIT_DISTINCT_THROUGH_JOIN(RuleTypeClass.REWRITE),
+ PUSH_LIMIT_DISTINCT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
// adjust nullable
ADJUST_NULLABLE(RuleTypeClass.REWRITE),
ADJUST_CONJUNCTS_RETURN_TYPE(RuleTypeClass.REWRITE),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownLimitDistinctThroughJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownLimitDistinctThroughJoin.java
new file mode 100644
index 00000000000..24737b63625
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownLimitDistinctThroughJoin.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Push a LIMIT that sits on a DISTINCT aggregate down into the preserved side of an outer or cross join.
+ */
+public class PushdownLimitDistinctThroughJoin implements RewriteRuleFactory {
+
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                // limit -> distinct -> join  ==>  limit -> distinct -> join(limit -> distinct -> side, other side)
+                logicalLimit(logicalAggregate(logicalJoin())
+                        .when(LogicalAggregate::isDistinct))
+                        .then(limit -> {
+                            LogicalAggregate<LogicalJoin<Plan, Plan>> agg = limit.child();
+                            LogicalJoin<Plan, Plan> join = agg.child();
+
+                            Plan newJoin = pushLimitThroughJoin(limit, join);
+                            if (newJoin == null || join.children().equals(newJoin.children())) {
+                                return null;
+                            }
+                            return limit.withChildren(agg.withChildren(newJoin));
+                        })
+                        .toRule(RuleType.PUSH_LIMIT_DISTINCT_THROUGH_JOIN),
+
+                // limit -> distinct -> project(all slots) -> join: same rewrite, the project is kept as is
+                logicalLimit(logicalAggregate(logicalProject(logicalJoin()).when(LogicalProject::isAllSlots))
+                        .when(LogicalAggregate::isDistinct))
+                        .then(limit -> {
+                            LogicalAggregate<LogicalProject<LogicalJoin<Plan, Plan>>> agg = limit.child();
+                            LogicalProject<LogicalJoin<Plan, Plan>> project = agg.child();
+                            LogicalJoin<Plan, Plan> join = project.child();
+
+                            Plan newJoin = pushLimitThroughJoin(limit, join);
+                            if (newJoin == null || join.children().equals(newJoin.children())) {
+                                return null;
+                            }
+                            return limit.withChildren(agg.withChildren(project.withChildren(newJoin)));
+                        }).toRule(RuleType.PUSH_LIMIT_DISTINCT_THROUGH_PROJECT_JOIN)
+        );
+    }
+
+    private Plan pushLimitThroughJoin(LogicalLimit<?> limit, LogicalJoin<Plan, Plan> join) { // null if not pushable
+        LogicalAggregate<?> agg = (LogicalAggregate<?>) limit.child();
+        List<Slot> groupBySlots = agg.getGroupByExpressions().stream()
+                .flatMap(e -> e.getInputSlots().stream()).collect(Collectors.toList());
+        switch (join.getJoinType()) { // pushed copy: limit + offset rows, offset 0 (top limit still applies offset)
+            case LEFT_OUTER_JOIN: // left rows are all preserved; the distinct keys must all come from the left
+                if (join.left().getOutputSet().containsAll(groupBySlots)
+                        && join.left().getOutputSet().equals(agg.getOutputSet())) {
+                    return join.withChildren(limit.withLimitChild(limit.getLimit() + limit.getOffset(), 0,
+                            agg.withChildren(join.left())), join.right());
+                }
+                return null;
+            case RIGHT_OUTER_JOIN: // mirror image of LEFT_OUTER_JOIN
+                if (join.right().getOutputSet().containsAll(groupBySlots)
+                        && join.right().getOutputSet().equals(agg.getOutputSet())) {
+                    return join.withChildren(join.left(), limit.withLimitChild(limit.getLimit() + limit.getOffset(), 0,
+                            agg.withChildren(join.right())));
+                }
+                return null;
+            case CROSS_JOIN: // try the left side first, then the right side
+                if (join.left().getOutputSet().containsAll(groupBySlots)
+                        && join.left().getOutputSet().equals(agg.getOutputSet())) {
+                    return join.withChildren(limit.withLimitChild(limit.getLimit() + limit.getOffset(), 0,
+                            agg.withChildren(join.left())), join.right());
+                } else if (join.right().getOutputSet().containsAll(groupBySlots)
+                        && join.right().getOutputSet().equals(agg.getOutputSet())) {
+                    return join.withChildren(join.left(), limit.withLimitChild(limit.getLimit() + limit.getOffset(), 0,
+                            agg.withChildren(join.right())));
+                } else {
+                    return null;
+                }
+            default:
+                return null;
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownTopNThroughJoin.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownTopNThroughJoin.java
index ac179864393..b025d40b6d7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownTopNThroughJoin.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PushdownTopNThroughJoin.java
@@ -25,7 +25,6 @@ import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
-import org.apache.doris.nereids.util.Utils;
import com.google.common.collect.ImmutableList;
@@ -85,28 +84,33 @@ public class PushdownTopNThroughJoin implements RewriteRuleFactory {
}
private Plan pushLimitThroughJoin(LogicalTopN<? extends Plan> topN,
LogicalJoin<Plan, Plan> join) {
+ List<Slot> orderbySlots =
topN.getOrderKeys().stream().map(OrderKey::getExpr)
+ .flatMap(e ->
e.getInputSlots().stream()).collect(Collectors.toList());
switch (join.getJoinType()) {
case LEFT_OUTER_JOIN:
- Set<Slot> rightOutputSet = join.right().getOutputSet();
- if (topN.getOrderKeys().stream().map(OrderKey::getExpr)
- .anyMatch(e -> Utils.isIntersecting(rightOutputSet,
e.getInputSlots()))) {
- return null;
+ if (join.left().getOutputSet().containsAll(orderbySlots)) {
+ return join.withChildren(
+ topN.withLimitChild(topN.getLimit() +
topN.getOffset(), 0, join.left()),
+ join.right());
}
- return join.withChildren(topN.withChildren(join.left()),
join.right());
+ return null;
case RIGHT_OUTER_JOIN:
- Set<Slot> leftOutputSet = join.left().getOutputSet();
- if (topN.getOrderKeys().stream().map(OrderKey::getExpr)
- .anyMatch(e -> Utils.isIntersecting(leftOutputSet,
e.getInputSlots()))) {
- return null;
+ if (join.right().getOutputSet().containsAll(orderbySlots)) {
+ return join.withChildren(
+ join.left(),
+ topN.withLimitChild(topN.getLimit() +
topN.getOffset(), 0, join.right()));
}
- return join.withChildren(join.left(),
topN.withChildren(join.right()));
+ return null;
case CROSS_JOIN:
- List<Slot> orderbySlots =
topN.getOrderKeys().stream().map(OrderKey::getExpr)
- .flatMap(e ->
e.getInputSlots().stream()).collect(Collectors.toList());
+
if (join.left().getOutputSet().containsAll(orderbySlots)) {
- return join.withChildren(topN.withChildren(join.left()),
join.right());
+ return join.withChildren(
+ topN.withLimitChild(topN.getLimit() +
topN.getOffset(), 0, join.left()),
+ join.right());
} else if
(join.right().getOutputSet().containsAll(orderbySlots)) {
- return join.withChildren(join.left(),
topN.withChildren(join.right()));
+ return join.withChildren(
+ join.left(),
+ topN.withLimitChild(topN.getLimit() +
topN.getOffset(), 0, join.right()));
} else {
return null;
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
index 38f2b161900..ef278c61fbe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalAggregate.java
@@ -140,6 +140,11 @@ public class LogicalAggregate<CHILD_TYPE extends Plan>
return sourceRepeat;
}
+    public boolean isDistinct() { // true iff every group-by key and every output expression is a bare Slot
+        return groupByExpressions.stream().allMatch(Slot.class::isInstance)
+                && outputExpressions.stream().allMatch(Slot.class::isInstance);
+    }
+
public boolean hasRepeat() {
return sourceRepeat.isPresent();
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
index f03f335f018..e2bd6998a53 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
@@ -117,6 +117,12 @@ public class LogicalLimit<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TY
return ImmutableList.of();
}
+    public LogicalLimit<Plan> withLimitChild(long limit, long offset, Plan child) {
+        Preconditions.checkArgument(children.size() == 1,
+                "LogicalLimit should have 1 child, but input is %s", children.size());
+        return new LogicalLimit<>(limit, offset, phase, child); // keeps this limit's phase
+    }
+
public LogicalLimit<Plan> withLimitPhase(LimitPhase phase) {
return new LogicalLimit<>(limit, offset, phase, child());
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
index 80de9d6215a..02b239f1735 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
@@ -122,6 +122,13 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYP
Optional.empty(), Optional.of(getLogicalProperties()),
child());
}
+    public LogicalTopN<Plan> withLimitChild(long limit, long offset, Plan child) {
+        Preconditions.checkArgument(children.size() == 1,
+                "LogicalTopN should have 1 child, but input is %s", children.size());
+        // `child` may differ from child(); don't reuse logical properties computed for the old child
+        return new LogicalTopN<>(orderKeys, limit, offset, Optional.empty(), Optional.empty(), child);
+    }
+
@Override
public LogicalTopN<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1,
diff --git a/regression-test/data/nereids_p0/join/test_limit_join.out b/regression-test/data/nereids_p0/join/test_limit_join.out
new file mode 100644
index 00000000000..311d110c2bb
--- /dev/null
+++ b/regression-test/data/nereids_p0/join/test_limit_join.out
@@ -0,0 +1,23 @@
+-- This file is automatically generated. You should know what you did if you want to edit this
+-- !join1 --
+1
+2
+
+-- !join3 --
+0
+1
+
+-- !join5 --
+1
+1
+
+-- !join6 --
+1
+
+-- !join7 --
+0
+0
+
+-- !join8 --
+0
+
diff --git a/regression-test/suites/nereids_p0/join/test_limit_join.groovy b/regression-test/suites/nereids_p0/join/test_limit_join.groovy
new file mode 100644
index 00000000000..8f4cbf88f34
--- /dev/null
+++ b/regression-test/suites/nereids_p0/join/test_limit_join.groovy
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_limit_join", "nereids_p0") {
+    def DBname = "nereids_regression_test_limit_join"
+    sql "DROP DATABASE IF EXISTS ${DBname}"
+    sql "CREATE DATABASE IF NOT EXISTS ${DBname}"
+    sql "use ${DBname}"
+    sql "SET enable_nereids_planner=true"
+    sql "SET enable_fallback_to_original_planner=false"
+
+    def tbName1 = "t1"
+    def tbName2 = "t2"
+    sql "DROP TABLE IF EXISTS ${tbName1};"
+    sql "DROP TABLE IF EXISTS ${tbName2};"
+
+    sql """create table if not exists ${tbName1} (c1 int, c2 int) DISTRIBUTED BY HASH(c1) properties("replication_num" = "1");"""
+    sql """create table if not exists ${tbName2} (c1 int, c2 int, c3 int) DISTRIBUTED BY HASH(c1) properties("replication_num" = "1");"""
+
+    sql "insert into ${tbName1} values (1,1);"
+    sql "insert into ${tbName1} values (2,2);"
+    sql "insert into ${tbName1} values (1,null);"
+    sql "insert into ${tbName1} values (2,null);"
+    sql "insert into ${tbName2} values (0,1,9999);"
+    sql "insert into ${tbName2} values (1,1,9999);"
+    sql "insert into ${tbName2} values (0,null,9999);"
+    sql "insert into ${tbName2} values (1,null,9999);"
+
+    /* test push limit-distinct through join */
+    order_qt_join1 """
+        SELECT t1.c1
+        FROM ${tbName1} t1 left join ${tbName2} t2 on t1.c1 = t2.c1
+        GROUP BY t1.c1
+        limit 2;
+    """
+
+    def join2 = sql """
+        SELECT t1.c1
+        FROM ${tbName1} t1 left join ${tbName2} t2 on t1.c1 = t2.c1
+        GROUP BY t1.c1
+        LIMIT 1 OFFSET 1;
+    """
+    assertEquals(1, join2.size()) // no ORDER BY: the skipped row is unspecified, only the count is stable
+
+    order_qt_join3 """
+        SELECT t2.c1
+        FROM ${tbName1} t1 right join ${tbName2} t2 on t1.c1 = t2.c1
+        GROUP BY t2.c1
+        limit 2;
+    """
+
+    def join4 = sql """
+        SELECT t2.c1
+        FROM ${tbName1} t1 right join ${tbName2} t2 on t1.c1 = t2.c1
+        GROUP BY t2.c1
+        LIMIT 1 OFFSET 1;
+    """
+    assertEquals(1, join4.size()) // no ORDER BY: the skipped row is unspecified, only the count is stable
+
+    /* test push topN through join */
+    qt_join5 """
+        SELECT t1.c1
+        FROM ${tbName1} t1 left join ${tbName2} t2 on t1.c1 = t2.c1
+        ORDER BY t1.c1
+        limit 2;
+    """
+
+    qt_join6 """
+        SELECT t1.c1
+        FROM ${tbName1} t1 left join ${tbName2} t2 on t1.c1 = t2.c1
+        ORDER BY t1.c1
+        LIMIT 1 OFFSET 1;
+    """
+
+    qt_join7 """
+        SELECT t2.c1
+        FROM ${tbName1} t1 right join ${tbName2} t2 on t1.c1 = t2.c1
+        ORDER BY t2.c1
+        limit 2;
+    """
+
+    qt_join8 """
+        SELECT t2.c1
+        FROM ${tbName1} t1 right join ${tbName2} t2 on t1.c1 = t2.c1
+        ORDER BY t2.c1
+        LIMIT 1 OFFSET 1;
+    """
+
+    sql "DROP DATABASE IF EXISTS ${DBname};"
+}
+
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]