This is an automated email from the ASF dual-hosted git repository.
englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6f2b7dac320 [fix](cte) unnest subquery before RewriteCteChildren (#60717)
6f2b7dac320 is described below
commit 6f2b7dac320514ecd34cee7db5818c58b2015455
Author: minghong <[email protected]>
AuthorDate: Fri Feb 13 10:19:25 2026 +0800
[fix](cte) unnest subquery before RewriteCteChildren (#60717)
### What problem does this PR solve?
APPLY_TO_JOIN should be applied to the CTE producer before
REWRITE_CTE_CHILDREN. Some rules depend on StatsDerive, and StatsDerive
assumes that all slots used by a plan node come from its children's
outputs. LogicalApply breaks this assumption.
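
The assumption described above can be illustrated with a minimal, self-contained
sketch. This is not Doris code: the Node class and the slotsNotFromChildren helper
are hypothetical simplifications invented for illustration, and the slot names are
borrowed from the analyzed plan quoted in the regression test. The sketch only
checks whether every slot a node uses is produced by one of its children, which
is the property a correlated filter under LogicalApply violates.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class SlotOriginSketch {
    // Hypothetical, simplified plan node. It is NOT the Doris Plan interface.
    static final class Node {
        final String name;
        final Set<String> usedSlots;   // slots referenced by this node's expressions
        final Set<String> outputSlots; // slots this node exposes to its parent
        final List<Node> children;

        Node(String name, Set<String> usedSlots, Set<String> outputSlots, List<Node> children) {
            this.name = name;
            this.usedSlots = usedSlots;
            this.outputSlots = outputSlots;
            this.children = children;
        }
    }

    // Returns the slots used by the node that none of its children output.
    // An empty result means the "slots come from children" assumption holds.
    static Set<String> slotsNotFromChildren(Node node) {
        Set<String> available = new LinkedHashSet<>();
        for (Node child : node.children) {
            available.addAll(child.outputSlots);
        }
        Set<String> missing = new LinkedHashSet<>(node.usedSlots);
        missing.removeAll(available);
        return missing;
    }

    public static void main(String[] args) {
        Node scanT1 = new Node("scan(t1)", Set.of(), Set.of("a1#1", "b1#2"), List.of());
        Node scanT2 = new Node("scan(t2)", Set.of(), Set.of("a2#3", "b2#4"), List.of());

        // Correlated filter on the right side of an apply: it uses a1#1,
        // but its only child (the scan of t2) does not output that slot.
        Node correlatedFilter = new Node("filter",
                Set.of("a1#1", "a2#3", "b2#4"), Set.of("a2#3", "b2#4"), List.of(scanT2));
        System.out.println(slotsNotFromChildren(correlatedFilter)); // prints [a1#1]

        // After unnesting, the same predicate sits on a join whose children
        // are both scans, so every used slot is available.
        Node join = new Node("join",
                Set.of("a1#1", "a2#3", "b2#4"), Set.of("a1#1", "b1#2"), List.of(scanT1, scanT2));
        System.out.println(slotsNotFromChildren(join)); // prints []
    }
}
```

In this simplified model, converting the apply to a join before any
statistics-dependent rule runs is what restores the property.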
---
.../doris/nereids/jobs/executor/Rewriter.java | 259 +++++++++++----------
.../org/apache/doris/nereids/util/PlanChecker.java | 8 +
.../cte/subquery_in_cte/subquery_in_cte.groovy | 67 ++++++
3 files changed, 209 insertions(+), 125 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index a1c4504e8c5..a39a3c0a9ff 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -421,67 +421,69 @@ public class Rewriter extends AbstractBatchJobExecutor {
)
);
+ private static final List<RewriteJob> NORMALIZE_PLAN_JOBS = jobs(
+ topic("Plan Normalization",
+ custom(RuleType.FOLD_CONSTANT_FOR_SQL_CACHE, FoldConstantForSqlCache::new),
+ // move MergeProjects rule from analyze phase
+ // because SubqueryToApply and BindSink rule may create extra project node
+ // we need merge them at the beginning of rewrite phase to let later rules happy
+ topDown(new MergeProjectable()),
+ topDown(
+ new EliminateOrderByConstant(),
+ new EliminateSortUnderSubqueryOrView(),
+ // MergeProjects depends on this rule
+ new LogicalSubQueryAliasToLogicalProject(),
+ // TODO: we should do expression normalization after plan normalization
+ // because some rewritten depends on sub expression tree matching
+ // such as group by key matching and replaced
+ // but we need to do some normalization before subquery unnesting,
+ // such as extract common expression.
+ ExpressionNormalizationAndOptimization.FULL_RULE_INSTANCE,
+ new AvgDistinctToSumDivCount(),
+ new CountDistinctRewrite(),
+ new ExtractFilterFromCrossJoin()
+ ),
+ topDown(
+ // ExtractSingleTableExpressionFromDisjunction conflict to InPredicateToEqualToRule
+ // in the ExpressionNormalization, so must invoke in another job, otherwise dead loop.
+ new ExtractSingleTableExpressionFromDisjunction()
+ )
+ ),
+ // subquery unnesting relay on ExpressionNormalization to extract common factor expression
+ topic("Subquery unnesting",
+ cascadesContext -> cascadesContext.rewritePlanContainsTypes(LogicalApply.class),
+ // after doing NormalizeAggregate in analysis job
+ // we need run the following 2 rules to make AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION work
+ bottomUp(new PullUpProjectUnderApply()),
+ topDown(
+ new PushDownFilterThroughProject(),
+ // the subquery may have where and having clause
+ // so there may be two filters we need to merge them
+ new MergeFilters()
+ ),
+ // query rewrite support window, so add this rule here
+ custom(RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION, AggScalarSubQueryToWindowFunction::new),
+ bottomUp(
+ new EliminateUselessPlanUnderApply(),
+ // CorrelateApplyToUnCorrelateApply and ApplyToJoin
+ // and SelectMaterializedIndexWithAggregate depends on this rule
+ new MergeProjectable(),
+ /*
+ * Subquery unnesting.
+ * 1. Adjust the plan in correlated logicalApply
+ * so that there are no correlated columns in the subquery.
+ * 2. Convert logicalApply to a logicalJoin.
+ * TODO: group these rules to make sure the result plan is what we expected.
+ */
+ new CorrelateApplyToUnCorrelateApply(),
+ new ApplyToJoin()
+ )
+ )
+ );
+
+
private static final List<RewriteJob> CTE_CHILDREN_REWRITE_JOBS_BEFORE_SUB_PATH_PUSH_DOWN = notTraverseChildrenOf(
ImmutableSet.of(LogicalCTEAnchor.class),
() -> jobs(
- topic("Plan Normalization",
- custom(RuleType.FOLD_CONSTANT_FOR_SQL_CACHE, FoldConstantForSqlCache::new),
- // move MergeProjects rule from analyze phase
- // because SubqueryToApply and BindSink rule may create extra project node
- // we need merge them at the beginning of rewrite phase to let later rules happy
- topDown(new MergeProjectable()),
- topDown(
- new EliminateOrderByConstant(),
- new EliminateSortUnderSubqueryOrView(),
- // MergeProjects depends on this rule
- new LogicalSubQueryAliasToLogicalProject(),
- // TODO: we should do expression normalization after plan normalization
- // because some rewritten depends on sub expression tree matching
- // such as group by key matching and replaced
- // but we need to do some normalization before subquery unnesting,
- // such as extract common expression.
- ExpressionNormalizationAndOptimization.FULL_RULE_INSTANCE,
- new AvgDistinctToSumDivCount(),
- new CountDistinctRewrite(),
- new ExtractFilterFromCrossJoin()
- ),
- topDown(
- // ExtractSingleTableExpressionFromDisjunction conflict to InPredicateToEqualToRule
- // in the ExpressionNormalization, so must invoke in another job, otherwise dead loop.
- new ExtractSingleTableExpressionFromDisjunction()
- )
- ),
- // subquery unnesting relay on ExpressionNormalization to extract common factor expression
- topic("Subquery unnesting",
- cascadesContext -> cascadesContext.rewritePlanContainsTypes(LogicalApply.class),
- // after doing NormalizeAggregate in analysis job
- // we need run the following 2 rules to make AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION work
- bottomUp(new PullUpProjectUnderApply()),
- topDown(
- new PushDownFilterThroughProject(),
- // the subquery may have where and having clause
- // so there may be two filters we need to merge them
- new MergeFilters()
- ),
- // query rewrite support window, so add this rule here
- custom(RuleType.AGG_SCALAR_SUBQUERY_TO_WINDOW_FUNCTION, AggScalarSubQueryToWindowFunction::new),
- bottomUp(
- new EliminateUselessPlanUnderApply(),
- // CorrelateApplyToUnCorrelateApply and ApplyToJoin
- // and SelectMaterializedIndexWithAggregate depends on this rule
- new MergeProjectable(),
- /*
- * Subquery unnesting.
- * 1. Adjust the plan in correlated logicalApply
- * so that there are no correlated columns in the subquery.
- * 2. Convert logicalApply to a logicalJoin.
- * TODO: group these rules to make sure the result plan is what we expected.
- */
- new CorrelateApplyToUnCorrelateApply(),
- new ApplyToJoin()
- )
- ),
-
// before `Subquery unnesting` topic, some correlate slots should have appeared at LogicalApply.left,
// but it appeared at LogicalApply.right. After the `Subquery unnesting` topic, all slots is placed in a
// normal position, then we can check column privileges by these steps
@@ -862,14 +864,14 @@ public class Rewriter extends AbstractBatchJobExecutor {
public static Rewriter getWholeTreeRewriterWithCustomJobs(
CascadesContext cascadesContext, List<RewriteJob> jobs) {
List<RewriteJob> wholeTreeRewriteJobs = getWholeTreeRewriteJobs(
- false, false, jobs, ImmutableList.of(), true);
+ false, false, jobs, ImmutableList.of(), true, false);
return new Rewriter(cascadesContext, wholeTreeRewriteJobs, true);
}
private static List<RewriteJob> getWholeTreeRewriteJobs(boolean runCboRules) {
return getWholeTreeRewriteJobs(true, true, CTE_CHILDREN_REWRITE_JOBS_BEFORE_SUB_PATH_PUSH_DOWN,
- CTE_CHILDREN_REWRITE_JOBS_AFTER_SUB_PATH_PUSH_DOWN, runCboRules);
+ CTE_CHILDREN_REWRITE_JOBS_AFTER_SUB_PATH_PUSH_DOWN, runCboRules, true);
}
private static List<RewriteJob> getWholeTreeRewriteJobs(
@@ -877,77 +879,84 @@ public class Rewriter extends AbstractBatchJobExecutor {
boolean needOrExpansion,
List<RewriteJob> beforePushDownJobs,
List<RewriteJob> afterPushDownJobs,
- boolean runCboRules) {
+ boolean runCboRules,
+ boolean includeNormalizePlanJobs) {
+ ImmutableList.Builder<RewriteJob> builder = ImmutableList.builder();
+ if (includeNormalizePlanJobs) {
+ builder.addAll(NORMALIZE_PLAN_JOBS);
+ }
+ builder.addAll(notTraverseChildrenOf(
+ ImmutableSet.of(LogicalCTEAnchor.class),
+ () -> {
+ List<RewriteJob> rewriteJobs = Lists.newArrayListWithExpectedSize(300);
- return notTraverseChildrenOf(
- ImmutableSet.of(LogicalCTEAnchor.class),
- () -> {
- List<RewriteJob> rewriteJobs = Lists.newArrayListWithExpectedSize(300);
+ rewriteJobs.addAll(jobs(
+ topic("cte inline and pull up all cte anchor",
+ custom(RuleType.PULL_UP_CTE_ANCHOR, PullUpCteAnchor::new),
+ custom(RuleType.CTE_INLINE, CTEInline::new)
+ ),
+ topic("process limit session variables",
+ custom(RuleType.ADD_DEFAULT_LIMIT, AddDefaultLimit::new)
+ ),
+ topic("record query tmp plan for mv pre rewrite",
+ custom(RuleType.RECORD_PLAN_FOR_MV_PRE_REWRITE, RecordPlanForMvPreRewrite::new)
+ ),
+ topic("rewrite cte sub-tree before sub path push down",
+ custom(RuleType.REWRITE_CTE_CHILDREN,
+ () -> new RewriteCteChildren(beforePushDownJobs, runCboRules)
+ )
+ )));
+ rewriteJobs.addAll(jobs(topic("convert outer join to anti",
+ custom(RuleType.CONVERT_OUTER_JOIN_TO_ANTI, ConvertOuterJoinToAntiJoin::new))));
+ rewriteJobs.addAll(jobs(topic("eliminate group by key by uniform",
+ custom(RuleType.ELIMINATE_GROUP_BY_KEY_BY_UNIFORM, EliminateGroupByKeyByUniform::new))));
+ if (needOrExpansion) {
+ rewriteJobs.addAll(jobs(topic("or expansion",
+ custom(RuleType.OR_EXPANSION, () -> OrExpansion.INSTANCE))));
+ }
+ rewriteJobs.add(topic("repeat rewrite",
+ custom(RuleType.DECOMPOSE_REPEAT, () -> DecomposeRepeatWithPreAggregation.INSTANCE)));
+
+ rewriteJobs.addAll(jobs(topic("split multi distinct",
+ custom(RuleType.DISTINCT_AGG_STRATEGY_SELECTOR,
+ () -> DistinctAggStrategySelector.INSTANCE))));
+
+ // Rewrite search function before VariantSubPathPruning
+ // so that ElementAt expressions from search can be processed
+ rewriteJobs.addAll(jobs(
+ bottomUp(new RewriteSearchToSlots())
+ ));
- rewriteJobs.addAll(jobs(
- topic("cte inline and pull up all cte anchor",
- custom(RuleType.PULL_UP_CTE_ANCHOR, PullUpCteAnchor::new),
- custom(RuleType.CTE_INLINE, CTEInline::new)
- ),
- topic("process limit session variables",
- custom(RuleType.ADD_DEFAULT_LIMIT, AddDefaultLimit::new)
- ),
- topic("record query tmp plan for mv pre rewrite",
- custom(RuleType.RECORD_PLAN_FOR_MV_PRE_REWRITE, RecordPlanForMvPreRewrite::new)
- ),
- topic("rewrite cte sub-tree before sub path push down",
- custom(RuleType.REWRITE_CTE_CHILDREN,
- () -> new RewriteCteChildren(beforePushDownJobs, runCboRules)
+ if (needSubPathPushDown) {
+ rewriteJobs.addAll(jobs(
+ topic("variant element_at push down",
+ custom(RuleType.VARIANT_SUB_PATH_PRUNING, VariantSubPathPruning::new)
)
- )));
- rewriteJobs.addAll(jobs(topic("convert outer join to anti",
- custom(RuleType.CONVERT_OUTER_JOIN_TO_ANTI, ConvertOuterJoinToAntiJoin::new))));
- rewriteJobs.addAll(jobs(topic("eliminate group by key by uniform",
- custom(RuleType.ELIMINATE_GROUP_BY_KEY_BY_UNIFORM, EliminateGroupByKeyByUniform::new))));
- if (needOrExpansion) {
- rewriteJobs.addAll(jobs(topic("or expansion",
- custom(RuleType.OR_EXPANSION, () -> OrExpansion.INSTANCE))));
- }
- rewriteJobs.add(topic("repeat rewrite",
- custom(RuleType.DECOMPOSE_REPEAT, () -> DecomposeRepeatWithPreAggregation.INSTANCE)));
- rewriteJobs.addAll(jobs(topic("split multi distinct",
- custom(RuleType.DISTINCT_AGG_STRATEGY_SELECTOR, () -> DistinctAggStrategySelector.INSTANCE))));
-
- // Rewrite search function before VariantSubPathPruning
- // so that ElementAt expressions from search can be processed
- rewriteJobs.addAll(jobs(
- bottomUp(new RewriteSearchToSlots())
- ));
-
- if (needSubPathPushDown) {
- rewriteJobs.addAll(jobs(
- topic("variant element_at push down",
- custom(RuleType.VARIANT_SUB_PATH_PRUNING, VariantSubPathPruning::new)
+ ));
+ }
+ rewriteJobs.add(
+ topic("nested column prune",
+ custom(RuleType.NESTED_COLUMN_PRUNING, NestedColumnPruning::new)
)
+ );
+ rewriteJobs.addAll(jobs(
+ topic("rewrite cte sub-tree after sub path push down",
+ custom(RuleType.CLEAR_CONTEXT_STATUS, ClearContextStatus::new),
+ custom(RuleType.REWRITE_CTE_CHILDREN,
+ () -> new RewriteCteChildren(afterPushDownJobs, runCboRules)
+ )
+ ),
+ topic("whole plan check",
+ custom(RuleType.ADJUST_NULLABLE, () -> new AdjustNullable(false))
+ ),
+ // NullableDependentExpressionRewrite need to be done after nullable fixed
+ topic("condition function", bottomUp(ImmutableList.of(
+ new NullableDependentExpressionRewrite())))
));
+ return rewriteJobs;
}
- rewriteJobs.add(
- topic("nested column prune",
- custom(RuleType.NESTED_COLUMN_PRUNING, NestedColumnPruning::new)
- )
- );
- rewriteJobs.addAll(jobs(
- topic("rewrite cte sub-tree after sub path push down",
- custom(RuleType.CLEAR_CONTEXT_STATUS, ClearContextStatus::new),
- custom(RuleType.REWRITE_CTE_CHILDREN,
- () -> new RewriteCteChildren(afterPushDownJobs, runCboRules)
- )
- ),
- topic("whole plan check",
- custom(RuleType.ADJUST_NULLABLE, () -> new AdjustNullable(false))
- ),
- // NullableDependentExpressionRewrite need to be done after nullable fixed
- topic("condition function", bottomUp(ImmutableList.of(
- new NullableDependentExpressionRewrite())))
- ));
- return rewriteJobs;
- }
- );
+ ));
+ return builder.build();
}
@Override
diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
index 5084ef09823..be5a92ef08b 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/util/PlanChecker.java
@@ -226,6 +226,14 @@ public class PlanChecker {
return planChecker;
}
+ public PlanChecker applyTopDown(RuleFactory... ruleFactories) {
+ List<Rule> allRules = new ArrayList<>();
+ for (RuleFactory factory : ruleFactories) {
+ allRules.addAll(factory.buildRules());
+ }
+ return applyTopDown(allRules);
+ }
+
public PlanChecker applyTopDown(List<Rule> rule) {
Rewriter.getWholeTreeRewriterWithCustomJobs(cascadesContext,
ImmutableList.of(new RootPlanTreeRewriteJob(new FilteredRules(rule), PlanTreeRewriteTopDownJob::new, true)))
diff --git a/regression-test/suites/nereids_p0/cte/subquery_in_cte/subquery_in_cte.groovy b/regression-test/suites/nereids_p0/cte/subquery_in_cte/subquery_in_cte.groovy
new file mode 100644
index 00000000000..dc8b388e8b2
--- /dev/null
+++ b/regression-test/suites/nereids_p0/cte/subquery_in_cte/subquery_in_cte.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+suite("subquery_in_cte") {
+ sql """
+ drop table if exists t1;
+
+ create table t1(a1 int,b1 int)
+ properties("replication_num" = "1");
+
+ insert into t1 values(1,2);
+
+ drop table if exists t2;
+
+ create table t2(a2 int,b2 int)
+ properties("replication_num" = "1");
+
+ insert into t2 values(1,3);
+ """
+
+ sql"""
+ with cte1 as (
+ select t1.a1, t1.b1
+ from t1
+ where t1.a1 > 0 and exists (select 1 from t2 where t1.a1 = t2.a2 or t1.a1 = t2.b2)
+ )
+ select * from cte1 union all select * from cte1;
+ """
+
+ /* TEST PURPOSE: APPLY_TO_JOIN should be applied on CTE producer before REWRITE_CTE_CHILDREN,
+ considering the following analyzed plan, StatsDerive reports NPE when visiting LogicalFilter[26],
+ a1#1 is not from its child's outputs.
+ ----------------------------------------------------------------------------------------------------------------
+ Explain String(Nereids Planner)
+ LogicalResultSink[74] ( outputExprs=[a1#9, b1#10] )
+ +--LogicalCteAnchor[73] ( cteId=CTEId#0 )
+ |--LogicalCteProducer[63] ( cteId=CTEId#0 )
+ | +--LogicalSubQueryAlias ( qualifier=[cte1] )
+ | +--LogicalProject[36] ( distinct=false, projects=[a1#1, b1#2] )
+ | +--LogicalProject[35] ( distinct=false, projects=[a1#1, b1#2] )
+ | +--LogicalFilter[34] ( predicates=(a1#1 > 0) )
+ | +--LogicalProject[33] ( distinct=false, projects=[a1#1, b1#2] )
+ | +--LogicalApply ( correlationSlot=[a1#1], correlationFilter=Optional.empty, isMarkJoin=false, isMarkJoinSlotNotNull=false, MarkJoinSlotReference=empty )
+ | |--LogicalOlapScan ( qualified=internal.test.t1, indexName=<index_not_selected>, selectedIndexId=1767221999857, preAgg=UNSET, operativeCol=[], virtualColumns=[] )
+ | +--LogicalProject[27] ( distinct=false, projects=[1 AS `1`#0] )
+ | +--LogicalFilter[26] ( predicates=OR[(a1#1 = a2#3),(a1#1 = b2#4)] )
+ | +--LogicalOlapScan ( qualified=internal.test.t2, indexName=<index_not_selected>, selectedIndexId=1767221999881, preAgg=UNSET, operativeCol=[], virtualColumns=[] )
+ +--LogicalUnion ( qualifier=ALL, outputs=[a1#9, b1#10], regularChildrenOutputs=[[a1#5, b1#6], [a1#7, b1#8]], constantExprsList=[], hasPushedFilter=false )
+ |--LogicalProject[65] ( distinct=false, projects=[a1#5, b1#6] )
+ | +--LogicalCteConsumer[64] ( cteId=CTEId#0, relationId=RelationId#0, name=cte1 )
+ +--LogicalProject[67] ( distinct=false, projects=[a1#7, b1#8] )
+ +--LogicalCteConsumer[66] ( cteId=CTEId#0, relationId=RelationId#1, name=cte1 )
+ */
+}