This is an automated email from the ASF dual-hosted git repository. starocean999 pushed a commit to branch dev_rec in repository https://gitbox.apache.org/repos/asf/doris.git
commit c94e85b20c91f38f5063d1384429937fa85cbfbe Author: lichi <[email protected]> AuthorDate: Sat Oct 11 16:55:51 2025 +0800 fix some bug in fe --- .../org/apache/doris/analysis/DescriptorTable.java | 5 + .../doris/catalog/RecursiveCteTempTable.java | 4 +- .../glue/translator/PhysicalPlanTranslator.java | 77 ++++++- .../processor/post/RuntimeFilterPruner.java | 9 + .../properties/ChildOutputPropertyDeriver.java | 2 +- .../doris/nereids/rules/analysis/AnalyzeCTE.java | 9 +- .../LogicalRecursiveCteToPhysicalRecursiveCte.java | 3 +- .../nereids/rules/rewrite/AdjustNullable.java | 63 ++++-- .../doris/nereids/rules/rewrite/ColumnPruning.java | 22 +- .../doris/nereids/stats/StatsCalculator.java | 70 ++++++- .../trees/copier/LogicalPlanDeepCopier.java | 8 +- .../trees/plans/algebra/RecursiveCte.java} | 20 +- .../commands/CreateMaterializedViewCommand.java | 2 +- .../distribute/worker/job/AssignedJobBuilder.java | 93 --------- .../worker/job/UnassignedRecursiveCteScanJob.java | 5 +- .../trees/plans/logical/LogicalRecursiveCte.java | 223 ++++++++++++--------- .../trees/plans/physical/PhysicalRecursiveCte.java | 113 +++++++---- .../nereids/trees/plans/visitor/PlanVisitor.java | 2 +- .../apache/doris/planner/RecursiveCteScanNode.java | 18 +- .../doris/qe/runtime/ThriftPlansBuilder.java | 96 +++++++++ 20 files changed, 547 insertions(+), 297 deletions(-) diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java index fb6cc7df0a8..197293ac69a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DescriptorTable.java @@ -20,6 +20,7 @@ package org.apache.doris.analysis; +import org.apache.doris.catalog.RecursiveCteTempTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.AnalysisException; import org.apache.doris.common.IdGenerator; @@ -212,6 +213,10 @@ public class DescriptorTable { } for (TableIf tbl : referencedTbls.values()) { + if (tbl instanceof RecursiveCteTempTable) { + // skip recursive cte temp table + continue; + } result.addToTableDescriptors(tbl.toThrift()); } thriftDescTable = result; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java index 9f36b04dfc4..ee924466fbb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java @@ -17,10 +17,12 @@ package org.apache.doris.catalog; +import org.apache.doris.common.SystemIdGenerator; + import java.util.List; public class RecursiveCteTempTable extends Table { public RecursiveCteTempTable(String tableName, List<Column> fullSchema) { - super(-1, tableName, TableType.RECURSIVE_CTE_TEMP_TABLE, fullSchema); + super(SystemIdGenerator.getNextId(), tableName, TableType.RECURSIVE_CTE_TEMP_TABLE, fullSchema); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 21f9bbf2126..a27268a5af7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -116,7 +116,6 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PreAggStatus; import org.apache.doris.nereids.trees.plans.algebra.Aggregate; import org.apache.doris.nereids.trees.plans.algebra.Relation; -import org.apache.doris.nereids.trees.plans.algebra.SetOperation; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalJoin; import org.apache.doris.nereids.trees.plans.physical.AbstractPhysicalSort; import org.apache.doris.nereids.trees.plans.physical.PhysicalAssertNumRows; @@ -2269,6 +2268,79 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla return inputFragment; } + @Override + public PlanFragment visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, PlanTranslatorContext context) { + List<PlanFragment> childrenFragments = new ArrayList<>(); + for (Plan plan : recursiveCte.children()) { + childrenFragments.add(plan.accept(this, context)); + } + + TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), null, context); + List<SlotDescriptor> outputSlotDescs = new ArrayList<>(setTuple.getSlots()); + + RecursiveCteNode recursiveCteNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(), + recursiveCte.isUnionAll()); + List<List<Expr>> distributeExprLists = getDistributeExprs(recursiveCte.children().toArray(new Plan[0])); + recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists); + recursiveCteNode.setNereidsId(recursiveCte.getId()); + List<List<Expression>> resultExpressionLists = Lists.newArrayList(); + context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), recursiveCteNode.getId()); + for (List<SlotReference> regularChildrenOutput : recursiveCte.getRegularChildrenOutputs()) { + resultExpressionLists.add(new ArrayList<>(regularChildrenOutput)); + } + + for (PlanFragment childFragment : childrenFragments) { + recursiveCteNode.addChild(childFragment.getPlanRoot()); + } + + List<List<Expr>> materializedResultExprLists = Lists.newArrayList(); + for (int i = 0; i < resultExpressionLists.size(); ++i) { + List<Expression> resultExpressionList = resultExpressionLists.get(i); + List<Expr> exprList = Lists.newArrayList(); + Preconditions.checkState(resultExpressionList.size() == outputSlotDescs.size()); + for (int j = 0; j < resultExpressionList.size(); ++j) { + if (outputSlotDescs.get(j).isMaterialized()) { + exprList.add(ExpressionTranslator.translate(resultExpressionList.get(j), context)); + // TODO: reconsider this, we may change nullable info in previous nereids rules not here. + outputSlotDescs.get(j) + .setIsNullable(outputSlotDescs.get(j).getIsNullable() || exprList.get(j).isNullable()); + } + } + materializedResultExprLists.add(exprList); + } + recursiveCteNode.setMaterializedResultExprLists(materializedResultExprLists); + Preconditions.checkState(recursiveCteNode.getMaterializedResultExprLists().size() + == recursiveCteNode.getChildren().size()); + + PlanFragment recursiveCteFragment; + if (childrenFragments.isEmpty()) { + recursiveCteFragment = createPlanFragment(recursiveCteNode, + DataPartition.UNPARTITIONED, recursiveCte); + context.addPlanFragment(recursiveCteFragment); + } else { + int childrenSize = childrenFragments.size(); + recursiveCteFragment = childrenFragments.get(childrenSize - 1); + for (int i = childrenSize - 2; i >= 0; i--) { + context.mergePlanFragment(childrenFragments.get(i), recursiveCteFragment); + for (PlanFragment child : childrenFragments.get(i).getChildren()) { + recursiveCteFragment.addChild(child); + } + } + setPlanRoot(recursiveCteFragment, recursiveCteNode, recursiveCte); + } + + // in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution + // we need turn of parallel scan to ensure to get correct result. + // TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary + if (!recursiveCte.getPhysicalProperties().equals(PhysicalProperties.ANY) + && findOlapScanNodesByPassExchangeAndJoinNode(recursiveCteFragment.getPlanRoot())) { + recursiveCteFragment.setHasColocatePlanNode(true); + recursiveCteNode.setColocate(true); + } + + return recursiveCteFragment; + } + /** * Returns a new fragment with a UnionNode as its root. The data partition of the * returned fragment and how the data of the child fragments is consumed depends on the @@ -2300,9 +2372,6 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla setOperationNode = new ExceptNode(context.nextPlanNodeId(), setTuple.getId()); } else if (setOperation instanceof PhysicalIntersect) { setOperationNode = new IntersectNode(context.nextPlanNodeId(), setTuple.getId()); - } else if (setOperation instanceof PhysicalRecursiveCte) { - setOperationNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(), - setOperation.getQualifier().equals(SetOperation.Qualifier.ALL)); } else { throw new RuntimeException("not support set operation type " + setOperation); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java index 51a9003b78f..d7e4e5b67cb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/RuntimeFilterPruner.java @@ -33,6 +33,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalHashJoin; import org.apache.doris.nereids.trees.plans.physical.PhysicalIntersect; import org.apache.doris.nereids.trees.plans.physical.PhysicalLimit; import org.apache.doris.nereids.trees.plans.physical.PhysicalNestedLoopJoin; +import org.apache.doris.nereids.trees.plans.physical.PhysicalRecursiveCte; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalSetOperation; import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN; @@ -76,6 +77,14 @@ public class RuntimeFilterPruner extends PlanPostProcessor { return plan; } + @Override + public PhysicalRecursiveCte visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, CascadesContext context) { + for (Plan child : recursiveCte.children()) { + child.accept(this, context); + } + return recursiveCte; + } + @Override public PhysicalSetOperation visitPhysicalSetOperation(PhysicalSetOperation setOperation, CascadesContext context) { for (Plan child : setOperation.children()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java index d3592388dae..0751f14b715 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildOutputPropertyDeriver.java @@ -151,7 +151,7 @@ public class ChildOutputPropertyDeriver extends PlanVisitor<PhysicalProperties, @Override public PhysicalProperties visitPhysicalRecursiveCteScan(PhysicalRecursiveCteScan cteScan, PlanContext context) { - return PhysicalProperties.GATHER; + return PhysicalProperties.MUST_SHUFFLE; } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java index 8ac4234e846..a6259f0b24c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java @@ -29,6 +29,7 @@ import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.algebra.SetOperation; import org.apache.doris.nereids.trees.plans.logical.LogicalCTE; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEAnchor; import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer; @@ -159,9 +160,10 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { LogicalPlan analyzedRecursiveChild = (LogicalPlan) innerRecursiveCascadesCtx.getRewritePlan(); LogicalUnion logicalUnion = (LogicalUnion) parsedCtePlan; - // manually bind LogicalRecursiveCte, see bindSetOperation in BindExpression.java + // create LogicalRecursiveCte LogicalRecursiveCte analyzedCtePlan = new LogicalRecursiveCte( - logicalUnion.getQualifier(), ImmutableList.of(analyzedAnchorChild, analyzedRecursiveChild)); + logicalUnion.getQualifier() == SetOperation.Qualifier.ALL, + ImmutableList.of(analyzedAnchorChild, analyzedRecursiveChild)); List<List<NamedExpression>> childrenProjections = analyzedCtePlan.collectChildrenProjections(); int childrenProjectionSize = childrenProjections.size(); ImmutableList.Builder<List<SlotReference>> childrenOutputs = ImmutableList @@ -180,8 +182,7 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { newChildren.add(newChild); childrenOutputs.add((List<SlotReference>) (List) newChild.getOutput()); } - analyzedCtePlan = (LogicalRecursiveCte) analyzedCtePlan.withChildrenAndTheirOutputs(newChildren.build(), - childrenOutputs.build()); + analyzedCtePlan = analyzedCtePlan.withChildrenAndTheirOutputs(newChildren.build(), childrenOutputs.build()); List<NamedExpression> newOutputs = analyzedCtePlan.buildNewOutputs(); analyzedCtePlan = analyzedCtePlan.withNewOutputs(newOutputs); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java index c8960aa9e51..467092895a9 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/implementation/LogicalRecursiveCteToPhysicalRecursiveCte.java @@ -28,10 +28,9 @@ public class LogicalRecursiveCteToPhysicalRecursiveCte extends OneImplementation @Override public Rule build() { return logicalRecursiveCte().then(recursiveCte -> - new PhysicalRecursiveCte(recursiveCte.getQualifier(), + new PhysicalRecursiveCte(recursiveCte.isUnionAll(), recursiveCte.getOutputs(), recursiveCte.getRegularChildrenOutputs(), - recursiveCte.getConstantExprsList(), recursiveCte.getLogicalProperties(), recursiveCte.children()) ).toRule(RuleType.LOGICAL_RECURSIVE_CTE_TO_PHYSICAL_RECURSIVE_CTE); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java index 63d63f32b6f..204a018fbc7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AdjustNullable.java @@ -290,6 +290,53 @@ public class AdjustNullable extends DefaultPlanRewriter<Map<ExprId, Slot>> imple return repeat.withGroupSetsAndOutput(repeat.getGroupingSets(), newOutputs).recomputeLogicalProperties(); } + @Override + public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, Map<ExprId, Slot> replaceMap) { + recursiveCte = (LogicalRecursiveCte) super.visit(recursiveCte, replaceMap); + ImmutableList.Builder<List<SlotReference>> newChildrenOutputs = ImmutableList.builder(); + List<Boolean> inputNullable = null; + if (!recursiveCte.children().isEmpty()) { + inputNullable = Lists.newArrayListWithCapacity(recursiveCte.getOutputs().size()); + for (int i = 0; i < recursiveCte.getOutputs().size(); i++) { + inputNullable.add(false); + } + for (int i = 0; i < recursiveCte.arity(); i++) { + List<Slot> childOutput = recursiveCte.child(i).getOutput(); + List<SlotReference> setChildOutput = recursiveCte.getRegularChildOutput(i); + ImmutableList.Builder<SlotReference> newChildOutputs = ImmutableList.builder(); + for (int j = 0; j < setChildOutput.size(); j++) { + for (Slot slot : childOutput) { + if (slot.getExprId().equals(setChildOutput.get(j).getExprId())) { + inputNullable.set(j, slot.nullable() || inputNullable.get(j)); + newChildOutputs.add((SlotReference) slot); + break; + } + } + } + newChildrenOutputs.add(newChildOutputs.build()); + } + } + if (inputNullable == null) { + // this is a fail-safe + // means there is no children and having no getConstantExprsList + // no way to update the nullable flag, so just do nothing + return recursiveCte; + } + List<NamedExpression> outputs = recursiveCte.getOutputs(); + List<NamedExpression> newOutputs = Lists.newArrayListWithCapacity(outputs.size()); + for (int i = 0; i < inputNullable.size(); i++) { + NamedExpression ne = outputs.get(i); + Slot slot = ne instanceof Alias ? (Slot) ((Alias) ne).child() : (Slot) ne; + slot = slot.withNullable(inputNullable.get(i)); + NamedExpression newOutput = ne instanceof Alias ? (NamedExpression) ne.withChildren(slot) : slot; + newOutputs.add(newOutput); + replaceMap.put(newOutput.getExprId(), newOutput.toSlot()); + } + return recursiveCte.withNewOutputs(newOutputs) + .withChildrenAndTheirOutputs(recursiveCte.children(), newChildrenOutputs.build()) + .recomputeLogicalProperties(); + } + @Override public Plan visitLogicalSetOperation(LogicalSetOperation setOperation, Map<ExprId, Slot> replaceMap) { setOperation = (LogicalSetOperation) super.visit(setOperation, replaceMap); @@ -331,22 +378,6 @@ public class AdjustNullable extends DefaultPlanRewriter<Map<ExprId, Slot>> imple inputNullable.set(j, inputNullable.get(j) || constantExprs.get(j).nullable()); } } - } else if (setOperation instanceof LogicalRecursiveCte) { - // LogicalRecursiveCte is basically like LogicalUnion, so just do same as LogicalUnion - LogicalRecursiveCte logicalRecursiveCte = (LogicalRecursiveCte) setOperation; - if (!logicalRecursiveCte.getConstantExprsList().isEmpty() && setOperation.children().isEmpty()) { - int outputSize = logicalRecursiveCte.getConstantExprsList().get(0).size(); - // create the inputNullable list and fill it with all FALSE values - inputNullable = Lists.newArrayListWithCapacity(outputSize); - for (int i = 0; i < outputSize; i++) { - inputNullable.add(false); - } - } - for (List<NamedExpression> constantExprs : logicalRecursiveCte.getConstantExprsList()) { - for (int j = 0; j < constantExprs.size(); j++) { - inputNullable.set(j, inputNullable.get(j) || constantExprs.get(j).nullable()); - } - } } if (inputNullable == null) { // this is a fail-safe diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java index 96daceedfe4..30a00314fbf 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java @@ -217,7 +217,7 @@ public class ColumnPruning extends DefaultPlanRewriter<PruneContext> implements @Override public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, PruneContext context) { // LogicalRecursiveCte is basically like LogicalUnion, so just do same as LogicalUnion - if (recursiveCte.getQualifier() == Qualifier.DISTINCT) { + if (!recursiveCte.isUnionAll()) { return skipPruneThisAndFirstLevelChildren(recursiveCte); } LogicalRecursiveCte prunedOutputRecursiveCte = pruneRecursiveCteOutput(recursiveCte, context); @@ -451,7 +451,6 @@ public class ColumnPruning extends DefaultPlanRewriter<PruneContext> implements return recursiveCte; } List<NamedExpression> prunedOutputs = Lists.newArrayList(); - List<List<NamedExpression>> constantExprsList = recursiveCte.getConstantExprsList(); List<List<SlotReference>> regularChildrenOutputs = recursiveCte.getRegularChildrenOutputs(); List<Plan> children = recursiveCte.children(); List<Integer> extractColumnIndex = Lists.newArrayList(); @@ -463,8 +462,6 @@ public class ColumnPruning extends DefaultPlanRewriter<PruneContext> implements } } - ImmutableList.Builder<List<NamedExpression>> prunedConstantExprsList - = ImmutableList.builderWithExpectedSize(constantExprsList.size()); if (prunedOutputs.isEmpty()) { // process prune all columns NamedExpression originSlot = originOutput.get(0); @@ -472,7 +469,7 @@ public class ColumnPruning extends DefaultPlanRewriter<PruneContext> implements TinyIntType.INSTANCE, false, originSlot.getQualifier())); regularChildrenOutputs = Lists.newArrayListWithCapacity(regularChildrenOutputs.size()); children = Lists.newArrayListWithCapacity(children.size()); - for (int i = 0; i < recursiveCte.getArity(); i++) { + for (int i = 0; i < recursiveCte.children().size(); i++) { Plan child = recursiveCte.child(i); List<NamedExpression> newProjectOutput = ImmutableList.of(new Alias(new TinyIntLiteral((byte) 1))); LogicalProject<?> project; @@ -487,25 +484,12 @@ public class ColumnPruning extends DefaultPlanRewriter<PruneContext> implements regularChildrenOutputs.add((List) project.getOutput()); children.add(project); } - for (int i = 0; i < constantExprsList.size(); i++) { - prunedConstantExprsList.add(ImmutableList.of(new Alias(new TinyIntLiteral((byte) 1)))); - } - } else { - int len = extractColumnIndex.size(); - for (List<NamedExpression> row : constantExprsList) { - ImmutableList.Builder<NamedExpression> newRow = ImmutableList.builderWithExpectedSize(len); - for (int idx : extractColumnIndex) { - newRow.add(row.get(idx)); - } - prunedConstantExprsList.add(newRow.build()); - } } if (prunedOutputs.equals(originOutput) && !context.requiredSlotsIds.isEmpty()) { return recursiveCte; } else { - return recursiveCte.withNewOutputsChildrenAndConstExprsList(prunedOutputs, children, - regularChildrenOutputs, prunedConstantExprsList.build()); + return recursiveCte.withNewOutputsAndChildren(prunedOutputs, children, regularChildrenOutputs); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java index a14c2dfc9cb..9ddcc7e2e40 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/stats/StatsCalculator.java @@ -60,6 +60,7 @@ import org.apache.doris.nereids.trees.plans.algebra.Limit; import org.apache.doris.nereids.trees.plans.algebra.OlapScan; import org.apache.doris.nereids.trees.plans.algebra.PartitionTopN; import org.apache.doris.nereids.trees.plans.algebra.Project; +import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte; import org.apache.doris.nereids.trees.plans.algebra.Relation; import org.apache.doris.nereids.trees.plans.algebra.Repeat; import org.apache.doris.nereids.trees.plans.algebra.SetOperation; @@ -925,7 +926,7 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> { @Override public Statistics visitLogicalRecursiveCte( LogicalRecursiveCte recursiveCte, Void context) { - return computeUnion(recursiveCte, + return computeRecursiveCte(recursiveCte, groupExpression.children() .stream().map(Group::getStatistics).collect(Collectors.toList())); } @@ -1108,7 +1109,7 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> { @Override public Statistics visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, Void context) { - return computeUnion(recursiveCte, groupExpression.children() + return computeRecursiveCte(recursiveCte, groupExpression.children() .stream().map(Group::getStatistics).collect(Collectors.toList())); } @@ -1493,6 +1494,71 @@ public class StatsCalculator extends DefaultPlanVisitor<Statistics, Void> { return new Statistics(rowCount, 1, columnStatsMap); } + /** + * computeRecursiveCte + */ + public Statistics computeRecursiveCte(RecursiveCte recursiveCte, List<Statistics> childStats) { + // TODO: refactor this for one row relation + List<SlotReference> head; + Statistics headStats; + List<List<SlotReference>> childOutputs = Lists.newArrayList(recursiveCte.getRegularChildrenOutputs()); + + head = childOutputs.get(0); + headStats = new StatisticsBuilder(childStats.get(0)).build(); + + StatisticsBuilder statisticsBuilder = new StatisticsBuilder(); + List<NamedExpression> unionOutput = recursiveCte.getOutputs(); + double unionRowCount = childStats.stream().mapToDouble(Statistics::getRowCount).sum(); + statisticsBuilder.setRowCount(unionRowCount); + + for (int i = 0; i < head.size(); i++) { + Slot headSlot = head.get(i); + ColumnStatisticBuilder colStatsBuilder = new ColumnStatisticBuilder( + headStats.findColumnStatistics(headSlot)); + for (int j = 1; j < childOutputs.size(); j++) { + Slot slot = childOutputs.get(j).get(i); + ColumnStatistic rightStatistic = childStats.get(j).findColumnStatistics(slot); + double rightRowCount = childStats.get(j).getRowCount(); + colStatsBuilder = unionColumn(colStatsBuilder, + headStats.getRowCount(), rightStatistic, rightRowCount, headSlot.getDataType()); + } + + //update hot values + Map<Literal, Float> unionHotValues = new HashMap<>(); + for (int j = 0; j < childOutputs.size(); j++) { + Slot slot = childOutputs.get(j).get(i); + ColumnStatistic slotStats = childStats.get(j).findColumnStatistics(slot); + if (slotStats.getHotValues() != null) { + for (Map.Entry<Literal, Float> entry : slotStats.getHotValues().entrySet()) { + Float value = unionHotValues.get(entry.getKey()); + if (value == null) { + unionHotValues.put(entry.getKey(), + (float) (entry.getValue() * childStats.get(j).getRowCount())); + } else { + unionHotValues.put(entry.getKey(), + (float) (value + entry.getValue() * childStats.get(j).getRowCount())); + } + } + } + } + + Map<Literal, Float> resultHotValues = new LinkedHashMap<>(); + for (Literal hot : unionHotValues.keySet()) { + float ratio = (float) (unionHotValues.get(hot) / unionRowCount); + if (ratio * colStatsBuilder.getNdv() >= SessionVariable.getSkewValueThreshold() + || ratio >= SessionVariable.getHotValueThreshold()) { + resultHotValues.put(hot, ratio); + } + } + if (!resultHotValues.isEmpty()) { + colStatsBuilder.setHotValues(resultHotValues); + } + statisticsBuilder.putColumnStatistics(unionOutput.get(i), colStatsBuilder.build()); + } + + return statisticsBuilder.setWidthInJoinCluster(1).build(); + } + /** * computeUnion */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java index c9cf877f709..739047f4a77 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/copier/LogicalPlanDeepCopier.java @@ -360,11 +360,6 @@ public class LogicalPlanDeepCopier extends DefaultPlanRewriter<DeepCopierContext List<Plan> children = recursiveCte.children().stream() .map(c -> c.accept(this, context)) .collect(ImmutableList.toImmutableList()); - List<List<NamedExpression>> constantExprsList = recursiveCte.getConstantExprsList().stream() - .map(l -> l.stream() - .map(e -> (NamedExpression) ExpressionDeepCopier.INSTANCE.deepCopy(e, context)) - .collect(ImmutableList.toImmutableList())) - .collect(ImmutableList.toImmutableList()); List<NamedExpression> outputs = recursiveCte.getOutputs().stream() .map(o -> (NamedExpression) ExpressionDeepCopier.INSTANCE.deepCopy(o, context)) .collect(ImmutableList.toImmutableList()); @@ -373,8 +368,7 @@ public class LogicalPlanDeepCopier extends DefaultPlanRewriter<DeepCopierContext .map(o -> (SlotReference) ExpressionDeepCopier.INSTANCE.deepCopy(o, context)) .collect(ImmutableList.toImmutableList())) .collect(ImmutableList.toImmutableList()); - return new LogicalRecursiveCte(recursiveCte.getQualifier(), outputs, childrenOutputs, - constantExprsList, recursiveCte.hasPushedFilter(), children); + return new LogicalRecursiveCte(recursiveCte.isUnionAll(), outputs, childrenOutputs, children); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/RecursiveCte.java similarity index 64% copy from fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java copy to fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/RecursiveCte.java index 9f36b04dfc4..d7fd8f985a1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RecursiveCteTempTable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/algebra/RecursiveCte.java @@ -15,12 +15,22 @@ // specific language governing permissions and limitations // under the License. -package org.apache.doris.catalog; +package org.apache.doris.nereids.trees.plans.algebra; + +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.SlotReference; import java.util.List; -public class RecursiveCteTempTable extends Table { - public RecursiveCteTempTable(String tableName, List<Column> fullSchema) { - super(-1, tableName, TableType.RECURSIVE_CTE_TEMP_TABLE, fullSchema); - } +/** + * Common interface for logical/physical recursive cte. + */ +public interface RecursiveCte { + boolean isUnionAll(); + + List<SlotReference> getRegularChildOutput(int i); + + List<NamedExpression> getOutputs(); + + List<List<SlotReference>> getRegularChildrenOutputs(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java index e8387d675dc..a7f99e101ae 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/CreateMaterializedViewCommand.java @@ -326,7 +326,7 @@ public class CreateMaterializedViewCommand extends Command implements ForwardWit } try { Expr defineExpr = translateToLegacyExpr(predicate, context.planTranslatorContext); - context.filterItem = new MVColumnItem(predicate.toSql(), defineExpr); + context.filterItem = new MVColumnItem(defineExpr.toSqlWithoutTbl(), defineExpr); } catch (Exception ex) { throw new AnalysisException(ex.getMessage()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java index 4291ba32732..9369acd8d6f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/AssignedJobBuilder.java @@ -17,33 +17,19 @@ package org.apache.doris.nereids.trees.plans.distribute.worker.job; -import org.apache.doris.analysis.Expr; import org.apache.doris.nereids.trees.plans.distribute.DistributeContext; import org.apache.doris.nereids.trees.plans.distribute.worker.BackendDistributedPlanWorkerManager; -import org.apache.doris.nereids.trees.plans.distribute.worker.DistributedPlanWorker; import org.apache.doris.planner.ExchangeNode; -import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanFragmentId; -import org.apache.doris.planner.RecursiveCteNode; -import org.apache.doris.planner.RecursiveCteScanNode; import org.apache.doris.thrift.TExplainLevel; -import org.apache.doris.thrift.TExpr; -import org.apache.doris.thrift.TNetworkAddress; -import org.apache.doris.thrift.TRecCTENode; -import org.apache.doris.thrift.TRecCTEResetInfo; -import org.apache.doris.thrift.TRecCTETarget; import com.google.common.collect.ArrayListMultimap; import com.google.common.collect.ListMultimap; -import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; /** AssignedJobBuilder */ public class AssignedJobBuilder { @@ -53,8 +39,6 @@ public class AssignedJobBuilder { boolean isLoadJob) { DistributeContext distributeContext = new DistributeContext(workerManager, isLoadJob); ListMultimap<PlanFragmentId, AssignedJob> allAssignedJobs = ArrayListMultimap.create(); - Map<PlanFragmentId, TRecCTETarget> fragmentIdToRecCteTargetMap = new TreeMap<>(); - Map<PlanFragmentId, Set<TNetworkAddress>> fragmentIdToNetworkAddressMap = new TreeMap<>(); for (Entry<PlanFragmentId, UnassignedJob> kv : unassignedJobs.entrySet()) { PlanFragmentId fragmentId = kv.getKey(); UnassignedJob unassignedJob = kv.getValue(); @@ -71,83 +55,6 @@ public class AssignedJobBuilder { + ", fragment: " + unassignedJob.getFragment().getExplainString(TExplainLevel.VERBOSE)); } allAssignedJobs.putAll(fragmentId, fragmentAssignedJobs); - - Set<TNetworkAddress> networkAddresses = new TreeSet<>(); - for (AssignedJob assignedJob : fragmentAssignedJobs) { - DistributedPlanWorker distributedPlanWorker = assignedJob.getAssignedWorker(); - networkAddresses.add(new TNetworkAddress(distributedPlanWorker.host(), distributedPlanWorker.port())); - } - fragmentIdToNetworkAddressMap.put(fragmentId, networkAddresses); - - PlanFragment planFragment = unassignedJob.getFragment(); - List<RecursiveCteScanNode> recursiveCteScanNodes = planFragment.getPlanRoot() - .collectInCurrentFragment(RecursiveCteScanNode.class::isInstance); - if (!recursiveCteScanNodes.isEmpty()) { - if (recursiveCteScanNodes.size() != 1) { - throw new IllegalStateException( - String.format("one fragment can only have 1 recursive cte scan node, but there is %d", - recursiveCteScanNodes.size())); - } - if (fragmentAssignedJobs.size() != 1) { - throw new IllegalStateException(String.format( - "fragmentAssignedJobs's size must be 1 for recursive cte scan node, but it is %d", - fragmentAssignedJobs.size())); - } - TRecCTETarget tRecCTETarget = new TRecCTETarget(); - DistributedPlanWorker distributedPlanWorker = fragmentAssignedJobs.get(0).getAssignedWorker(); - tRecCTETarget.setAddr(new TNetworkAddress(distributedPlanWorker.host(), distributedPlanWorker.port())); - tRecCTETarget.setFragmentInstanceId(fragmentAssignedJobs.get(0).instanceId()); - tRecCTETarget.setNodeId(recursiveCteScanNodes.get(0).getId().asInt()); - fragmentIdToRecCteTargetMap.put(fragmentId, tRecCTETarget); - } - - List<RecursiveCteNode> recursiveCteNodes = planFragment.getPlanRoot() - .collectInCurrentFragment(RecursiveCteNode.class::isInstance); - if (!recursiveCteNodes.isEmpty()) { - if (recursiveCteNodes.size() != 1) { - throw new IllegalStateException( - String.format("one fragment can only have 1 recursive cte node, but there is %d", - recursiveCteNodes.size())); - } - - List<TRecCTETarget> targets = new ArrayList<>(); - List<TRecCTEResetInfo> fragmentsToReset = new ArrayList<>(); - // PhysicalPlanTranslator will swap recursiveCteNodes's child fragment, - // so we get recursive one by 1st child - List<PlanFragment> childFragments = new ArrayList<>(); - planFragment.getChild(0).collectAll(PlanFragment.class::isInstance, childFragments); - for (PlanFragment child : childFragments) { - PlanFragmentId childFragmentId = child.getFragmentId(); - TRecCTETarget tRecCTETarget = fragmentIdToRecCteTargetMap.getOrDefault(childFragmentId, null); - if (tRecCTETarget != null) { - targets.add(tRecCTETarget); - } - Set<TNetworkAddress> tNetworkAddresses = fragmentIdToNetworkAddressMap.get(childFragmentId); - if (tNetworkAddresses == null) { - throw new IllegalStateException( - String.format("can't find TNetworkAddress for fragment %d", childFragmentId)); - } - for (TNetworkAddress address : tNetworkAddresses) { - TRecCTEResetInfo tRecCTEResetInfo = new TRecCTEResetInfo(); - tRecCTEResetInfo.setFragmentId(childFragmentId.asInt()); - tRecCTEResetInfo.setAddr(address); - fragmentsToReset.add(tRecCTEResetInfo); - } - } - - RecursiveCteNode recursiveCteNode = recursiveCteNodes.get(0); - List<List<Expr>> materializedResultExprLists = recursiveCteNode.getMaterializedResultExprLists(); - List<List<TExpr>> texprLists = new ArrayList<>(materializedResultExprLists.size()); - for (List<Expr> exprList : materializedResultExprLists) { - texprLists.add(Expr.treesToThrift(exprList)); - } - TRecCTENode tRecCTENode = new TRecCTENode(); - tRecCTENode.setIsUnionAll(recursiveCteNode.isUnionAll()); - tRecCTENode.setTargets(targets); - tRecCTENode.setFragmentsToReset(fragmentsToReset); - tRecCTENode.setResultExprLists(texprLists); - recursiveCteNode.settRecCTENode(tRecCTENode); - } } return allAssignedJobs; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java index 7fbb3f88e21..cfd3ebe7b2b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedRecursiveCteScanJob.java @@ -26,6 +26,7 @@ import org.apache.doris.planner.ExchangeNode; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.ScanNode; +import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ListMultimap; @@ -57,6 +58,8 @@ public class UnassignedRecursiveCteScanJob extends AbstractUnassignedScanJob { @Override protected List<AssignedJob> fillUpAssignedJobs(List<AssignedJob> assignedJobs, DistributedPlanWorkerManager workerManager, ListMultimap<ExchangeNode, AssignedJob> inputJobs) { - return fillUpSingleEmptyInstance(workerManager); + Preconditions.checkArgument(!assignedJobs.isEmpty(), + "assignedJobs is empty for UnassignedRecursiveCteScanJob"); + return assignedJobs; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java index cfdf317c620..3958d5b4df1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java @@ -19,19 +19,24 @@ package org.apache.doris.nereids.trees.plans.logical; import org.apache.doris.common.Pair; import org.apache.doris.nereids.CascadesContext; +import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.DataTrait; import org.apache.doris.nereids.properties.LogicalProperties; import org.apache.doris.nereids.properties.PhysicalProperties; import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.Cast; +import org.apache.doris.nereids.trees.expressions.ExprId; import org.apache.doris.nereids.trees.expressions.Expression; import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; +import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; -import org.apache.doris.nereids.trees.plans.algebra.Union; +import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.types.DataType; import org.apache.doris.nereids.util.ExpressionUtils; @@ -56,68 +61,126 @@ import java.util.Set; /** * LogicalRecursiveCte is basically like LogicalUnion */ -public class LogicalRecursiveCte extends LogicalSetOperation implements Union, OutputPrunable { - - // in doris, we use union node to present one row relation - private final List<List<NamedExpression>> constantExprsList; - // When there is an agg on the union and there is a filter on the agg, - // it is necessary to keep the filter on the agg and push the filter down to each child of the union. - private final boolean hasPushedFilter; +public class LogicalRecursiveCte extends AbstractLogicalPlan implements RecursiveCte, OutputPrunable { + protected final List<NamedExpression> outputs; + protected final List<List<SlotReference>> regularChildrenOutputs; + private final boolean isUnionAll; /** LogicalRecursiveCte */ - public LogicalRecursiveCte(Qualifier qualifier, List<Plan> children) { - this(qualifier, ImmutableList.of(), children); + public LogicalRecursiveCte(boolean isUnionAll, List<Plan> children) { + this(isUnionAll, ImmutableList.of(), ImmutableList.of(), children); } /** LogicalRecursiveCte */ - public LogicalRecursiveCte(Qualifier qualifier, List<List<NamedExpression>> constantExprsList, - List<Plan> children) { - this(qualifier, ImmutableList.of(), ImmutableList.of(), constantExprsList, false, children); - } - - /** LogicalRecursiveCte */ - public LogicalRecursiveCte(Qualifier qualifier, List<NamedExpression> outputs, - List<List<SlotReference>> childrenOutputs, - List<List<NamedExpression>> constantExprsList, boolean hasPushedFilter, List<Plan> children) { - this(qualifier, outputs, childrenOutputs, constantExprsList, hasPushedFilter, Optional.empty(), + public LogicalRecursiveCte(boolean isUnionAll, List<NamedExpression> outputs, + List<List<SlotReference>> childrenOutputs, List<Plan> children) { + this(isUnionAll, outputs, childrenOutputs, Optional.empty(), Optional.empty(), children); } /** LogicalRecursiveCte */ - public LogicalRecursiveCte(Qualifier qualifier, List<NamedExpression> outputs, + public LogicalRecursiveCte(boolean isUnionAll, List<NamedExpression> outputs, List<List<SlotReference>> childrenOutputs, - List<List<NamedExpression>> constantExprsList, boolean hasPushedFilter, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { - super(PlanType.LOGICAL_RECURSIVE_CTE, qualifier, outputs, childrenOutputs, - groupExpression, logicalProperties, children); - this.hasPushedFilter = hasPushedFilter; - this.constantExprsList = Utils.fastToImmutableList( - Objects.requireNonNull(constantExprsList, "constantExprsList should not be null")); + super(PlanType.LOGICAL_RECURSIVE_CTE, groupExpression, logicalProperties, children); + this.isUnionAll = isUnionAll; + this.outputs = ImmutableList.copyOf(outputs); + this.regularChildrenOutputs = ImmutableList.copyOf(childrenOutputs); } - public boolean hasPushedFilter() { - return hasPushedFilter; + @Override + public boolean isUnionAll() { + return isUnionAll; } - public List<List<NamedExpression>> getConstantExprsList() { - return constantExprsList; + @Override + public List<SlotReference> getRegularChildOutput(int i) { + return regularChildrenOutputs.get(i); } @Override - public List<? extends Expression> getExpressions() { - return constantExprsList.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList()); + public List<List<SlotReference>> getRegularChildrenOutputs() { + return regularChildrenOutputs; + } + + public List<List<NamedExpression>> collectChildrenProjections() { + return castCommonDataTypeOutputs(); + } + + private List<List<NamedExpression>> castCommonDataTypeOutputs() { + int childOutputSize = child(0).getOutput().size(); + ImmutableList.Builder<NamedExpression> newLeftOutputs = ImmutableList.builderWithExpectedSize( + childOutputSize); + ImmutableList.Builder<NamedExpression> newRightOutputs = ImmutableList.builderWithExpectedSize( + childOutputSize + ); + // Ensure that the output types of the left and right children are consistent and expand upward. + for (int i = 0; i < childOutputSize; ++i) { + Slot left = child(0).getOutput().get(i); + Slot right = child(1).getOutput().get(i); + DataType compatibleType; + try { + compatibleType = LogicalSetOperation.getAssignmentCompatibleType(left.getDataType(), + right.getDataType()); + } catch (Exception e) { + throw new AnalysisException( + "Can not find compatible type for " + left + " and " + right + ", " + e.getMessage()); + } + Expression newLeft = TypeCoercionUtils.castIfNotSameTypeStrict(left, compatibleType); + Expression newRight = TypeCoercionUtils.castIfNotSameTypeStrict(right, compatibleType); + if (newLeft instanceof Cast) { + newLeft = new Alias(newLeft, left.getName()); + } + if (newRight instanceof Cast) { + newRight = new Alias(newRight, right.getName()); + } + newLeftOutputs.add((NamedExpression) newLeft); + newRightOutputs.add((NamedExpression) newRight); + } + + return ImmutableList.of(newLeftOutputs.build(), newRightOutputs.build()); + } + + /** + * Generate new output for Recursive Cte. + */ + public List<NamedExpression> buildNewOutputs() { + List<Slot> slots = resetNullableForLeftOutputs(); + ImmutableList.Builder<NamedExpression> newOutputs = ImmutableList.builderWithExpectedSize(slots.size()); + + for (int i = 0; i < slots.size(); i++) { + Slot slot = slots.get(i); + ExprId exprId = i < outputs.size() ? outputs.get(i).getExprId() : StatementScopeIdGenerator.newExprId(); + newOutputs.add( + new SlotReference(exprId, slot.toSql(), slot.getDataType(), slot.nullable(), ImmutableList.of()) + ); + } + return newOutputs.build(); + } + + // If the right child is nullable, need to ensure that the left child is also nullable + private List<Slot> resetNullableForLeftOutputs() { + int rightChildOutputSize = child(1).getOutput().size(); + ImmutableList.Builder<Slot> resetNullableForLeftOutputs + = ImmutableList.builderWithExpectedSize(rightChildOutputSize); + for (int i = 0; i < rightChildOutputSize; ++i) { + if (child(1).getOutput().get(i).nullable() && !child(0).getOutput().get(i).nullable()) { + resetNullableForLeftOutputs.add(child(0).getOutput().get(i).withNullable(true)); + } else { + resetNullableForLeftOutputs.add(child(0).getOutput().get(i)); + } + } + return resetNullableForLeftOutputs.build(); } @Override public String toString() { return Utils.toSqlStringSkipNull("LogicalRecursiveCte", - "qualifier", qualifier, + "isUnionAll", isUnionAll, "outputs", outputs, "regularChildrenOutputs", regularChildrenOutputs, - "constantExprsList", constantExprsList, - "hasPushedFilter", hasPushedFilter, "stats", statistics); } @@ -130,13 +193,13 @@ public class LogicalRecursiveCte extends LogicalSetOperation implements Union, O return false; } LogicalRecursiveCte that = (LogicalRecursiveCte) o; - return super.equals(that) && hasPushedFilter == that.hasPushedFilter - && Objects.equals(constantExprsList, that.constantExprsList); + return isUnionAll == that.isUnionAll && Objects.equals(outputs, that.outputs) + && Objects.equals(regularChildrenOutputs, that.regularChildrenOutputs); } @Override public int hashCode() { - return Objects.hash(super.hashCode(), hasPushedFilter, constantExprsList); + return Objects.hash(isUnionAll, outputs, regularChildrenOutputs); } @Override @@ -145,64 +208,58 @@ public class LogicalRecursiveCte extends LogicalSetOperation implements Union, O } @Override - public LogicalRecursiveCte withChildren(List<Plan> children) { - return new LogicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, - constantExprsList, hasPushedFilter, children); + public List<? extends Expression> getExpressions() { + return regularChildrenOutputs.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList()); } @Override - public LogicalSetOperation withChildrenAndTheirOutputs(List<Plan> children, + public List<Slot> computeOutput() { + return outputs.stream() + .map(NamedExpression::toSlot) + .collect(ImmutableList.toImmutableList()); + } + + @Override + public LogicalRecursiveCte withChildren(List<Plan> children) { + return new LogicalRecursiveCte(isUnionAll, outputs, regularChildrenOutputs, children); + } + + public LogicalRecursiveCte withChildrenAndTheirOutputs(List<Plan> children, List<List<SlotReference>> childrenOutputs) { Preconditions.checkArgument(children.size() == childrenOutputs.size(), "children size %s is not equals with children outputs size %s", children.size(), childrenOutputs.size()); - return new LogicalRecursiveCte(qualifier, outputs, childrenOutputs, constantExprsList, hasPushedFilter, - children); + return new LogicalRecursiveCte(isUnionAll, outputs, childrenOutputs, children); } @Override public LogicalRecursiveCte withGroupExpression(Optional<GroupExpression> groupExpression) { - return new LogicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, hasPushedFilter, + return new LogicalRecursiveCte(isUnionAll, outputs, regularChildrenOutputs, groupExpression, Optional.of(getLogicalProperties()), children); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { - return new LogicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, hasPushedFilter, + return new LogicalRecursiveCte(isUnionAll, outputs, regularChildrenOutputs, groupExpression, logicalProperties, children); } - @Override public LogicalRecursiveCte withNewOutputs(List<NamedExpression> newOutputs) { - return new LogicalRecursiveCte(qualifier, newOutputs, regularChildrenOutputs, constantExprsList, - hasPushedFilter, Optional.empty(), Optional.empty(), children); - } - - public LogicalRecursiveCte withNewOutputsAndConstExprsList(List<NamedExpression> newOutputs, - List<List<NamedExpression>> constantExprsList) { - return new LogicalRecursiveCte(qualifier, newOutputs, regularChildrenOutputs, constantExprsList, - hasPushedFilter, Optional.empty(), Optional.empty(), children); - } - - public LogicalRecursiveCte withChildrenAndConstExprsList(List<Plan> children, - List<List<SlotReference>> childrenOutputs, List<List<NamedExpression>> constantExprsList) { - return new LogicalRecursiveCte(qualifier, outputs, childrenOutputs, constantExprsList, hasPushedFilter, - children); + return new LogicalRecursiveCte(isUnionAll, newOutputs, regularChildrenOutputs, + Optional.empty(), Optional.empty(), children); } - public LogicalRecursiveCte withNewOutputsChildrenAndConstExprsList(List<NamedExpression> newOutputs, - List<Plan> children, - List<List<SlotReference>> childrenOutputs, - List<List<NamedExpression>> constantExprsList) { - return new LogicalRecursiveCte(qualifier, newOutputs, childrenOutputs, constantExprsList, - hasPushedFilter, Optional.empty(), Optional.empty(), children); + public LogicalRecursiveCte withNewOutputsAndChildren(List<NamedExpression> newOutputs, + List<Plan> children, + List<List<SlotReference>> childrenOutputs) { + return new LogicalRecursiveCte(isUnionAll, newOutputs, childrenOutputs, + Optional.empty(), Optional.empty(), children); } - public LogicalRecursiveCte withAllQualifier() { - return new LogicalRecursiveCte(Qualifier.ALL, outputs, regularChildrenOutputs, constantExprsList, - hasPushedFilter, - Optional.empty(), Optional.empty(), children); + @Override + public List<NamedExpression> getOutputs() { + return outputs; } @Override @@ -212,7 +269,7 @@ public class LogicalRecursiveCte extends LogicalSetOperation implements Union, O @Override public void computeUnique(DataTrait.Builder builder) { - if (qualifier == Qualifier.DISTINCT) { + if (!isUnionAll) { builder.addUniqueSlot(ImmutableSet.copyOf(getOutput())); } } @@ -224,19 +281,6 @@ public class LogicalRecursiveCte extends LogicalSetOperation implements Union, O ConnectContext.get().getStatementContext(), this, PhysicalProperties.ANY))); for (int i = 0; i < getOutputs().size(); i++) { Optional<Literal> value = Optional.empty(); - if (!constantExprsList.isEmpty()) { - value = ExpressionUtils.checkConstantExpr(constantExprsList.get(0).get(i), context); - if (!value.isPresent()) { - continue; - } - final int fi = i; - Literal literal = value.get(); - if (constantExprsList.stream() - .map(exprs -> ExpressionUtils.checkConstantExpr(exprs.get(fi), context)) - .anyMatch(val -> !val.isPresent() || !val.get().equals(literal))) { - continue; - } - } for (int childIdx = 0; childIdx < children.size(); childIdx++) { // TODO: use originOutputs = child(childIdx).getOutput() ? List<? extends Slot> originOutputs = regularChildrenOutputs.get(childIdx); @@ -267,10 +311,7 @@ public class LogicalRecursiveCte extends LogicalSetOperation implements Union, O @Override public boolean hasUnboundExpression() { - if (!constantExprsList.isEmpty() && children.isEmpty()) { - return false; - } - return super.hasUnboundExpression(); + return outputs.isEmpty(); } private List<BitSet> mapSlotToIndex(Plan plan, List<Set<Slot>> equalSlotsList) { @@ -384,7 +425,7 @@ public class LogicalRecursiveCte extends LogicalSetOperation implements Union, O Expression otherConstant = namedExpression.child(0); nullable |= otherConstant.nullable(); DataType otherDataType = otherConstant.getDataType(); - commonDataType = getAssignmentCompatibleType(commonDataType, otherDataType); + commonDataType = LogicalSetOperation.getAssignmentCompatibleType(commonDataType, otherDataType); } return Pair.of(commonDataType, nullable); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java index ef0cc98de0a..c0a7f0222d7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCte.java @@ -27,7 +27,7 @@ import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; -import org.apache.doris.nereids.trees.plans.algebra.Union; +import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.util.Utils; import org.apache.doris.qe.ConnectContext; @@ -44,55 +44,91 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; -import java.util.stream.Collectors; /** * PhysicalRecursiveCte is basically like PhysicalUnion */ -public class PhysicalRecursiveCte extends PhysicalSetOperation implements Union { +public class PhysicalRecursiveCte extends AbstractPhysicalPlan implements RecursiveCte { - // in doris, we use union node to present one row relation - private final List<List<NamedExpression>> constantExprsList; + protected final List<NamedExpression> outputs; + protected final List<List<SlotReference>> regularChildrenOutputs; + private final boolean isUnionAll; /** PhysicalRecursiveCte */ - public PhysicalRecursiveCte(Qualifier qualifier, + public PhysicalRecursiveCte(boolean isUnionAll, List<NamedExpression> outputs, List<List<SlotReference>> childrenOutputs, - List<List<NamedExpression>> constantExprsList, LogicalProperties logicalProperties, List<Plan> children) { - super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, childrenOutputs, logicalProperties, children); - this.constantExprsList = ImmutableList.copyOf( - Objects.requireNonNull(constantExprsList, "constantExprsList should not be null")); + this(isUnionAll, outputs, childrenOutputs, Optional.empty(), logicalProperties, children); } /** PhysicalRecursiveCte */ - public PhysicalRecursiveCte(Qualifier qualifier, + public PhysicalRecursiveCte(boolean isUnionAll, List<NamedExpression> outputs, List<List<SlotReference>> childrenOutputs, - List<List<NamedExpression>> constantExprsList, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, List<Plan> children) { - super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, childrenOutputs, - groupExpression, logicalProperties, children); - this.constantExprsList = ImmutableList.copyOf( - Objects.requireNonNull(constantExprsList, "constantExprsList should not be null")); + this(isUnionAll, outputs, childrenOutputs, groupExpression, logicalProperties, + PhysicalProperties.ANY, null, children); } /** PhysicalRecursiveCte */ - public PhysicalRecursiveCte(Qualifier qualifier, List<NamedExpression> outputs, - List<List<SlotReference>> childrenOutputs, List<List<NamedExpression>> constantExprsList, + public PhysicalRecursiveCte(boolean isUnionAll, List<NamedExpression> outputs, + List<List<SlotReference>> childrenOutputs, Optional<GroupExpression> groupExpression, LogicalProperties logicalProperties, - PhysicalProperties physicalProperties, Statistics statistics, List<Plan> inputs) { - super(PlanType.PHYSICAL_RECURSIVE_CTE, qualifier, outputs, childrenOutputs, - groupExpression, logicalProperties, physicalProperties, statistics, inputs); - this.constantExprsList = ImmutableList.copyOf( - Objects.requireNonNull(constantExprsList, "constantExprsList should not be null")); + PhysicalProperties physicalProperties, Statistics statistics, List<Plan> children) { + super(PlanType.PHYSICAL_RECURSIVE_CTE, groupExpression, logicalProperties, physicalProperties, + statistics, children.toArray(new Plan[0])); + this.isUnionAll = isUnionAll; + this.outputs = ImmutableList.copyOf(outputs); + this.regularChildrenOutputs = ImmutableList.copyOf(childrenOutputs); } - public List<List<NamedExpression>> getConstantExprsList() { - return constantExprsList; + @Override + public boolean isUnionAll() { + return isUnionAll; + } + + @Override + public List<SlotReference> getRegularChildOutput(int i) { + return regularChildrenOutputs.get(i); + } + + @Override + public List<NamedExpression> getOutputs() { + return outputs; + } + + @Override + public List<Slot> computeOutput() { + return outputs.stream() + .map(NamedExpression::toSlot) + .collect(ImmutableList.toImmutableList()); + } + + @Override + public List<List<SlotReference>> getRegularChildrenOutputs() { + return regularChildrenOutputs; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PhysicalRecursiveCte that = (PhysicalRecursiveCte) o; + return isUnionAll == that.isUnionAll && Objects.equals(outputs, that.outputs) && Objects.equals( + regularChildrenOutputs, that.regularChildrenOutputs); + } + + @Override + public int hashCode() { + return Objects.hash(isUnionAll, outputs, regularChildrenOutputs); } @Override @@ -100,14 +136,18 @@ public class PhysicalRecursiveCte extends PhysicalSetOperation implements Union return visitor.visitPhysicalRecursiveCte(this, context); } + @Override + public List<? extends Expression> getExpressions() { + return regularChildrenOutputs.stream().flatMap(List::stream).collect(ImmutableList.toImmutableList()); + } + @Override public String toString() { return Utils.toSqlString("PhysicalRecursiveCte" + "[" + id.asInt() + "]" + getGroupIdWithPrefix(), "stats", statistics, - "qualifier", qualifier, + "isUnionAll", isUnionAll, "outputs", outputs, - "regularChildrenOutputs", regularChildrenOutputs, - "constantExprsList", constantExprsList); + "regularChildrenOutputs", regularChildrenOutputs); } @Override @@ -117,11 +157,6 @@ public class PhysicalRecursiveCte extends PhysicalSetOperation implements Union && context.getSessionVariable().getDetailShapePlanNodesSet().contains(getClass().getSimpleName())) { StringBuilder builder = new StringBuilder(); builder.append(getClass().getSimpleName()); - builder.append("(constantExprsList="); - builder.append(constantExprsList.stream() - .map(exprs -> exprs.stream().map(Expression::shapeInfo) - .collect(Collectors.joining(", ", "[", "]"))) - .collect(Collectors.joining(", ", "[", "]"))); builder.append(")"); return builder.toString(); } else { @@ -131,39 +166,39 @@ public class PhysicalRecursiveCte extends PhysicalSetOperation implements Union @Override public PhysicalRecursiveCte withChildren(List<Plan> children) { - return new PhysicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, groupExpression, + return new PhysicalRecursiveCte(isUnionAll, outputs, regularChildrenOutputs, groupExpression, getLogicalProperties(), children); } @Override public PhysicalRecursiveCte withGroupExpression(Optional<GroupExpression> groupExpression) { - return new PhysicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, + return new PhysicalRecursiveCte(isUnionAll, outputs, regularChildrenOutputs, groupExpression, getLogicalProperties(), children); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { - return new PhysicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, + return new PhysicalRecursiveCte(isUnionAll, outputs, regularChildrenOutputs, groupExpression, logicalProperties.get(), children); } @Override public PhysicalRecursiveCte withPhysicalPropertiesAndStats( PhysicalProperties physicalProperties, Statistics statistics) { - return new PhysicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, + return new PhysicalRecursiveCte(isUnionAll, outputs, regularChildrenOutputs, groupExpression, getLogicalProperties(), physicalProperties, statistics, children); } @Override public PhysicalRecursiveCte resetLogicalProperties() { - return new PhysicalRecursiveCte(qualifier, outputs, regularChildrenOutputs, constantExprsList, + return new PhysicalRecursiveCte(isUnionAll, outputs, regularChildrenOutputs, Optional.empty(), null, physicalProperties, statistics, children); } @Override public void computeUnique(DataTrait.Builder builder) { - if (qualifier == Qualifier.DISTINCT) { + if (!isUnionAll) { builder.addUniqueSlot(ImmutableSet.copyOf(getOutput())); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java index d34de256608..66918845e00 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/visitor/PlanVisitor.java @@ -383,7 +383,7 @@ public abstract class PlanVisitor<R, C> implements CommandVisitor<R, C>, Relatio } public R visitPhysicalRecursiveCte(PhysicalRecursiveCte recursiveCte, C context) { - return visitPhysicalSetOperation(recursiveCte, context); + return visit(recursiveCte, context); } public R visitAbstractPhysicalSort(AbstractPhysicalSort<? extends Plan> sort, C context) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java index 103abd8b790..35065314bab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java @@ -17,12 +17,12 @@ package org.apache.doris.planner; +import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.catalog.Env; import org.apache.doris.common.UserException; import org.apache.doris.statistics.StatisticalType; import org.apache.doris.system.Backend; -import org.apache.doris.thrift.TDataGenScanRange; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPlanNode; @@ -63,18 +63,13 @@ public class RecursiveCteScanNode extends ScanNode { Collections.shuffle(backendList); Backend selectedBackend = backendList.get(0); - // create a dummy scan range - TScanRange scanRange = new TScanRange(); - TDataGenScanRange dataGenScanRange = new TDataGenScanRange(); - scanRange.setDataGenScanRange(dataGenScanRange); - // create scan range locations - TScanRangeLocations locations = new TScanRangeLocations(); TScanRangeLocation location = new TScanRangeLocation(); location.setBackendId(selectedBackend.getId()); location.setServer(new TNetworkAddress(selectedBackend.getHost(), selectedBackend.getBePort())); + TScanRangeLocations locations = new TScanRangeLocations(); locations.addToLocations(location); - locations.setScanRange(scanRange); + locations.setScanRange(new TScanRange()); scanRangeLocations.add(locations); } @@ -96,8 +91,11 @@ public class RecursiveCteScanNode extends ScanNode { @Override public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { StringBuilder output = new StringBuilder(); - output.append(prefix).append("Recursive Cte: ").append(getTableIf().getName()); - output.append("\n"); + output.append(prefix).append("Recursive Cte: ").append(getTableIf().getName()).append("\n"); + if (!conjuncts.isEmpty()) { + Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts); + output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n"); + } return output.toString(); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index 5fcd14fcb79..f6f24aae153 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -17,6 +17,7 @@ package org.apache.doris.qe.runtime; +import org.apache.doris.analysis.Expr; import org.apache.doris.catalog.AIResource; import org.apache.doris.catalog.Env; import org.apache.doris.catalog.Resource; @@ -41,6 +42,9 @@ import org.apache.doris.planner.MultiCastDataSink; import org.apache.doris.planner.OlapScanNode; import org.apache.doris.planner.OlapTableSink; import org.apache.doris.planner.PlanFragment; +import org.apache.doris.planner.PlanFragmentId; +import org.apache.doris.planner.RecursiveCteNode; +import org.apache.doris.planner.RecursiveCteScanNode; import org.apache.doris.planner.ScanNode; import org.apache.doris.planner.SortNode; import org.apache.doris.qe.ConnectContext; @@ -48,6 +52,7 @@ import org.apache.doris.qe.CoordinatorContext; import org.apache.doris.thrift.PaloInternalServiceVersion; import org.apache.doris.thrift.TAIResource; import org.apache.doris.thrift.TDataSinkType; +import org.apache.doris.thrift.TExpr; import org.apache.doris.thrift.TFileScanRangeParams; import org.apache.doris.thrift.TNetworkAddress; import org.apache.doris.thrift.TPipelineFragmentParams; @@ -56,6 +61,9 @@ import org.apache.doris.thrift.TPipelineInstanceParams; import org.apache.doris.thrift.TPlanFragment; import org.apache.doris.thrift.TPlanFragmentDestination; import org.apache.doris.thrift.TQueryOptions; +import org.apache.doris.thrift.TRecCTENode; +import org.apache.doris.thrift.TRecCTEResetInfo; +import org.apache.doris.thrift.TRecCTETarget; import org.apache.doris.thrift.TRuntimeFilterInfo; import org.apache.doris.thrift.TRuntimeFilterParams; import org.apache.doris.thrift.TScanRangeParams; @@ -79,6 +87,8 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; import java.util.function.BiConsumer; import java.util.function.Supplier; @@ -89,6 +99,7 @@ public class ThriftPlansBuilder { CoordinatorContext coordinatorContext) { List<PipelineDistributedPlan> distributedPlans = coordinatorContext.distributedPlans; + setParamsForRecursiveCteNode(distributedPlans); // we should set runtime predicate first, then we can use heap sort and to thrift setRuntimePredicateIfNeed(coordinatorContext.scanNodes); @@ -571,6 +582,91 @@ public class ThriftPlansBuilder { } } + private static void setParamsForRecursiveCteNode(List<PipelineDistributedPlan> distributedPlans) { + Map<PlanFragmentId, TRecCTETarget> fragmentIdToRecCteTargetMap = new TreeMap<>(); + Map<PlanFragmentId, Set<TNetworkAddress>> fragmentIdToNetworkAddressMap = new TreeMap<>(); + for (PipelineDistributedPlan plan : distributedPlans) { + List<AssignedJob> fragmentAssignedJobs = plan.getInstanceJobs(); + Set<TNetworkAddress> networkAddresses = new TreeSet<>(); + for (AssignedJob assignedJob : fragmentAssignedJobs) { + DistributedPlanWorker distributedPlanWorker = assignedJob.getAssignedWorker(); + networkAddresses.add(new TNetworkAddress(distributedPlanWorker.host(), + distributedPlanWorker.brpcPort())); + } + PlanFragment planFragment = plan.getFragmentJob().getFragment(); + fragmentIdToNetworkAddressMap.put(planFragment.getFragmentId(), networkAddresses); + + List<RecursiveCteScanNode> recursiveCteScanNodes = planFragment.getPlanRoot() + .collectInCurrentFragment(RecursiveCteScanNode.class::isInstance); + if (!recursiveCteScanNodes.isEmpty()) { + if (recursiveCteScanNodes.size() != 1) { + throw new IllegalStateException( + String.format("one fragment can only have 1 recursive cte scan node, but there is %d", + recursiveCteScanNodes.size())); + } + if (fragmentAssignedJobs.isEmpty()) { + throw new IllegalStateException( + "fragmentAssignedJobs is empty for recursive cte scan node"); + } + TRecCTETarget tRecCTETarget = new TRecCTETarget(); + DistributedPlanWorker distributedPlanWorker = fragmentAssignedJobs.get(0).getAssignedWorker(); + tRecCTETarget.setAddr(new TNetworkAddress(distributedPlanWorker.host(), + distributedPlanWorker.brpcPort())); + tRecCTETarget.setFragmentInstanceId(fragmentAssignedJobs.get(0).instanceId()); + tRecCTETarget.setNodeId(recursiveCteScanNodes.get(0).getId().asInt()); + fragmentIdToRecCteTargetMap.put(planFragment.getFragmentId(), tRecCTETarget); + } + + List<RecursiveCteNode> recursiveCteNodes = planFragment.getPlanRoot() + .collectInCurrentFragment(RecursiveCteNode.class::isInstance); + if (!recursiveCteNodes.isEmpty()) { + if (recursiveCteNodes.size() != 1) { + throw new IllegalStateException( + String.format("one fragment can only have 1 recursive cte node, but there is %d", + recursiveCteNodes.size())); + } + + List<TRecCTETarget> targets = new ArrayList<>(); + List<TRecCTEResetInfo> fragmentsToReset = new ArrayList<>(); + // PhysicalPlanTranslator will swap recursiveCteNodes's child fragment, + // so we get recursive one by 1st child + List<PlanFragment> childFragments = new ArrayList<>(); + planFragment.getChild(0).collectAll(PlanFragment.class::isInstance, childFragments); + for (PlanFragment child : childFragments) { + PlanFragmentId childFragmentId = child.getFragmentId(); + TRecCTETarget tRecCTETarget = fragmentIdToRecCteTargetMap.getOrDefault(childFragmentId, null); + if (tRecCTETarget != null) { + targets.add(tRecCTETarget); + } + Set<TNetworkAddress> tNetworkAddresses = fragmentIdToNetworkAddressMap.get(childFragmentId); + if (tNetworkAddresses == null) { + throw new IllegalStateException( + String.format("can't find TNetworkAddress for fragment %d", childFragmentId)); + } + for (TNetworkAddress address : tNetworkAddresses) { + TRecCTEResetInfo tRecCTEResetInfo = new TRecCTEResetInfo(); + tRecCTEResetInfo.setFragmentId(childFragmentId.asInt()); + tRecCTEResetInfo.setAddr(address); + fragmentsToReset.add(tRecCTEResetInfo); + } + } + + RecursiveCteNode recursiveCteNode = recursiveCteNodes.get(0); + List<List<Expr>> materializedResultExprLists = recursiveCteNode.getMaterializedResultExprLists(); + List<List<TExpr>> texprLists = new ArrayList<>(materializedResultExprLists.size()); + for (List<Expr> exprList : materializedResultExprLists) { + texprLists.add(Expr.treesToThrift(exprList)); + } + TRecCTENode tRecCTENode = new TRecCTENode(); + tRecCTENode.setIsUnionAll(recursiveCteNode.isUnionAll()); + tRecCTENode.setTargets(targets); + tRecCTENode.setFragmentsToReset(fragmentsToReset); + tRecCTENode.setResultExprLists(texprLists); + recursiveCteNode.settRecCTENode(tRecCTENode); + } + } + } + private static class PerNodeScanParams { Map<Integer, List<TScanRangeParams>> perNodeScanRanges; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
