This is an automated email from the ASF dual-hosted git repository. starocean999 pushed a commit to branch dev_rec in repository https://gitbox.apache.org/repos/asf/doris.git
commit bd7cbfb4e2f216d8e3dfca3b990725f72a8be9c5 Author: lichi <[email protected]> AuthorDate: Tue Oct 21 18:07:38 2025 +0800 update fe --- .../org/apache/doris/nereids/CascadesContext.java | 4 +++ .../org/apache/doris/nereids/StatementContext.java | 10 ++++++ .../glue/translator/PhysicalPlanTranslator.java | 8 ----- .../doris/nereids/parser/LogicalPlanBuilder.java | 9 ++++-- .../nereids/properties/RequestPropertyDeriver.java | 11 +++++++ .../doris/nereids/rules/analysis/AnalyzeCTE.java | 15 ++++++--- .../doris/nereids/rules/analysis/BindRelation.java | 5 +++ .../doris/nereids/rules/rewrite/CTEInline.java | 8 +++-- .../org/apache/doris/planner/RecursiveCteNode.java | 18 +++++++++-- .../doris/qe/runtime/ThriftPlansBuilder.java | 37 ++++++++++++++++++++-- gensrc/thrift/PlanNodes.thrift | 2 ++ 11 files changed, 106 insertions(+), 21 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java index fb8572ae64e..1517d6f2a7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java @@ -267,6 +267,10 @@ public class CascadesContext implements ScheduleContext { return recursiveCteOutputs; } + public boolean isAnalyzingRecursiveCteAnchorChild() { + return currentRecursiveCteName.isPresent() && recursiveCteOutputs.isEmpty(); + } + /** * Init memo with plan */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java index 5829a0c9b56..a9997d3dfdc 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/StatementContext.java @@ -71,6 +71,7 @@ import com.google.common.base.Suppliers; import com.google.common.base.Throwables; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; @@ -157,6 +158,7 @@ public class StatementContext implements Closeable { private final Map<CTEId, List<Pair<Multimap<Slot, Slot>, Group>>> cteIdToConsumerGroup = new HashMap<>(); private final Map<CTEId, LogicalPlan> rewrittenCteProducer = new HashMap<>(); private final Map<CTEId, LogicalPlan> rewrittenCteConsumer = new HashMap<>(); + private final Set<CTEId> recursiveCteIds = new HashSet<>(); private final Set<String> viewDdlSqlSet = Sets.newHashSet(); private final SqlCacheContext sqlCacheContext; @@ -594,6 +596,14 @@ public class StatementContext implements Closeable { return cteIdToConsumerGroup; } + public void addRecursiveCteIds(CTEId cteId) { + recursiveCteIds.add(cteId); + } + + public Set<CTEId> getRecursiveCteIds() { + return ImmutableSet.copyOf(recursiveCteIds); + } + public Map<CTEId, LogicalPlan> getRewrittenCteProducer() { return rewrittenCteProducer; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 8c35e65d1f1..4f25b08f798 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -2333,14 +2333,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla setPlanRoot(recursiveCteFragment, recursiveCteNode, recursiveCte); } - // in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution - // we need turn of parallel scan to ensure to get correct result. - // TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary - if (!recursiveCte.getPhysicalProperties().equals(PhysicalProperties.ANY) - && findOlapScanNodesByPassExchangeAndJoinNode(recursiveCteFragment.getPlanRoot())) { - recursiveCteFragment.setHasColocatePlanNode(true); - recursiveCteNode.setColocate(true); - } recursiveCteFragment.updateDataPartition(DataPartition.UNPARTITIONED); recursiveCteFragment.setOutputPartition(DataPartition.UNPARTITIONED); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index a5c106fd6e5..b54389ade5c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -1111,6 +1111,8 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { private final Map<Integer, ParserRuleContext> selectHintMap; + private boolean isInRecursiveCteContext = false; + public LogicalPlanBuilder(Map<Integer, ParserRuleContext> selectHintMap) { this.selectHintMap = selectHintMap; } @@ -2134,8 +2136,11 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { if (ctx == null) { return plan; } - return new LogicalCTE<>(ctx.RECURSIVE() != null, + isInRecursiveCteContext = ctx.RECURSIVE() != null; + LogicalCTE<Plan> logicalCTE = new LogicalCTE<>(isInRecursiveCteContext, (List) visit(ctx.aliasQuery(), LogicalSubQueryAlias.class), plan); + isInRecursiveCteContext = false; + return logicalCTE; } /** @@ -2329,7 +2334,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { public LogicalPlan visitSetOperation(SetOperationContext ctx) { return ParserUtils.withOrigin(ctx, () -> { - if (ctx.UNION() != null) { + if (ctx.UNION() != null && !isInRecursiveCteContext) { Qualifier qualifier = getQualifier(ctx); List<QueryTermContext> contexts = Lists.newArrayList(ctx.right); QueryTermContext current = ctx.left; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java index 2322b39a62b..c26d1abade1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/RequestPropertyDeriver.java @@ -54,6 +54,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte; import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.physical.PhysicalUnion; @@ -315,6 +316,16 @@ public class RequestPropertyDeriver extends PlanVisitor<Void, PlanContext> { return null; } + @Override + public Void visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanContext context) { + List<PhysicalProperties> requestGather = Lists.newArrayListWithCapacity(context.arity()); + for (int i = context.arity(); i > 0; --i) { + requestGather.add(PhysicalProperties.GATHER); + } + addRequestPropertyToChildren(requestGather); + return null; + } + @Override public Void visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort, PlanContext context) { if (!sort.getSortPhase().isLocal()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java index b7ef605c395..594dede9edf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java @@ -140,7 +140,8 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { - throw new AnalysisException("recursive cte must be union"); + throw new AnalysisException(String.format("recursive cte must be union, don't support %s", + parsedCtePlan.getClass().getSimpleName())); } // analyze anchor child, its output list will be recursive cte temp table's schema LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); @@ -151,6 +152,11 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { }); cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); LogicalPlan analyzedAnchorChild = (LogicalPlan) innerAnchorCascadesCtx.getRewritePlan(); + if (!analyzedAnchorChild.collect(LogicalRecursiveCteScan.class::isInstance).isEmpty()) { + throw new AnalysisException( + String.format("recursive reference to query %s must not appear within its non-recursive term", + aliasQuery.getAlias())); + } checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); // make all output nullable analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild, @@ -179,10 +185,10 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput(); for (int i = 0; i < recursiveChildOutputs.size(); ++i) { if (!recursiveChildOutputs.get(i).getDataType().equals(anchorChildOutputTypes.get(i))) { - throw new AnalysisException(String.format("recursive child's %d column's datatype in select list %s " + throw new AnalysisException(String.format("%s recursive child's %d column's datatype in select list %s " + "is different from anchor child's output datatype %s, please add cast manually " - + "to get expect datatype", - i + 1, recursiveChildOutputs.get(i).getDataType(), anchorChildOutputTypes.get(i))); + + "to get expect datatype", aliasQuery.getAlias(), i + 1, + recursiveChildOutputs.get(i).getDataType(), anchorChildOutputTypes.get(i))); } } analyzedRecursiveChild = new LogicalRecursiveCteRecursiveChild<>(forceOutputNullable(analyzedRecursiveChild, @@ -216,6 +222,7 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { analyzedCtePlan = analyzedCtePlan.withNewOutputs(newOutputs); CTEId cteId = StatementScopeIdGenerator.newCTEId(); + cascadesContext.getStatementContext().addRecursiveCteIds(cteId); LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index 4e6be99f290..b6ef89a26e8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -174,6 +174,11 @@ public class BindRelation extends OneAnalysisRuleFactory { List<String> tableQualifier = RelationUtil.getQualifierName( cascadesContext.getConnectContext(), unboundRelation.getNameParts()); if (tableName.equalsIgnoreCase(cascadesContext.getCurrentRecursiveCteName().orElse(""))) { + if (cascadesContext.isAnalyzingRecursiveCteAnchorChild()) { + throw new AnalysisException( + String.format("recursive reference to query %s must not appear within its non-recursive term", + cascadesContext.getCurrentRecursiveCteName().get())); + } ImmutableList.Builder<Column> schema = new ImmutableList.Builder<>(); for (Slot slot : cascadesContext.getRecursiveCteOutputs()) { schema.add(new Column(slot.getName(), slot.getDataType().toCatalogDataType(), slot.nullable())); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java index 30970dee649..9c6500238c8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java @@ -21,6 +21,7 @@ import org.apache.doris.nereids.jobs.JobContext; import org.apache.doris.nereids.trees.copier.DeepCopierContext; import org.apache.doris.nereids.trees.copier.LogicalPlanDeepCopier; import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.CTEId; import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; @@ -52,10 +53,12 @@ import java.util.Set; * and put all of them to the top of plan depends on dependency tree of them. */ public class CTEInline extends DefaultPlanRewriter<LogicalCTEProducer<?>> implements CustomRewriter { - Set<LogicalCTEConsumer> mustInlineCteConsumers = new HashSet<>(); + private Set<LogicalCTEConsumer> mustInlineCteConsumers = new HashSet<>(); + private Set<CTEId> recursiveCteIds; @Override public Plan rewriteRoot(Plan plan, JobContext jobContext) { + recursiveCteIds = jobContext.getCascadesContext().getStatementContext().getRecursiveCteIds(); List<LogicalRecursiveCte> recursiveCteList = plan.collectToList(LogicalRecursiveCte.class::isInstance); for (LogicalRecursiveCte recursiveCte : recursiveCteList) { mustInlineCteConsumers.addAll(recursiveCte.collect(LogicalCTEConsumer.class::isInstance)); @@ -88,7 +91,8 @@ public class CTEInline extends DefaultPlanRewriter<LogicalCTEProducer<?>> implem } return false; }); - if (!Sets.intersection(mustInlineCteConsumers, Sets.newHashSet(consumers)).isEmpty()) { + if (recursiveCteIds.contains(cteAnchor.getCteId()) + || !Sets.intersection(mustInlineCteConsumers, Sets.newHashSet(consumers)).isEmpty()) { // should inline Plan root = cteAnchor.right().accept(this, (LogicalCTEProducer<?>) cteAnchor.left()); // process child diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java index 7d9eecd35be..d9625dfc6dd 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java @@ -26,14 +26,18 @@ import org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TRecCTENode; import com.google.common.base.MoreObjects; +import com.google.common.collect.Lists; -public class RecursiveCteNode extends SetOperationNode { +import java.util.List; + +public class RecursiveCteNode extends PlanNode { private boolean isUnionAll; + private List<List<Expr>> materializedResultExprLists = Lists.newArrayList(); private TRecCTENode tRecCTENode; public RecursiveCteNode(PlanNodeId id, TupleId tupleId, boolean isUnionAll) { - super(id, tupleId, "RECURSIVE_CTE", StatisticalType.RECURSIVE_CTE_NODE); + super(id, tupleId.asList(), "RECURSIVE_CTE", StatisticalType.RECURSIVE_CTE_NODE); this.isUnionAll = isUnionAll; } @@ -41,6 +45,14 @@ public class RecursiveCteNode extends SetOperationNode { return isUnionAll; } + public void setMaterializedResultExprLists(List<List<Expr>> exprs) { + this.materializedResultExprLists = exprs; + } + + public List<List<Expr>> getMaterializedResultExprLists() { + return materializedResultExprLists; + } + public void settRecCTENode(TRecCTENode tRecCTENode) { this.tRecCTENode = tRecCTENode; } @@ -67,7 +79,7 @@ public class RecursiveCteNode extends SetOperationNode { public String toString() { return MoreObjects.toStringHelper(this) .add("id", getId().asInt()) - .add("tid", tupleId.asInt()) + .add("tid", tupleIds.get(0).asInt()) .add("isUnionAll", isUnionAll).toString(); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index 9d4bd4f9a3e..fc0bd2d1859 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -43,8 +43,10 @@ import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.RecursiveCteNode; import org.apache.doris.planner.RecursiveCteScanNode; +import org.apache.doris.planner.RuntimeFilter; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.SortNode; import org.apache.doris.qe.ConnectContext; @@ -100,7 +102,8 @@ public class ThriftPlansBuilder { CoordinatorContext coordinatorContext) { List<PipelineDistributedPlan> distributedPlans = coordinatorContext.distributedPlans; - Set<Integer> fragmentToNotifyClose = setParamsForRecursiveCteNode(distributedPlans); + Set<Integer> fragmentToNotifyClose = setParamsForRecursiveCteNode(distributedPlans, + coordinatorContext.runtimeFilters); // we should set runtime predicate first, then we can use heap sort and to thrift setRuntimePredicateIfNeed(coordinatorContext.scanNodes); @@ -592,7 +595,12 @@ public class ThriftPlansBuilder { } } - private static Set<Integer> setParamsForRecursiveCteNode(List<PipelineDistributedPlan> distributedPlans) { + private static Set<Integer> setParamsForRecursiveCteNode(List<PipelineDistributedPlan> distributedPlans, + List<RuntimeFilter> runtimeFilters) { + Set<RecursiveCteNode> recursiveCteNodesInRecursiveSide = new HashSet<>(); + PlanNode rootPlan = distributedPlans.get(distributedPlans.size() - 1) + .getFragmentJob().getFragment().getPlanRoot(); + collectAllRecursiveCteNodesInRecursiveSide(rootPlan, false, recursiveCteNodesInRecursiveSide); Set<Integer> fragmentToNotifyClose = new HashSet<>(); Map<PlanFragmentId, TRecCTETarget> fragmentIdToRecCteTargetMap = new TreeMap<>(); Map<PlanFragmentId, Set<TNetworkAddress>> fragmentIdToNetworkAddressMap = new TreeMap<>(); @@ -669,17 +677,42 @@ public class ThriftPlansBuilder { for (List<Expr> exprList : materializedResultExprLists) { texprLists.add(Expr.treesToThrift(exprList)); } + List<Integer> runtimeFiltersToReset = new ArrayList<>(runtimeFilters.size()); + for (RuntimeFilter rf : runtimeFilters) { + if (rf.hasRemoteTargets() + && recursiveCteNode.getChild(1).contains(node -> node == rf.getBuilderNode())) { + runtimeFiltersToReset.add(rf.getFilterId().asInt()); + } + } + boolean isUsedByOtherRecCte = recursiveCteNodesInRecursiveSide.contains(recursiveCteNode); TRecCTENode tRecCTENode = new TRecCTENode(); tRecCTENode.setIsUnionAll(recursiveCteNode.isUnionAll()); tRecCTENode.setTargets(targets); tRecCTENode.setFragmentsToReset(fragmentsToReset); tRecCTENode.setResultExprLists(texprLists); + tRecCTENode.setRecSideRuntimeFilterIds(runtimeFiltersToReset); + tRecCTENode.setIsUsedByOtherRecCte(isUsedByOtherRecCte); recursiveCteNode.settRecCTENode(tRecCTENode); } } return fragmentToNotifyClose; } + private static void collectAllRecursiveCteNodesInRecursiveSide(PlanNode planNode, boolean needCollect, + Set<RecursiveCteNode> recursiveCteNodes) { + if (planNode instanceof RecursiveCteNode) { + if (needCollect) { + recursiveCteNodes.add((RecursiveCteNode) planNode); + } + collectAllRecursiveCteNodesInRecursiveSide(planNode.getChild(0), needCollect, recursiveCteNodes); + collectAllRecursiveCteNodesInRecursiveSide(planNode.getChild(1), true, recursiveCteNodes); + } else { + for (PlanNode child : planNode.getChildren()) { + collectAllRecursiveCteNodesInRecursiveSide(child, needCollect, recursiveCteNodes); + } + } + } + private static class PerNodeScanParams { Map<Integer, List<TScanRangeParams>> perNodeScanRanges; diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 4036ad7ea49..a13f511f9eb 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -710,6 +710,8 @@ struct TRecCTENode { 2: optional list<TRecCTETarget> targets 3: optional list<TRecCTEResetInfo> fragments_to_reset 4: optional list<list<Exprs.TExpr>> result_expr_lists + 5: optional list<i32> rec_side_runtime_filter_ids + 6: optional bool is_used_by_other_rec_cte } struct TRecCTEScanNode { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
