This is an automated email from the ASF dual-hosted git repository.

huajianlan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b11791b9a8 [Feature](Nereids) Limit pushdown. (#12518)
b11791b9a8 is described below

commit b11791b9a87241586701c483462678d49ce81367
Author: Shuo Wang <[email protected]>
AuthorDate: Thu Sep 15 12:12:10 2022 +0800

    [Feature](Nereids) Limit pushdown. (#12518)
    
    This PR adds rewrite rules to push the limit down through joins. The
    following two plan shapes are handled:
    ```
    limit -> join
    limit -> project -> join
    ```
---
 .../doris/nereids/jobs/batch/RewriteJob.java       |   4 +-
 .../org/apache/doris/nereids/rules/RuleType.java   |   4 +
 .../rules/rewrite/logical/LimitPushDown.java       | 116 +++++++++++
 .../doris/nereids/trees/plans/algebra/Join.java    |   7 +
 .../doris/nereids/trees/plans/algebra/Limit.java   |   8 +
 .../rules/rewrite/logical/LimitPushDownTest.java   | 230 +++++++++++++++++++++
 6 files changed, 368 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
index 2aea7994fa..242687ba6d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/RewriteJob.java
@@ -26,6 +26,7 @@ import 
org.apache.doris.nereids.rules.rewrite.logical.ColumnPruning;
 import org.apache.doris.nereids.rules.rewrite.logical.EliminateFilter;
 import org.apache.doris.nereids.rules.rewrite.logical.EliminateLimit;
 import org.apache.doris.nereids.rules.rewrite.logical.FindHashConditionForJoin;
+import org.apache.doris.nereids.rules.rewrite.logical.LimitPushDown;
 import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveFilters;
 import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveLimits;
 import org.apache.doris.nereids.rules.rewrite.logical.MergeConsecutiveProjects;
@@ -62,7 +63,7 @@ public class RewriteJob extends BatchRulesJob {
                 .addAll(new ConvertApplyToJoinJob(cascadesContext).rulesJob)
                 .add(topDownBatch(ImmutableList.of(new 
ExpressionNormalization())))
                 .add(topDownBatch(ImmutableList.of(new NormalizeAggregate())))
-                 .add(topDownBatch(ImmutableList.of(new ReorderJoin())))
+                .add(topDownBatch(ImmutableList.of(new ReorderJoin())))
                 .add(topDownBatch(ImmutableList.of(new 
FindHashConditionForJoin())))
                 .add(topDownBatch(ImmutableList.of(new NormalizeAggregate())))
                 .add(topDownBatch(ImmutableList.of(new ColumnPruning())))
@@ -73,6 +74,7 @@ public class RewriteJob extends BatchRulesJob {
                         new MergeConsecutiveFilters(),
                         new MergeConsecutiveLimits())))
                 .add(topDownBatch(ImmutableList.of(new 
AggregateDisassemble())))
+                .add(topDownBatch(ImmutableList.of(new LimitPushDown())))
                 .add(topDownBatch(ImmutableList.of(new EliminateLimit())))
                 .add(topDownBatch(ImmutableList.of(new EliminateFilter())))
                 .add(topDownBatch(ImmutableList.of(new 
PruneOlapScanPartition())))
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index d90e0c089d..826acf522f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -112,6 +112,10 @@ public enum RuleType {
     SWAP_LIMIT_PROJECT(RuleTypeClass.REWRITE),
     REWRITE_SENTINEL(RuleTypeClass.REWRITE),
 
+    // limit push down
+    PUSH_LIMIT_THROUGH_JOIN(RuleTypeClass.REWRITE),
+    PUSH_LIMIT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
+
     // exploration rules
     TEST_EXPLORATION(RuleTypeClass.EXPLORATION),
     LOGICAL_JOIN_COMMUTATE(RuleTypeClass.EXPLORATION),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDown.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDown.java
new file mode 100644
index 0000000000..33bda76078
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDown.java
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.algebra.EmptyRelation;
+import org.apache.doris.nereids.trees.plans.algebra.Limit;
+import org.apache.doris.nereids.trees.plans.algebra.OneRowRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.List;
+
+/**
+ * Rules to push {@link org.apache.doris.nereids.trees.plans.logical.LogicalLimit} down through a join,
+ * matching the plan shapes {@code limit -> join} and {@code limit -> project -> join}.
+ * <p>
+ * The original limit stays on top of the plan; the limit is additionally placed on join children
+ * depending on the join type:
+ * <ul>
+ *   <li>left outer join: on the left child;</li>
+ *   <li>right outer join: on the right child;</li>
+ *   <li>cross join, or inner join without any join condition: on both children;</li>
+ *   <li>any other join: the join is left unchanged.</li>
+ * </ul>
+ * A child that is already a limit is only replaced when it has no valid offset and a larger limit value;
+ * a one-row relation is kept (or turned into an empty relation when the limit is not positive);
+ * an empty relation is kept as is.
+ * <p>
+ * A limit can't be pushed down if it has a valid offset.
+ */
+public class LimitPushDown implements RewriteRuleFactory {
+
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                // limit -> join (only when the limit has no valid offset)
+                logicalLimit(logicalJoin(any(), 
any())).whenNot(Limit::hasValidOffset)
+                        .then(limit -> 
limit.withChildren(pushLimitThroughJoin(limit, limit.child())))
+                        .toRule(RuleType.PUSH_LIMIT_THROUGH_JOIN),
+
+                // limit -> project -> join (only when the limit has no valid offset)
+                logicalLimit(logicalProject(logicalJoin(any(), 
any()))).whenNot(Limit::hasValidOffset)
+                        .then(limit -> {
+                            LogicalProject<LogicalJoin<Plan, Plan>> project = 
limit.child();
+                            LogicalJoin<Plan, Plan> join = project.child();
+                            return limit.withChildren(
+                                    project.withChildren(
+                                            pushLimitThroughJoin(limit, 
join)));
+                        }).toRule(RuleType.PUSH_LIMIT_THROUGH_PROJECT_JOIN)
+        );
+    }
+
+    private Plan pushLimitThroughJoin(LogicalLimit<? extends Plan> limit, 
LogicalJoin<Plan, Plan> join) {
+        switch (join.getJoinType()) {
+            case LEFT_OUTER_JOIN:
+                return join.withChildren(
+                        addLimit(limit, join.left()),
+                        join.right()
+                );
+            case RIGHT_OUTER_JOIN:
+                return join.withChildren(
+                        join.left(),
+                        addLimit(limit, join.right())
+                );
+            case CROSS_JOIN:
+                return join.withChildren(
+                        addLimit(limit, join.left()),
+                        addLimit(limit, join.right())
+                );
+            case INNER_JOIN:
+                if (join.hasJoinCondition()) {
+                    return join;
+                } else {
+                    return join.withChildren(
+                            addLimit(limit, join.left()),
+                            addLimit(limit, join.right())
+                    );
+                }
+            default:
+                // any other join type: don't push the limit, return the join unchanged.
+                return join;
+        }
+    }
+
+    private Plan addLimit(LogicalLimit<? extends Plan> pushdownLimit, Plan 
plan) {
+        if (plan instanceof LogicalLimit) {
+            // Avoid adding duplicate limits on top of the plan, otherwise 
would result in dead loop
+            // when applying the rule multiple times.
+            LogicalLimit<? extends Plan> limit = (LogicalLimit<? extends 
Plan>) plan;
+            // the existing limit has no valid offset and its limit value is greater than the pushed-down one
+            if (!limit.hasValidOffset() && limit.getLimit() > 
pushdownLimit.getLimit()) {
+                // replace the existing limit with the pushed-down limit over the same child.
+                return pushdownLimit.withChildren(limit.child());
+            } else {
+                // keep the input plan unchanged.
+                return plan;
+            }
+        } else if (plan instanceof OneRowRelation) {
+            return pushdownLimit.getLimit() > 0 ? plan : new 
LogicalEmptyRelation((List) plan.getOutput());
+        } else if (plan instanceof EmptyRelation) {
+            return plan;
+        } else {
+            return pushdownLimit.withChildren(plan);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Join.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Join.java
index 43765a6649..48fb08250c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Join.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Join.java
@@ -34,4 +34,11 @@ public interface Join {
     Optional<Expression> getOtherJoinCondition();
 
     Optional<Expression> getOnClauseCondition();
+
+    /**
+     * Whether this join has any join condition.
+     *
+     * @return true if the hash join conjuncts are non-empty or the other join condition is present
+     */
+    default boolean hasJoinCondition() {
+        return !getHashJoinConjuncts().isEmpty() || 
getOtherJoinCondition().isPresent();
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Limit.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Limit.java
index a198d1253d..dc5ca26936 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Limit.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/Limit.java
@@ -24,4 +24,12 @@ public interface Limit {
     long getLimit();
 
     long getOffset();
+
+    /**
+     * Whether this limit node has a valid offset.
+     * The offset is treated as valid only when {@link #getOffset()} returns a positive value.
+     *
+     * @return true if {@code getOffset() > 0}
+     */
+    default boolean hasValidOffset() {
+        return getOffset() > 0;
+    }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDownTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDownTest.java
new file mode 100644
index 0000000000..714b5511a6
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/LimitPushDownTest.java
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite.logical;
+
+import org.apache.doris.nereids.analyzer.UnboundSlot;
+import org.apache.doris.nereids.pattern.PatternDescriptor;
+import org.apache.doris.nereids.trees.expressions.EqualTo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.util.MemoTestUtils;
+import org.apache.doris.nereids.util.PatternMatchSupported;
+import org.apache.doris.nereids.util.PlanChecker;
+import org.apache.doris.nereids.util.PlanConstructor;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.utframe.TestWithFeService;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Sets;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+class LimitPushDownTest extends TestWithFeService implements 
PatternMatchSupported {
+    private Plan scanScore = new LogicalOlapScan(PlanConstructor.score);
+    private Plan scanStudent = new LogicalOlapScan(PlanConstructor.student);
+
+    @Override
+    protected void runBeforeAll() throws Exception {
+        createDatabase("test");
+
+        connectContext.setDatabase("default_cluster:test");
+
+        createTable("CREATE TABLE `t1` (\n"
+                + "  `k1` int(11) NULL\n"
+                + ") ENGINE=OLAP\n"
+                + "COMMENT 'OLAP'\n"
+                + "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n"
+                + "PROPERTIES (\n"
+                + "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+                + "\"in_memory\" = \"false\",\n"
+                + "\"storage_format\" = \"V2\",\n"
+                + "\"disable_auto_compaction\" = \"false\"\n"
+                + ");");
+
+        createTable("CREATE TABLE `t2` (\n"
+                + "  `k1` int(11) NULL\n"
+                + ") ENGINE=OLAP\n"
+                + "COMMENT 'OLAP'\n"
+                + "DISTRIBUTED BY HASH(`k1`) BUCKETS 3\n"
+                + "PROPERTIES (\n"
+                + "\"replication_allocation\" = \"tag.location.default: 1\",\n"
+                + "\"in_memory\" = \"false\",\n"
+                + "\"storage_format\" = \"V2\",\n"
+                + "\"disable_auto_compaction\" = \"false\"\n"
+                + ");");
+    }
+
+    @Test
+    public void testPushLimitThroughLeftJoin() {
+        test(JoinType.LEFT_OUTER_JOIN, true,
+                logicalLimit(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalLimit(logicalOlapScan().when(s 
-> s.equals(scanScore))),
+                                        logicalOlapScan().when(s -> 
s.equals(scanStudent))
+                                ).when(j -> j.getJoinType() == 
JoinType.LEFT_OUTER_JOIN)
+                        )
+                )
+        );
+        test(JoinType.LEFT_OUTER_JOIN, false,
+                logicalLimit(
+                        logicalJoin(
+                                logicalLimit(logicalOlapScan().when(s -> 
s.equals(scanScore))),
+                                logicalOlapScan().when(s -> 
s.equals(scanStudent))
+                        ).when(j -> j.getJoinType() == 
JoinType.LEFT_OUTER_JOIN)
+                )
+        );
+    }
+
+    @Test
+    public void testPushLimitThroughRightJoin() {
+        test(JoinType.RIGHT_OUTER_JOIN, true,
+                logicalLimit(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalOlapScan().when(s -> 
s.equals(scanScore)),
+                                        logicalLimit(logicalOlapScan().when(s 
-> s.equals(scanStudent)))
+                                ).when(j -> j.getJoinType() == 
JoinType.RIGHT_OUTER_JOIN)
+                        )
+                )
+        );
+        test(JoinType.RIGHT_OUTER_JOIN, false,
+                logicalLimit(
+                        logicalJoin(
+                                logicalOlapScan().when(s -> 
s.equals(scanScore)),
+                                logicalLimit(logicalOlapScan().when(s -> 
s.equals(scanStudent)))
+                        ).when(j -> j.getJoinType() == 
JoinType.RIGHT_OUTER_JOIN)
+                )
+        );
+    }
+
+    @Test
+    public void testPushLimitThroughCrossJoin() {
+        test(JoinType.CROSS_JOIN, true,
+                logicalLimit(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalLimit(logicalOlapScan().when(s 
-> s.equals(scanScore))),
+                                        logicalLimit(logicalOlapScan().when(s 
-> s.equals(scanStudent)))
+                                ).when(j -> j.getJoinType() == 
JoinType.CROSS_JOIN)
+                        )
+                )
+        );
+        test(JoinType.CROSS_JOIN, false,
+                logicalLimit(
+                        logicalJoin(
+                                logicalLimit(logicalOlapScan().when(s -> 
s.equals(scanScore))),
+                                logicalLimit(logicalOlapScan().when(s -> 
s.equals(scanStudent)))
+                        ).when(j -> j.getJoinType() == JoinType.CROSS_JOIN)
+                )
+        );
+    }
+
+    @Test
+    public void testPushLimitThroughInnerJoin() {
+        test(JoinType.INNER_JOIN, true,
+                logicalLimit(
+                        logicalProject(
+                                logicalJoin(
+                                        logicalLimit(logicalOlapScan().when(s 
-> s.equals(scanScore))),
+                                        logicalLimit(logicalOlapScan().when(s 
-> s.equals(scanStudent)))
+                                ).when(j -> j.getJoinType() == 
JoinType.INNER_JOIN)
+                        )
+                )
+        );
+        test(JoinType.INNER_JOIN, false,
+                logicalLimit(
+                        logicalJoin(
+                                logicalLimit(logicalOlapScan().when(s -> 
s.equals(scanScore))),
+                                logicalLimit(logicalOlapScan().when(s -> 
s.equals(scanStudent)))
+                        ).when(j -> j.getJoinType() == JoinType.INNER_JOIN)
+                )
+        );
+    }
+
+    @Test
+    public void testTranslate() {
+        PlanChecker.from(connectContext).checkPlannerResult("select * from t1 
left join t2 on t1.k1=t2.k1 limit 5",
+                planner -> {
+                    List<PlanFragment> fragments = planner.getFragments();
+                    Map<String, OlapScanNode> nameToScan = fragments.stream()
+                            .flatMap(fragment -> {
+                                Set<OlapScanNode> scans = Sets.newHashSet();
+                                
fragment.getPlanRoot().collect(OlapScanNode.class, scans);
+                                return scans.stream();
+                            })
+                            .collect(Collectors.toMap(
+                                    olapScanNode -> 
olapScanNode.getOlapTable().getName(),
+                                    Function.identity(),
+                                    // scans keyed by the same table name can repeat across fragments; keep the first of the duplicate 
elements.
+                                    (s1, s2) -> s1)
+                            );
+
+                    // both tables are scanned, and the limit is pushed down to the left scan of `t1`.
+                    Assertions.assertEquals(2, nameToScan.size());
+                    Assertions.assertEquals(5, 
nameToScan.get("t1").getLimit());
+                }
+        );
+    }
+
+    private void test(JoinType joinType, boolean hasProject, 
PatternDescriptor<? extends Plan> pattern) {
+        Plan plan = generatePlan(joinType, hasProject);
+        PlanChecker.from(MemoTestUtils.createConnectContext())
+                .analyze(plan)
+                .applyTopDown(new LimitPushDown())
+                .matchesFromRoot(pattern);
+    }
+
+    private Plan generatePlan(JoinType joinType, boolean hasProject) {
+        ImmutableList<Expression> joinConditions =
+                joinType == JoinType.CROSS_JOIN || joinType == 
JoinType.INNER_JOIN
+                        ? ImmutableList.of()
+                        : ImmutableList.of(new EqualTo(new UnboundSlot("sid"), 
new UnboundSlot("id")));
+
+        LogicalJoin<? extends Plan, ? extends Plan> join = new LogicalJoin<>(
+                joinType,
+                joinConditions,
+                Optional.empty(),
+                new LogicalOlapScan(PlanConstructor.score),
+                new LogicalOlapScan(PlanConstructor.student)
+        );
+
+        if (hasProject) {
+            // build the plan shape: limit(10, offset 0) -> project -> join
+            return new LogicalLimit<>(10, 0, new LogicalProject<>(
+                    ImmutableList.of(new UnboundSlot("sid"), new 
UnboundSlot("id")),
+                    join));
+        } else {
+            // build the plan shape: limit(10, offset 0) -> join
+            return new LogicalLimit<>(10, 0, join);
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to