This is an automated email from the ASF dual-hosted git repository. starocean999 pushed a commit to branch dev_rec in repository https://gitbox.apache.org/repos/asf/doris.git
commit 4c06602b74283ed0bfeba573e317b166100299a8 Author: lichi <[email protected]> AuthorDate: Fri Oct 24 11:26:46 2025 +0800 update fe --- .../glue/translator/PhysicalPlanTranslator.java | 2 +- .../doris/nereids/rules/analysis/AnalyzeCTE.java | 4 +- ...eChildToPhysicalRecursiveCteRecursiveChild.java | 1 + .../doris/nereids/rules/rewrite/CTEInline.java | 22 ++- .../trees/copier/LogicalPlanDeepCopier.java | 2 +- .../logical/LogicalRecursiveCteRecursiveChild.java | 27 ++-- .../plans/logical/LogicalRecursiveCteScan.java | 2 +- .../trees/plans/physical/PhysicalRecursiveCte.java | 4 + .../PhysicalRecursiveCteRecursiveChild.java | 28 ++-- .../org/apache/doris/planner/RecursiveCteNode.java | 8 +- .../doris/qe/runtime/ThriftPlansBuilder.java | 22 ++- .../nereids/rules/analysis/AnalyzeCTETest.java | 148 +++++++++++++++++++++ .../doris/nereids/rules/rewrite/CTEInlineTest.java | 84 ++++++++++++ .../nereids/rules/rewrite/ColumnPruningTest.java | 30 +++++ 14 files changed, 346 insertions(+), 38 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 96469ccd5bf..ee8fec61401 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -2283,7 +2283,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla List<SlotDescriptor> outputSlotDescs = new ArrayList<>(setTuple.getSlots()); RecursiveCteNode recursiveCteNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(), - recursiveCte.isUnionAll()); + recursiveCte.getCteName(), recursiveCte.isUnionAll()); List<List<Expr>> distributeExprLists = getDistributeExprs(recursiveCte.children().toArray(new Plan[0])); recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists); 
recursiveCteNode.setNereidsId(recursiveCte.getId()); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java index bbf0c036cf4..62612422e08 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java @@ -191,8 +191,8 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { recursiveChildOutputs.get(i).getDataType(), anchorChildOutputTypes.get(i))); } } - analyzedRecursiveChild = new LogicalRecursiveCteRecursiveChild<>(forceOutputNullable(analyzedRecursiveChild, - ImmutableList.of())); + analyzedRecursiveChild = new LogicalRecursiveCteRecursiveChild<>(aliasQuery.getAlias(), + forceOutputNullable(analyzedRecursiveChild, ImmutableList.of())); // create LogicalRecursiveCte LogicalUnion logicalUnion = (LogicalUnion) parsedCtePlan; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteRecursiveChildToPhysicalRecursiveCteRecursiveChild.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteRecursiveChildToPhysicalRecursiveCteRecursiveChild.java index 2923924a6ad..689e550fa3a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteRecursiveChildToPhysicalRecursiveCteRecursiveChild.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteRecursiveChildToPhysicalRecursiveCteRecursiveChild.java @@ -29,6 +29,7 @@ public class LogicalRecursiveCteRecursiveChildToPhysicalRecursiveCteRecursiveChi @Override public Rule build() { return logicalRecursiveCteRecursiveChild().then(recursiveCte -> new PhysicalRecursiveCteRecursiveChild( + recursiveCte.getCteName(), recursiveCte.getLogicalProperties(), recursiveCte.child())) 
.toRule(RuleType.LOGICAL_RECURSIVE_CTE_RECURSIVE_CHILD_TO_PHYSICAL_RECURSIVE_CTE_RECURSIVE_CHILD); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java index 75f228ff7fd..9bbcb2e1e8d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java @@ -52,15 +52,12 @@ import java.util.Set; * and put all of them to the top of plan depends on dependency tree of them. */ public class CTEInline extends DefaultPlanRewriter<LogicalCTEProducer<?>> implements CustomRewriter { + // all cte used by recursive cte's recursive child should be inline private Set<LogicalCTEConsumer> mustInlineCteConsumers = new HashSet<>(); @Override public Plan rewriteRoot(Plan plan, JobContext jobContext) { - List<LogicalRecursiveCteRecursiveChild> recursiveCteRecursiveChildList = - plan.collectToList(LogicalRecursiveCteRecursiveChild.class::isInstance); - for (LogicalRecursiveCteRecursiveChild recursiveChild : recursiveCteRecursiveChildList) { - mustInlineCteConsumers.addAll(recursiveChild.collect(LogicalCTEConsumer.class::isInstance)); - } + collectMustInlineCteConsumers(plan, false, mustInlineCteConsumers); Plan root = plan.accept(this, null); // collect cte id to consumer @@ -131,4 +128,19 @@ public class CTEInline extends DefaultPlanRewriter<LogicalCTEProducer<?>> implem } return cteConsumer; } + + private void collectMustInlineCteConsumers(Plan planNode, boolean needCollect, + Set<LogicalCTEConsumer> cteConsumers) { + if (planNode instanceof LogicalCTEConsumer) { + if (needCollect) { + cteConsumers.add((LogicalCTEConsumer) planNode); + } + } else if (planNode instanceof LogicalRecursiveCteRecursiveChild) { + collectMustInlineCteConsumers(planNode.child(0), true, cteConsumers); + } else { + for (Plan child : planNode.children()) { + 
collectMustInlineCteConsumers(child, needCollect, cteConsumers); + } + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java index e900ba6b2b4..b13644dc10d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java @@ -377,7 +377,7 @@ public class LogicalPlanDeepCopier extends DefaultPlanRewriter<DeepCopierContext public Plan visitLogicalRecursiveCteRecursiveChild(LogicalRecursiveCteRecursiveChild<? extends Plan> recursiveChild, DeepCopierContext context) { Plan child = recursiveChild.child().accept(this, context); - return new LogicalRecursiveCteRecursiveChild<>(child); + return new LogicalRecursiveCteRecursiveChild<>(recursiveChild.getCteName(), child); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java index 55747f5ba12..d2766a86f95 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; import com.google.common.collect.ImmutableList; @@ -35,24 +36,30 @@ import java.util.Optional; * LogicalRecursiveCteRecursiveChild is sentinel plan for must_shuffle */ public class LogicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> 
extends LogicalUnary<CHILD_TYPE> { + private final String cteName; - public LogicalRecursiveCteRecursiveChild(CHILD_TYPE child) { - this(Optional.empty(), Optional.empty(), child); + public LogicalRecursiveCteRecursiveChild(String cteName, CHILD_TYPE child) { + this(cteName, Optional.empty(), Optional.empty(), child); } - public LogicalRecursiveCteRecursiveChild(Optional<GroupExpression> groupExpression, + public LogicalRecursiveCteRecursiveChild(String cteName, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { - this(groupExpression, logicalProperties, ImmutableList.of(child)); + this(cteName, groupExpression, logicalProperties, ImmutableList.of(child)); } - public LogicalRecursiveCteRecursiveChild(Optional<GroupExpression> groupExpression, + public LogicalRecursiveCteRecursiveChild(String cteName, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> child) { super(PlanType.LOGICAL_RECURSIVE_CTE_RECURSIVE_CHILD, groupExpression, logicalProperties, child); + this.cteName = cteName; + } + + public String getCteName() { + return cteName; } @Override public Plan withChildren(List<Plan> children) { - return new LogicalRecursiveCteRecursiveChild<>(Optional.empty(), Optional.empty(), children); + return new LogicalRecursiveCteRecursiveChild<>(cteName, Optional.empty(), Optional.empty(), children); } @Override @@ -67,18 +74,20 @@ public class LogicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> extends @Override public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { - return new LogicalRecursiveCteRecursiveChild<>(groupExpression, Optional.of(getLogicalProperties()), children); + return new LogicalRecursiveCteRecursiveChild<>(cteName, groupExpression, + Optional.of(getLogicalProperties()), children); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> 
logicalProperties, List<Plan> children) { - return new LogicalRecursiveCteRecursiveChild<>(groupExpression, logicalProperties, children); + return new LogicalRecursiveCteRecursiveChild<>(cteName, groupExpression, logicalProperties, children); } @Override public String toString() { - return "LogicalRecursiveCteRecursiveChild(MUST_SHUFFLE)"; + return Utils.toSqlStringSkipNull("LogicalRecursiveCteRecursiveChild", + "cteName", cteName); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java index 110a6297858..31b335260b9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java @@ -52,7 +52,7 @@ public class LogicalRecursiveCteScan extends LogicalCatalogRelation { @Override public String toString() { return Utils.toSqlString("LogicalRecursiveCteScan", - "recursive cte name", table.getName()); + "cteName", table.getName()); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java index e5a74a8292a..44aab38fc30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java @@ -92,6 +92,10 @@ public class PhysicalRecursiveCte extends AbstractPhysicalPlan implements Recurs return isUnionAll; } + public String getCteName() { + return cteName; + } + @Override public List<SlotReference> getRegularChildOutput(int i) { return regularChildrenOutputs.get(i); diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java index 25903905b7a..9aef71e7ee9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java @@ -25,6 +25,7 @@ import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; import org.apache.doris.statistics.Statistics; import com.google.common.base.Preconditions; @@ -38,31 +39,36 @@ import java.util.Optional; * PhysicalRecursiveCteRecursiveChild is sentinel plan for must_shuffle */ public class PhysicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> extends PhysicalUnary<CHILD_TYPE> { - public PhysicalRecursiveCteRecursiveChild(LogicalProperties logicalProperties, CHILD_TYPE child) { - this(Optional.empty(), logicalProperties, child); + private final String cteName; + + public PhysicalRecursiveCteRecursiveChild(String cteName, LogicalProperties logicalProperties, CHILD_TYPE child) { + this(cteName, Optional.empty(), logicalProperties, child); } - public PhysicalRecursiveCteRecursiveChild(Optional<GroupExpression> groupExpression, + public PhysicalRecursiveCteRecursiveChild(String cteName, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, CHILD_TYPE child) { - this(groupExpression, logicalProperties, PhysicalProperties.ANY, null, child); + this(cteName, groupExpression, logicalProperties, PhysicalProperties.ANY, null, child); } - public PhysicalRecursiveCteRecursiveChild(Optional<GroupExpression> groupExpression, + public 
PhysicalRecursiveCteRecursiveChild(String cteName, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, @Nullable PhysicalProperties physicalProperties, Statistics statistics, CHILD_TYPE child) { super(PlanType.PHYSICAL_RECURSIVE_CTE_RECURSIVE_CHILD, groupExpression, logicalProperties, physicalProperties, statistics, child); + this.cteName = cteName; } @Override public String toString() { - return "PhysicalRecursiveCteRecursiveChild(MUST_SHUFFLE)"; + return Utils.toSqlStringSkipNull("PhysicalRecursiveCteRecursiveChild", + "cteName", cteName); } @Override public Plan withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new PhysicalRecursiveCteRecursiveChild<>(groupExpression, getLogicalProperties(), children.get(0)); + return new PhysicalRecursiveCteRecursiveChild<>(cteName, groupExpression, getLogicalProperties(), + children.get(0)); } @Override @@ -77,14 +83,14 @@ public class PhysicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> extends @Override public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { - return new PhysicalRecursiveCteRecursiveChild<>(groupExpression, getLogicalProperties(), child()); + return new PhysicalRecursiveCteRecursiveChild<>(cteName, groupExpression, getLogicalProperties(), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new PhysicalRecursiveCteRecursiveChild<>(groupExpression, logicalProperties.get(), child()); + return new PhysicalRecursiveCteRecursiveChild<>(cteName, groupExpression, logicalProperties.get(), child()); } @Override @@ -109,7 +115,7 @@ public class PhysicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> extends @Override public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { 
- return new PhysicalRecursiveCteRecursiveChild<>(groupExpression, getLogicalProperties(), physicalProperties, - statistics, child()); + return new PhysicalRecursiveCteRecursiveChild<>(cteName, groupExpression, getLogicalProperties(), + physicalProperties, statistics, child()); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java index d9625dfc6dd..c8259cc2821 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java @@ -31,13 +31,14 @@ import com.google.common.collect.Lists; import java.util.List; public class RecursiveCteNode extends PlanNode { - + private String cteName; private boolean isUnionAll; private List<List<Expr>> materializedResultExprLists = Lists.newArrayList(); private TRecCTENode tRecCTENode; - public RecursiveCteNode(PlanNodeId id, TupleId tupleId, boolean isUnionAll) { + public RecursiveCteNode(PlanNodeId id, TupleId tupleId, String cteName, boolean isUnionAll) { super(id, tupleId.asList(), "RECURSIVE_CTE", StatisticalType.RECURSIVE_CTE_NODE); + this.cteName = cteName; this.isUnionAll = isUnionAll; } @@ -66,7 +67,7 @@ public class RecursiveCteNode extends PlanNode { @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); - output.append(prefix).append("Recursive Cte: ").append("\n"); + output.append(prefix).append("Recursive Cte: ").append(cteName).append("\n"); output.append(prefix).append("isUnionAll: ").append(isUnionAll).append("\n"); if (!conjuncts.isEmpty()) { Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts); @@ -78,6 +79,7 @@ public class RecursiveCteNode extends PlanNode { @Override public String toString() { return MoreObjects.toStringHelper(this) + .add("name", cteName) .add("id", getId().asInt()) .add("tid", tupleIds.get(0).asInt()) 
.add("isUnionAll", isUnionAll).toString(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index fc0bd2d1859..2733385154c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -597,13 +597,10 @@ public class ThriftPlansBuilder { private static Set<Integer> setParamsForRecursiveCteNode(List<PipelineDistributedPlan> distributedPlans, List<RuntimeFilter> runtimeFilters) { - Set<RecursiveCteNode> recursiveCteNodesInRecursiveSide = new HashSet<>(); - PlanNode rootPlan = distributedPlans.get(distributedPlans.size() - 1) - .getFragmentJob().getFragment().getPlanRoot(); - collectAllRecursiveCteNodesInRecursiveSide(rootPlan, false, recursiveCteNodesInRecursiveSide); Set<Integer> fragmentToNotifyClose = new HashSet<>(); Map<PlanFragmentId, TRecCTETarget> fragmentIdToRecCteTargetMap = new TreeMap<>(); Map<PlanFragmentId, Set<TNetworkAddress>> fragmentIdToNetworkAddressMap = new TreeMap<>(); + // distributedPlans is ordered in bottom up way, so does the fragments for (PipelineDistributedPlan plan : distributedPlans) { List<AssignedJob> fragmentAssignedJobs = plan.getInstanceJobs(); Set<TNetworkAddress> networkAddresses = new TreeSet<>(); @@ -633,6 +630,7 @@ public class ThriftPlansBuilder { distributedPlanWorker.brpcPort())); tRecCTETarget.setFragmentInstanceId(fragmentAssignedJobs.get(0).instanceId()); tRecCTETarget.setNodeId(recursiveCteScanNodes.get(0).getId().asInt()); + // find all RecursiveCteScanNode and its fragment id fragmentIdToRecCteTargetMap.put(planFragment.getFragmentId(), tRecCTETarget); } @@ -648,15 +646,22 @@ public class ThriftPlansBuilder { List<TRecCTETarget> targets = new ArrayList<>(); List<TRecCTEResetInfo> fragmentsToReset = new ArrayList<>(); // PhysicalPlanTranslator will swap recursiveCteNode's child fragment, 
- // so we get recursive one by 1st child + // so we get recursive one by 1st child and collect all child fragment of recursive side List<PlanFragment> childFragments = new ArrayList<>(); planFragment.getChild(0).collectAll(PlanFragment.class::isInstance, childFragments); for (PlanFragment child : childFragments) { PlanFragmentId childFragmentId = child.getFragmentId(); + // the fragment need to be notified to close fragmentToNotifyClose.add(childFragmentId.asInt()); TRecCTETarget tRecCTETarget = fragmentIdToRecCteTargetMap.getOrDefault(childFragmentId, null); if (tRecCTETarget != null) { + // one RecursiveCteNode can only have one corresponding RecursiveCteScanNode targets.add(tRecCTETarget); + // because we traverse the fragments in bottom-up way + // we can safely remove accessed RecursiveCteScanNode + // so the parent RecursiveCteNode won't see its grandson RecursiveCteScanNode + // but can only see its child RecursiveCteScanNode + fragmentIdToRecCteTargetMap.remove(childFragmentId); } Set<TNetworkAddress> tNetworkAddresses = fragmentIdToNetworkAddressMap.get(childFragmentId); if (tNetworkAddresses == null) { @@ -677,6 +682,7 @@ public class ThriftPlansBuilder { for (List<Expr> exprList : materializedResultExprLists) { texprLists.add(Expr.treesToThrift(exprList)); } + // the recursive side's rf need to be reset List<Integer> runtimeFiltersToReset = new ArrayList<>(runtimeFilters.size()); for (RuntimeFilter rf : runtimeFilters) { if (rf.hasRemoteTargets() @@ -684,7 +690,13 @@ public class ThriftPlansBuilder { runtimeFiltersToReset.add(rf.getFilterId().asInt()); } } + // find recursiveCte used by other recursive cte + Set<RecursiveCteNode> recursiveCteNodesInRecursiveSide = new HashSet<>(); + PlanNode rootPlan = distributedPlans.get(distributedPlans.size() - 1) + .getFragmentJob().getFragment().getPlanRoot(); + collectAllRecursiveCteNodesInRecursiveSide(rootPlan, false, recursiveCteNodesInRecursiveSide); boolean isUsedByOtherRecCte = 
recursiveCteNodesInRecursiveSide.contains(recursiveCteNode); + TRecCTENode tRecCTENode = new TRecCTENode(); tRecCTENode.setIsUnionAll(recursiveCteNode.isUnionAll()); tRecCTENode.setTargets(targets); diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTETest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTETest.java index a91c0dd4712..56cea284188 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTETest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTETest.java @@ -33,10 +33,14 @@ import org.apache.doris.nereids.rules.rewrite.InApplyToJoin; import org.apache.doris.nereids.rules.rewrite.PullUpProjectUnderApply; import org.apache.doris.nereids.rules.rewrite.UnCorrelatedApplyFilter; import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Nullable; +import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.util.MemoPatternMatchSupported; import org.apache.doris.nereids.util.MemoTestUtils; import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.utframe.TestWithFeService; import com.google.common.collect.ImmutableList; @@ -264,6 +268,92 @@ public class AnalyzeCTETest extends TestWithFeService implements MemoPatternMatc ); } + @Test + public void testRecCteOutputNullable() { + String sql = new StringBuilder() + .append("WITH RECURSIVE test_table AS (\n") + .append(" SELECT 1 UNION ALL\n") + .append(" SELECT 2 FROM test_table\n") + .append(")\n") + .append("SELECT * FROM test_table;") + .toString(); + PlanChecker.from(connectContext) + .analyze(sql) + .matches( + logicalRecursiveCte( + logicalProject( + 
logicalOneRowRelation( + ) + ).when(project -> project.getProjects().get(0).child(0) instanceof Nullable), + logicalRecursiveCteRecursiveChild( + logicalProject( + logicalProject( + logicalCTEConsumer() + ) + ).when(project -> project.getProjects().get(0).child(0) instanceof Nullable) + ) + ) + ); + } + + @Test + public void testRecCteWithoutRecKeyword() { + String sql = new StringBuilder() + .append("WITH RECURSIVE t1 AS (\n") + .append(" SELECT 1\n") + .append("UNION ALL\n") + .append(" SELECT 2 FROM t1\n") + .append("),\n").append("t2 AS (\n") + .append(" SELECT 3\n") + .append("UNION ALL\n") + .append(" SELECT 4 FROM t1, t2\n") + .append(")\n") + .append("SELECT * FROM t2;") + .toString(); + PlanChecker.from(connectContext) + .analyze(sql) + .matches( + logicalRecursiveCte( + logicalProject( + logicalOneRowRelation( + ) + ), + logicalRecursiveCteRecursiveChild( + logicalProject( + logicalProject( + logicalJoin() + ) + ) + ) + ).when(cte -> cte.getCteName().equals("t2")) + ); + } + + @Test + public void testRecCteMultipleUnion() { + String sql = new StringBuilder().append("with recursive t1 as (\n").append(" select\n") + .append(" 1 as c1,\n").append(" 1 as c2\n").append("),\n").append("t2 as (\n") + .append(" select\n").append(" 2 as c1,\n").append(" 2 as c2\n").append("),\n") + .append("xx as (\n").append(" select\n").append(" c1,\n").append(" c2\n") + .append(" from\n").append(" t1\n").append(" union\n").append(" select\n") + .append(" c1,\n").append(" c2\n").append(" from\n").append(" t2\n") + .append(" union\n").append(" select\n").append(" c1,\n").append(" c2\n") + .append(" from\n").append(" xx\n").append(")\n").append("select\n").append(" *\n") + .append("from\n").append(" xx;").toString(); + LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql); + StatementContext statementContext = new StatementContext(connectContext, + new OriginStatement(sql, 0)); + NereidsPlanner planner = new NereidsPlanner(statementContext); + 
planner.planWithLock(unboundPlan, PhysicalProperties.ANY, + ExplainCommand.ExplainLevel.ANALYZED_PLAN); + MemoTestUtils.initMemoAndValidState(planner.getCascadesContext()); + PlanChecker.from(planner.getCascadesContext()).matches( + logicalRecursiveCte( + logicalProject( + logicalUnion()), + logicalRecursiveCteRecursiveChild()).when(cte -> cte.getCteName().equals("xx"))); + } + /* ******************************************************************************************** * Test CTE Exceptions @@ -333,4 +423,62 @@ public class AnalyzeCTETest extends TestWithFeService implements MemoPatternMatc () -> PlanChecker.from(connectContext).analyze(sql), "Not throw expected exception."); Assertions.assertTrue(exception.getMessage().contains("Table [cte2] does not exist in database")); } + + @Test + public void testRecCteWithoutRecKeywordException() { + String sql = new StringBuilder() + .append("WITH t1 AS (\n") + .append(" SELECT 1 UNION ALL\n") + .append(" SELECT 2 FROM t1\n") + .append(")\n") + .append("SELECT * FROM t1;") + .toString(); + LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql); + StatementContext statementContext = new StatementContext(connectContext, + new OriginStatement(sql, 0)); + NereidsPlanner planner = new NereidsPlanner(statementContext); + AnalysisException exception = Assertions.assertThrows(AnalysisException.class, + () -> planner.planWithLock(unboundPlan, PhysicalProperties.ANY, + ExplainCommand.ExplainLevel.ANALYZED_PLAN), "Not throw expected exception."); + Assertions.assertTrue(exception.getMessage().contains("Table [t1] does not exist in database")); + } + + @Test + public void testRecCteDatatypeException() { + String sql = new StringBuilder().append("WITH RECURSIVE t1 AS (\n").append(" SELECT 1 AS number\n") + .append("UNION ALL\n").append(" SELECT number + 1 FROM t1 WHERE number < 100\n").append(")\n") + .append("SELECT number FROM t1;").toString(); + LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql); + 
StatementContext statementContext = new StatementContext(connectContext, + new OriginStatement(sql, 0)); + NereidsPlanner planner = new NereidsPlanner(statementContext); + AnalysisException exception = Assertions.assertThrows(AnalysisException.class, + () -> planner.planWithLock(unboundPlan, PhysicalProperties.ANY, + ExplainCommand.ExplainLevel.ANALYZED_PLAN), + "Not throw expected exception."); + Assertions.assertTrue(exception.getMessage().contains("please add cast manually to get expect datatype")); + } + + @Test + public void testRecCteMultipleUnionException() { + String sql = new StringBuilder().append("with recursive t1 as (\n").append(" select\n") + .append(" 1 as c1,\n").append(" 1 as c2\n").append("),\n").append("t2 as (\n") + .append(" select\n").append(" 2 as c1,\n").append(" 2 as c2\n").append("),\n") + .append("xx as (\n").append(" select\n").append(" c1,\n").append(" c2\n") + .append(" from\n").append(" t1\n").append(" union\n").append(" select\n") + .append(" c1,\n").append(" c2\n").append(" from\n").append(" xx\n") + .append(" union\n").append(" select\n").append(" c1,\n").append(" c2\n") + .append(" from\n").append(" t2\n").append(")\n").append("select\n").append(" *\n") + .append("from\n").append(" xx").toString(); + LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql); + StatementContext statementContext = new StatementContext(connectContext, + new OriginStatement(sql, 0)); + NereidsPlanner planner = new NereidsPlanner(statementContext); + AnalysisException exception = Assertions.assertThrows(AnalysisException.class, + () -> planner.planWithLock(unboundPlan, PhysicalProperties.ANY, + ExplainCommand.ExplainLevel.ANALYZED_PLAN), + "Not throw expected exception."); + Assertions.assertTrue(exception.getMessage() + .contains("recursive reference to query xx must not appear within its non-recursive term")); + } } diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CTEInlineTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CTEInlineTest.java new file mode 100644 index 00000000000..0a0efc8b5db --- /dev/null +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/CTEInlineTest.java @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.util.MemoPatternMatchSupported; +import org.apache.doris.nereids.util.MemoTestUtils; +import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.qe.OriginStatement; +import org.apache.doris.utframe.TestWithFeService; + +import org.junit.jupiter.api.Test; +/** Plan-shape test: runs a SQL statement containing recursive CTEs through NereidsPlanner and matches the resulting plan with the pattern helpers used below (logicalRecursiveCte, logicalJoin, ...). */ +public class CTEInlineTest extends TestWithFeService implements MemoPatternMatchSupported { +/** Creates database "test" and selects it as the current database of connectContext. */ @Override + protected void runBeforeAll() throws Exception { + createDatabase("test"); + connectContext.setDatabase("test"); + } +/** Builds a WITH RECURSIVE query in which t1, t2 and t3 are constant one-row selects, xx and yy each UNION a seed select with a branch that references the CTE itself (yy's branch also reads xx), and the final select uses yy twice (y1, y2); then asserts the shape of the planned tree. */ + @Test + public void recCteInline() { + String sql = new StringBuilder().append("with recursive t1 as (\n").append(" select\n") + .append(" 1 as c1,\n").append(" 1 as c2\n").append("),\n").append("t2 as (\n") + .append(" select\n").append(" 2 as c1,\n").append(" 2 as c2\n").append("),\n") + .append("t3 as (\n").append(" select\n").append(" 3 as c1,\n").append(" 3 as c2\n") + .append("),\n").append("xx as (\n").append(" select\n").append(" c1,\n") + .append(" c2\n").append(" from\n").append(" t1\n").append(" union\n") + .append(" select\n").append(" t2.c1,\n").append(" t2.c2\n").append(" from\n") + .append(" t2,\n").append(" xx\n").append(" where\n").append(" t2.c1 = xx.c1\n") + .append("),\n").append("yy as (\n").append(" select\n").append(" c1,\n") + .append(" c2\n").append(" from\n").append(" t3\n").append(" union\n") + .append(" select\n").append(" t3.c1,\n").append(" t3.c2\n").append(" from\n") + .append(" t3,\n").append(" yy,\n").append(" xx\n").append(" where\n") + .append(" t3.c1 = yy.c1\n").append(" and t3.c2 = xx.c1\n").append(")\n") 
+ .append("select\n").append(" *\n").append("from\n").append(" yy y1,\n").append(" yy y2;") + .toString(); + LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql); + StatementContext statementContext = new StatementContext(connectContext, + new OriginStatement(sql, 0)); + NereidsPlanner planner = new NereidsPlanner(statementContext); +/* NOTE(review): ExplainLevel.REWRITTEN_PLAN presumably makes planning stop after the rewrite phase; confirm against NereidsPlanner. */ planner.planWithLock(unboundPlan, PhysicalProperties.ANY, + ExplainCommand.ExplainLevel.REWRITTEN_PLAN); + MemoTestUtils.initMemoAndValidState(planner.getCascadesContext()); +/* Expected shape: a LogicalRecursiveCte named "yy" whose first child is unconstrained (any) and whose second child is a recursive-child node over project, over join(any, project over filter over a LogicalRecursiveCte named "xx"). */ PlanChecker.from(planner.getCascadesContext()).matches( + logicalRecursiveCte( + any( + ), + logicalRecursiveCteRecursiveChild( + logicalProject( + logicalJoin( + any(), + logicalProject( + logicalFilter( + logicalRecursiveCte().when(cte -> cte.getCteName().equals("xx")) + ) + ) + ) + ) + ) + ).when(cte -> cte.getCteName().equals("yy")) + ); + } +} diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/ColumnPruningTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/ColumnPruningTest.java index 12e9a1ad381..353f0c13863 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/ColumnPruningTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/ColumnPruningTest.java @@ -17,15 +17,23 @@ package org.apache.doris.nereids.rules.rewrite; +import org.apache.doris.nereids.NereidsPlanner; +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.parser.NereidsParser; +import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.literal.TinyIntLiteral; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.commands.ExplainCommand; +import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import 
org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.types.DoubleType; import org.apache.doris.nereids.types.TinyIntType; import org.apache.doris.nereids.util.MemoPatternMatchSupported; +import org.apache.doris.nereids.util.MemoTestUtils; import org.apache.doris.nereids.util.PlanChecker; +import org.apache.doris.qe.OriginStatement; import org.apache.doris.utframe.TestWithFeService; import com.google.common.collect.ImmutableList; @@ -328,6 +336,28 @@ public class ColumnPruningTest extends TestWithFeService implements MemoPatternM ); } +/** Plans a query over recursive CTE t1(col1, col2, col3) that selects only col1 and filters on col2, then asserts the plan shape and output counts. NOTE(review): the SQL reads table student, presumably created in this class's setup outside this hunk; confirm. */ @Test + public void pruneRecCte() { + String sql = new StringBuilder().append("WITH RECURSIVE t1(col1, col2, col3) AS (\n") + .append(" SELECT 1, 1, 1\n").append(" UNION ALL\n").append(" SELECT 2, 2, 2\n") + .append(" FROM student, t1\n").append(" WHERE t1.col1 = student.id\n").append(" )\n") + .append("SELECT col1\n").append("FROM t1\n").append("WHERE col2 = 2;").toString(); + LogicalPlan unboundPlan = new NereidsParser().parseSingle(sql); + StatementContext statementContext = new StatementContext(connectContext, + new OriginStatement(sql, 0)); + NereidsPlanner planner = new NereidsPlanner(statementContext); + planner.planWithLock(unboundPlan, PhysicalProperties.ANY, + ExplainCommand.ExplainLevel.REWRITTEN_PLAN); + MemoTestUtils.initMemoAndValidState(planner.getCascadesContext()); +/* Expected: a project with exactly 1 output, over a filter, over a LogicalRecursiveCte whose getOutput() still has all 3 columns. */ PlanChecker.from(planner.getCascadesContext()).matches( + logicalProject( + logicalFilter( + logicalRecursiveCte().when(cte -> cte.getOutput().size() == 3) + ) + ).when(project -> project.getOutputs().size() == 1) + ); + } +/** Convenience overload: forwards p.getOutputs() to another getOutputQualifiedNames overload that is not visible in this hunk. */ private List<String> getOutputQualifiedNames(LogicalProject<? extends Plan> p) { return getOutputQualifiedNames(p.getOutputs()); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
