This is an automated email from the ASF dual-hosted git repository. starocean999 pushed a commit to branch dev_rec in repository https://gitbox.apache.org/repos/asf/doris.git
commit e6f133b2aec9443ede1da66eb0799dd1bd84ea5d Author: lichi <[email protected]> AuthorDate: Thu Oct 9 15:14:12 2025 +0800 recursive cte fe part --- .../antlr4/org/apache/doris/nereids/DorisLexer.g4 | 1 + .../antlr4/org/apache/doris/nereids/DorisParser.g4 | 2 +- .../doris/catalog/RecursiveCteTempTable.java | 26 ++ .../java/org/apache/doris/catalog/TableIf.java | 6 +- .../org/apache/doris/nereids/CascadesContext.java | 37 +- .../glue/translator/PhysicalPlanTranslator.java | 29 ++ .../doris/nereids/parser/LogicalPlanBuilder.java | 3 +- .../apache/doris/nereids/pattern/MemoPatterns.java | 24 ++ .../properties/ChildOutputPropertyDeriver.java | 12 + .../org/apache/doris/nereids/rules/RuleSet.java | 4 + .../org/apache/doris/nereids/rules/RuleType.java | 2 + .../doris/nereids/rules/analysis/AnalyzeCTE.java | 111 +++++- .../doris/nereids/rules/analysis/BindRelation.java | 27 +- .../nereids/rules/analysis/CollectRelation.java | 11 +- .../nereids/rules/analysis/SubExprAnalyzer.java | 4 +- ...RecursiveCteScanToPhysicalRecursiveCteScan.java | 42 +++ .../LogicalRecursiveCteToPhysicalRecursiveCte.java | 39 ++ .../nereids/rules/rewrite/AdjustNullable.java | 17 + .../doris/nereids/rules/rewrite/CTEInline.java | 31 +- .../doris/nereids/rules/rewrite/ColumnPruning.java | 106 ++++++ .../doris/nereids/stats/StatsCalculator.java | 29 ++ .../trees/copier/LogicalPlanDeepCopier.java | 23 ++ .../apache/doris/nereids/trees/plans/PlanType.java | 4 + .../commands/CreateMaterializedViewCommand.java | 2 +- .../distribute/worker/job/AssignedJobBuilder.java | 93 +++++ .../worker/job/UnassignedJobBuilder.java | 13 + .../worker/job/UnassignedRecursiveCteScanJob.java | 62 +++ .../trees/plans/logical/LogicalRecursiveCte.java | 419 +++++++++++++++++++++ .../plans/logical/LogicalRecursiveCteScan.java | 66 ++++ .../trees/plans/logical/LogicalSubQueryAlias.java | 31 +- .../trees/plans/physical/PhysicalRecursiveCte.java | 251 ++++++++++++ .../plans/physical/PhysicalRecursiveCteScan.java | 85 +++++ 
.../nereids/trees/plans/visitor/PlanVisitor.java | 10 + .../trees/plans/visitor/RelationVisitor.java | 10 + .../org/apache/doris/planner/RecursiveCteNode.java | 52 +++ .../apache/doris/planner/RecursiveCteScanNode.java | 113 ++++++ .../apache/doris/statistics/StatisticalType.java | 2 + .../recursive_cte/test_recursive_cte.groovy | 102 +++++ 38 files changed, 1845 insertions(+), 56 deletions(-) diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 index 1ae0cc0e075..f2a543b3d0c 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisLexer.g4 @@ -438,6 +438,7 @@ REAL: 'REAL'; REBALANCE: 'REBALANCE'; RECENT: 'RECENT'; RECOVER: 'RECOVER'; +RECURSIVE: 'RECURSIVE'; RECYCLE: 'RECYCLE'; REFRESH: 'REFRESH'; REFERENCES: 'REFERENCES'; diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 25c0d9a00ab..3f35084e689 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -1200,7 +1200,7 @@ cte ; aliasQuery - : identifier columnAliases? AS LEFT_PAREN query RIGHT_PAREN + : RECURSIVE? identifier columnAliases? AS LEFT_PAREN query RIGHT_PAREN ; columnAliases diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java new file mode 100644 index 00000000000..9f36b04dfc4 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.catalog; + +import java.util.List; + +public class RecursiveCteTempTable extends Table { + public RecursiveCteTempTable(String tableName, List<Column> fullSchema) { + super(-1, tableName, TableType.RECURSIVE_CTE_TEMP_TABLE, fullSchema); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java index d15fabd6b75..5aa7f9ead76 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/TableIf.java @@ -398,7 +398,8 @@ public interface TableIf { @Deprecated ICEBERG, @Deprecated HUDI, JDBC, TABLE_VALUED_FUNCTION, HMS_EXTERNAL_TABLE, ES_EXTERNAL_TABLE, MATERIALIZED_VIEW, JDBC_EXTERNAL_TABLE, ICEBERG_EXTERNAL_TABLE, TEST_EXTERNAL_TABLE, PAIMON_EXTERNAL_TABLE, MAX_COMPUTE_EXTERNAL_TABLE, - HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUl_EXTERNAL_TABLE, DICTIONARY; + HUDI_EXTERNAL_TABLE, TRINO_CONNECTOR_EXTERNAL_TABLE, LAKESOUl_EXTERNAL_TABLE, DICTIONARY, + RECURSIVE_CTE_TEMP_TABLE; public String toEngineName() { switch (this) { @@ -437,6 +438,8 @@ public interface TableIf { return "iceberg"; case DICTIONARY: return "dictionary"; + case RECURSIVE_CTE_TEMP_TABLE: + return "RecursiveCteTempTable"; default: return null; } @@ -475,6 +478,7 @@ public interface TableIf { case PAIMON_EXTERNAL_TABLE: 
case MATERIALIZED_VIEW: case TRINO_CONNECTOR_EXTERNAL_TABLE: + case RECURSIVE_CTE_TEMP_TABLE: return "BASE TABLE"; default: return null; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java index efb53b66822..96b53b0a78d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/CascadesContext.java @@ -133,6 +133,8 @@ public class CascadesContext implements ScheduleContext { private final boolean isEnableExprTrace; private int groupExpressionCount = 0; + private Optional<String> currentRecursiveCteName; + private List<Slot> recursiveCteOutputs; /** * Constructor of OptimizerContext. @@ -142,7 +144,8 @@ public class CascadesContext implements ScheduleContext { */ private CascadesContext(Optional<CascadesContext> parent, Optional<CTEId> currentTree, StatementContext statementContext, Plan plan, Memo memo, - CTEContext cteContext, PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder) { + CTEContext cteContext, PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder, + Optional<String> currentRecursiveCteName, List<Slot> recursiveCteOutputs) { this.parent = Objects.requireNonNull(parent, "parent should not null"); this.currentTree = Objects.requireNonNull(currentTree, "currentTree should not null"); this.statementContext = Objects.requireNonNull(statementContext, "statementContext should not null"); @@ -167,6 +170,8 @@ public class CascadesContext implements ScheduleContext { this.isEnableExprTrace = false; } this.isLeadingDisableJoinReorder = isLeadingDisableJoinReorder; + this.currentRecursiveCteName = currentRecursiveCteName; + this.recursiveCteOutputs = recursiveCteOutputs; } /** init a temporary context to rewrite expression */ @@ -181,7 +186,7 @@ public class CascadesContext implements ScheduleContext { } return newContext(Optional.empty(), 
Optional.empty(), statementContext, DUMMY_PLAN, - new CTEContext(), PhysicalProperties.ANY, false); + new CTEContext(), PhysicalProperties.ANY, false, Optional.empty(), ImmutableList.of()); } /** @@ -190,24 +195,25 @@ public class CascadesContext implements ScheduleContext { public static CascadesContext initContext(StatementContext statementContext, Plan initPlan, PhysicalProperties requireProperties) { return newContext(Optional.empty(), Optional.empty(), statementContext, - initPlan, new CTEContext(), requireProperties, false); + initPlan, new CTEContext(), requireProperties, false, Optional.empty(), ImmutableList.of()); } /** * use for analyze cte. we must pass CteContext from outer since we need to get right scope of cte */ public static CascadesContext newContextWithCteContext(CascadesContext cascadesContext, - Plan initPlan, CTEContext cteContext) { + Plan initPlan, CTEContext cteContext, Optional<String> currentRecursiveCteName, + List<Slot> recursiveCteOutputs) { return newContext(Optional.of(cascadesContext), Optional.empty(), cascadesContext.getStatementContext(), initPlan, cteContext, PhysicalProperties.ANY, - cascadesContext.isLeadingDisableJoinReorder - ); + cascadesContext.isLeadingDisableJoinReorder, currentRecursiveCteName, recursiveCteOutputs); } public static CascadesContext newCurrentTreeContext(CascadesContext context) { return CascadesContext.newContext(context.getParent(), context.getCurrentTree(), context.getStatementContext(), context.getRewritePlan(), context.getCteContext(), - context.getCurrentJobContext().getRequiredProperties(), context.isLeadingDisableJoinReorder); + context.getCurrentJobContext().getRequiredProperties(), context.isLeadingDisableJoinReorder, + Optional.empty(), ImmutableList.of()); } /** @@ -216,14 +222,17 @@ public class CascadesContext implements ScheduleContext { public static CascadesContext newSubtreeContext(Optional<CTEId> subtree, CascadesContext context, Plan plan, PhysicalProperties requireProperties) { 
return CascadesContext.newContext(Optional.of(context), subtree, context.getStatementContext(), - plan, context.getCteContext(), requireProperties, context.isLeadingDisableJoinReorder); + plan, context.getCteContext(), requireProperties, context.isLeadingDisableJoinReorder, Optional.empty(), + ImmutableList.of()); } private static CascadesContext newContext(Optional<CascadesContext> parent, Optional<CTEId> subtree, StatementContext statementContext, Plan initPlan, CTEContext cteContext, - PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder) { + PhysicalProperties requireProperties, boolean isLeadingDisableJoinReorder, + Optional<String> currentRecursiveCteName, List<Slot> recursiveCteOutputs) { return new CascadesContext(parent, subtree, statementContext, initPlan, null, - cteContext, requireProperties, isLeadingDisableJoinReorder); + cteContext, requireProperties, isLeadingDisableJoinReorder, currentRecursiveCteName, + recursiveCteOutputs); } public CascadesContext getRoot() { @@ -250,6 +259,14 @@ public class CascadesContext implements ScheduleContext { return isTimeout; } + public Optional<String> getCurrentRecursiveCteName() { + return currentRecursiveCteName; + } + + public List<Slot> getRecursiveCteOutputs() { + return recursiveCteOutputs; + } + /** * Init memo with plan */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index a6af723fff8..21f9bbf2126 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -116,6 +116,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PreAggStatus; import org.apache.doris.nereids.trees.plans.algebra.Aggregate; import 
org.apache.doris.nereids.trees.plans.algebra.Relation; +import org.apache.doris.nereids.trees.plans.algebra.SetOperation; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; @@ -156,6 +157,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat; import org.apache.doris.nereids.trees.plans.physical.PhysicalResultSink; @@ -205,6 +208,8 @@ import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.PartitionSortNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanNode; +import org.apache.doris.planner.RecursiveCteNode; +import org.apache.doris.planner.RecursiveCteScanNode; import org.apache.doris.planner.RepeatNode; import org.apache.doris.planner.ResultFileSink; import org.apache.doris.planner.ResultSink; @@ -1070,6 +1075,27 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla return planFragment; } + @Override + public PlanFragment visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan recursiveCteScan, + PlanTranslatorContext context) { + TableIf table = recursiveCteScan.getTable(); + List<Slot> slots = ImmutableList.copyOf(recursiveCteScan.getOutput()); + TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, context); + + RecursiveCteScanNode scanNode = new 
RecursiveCteScanNode(context.nextPlanNodeId(), tupleDescriptor); + scanNode.setNereidsId(recursiveCteScan.getId()); + context.getNereidsIdToPlanNodeIdMap().put(recursiveCteScan.getId(), scanNode.getId()); + Utils.execWithUncheckedException(scanNode::initScanRangeLocations); + + translateRuntimeFilter(recursiveCteScan, scanNode, context); + + context.addScanNode(scanNode, recursiveCteScan); + PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, recursiveCteScan); + context.addPlanFragment(planFragment); + updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), recursiveCteScan); + return planFragment; + } + private List<Expr> translateToExprs(List<Expression> expressions, PlanTranslatorContext context) { List<Expr> exprs = Lists.newArrayListWithCapacity(expressions.size()); for (Expression expression : expressions) { @@ -2274,6 +2300,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla setOperationNode = new ExceptNode(context.nextPlanNodeId(), setTuple.getId()); } else if (setOperation instanceof PhysicalIntersect) { setOperationNode = new IntersectNode(context.nextPlanNodeId(), setTuple.getId()); + } else if (setOperation instanceof PhysicalRecursiveCte) { + setOperationNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(), + setOperation.getQualifier().equals(SetOperation.Qualifier.ALL)); } else { throw new RuntimeException("not support set operation type " + setOperation); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 925d1e57c9b..da2671b760f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -2145,7 +2145,8 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { .map(RuleContext::getText) 
.collect(ImmutableList.toImmutableList()) ); - return new LogicalSubQueryAlias<>(ctx.identifier().getText(), columnNames, queryPlan); + return new LogicalSubQueryAlias<>(ctx.identifier().getText(), columnNames, ctx.RECURSIVE() != null, + queryPlan); }); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/MemoPatterns.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/MemoPatterns.java index fa9d191f5e2..ec53deafaad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/MemoPatterns.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/pattern/MemoPatterns.java @@ -28,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalExcept; import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect; import org.apache.doris.nereids.trees.plans.logical.LogicalLeaf; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte; import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalSetOperation; import org.apache.doris.nereids.trees.plans.logical.LogicalUnary; @@ -205,6 +206,29 @@ public interface MemoPatterns extends Patterns { defaultPromise()); } + /** + * create a LogicalRecursiveCte pattern. + */ + default PatternDescriptor<LogicalRecursiveCte> + logicalRecursiveCte( + PatternDescriptor... children) { + return new PatternDescriptor( + new TypePattern(LogicalRecursiveCte.class, + Arrays.stream(children) + .map(PatternDescriptor::getPattern) + .toArray(Pattern[]::new)), + defaultPromise()); + } + + /** + * create a logicalRecursiveCte group. + */ + default PatternDescriptor<LogicalRecursiveCte> logicalRecursiveCte() { + return new PatternDescriptor( + new TypePattern(LogicalRecursiveCte.class, multiGroup().pattern), + defaultPromise()); + } + /** * create a logicalExcept pattern. 
*/ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index d1c8bccb421..d3592388dae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -52,6 +52,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.physical.PhysicalSink; @@ -147,6 +149,11 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, return PhysicalProperties.STORAGE_ANY; } + @Override + public PhysicalProperties visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan cteScan, PlanContext context) { + return PhysicalProperties.GATHER; + } + /** * TODO return ANY after refactor coordinator * return STORAGE_ANY not ANY, in order to generate distribute on jdbc scan. 
@@ -463,6 +470,11 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, return PhysicalProperties.createHash(request, firstType); } + @Override + public PhysicalProperties visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanContext context) { + return PhysicalProperties.GATHER; + } + @Override public PhysicalProperties visitPhysicalUnion(PhysicalUnion union, PlanContext context) { if (union.getConstantExprsList().isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java index 45cb22e6b52..122dbee03ef 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleSet.java @@ -80,6 +80,8 @@ import org.apache.doris.nereids.rules.implementation.LogicalOlapTableSinkToPhysi import org.apache.doris.nereids.rules.implementation.LogicalOneRowRelationToPhysicalOneRowRelation; import org.apache.doris.nereids.rules.implementation.LogicalPartitionTopNToPhysicalPartitionTopN; import org.apache.doris.nereids.rules.implementation.LogicalProjectToPhysicalProject; +import org.apache.doris.nereids.rules.implementation.LogicalRecursiveCteScanToPhysicalRecursiveCteScan; +import org.apache.doris.nereids.rules.implementation.LogicalRecursiveCteToPhysicalRecursiveCte; import org.apache.doris.nereids.rules.implementation.LogicalRepeatToPhysicalRepeat; import org.apache.doris.nereids.rules.implementation.LogicalResultSinkToPhysicalResultSink; import org.apache.doris.nereids.rules.implementation.LogicalSchemaScanToPhysicalSchemaScan; @@ -194,6 +196,7 @@ public class RuleSet { .add(new LogicalJdbcScanToPhysicalJdbcScan()) .add(new LogicalOdbcScanToPhysicalOdbcScan()) .add(new LogicalEsScanToPhysicalEsScan()) + .add(new LogicalRecursiveCteScanToPhysicalRecursiveCteScan()) .add(new LogicalProjectToPhysicalProject()) .add(new LogicalLimitToPhysicalLimit()) .add(new 
LogicalWindowToPhysicalWindow()) @@ -209,6 +212,7 @@ public class RuleSet { .add(SplitAggWithoutDistinct.INSTANCE) .add(SplitAggMultiPhase.INSTANCE) .add(SplitAggMultiPhaseWithoutGbyKey.INSTANCE) + .add(new LogicalRecursiveCteToPhysicalRecursiveCte()) .add(new LogicalUnionToPhysicalUnion()) .add(new LogicalExceptToPhysicalExcept()) .add(new LogicalIntersectToPhysicalIntersect()) diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java index a8bc2052348..c13607a838d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java @@ -496,6 +496,7 @@ public enum RuleType { LOGICAL_JDBC_SCAN_TO_PHYSICAL_JDBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_ODBC_SCAN_TO_PHYSICAL_ODBC_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_ES_SCAN_TO_PHYSICAL_ES_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), + LOGICAL_RECURSIVE_CTE_SCAN_TO_PHYSICAL_RECUSIVE_CTE_SCAN_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_BLACKHOLE_SINK_TO_PHYSICAL_BLACKHOLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_OLAP_TABLE_SINK_TO_PHYSICAL_OLAP_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), LOGICAL_HIVE_TABLE_SINK_TO_PHYSICAL_HIVE_TABLE_SINK_RULE(RuleTypeClass.IMPLEMENTATION), @@ -517,6 +518,7 @@ public enum RuleType { COUNT_ON_INDEX_WITHOUT_PROJECT(RuleTypeClass.IMPLEMENTATION), TWO_PHASE_AGGREGATE_WITH_DISTINCT(RuleTypeClass.IMPLEMENTATION), LOGICAL_UNION_TO_PHYSICAL_UNION(RuleTypeClass.IMPLEMENTATION), + LOGICAL_RECURSIVE_CTE_TO_PHYSICAL_RECURSIVE_CTE(RuleTypeClass.IMPLEMENTATION), LOGICAL_EXCEPT_TO_PHYSICAL_EXCEPT(RuleTypeClass.IMPLEMENTATION), LOGICAL_INTERSECT_TO_PHYSICAL_INTERSECT(RuleTypeClass.IMPLEMENTATION), LOGICAL_GENERATE_TO_PHYSICAL_GENERATE(RuleTypeClass.IMPLEMENTATION), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java index 4287e8ca7de..8ac4234e846 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java @@ -24,21 +24,29 @@ import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.rules.Rule; import org.apache.doris.nereids.rules.RuleType; import org.apache.doris.nereids.trees.expressions.CTEId; +import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte; import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; +import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; +import org.apache.doris.nereids.trees.plans.logical.ProjectProcessor; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.HashSet; import java.util.List; +import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -70,7 +78,7 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { // step 1. 
analyzed all cte plan Pair<CTEContext, List<LogicalCTEProducer<Plan>>> result = analyzeCte(logicalCTE, ctx.cascadesContext); CascadesContext outerCascadesCtx = CascadesContext.newContextWithCteContext( - ctx.cascadesContext, logicalCTE.child(), result.first); + ctx.cascadesContext, logicalCTE.child(), result.first, Optional.empty(), ImmutableList.of()); outerCascadesCtx.withPlanProcess(ctx.cascadesContext.showPlanProcess(), () -> { outerCascadesCtx.newAnalyzer().analyze(); }); @@ -95,25 +103,96 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); - CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); - innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { - innerCascadesCtx.newAnalyzer().analyze(); - }); - cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); - LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); - checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); - CTEId cteId = StatementScopeIdGenerator.newCTEId(); - LogicalSubQueryAlias<Plan> logicalSubQueryAlias = - aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); - outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); - outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); - cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + if (aliasQuery.isRecursiveCte()) { + Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, + cascadesContext); + outerCteCtx = result.first; + cteProducerPlans.add(result.second); + } else { + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + CascadesContext innerCascadesCtx = 
CascadesContext.newContextWithCteContext( + cascadesContext, parsedCtePlan, outerCteCtx, Optional.empty(), ImmutableList.of()); + innerCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedCtePlan.getOutput()); + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery + .withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + cteProducerPlans.add(new LogicalCTEProducer<>(cteId, logicalSubQueryAlias)); + } } return Pair.of(outerCteCtx, cteProducerPlans); } + private Pair<CTEContext, LogicalCTEProducer<Plan>> analyzeRecursiveCte(LogicalSubQueryAlias<Plan> aliasQuery, + CTEContext outerCteCtx, CascadesContext cascadesContext) { + Preconditions.checkArgument(aliasQuery.isRecursiveCte(), "alias query must be recursive cte"); + LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + if (!(parsedCtePlan instanceof LogicalUnion) || parsedCtePlan.children().size() != 2) { + throw new AnalysisException("recursive cte must be union"); + } + // analyze anchor child, its output list will be recursive cte temp table's schema + LogicalPlan anchorChild = (LogicalPlan) parsedCtePlan.child(0); + CascadesContext innerAnchorCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, anchorChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), ImmutableList.of()); + innerAnchorCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerAnchorCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedAnchorChild = (LogicalPlan) 
innerAnchorCascadesCtx.getRewritePlan(); + checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); + + // analyze recursive child + LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1); + CascadesContext innerRecursiveCascadesCtx = CascadesContext.newContextWithCteContext( + cascadesContext, recursiveChild, outerCteCtx, Optional.of(aliasQuery.getAlias()), + analyzedAnchorChild.getOutput()); + innerRecursiveCascadesCtx.withPlanProcess(cascadesContext.showPlanProcess(), () -> { + innerRecursiveCascadesCtx.newAnalyzer().analyze(); + }); + cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses()); + LogicalPlan analyzedRecursiveChild = (LogicalPlan) innerRecursiveCascadesCtx.getRewritePlan(); + LogicalUnion logicalUnion = (LogicalUnion) parsedCtePlan; + + // manually bind LogicalRecursiveCte, see bindSetOperation in BindExpression.java + LogicalRecursiveCte analyzedCtePlan = new LogicalRecursiveCte( + logicalUnion.getQualifier(), ImmutableList.of(analyzedAnchorChild, analyzedRecursiveChild)); + List<List<NamedExpression>> childrenProjections = analyzedCtePlan.collectChildrenProjections(); + int childrenProjectionSize = childrenProjections.size(); + ImmutableList.Builder<List<SlotReference>> childrenOutputs = ImmutableList + .builderWithExpectedSize(childrenProjectionSize); + ImmutableList.Builder<Plan> newChildren = ImmutableList.builderWithExpectedSize(childrenProjectionSize); + for (int i = 0; i < childrenProjectionSize; i++) { + Plan newChild; + Plan child = analyzedCtePlan.child(i); + if (childrenProjections.get(i).stream().allMatch(SlotReference.class::isInstance)) { + newChild = child; + } else { + List<NamedExpression> parentProject = childrenProjections.get(i); + newChild = ProjectProcessor.tryProcessProject(parentProject, child) + .orElseGet(() -> new LogicalProject<>(parentProject, child)); + } + newChildren.add(newChild); + childrenOutputs.add((List<SlotReference>) (List) newChild.getOutput()); + } + analyzedCtePlan = 
(LogicalRecursiveCte) analyzedCtePlan.withChildrenAndTheirOutputs(newChildren.build(), + childrenOutputs.build()); + List<NamedExpression> newOutputs = analyzedCtePlan.buildNewOutputs(); + analyzedCtePlan = analyzedCtePlan.withNewOutputs(newOutputs); + + CTEId cteId = StatementScopeIdGenerator.newCTEId(); + LogicalSubQueryAlias<Plan> logicalSubQueryAlias = aliasQuery.withChildren(ImmutableList.of(analyzedCtePlan)); + outerCteCtx = new CTEContext(cteId, logicalSubQueryAlias, outerCteCtx); + outerCteCtx.setAnalyzedPlan(logicalSubQueryAlias); + LogicalCTEProducer<Plan> cteProducer = new LogicalCTEProducer<>(cteId, logicalSubQueryAlias); + return Pair.of(outerCteCtx, cteProducer); + } + /** * check columnAliases' size and name */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java index 84652b9eed0..4e6be99f290 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/BindRelation.java @@ -27,6 +27,7 @@ import org.apache.doris.catalog.FunctionRegistry; import org.apache.doris.catalog.KeysType; import org.apache.doris.catalog.OlapTable; import org.apache.doris.catalog.Partition; +import org.apache.doris.catalog.RecursiveCteTempTable; import org.apache.doris.catalog.SchemaTable; import org.apache.doris.catalog.SchemaTable.SchemaColumn; import org.apache.doris.catalog.TableIf; @@ -89,6 +90,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; +import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCteScan; import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan; import 
org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation; @@ -168,16 +170,27 @@ public class BindRelation extends OneAnalysisRuleFactory { return consumer; } } + LogicalPlan scan; List<String> tableQualifier = RelationUtil.getQualifierName( cascadesContext.getConnectContext(), unboundRelation.getNameParts()); - TableIf table = cascadesContext.getStatementContext().getAndCacheTable(tableQualifier, TableFrom.QUERY, - Optional.of(unboundRelation)); + if (tableName.equalsIgnoreCase(cascadesContext.getCurrentRecursiveCteName().orElse(""))) { + ImmutableList.Builder<Column> schema = new ImmutableList.Builder<>(); + for (Slot slot : cascadesContext.getRecursiveCteOutputs()) { + schema.add(new Column(slot.getName(), slot.getDataType().toCatalogDataType(), slot.nullable())); + } + RecursiveCteTempTable cteTempTable = new RecursiveCteTempTable(tableName, schema.build()); + scan = new LogicalRecursiveCteScan(cascadesContext.getStatementContext().getNextRelationId(), + cteTempTable, tableQualifier); + } else { + TableIf table = cascadesContext.getStatementContext().getAndCacheTable(tableQualifier, TableFrom.QUERY, + Optional.of(unboundRelation)); - LogicalPlan scan = getLogicalPlan(table, unboundRelation, tableQualifier, cascadesContext); - if (cascadesContext.isLeadingJoin()) { - LeadingHint leading = (LeadingHint) cascadesContext.getHintMap().get("Leading"); - leading.putRelationIdAndTableName(Pair.of(unboundRelation.getRelationId(), tableName)); - leading.getRelationIdToScanMap().put(unboundRelation.getRelationId(), scan); + scan = getLogicalPlan(table, unboundRelation, tableQualifier, cascadesContext); + if (cascadesContext.isLeadingJoin()) { + LeadingHint leading = (LeadingHint) cascadesContext.getHintMap().get("Leading"); + leading.putRelationIdAndTableName(Pair.of(unboundRelation.getRelationId(), tableName)); + leading.getRelationIdToScanMap().put(unboundRelation.getRelationId(), scan); + } 
} return scan; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java index 53650234610..468bc8c7d7c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/CollectRelation.java @@ -102,8 +102,10 @@ public class CollectRelation implements AnalysisRuleFactory { for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte LogicalPlan parsedCtePlan = (LogicalPlan) aliasQuery.child(); + // pass the current recursive cte name to the inner CascadesContext so that a self-reference can be detected CascadesContext innerCascadesCtx = CascadesContext.newContextWithCteContext( - cascadesContext, parsedCtePlan, outerCteCtx); + cascadesContext, parsedCtePlan, outerCteCtx, aliasQuery.isRecursiveCte() + ? Optional.of(aliasQuery.getAlias()) : Optional.empty(), ImmutableList.of()); innerCascadesCtx.newTableCollector().collect(); LogicalPlan analyzedCtePlan = (LogicalPlan) innerCascadesCtx.getRewritePlan(); // cteId is not used in CollectTable stage @@ -122,7 +124,8 @@ public class CollectRelation implements AnalysisRuleFactory { if (e instanceof SubqueryExpr) { SubqueryExpr subqueryExpr = (SubqueryExpr) e; CascadesContext subqueryContext = CascadesContext.newContextWithCteContext( - ctx.cascadesContext, subqueryExpr.getQueryPlan(), ctx.cteContext); + ctx.cascadesContext, subqueryExpr.getQueryPlan(), ctx.cteContext, Optional.empty(), + ImmutableList.of()); subqueryContext.keepOrShowPlanProcess(ctx.cascadesContext.showPlanProcess(), () -> subqueryContext.newTableCollector().collect()); ctx.cascadesContext.addPlanProcesses(subqueryContext.getPlanProcesses()); @@ -174,6 +177,10 @@ public class CollectRelation implements AnalysisRuleFactory { List<String> nameParts, TableFrom tableFrom, Optional<UnboundRelation> unboundRelation) { if (nameParts.size() == 1)
{ String tableName = nameParts.get(0); + if (cascadesContext.getCurrentRecursiveCteName().isPresent() + && tableName.equalsIgnoreCase(cascadesContext.getCurrentRecursiveCteName().get())) { + return; + } // check if it is a CTE's name CTEContext cteContext = cascadesContext.getCteContext().findCTEContext(tableName).orElse(null); if (cteContext != null) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java index 69d79934992..05a35dc6ed8 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/SubExprAnalyzer.java @@ -52,6 +52,7 @@ import java.util.Collection; import java.util.HashSet; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.Set; /** @@ -227,7 +228,8 @@ class SubExprAnalyzer<T> extends DefaultExpressionRewriter<T> { throw new IllegalStateException("Missing CascadesContext"); } CascadesContext subqueryContext = CascadesContext.newContextWithCteContext( - cascadesContext, expr.getQueryPlan(), cascadesContext.getCteContext()); + cascadesContext, expr.getQueryPlan(), cascadesContext.getCteContext(), Optional.empty(), + ImmutableList.of()); // don't use `getScope()` because we only need `getScope().getOuterScope()` and `getScope().getSlots()` // otherwise unexpected errors may occur Scope subqueryScope = new Scope(getScope().getOuterScope(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteScanToPhysicalRecursiveCteScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteScanToPhysicalRecursiveCteScan.java new file mode 100644 index 00000000000..8714c280bb9 --- /dev/null +++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteScanToPhysicalRecursiveCteScan.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan; + +import java.util.Optional; + +/** + * Implementation rule that convert logical Recursive Cte Scan to physical Recursive Cte Scan. 
+ */ +public class LogicalRecursiveCteScanToPhysicalRecursiveCteScan extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalRecursiveCteScan().then(recursiveCteScan -> + new PhysicalRecursiveCteScan( + recursiveCteScan.getRelationId(), + recursiveCteScan.getTable(), + recursiveCteScan.getQualifier(), + Optional.empty(), + recursiveCteScan.getLogicalProperties(), + recursiveCteScan.getOperativeSlots()) + ).toRule(RuleType.LOGICAL_RECURSIVE_CTE_SCAN_TO_PHYSICAL_RECUSIVE_CTE_SCAN_RULE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java new file mode 100644 index 00000000000..c8960aa9e51 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java @@ -0,0 +1,39 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.rules.implementation; + +import org.apache.doris.nereids.rules.Rule; +import org.apache.doris.nereids.rules.RuleType; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte; + +/** + * Implementation rule that convert logical Recursive Cte to Physical Recursive Cte. + */ +public class LogicalRecursiveCteToPhysicalRecursiveCte extends OneImplementationRuleFactory { + @Override + public Rule build() { + return logicalRecursiveCte().then(recursiveCte -> + new PhysicalRecursiveCte(recursiveCte.getQualifier(), + recursiveCte.getOutputs(), + recursiveCte.getRegularChildrenOutputs(), + recursiveCte.getConstantExprsList(), + recursiveCte.getLogicalProperties(), + recursiveCte.children()) + ).toRule(RuleType.LOGICAL_RECURSIVE_CTE_TO_PHYSICAL_RECURSIVE_CTE); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java index 8592d3f22a0..63d63f32b6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java @@ -41,6 +41,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalJoin; import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte; import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat; import org.apache.doris.nereids.trees.plans.logical.LogicalSetOperation; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; @@ -330,6 +331,22 @@ public class AdjustNullable extends DefaultPlanRewriter<Map<ExprId, Slot>> imple inputNullable.set(j, inputNullable.get(j) || constantExprs.get(j).nullable()); } } + } else if 
(setOperation instanceof LogicalRecursiveCte) { + // LogicalRecursiveCte is basically like LogicalUnion, so just do same as LogicalUnion + LogicalRecursiveCte logicalRecursiveCte = (LogicalRecursiveCte) setOperation; + if (!logicalRecursiveCte.getConstantExprsList().isEmpty() && setOperation.children().isEmpty()) { + int outputSize = logicalRecursiveCte.getConstantExprsList().get(0).size(); + // create the inputNullable list and fill it with all FALSE values + inputNullable = Lists.newArrayListWithCapacity(outputSize); + for (int i = 0; i < outputSize; i++) { + inputNullable.add(false); + } + } + for (List<NamedExpression> constantExprs : logicalRecursiveCte.getConstantExprsList()) { + for (int j = 0; j < constantExprs.size(); j++) { + inputNullable.set(j, inputNullable.get(j) || constantExprs.get(j).nullable()); + } + } } if (inputNullable == null) { // this is a fail-safe diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java index 22ec72c99c5..30970dee649 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/CTEInline.java @@ -31,15 +31,19 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalCTEConsumer; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte; import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter; import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; import org.apache.doris.qe.ConnectContext; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; +import com.google.common.collect.Sets; import java.util.ArrayList; +import 
java.util.HashSet; import java.util.List; +import java.util.Set; /** * pull up LogicalCteAnchor to the top of plan to avoid CteAnchor break other rewrite rules pattern @@ -48,9 +52,15 @@ import java.util.List; * and put all of them to the top of plan depends on dependency tree of them. */ public class CTEInline extends DefaultPlanRewriter<LogicalCTEProducer<?>> implements CustomRewriter { + private final Set<LogicalCTEConsumer> mustInlineCteConsumers = new HashSet<>(); @Override public Plan rewriteRoot(Plan plan, JobContext jobContext) { + List<LogicalRecursiveCte> recursiveCteList = plan.collectToList(LogicalRecursiveCte.class::isInstance); + for (LogicalRecursiveCte recursiveCte : recursiveCteList) { + mustInlineCteConsumers.addAll(recursiveCte.collect(LogicalCTEConsumer.class::isInstance)); + } + Plan root = plan.accept(this, null); // collect cte id to consumer root.foreach(p -> { @@ -78,17 +88,24 @@ public class CTEInline extends DefaultPlanRewriter<LogicalCTEProducer<?>> implem } return false; }); - ConnectContext connectContext = ConnectContext.get(); - if (connectContext.getSessionVariable().enableCTEMaterialize - && consumers.size() > connectContext.getSessionVariable().inlineCTEReferencedThreshold) { - // not inline - Plan right = cteAnchor.right().accept(this, null); - return cteAnchor.withChildren(cteAnchor.left(), right); - } else { + if (!Sets.intersection(mustInlineCteConsumers, Sets.newHashSet(consumers)).isEmpty()) { // should inline Plan root = cteAnchor.right().accept(this, (LogicalCTEProducer<?>) cteAnchor.left()); // process child return root.accept(this, null); + } else { + ConnectContext connectContext = ConnectContext.get(); + if (connectContext.getSessionVariable().enableCTEMaterialize + && consumers.size() > connectContext.getSessionVariable().inlineCTEReferencedThreshold) { + // not inline + Plan right = cteAnchor.right().accept(this, null); + return cteAnchor.withChildren(cteAnchor.left(), right); + } else { + // should inline + Plan root = 
cteAnchor.right().accept(this, (LogicalCTEProducer<?>) cteAnchor.left()); + // process child + return root.accept(this, null); + } } } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java index 90e94d578ba..96daceedfe4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java @@ -44,6 +44,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalExcept; import org.apache.doris.nereids.trees.plans.logical.LogicalFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalIntersect; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte; import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; @@ -213,6 +214,47 @@ public class ColumnPruning extends DefaultPlanRewriter<PruneContext> implements return pruneChildren(plan, new RoaringBitmap()); } + @Override + public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, PruneContext context) { + // LogicalRecursiveCte is basically like LogicalUnion, so just do same as LogicalUnion + if (recursiveCte.getQualifier() == Qualifier.DISTINCT) { + return skipPruneThisAndFirstLevelChildren(recursiveCte); + } + LogicalRecursiveCte prunedOutputRecursiveCte = pruneRecursiveCteOutput(recursiveCte, context); + // start prune children of recursiveCte + List<Slot> originOutput = recursiveCte.getOutput(); + Set<Slot> prunedOutput = prunedOutputRecursiveCte.getOutputSet(); + List<Integer> prunedOutputIndexes = IntStream.range(0, originOutput.size()) + .filter(index -> prunedOutput.contains(originOutput.get(index))) + .boxed() + 
.collect(ImmutableList.toImmutableList()); + + ImmutableList.Builder<Plan> prunedChildren = ImmutableList.builder(); + ImmutableList.Builder<List<SlotReference>> prunedChildrenOutputs = ImmutableList.builder(); + for (int i = 0; i < prunedOutputRecursiveCte.arity(); i++) { + List<SlotReference> regularChildOutputs = prunedOutputRecursiveCte.getRegularChildOutput(i); + + RoaringBitmap prunedChildOutputExprIds = new RoaringBitmap(); + Builder<SlotReference> prunedChildOutputBuilder + = ImmutableList.builderWithExpectedSize(regularChildOutputs.size()); + for (Integer index : prunedOutputIndexes) { + SlotReference slot = regularChildOutputs.get(index); + prunedChildOutputBuilder.add(slot); + prunedChildOutputExprIds.add(slot.getExprId().asInt()); + } + + List<SlotReference> prunedChildOutput = prunedChildOutputBuilder.build(); + Plan prunedChild = doPruneChild( + prunedOutputRecursiveCte, prunedOutputRecursiveCte.child(i), prunedChildOutputExprIds, + prunedChildOutput, true + ); + prunedChildrenOutputs.add(prunedChildOutput); + prunedChildren.add(prunedChild); + } + return prunedOutputRecursiveCte.withChildrenAndTheirOutputs(prunedChildren.build(), + prunedChildrenOutputs.build()); + } + // union can not prune children by the common logic, we must override visit method to write special code. 
@Override public Plan visitLogicalUnion(LogicalUnion union, PruneContext context) { @@ -403,6 +445,70 @@ public class ColumnPruning extends DefaultPlanRewriter<PruneContext> implements } } + private LogicalRecursiveCte pruneRecursiveCteOutput(LogicalRecursiveCte recursiveCte, PruneContext context) { + List<NamedExpression> originOutput = recursiveCte.getOutputs(); + if (originOutput.isEmpty()) { + return recursiveCte; + } + List<NamedExpression> prunedOutputs = Lists.newArrayList(); + List<List<NamedExpression>> constantExprsList = recursiveCte.getConstantExprsList(); + List<List<SlotReference>> regularChildrenOutputs = recursiveCte.getRegularChildrenOutputs(); + List<Plan> children = recursiveCte.children(); + List<Integer> extractColumnIndex = Lists.newArrayList(); + for (int i = 0; i < originOutput.size(); i++) { + NamedExpression output = originOutput.get(i); + if (context.requiredSlotsIds.contains(output.getExprId().asInt())) { + prunedOutputs.add(output); + extractColumnIndex.add(i); + } + } + + ImmutableList.Builder<List<NamedExpression>> prunedConstantExprsList + = ImmutableList.builderWithExpectedSize(constantExprsList.size()); + if (prunedOutputs.isEmpty()) { + // process prune all columns + NamedExpression originSlot = originOutput.get(0); + prunedOutputs = ImmutableList.of(new SlotReference(originSlot.getExprId(), originSlot.getName(), + TinyIntType.INSTANCE, false, originSlot.getQualifier())); + regularChildrenOutputs = Lists.newArrayListWithCapacity(regularChildrenOutputs.size()); + children = Lists.newArrayListWithCapacity(children.size()); + for (int i = 0; i < recursiveCte.getArity(); i++) { + Plan child = recursiveCte.child(i); + List<NamedExpression> newProjectOutput = ImmutableList.of(new Alias(new TinyIntLiteral((byte) 1))); + LogicalProject<?> project; + if (child instanceof LogicalProject) { + LogicalProject<Plan> childProject = (LogicalProject<Plan>) child; + List<NamedExpression> mergeProjections = PlanUtils.mergeProjections( + 
childProject.getProjects(), newProjectOutput); + project = new LogicalProject<>(mergeProjections, childProject.child()); + } else { + project = new LogicalProject<>(newProjectOutput, child); + } + regularChildrenOutputs.add((List) project.getOutput()); + children.add(project); + } + for (int i = 0; i < constantExprsList.size(); i++) { + prunedConstantExprsList.add(ImmutableList.of(new Alias(new TinyIntLiteral((byte) 1)))); + } + } else { + int len = extractColumnIndex.size(); + for (List<NamedExpression> row : constantExprsList) { + ImmutableList.Builder<NamedExpression> newRow = ImmutableList.builderWithExpectedSize(len); + for (int idx : extractColumnIndex) { + newRow.add(row.get(idx)); + } + prunedConstantExprsList.add(newRow.build()); + } + } + + if (prunedOutputs.equals(originOutput) && !context.requiredSlotsIds.isEmpty()) { + return recursiveCte; + } else { + return recursiveCte.withNewOutputsChildrenAndConstExprsList(prunedOutputs, children, + regularChildrenOutputs, prunedConstantExprsList.build()); + } + } + private LogicalUnion pruneUnionOutput(LogicalUnion union, PruneContext context) { List<NamedExpression> originOutput = union.getOutputs(); if (originOutput.isEmpty()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java index f0ca1f1e6ba..a14c2dfc9cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java @@ -90,6 +90,8 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte; +import 
org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCteScan; import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat; import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; @@ -123,6 +125,8 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat; import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan; @@ -869,6 +873,12 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> { return computeCatalogRelation(esScan); } + @Override + public Statistics visitLogicalRecursiveCteScan(LogicalRecursiveCteScan recursiveCteScan, Void context) { + recursiveCteScan.getExpressions(); + return computeCatalogRelation(recursiveCteScan); + } + @Override public Statistics visitLogicalProject(LogicalProject<? 
extends Plan> project, Void context) { return computeProject(project, groupExpression.childStatistics(0)); @@ -912,6 +922,14 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> { return computeAssertNumRows(assertNumRows.getAssertNumRowsElement(), groupExpression.childStatistics(0)); } + @Override + public Statistics visitLogicalRecursiveCte( + LogicalRecursiveCte recursiveCte, Void context) { + return computeUnion(recursiveCte, + groupExpression.children() + .stream().map(Group::getStatistics).collect(Collectors.toList())); + } + @Override public Statistics visitLogicalUnion( LogicalUnion union, Void context) { @@ -997,6 +1015,11 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> { return computeCatalogRelation(schemaScan); } + @Override + public Statistics visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan recursiveCteScan, Void context) { + return computeCatalogRelation(recursiveCteScan); + } + @Override public Statistics visitPhysicalFileScan(PhysicalFileScan fileScan, Void context) { return computeCatalogRelation(fileScan); @@ -1083,6 +1106,12 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> { return computeAssertNumRows(assertNumRows.getAssertNumRowsElement(), groupExpression.childStatistics(0)); } + @Override + public Statistics visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, Void context) { + return computeUnion(recursiveCte, groupExpression.children() + .stream().map(Group::getStatistics).collect(Collectors.toList())); + } + @Override public Statistics visitPhysicalUnion(PhysicalUnion union, Void context) { return computeUnion(union, groupExpression.children() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java index e85c6eb8dae..c9cf877f709 100644 --- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java @@ -53,6 +53,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalPartitionTopN; import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte; import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat; import org.apache.doris.nereids.trees.plans.logical.LogicalSink; @@ -354,6 +355,28 @@ public class LogicalPlanDeepCopier extends DefaultPlanRewriter<DeepCopierContext constantExprsList, union.hasPushedFilter(), children); } + @Override + public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, DeepCopierContext context) { + List<Plan> children = recursiveCte.children().stream() + .map(c -> c.accept(this, context)) + .collect(ImmutableList.toImmutableList()); + List<List<NamedExpression>> constantExprsList = recursiveCte.getConstantExprsList().stream() + .map(l -> l.stream() + .map(e -> (NamedExpression) ExpressionDeepCopier.INSTANCE.deepCopy(e, context)) + .collect(ImmutableList.toImmutableList())) + .collect(ImmutableList.toImmutableList()); + List<NamedExpression> outputs = recursiveCte.getOutputs().stream() + .map(o -> (NamedExpression) ExpressionDeepCopier.INSTANCE.deepCopy(o, context)) + .collect(ImmutableList.toImmutableList()); + List<List<SlotReference>> childrenOutputs = recursiveCte.getRegularChildrenOutputs().stream() + .map(childOutputs -> childOutputs.stream() + .map(o -> (SlotReference) ExpressionDeepCopier.INSTANCE.deepCopy(o, context)) + .collect(ImmutableList.toImmutableList())) + .collect(ImmutableList.toImmutableList()); + return new 
LogicalRecursiveCte(recursiveCte.getQualifier(), outputs, childrenOutputs, + constantExprsList, recursiveCte.hasPushedFilter(), children); + } + @Override public Plan visitLogicalExcept(LogicalExcept except, DeepCopierContext context) { List<Plan> children = except.children().stream() diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index f5ace7cd416..f67558962f4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -83,6 +83,8 @@ public enum PlanType { LOGICAL_PARTITION_TOP_N, LOGICAL_PROJECT, LOGICAL_QUALIFY, + LOGICAL_RECURSIVE_CTE, + LOGICAL_RECURSIVE_CTE_SCAN, LOGICAL_REPEAT, LOGICAL_SELECT_HINT, LOGICAL_SUBQUERY_ALIAS, @@ -107,6 +109,7 @@ public enum PlanType { PHYSICAL_OLAP_SCAN, PHYSICAL_SCHEMA_SCAN, PHYSICAL_TVF_RELATION, + PHYSICAL_RECURSIVE_CTE_SCAN, // physical sinks PHYSICAL_FILE_SINK, @@ -123,6 +126,7 @@ public enum PlanType { PHYSICAL_ASSERT_NUM_ROWS, PHYSICAL_CTE_PRODUCER, PHYSICAL_CTE_ANCHOR, + PHYSICAL_RECURSIVE_CTE, PHYSICAL_DISTRIBUTE, PHYSICAL_EXCEPT, PHYSICAL_FILTER, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java index 3d9db687908..3ea481294da 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java @@ -323,7 +323,7 @@ public class CreateMaterializedViewCommand extends Command implements ForwardWit } try { Expr defineExpr = translateToLegacyExpr(predicate, context.planTranslatorContext); - context.filterItem = new MVColumnItem(defineExpr.toSqlWithoutTbl(), 
defineExpr); + context.filterItem = new MVColumnItem(predicate.toSql(), defineExpr); } catch (Exception ex) { throw new AnalysisException(ex.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java index 9369acd8d6f..4291ba32732 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java @@ -17,19 +17,33 @@ package org.apache.doris.nereids.trees.plans.distribute.worker.job; +import org.apache.doris.analysis.Expr; import org.apache.doris.nereids.trees.plans.distribute.DistributeContext; import org.apache.doris.nereids.trees.plans.distribute.worker.BackendDistributedPlanWorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.planner.RecursiveCteNode; +import org.apache.doris.planner.RecursiveCteScanNode; import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TRecCTENode; +import org.apache.doris.thrift.TRecCTEResetInfo; +import org.apache.doris.thrift.TRecCTETarget; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; +import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; /** AssignedJobBuilder */ public class AssignedJobBuilder { @@ -39,6 +53,8 @@ public class AssignedJobBuilder { boolean 
isLoadJob) { DistributeContext distributeContext = new DistributeContext(workerManager, isLoadJob); ListMultimap<PlanFragmentId, AssignedJob> allAssignedJobs = ArrayListMultimap.create(); + Map<PlanFragmentId, TRecCTETarget> fragmentIdToRecCteTargetMap = new TreeMap<>(); + Map<PlanFragmentId, Set<TNetworkAddress>> fragmentIdToNetworkAddressMap = new TreeMap<>(); for (Entry<PlanFragmentId, UnassignedJob> kv : unassignedJobs.entrySet()) { PlanFragmentId fragmentId = kv.getKey(); UnassignedJob unassignedJob = kv.getValue(); @@ -55,6 +71,83 @@ public class AssignedJobBuilder { + ", fragment: " + unassignedJob.getFragment().getExplainString(TExplainLevel.VERBOSE)); } allAssignedJobs.putAll(fragmentId, fragmentAssignedJobs); + + Set<TNetworkAddress> networkAddresses = new TreeSet<>(); + for (AssignedJob assignedJob : fragmentAssignedJobs) { + DistributedPlanWorker distributedPlanWorker = assignedJob.getAssignedWorker(); + networkAddresses.add(new TNetworkAddress(distributedPlanWorker.host(), distributedPlanWorker.port())); + } + fragmentIdToNetworkAddressMap.put(fragmentId, networkAddresses); + + PlanFragment planFragment = unassignedJob.getFragment(); + List<RecursiveCteScanNode> recursiveCteScanNodes = planFragment.getPlanRoot() + .collectInCurrentFragment(RecursiveCteScanNode.class::isInstance); + if (!recursiveCteScanNodes.isEmpty()) { + if (recursiveCteScanNodes.size() != 1) { + throw new IllegalStateException( + String.format("one fragment can only have 1 recursive cte scan node, but there is %d", + recursiveCteScanNodes.size())); + } + if (fragmentAssignedJobs.size() != 1) { + throw new IllegalStateException(String.format( + "fragmentAssignedJobs's size must be 1 for recursive cte scan node, but it is %d", + fragmentAssignedJobs.size())); + } + TRecCTETarget tRecCTETarget = new TRecCTETarget(); + DistributedPlanWorker distributedPlanWorker = fragmentAssignedJobs.get(0).getAssignedWorker(); + tRecCTETarget.setAddr(new TNetworkAddress(distributedPlanWorker.host(), 
distributedPlanWorker.port())); + tRecCTETarget.setFragmentInstanceId(fragmentAssignedJobs.get(0).instanceId()); + tRecCTETarget.setNodeId(recursiveCteScanNodes.get(0).getId().asInt()); + fragmentIdToRecCteTargetMap.put(fragmentId, tRecCTETarget); + } + + List<RecursiveCteNode> recursiveCteNodes = planFragment.getPlanRoot() + .collectInCurrentFragment(RecursiveCteNode.class::isInstance); + if (!recursiveCteNodes.isEmpty()) { + if (recursiveCteNodes.size() != 1) { + throw new IllegalStateException( + String.format("one fragment can only have 1 recursive cte node, but there is %d", + recursiveCteNodes.size())); + } + + List<TRecCTETarget> targets = new ArrayList<>(); + List<TRecCTEResetInfo> fragmentsToReset = new ArrayList<>(); + // PhysicalPlanTranslator will swap recursiveCteNodes's child fragment, + // so the recursive side is reached through the 1st child + List<PlanFragment> childFragments = new ArrayList<>(); + planFragment.getChild(0).collectAll(PlanFragment.class::isInstance, childFragments); + for (PlanFragment child : childFragments) { + PlanFragmentId childFragmentId = child.getFragmentId(); + TRecCTETarget tRecCTETarget = fragmentIdToRecCteTargetMap.get(childFragmentId); + if (tRecCTETarget != null) { + targets.add(tRecCTETarget); + } + Set<TNetworkAddress> tNetworkAddresses = fragmentIdToNetworkAddressMap.get(childFragmentId); + if (tNetworkAddresses == null) { + throw new IllegalStateException( + String.format("can't find TNetworkAddress for fragment %d", childFragmentId.asInt())); + } + for (TNetworkAddress address : tNetworkAddresses) { + TRecCTEResetInfo tRecCTEResetInfo = new TRecCTEResetInfo(); + tRecCTEResetInfo.setFragmentId(childFragmentId.asInt()); + tRecCTEResetInfo.setAddr(address); + fragmentsToReset.add(tRecCTEResetInfo); + } + } + + RecursiveCteNode recursiveCteNode = recursiveCteNodes.get(0); + List<List<Expr>> materializedResultExprLists = recursiveCteNode.getMaterializedResultExprLists(); + List<List<TExpr>> texprLists = new
ArrayList<>(materializedResultExprLists.size()); + for (List<Expr> exprList : materializedResultExprLists) { + texprLists.add(Expr.treesToThrift(exprList)); + } + TRecCTENode tRecCTENode = new TRecCTENode(); + tRecCTENode.setIsUnionAll(recursiveCteNode.isUnionAll()); + tRecCTENode.setTargets(targets); + tRecCTENode.setFragmentsToReset(fragmentsToReset); + tRecCTENode.setResultExprLists(texprLists); + recursiveCteNode.settRecCTENode(tRecCTENode); + } } return allAssignedJobs; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java index bc20d3efa17..1229bc59455 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedJobBuilder.java @@ -29,6 +29,7 @@ import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; import org.apache.doris.planner.PlanNodeId; +import org.apache.doris.planner.RecursiveCteScanNode; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.SchemaScanNode; import org.apache.doris.thrift.TExplainLevel; @@ -126,6 +127,10 @@ public class UnassignedJobBuilder { unassignedJob = buildScanMetadataJob( statementContext, planFragment, (SchemaScanNode) scanNode, scanWorkerSelector ); + } else if (scanNode instanceof RecursiveCteScanNode) { + unassignedJob = buildScanRecursiveCteJob( + statementContext, planFragment, (RecursiveCteScanNode) scanNode, inputJobs, scanWorkerSelector + ); } else { // only scan external tables or cloud tables or table valued functions // e,g. 
select * from numbers('number'='100') @@ -196,6 +201,14 @@ public class UnassignedJobBuilder { return new UnassignedScanMetadataJob(statementContext, fragment, schemaScanNode, scanWorkerSelector); } + private UnassignedJob buildScanRecursiveCteJob( + StatementContext statementContext, PlanFragment fragment, + RecursiveCteScanNode recursiveCteScanNode, + ListMultimap<ExchangeNode, UnassignedJob> inputJobs, ScanWorkerSelector scanWorkerSelector) { + return new UnassignedRecursiveCteScanJob(statementContext, fragment, recursiveCteScanNode, + inputJobs, scanWorkerSelector); + } + private UnassignedJob buildScanRemoteTableJob( StatementContext statementContext, PlanFragment planFragment, List<ScanNode> scanNodes, ListMultimap<ExchangeNode, UnassignedJob> inputJobs, diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java new file mode 100644 index 00000000000..7fbb3f88e21 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.distribute.worker.job; + +import org.apache.doris.nereids.StatementContext; +import org.apache.doris.nereids.trees.plans.distribute.DistributeContext; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; +import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorkerManager; +import org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector; +import org.apache.doris.planner.ExchangeNode; +import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.ScanNode; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ListMultimap; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * UnassignedRecursiveCteScanJob + */ +public class UnassignedRecursiveCteScanJob extends AbstractUnassignedScanJob { + private final ScanWorkerSelector scanWorkerSelector; + + public UnassignedRecursiveCteScanJob( + StatementContext statementContext, PlanFragment fragment, ScanNode scanNode, + ListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob, ScanWorkerSelector scanWorkerSelector) { + super(statementContext, fragment, ImmutableList.of(scanNode), exchangeToChildJob); + this.scanWorkerSelector = Objects.requireNonNull(scanWorkerSelector, "scanWorkerSelector can not be null"); + } + + @Override + protected Map<DistributedPlanWorker, UninstancedScanSource> multipleMachinesParallelization( + DistributeContext distributeContext, ListMultimap<ExchangeNode, AssignedJob> inputJobs) { + return scanWorkerSelector.selectReplicaAndWorkerWithoutBucket( + scanNodes.get(0), statementContext.getConnectContext() + ); + } + + @Override + protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> assignedJobs, + DistributedPlanWorkerManager workerManager, ListMultimap<ExchangeNode, AssignedJob>
inputJobs) { + return fillUpSingleEmptyInstance(workerManager); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java new file mode 100644 index 00000000000..cfdf317c620 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java @@ -0,0 +1,419 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.Union; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.types.DataType; +import org.apache.doris.nereids.util.ExpressionUtils; +import org.apache.doris.nereids.util.TypeCoercionUtils; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.ConnectContext; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +/** + * LogicalRecursiveCte is basically like LogicalUnion + */ +public class LogicalRecursiveCte extends LogicalSetOperation implements Union, OutputPrunable { + + // in doris, we use union node to present one row relation + private final List<List<NamedExpression>> constantExprsList; + // When there is an agg on the union and there 
is a filter on the agg, + // it is necessary to keep the filter on the agg and push the filter down to each child of the union. + private final boolean hasPushedFilter; + + /** LogicalRecursiveCte */ + public LogicalRecursiveCte(Qualifier qualifier, List<Plan> children) { + this(qualifier, ImmutableList.of(), children); + } + + /** LogicalRecursiveCte */ + public LogicalRecursiveCte(Qualifier qualifier, List<List<NamedExpression>> constantExprsList, + List<Plan> children) { + this(qualifier, ImmutableList.of(), ImmutableList.of(), constantExprsList, false, children); + } + + /** LogicalRecursiveCte */ + public LogicalRecursiveCte(Qualifier qualifier, List<NamedExpression> outputs, + List<List<SlotReference>> childrenOutputs, + List<List<NamedExpression>> constantExprsList, boolean hasPushedFilter, List<Plan> children) { + this(qualifier, outputs, childrenOutputs, constantExprsList, hasPushedFilter, Optional.empty(), + Optional.empty(), + children); + } + + /** LogicalRecursiveCte */ + public LogicalRecursiveCte(Qualifier qualifier, List<NamedExpression> outputs, + List<List<SlotReference>> childrenOutputs, + List<List<NamedExpression>> constantExprsList, boolean hasPushedFilter, + Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, + List<Plan> children) { + super(PlanType.LOGICAL_RECURSIVE_CTE, qualifier, outputs, childrenOutputs, + groupExpression, logicalProperties, children); + this.hasPushedFilter = hasPushedFilter; + this.constantExprsList = Utils.fastToImmutableList( + Objects.requireNonNull(constantExprsList, "constantExprsList should not be null")); + } + + public boolean hasPushedFilter() { + return hasPushedFilter; + } + + public List<List<NamedExpression>> getConstantExprsList() { + return constantExprsList; + } + + @Override + public List<? 
extends Expression> getExpressions() { + return constantExprsList.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList()); + } + + @Override + public String toString() { + return Utils.toSqlStringSkipNull("LogicalRecursiveCte", + "qualifier", qualifier, + "outputs", outputs, + "regularChildrenOutputs", regularChildrenOutputs, + "constantExprsList", constantExprsList, + "hasPushedFilter", hasPushedFilter, + "stats", statistics); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + LogicalRecursiveCte that = (LogicalRecursiveCte) o; + return super.equals(that) && hasPushedFilter == that.hasPushedFilter + && Objects.equals(constantExprsList, that.constantExprsList); + } + + @Override + public int hashCode() { + return Objects.hash(super.hashCode(), hasPushedFilter, constantExprsList); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitLogicalRecursiveCte(this, context); + } + + @Override + public LogicalRecursiveCte withChildren(List<Plan> children) { + return new LogicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, + constantExprsList, hasPushedFilter, children); + } + + @Override + public LogicalSetOperation withChildrenAndTheirOutputs(List<Plan> children, + List<List<SlotReference>> childrenOutputs) { + Preconditions.checkArgument(children.size() == childrenOutputs.size(), + "children size %s is not equals with children outputs size %s", + children.size(), childrenOutputs.size()); + return new LogicalRecursiveCte(qualifier, outputs, childrenOutputs, constantExprsList, hasPushedFilter, + children); + } + + @Override + public LogicalRecursiveCte withGroupExpression(Optional<GroupExpression> groupExpression) { + return new LogicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, hasPushedFilter, + groupExpression, Optional.of(getLogicalProperties()), 
children); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + return new LogicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, hasPushedFilter, + groupExpression, logicalProperties, children); + } + + @Override + public LogicalRecursiveCte withNewOutputs(List<NamedExpression> newOutputs) { + return new LogicalRecursiveCte(qualifier, newOutputs, regularChildrenOutputs, constantExprsList, + hasPushedFilter, Optional.empty(), Optional.empty(), children); + } + + public LogicalRecursiveCte withNewOutputsAndConstExprsList(List<NamedExpression> newOutputs, + List<List<NamedExpression>> constantExprsList) { + return new LogicalRecursiveCte(qualifier, newOutputs, regularChildrenOutputs, constantExprsList, + hasPushedFilter, Optional.empty(), Optional.empty(), children); + } + + public LogicalRecursiveCte withChildrenAndConstExprsList(List<Plan> children, + List<List<SlotReference>> childrenOutputs, List<List<NamedExpression>> constantExprsList) { + return new LogicalRecursiveCte(qualifier, outputs, childrenOutputs, constantExprsList, hasPushedFilter, + children); + } + + public LogicalRecursiveCte withNewOutputsChildrenAndConstExprsList(List<NamedExpression> newOutputs, + List<Plan> children, + List<List<SlotReference>> childrenOutputs, + List<List<NamedExpression>> constantExprsList) { + return new LogicalRecursiveCte(qualifier, newOutputs, childrenOutputs, constantExprsList, + hasPushedFilter, Optional.empty(), Optional.empty(), children); + } + + public LogicalRecursiveCte withAllQualifier() { + return new LogicalRecursiveCte(Qualifier.ALL, outputs, regularChildrenOutputs, constantExprsList, + hasPushedFilter, + Optional.empty(), Optional.empty(), children); + } + + @Override + public LogicalRecursiveCte pruneOutputs(List<NamedExpression> prunedOutputs) { + return withNewOutputs(prunedOutputs); + } + + @Override 
+ public void computeUnique(DataTrait.Builder builder) { + if (qualifier == Qualifier.DISTINCT) { + builder.addUniqueSlot(ImmutableSet.copyOf(getOutput())); + } + } + + @Override + public void computeUniform(DataTrait.Builder builder) { + final Optional<ExpressionRewriteContext> context = ConnectContext.get() == null ? Optional.empty() + : Optional.of(new ExpressionRewriteContext(CascadesContext.initContext( + ConnectContext.get().getStatementContext(), this, PhysicalProperties.ANY))); + for (int i = 0; i < getOutputs().size(); i++) { + Optional<Literal> value = Optional.empty(); + if (!constantExprsList.isEmpty()) { + value = ExpressionUtils.checkConstantExpr(constantExprsList.get(0).get(i), context); + if (!value.isPresent()) { + continue; + } + final int fi = i; + Literal literal = value.get(); + if (constantExprsList.stream() + .map(exprs -> ExpressionUtils.checkConstantExpr(exprs.get(fi), context)) + .anyMatch(val -> !val.isPresent() || !val.get().equals(literal))) { + continue; + } + } + for (int childIdx = 0; childIdx < children.size(); childIdx++) { + // TODO: use originOutputs = child(childIdx).getOutput() ? + List<? 
extends Slot> originOutputs = regularChildrenOutputs.get(childIdx); + Slot slot = originOutputs.get(i); + Optional<Expression> childValue = child(childIdx).getLogicalProperties() + .getTrait().getUniformValue(slot); + if (childValue == null || !childValue.isPresent() || !childValue.get().isConstant()) { + value = Optional.empty(); + break; + } + Optional<Literal> constExprOpt = ExpressionUtils.checkConstantExpr(childValue.get(), context); + if (!constExprOpt.isPresent()) { + value = Optional.empty(); + break; + } + if (!value.isPresent()) { + value = constExprOpt; + } else if (!value.equals(constExprOpt)) { + value = Optional.empty(); + break; + } + } + if (value.isPresent()) { + builder.addUniformSlotAndLiteral(getOutputs().get(i).toSlot(), value.get()); + } + } + } + + @Override + public boolean hasUnboundExpression() { + if (!constantExprsList.isEmpty() && children.isEmpty()) { + return false; + } + return super.hasUnboundExpression(); + } + + private List<BitSet> mapSlotToIndex(Plan plan, List<Set<Slot>> equalSlotsList) { + Map<Slot, Integer> slotToIndex = new HashMap<>(); + for (int i = 0; i < plan.getOutput().size(); i++) { + slotToIndex.put(plan.getOutput().get(i), i); + } + List<BitSet> equalSlotIndicesList = new ArrayList<>(); + for (Set<Slot> equalSlots : equalSlotsList) { + BitSet equalSlotIndices = new BitSet(); + for (Slot slot : equalSlots) { + if (slotToIndex.containsKey(slot)) { + equalSlotIndices.set(slotToIndex.get(slot)); + } + } + if (equalSlotIndices.cardinality() > 1) { + equalSlotIndicesList.add(equalSlotIndices); + } + } + return equalSlotIndicesList; + } + + @Override + public void computeEqualSet(DataTrait.Builder builder) { + if (children.isEmpty()) { + return; + } + + // Get the list of equal slot sets and their corresponding index mappings for the first child + List<Set<Slot>> childEqualSlotsList = child(0).getLogicalProperties() + .getTrait().calAllEqualSet(); + List<BitSet> childEqualSlotsIndicesList = mapSlotToIndex(child(0), 
childEqualSlotsList); + List<BitSet> unionEqualSlotIndicesList = new ArrayList<>(childEqualSlotsIndicesList); + + // Traverse all children and find the equal sets that exist in all children + for (int i = 1; i < children.size(); i++) { + Plan child = children.get(i); + + // Get the equal slot sets for the current child + childEqualSlotsList = child.getLogicalProperties().getTrait().calAllEqualSet(); + + // Map slots to indices for the current child + childEqualSlotsIndicesList = mapSlotToIndex(child, childEqualSlotsList); + + // Only keep the equal pairs that exist in all children of the union + // This is done by calculating the intersection of all children's equal slot indices + for (BitSet unionEqualSlotIndices : unionEqualSlotIndicesList) { + BitSet intersect = new BitSet(); + for (BitSet childEqualSlotIndices : childEqualSlotsIndicesList) { + if (unionEqualSlotIndices.intersects(childEqualSlotIndices)) { + intersect = childEqualSlotIndices; + break; + } + } + unionEqualSlotIndices.and(intersect); + } + } + + // Build the functional dependencies for the output slots + List<Slot> outputList = getOutput(); + for (BitSet equalSlotIndices : unionEqualSlotIndicesList) { + if (equalSlotIndices.cardinality() <= 1) { + continue; + } + int first = equalSlotIndices.nextSetBit(0); + int next = equalSlotIndices.nextSetBit(first + 1); + while (next > 0) { + builder.addEqualPair(outputList.get(first), outputList.get(next)); + next = equalSlotIndices.nextSetBit(next + 1); + } + } + } + + @Override + public void computeFd(DataTrait.Builder builder) { + // don't generate + } + + /** castCommonDataTypeAndNullableByConstants */ + public static Pair<List<List<NamedExpression>>, List<Boolean>> castCommonDataTypeAndNullableByConstants( + List<List<NamedExpression>> constantExprsList) { + int columnCount = constantExprsList.isEmpty() ? 
0 : constantExprsList.get(0).size(); + Pair<List<DataType>, List<Boolean>> commonInfo = computeCommonDataTypeAndNullable(constantExprsList, + columnCount); + List<List<NamedExpression>> castedRows = castToCommonType(constantExprsList, commonInfo.first, columnCount); + List<Boolean> nullables = commonInfo.second; + return Pair.of(castedRows, nullables); + } + + private static Pair<List<DataType>, List<Boolean>> computeCommonDataTypeAndNullable( + List<List<NamedExpression>> constantExprsList, int columnCount) { + List<Boolean> nullables = Lists.newArrayListWithCapacity(columnCount); + List<DataType> commonDataTypes = Lists.newArrayListWithCapacity(columnCount); + List<NamedExpression> firstRow = constantExprsList.isEmpty() ? ImmutableList.of() : constantExprsList.get(0); + for (int columnId = 0; columnId < columnCount; columnId++) { + Expression constant = firstRow.get(columnId).child(0); + Pair<DataType, Boolean> commonDataTypeAndNullable = computeCommonDataTypeAndNullable(constant, columnId, + constantExprsList); + commonDataTypes.add(commonDataTypeAndNullable.first); + nullables.add(commonDataTypeAndNullable.second); + } + return Pair.of(commonDataTypes, nullables); + } + + private static Pair<DataType, Boolean> computeCommonDataTypeAndNullable( + Expression firstRowExpr, int columnId, List<List<NamedExpression>> constantExprsList) { + DataType commonDataType = firstRowExpr.getDataType(); + boolean nullable = firstRowExpr.nullable(); + for (int rowId = 1; rowId < constantExprsList.size(); rowId++) { + NamedExpression namedExpression = constantExprsList.get(rowId).get(columnId); + Expression otherConstant = namedExpression.child(0); + nullable |= otherConstant.nullable(); + DataType otherDataType = otherConstant.getDataType(); + commonDataType = getAssignmentCompatibleType(commonDataType, otherDataType); + } + return Pair.of(commonDataType, nullable); + } + + private static List<List<NamedExpression>> castToCommonType( + List<List<NamedExpression>> rows, 
List<DataType> commonDataTypes, int columnCount) {
ImmutableList.Builder<List<NamedExpression>> castedConstants = ImmutableList + .builderWithExpectedSize(rows.size()); + for (List<NamedExpression> row : rows) { + castedConstants.add(castToCommonType(row, commonDataTypes)); + } + return castedConstants.build(); + } + + private static List<NamedExpression> castToCommonType(List<NamedExpression> row, List<DataType> commonTypes) { + ImmutableList.Builder<NamedExpression> castedRow = ImmutableList.builderWithExpectedSize(row.size()); + boolean changed = false; + for (int columnId = 0; columnId < row.size(); columnId++) { + NamedExpression constantAlias = row.get(columnId); + Expression constant = constantAlias.child(0); + DataType commonType = commonTypes.get(columnId); + if (commonType.equals(constant.getDataType())) { + castedRow.add(constantAlias); + } else { + changed = true; + Expression expression = TypeCoercionUtils.castIfNotSameTypeStrict(constant, commonType); + castedRow.add((NamedExpression) constantAlias.withChildren(expression)); + } + } + return changed ? castedRow.build() : row; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java new file mode 100644 index 00000000000..7b1fc4862ee --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java @@ -0,0 +1,66 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.logical; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.RelationId; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; + +import java.util.List; +import java.util.Optional; + +/** + * LogicalRecursiveCteScan. + */ +public class LogicalRecursiveCteScan extends LogicalCatalogRelation { + public LogicalRecursiveCteScan(RelationId relationId, TableIf table, List<String> qualifier) { + this(relationId, table, qualifier, Optional.empty(), Optional.empty()); + } + + private LogicalRecursiveCteScan(RelationId relationId, TableIf table, List<String> qualifier, + Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties) { + super(relationId, PlanType.LOGICAL_RECURSIVE_CTE_SCAN, table, qualifier, groupExpression, logicalProperties); + } + + @Override + public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { + return new LogicalRecursiveCteScan(relationId, table, qualifier, + groupExpression, Optional.ofNullable(getLogicalProperties())); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + return new LogicalRecursiveCteScan(relationId, table, qualifier, 
groupExpression, logicalProperties); + } + + @Override + public LogicalCatalogRelation withRelationId(RelationId relationId) { + return new LogicalRecursiveCteScan(relationId, table, qualifier, + Optional.empty(), Optional.empty()); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitLogicalRecursiveCteScan(this, context); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java index 328508fb3c7..e7230b5a31e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java @@ -55,28 +55,36 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> extends LogicalUnary< private final List<String> qualifier; private final Optional<List<String>> columnAliases; + private final boolean isRecursiveCte; + public LogicalSubQueryAlias(String tableAlias, CHILD_TYPE child) { - this(ImmutableList.of(tableAlias), Optional.empty(), Optional.empty(), Optional.empty(), child); + this(ImmutableList.of(tableAlias), Optional.empty(), false, Optional.empty(), Optional.empty(), child); } public LogicalSubQueryAlias(List<String> qualifier, CHILD_TYPE child) { - this(qualifier, Optional.empty(), Optional.empty(), Optional.empty(), child); + this(qualifier, Optional.empty(), false, Optional.empty(), Optional.empty(), child); } public LogicalSubQueryAlias(String tableAlias, Optional<List<String>> columnAliases, CHILD_TYPE child) { - this(ImmutableList.of(tableAlias), columnAliases, Optional.empty(), Optional.empty(), child); + this(ImmutableList.of(tableAlias), columnAliases, false, Optional.empty(), Optional.empty(), child); + } + + public LogicalSubQueryAlias(String tableAlias, Optional<List<String>>
columnAliases, boolean isRecursiveCte, + CHILD_TYPE child) { + this(ImmutableList.of(tableAlias), columnAliases, isRecursiveCte, Optional.empty(), Optional.empty(), child); } public LogicalSubQueryAlias(List<String> qualifier, Optional<List<String>> columnAliases, CHILD_TYPE child) { - this(qualifier, columnAliases, Optional.empty(), Optional.empty(), child); + this(qualifier, columnAliases, false, Optional.empty(), Optional.empty(), child); } - public LogicalSubQueryAlias(List<String> qualifier, Optional<List<String>> columnAliases, + public LogicalSubQueryAlias(List<String> qualifier, Optional<List<String>> columnAliases, boolean isRecursiveCte, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_SUBQUERY_ALIAS, groupExpression, logicalProperties, child); this.qualifier = ImmutableList.copyOf(Objects.requireNonNull(qualifier, "qualifier is null")); this.columnAliases = columnAliases; + this.isRecursiveCte = isRecursiveCte; } @Override @@ -128,10 +136,14 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> extends LogicalUnary< return columnAliases; } + public boolean isRecursiveCte() { + return isRecursiveCte; + } + @Override public String toString() { return columnAliases.map(strings -> Utils.toSqlString("LogicalSubQueryAlias", - "qualifier", qualifier, + "qualifier", qualifier, "isRecursiveCte", isRecursiveCte, "columnAliases", StringUtils.join(strings, ",") )).orElseGet(() -> Utils.toSqlString("LogicalSubQueryAlias", "qualifier", qualifier @@ -158,7 +170,8 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> extends LogicalUnary< @Override public LogicalSubQueryAlias<Plan> withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalSubQueryAlias<>(qualifier, columnAliases, children.get(0)); + return new LogicalSubQueryAlias<>(qualifier, columnAliases, isRecursiveCte, Optional.empty(), Optional.empty(), + 
children.get(0)); } @Override @@ -173,7 +186,7 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> extends LogicalUnary< @Override public LogicalSubQueryAlias<CHILD_TYPE> withGroupExpression(Optional<GroupExpression> groupExpression) { - return new LogicalSubQueryAlias<>(qualifier, columnAliases, groupExpression, + return new LogicalSubQueryAlias<>(qualifier, columnAliases, isRecursiveCte, groupExpression, Optional.of(getLogicalProperties()), child()); } @@ -181,7 +194,7 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> extends LogicalUnary< public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalSubQueryAlias<>(qualifier, columnAliases, groupExpression, logicalProperties, + return new LogicalSubQueryAlias<>(qualifier, columnAliases, isRecursiveCte, groupExpression, logicalProperties, children.get(0)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java new file mode 100644 index 00000000000..ef0cc98de0a --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java @@ -0,0 +1,251 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.DataTrait; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.algebra.Union; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.statistics.Statistics; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.util.ArrayList; +import java.util.BitSet; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * PhysicalRecursiveCte is basically like PhysicalUnion + */ +public class PhysicalRecursiveCte extends PhysicalSetOperation implements Union { + + // in doris, we use union node to present one row relation + private final List<List<NamedExpression>> constantExprsList; + + /** PhysicalRecursiveCte */ 
+ public PhysicalRecursiveCte(Qualifier qualifier, + List<NamedExpression> outputs, + List<List<SlotReference>> childrenOutputs, + List<List<NamedExpression>> constantExprsList, + LogicalProperties logicalProperties, + List<Plan> children) { + super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, childrenOutputs, logicalProperties, children); + this.constantExprsList = ImmutableList.copyOf( + Objects.requireNonNull(constantExprsList, "constantExprsList should not be null")); + } + + /** PhysicalRecursiveCte */ + public PhysicalRecursiveCte(Qualifier qualifier, + List<NamedExpression> outputs, + List<List<SlotReference>> childrenOutputs, + List<List<NamedExpression>> constantExprsList, + Optional<GroupExpression> groupExpression, + LogicalProperties logicalProperties, + List<Plan> children) { + super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, childrenOutputs, + groupExpression, logicalProperties, children); + this.constantExprsList = ImmutableList.copyOf( + Objects.requireNonNull(constantExprsList, "constantExprsList should not be null")); + } + + /** PhysicalRecursiveCte */ + public PhysicalRecursiveCte(Qualifier qualifier, List<NamedExpression> outputs, + List<List<SlotReference>> childrenOutputs, List<List<NamedExpression>> constantExprsList, + Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, Statistics statistics, List<Plan> inputs) { + super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, childrenOutputs, + groupExpression, logicalProperties, physicalProperties, statistics, inputs); + this.constantExprsList = ImmutableList.copyOf( + Objects.requireNonNull(constantExprsList, "constantExprsList should not be null")); + } + + public List<List<NamedExpression>> getConstantExprsList() { + return constantExprsList; + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitPhysicalRecursiveCte(this, context); + } + + @Override + 
public String toString() { + return Utils.toSqlString("PhysicalRecursiveCte" + "[" + id.asInt() + "]" + getGroupIdWithPrefix(), + "stats", statistics, + "qualifier", qualifier, + "outputs", outputs, + "regularChildrenOutputs", regularChildrenOutputs, + "constantExprsList", constantExprsList); + } + + @Override + public String shapeInfo() { + ConnectContext context = ConnectContext.get(); + if (context != null + && context.getSessionVariable().getDetailShapePlanNodesSet().contains(getClass().getSimpleName())) { + StringBuilder builder = new StringBuilder(); + builder.append(getClass().getSimpleName()); + builder.append("(constantExprsList="); + builder.append(constantExprsList.stream() + .map(exprs -> exprs.stream().map(Expression::shapeInfo) + .collect(Collectors.joining(", ", "[", "]"))) + .collect(Collectors.joining(", ", "[", "]"))); + builder.append(")"); + return builder.toString(); + } else { + return super.shapeInfo(); + } + } + + @Override + public PhysicalRecursiveCte withChildren(List<Plan> children) { + return new PhysicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, groupExpression, + getLogicalProperties(), children); + } + + @Override + public PhysicalRecursiveCte withGroupExpression(Optional<GroupExpression> groupExpression) { + return new PhysicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, + groupExpression, getLogicalProperties(), children); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + return new PhysicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, + groupExpression, logicalProperties.get(), children); + } + + @Override + public PhysicalRecursiveCte withPhysicalPropertiesAndStats( + PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, 
constantExprsList, + groupExpression, getLogicalProperties(), physicalProperties, statistics, children); + } + + @Override + public PhysicalRecursiveCte resetLogicalProperties() { + return new PhysicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, + Optional.empty(), null, physicalProperties, statistics, children); + } + + @Override + public void computeUnique(DataTrait.Builder builder) { + if (qualifier == Qualifier.DISTINCT) { + builder.addUniqueSlot(ImmutableSet.copyOf(getOutput())); + } + } + + @Override + public void computeUniform(DataTrait.Builder builder) { + // don't propagate uniform slots + } + + private List<BitSet> mapSlotToIndex(Plan plan, List<Set<Slot>> equalSlotsList) { + Map<Slot, Integer> slotToIndex = new HashMap<>(); + for (int i = 0; i < plan.getOutput().size(); i++) { + slotToIndex.put(plan.getOutput().get(i), i); + } + List<BitSet> equalSlotIndicesList = new ArrayList<>(); + for (Set<Slot> equalSlots : equalSlotsList) { + BitSet equalSlotIndices = new BitSet(); + for (Slot slot : equalSlots) { + if (slotToIndex.containsKey(slot)) { + equalSlotIndices.set(slotToIndex.get(slot)); + } + } + if (equalSlotIndices.cardinality() > 1) { + equalSlotIndicesList.add(equalSlotIndices); + } + } + return equalSlotIndicesList; + } + + @Override + public void computeEqualSet(DataTrait.Builder builder) { + if (children.isEmpty()) { + return; + } + + // Get the list of equal slot sets and their corresponding index mappings for the first child + List<Set<Slot>> childEqualSlotsList = child(0).getLogicalProperties() + .getTrait().calAllEqualSet(); + List<BitSet> childEqualSlotsIndicesList = mapSlotToIndex(child(0), childEqualSlotsList); + List<BitSet> unionEqualSlotIndicesList = new ArrayList<>(childEqualSlotsIndicesList); + + // Traverse all children and find the equal sets that exist in all children + for (int i = 1; i < children.size(); i++) { + Plan child = children.get(i); + + // Get the equal slot sets for the current child + 
childEqualSlotsList = child.getLogicalProperties().getTrait().calAllEqualSet(); + + // Map slots to indices for the current child + childEqualSlotsIndicesList = mapSlotToIndex(child, childEqualSlotsList); + + // Only keep the equal pairs that exist in all children of the union + // This is done by calculating the intersection of all children's equal slot indices + for (BitSet unionEqualSlotIndices : unionEqualSlotIndicesList) { + BitSet intersect = new BitSet(); + for (BitSet childEqualSlotIndices : childEqualSlotsIndicesList) { + if (unionEqualSlotIndices.intersects(childEqualSlotIndices)) { + intersect = childEqualSlotIndices; + break; + } + } + unionEqualSlotIndices.and(intersect); + } + } + + // Build the functional dependencies for the output slots + List<Slot> outputList = getOutput(); + for (BitSet equalSlotIndices : unionEqualSlotIndicesList) { + if (equalSlotIndices.cardinality() <= 1) { + continue; + } + int first = equalSlotIndices.nextSetBit(0); + int next = equalSlotIndices.nextSetBit(first + 1); + while (next > 0) { + builder.addEqualPair(outputList.get(first), outputList.get(next)); + next = equalSlotIndices.nextSetBit(next + 1); + } + } + } + + @Override + public void computeFd(DataTrait.Builder builder) { + // don't generate + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteScan.java new file mode 100644 index 00000000000..3450ae0de18 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteScan.java @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.trees.plans.physical; + +import org.apache.doris.catalog.TableIf; +import org.apache.doris.nereids.memo.GroupExpression; +import org.apache.doris.nereids.properties.LogicalProperties; +import org.apache.doris.nereids.properties.PhysicalProperties; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.PlanType; +import org.apache.doris.nereids.trees.plans.RelationId; +import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; +import org.apache.doris.statistics.Statistics; + +import java.util.Collection; +import java.util.List; +import java.util.Optional; + +/** + * PhysicalRecursiveCteScan. 
+ */ +public class PhysicalRecursiveCteScan extends PhysicalCatalogRelation { + public PhysicalRecursiveCteScan(RelationId relationId, TableIf table, List<String> qualifier, + Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + Collection<Slot> operativeSlots) { + this(relationId, table, qualifier, groupExpression, logicalProperties, PhysicalProperties.ANY, null, + operativeSlots); + } + + public PhysicalRecursiveCteScan(RelationId relationId, TableIf table, List<String> qualifier, + Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, + PhysicalProperties physicalProperties, Statistics statistics, Collection<Slot> operativeSlots) { + super(relationId, PlanType.PHYSICAL_RECURSIVE_CTE_SCAN, table, qualifier, groupExpression, logicalProperties, + physicalProperties, statistics, operativeSlots); + } + + @Override + public <R, C> R accept(PlanVisitor<R, C> visitor, C context) { + return visitor.visitPhysicalRecursiveCteScan(this, context); + } + + @Override + public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { + return new PhysicalRecursiveCteScan(relationId, table, qualifier, groupExpression, getLogicalProperties(), + physicalProperties, statistics, operativeSlots); + } + + @Override + public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, + Optional<LogicalProperties> logicalProperties, List<Plan> children) { + return new PhysicalRecursiveCteScan(relationId, table, qualifier, groupExpression, getLogicalProperties(), + physicalProperties, statistics, operativeSlots); + } + + @Override + public PhysicalPlan withPhysicalPropertiesAndStats(PhysicalProperties physicalProperties, Statistics statistics) { + return new PhysicalRecursiveCteScan(relationId, table, qualifier, groupExpression, getLogicalProperties(), + physicalProperties, statistics, operativeSlots); + } + + @Override + public String toString() { + return 
Utils.toSqlString("PhysicalRecursiveCteScan[" + table.getName() + "]" + getGroupIdWithPrefix(), + "stats", statistics, + "qualified", Utils.qualifiedName(qualifier, table.getName()), + "operativeCols", getOperativeSlots()); + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java index 291f9ed5d80..d34de256608 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java @@ -44,6 +44,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPreAggOnHint; import org.apache.doris.nereids.trees.plans.logical.LogicalPreFilter; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalQualify; +import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte; import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat; import org.apache.doris.nereids.trees.plans.logical.LogicalSelectHint; @@ -78,6 +79,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalPartitionTopN; import org.apache.doris.nereids.trees.plans.physical.PhysicalProject; import org.apache.doris.nereids.trees.plans.physical.PhysicalQuickSort; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalRepeat; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; @@ -211,6 +213,10 @@ public abstract class PlanVisitor<R, C> implements CommandVisitor<R, C>, Relatio return visit(join, context); } + public R 
visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, C context) { + return visit(recursiveCte, context); + } + public R visitLogicalLimit(LogicalLimit<? extends Plan> limit, C context) { return visit(limit, context); } @@ -376,6 +382,10 @@ public abstract class PlanVisitor<R, C> implements CommandVisitor<R, C>, Relatio return visitPhysicalSetOperation(union, context); } + public R visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, C context) { + return visitPhysicalSetOperation(recursiveCte, context); + } + public R visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort, C context) { return visit(sort, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java index fef94ff52f9..b325849b1b1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/RelationVisitor.java @@ -30,6 +30,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalJdbcScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOdbcScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan; import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCteScan; import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; import org.apache.doris.nereids.trees.plans.logical.LogicalSchemaScan; import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation; @@ -45,6 +46,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalOneRowRelation; +import 
org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCteScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalSchemaScan; import org.apache.doris.nereids.trees.plans.physical.PhysicalTVFRelation; @@ -139,6 +141,10 @@ public interface RelationVisitor<R, C> { return visitLogicalCatalogRelation(testScan, context); } + default R visitLogicalRecursiveCteScan(LogicalRecursiveCteScan recursiveCteScan, C context) { + return visitLogicalCatalogRelation(recursiveCteScan, context); + } + // ******************************* // physical relations // ******************************* @@ -176,6 +182,10 @@ public interface RelationVisitor<R, C> { return visitPhysicalCatalogRelation(deferMaterializeOlapScan, context); } + default R visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan recursiveCteScan, C context) { + return visitPhysicalCatalogRelation(recursiveCteScan, context); + } + default R visitPhysicalOneRowRelation(PhysicalOneRowRelation oneRowRelation, C context) { return visitPhysicalRelation(oneRowRelation, context); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java new file mode 100644 index 00000000000..b4075d66663 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/UnionNode.java +// and modified by Doris + +package org.apache.doris.planner; + +import org.apache.doris.analysis.TupleId; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TRecCTENode; + +public class RecursiveCteNode extends SetOperationNode { + + private boolean isUnionAll; + private TRecCTENode tRecCTENode; + + public RecursiveCteNode(PlanNodeId id, TupleId tupleId, boolean isUnionAll) { + super(id, tupleId, "RECURSIVE_CTE", StatisticalType.RECURSIVE_CTE_NODE); + this.isUnionAll = isUnionAll; + } + + public boolean isUnionAll() { + return isUnionAll; + } + + public void settRecCTENode(TRecCTENode tRecCTENode) { + this.tRecCTENode = tRecCTENode; + } + + @Override + protected void toThrift(TPlanNode msg) { + msg.node_type = TPlanNodeType.REC_CTE_NODE; + msg.rec_cte_node = tRecCTENode; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java new file mode 100644 index 00000000000..103abd8b790 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java @@ -0,0 +1,113 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.catalog.Env; +import org.apache.doris.common.UserException; +import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.system.Backend; +import org.apache.doris.thrift.TDataGenScanRange; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TNetworkAddress; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; +import org.apache.doris.thrift.TScanRange; +import org.apache.doris.thrift.TScanRangeLocation; +import org.apache.doris.thrift.TScanRangeLocations; + +import com.google.common.collect.Lists; + +import java.util.Collections; +import java.util.List; + +// Full scan of recursive cte temp table +public class RecursiveCteScanNode extends ScanNode { + + public RecursiveCteScanNode(PlanNodeId id, TupleDescriptor desc) { + super(id, desc, "RECURSIVE_CTE_SCAN", StatisticalType.CTE_SCAN_NODE); + } + + public void initScanRangeLocations() throws UserException { + createScanRangeLocations(); + } + + @Override + protected void createScanRangeLocations() throws UserException { + scanRangeLocations = Lists.newArrayList(); + // randomly select 1 backend + List<Backend> backendList = 
Lists.newArrayList(); + for (Backend be : Env.getCurrentSystemInfo().getBackendsByCurrentCluster().values()) { + if (be.isAlive()) { + backendList.add(be); + } + } + if (backendList.isEmpty()) { + throw new UserException("No Alive backends"); + } + Collections.shuffle(backendList); + Backend selectedBackend = backendList.get(0); + + // create a dummy scan range + TScanRange scanRange = new TScanRange(); + TDataGenScanRange dataGenScanRange = new TDataGenScanRange(); + scanRange.setDataGenScanRange(dataGenScanRange); + + // create scan range locations + TScanRangeLocations locations = new TScanRangeLocations(); + TScanRangeLocation location = new TScanRangeLocation(); + location.setBackendId(selectedBackend.getId()); + location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); + locations.addToLocations(location); + locations.setScanRange(scanRange); + scanRangeLocations.add(locations); + } + + @Override + public List<TScanRangeLocations> getScanRangeLocations(long maxScanRangeLength) { + return scanRangeLocations; + } + + @Override + public int getNumInstances() { + return 1; + } + + @Override + public int getScanRangeNum() { + return 1; + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + output.append(prefix).append("Recursive Cte: ").append(getTableIf().getName()); + output.append("\n"); + return output.toString(); + } + + @Override + protected void toThrift(TPlanNode msg) { + msg.node_type = TPlanNodeType.REC_CTE_SCAN_NODE; + } + + @Override + public boolean isSerialOperator() { + return true; + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java index 42a930c2471..a2633b51449 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/statistics/StatisticalType.java @@ -41,6 +41,8 @@ public enum StatisticalType { ODBC_SCAN_NODE, OLAP_SCAN_NODE, PARTITION_TOPN_NODE, + RECURSIVE_CTE_NODE, + RECURSIVE_CTE_SCAN_NODE, REPEAT_NODE, SELECT_NODE, SET_OPERATION_NODE, diff --git a/regression-test/suites/nereids_p0/recursive_cte/test_recursive_cte.groovy b/regression-test/suites/nereids_p0/recursive_cte/test_recursive_cte.groovy new file mode 100644 index 00000000000..6055f9798cd --- /dev/null +++ b/regression-test/suites/nereids_p0/recursive_cte/test_recursive_cte.groovy @@ -0,0 +1,102 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+suite("test_recursive_cte") { + sql """drop table if exists t1""" + sql """drop table if exists t2""" + sql """drop table if exists t3""" + sql """drop table if exists t4""" + + sql """ + create table if not exists t1 ( + c2 int , + c1 int , + c3 int , + c4 int , + pk int + ) + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + + sql """ + create table if not exists t2 ( + c1 int , + c2 int , + c3 int , + c4 int , + pk int + ) + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + + sql """ + create table if not exists t3 ( + c2 int , + c1 int , + c3 int , + c4 int , + pk int + ) + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + + sql """ + create table if not exists t4 ( + c1 int , + c2 int , + c3 int , + c4 int , + pk int + ) + distributed by hash(pk) buckets 10 + properties("replication_num" = "1"); + """ + + sql """ + insert into t1(pk,c1,c2,c3,c4) values (0,7,2,3328056,7),(1,3,5,3,3045349),(2,2130015,0,7,-7116176),(3,4411710,1203314,1,2336164),(4,4,-8001461,0,8),(5,9,3,6,2),(6,-8088092,null,-7256698,-2025142),(7,8,2,5,1),(8,4,4953685,3,null),(9,-6662413,-3845449,4,2),(10,5315281,0,5,null),(11,9,3,7,7),(12,4341905,null,null,8),(13,3,6,5,1),(14,5,9,6541164,3),(15,1,-582319,1,9),(16,5533636,4,39841,0),(17,1,1,null,7),(18,742881,-1420303,6,1),(19,281430,6753011,3,2),(20,7,1,4,-31350),(21,-5663089 [...] + """ + + sql """ + insert into t2(pk,c1,c2,c3,c4) values (0,5,4,189864,-7663457),(1,7,null,6,1),(2,null,8,-3362640,9),(3,3,2,5,-2197130),(4,2,3,7160615,1),(5,null,-57834,420441,3),(6,0,null,2,2),(7,1,-3681539,3,4),(8,548866,3,0,5),(9,8,-2824887,0,3246956),(10,5,3,7,2),(11,8,8,6,8),(12,0,2,7,9),(13,8,6,null,null),(14,-4103729,4,5,8),(15,-3659292,2,7,5),(16,8,7,1,null),(17,2526018,4,8069607,5),(18,6,6,5,2802235),(19,9,0,6379201,null),(20,3,null,4,3),(21,0,8,-5506402,2),(22,6,4,3,1),(23,4,5225086,3,1) [...] 
+ """ + + sql """ + insert into t3(pk,c1,c2,c3,c4) values (0,3,2,6,-3164679),(1,-6216443,3437690,-288827,6),(2,4,-5352286,-1005469,4118240),(3,9,6795167,5,1616205),(4,8,-4659990,-4816829,6),(5,0,9,4,8),(6,-4454766,2,2510766,3),(7,7860071,-3434966,8,3),(8,null,0,2,1),(9,8031908,2,-6673194,-5981416),(10,5,6716310,8,2529959),(11,null,-3622116,1,-7891010),(12,null,3527222,7993802,null),(13,null,1,2,1),(14,2,8,7,7),(15,0,9,5,null),(16,7452083,null,-4620796,0),(17,9,9,null,6),(18,3,1,-1578776,5),(19,9,253 [...] + """ + + sql """ + insert into t4(pk,c1,c2,c3,c4) values (0,-4263513,null,null,6),(1,1,3,4,null),(2,2460936,6,5,6299003),(3,null,7,7107446,-2366754),(4,6247611,4785035,3,-8014875),(5,0,2,5249218,3),(6,null,253825,4,3),(7,null,2,9,-350785),(8,6,null,null,4),(9,1,3,1,3422691),(10,0,-6596165,1808018,3),(11,2,752342,null,1),(12,-5220927,2676278,9,7),(13,6025864,2,1,4),(14,7,4,4,9),(15,5,9,9,849881),(16,-4253076,null,-4404479,-6365351),(17,null,6,4240023,3),(18,7,1276495,7,6),(19,null,-4459040,178194,-6 [...] + """ + + sql """ + EXPLAIN ALL PLAN WITH XX AS ( + SELECT C1 + FROM t1 + ), + RECURSIVE YY AS ( + SELECT C1 + 1.0 as c2 + FROM XX + UNION + SELECT c1 + FROM XX t2 join YY + WHERE t2.c1 = YY.c2 + ) + SELECT * FROM YY; + """ +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
