This is an automated email from the ASF dual-hosted git repository. starocean999 pushed a commit to branch dev_rec in repository https://gitbox.apache.org/repos/asf/doris.git
commit aa680e5c1271c64f7e7815315067562f12cfc298 Author: lichi <[email protected]> AuthorDate: Fri Oct 17 12:41:40 2025 +0800 update fe code --- .../antlr4/org/apache/doris/nereids/DorisParser.g4 | 5 +- .../glue/translator/PhysicalPlanTranslator.java | 8 +- .../doris/nereids/parser/LogicalPlanBuilder.java | 6 +- .../processor/pre/PullUpSubqueryAliasToCTE.java | 8 +- .../doris/nereids/rules/analysis/AnalyzeCTE.java | 50 ++++-- .../doris/nereids/rules/rewrite/ColumnPruning.java | 39 +---- .../plans/commands/UpdateMvByPartitionCommand.java | 3 +- .../nereids/trees/plans/logical/LogicalCTE.java | 27 ++- .../trees/plans/logical/LogicalRecursiveCte.java | 183 --------------------- .../logical/LogicalRecursiveCteRecursiveChild.java | 5 + .../plans/logical/LogicalRecursiveCteScan.java | 27 ++- .../trees/plans/logical/LogicalSubQueryAlias.java | 53 +++--- .../PhysicalRecursiveCteRecursiveChild.java | 5 + .../org/apache/doris/planner/RecursiveCteNode.java | 27 ++- .../apache/doris/planner/RecursiveCteScanNode.java | 9 + .../doris/qe/runtime/ThriftPlansBuilder.java | 14 +- gensrc/thrift/PaloInternalService.thrift | 1 + 17 files changed, 182 insertions(+), 288 deletions(-) diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 index 162c6f21688..4add5f55556 100644 --- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 +++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 @@ -1200,11 +1200,11 @@ querySpecification ; cte - : WITH aliasQuery (COMMA aliasQuery)* + : WITH RECURSIVE? aliasQuery (COMMA aliasQuery)* ; aliasQuery - : RECURSIVE? identifier columnAliases? AS LEFT_PAREN query RIGHT_PAREN + : identifier columnAliases? AS LEFT_PAREN query RIGHT_PAREN ; columnAliases @@ -2128,6 +2128,7 @@ nonReserved | RANDOM | RECENT | RECOVER + | RECURSIVE | RECYCLE | REFRESH | REPEATABLE diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 3c0007a12c0..8c35e65d1f1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -2256,8 +2256,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla if (inputPlanNode instanceof OlapScanNode) { ((OlapScanNode) inputPlanNode).updateRequiredSlots(context, requiredByProjectSlotIdSet); } - updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet, - requiredByProjectSlotIdSet, context); + if (!(inputPlanNode instanceof RecursiveCteScanNode)) { + updateScanSlotsMaterialization((ScanNode) inputPlanNode, requiredSlotIdSet, + requiredByProjectSlotIdSet, context); + } } else { if (project.child() instanceof PhysicalDeferMaterializeTopN) { inputFragment.setOutputExprs(allProjectionExprs); @@ -2339,6 +2341,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla recursiveCteFragment.setHasColocatePlanNode(true); recursiveCteNode.setColocate(true); } + recursiveCteFragment.updateDataPartition(DataPartition.UNPARTITIONED); + recursiveCteFragment.setOutputPartition(DataPartition.UNPARTITIONED); return recursiveCteFragment; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 3a3481132b3..ebcbe07c7bb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -2134,7 +2134,8 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { if (ctx == null) { return plan; } - return new LogicalCTE<>((List) visit(ctx.aliasQuery(), LogicalSubQueryAlias.class), plan); + return new LogicalCTE<>(ctx.RECURSIVE() != null, + (List) visit(ctx.aliasQuery(), LogicalSubQueryAlias.class), plan); } /** @@ -2149,8 +2150,7 @@ public class LogicalPlanBuilder extends DorisParserBaseVisitor<Object> { .map(RuleContext::getText) .collect(ImmutableList.toImmutableList()) ); - return new LogicalSubQueryAlias<>(ctx.identifier().getText(), columnNames, ctx.RECURSIVE() != null, - queryPlan); + return new LogicalSubQueryAlias<>(ctx.identifier().getText(), columnNames, queryPlan); }); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/PullUpSubqueryAliasToCTE.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/PullUpSubqueryAliasToCTE.java index 31a205d5ed5..1dfe63c9e67 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/PullUpSubqueryAliasToCTE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/pre/PullUpSubqueryAliasToCTE.java @@ -47,10 +47,11 @@ public class PullUpSubqueryAliasToCTE extends PlanPreprocessor { subQueryAliases.addAll(logicalCTE.getAliasQueries()); subQueryAliases.addAll(aliasQueries); return topPlan.withChildren( - new LogicalCTE<>(subQueryAliases, (LogicalPlan) ((UnboundResultSink) topPlan).child())); + new LogicalCTE<>(logicalCTE.isRecursiveCte(), subQueryAliases, + (LogicalPlan) ((UnboundResultSink) topPlan).child())); } return topPlan.withChildren( - new LogicalCTE<>(aliasQueries, (LogicalPlan) ((UnboundResultSink) topPlan).child())); + new LogicalCTE<>(false, aliasQueries, (LogicalPlan) ((UnboundResultSink) topPlan).child())); } return topPlan; } @@ -86,7 +87,8 @@ public class PullUpSubqueryAliasToCTE extends PlanPreprocessor { subQueryAliasesOfCte.addAll(logicalCTE.getAliasQueries()); subQueryAliasesOfCte.addAll(aliasQueries); aliasQueries = new ArrayList<>(); - return new LogicalCTE<>(subQueryAliasesOfCte, (LogicalPlan) newLogicalCTE.child()); + return new LogicalCTE<>(newLogicalCTE.isRecursiveCte(), subQueryAliasesOfCte, + (LogicalPlan) newLogicalCTE.child()); } return cte; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java index 906d572c6c1..b7ef605c395 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/analysis/AnalyzeCTE.java @@ -39,6 +39,7 @@ import org.apache.doris.nereids.trees.plans.logical.LogicalPlan; import org.apache.doris.nereids.trees.plans.logical.LogicalProject; import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCte; import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCteRecursiveChild; +import org.apache.doris.nereids.trees.plans.logical.LogicalRecursiveCteScan; import org.apache.doris.nereids.trees.plans.logical.LogicalSubQueryAlias; import org.apache.doris.nereids.trees.plans.logical.LogicalUnion; import org.apache.doris.nereids.trees.plans.logical.ProjectProcessor; @@ -108,7 +109,7 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { List<LogicalCTEProducer<Plan>> cteProducerPlans = new ArrayList<>(); for (LogicalSubQueryAlias<Plan> aliasQuery : aliasQueries) { // we should use a chain to ensure visible of cte - if (aliasQuery.isRecursiveCte()) { + if (aliasQuery.isRecursiveCte() && logicalCTE.isRecursiveCte()) { Pair<CTEContext, LogicalCTEProducer<Plan>> result = analyzeRecursiveCte(aliasQuery, outerCteCtx, cascadesContext); outerCteCtx = result.first; @@ -150,13 +151,10 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { }); cascadesContext.addPlanProcesses(innerAnchorCascadesCtx.getPlanProcesses()); LogicalPlan analyzedAnchorChild = (LogicalPlan) innerAnchorCascadesCtx.getRewritePlan(); - List<NamedExpression> anchorNullableOutputs = new ArrayList<>(analyzedAnchorChild.getOutput().size()); - for (Slot slot : analyzedAnchorChild.getOutput()) { - anchorNullableOutputs.add(new Alias(new Nullable(slot), slot.getName())); - } - analyzedAnchorChild = new LogicalProject<>(anchorNullableOutputs, analyzedAnchorChild); checkColumnAlias(aliasQuery, analyzedAnchorChild.getOutput()); - + // make all output nullable + analyzedAnchorChild = forceOutputNullable(analyzedAnchorChild, + aliasQuery.getColumnAliases().orElse(ImmutableList.of())); // analyze recursive child LogicalPlan recursiveChild = (LogicalPlan) parsedCtePlan.child(1); CascadesContext innerRecursiveCascadesCtx = CascadesContext.newContextWithCteContext( @@ -167,26 +165,28 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { }); cascadesContext.addPlanProcesses(innerRecursiveCascadesCtx.getPlanProcesses()); LogicalPlan analyzedRecursiveChild = (LogicalPlan) innerRecursiveCascadesCtx.getRewritePlan(); - List<DataType> anchorChildOutputTypes = new ArrayList<>(analyzedAnchorChild.getOutput().size()); - for (Slot slot : analyzedAnchorChild.getOutput()) { + List<LogicalRecursiveCteScan> recursiveCteScanList = analyzedRecursiveChild + .collectToList(LogicalRecursiveCteScan.class::isInstance); + if (recursiveCteScanList.size() > 1) { + throw new AnalysisException(String.format("can have only 1 recursive cte instance, but there is %d", + recursiveCteScanList.size())); + } + List<Slot> anchorChildOutputs = analyzedAnchorChild.getOutput(); + List<DataType> anchorChildOutputTypes = new ArrayList<>(anchorChildOutputs.size()); + for (Slot slot : anchorChildOutputs) { anchorChildOutputTypes.add(slot.getDataType()); } List<Slot> recursiveChildOutputs = analyzedRecursiveChild.getOutput(); for (int i = 0; i < recursiveChildOutputs.size(); ++i) { - if (recursiveChildOutputs.get(i).getDataType() != anchorChildOutputTypes.get(i)) { + if (!recursiveChildOutputs.get(i).getDataType().equals(anchorChildOutputTypes.get(i))) { throw new AnalysisException(String.format("recursive child's %d column's datatype in select list %s " + "is different from anchor child's output datatype %s, please add cast manually " + "to get expect datatype", i + 1, recursiveChildOutputs.get(i).getDataType(), anchorChildOutputTypes.get(i))); } } - - List<NamedExpression> recursiveNullableOutputs = new ArrayList<>(analyzedRecursiveChild.getOutput().size()); - for (Slot slot : analyzedRecursiveChild.getOutput()) { - recursiveNullableOutputs.add(new Alias(new Nullable(slot), slot.getName())); - } - analyzedRecursiveChild = new LogicalProject<>(recursiveNullableOutputs, analyzedRecursiveChild); - analyzedRecursiveChild = new LogicalRecursiveCteRecursiveChild<>(analyzedRecursiveChild); + analyzedRecursiveChild = new LogicalRecursiveCteRecursiveChild<>(forceOutputNullable(analyzedRecursiveChild, + ImmutableList.of())); // create LogicalRecursiveCte LogicalUnion logicalUnion = (LogicalUnion) parsedCtePlan; @@ -223,6 +223,22 @@ public class AnalyzeCTE extends OneAnalysisRuleFactory { return Pair.of(outerCteCtx, cteProducer); } + private LogicalPlan forceOutputNullable(LogicalPlan logicalPlan, List<String> aliasNames) { + List<Slot> oldOutputs = logicalPlan.getOutput(); + int size = oldOutputs.size(); + List<NamedExpression> newOutputs = new ArrayList<>(oldOutputs.size()); + if (!aliasNames.isEmpty()) { + for (int i = 0; i < size; ++i) { + newOutputs.add(new Alias(new Nullable(oldOutputs.get(i)), aliasNames.get(i))); + } + } else { + for (Slot slot : oldOutputs) { + newOutputs.add(new Alias(new Nullable(slot), slot.getName())); + } + } + return new LogicalProject<>(newOutputs, logicalPlan); + } + /** * check columnAliases' size and name */ diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java index 30a00314fbf..dfbf13cedb7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/ColumnPruning.java @@ -216,43 +216,8 @@ public class ColumnPruning extends DefaultPlanRewriter<PruneContext> implements @Override public Plan visitLogicalRecursiveCte(LogicalRecursiveCte recursiveCte, PruneContext context) { - // LogicalRecursiveCte is basically like LogicalUnion, so just do same as LogicalUnion - if (!recursiveCte.isUnionAll()) { - return skipPruneThisAndFirstLevelChildren(recursiveCte); - } - LogicalRecursiveCte prunedOutputRecursiveCte = pruneRecursiveCteOutput(recursiveCte, context); - // start prune children of recursiveCte - List<Slot> originOutput = recursiveCte.getOutput(); - Set<Slot> prunedOutput = prunedOutputRecursiveCte.getOutputSet(); - List<Integer> prunedOutputIndexes = IntStream.range(0, originOutput.size()) - .filter(index -> prunedOutput.contains(originOutput.get(index))) - .boxed() - .collect(ImmutableList.toImmutableList()); - - ImmutableList.Builder<Plan> prunedChildren = ImmutableList.builder(); - ImmutableList.Builder<List<SlotReference>> prunedChildrenOutputs = ImmutableList.builder(); - for (int i = 0; i < prunedOutputRecursiveCte.arity(); i++) { - List<SlotReference> regularChildOutputs = prunedOutputRecursiveCte.getRegularChildOutput(i); - - RoaringBitmap prunedChildOutputExprIds = new RoaringBitmap(); - Builder<SlotReference> prunedChildOutputBuilder - = ImmutableList.builderWithExpectedSize(regularChildOutputs.size()); - for (Integer index : prunedOutputIndexes) { - SlotReference slot = regularChildOutputs.get(index); - prunedChildOutputBuilder.add(slot); - prunedChildOutputExprIds.add(slot.getExprId().asInt()); - } - - List<SlotReference> prunedChildOutput = prunedChildOutputBuilder.build(); - Plan prunedChild = doPruneChild( - prunedOutputRecursiveCte, prunedOutputRecursiveCte.child(i), prunedChildOutputExprIds, - prunedChildOutput, true - ); - prunedChildrenOutputs.add(prunedChildOutput); - prunedChildren.add(prunedChild); - } - return prunedOutputRecursiveCte.withChildrenAndTheirOutputs(prunedChildren.build(), - prunedChildrenOutputs.build()); + // keep LogicalRecursiveCte's output unchanged + return skipPruneThisAndFirstLevelChildren(recursiveCte); } // union can not prune children by the common logic, we must override visit method to write special code. diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java index 27758175527..c33db474d06 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByPartitionCommand.java @@ -267,7 +267,8 @@ public class UpdateMvByPartitionCommand extends InsertOverwriteTableCommand { ); rewrittenSubQueryAlias.add(subQueryAlias.withChildren(subQueryAliasChildren)); } - return super.visitLogicalCTE(new LogicalCTE<>(rewrittenSubQueryAlias, cte.child()), predicates); + return super.visitLogicalCTE(new LogicalCTE<>(cte.isRecursiveCte(), + rewrittenSubQueryAlias, cte.child()), predicates); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCTE.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCTE.java index 4f810c3b6de..5ceb0e55bca 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCTE.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalCTE.java @@ -41,21 +41,28 @@ import java.util.stream.Collectors; public class LogicalCTE<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE> implements PropagateFuncDeps { private final List<LogicalSubQueryAlias<Plan>> aliasQueries; + private final boolean isRecursiveCte; - public LogicalCTE(List<LogicalSubQueryAlias<Plan>> aliasQueries, CHILD_TYPE child) { - this(aliasQueries, Optional.empty(), Optional.empty(), child); + public LogicalCTE(boolean isRecursiveCte, List<LogicalSubQueryAlias<Plan>> aliasQueries, CHILD_TYPE child) { + this(isRecursiveCte, aliasQueries, Optional.empty(), Optional.empty(), child); } - public LogicalCTE(List<LogicalSubQueryAlias<Plan>> aliasQueries, Optional<GroupExpression> groupExpression, - Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { + public LogicalCTE(boolean isRecursiveCte, List<LogicalSubQueryAlias<Plan>> aliasQueries, + Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, + CHILD_TYPE child) { super(PlanType.LOGICAL_CTE, groupExpression, logicalProperties, child); this.aliasQueries = ImmutableList.copyOf(Objects.requireNonNull(aliasQueries, "aliasQueries can not be null")); + this.isRecursiveCte = isRecursiveCte; } public List<LogicalSubQueryAlias<Plan>> getAliasQueries() { return aliasQueries; } + public boolean isRecursiveCte() { + return isRecursiveCte; + } + @Override public List<? extends Plan> extraPlans() { return aliasQueries; @@ -74,6 +81,7 @@ public class LogicalCTE<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE @Override public String toString() { return Utils.toSqlString("LogicalCTE", + "isRecursiveCte", isRecursiveCte, "aliasQueries", aliasQueries ); } @@ -105,18 +113,18 @@ public class LogicalCTE<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE return false; } LogicalCTE that = (LogicalCTE) o; - return aliasQueries.equals(that.aliasQueries); + return aliasQueries.equals(that.aliasQueries) && isRecursiveCte == that.isRecursiveCte; } @Override public int hashCode() { - return Objects.hash(aliasQueries); + return Objects.hash(aliasQueries, isRecursiveCte); } @Override public Plan withChildren(List<Plan> children) { Preconditions.checkArgument(aliasQueries.size() > 0); - return new LogicalCTE<>(aliasQueries, children.get(0)); + return new LogicalCTE<>(isRecursiveCte, aliasQueries, children.get(0)); } @Override @@ -131,13 +139,14 @@ public class LogicalCTE<CHILD_TYPE extends Plan> extends LogicalUnary<CHILD_TYPE @Override public LogicalCTE<CHILD_TYPE> withGroupExpression(Optional<GroupExpression> groupExpression) { - return new LogicalCTE<>(aliasQueries, groupExpression, Optional.of(getLogicalProperties()), child()); + return new LogicalCTE<>(isRecursiveCte, aliasQueries, groupExpression, + Optional.of(getLogicalProperties()), child()); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { Preconditions.checkArgument(aliasQueries.size() > 0); - return new LogicalCTE<>(aliasQueries, groupExpression, logicalProperties, children.get(0)); + return new LogicalCTE<>(isRecursiveCte, aliasQueries, groupExpression, logicalProperties, children.get(0)); } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java index 3958d5b4df1..4a727476d35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCte.java @@ -17,14 +17,10 @@ package org.apache.doris.nereids.trees.plans.logical; -import org.apache.doris.common.Pair; -import org.apache.doris.nereids.CascadesContext; import org.apache.doris.nereids.exceptions.AnalysisException; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.DataTrait; import org.apache.doris.nereids.properties.LogicalProperties; -import org.apache.doris.nereids.properties.PhysicalProperties; -import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext; import org.apache.doris.nereids.trees.expressions.Alias; import org.apache.doris.nereids.trees.expressions.Cast; import org.apache.doris.nereids.trees.expressions.ExprId; @@ -33,30 +29,20 @@ import org.apache.doris.nereids.trees.expressions.NamedExpression; import org.apache.doris.nereids.trees.expressions.Slot; import org.apache.doris.nereids.trees.expressions.SlotReference; import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator; -import org.apache.doris.nereids.trees.expressions.literal.Literal; import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.algebra.RecursiveCte; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; import org.apache.doris.nereids.types.DataType; -import org.apache.doris.nereids.util.ExpressionUtils; import org.apache.doris.nereids.util.TypeCoercionUtils; import org.apache.doris.nereids.util.Utils; -import org.apache.doris.qe.ConnectContext; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; /** * LogicalRecursiveCte is basically like LogicalUnion @@ -269,44 +255,10 @@ public class LogicalRecursiveCte extends AbstractLogicalPlan implements Recursiv @Override public void computeUnique(DataTrait.Builder builder) { - if (!isUnionAll) { - builder.addUniqueSlot(ImmutableSet.copyOf(getOutput())); - } } @Override public void computeUniform(DataTrait.Builder builder) { - final Optional<ExpressionRewriteContext> context = ConnectContext.get() == null ? Optional.empty() - : Optional.of(new ExpressionRewriteContext(CascadesContext.initContext( - ConnectContext.get().getStatementContext(), this, PhysicalProperties.ANY))); - for (int i = 0; i < getOutputs().size(); i++) { - Optional<Literal> value = Optional.empty(); - for (int childIdx = 0; childIdx < children.size(); childIdx++) { - // TODO: use originOutputs = child(childIdx).getOutput() ? - List<? extends Slot> originOutputs = regularChildrenOutputs.get(childIdx); - Slot slot = originOutputs.get(i); - Optional<Expression> childValue = child(childIdx).getLogicalProperties() - .getTrait().getUniformValue(slot); - if (childValue == null || !childValue.isPresent() || !childValue.get().isConstant()) { - value = Optional.empty(); - break; - } - Optional<Literal> constExprOpt = ExpressionUtils.checkConstantExpr(childValue.get(), context); - if (!constExprOpt.isPresent()) { - value = Optional.empty(); - break; - } - if (!value.isPresent()) { - value = constExprOpt; - } else if (!value.equals(constExprOpt)) { - value = Optional.empty(); - break; - } - } - if (value.isPresent()) { - builder.addUniformSlotAndLiteral(getOutputs().get(i).toSlot(), value.get()); - } - } } @Override @@ -314,147 +266,12 @@ public class LogicalRecursiveCte extends AbstractLogicalPlan implements Recursiv return outputs.isEmpty(); } - private List<BitSet> mapSlotToIndex(Plan plan, List<Set<Slot>> equalSlotsList) { - Map<Slot, Integer> slotToIndex = new HashMap<>(); - for (int i = 0; i < plan.getOutput().size(); i++) { - slotToIndex.put(plan.getOutput().get(i), i); - } - List<BitSet> equalSlotIndicesList = new ArrayList<>(); - for (Set<Slot> equalSlots : equalSlotsList) { - BitSet equalSlotIndices = new BitSet(); - for (Slot slot : equalSlots) { - if (slotToIndex.containsKey(slot)) { - equalSlotIndices.set(slotToIndex.get(slot)); - } - } - if (equalSlotIndices.cardinality() > 1) { - equalSlotIndicesList.add(equalSlotIndices); - } - } - return equalSlotIndicesList; - } - @Override public void computeEqualSet(DataTrait.Builder builder) { - if (children.isEmpty()) { - return; - } - - // Get the list of equal slot sets and their corresponding index mappings for the first child - List<Set<Slot>> childEqualSlotsList = child(0).getLogicalProperties() - .getTrait().calAllEqualSet(); - List<BitSet> childEqualSlotsIndicesList = mapSlotToIndex(child(0), childEqualSlotsList); - List<BitSet> unionEqualSlotIndicesList = new ArrayList<>(childEqualSlotsIndicesList); - - // Traverse all children and find the equal sets that exist in all children - for (int i = 1; i < children.size(); i++) { - Plan child = children.get(i); - - // Get the equal slot sets for the current child - childEqualSlotsList = child.getLogicalProperties().getTrait().calAllEqualSet(); - - // Map slots to indices for the current child - childEqualSlotsIndicesList = mapSlotToIndex(child, childEqualSlotsList); - - // Only keep the equal pairs that exist in all children of the union - // This is done by calculating the intersection of all children's equal slot indices - for (BitSet unionEqualSlotIndices : unionEqualSlotIndicesList) { - BitSet intersect = new BitSet(); - for (BitSet childEqualSlotIndices : childEqualSlotsIndicesList) { - if (unionEqualSlotIndices.intersects(childEqualSlotIndices)) { - intersect = childEqualSlotIndices; - break; - } - } - unionEqualSlotIndices.and(intersect); - } - } - - // Build the functional dependencies for the output slots - List<Slot> outputList = getOutput(); - for (BitSet equalSlotIndices : unionEqualSlotIndicesList) { - if (equalSlotIndices.cardinality() <= 1) { - continue; - } - int first = equalSlotIndices.nextSetBit(0); - int next = equalSlotIndices.nextSetBit(first + 1); - while (next > 0) { - builder.addEqualPair(outputList.get(first), outputList.get(next)); - next = equalSlotIndices.nextSetBit(next + 1); - } - } } @Override public void computeFd(DataTrait.Builder builder) { // don't generate } - - /** castCommonDataTypeAndNullableByConstants */ - public static Pair<List<List<NamedExpression>>, List<Boolean>> castCommonDataTypeAndNullableByConstants( - List<List<NamedExpression>> constantExprsList) { - int columnCount = constantExprsList.isEmpty() ? 0 : constantExprsList.get(0).size(); - Pair<List<DataType>, List<Boolean>> commonInfo = computeCommonDataTypeAndNullable(constantExprsList, - columnCount); - List<List<NamedExpression>> castedRows = castToCommonType(constantExprsList, commonInfo.key(), columnCount); - List<Boolean> nullables = commonInfo.second; - return Pair.of(castedRows, nullables); - } - - private static Pair<List<DataType>, List<Boolean>> computeCommonDataTypeAndNullable( - List<List<NamedExpression>> constantExprsList, int columnCount) { - List<Boolean> nullables = Lists.newArrayListWithCapacity(columnCount); - List<DataType> commonDataTypes = Lists.newArrayListWithCapacity(columnCount); - List<NamedExpression> firstRow = constantExprsList.get(0); - for (int columnId = 0; columnId < columnCount; columnId++) { - Expression constant = firstRow.get(columnId).child(0); - Pair<DataType, Boolean> commonDataTypeAndNullable = computeCommonDataTypeAndNullable(constant, columnId, - constantExprsList); - commonDataTypes.add(commonDataTypeAndNullable.first); - nullables.add(commonDataTypeAndNullable.second); - } - return Pair.of(commonDataTypes, nullables); - } - - private static Pair<DataType, Boolean> computeCommonDataTypeAndNullable( - Expression firstRowExpr, int columnId, List<List<NamedExpression>> constantExprsList) { - DataType commonDataType = firstRowExpr.getDataType(); - boolean nullable = firstRowExpr.nullable(); - for (int rowId = 1; rowId < constantExprsList.size(); rowId++) { - NamedExpression namedExpression = constantExprsList.get(rowId).get(columnId); - Expression otherConstant = namedExpression.child(0); - nullable |= otherConstant.nullable(); - DataType otherDataType = otherConstant.getDataType(); - commonDataType = LogicalSetOperation.getAssignmentCompatibleType(commonDataType, otherDataType); - } - return Pair.of(commonDataType, nullable); - } - - private static List<List<NamedExpression>> castToCommonType( - List<List<NamedExpression>> rows, List<DataType> commonDataTypes, int columnCount) { - ImmutableList.Builder<List<NamedExpression>> castedConstants = ImmutableList - .builderWithExpectedSize(rows.size()); - for (List<NamedExpression> row : rows) { - castedConstants.add(castToCommonType(row, commonDataTypes)); - } - return castedConstants.build(); - } - - private static List<NamedExpression> castToCommonType(List<NamedExpression> row, List<DataType> commonTypes) { - ImmutableList.Builder<NamedExpression> castedRow = ImmutableList.builderWithExpectedSize(row.size()); - boolean changed = false; - for (int columnId = 0; columnId < row.size(); columnId++) { - NamedExpression constantAlias = row.get(columnId); - Expression constant = constantAlias.child(0); - DataType commonType = commonTypes.get(columnId); - if (commonType.equals(constant.getDataType())) { - castedRow.add(constantAlias); - } else { - changed = true; - Expression expression = TypeCoercionUtils.castIfNotSameTypeStrict(constant, commonType); - castedRow.add((NamedExpression) constantAlias.withChildren(expression)); - } - } - return changed ? castedRow.build() : row; - } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java index f73323b50bb..55747f5ba12 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteRecursiveChild.java @@ -76,6 +76,11 @@ public class LogicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> extends return new LogicalRecursiveCteRecursiveChild<>(groupExpression, logicalProperties, children); } + @Override + public String toString() { + return "LogicalRecursiveCteRecursiveChild(MUST_SHUFFLE)"; + } + @Override public void computeUnique(DataTrait.Builder builder) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java index 7b1fc4862ee..110a6297858 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalRecursiveCteScan.java @@ -24,6 +24,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.Utils; import java.util.List; import java.util.Optional; @@ -41,22 +42,42 @@ public class LogicalRecursiveCteScan extends LogicalCatalogRelation { super(relationId, PlanType.LOGICAL_RECURSIVE_CTE_SCAN, table, qualifier, groupExpression, logicalProperties); } + private LogicalRecursiveCteScan(RelationId relationId, TableIf table, List<String> qualifier, + Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, + String tableAlias) { + super(relationId, PlanType.LOGICAL_RECURSIVE_CTE_SCAN, table, qualifier, groupExpression, + logicalProperties, tableAlias); + } + + @Override + public String toString() { + return Utils.toSqlString("LogicalRecursiveCteScan", + "recursive cte name", table.getName()); + } + @Override public Plan withGroupExpression(Optional<GroupExpression> groupExpression) { return new LogicalRecursiveCteScan(relationId, table, qualifier, - groupExpression, Optional.ofNullable(getLogicalProperties())); + groupExpression, Optional.ofNullable(getLogicalProperties()), tableAlias); } @Override public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { - return new LogicalRecursiveCteScan(relationId, table, qualifier, groupExpression, logicalProperties); + return new LogicalRecursiveCteScan(relationId, table, qualifier, groupExpression, logicalProperties, + tableAlias); } @Override public LogicalCatalogRelation withRelationId(RelationId relationId) { return new LogicalRecursiveCteScan(relationId, table, qualifier, - groupExpression, Optional.ofNullable(getLogicalProperties())); + groupExpression, Optional.ofNullable(getLogicalProperties()), tableAlias); + } + + @Override + public LogicalCatalogRelation withTableAlias(String tableAlias) { + return new LogicalRecursiveCteScan(relationId, table, qualifier, + groupExpression, Optional.ofNullable(getLogicalProperties()), tableAlias); } @Override diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java index f7f8d718e07..8b18d3ce49d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalSubQueryAlias.java @@ -17,6 +17,7 @@ package org.apache.doris.nereids.trees.plans.logical; +import org.apache.doris.nereids.analyzer.UnboundRelation; import org.apache.doris.nereids.memo.GroupExpression; import org.apache.doris.nereids.properties.DataTrait; import org.apache.doris.nereids.properties.LogicalProperties; @@ -27,6 +28,7 @@ import org.apache.doris.nereids.trees.plans.Plan; import org.apache.doris.nereids.trees.plans.PlanType; import org.apache.doris.nereids.trees.plans.RelationId; import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor; +import org.apache.doris.nereids.util.LazyCompute; import org.apache.doris.nereids.util.Utils; import com.google.common.base.Preconditions; @@ -42,6 +44,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -55,37 +58,31 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> extends LogicalUnary< protected RelationId relationId; private final List<String> qualifier; private final Optional<List<String>> columnAliases; - - private final boolean isRecursiveCte; + private final Supplier<Boolean> isRecursiveCte; public LogicalSubQueryAlias(String tableAlias, CHILD_TYPE child) { - this(ImmutableList.of(tableAlias), Optional.empty(), false, Optional.empty(), Optional.empty(), child); + this(ImmutableList.of(tableAlias), Optional.empty(), Optional.empty(), Optional.empty(), child); } public LogicalSubQueryAlias(List<String> qualifier, CHILD_TYPE child) { - this(qualifier, Optional.empty(), false, Optional.empty(), Optional.empty(), child); + this(qualifier, Optional.empty(), Optional.empty(), Optional.empty(), child); } public LogicalSubQueryAlias(String tableAlias, Optional<List<String>> columnAliases, CHILD_TYPE child) { - this(ImmutableList.of(tableAlias), columnAliases, false, Optional.empty(), Optional.empty(), child); - } - - public LogicalSubQueryAlias(String tableAlias, Optional<List<String>> columnAliases, boolean isRecursiveCte, - CHILD_TYPE child) { - this(ImmutableList.of(tableAlias), columnAliases, isRecursiveCte, Optional.empty(), Optional.empty(), child); + this(ImmutableList.of(tableAlias), columnAliases, Optional.empty(), Optional.empty(), child); } public LogicalSubQueryAlias(List<String> qualifier, Optional<List<String>> columnAliases, CHILD_TYPE child) { - this(qualifier, columnAliases, false, Optional.empty(), Optional.empty(), child); + this(qualifier, columnAliases, Optional.empty(), Optional.empty(), child); } - public LogicalSubQueryAlias(List<String> qualifier, Optional<List<String>> columnAliases, boolean isRecursiveCte, + public LogicalSubQueryAlias(List<String> qualifier, Optional<List<String>> columnAliases, Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, CHILD_TYPE child) { super(PlanType.LOGICAL_SUBQUERY_ALIAS, groupExpression, logicalProperties, child); this.qualifier = ImmutableList.copyOf(Objects.requireNonNull(qualifier, "qualifier is null")); this.columnAliases = columnAliases; - this.isRecursiveCte = isRecursiveCte; + this.isRecursiveCte = computeIsRecursiveCte(); } @Override @@ -129,6 +126,23 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> extends LogicalUnary< return currentOutput.build(); } + private Supplier<Boolean> computeIsRecursiveCte() { + return LazyCompute.of(() -> { + List<UnboundRelation> relationList = collectToList(UnboundRelation.class::isInstance); + for (UnboundRelation relation : relationList) { + List<String> nameParts = relation.getNameParts(); + if (nameParts.size() == 1 && nameParts.get(0).equalsIgnoreCase(getAlias())) { + return true; + } + } + return false; + }); + } + + public boolean isRecursiveCte() { + return isRecursiveCte.get(); + } + public String getAlias() { return qualifier.get(qualifier.size() - 1); } @@ -137,14 +151,10 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> extends LogicalUnary< return columnAliases; } - public boolean isRecursiveCte() { - return isRecursiveCte; - } - @Override public String toString() { return columnAliases.map(strings -> Utils.toSqlString("LogicalSubQueryAlias", - "qualifier", qualifier, "isRecursiveCte", isRecursiveCte, + "qualifier", qualifier, "columnAliases", StringUtils.join(strings, ",") )).orElseGet(() -> Utils.toSqlString("LogicalSubQueryAlias", "qualifier", qualifier @@ -183,8 +193,7 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> extends LogicalUnary< @Override public LogicalSubQueryAlias<Plan> withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalSubQueryAlias<>(qualifier, columnAliases, isRecursiveCte, Optional.empty(), Optional.empty(), - children.get(0)); + return new LogicalSubQueryAlias<>(qualifier, columnAliases, children.get(0)); } @Override @@ -199,7 +208,7 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> extends LogicalUnary< @Override public LogicalSubQueryAlias<CHILD_TYPE> withGroupExpression(Optional<GroupExpression> groupExpression) { - return new LogicalSubQueryAlias<>(qualifier, columnAliases, isRecursiveCte, groupExpression, + return new LogicalSubQueryAlias<>(qualifier, columnAliases, groupExpression, Optional.of(getLogicalProperties()), child()); } @@ -207,7 +216,7 @@ public class LogicalSubQueryAlias<CHILD_TYPE extends Plan> extends LogicalUnary< public Plan withGroupExprLogicalPropChildren(Optional<GroupExpression> groupExpression, Optional<LogicalProperties> logicalProperties, List<Plan> children) { Preconditions.checkArgument(children.size() == 1); - return new LogicalSubQueryAlias<>(qualifier, columnAliases, isRecursiveCte, groupExpression, logicalProperties, + return new LogicalSubQueryAlias<>(qualifier, columnAliases, groupExpression, logicalProperties, children.get(0)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java index 5f598d8feac..25903905b7a 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/physical/PhysicalRecursiveCteRecursiveChild.java @@ -54,6 +54,11 @@ public class PhysicalRecursiveCteRecursiveChild<CHILD_TYPE extends Plan> extends statistics, child); } + @Override + public String toString() { + return "PhysicalRecursiveCteRecursiveChild(MUST_SHUFFLE)"; + } + @Override public Plan withChildren(List<Plan> children) { Preconditions.checkArgument(children.size() == 1); diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java index b4075d66663..7d9eecd35be 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteNode.java @@ -14,18 +14,19 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -// This file is copied from -// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/UnionNode.java -// and modified by Doris package org.apache.doris.planner; +import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.TupleId; import org.apache.doris.statistics.StatisticalType; +import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; import org.apache.doris.thrift.TRecCTENode; +import com.google.common.base.MoreObjects; + public class RecursiveCteNode extends SetOperationNode { private boolean isUnionAll; @@ -49,4 +50,24 @@ public class RecursiveCteNode extends SetOperationNode { msg.node_type = TPlanNodeType.REC_CTE_NODE; msg.rec_cte_node = tRecCTENode; } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + StringBuilder output = new StringBuilder(); + output.append(prefix).append("Recursive Cte: ").append("\n"); + output.append(prefix).append("isUnionAll: ").append(isUnionAll).append("\n"); + if (!conjuncts.isEmpty()) { + Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts); + output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n"); + } + return output.toString(); + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("id", getId().asInt()) + .add("tid", tupleId.asInt()) + .add("isUnionAll", isUnionAll).toString(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java index eb63ba6fcb7..9603e3cfa63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/RecursiveCteScanNode.java @@ -31,6 +31,7 @@ import org.apache.doris.thrift.TScanRange; import org.apache.doris.thrift.TScanRangeLocation; import org.apache.doris.thrift.TScanRangeLocations; +import com.google.common.base.MoreObjects; import com.google.common.collect.Lists; import java.util.Collections; @@ -101,6 +102,14 @@ public class RecursiveCteScanNode extends ScanNode { return output.toString(); } + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("recursiveCteName", recursiveCteName) + .add("id", getId().asInt()) + .add("tid", desc.getId().asInt()).toString(); + } + @Override protected void toThrift(TPlanNode msg) { msg.node_type = TPlanNodeType.REC_CTE_SCAN_NODE; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java index df71172baad..e556c3555ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/runtime/ThriftPlansBuilder.java @@ -83,6 +83,7 @@ import org.apache.logging.log4j.Logger; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -99,7 +100,7 @@ public class ThriftPlansBuilder { CoordinatorContext coordinatorContext) { List<PipelineDistributedPlan> distributedPlans = coordinatorContext.distributedPlans; - setParamsForRecursiveCteNode(distributedPlans); + Set<Integer> fragmentToNotifyClose = setParamsForRecursiveCteNode(distributedPlans); // we should set runtime predicate first, then we can use heap sort and to thrift setRuntimePredicateIfNeed(coordinatorContext.scanNodes); @@ -125,7 +126,7 @@ public class ThriftPlansBuilder { TPipelineFragmentParams currentFragmentParam = fragmentToThriftIfAbsent( currentFragmentPlan, instanceJob, workerToCurrentFragment, instancesPerWorker, exchangeSenderNum, sharedFileScanRangeParams, - workerProcessInstanceNum, coordinatorContext); + workerProcessInstanceNum, fragmentToNotifyClose, coordinatorContext); TPipelineInstanceParams instanceParam = instanceToThrift( currentFragmentParam, instanceJob, currentInstanceIndex++); @@ -331,6 +332,7 @@ public class ThriftPlansBuilder { Map<Integer, Integer> exchangeSenderNum, Map<Integer, TFileScanRangeParams> fileScanRangeParamsMap, Multiset<DistributedPlanWorker> workerProcessInstanceNum, + Set<Integer> fragmentToNotifyClose, CoordinatorContext coordinatorContext) { DistributedPlanWorker worker = assignedJob.getAssignedWorker(); return workerToFragmentParams.computeIfAbsent(worker, w -> { @@ -344,6 +346,9 @@ public class ThriftPlansBuilder { params.setDescTbl(coordinatorContext.descriptorTable); params.setQueryId(coordinatorContext.queryId); params.setFragmentId(fragment.getFragmentId().asInt()); + if (fragmentToNotifyClose.contains(params.getFragmentId())) { + params.setNeedNotifyClose(true); + } // Each tParam will set the total number of Fragments that need to be executed on the same BE, // and the BE will determine whether all Fragments have been executed based on this information. @@ -582,7 +587,8 @@ public class ThriftPlansBuilder { } } - private static void setParamsForRecursiveCteNode(List<PipelineDistributedPlan> distributedPlans) { + private static Set<Integer> setParamsForRecursiveCteNode(List<PipelineDistributedPlan> distributedPlans) { + Set<Integer> fragmentToNotifyClose = new HashSet<>(); Map<PlanFragmentId, TRecCTETarget> fragmentIdToRecCteTargetMap = new TreeMap<>(); Map<PlanFragmentId, Set<TNetworkAddress>> fragmentIdToNetworkAddressMap = new TreeMap<>(); for (PipelineDistributedPlan plan : distributedPlans) { @@ -634,6 +640,7 @@ public class ThriftPlansBuilder { planFragment.getChild(0).collectAll(PlanFragment.class::isInstance, childFragments); for (PlanFragment child : childFragments) { PlanFragmentId childFragmentId = child.getFragmentId(); + fragmentToNotifyClose.add(childFragmentId.asInt()); TRecCTETarget tRecCTETarget = fragmentIdToRecCteTargetMap.getOrDefault(childFragmentId, null); if (tRecCTETarget != null) { targets.add(tRecCTETarget); @@ -665,6 +672,7 @@ public class ThriftPlansBuilder { recursiveCteNode.settRecCTENode(tRecCTENode); } } + return fragmentToNotifyClose; } private static class PerNodeScanParams { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index c5982ed4b0f..37bbc177b27 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -653,6 +653,7 @@ struct TPipelineFragmentParams { // Used by 2.1 44: optional list<i32> topn_filter_source_node_ids 45: optional map<string, TAIResource> ai_resources + 46: optional bool need_notify_close // For cloud 1000: optional bool is_mow_table; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
