This is an automated email from the ASF dual-hosted git repository.
morrysnow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 704faaed84 [feature](Nereids) add rule split limit into two phase
(#16797)
704faaed84 is described below
commit 704faaed84464e6c2f3e7fe2b07602ff3ddd283f
Author: 谢健 <[email protected]>
AuthorDate: Tue Mar 7 15:34:12 2023 +0800
[feature](Nereids) add rule split limit into two phase (#16797)
1. Add a rule that splits a limit into two phases, e.g.
Limit(Origin) ==> Limit(Global) -> Gather -> Limit(Local)
2. Add a rule that turns a limit over a sort into a topN: limit -> sort ==> topN
3. Fix a bug in two-phase topN: the local topN now uses (limit + offset, 0) instead of (limit, offset)
4. Change the type of limit and offset in topN from int to long
Because splitting the limit is always beneficial, the split rule is added to
the rewrite phase.
---
.../glue/translator/PhysicalPlanTranslator.java | 62 ++++++++-----------
.../doris/nereids/jobs/batch/NereidsRewriter.java | 2 +
.../doris/nereids/parser/LogicalPlanBuilder.java | 3 +-
.../properties/ChildOutputPropertyDeriver.java | 12 +---
.../nereids/properties/RequestPropertyDeriver.java | 15 ++++-
.../org/apache/doris/nereids/rules/RuleType.java | 4 +-
.../LogicalLimitToPhysicalLimit.java | 1 +
.../implementation/LogicalTopNToPhysicalTopN.java | 12 +++-
.../rules/rewrite/logical/ExistsApplyToJoin.java | 5 +-
.../nereids/rules/rewrite/logical/MergeLimits.java | 19 +++---
.../rules/rewrite/logical/PushdownLimit.java | 12 ++++
.../logical/PushdownProjectThroughLimit.java | 4 +-
.../logical/{MergeLimits.java => SplitLimit.java} | 42 ++++++-------
.../plans/{algebra/TopN.java => LimitPhase.java} | 20 +++++--
.../doris/nereids/trees/plans/algebra/TopN.java | 4 +-
.../nereids/trees/plans/logical/LogicalLimit.java | 26 +++++---
.../nereids/trees/plans/logical/LogicalTopN.java | 12 ++--
.../trees/plans/physical/PhysicalLimit.java | 33 +++++++----
.../nereids/trees/plans/physical/PhysicalTopN.java | 14 ++---
.../nereids/trees/plans/visitor/PlanVisitor.java | 2 +-
.../org/apache/doris/nereids/memo/MemoTest.java | 69 +++++++++++-----------
.../properties/ChildOutputPropertyDeriverTest.java | 15 ++++-
.../rules/implementation/ImplementationTest.java | 3 +-
.../rules/rewrite/logical/EliminateLimitTest.java | 21 ++++++-
.../rules/rewrite/logical/MergeLimitsTest.java | 7 ++-
.../rules/rewrite/logical/PushdownLimitTest.java | 20 +++++--
...EliminateLimitTest.java => SplitLimitTest.java} | 30 ++++------
.../doris/nereids/stats/StatsCalculatorTest.java | 6 +-
.../nereids/trees/plans/PlanToStringTest.java | 2 +-
.../doris/nereids/util/LogicalPlanBuilder.java | 3 +-
30 files changed, 283 insertions(+), 197 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index b28db7c0f8..500f0360a5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -808,9 +808,13 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
PlanFragment inputFragment = topN.child(0).accept(this, context);
PlanFragment currentFragment = inputFragment;
- //1. generate new fragment for sort when the child is exchangeNode
- if (inputFragment.getPlanRoot() instanceof ExchangeNode) {
- Preconditions.checkArgument(!topN.getSortPhase().isLocal());
+ //1. Generate new fragment for sort when the child is exchangeNode, If
the child is
+ // mergingExchange, it means we have always generated a new fragment
when processing mergeSort
+ if (inputFragment.getPlanRoot() instanceof ExchangeNode
+ && !((ExchangeNode)
inputFragment.getPlanRoot()).isMergingExchange()) {
+ // Except LocalTopN->MergeTopN, we don't allow localTopN's child
is Exchange Node
+ Preconditions.checkArgument(!topN.getSortPhase().isLocal(),
+ "local sort requires any property but child is" +
inputFragment.getPlanRoot());
DataPartition outputPartition = DataPartition.UNPARTITIONED;
ExchangeNode exchangeNode = (ExchangeNode)
inputFragment.getPlanRoot();
inputFragment.setOutputPartition(outputPartition);
@@ -822,20 +826,28 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
// 2. According to the type of sort, generate physical plan
if (!topN.getSortPhase().isMerge()) {
- // For localSort or Gather->Sort, we just need to add sortNode
+ // For localSort or Gather->Sort, we just need to add TopNNode
SortNode sortNode = translateSortNode(topN,
inputFragment.getPlanRoot(), context);
+ sortNode.setOffset(topN.getOffset());
+ sortNode.setLimit(topN.getLimit());
currentFragment.addPlanRoot(sortNode);
} else {
// For mergeSort, we need to push sortInfo to exchangeNode
if (!(currentFragment.getPlanRoot() instanceof ExchangeNode)) {
// if there is no exchange node for mergeSort
- // e.g., localSort -> mergeSort
+ // e.g., mergeTopN -> localTopN
// It means the local has satisfied the Gather property. We
can just ignore mergeSort
+ currentFragment.getPlanRoot().setOffset(topN.getOffset());
+ currentFragment.getPlanRoot().setLimit(topN.getLimit());
return currentFragment;
}
- Preconditions.checkArgument(inputFragment.getPlanRoot() instanceof
SortNode);
+ Preconditions.checkArgument(inputFragment.getPlanRoot() instanceof
SortNode,
+ "mergeSort' child must be sortNode");
SortNode sortNode = (SortNode) inputFragment.getPlanRoot();
- ((ExchangeNode)
currentFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
+ ExchangeNode exchangeNode = (ExchangeNode)
currentFragment.getPlanRoot();
+ exchangeNode.setMergeInfo(sortNode.getSortInfo());
+ exchangeNode.setLimit(topN.getLimit());
+ exchangeNode.setOffset(topN.getOffset());
}
return currentFragment;
}
@@ -1388,38 +1400,12 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
if (inputFragment == null) {
return inputFragment;
}
-
+ // For case globalLimit(l, o) -> LocalLimit(l+o, 0), that is the
LocalLimit has already gathered
+ // The globalLimit can overwrite the limit and offset, so it's still
correct
PlanNode child = inputFragment.getPlanRoot();
-
- // physical plan: limit --> sort
- // after translate, it could be:
- // 1. limit->sort => set (limit and offset) on sort
- // 2. limit->exchange->sort => set (limit and offset) on exchange, set
sort.limit = limit+offset
- if (child instanceof SortNode) {
- SortNode sort = (SortNode) child;
- sort.setLimit(physicalLimit.getLimit());
- sort.setOffset(physicalLimit.getOffset());
- return inputFragment;
- }
- if (child instanceof ExchangeNode) {
- ExchangeNode exchangeNode = (ExchangeNode) child;
- exchangeNode.setLimit(physicalLimit.getLimit());
- // we do not check if this is a merging exchange here,
- // since this guaranteed by translating logic plan to physical plan
- exchangeNode.setOffset(physicalLimit.getOffset());
- if (exchangeNode.getChild(0) instanceof SortNode) {
- SortNode sort = (SortNode) exchangeNode.getChild(0);
- sort.setLimit(physicalLimit.getLimit() +
physicalLimit.getOffset());
- sort.setOffset(0);
- }
- return inputFragment;
- }
- // for other PlanNode, just set limit as limit+offset
- child.setLimit(physicalLimit.getLimit() + physicalLimit.getOffset());
- PlanFragment planFragment = exchangeToMergeFragment(inputFragment,
context);
- planFragment.getPlanRoot().setLimit(physicalLimit.getLimit());
-
planFragment.getPlanRoot().setOffSetDirectly(physicalLimit.getOffset());
- return planFragment;
+ child.setLimit(physicalLimit.getLimit());
+ child.setOffset(physicalLimit.getOffset());
+ return inputFragment;
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java
index cfe29964f1..1d51420a7e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/batch/NereidsRewriter.java
@@ -61,6 +61,7 @@ import
org.apache.doris.nereids.rules.rewrite.logical.PruneOlapScanTablet;
import org.apache.doris.nereids.rules.rewrite.logical.PushFilterInsideJoin;
import org.apache.doris.nereids.rules.rewrite.logical.PushdownLimit;
import org.apache.doris.nereids.rules.rewrite.logical.ReorderJoin;
+import org.apache.doris.nereids.rules.rewrite.logical.SplitLimit;
import java.util.List;
@@ -192,6 +193,7 @@ public class NereidsRewriter extends BatchRewriteJob {
new EliminateAggregate(),
new MergeSetOperations(),
new PushdownLimit(),
+ new SplitLimit(),
new BuildAggForUnion()
)),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 185e74368b..3947e8139b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -194,6 +194,7 @@ import
org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral;
import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.algebra.Aggregate;
import org.apache.doris.nereids.trees.plans.algebra.SetOperation.Qualifier;
@@ -1346,7 +1347,7 @@ public class LogicalPlanBuilder extends
DorisParserBaseVisitor<Object> {
if (offsetToken != null) {
offset = Long.parseLong(offsetToken.getText());
}
- return new LogicalLimit<>(limit, offset, input);
+ return new LogicalLimit<>(limit, offset, LimitPhase.ORIGIN, input);
});
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
index 4de8c769fe..4623ba311e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java
@@ -26,6 +26,7 @@ import
org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import
org.apache.doris.nereids.trees.expressions.functions.table.TableValuedFunction;
import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalDistribute;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
@@ -39,10 +40,8 @@ import
org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalProject;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import
org.apache.doris.nereids.trees.plans.physical.PhysicalStorageLayerAggregate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.JoinUtils;
@@ -105,13 +104,8 @@ public class ChildOutputPropertyDeriver extends
PlanVisitor<PhysicalProperties,
}
@Override
- public PhysicalProperties visitPhysicalTopN(PhysicalTopN<? extends Plan>
topN, PlanContext context) {
- Preconditions.checkState(childrenOutputProperties.size() == 1);
- return new PhysicalProperties(DistributionSpecGather.INSTANCE, new
OrderSpec(topN.getOrderKeys()));
- }
-
- @Override
- public PhysicalProperties visitPhysicalQuickSort(PhysicalQuickSort<?
extends Plan> sort, PlanContext context) {
+ public PhysicalProperties visitAbstractPhysicalSort(AbstractPhysicalSort<?
extends Plan> sort,
+ PlanContext context) {
Preconditions.checkState(childrenOutputProperties.size() == 1);
if (sort.getSortPhase().isLocal()) {
return new PhysicalProperties(
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
index 77d17f2701..2ca4fdc169 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java
@@ -29,11 +29,12 @@ import
org.apache.doris.nereids.trees.expressions.SlotReference;
import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort;
import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows;
import org.apache.doris.nereids.trees.plans.physical.PhysicalGenerate;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit;
import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort;
import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.nereids.util.JoinUtils;
@@ -94,7 +95,7 @@ public class RequestPropertyDeriver extends PlanVisitor<Void,
PlanContext> {
}
@Override
- public Void visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort,
PlanContext context) {
+ public Void visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan>
sort, PlanContext context) {
if (!sort.getSortPhase().isLocal()) {
addRequestPropertyToChildren(PhysicalProperties.GATHER);
} else {
@@ -103,6 +104,16 @@ public class RequestPropertyDeriver extends
PlanVisitor<Void, PlanContext> {
return null;
}
+ @Override
+ public Void visitPhysicalLimit(PhysicalLimit<? extends Plan> limit,
PlanContext context) {
+ if (limit.isGlobal()) {
+ addRequestPropertyToChildren(PhysicalProperties.GATHER);
+ } else {
+ addRequestPropertyToChildren(PhysicalProperties.ANY);
+ }
+ return null;
+ }
+
@Override
public Void visitPhysicalHashJoin(PhysicalHashJoin<? extends Plan, ?
extends Plan> hashJoin, PlanContext context) {
JoinHint hint = hashJoin.getHint();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index cf55dedddd..13c47df6a4 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -188,13 +188,15 @@ public enum RuleType {
INNER_TO_CROSS_JOIN(RuleTypeClass.REWRITE),
REWRITE_SENTINEL(RuleTypeClass.REWRITE),
+ // split limit
+ SPLIT_LIMIT(RuleTypeClass.REWRITE),
// limit push down
PUSH_LIMIT_THROUGH_JOIN(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_PROJECT_JOIN(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_UNION(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_ONE_ROW_RELATION(RuleTypeClass.REWRITE),
PUSH_LIMIT_THROUGH_EMPTY_RELATION(RuleTypeClass.REWRITE),
-
+ PUSH_LIMIT_INTO_SORT(RuleTypeClass.REWRITE),
// adjust nullable
ADJUST_NULLABLE_ON_AGGREGATE(RuleTypeClass.REWRITE),
ADJUST_NULLABLE_ON_ASSERT_NUM_ROWS(RuleTypeClass.REWRITE),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java
index 836d0b0344..5742cee12e 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalLimitToPhysicalLimit.java
@@ -30,6 +30,7 @@ public class LogicalLimitToPhysicalLimit extends
OneImplementationRuleFactory {
return logicalLimit().then(limit -> new PhysicalLimit<>(
limit.getLimit(),
limit.getOffset(),
+ limit.getPhase(),
limit.getLogicalProperties(),
limit.child())
).toRule(RuleType.LOGICAL_LIMIT_TO_PHYSICAL_LIMIT_RULE);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java
index ac90ff8acc..bf675fe264 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalTopNToPhysicalTopN.java
@@ -38,10 +38,16 @@ public class LogicalTopNToPhysicalTopN extends
OneImplementationRuleFactory {
.toRule(RuleType.LOGICAL_TOP_N_TO_PHYSICAL_TOP_N_RULE);
}
+ /**
+ * before: logicalTopN(off, limit)
+ * after:
+ * gatherTopN(limit, off, require gather)
+ * mergeTopN(limit, off, require gather) -> localTopN(off+limit, 0,
require any)
+ */
private List<PhysicalTopN<? extends Plan>> twoPhaseSort(LogicalTopN
logicalTopN) {
- PhysicalTopN localSort = new PhysicalTopN(logicalTopN.getOrderKeys(),
logicalTopN.getLimit(),
- logicalTopN.getOffset(), SortPhase.LOCAL_SORT,
logicalTopN.getLogicalProperties(), logicalTopN.child(0)
- );
+ PhysicalTopN localSort = new PhysicalTopN(logicalTopN.getOrderKeys(),
+ logicalTopN.getLimit() + logicalTopN.getOffset(), 0,
SortPhase.LOCAL_SORT,
+ logicalTopN.getLogicalProperties(), logicalTopN.child(0));
PhysicalTopN twoPhaseSort = new
PhysicalTopN<>(logicalTopN.getOrderKeys(), logicalTopN.getLimit(),
logicalTopN.getOffset(), SortPhase.MERGE_SORT,
logicalTopN.getLogicalProperties(), localSort);
PhysicalTopN onePhaseSort = new
PhysicalTopN<>(logicalTopN.getOrderKeys(), logicalTopN.getLimit(),
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java
index ca776b83b7..a7447f1b35 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/ExistsApplyToJoin.java
@@ -28,6 +28,7 @@ import
org.apache.doris.nereids.trees.expressions.functions.agg.Count;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalApply;
@@ -117,7 +118,7 @@ public class ExistsApplyToJoin extends
OneRewriteRuleFactory {
}
private Plan unCorrelatedNotExist(LogicalApply unapply) {
- LogicalLimit newLimit = new LogicalLimit<>(1, 0, (LogicalPlan)
unapply.right());
+ LogicalLimit newLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN,
(LogicalPlan) unapply.right());
Alias alias = new Alias(new Count(), "count(*)");
LogicalAggregate newAgg = new LogicalAggregate<>(new ArrayList<>(),
ImmutableList.of(alias), newLimit);
@@ -128,7 +129,7 @@ public class ExistsApplyToJoin extends
OneRewriteRuleFactory {
}
private Plan unCorrelatedExist(LogicalApply unapply) {
- LogicalLimit newLimit = new LogicalLimit<>(1, 0, (LogicalPlan)
unapply.right());
+ LogicalLimit newLimit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN,
(LogicalPlan) unapply.right());
return new LogicalJoin<>(JoinType.CROSS_JOIN, (LogicalPlan)
unapply.left(), newLimit);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
index 4b3ec22fb0..1d6b0ab48f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
@@ -42,13 +42,16 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
public class MergeLimits extends OneRewriteRuleFactory {
@Override
public Rule build() {
- return logicalLimit(logicalLimit()).then(upperLimit -> {
- LogicalLimit<? extends Plan> bottomLimit = upperLimit.child();
- return new LogicalLimit<>(
- Math.min(upperLimit.getLimit(),
Math.max(bottomLimit.getLimit() - upperLimit.getOffset(), 0)),
- bottomLimit.getOffset() + upperLimit.getOffset(),
- bottomLimit.child()
- );
- }).toRule(RuleType.MERGE_LIMITS);
+ return logicalLimit(logicalLimit())
+ .when(upperLimit ->
upperLimit.getPhase().equals(upperLimit.child().getPhase()))
+ .then(upperLimit -> {
+ LogicalLimit<? extends Plan> bottomLimit =
upperLimit.child();
+ return new LogicalLimit<>(
+ Math.min(upperLimit.getLimit(),
+ Math.max(bottomLimit.getLimit() -
upperLimit.getOffset(), 0)),
+ bottomLimit.getOffset() + upperLimit.getOffset(),
+ bottomLimit.getPhase(), bottomLimit.child()
+ );
+ }).toRule(RuleType.MERGE_LIMITS);
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java
index 1b2b1a4426..b5b4614410 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimit.java
@@ -28,6 +28,8 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.nereids.trees.plans.logical.LogicalUnion;
import com.google.common.collect.ImmutableList;
@@ -82,6 +84,16 @@ public class PushdownLimit implements RewriteRuleFactory {
return
limit.withChildren(union.withChildren(newUnionChildren));
})
.toRule(RuleType.PUSH_LIMIT_THROUGH_UNION),
+ // limit -> sort ==> topN
+ logicalLimit(logicalSort())
+ .then(limit -> {
+ LogicalSort sort = limit.child();
+ LogicalTopN topN = new
LogicalTopN(sort.getOrderKeys(),
+ limit.getLimit(),
+ limit.getOffset(),
+ sort.child(0));
+ return topN;
+ }).toRule(RuleType.PUSH_LIMIT_INTO_SORT),
logicalLimit(logicalOneRowRelation())
.then(limit -> limit.getLimit() > 0
? limit.child() : new
LogicalEmptyRelation(limit.child().getOutput()))
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java
index b9f0f70d2a..c1705250a5 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownProjectThroughLimit.java
@@ -51,8 +51,8 @@ public class PushdownProjectThroughLimit extends
OneRewriteRuleFactory {
return logicalProject(logicalLimit(any())).thenApply(ctx -> {
LogicalProject<LogicalLimit<Plan>> logicalProject = ctx.root;
LogicalLimit<Plan> logicalLimit = logicalProject.child();
- return new LogicalLimit<>(logicalLimit.getLimit(),
- logicalLimit.getOffset(), new
LogicalProject<>(logicalProject.getProjects(),
+ return new LogicalLimit<>(logicalLimit.getLimit(),
logicalLimit.getOffset(),
+ logicalLimit.getPhase(), new
LogicalProject<>(logicalProject.getProjects(),
logicalLimit.child()));
}).toRule(RuleType.PUSHDOWN_PROJECT_THROUGH_LIMIT);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimit.java
similarity index 55%
copy from
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
copy to
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimit.java
index 4b3ec22fb0..abd7b0a49c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimits.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimit.java
@@ -20,35 +20,29 @@ package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.OneRewriteRuleFactory;
-import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
/**
- * This rule aims to merge consecutive limits.
- * <pre>
- * input plan:
- * LIMIT1(limit=10, offset=0)
- * |
- * LIMIT2(limit=3, offset=5)
- *
- * output plan:
- * LIMIT(limit=3, offset=5)
- *
- * merged limit = min(LIMIT1.limit, LIMIT2.limit)
- * merged offset = LIMIT2.offset
- * </pre>
- * Note that the top limit should not have valid offset info.
+ * Split limit into two phase
+ * before:
+ * Limit(origin) limit, offset
+ * after:
+ * Limit(global) limit, offset
+ * |
+ * Limit(local) limit + offset, 0
*/
-public class MergeLimits extends OneRewriteRuleFactory {
+public class SplitLimit extends OneRewriteRuleFactory {
@Override
public Rule build() {
- return logicalLimit(logicalLimit()).then(upperLimit -> {
- LogicalLimit<? extends Plan> bottomLimit = upperLimit.child();
- return new LogicalLimit<>(
- Math.min(upperLimit.getLimit(),
Math.max(bottomLimit.getLimit() - upperLimit.getOffset(), 0)),
- bottomLimit.getOffset() + upperLimit.getOffset(),
- bottomLimit.child()
- );
- }).toRule(RuleType.MERGE_LIMITS);
+ return logicalLimit().when(limit -> !limit.isSplit())
+ .then(limit -> {
+ long l = limit.getLimit();
+ long o = limit.getOffset();
+ return new LogicalLimit<>(l, o,
+ LimitPhase.GLOBAL, new LogicalLimit<>(l + o, 0,
LimitPhase.LOCAL, limit.child())
+ );
+ }).toRule(RuleType.SPLIT_LIMIT);
}
}
+
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/LimitPhase.java
similarity index 63%
copy from
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
copy to
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/LimitPhase.java
index d79fe003ed..705c712ef4 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/LimitPhase.java
@@ -15,14 +15,24 @@
// specific language governing permissions and limitations
// under the License.
-package org.apache.doris.nereids.trees.plans.algebra;
+package org.apache.doris.nereids.trees.plans;
/**
- * Common interface for logical/physical TopN.
+ * Limit phase for logical and physical limit, like
+ * LocalLimit -> Gather -> GlobalLimit
+ * Origin is used to mark the limit operator that has not been split into
2-phase
*/
-public interface TopN extends Sort {
+public enum LimitPhase {
+ LOCAL("LOCAL"),
+ GLOBAL("GLOBAL"),
+ ORIGIN("ORIGIN");
+ private final String name;
- int getOffset();
+ LimitPhase(String name) {
+ this.name = name;
+ }
- int getLimit();
+ public boolean isLocal() {
+ return this == LOCAL;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
index d79fe003ed..c214dffbbf 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/TopN.java
@@ -22,7 +22,7 @@ package org.apache.doris.nereids.trees.plans.algebra;
*/
public interface TopN extends Sort {
- int getOffset();
+ long getOffset();
- int getLimit();
+ long getLimit();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
index 75ff5f1ea9..d632b959e1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalLimit.java
@@ -21,6 +21,7 @@ import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Limit;
@@ -44,19 +45,28 @@ import java.util.Optional;
* offset 100
*/
public class LogicalLimit<CHILD_TYPE extends Plan> extends
LogicalUnary<CHILD_TYPE> implements Limit {
-
+ private final LimitPhase phase;
private final long limit;
private final long offset;
- public LogicalLimit(long limit, long offset, CHILD_TYPE child) {
- this(limit, offset, Optional.empty(), Optional.empty(), child);
+ public LogicalLimit(long limit, long offset, LimitPhase phase, CHILD_TYPE
child) {
+ this(limit, offset, phase, Optional.empty(), Optional.empty(), child);
}
- public LogicalLimit(long limit, long offset, Optional<GroupExpression>
groupExpression,
+ public LogicalLimit(long limit, long offset, LimitPhase phase,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
super(PlanType.LOGICAL_LIMIT, groupExpression, logicalProperties,
child);
this.limit = limit;
this.offset = offset;
+ this.phase = phase;
+ }
+
+ public LimitPhase getPhase() {
+ return phase;
+ }
+
+ public boolean isSplit() {
+ return phase != LimitPhase.ORIGIN;
}
public long getLimit() {
@@ -94,7 +104,7 @@ public class LogicalLimit<CHILD_TYPE extends Plan> extends
LogicalUnary<CHILD_TY
return false;
}
LogicalLimit that = (LogicalLimit) o;
- return limit == that.limit && offset == that.offset;
+ return limit == that.limit && offset == that.offset && phase ==
that.phase;
}
@Override
@@ -108,17 +118,17 @@ public class LogicalLimit<CHILD_TYPE extends Plan>
extends LogicalUnary<CHILD_TY
@Override
public Plan withGroupExpression(Optional<GroupExpression> groupExpression)
{
- return new LogicalLimit<>(limit, offset, groupExpression,
Optional.of(getLogicalProperties()), child());
+ return new LogicalLimit<>(limit, offset, phase, groupExpression,
Optional.of(getLogicalProperties()), child());
}
@Override
public Plan withLogicalProperties(Optional<LogicalProperties>
logicalProperties) {
- return new LogicalLimit<>(limit, offset, Optional.empty(),
logicalProperties, child());
+ return new LogicalLimit<>(limit, offset, phase, Optional.empty(),
logicalProperties, child());
}
@Override
public LogicalLimit<Plan> withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
- return new LogicalLimit<>(limit, offset, children.get(0));
+ return new LogicalLimit<>(limit, offset, phase, children.get(0));
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
index da78e27cef..cb07601ffa 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalTopN.java
@@ -41,17 +41,17 @@ import java.util.Optional;
public class LogicalTopN<CHILD_TYPE extends Plan> extends
LogicalUnary<CHILD_TYPE> implements TopN {
private final List<OrderKey> orderKeys;
- private final int limit;
- private final int offset;
+ private final long limit;
+ private final long offset;
- public LogicalTopN(List<OrderKey> orderKeys, int limit, int offset,
CHILD_TYPE child) {
+ public LogicalTopN(List<OrderKey> orderKeys, long limit, long offset,
CHILD_TYPE child) {
this(orderKeys, limit, offset, Optional.empty(), Optional.empty(),
child);
}
/**
* Constructor for LogicalSort.
*/
- public LogicalTopN(List<OrderKey> orderKeys, int limit, int offset,
Optional<GroupExpression> groupExpression,
+ public LogicalTopN(List<OrderKey> orderKeys, long limit, long offset,
Optional<GroupExpression> groupExpression,
Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) {
super(PlanType.LOGICAL_TOP_N, groupExpression, logicalProperties,
child);
this.orderKeys =
ImmutableList.copyOf(Objects.requireNonNull(orderKeys, "orderKeys can not be
null"));
@@ -68,11 +68,11 @@ public class LogicalTopN<CHILD_TYPE extends Plan> extends
LogicalUnary<CHILD_TYP
return orderKeys;
}
- public int getOffset() {
+ public long getOffset() {
return offset;
}
- public int getLimit() {
+ public long getLimit() {
return limit;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
index dfab80443c..8d803d0f88 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalLimit.java
@@ -21,6 +21,7 @@ import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.algebra.Limit;
@@ -39,14 +40,14 @@ import java.util.Optional;
* Physical limit plan
*/
public class PhysicalLimit<CHILD_TYPE extends Plan> extends
PhysicalUnary<CHILD_TYPE> implements Limit {
-
+ private final LimitPhase phase;
private final long limit;
private final long offset;
public PhysicalLimit(long limit, long offset,
- LogicalProperties logicalProperties,
+ LimitPhase phase, LogicalProperties logicalProperties,
CHILD_TYPE child) {
- this(limit, offset, Optional.empty(), logicalProperties, child);
+ this(limit, offset, phase, Optional.empty(), logicalProperties, child);
}
/**
@@ -57,11 +58,12 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends
PhysicalUnary<CHILD_
* @param offset the number of tuples skipped.
*/
public PhysicalLimit(long limit, long offset,
- Optional<GroupExpression> groupExpression, LogicalProperties
logicalProperties,
+ LimitPhase phase, Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_LIMIT, groupExpression, logicalProperties,
child);
this.limit = limit;
this.offset = offset;
+ this.phase = phase;
}
/**
@@ -70,14 +72,16 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends
PhysicalUnary<CHILD_
*
* @param limit the number of tuples retrieved.
* @param offset the number of tuples skipped.
+ * @param phase the phase of 2-phase limit.
*/
- public PhysicalLimit(long limit, long offset, Optional<GroupExpression>
groupExpression,
+ public PhysicalLimit(long limit, long offset, LimitPhase phase,
Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties, PhysicalProperties
physicalProperties,
StatsDeriveResult statsDeriveResult, CHILD_TYPE child) {
super(PlanType.PHYSICAL_LIMIT, groupExpression, logicalProperties,
physicalProperties, statsDeriveResult,
child);
this.limit = limit;
this.offset = offset;
+ this.phase = phase;
}
public long getLimit() {
@@ -88,10 +92,18 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends
PhysicalUnary<CHILD_
return offset;
}
+ public LimitPhase getPhase() {
+ return phase;
+ }
+
+ public boolean isGlobal() {
+ return phase == LimitPhase.GLOBAL;
+ }
+
@Override
public Plan withChildren(List<Plan> children) {
Preconditions.checkArgument(children.size() == 1);
- return new PhysicalLimit<>(limit, offset, getLogicalProperties(),
children.get(0));
+ return new PhysicalLimit<>(limit, offset, phase,
getLogicalProperties(), children.get(0));
}
@Override
@@ -101,18 +113,18 @@ public class PhysicalLimit<CHILD_TYPE extends Plan>
extends PhysicalUnary<CHILD_
@Override
public PhysicalLimit<CHILD_TYPE>
withGroupExpression(Optional<GroupExpression> groupExpression) {
- return new PhysicalLimit<>(limit, offset, groupExpression,
getLogicalProperties(), child());
+ return new PhysicalLimit<>(limit, offset, phase, groupExpression,
getLogicalProperties(), child());
}
@Override
public PhysicalLimit<CHILD_TYPE>
withLogicalProperties(Optional<LogicalProperties> logicalProperties) {
- return new PhysicalLimit<>(limit, offset, logicalProperties.get(),
child());
+ return new PhysicalLimit<>(limit, offset, phase,
logicalProperties.get(), child());
}
@Override
public PhysicalLimit<CHILD_TYPE>
withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties,
StatsDeriveResult statsDeriveResult) {
- return new PhysicalLimit<>(limit, offset, groupExpression,
getLogicalProperties(), physicalProperties,
+ return new PhysicalLimit<>(limit, offset, phase, groupExpression,
getLogicalProperties(), physicalProperties,
statsDeriveResult, child());
}
@@ -125,7 +137,7 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends
PhysicalUnary<CHILD_
return false;
}
PhysicalLimit that = (PhysicalLimit) o;
- return offset == that.offset && limit == that.limit;
+ return offset == that.offset && limit == that.limit && phase ==
that.phase;
}
@Override
@@ -143,6 +155,7 @@ public class PhysicalLimit<CHILD_TYPE extends Plan> extends
PhysicalUnary<CHILD_
return Utils.toSqlString("PhysicalLimit",
"limit", limit,
"offset", offset,
+ "phase", phase,
"stats", statsDeriveResult
);
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
index 13f52eb103..dc01b0332a 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalTopN.java
@@ -40,10 +40,10 @@ import java.util.Optional;
*/
public class PhysicalTopN<CHILD_TYPE extends Plan> extends
AbstractPhysicalSort<CHILD_TYPE> implements TopN {
- private final int limit;
- private final int offset;
+ private final long limit;
+ private final long offset;
- public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset,
+ public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, LogicalProperties logicalProperties, CHILD_TYPE
child) {
this(orderKeys, limit, offset, phase, Optional.empty(),
logicalProperties, child);
}
@@ -51,7 +51,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends
AbstractPhysicalSort<
/**
* Constructor of PhysicalHashJoinNode.
*/
- public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset,
+ public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
CHILD_TYPE child) {
super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression,
logicalProperties, child);
@@ -63,7 +63,7 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends
AbstractPhysicalSort<
/**
* Constructor of PhysicalHashJoinNode.
*/
- public PhysicalTopN(List<OrderKey> orderKeys, int limit, int offset,
+ public PhysicalTopN(List<OrderKey> orderKeys, long limit, long offset,
SortPhase phase, Optional<GroupExpression> groupExpression,
LogicalProperties logicalProperties,
PhysicalProperties physicalProperties, StatsDeriveResult
statsDeriveResult, CHILD_TYPE child) {
super(PlanType.PHYSICAL_TOP_N, orderKeys, phase, groupExpression,
logicalProperties, physicalProperties,
@@ -73,11 +73,11 @@ public class PhysicalTopN<CHILD_TYPE extends Plan> extends
AbstractPhysicalSort<
this.offset = offset;
}
- public int getLimit() {
+ public long getLimit() {
return limit;
}
- public int getOffset() {
+ public long getOffset() {
return offset;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
index de6c98f2f7..3cda38d6fb 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java
@@ -317,7 +317,7 @@ public abstract class PlanVisitor<R, C> {
}
public R visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, C context) {
- return visit(topN, context);
+ return visitAbstractPhysicalSort(topN, context);
}
public R visitPhysicalLimit(PhysicalLimit<? extends Plan> limit, C
context) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java
index e268f5a091..40d9d6289a 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/memo/MemoTest.java
@@ -31,6 +31,7 @@ import org.apache.doris.nereids.trees.plans.FakePlan;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinType;
import org.apache.doris.nereids.trees.plans.LeafPlan;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
@@ -361,7 +362,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void a2bc() {
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0,
student);
+ LogicalLimit<? extends Plan> limit = new LogicalLimit<>(1, 0,
LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, new
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student")))
.applyBottomUp(
@@ -396,7 +397,7 @@ class MemoTest implements MemoPatternMatchSupported {
// invalid case
Assertions.assertThrows(IllegalStateException.class, () -> {
UnboundRelation student = new
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
- LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0,
student);
+ LogicalLimit<? extends Plan> limit = new LogicalLimit<>(1, 0,
LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, student)
.applyBottomUp(
@@ -414,7 +415,7 @@ class MemoTest implements MemoPatternMatchSupported {
UnboundRelation a = new UnboundRelation(RelationUtil.newRelationId(),
ImmutableList.of("student"));
UnboundRelation a2 = new UnboundRelation(RelationUtil.newRelationId(),
ImmutableList.of("student"));
- LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0, a2);
+ LogicalLimit<UnboundRelation> limit = new LogicalLimit<>(1, 0,
LimitPhase.ORIGIN, a2);
PlanChecker.from(connectContext, a)
.setMaxInvokeTimesPerRule(1000)
.applyBottomUp(
@@ -479,8 +480,8 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void a2bcd() {
LogicalOlapScan scan = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, scan);
- LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new
LogicalLimit<>(10, 0, limit5);
+ LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0,
LimitPhase.ORIGIN, scan);
+ LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new
LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@@ -507,7 +508,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2a() {
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
student);
+ LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@@ -531,7 +532,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2NewA() {
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
student);
+ LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@@ -555,7 +556,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2GroupB() {
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
student);
+ LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@@ -577,7 +578,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2PlanB() {
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
student);
+ LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@@ -599,7 +600,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2c() {
UnboundRelation relation = new
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
- LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0,
relation);
+ LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0,
LimitPhase.ORIGIN, relation);
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
PlanChecker.from(connectContext, limit10)
@@ -622,10 +623,10 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2cd() {
UnboundRelation relation = new
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
- LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0,
relation);
+ LogicalLimit<UnboundRelation> limit10 = new LogicalLimit<>(10, 0,
LimitPhase.ORIGIN, relation);
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0,
student);
+ LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0,
LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@@ -650,8 +651,8 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2cb() {
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
student);
- LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0,
student);
+ LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
LimitPhase.ORIGIN, student);
+ LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0,
LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@@ -681,7 +682,7 @@ class MemoTest implements MemoPatternMatchSupported {
Assertions.assertThrowsExactly(IllegalStateException.class, () -> {
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
student);
+ LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit10)
.setMaxInvokeTimesPerRule(1000)
@@ -707,8 +708,8 @@ class MemoTest implements MemoPatternMatchSupported {
Assertions.assertThrowsExactly(IllegalStateException.class, () -> {
UnboundRelation student = new
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
- LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0,
student);
- LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new
LogicalLimit<>(10, 0, limit5);
+ LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0,
LimitPhase.ORIGIN, student);
+ LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new
LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@@ -733,11 +734,11 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void ab2cde() {
UnboundRelation student = new
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
- LogicalLimit<UnboundRelation> limit3 = new LogicalLimit<>(3, 0,
student);
+ LogicalLimit<UnboundRelation> limit3 = new LogicalLimit<>(3, 0,
LimitPhase.ORIGIN, student);
LogicalOlapScan scan = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0, scan);
- LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new
LogicalLimit<>(10, 0, limit5);
+ LogicalLimit<LogicalOlapScan> limit5 = new LogicalLimit<>(5, 0,
LimitPhase.ORIGIN, scan);
+ LogicalLimit<LogicalLimit<LogicalOlapScan>> limit10 = new
LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
PlanChecker.from(connectContext, limit3)
.applyBottomUp(
@@ -766,8 +767,8 @@ class MemoTest implements MemoPatternMatchSupported {
public void abc2bac() {
UnboundRelation student = new
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
- LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0,
student);
- LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new
LogicalLimit<>(10, 0, limit5);
+ LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0,
LimitPhase.ORIGIN, student);
+ LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new
LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@@ -805,8 +806,8 @@ class MemoTest implements MemoPatternMatchSupported {
public void abc2bc() {
UnboundRelation student = new
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("student"));
- LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0,
student);
- LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new
LogicalLimit<>(10, 0, limit5);
+ LogicalLimit<UnboundRelation> limit5 = new LogicalLimit<>(5, 0,
LimitPhase.ORIGIN, student);
+ LogicalLimit<LogicalLimit<UnboundRelation>> limit10 = new
LogicalLimit<>(10, 0, LimitPhase.ORIGIN, limit5);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@@ -829,7 +830,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testRewriteBottomPlanToOnePlan() {
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0,
student);
+ LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(1, 0,
LimitPhase.ORIGIN, student);
LogicalOlapScan score = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
@@ -848,10 +849,10 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testRewriteBottomPlanToMultiPlan() {
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
student);
+ LogicalLimit<LogicalOlapScan> limit10 = new LogicalLimit<>(10, 0,
LimitPhase.ORIGIN, student);
LogicalOlapScan score = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
- LogicalLimit<LogicalOlapScan> limit1 = new LogicalLimit<>(1, 0, score);
+ LogicalLimit<LogicalOlapScan> limit1 = new LogicalLimit<>(1, 0,
LimitPhase.ORIGIN, score);
PlanChecker.from(connectContext, limit10)
.applyBottomUp(
@@ -892,7 +893,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testRecomputeLogicalProperties() {
UnboundRelation unboundTable = new
UnboundRelation(RelationUtil.newRelationId(), ImmutableList.of("score"));
- LogicalLimit<UnboundRelation> unboundLimit = new LogicalLimit<>(1, 0,
unboundTable);
+ LogicalLimit<UnboundRelation> unboundLimit = new LogicalLimit<>(1, 0,
LimitPhase.ORIGIN, unboundTable);
LogicalOlapScan boundTable = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
LogicalLimit<Plan> boundLimit =
unboundLimit.withChildren(ImmutableList.of(boundTable));
@@ -924,7 +925,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testEliminateRootWithChildGroupInTwoLevels() {
LogicalOlapScan scan = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
- LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, scan);
+ LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN,
scan);
PlanChecker.from(connectContext, limit)
.applyBottomUp(logicalLimit().then(LogicalLimit::child))
@@ -936,7 +937,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testEliminateRootWithChildPlanInTwoLevels() {
LogicalOlapScan scan = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
- LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, scan);
+ LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN,
scan);
PlanChecker.from(connectContext, limit)
.applyBottomUp(logicalLimit(any()).then(LogicalLimit::child))
@@ -948,7 +949,7 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testEliminateTwoLevelsToOnePlan() {
LogicalOlapScan score = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
- LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, score);
+ LogicalLimit<Plan> limit = new LogicalLimit<>(1, 0, LimitPhase.ORIGIN,
score);
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
@@ -968,10 +969,10 @@ class MemoTest implements MemoPatternMatchSupported {
@Test
public void testEliminateTwoLevelsToTwoPlans() {
LogicalOlapScan score = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score);
- LogicalLimit<Plan> limit1 = new LogicalLimit<>(1, 0, score);
+ LogicalLimit<Plan> limit1 = new LogicalLimit<>(1, 0,
LimitPhase.ORIGIN, score);
LogicalOlapScan student = new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student);
- LogicalLimit<Plan> limit10 = new LogicalLimit<>(10, 0, student);
+ LogicalLimit<Plan> limit10 = new LogicalLimit<>(10, 0,
LimitPhase.ORIGIN, student);
PlanChecker.from(connectContext, limit1)
.applyBottomUp(logicalLimit(any()).when(limit1::equals).then(l
-> limit10))
@@ -998,7 +999,7 @@ class MemoTest implements MemoPatternMatchSupported {
public void test() {
PlanChecker.from(MemoTestUtils.createConnectContext())
.analyze(new LogicalLimit<>(10, 0,
- new LogicalJoin<>(JoinType.LEFT_OUTER_JOIN,
+ LimitPhase.ORIGIN, new
LogicalJoin<>(JoinType.LEFT_OUTER_JOIN,
ImmutableList.of(new EqualTo(new
UnboundSlot("sid"), new UnboundSlot("id"))),
new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.score),
new
LogicalOlapScan(RelationUtil.newRelationId(), PlanConstructor.student)
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
index 9cd2888b38..6d7b2413ff 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriverTest.java
@@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.AggPhase;
import org.apache.doris.nereids.trees.plans.GroupPlan;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin;
@@ -385,6 +386,7 @@ public class ChildOutputPropertyDeriverTest {
public void testTopN() {
SlotReference key = new SlotReference("col1", IntegerType.INSTANCE);
List<OrderKey> orderKeys = Lists.newArrayList(new OrderKey(key, true,
true));
+ // localSort require any
PhysicalTopN<GroupPlan> sort = new PhysicalTopN<>(orderKeys, 10, 10,
SortPhase.LOCAL_SORT, logicalProperties, groupPlan);
GroupExpression groupExpression = new GroupExpression(sort);
PhysicalProperties child = new
PhysicalProperties(DistributionSpecReplicated.INSTANCE,
@@ -394,6 +396,17 @@ public class ChildOutputPropertyDeriverTest {
ChildOutputPropertyDeriver deriver = new
ChildOutputPropertyDeriver(Lists.newArrayList(child));
PhysicalProperties result =
deriver.getOutputProperties(groupExpression);
Assertions.assertEquals(orderKeys,
result.getOrderSpec().getOrderKeys());
+ Assertions.assertEquals(DistributionSpecReplicated.INSTANCE,
result.getDistributionSpec());
+ // merge/gather sort requires gather
+ sort = new PhysicalTopN<>(orderKeys, 10, 10, SortPhase.MERGE_SORT,
logicalProperties, groupPlan);
+ groupExpression = new GroupExpression(sort);
+ child = new PhysicalProperties(DistributionSpecReplicated.INSTANCE,
+ new OrderSpec(Lists.newArrayList(
+ new OrderKey(new SlotReference("ignored",
IntegerType.INSTANCE), true, true))));
+
+ deriver = new ChildOutputPropertyDeriver(Lists.newArrayList(child));
+ result = deriver.getOutputProperties(groupExpression);
+ Assertions.assertEquals(orderKeys,
result.getOrderSpec().getOrderKeys());
Assertions.assertEquals(DistributionSpecGather.INSTANCE,
result.getDistributionSpec());
}
@@ -401,7 +414,7 @@ public class ChildOutputPropertyDeriverTest {
public void testLimit() {
SlotReference key = new SlotReference("col1", IntegerType.INSTANCE);
List<OrderKey> orderKeys = Lists.newArrayList(new OrderKey(key, true,
true));
- PhysicalLimit<GroupPlan> limit = new PhysicalLimit<>(10, 10,
logicalProperties, groupPlan);
+ PhysicalLimit<GroupPlan> limit = new PhysicalLimit<>(10, 10,
LimitPhase.ORIGIN, logicalProperties, groupPlan);
GroupExpression groupExpression = new GroupExpression(limit);
PhysicalProperties child = new
PhysicalProperties(DistributionSpecReplicated.INSTANCE,
new OrderSpec(orderKeys));
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java
index 1e7f72dd2e..10dfea7032 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/implementation/ImplementationTest.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
@@ -108,7 +109,7 @@ public class ImplementationTest {
public void toPhysicalLimitTest() {
int limit = 10;
int offset = 100;
- LogicalLimit<GroupPlan> logicalLimit = new LogicalLimit<>(limit,
offset, groupPlan);
+ LogicalLimit<? extends Plan> logicalLimit = new LogicalLimit<>(limit,
offset, LimitPhase.LOCAL, groupPlan);
PhysicalPlan physicalPlan = executeImplementationRule(logicalLimit);
Assertions.assertEquals(PlanType.PHYSICAL_LIMIT,
physicalPlan.getType());
PhysicalLimit<GroupPlan> physicalLimit = (PhysicalLimit<GroupPlan>)
physicalPlan;
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
index 77cb6df00f..44a88ac0c8 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
@@ -18,12 +18,17 @@
package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.CascadesContext;
+import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSort;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
import org.apache.doris.nereids.util.MemoTestUtils;
+import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.nereids.util.PlanConstructor;
import com.google.common.collect.Lists;
@@ -31,6 +36,7 @@ import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.stream.Collectors;
/**
* MergeConsecutiveFilter ut
@@ -39,7 +45,7 @@ public class EliminateLimitTest {
@Test
public void testEliminateLimit() {
LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
- LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(0, 0, scan);
+ LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(0, 0,
LimitPhase.ORIGIN, scan);
CascadesContext cascadesContext =
MemoTestUtils.createCascadesContext(limit);
List<Rule> rules = Lists.newArrayList(new EliminateLimit().build());
@@ -48,4 +54,17 @@ public class EliminateLimitTest {
Plan actual = cascadesContext.getMemo().copyOut();
Assertions.assertTrue(actual instanceof LogicalEmptyRelation);
}
+
+ @Test
+ public void testLimitSort() {
+ LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
+ LogicalLimit limit = new LogicalLimit<>(1, 1, LimitPhase.ORIGIN,
+ new LogicalSort<>(scan.getOutput().stream().map(c -> new
OrderKey(c, true, true)).collect(Collectors.toList()),
+ scan));
+
+ Plan actual = PlanChecker.from(MemoTestUtils.createConnectContext(),
limit)
+ .rewrite()
+ .getPlan();
+ Assertions.assertTrue(actual instanceof LogicalTopN);
+ }
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java
index fa7270def9..869dec982f 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/MergeLimitsTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.nereids.rules.rewrite.logical;
import org.apache.doris.nereids.CascadesContext;
import org.apache.doris.nereids.analyzer.UnboundRelation;
import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.RelationUtil;
import org.apache.doris.nereids.util.MemoTestUtils;
@@ -33,10 +34,10 @@ import java.util.List;
public class MergeLimitsTest {
@Test
public void testMergeConsecutiveLimits() {
- LogicalLimit limit3 = new LogicalLimit<>(3, 5, new UnboundRelation(
+ LogicalLimit limit3 = new LogicalLimit<>(3, 5, LimitPhase.ORIGIN, new
UnboundRelation(
RelationUtil.newRelationId(), Lists.newArrayList("db", "t")));
- LogicalLimit limit2 = new LogicalLimit<>(2, 0, limit3);
- LogicalLimit limit1 = new LogicalLimit<>(10, 0, limit2);
+ LogicalLimit limit2 = new LogicalLimit<>(2, 0, LimitPhase.ORIGIN,
limit3);
+ LogicalLimit limit1 = new LogicalLimit<>(10, 0, LimitPhase.ORIGIN,
limit2);
CascadesContext context = MemoTestUtils.createCascadesContext(limit1);
List<Rule> rules = Lists.newArrayList(new MergeLimits().build());
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java
index ea1fc58942..57e38b32d1 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/PushdownLimitTest.java
@@ -22,6 +22,7 @@ import org.apache.doris.nereids.pattern.PatternDescriptor;
import org.apache.doris.nereids.trees.expressions.EqualTo;
import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.RelationId;
import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
@@ -204,7 +205,6 @@ class PushdownLimitTest extends TestWithFeService
implements MemoPatternMatchSup
// plan among fragments has duplicate
elements.
(s1, s2) -> s1)
);
-
// limit is push down to left scan of `t1`.
Assertions.assertEquals(2, nameToScan.size());
Assertions.assertEquals(5,
nameToScan.get("t1").getLimit());
@@ -212,6 +212,14 @@ class PushdownLimitTest extends TestWithFeService
implements MemoPatternMatchSup
);
}
+ @Test
+ public void testLimitPushSort() {
+ PlanChecker.from(connectContext)
+ .analyze("select k1 from t1 order by k1 limit 1")
+ .rewrite()
+ .matches(logicalTopN());
+ }
+
@Test
public void testLimitPushUnion() {
PlanChecker.from(connectContext)
@@ -229,8 +237,10 @@ class PushdownLimitTest extends TestWithFeService
implements MemoPatternMatchSup
logicalOlapScan().when(scan ->
"t2".equals(scan.getTable().getName()))
),
logicalLimit(
- logicalProject(
- logicalOlapScan().when(scan ->
"t3".equals(scan.getTable().getName()))
+ logicalLimit(
+ logicalProject(
+
logicalOlapScan().when(scan -> "t3".equals(scan.getTable().getName()))
+ )
)
)
)
@@ -261,12 +271,12 @@ class PushdownLimitTest extends TestWithFeService
implements MemoPatternMatchSup
if (hasProject) {
// return limit -> project -> join
- return new LogicalLimit<>(10, 0, new LogicalProject<>(
+ return new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, new
LogicalProject<>(
ImmutableList.of(new UnboundSlot("sid"), new
UnboundSlot("id")),
join));
} else {
// return limit -> join
- return new LogicalLimit<>(10, 0, join);
+ return new LogicalLimit<>(10, 0, LimitPhase.ORIGIN, join);
}
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimitTest.java
similarity index 57%
copy from
fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
copy to
fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimitTest.java
index 77cb6df00f..174f5a90b4 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/EliminateLimitTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/logical/SplitLimitTest.java
@@ -17,35 +17,25 @@
package org.apache.doris.nereids.rules.rewrite.logical;
-import org.apache.doris.nereids.CascadesContext;
-import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
-import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.util.MemoTestUtils;
+import org.apache.doris.nereids.util.PlanChecker;
import org.apache.doris.nereids.util.PlanConstructor;
-import com.google.common.collect.Lists;
-import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
-import java.util.List;
+public class SplitLimitTest {
+ private final LogicalOlapScan scan1 =
PlanConstructor.newLogicalOlapScan(0, "t1", 0);
-/**
- * MergeConsecutiveFilter ut
- */
-public class EliminateLimitTest {
@Test
- public void testEliminateLimit() {
- LogicalOlapScan scan = PlanConstructor.newLogicalOlapScan(0, "t1", 0);
- LogicalLimit<LogicalOlapScan> limit = new LogicalLimit<>(0, 0, scan);
-
- CascadesContext cascadesContext =
MemoTestUtils.createCascadesContext(limit);
- List<Rule> rules = Lists.newArrayList(new EliminateLimit().build());
- cascadesContext.topDownRewrite(rules);
-
- Plan actual = cascadesContext.getMemo().copyOut();
- Assertions.assertTrue(actual instanceof LogicalEmptyRelation);
+ void testSplitLimit() {
+ Plan plan = new LogicalLimit<>(0, 0, LimitPhase.ORIGIN, scan1);
+ plan = PlanChecker.from(MemoTestUtils.createConnectContext(), plan)
+ .rewrite()
+ .getPlan();
+ plan.anyMatch(x -> x instanceof LogicalLimit && ((LogicalLimit<?>)
x).isSplit());
}
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
index e931957be2..94f6a07d33 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/stats/StatsCalculatorTest.java
@@ -28,6 +28,8 @@ import
org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.literal.IntegerLiteral;
import org.apache.doris.nereids.trees.plans.FakePlan;
import org.apache.doris.nereids.trees.plans.GroupPlan;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
+import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
import org.apache.doris.nereids.trees.plans.logical.LogicalLimit;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
@@ -279,7 +281,9 @@ public class StatsCalculatorTest {
GroupPlan groupPlan = new GroupPlan(childGroup);
childGroup.setStatistics(childStats);
- LogicalLimit<GroupPlan> logicalLimit = new LogicalLimit<>(1, 2,
groupPlan);
+ LogicalLimit<? extends Plan> logicalLimit = new LogicalLimit<>(1, 2,
+ LimitPhase.GLOBAL, new LogicalLimit<>(1, 2, LimitPhase.LOCAL,
groupPlan)
+ );
GroupExpression groupExpression = new GroupExpression(logicalLimit,
ImmutableList.of(childGroup));
Group ownerGroup = newGroup();
ownerGroup.addGroupExpression(groupExpression);
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java
index aed7bbbbd5..be62fc278b 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/PlanToStringTest.java
@@ -46,7 +46,7 @@ public class PlanToStringTest {
@Test
public void testLogicalLimit(@Mocked Plan child) {
- LogicalLimit<Plan> plan = new LogicalLimit<>(0, 0, child);
+ LogicalLimit<Plan> plan = new LogicalLimit<>(0, 0, LimitPhase.ORIGIN,
child);
Assertions.assertEquals("LogicalLimit ( limit=0, offset=0 )",
plan.toString());
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java
index c719302ac5..2bca62bab1 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/LogicalPlanBuilder.java
@@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.expressions.Expression;
import org.apache.doris.nereids.trees.expressions.NamedExpression;
import org.apache.doris.nereids.trees.plans.JoinHint;
import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.LimitPhase;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
@@ -122,7 +123,7 @@ public class LogicalPlanBuilder {
}
public LogicalPlanBuilder limit(long limit, long offset) {
- LogicalLimit<LogicalPlan> limitPlan = new LogicalLimit<>(limit,
offset, this.plan);
+ LogicalLimit<LogicalPlan> limitPlan = new LogicalLimit<>(limit,
offset, LimitPhase.ORIGIN, this.plan);
return from(limitPlan);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]