github-actions[bot] commented on code in PR #63736: URL: https://github.com/apache/doris/pull/63736#discussion_r3351230397
########## fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpProjectExprUnderTopN.java: ########## @@ -0,0 +1,642 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.nereids.rules.rewrite; + +import org.apache.doris.nereids.jobs.JobContext; +import org.apache.doris.nereids.properties.OrderKey; +import org.apache.doris.nereids.trees.expressions.Alias; +import org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.NamedExpression; +import org.apache.doris.nereids.trees.expressions.Slot; +import org.apache.doris.nereids.trees.expressions.functions.NoneMovableFunction; +import org.apache.doris.nereids.trees.expressions.functions.scalar.Score; +import org.apache.doris.nereids.trees.expressions.literal.Literal; +import org.apache.doris.nereids.trees.plans.Plan; +import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate; +import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer; +import org.apache.doris.nereids.trees.plans.logical.LogicalProject; +import org.apache.doris.nereids.trees.plans.logical.LogicalRelation; +import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat; +import org.apache.doris.nereids.trees.plans.logical.LogicalSetOperation; +import org.apache.doris.nereids.trees.plans.logical.LogicalTopN; +import org.apache.doris.nereids.trees.plans.logical.LogicalWindow; +import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter; +import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter; +import org.apache.doris.nereids.util.ExpressionUtils; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.qe.SessionVariable; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.IdentityHashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * Pull up non-trivial expressions from Projects below TopN to above TopN, + * exposing their input base columns as lazy materialization candidates. + * + * <p>Two-pass CustomRewriter: + * <ol> + * <li><b>Collector (top-down)</b>: walk the plan tree, find qualifying TopNs, + * walk into their descendants to find Projects with pull-able expressions. + * Any operator that references a slot blocks pulling up expressions that + * output that slot past it. Boundary nodes (Aggregate, Window, Repeat, + * Relation, CTEProducer) stop the walk. + * Set operators are treated as blockers for the current TopN but their + * children are still traversed so nested TopNs inside them are visited.</li> + * <li><b>Replacer (bottom-up)</b>: simplify found Projects and add upper + * Projects above TopN to restore pulled-up expressions.</li> + * </ol> + */ +public class PullUpProjectExprUnderTopN implements CustomRewriter { + + @Override + public Plan rewriteRoot(Plan plan, JobContext jobContext) { + ConnectContext ctx = jobContext.getCascadesContext() + .getStatementContext().getConnectContext(); + if (ctx != null && !ctx.getSessionVariable().enableTopnExprPullup) { + return plan; + } + + // Pass 1: Collect pull-up info + CollectorContext collectorCtx = new CollectorContext(); + plan.accept(new Collector(), collectorCtx); + + if (collectorCtx.topNToPullUpInfo.isEmpty()) { + return plan; + } + + // Deduplicate: when nested TopNs both try to pull up the same expression + // from the same Project, keep it only in the outermost TopN. + deduplicatePullUps(collectorCtx); + + // Pass 2: Replace/restructure + return plan.accept(new Replacer(), collectorCtx); + } + + // ========================================================================= + // Data structures + // ========================================================================= + + /** Info collected per TopN about which expressions to pull up from which Projects. */ + static class PullUpInfo { + final LogicalTopN topN; + final List<Slot> originalTopNOutput; + final List<NamedExpression> allPulledUpExprs = new ArrayList<>(); + final Map<LogicalProject<? extends Plan>, List<NamedExpression>> projectToPulledUpExprs + = new LinkedHashMap<>(); + final Map<ExprId, List<Slot>> baseSlotsByExpr = new HashMap<>(); + final Map<ExprId, NamedExpression> passThroughExprByDeduplicatedExpr = new HashMap<>(); + + PullUpInfo(LogicalTopN topN) { + this.topN = topN; + this.originalTopNOutput = ImmutableList.copyOf(topN.getOutput()); + } + + void addPulledUpExpr(LogicalProject<? extends Plan> project, NamedExpression expr) { + allPulledUpExprs.add(expr); + projectToPulledUpExprs.computeIfAbsent(project, k -> new ArrayList<>()).add(expr); + baseSlotsByExpr.put(expr.getExprId(), ImmutableList.copyOf(expr.getInputSlots())); + } + + void addPassThroughExprForDeduplicatedExpr(NamedExpression expr) { + passThroughExprByDeduplicatedExpr.put(expr.getExprId(), expr); + } + } + + /** Context shared between collector and replacer passes. */ + static class CollectorContext { + final Map<LogicalTopN, PullUpInfo> topNToPullUpInfo = new LinkedHashMap<>(); + final Map<Slot, Expression> pullUpExprReplaceMap = new LinkedHashMap<>(); + /** + * When collectFromNode encounters a nested TopN, it saves the current + * blockedExprIds (accumulated from outer nodes) so that visitLogicalTopN + * for the inner TopN can merge them into its fresh blocked set. + */ + final Map<LogicalTopN, Set<ExprId>> outerBlockedByTopN = new IdentityHashMap<>(); + int cteProducerDepth = 0; + + boolean hasPullUpInfo(LogicalTopN topN) { + return topNToPullUpInfo.containsKey(topN); + } + + PullUpInfo getPullUpInfo(LogicalTopN topN) { + return topNToPullUpInfo.get(topN); + } + + void addPullUpExprReplace(NamedExpression expr) { + if (expr instanceof Alias) { + pullUpExprReplaceMap.putIfAbsent(expr.toSlot(), expr.child(0)); + } + } + } + + // ========================================================================= + // Pass 1: Collector (top-down) + // ========================================================================= + + private static boolean qualifiesForLazyMatThreshold(LogicalTopN topN) { + long limit = topN.getLimit(); + if (limit <= 0) { + return false; + } + long threshold = SessionVariable.getTopNLazyMaterializationThreshold(); + return threshold >= limit; + } + + static class Collector extends DefaultPlanRewriter<CollectorContext> { + + @Override + public Plan visitLogicalCTEProducer( + LogicalCTEProducer<? extends Plan> cteProducer, CollectorContext context) { + context.cteProducerDepth++; + try { + return visit(cteProducer, context); + } finally { + context.cteProducerDepth--; + } + } + + @Override + public Plan visitLogicalTopN(LogicalTopN topN, CollectorContext context) { + if (context.cteProducerDepth > 0 + || !qualifiesForLazyMatThreshold(topN)) { + return visit(topN, context); + } + PullUpInfo info = new PullUpInfo(topN); + // Seed blockedExprIds with this TopN's order key ExprIds so that + // expressions used by order keys are not pulled up past this TopN. + Set<ExprId> blockedExprIds = buildOrderKeyExprIds(topN); + // If this is a nested TopN, merge in the outer blocked set that was + // saved by collectFromNode when it encountered this TopN. This + // ensures that slots consumed by outer operators (e.g. join + // conditions above this TopN) also block pull-up from projects + // under this TopN. + Set<ExprId> outerBlocked = context.outerBlockedByTopN.remove(topN); + if (outerBlocked != null) { + blockedExprIds.addAll(outerBlocked); + } + collectFromNode((Plan) topN.child(0), info, blockedExprIds, context); + if (!info.allPulledUpExprs.isEmpty()) { + for (NamedExpression expr : info.allPulledUpExprs) { + context.addPullUpExprReplace(expr); + } + context.topNToPullUpInfo.put(topN, info); + } + return visit(topN, context); + } + } + + /** + * Recursively walk down from a TopN's child to find Projects with pull-able expressions. + * + * <p>{@code blockedExprIds} contains ExprIds of slots that are referenced by operators + * along the path from the TopN to the current node. An expression whose output ExprId + * is in this set cannot be pulled up past the operators that reference it. + */ + private static void collectFromNode(Plan node, PullUpInfo info, Set<ExprId> blockedExprIds, + CollectorContext context) { + if (node instanceof LogicalProject) { + LogicalProject<? extends Plan> project = (LogicalProject<? extends Plan>) node; + for (NamedExpression ne : project.getProjects()) { + if (canPullUp(ne) && !blockedExprIds.contains(ne.getExprId())) { + info.addPulledUpExpr(project, ne); + } + } + // Continue into the project's child. Chained projects are all visited. + collectFromNode((Plan) project.child(0), info, blockedExprIds, context); + return; + } + + if (node instanceof LogicalTopN) { + LogicalTopN inner = (LogicalTopN) node; + // Save the current blockedExprIds (accumulated from outer nodes + // such as outer TopN + intermediate Joins) so that the inner + // TopN's own visitLogicalTopN can merge them into its fresh + // blocked set. Without this, outer join condition slots would + // not block pull-up from projects under the inner TopN. + context.outerBlockedByTopN.put(inner, new HashSet<>(blockedExprIds)); + // TopN preserves all input columns, so it doesn't block by itself. + // However, its order keys consume slots, so add them to blocked set. + // Do NOT reset blockedExprIds — intermediate operators between the + // outer and inner TopN must still block expressions. + Set<ExprId> newBlocked = new HashSet<>(blockedExprIds); + newBlocked.addAll(buildOrderKeyExprIds(inner)); + collectFromNode((Plan) inner.child(0), info, newBlocked, context); + return; + } + + // Stop at boundary nodes that transform the schema or are data sources. + if (node instanceof LogicalRelation || node instanceof LogicalCTEProducer + || isBlockingNode(node)) { + return; + } + + // Set operations are a boundary for the current TopN: do NOT collect + // expressions from below them. UNION ALL children may compute the same + // output column with different expressions (e.g. a+1 vs a+2), and a + // single pull-up Project above the TopN cannot represent branch-specific + // semantics. The normal visitor will still traverse into the children, + // so nested TopNs inside set operations are handled independently. + if (node instanceof LogicalSetOperation) { + return; + } + + // For all other nodes, add their input slot ExprIds to the blocked set. + // Any operator that references a slot in its expressions prevents + // expressions that output that slot from being pulled up past it. + Set<ExprId> newBlocked = new HashSet<>(blockedExprIds); + for (Expression expr : node.getExpressions()) { + newBlocked.addAll(expr.getInputSlotExprIds()); + if (expr instanceof NamedExpression) { + newBlocked.add(((NamedExpression) expr).getExprId()); + } + } + + for (Plan child : node.children()) { + collectFromNode(child, info, newBlocked, context); Review Comment: This traversal also walks through null-generating joins, but the blocked set only contains slots referenced by the join expressions. That lets the rule pull expressions from the nullable side of an outer join above the join, which is not semantics-preserving for non-null-preserving expressions. For example, `TopN(order by l.id) -> LEFT JOIN(left=l, right=Project(ifnull(r.b, 0) AS x, r.id))` currently can collect `x` because the join condition references `r.id` but not `x`. Before the rewrite, unmatched left rows get `x = NULL` from join null-extension of the right tuple; after simplification the upper project computes `ifnull(NULL, 0)` and returns `0`. Please treat null-generating join sides as boundaries, or at least block the outputs from those sides, and add a negative test for a nullable-side expression such as `ifnull`/`coalesce`. ########## fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/materialize/LazyMaterializeTopN.java: ########## @@ -85,36 +79,82 @@ public Plan visitPhysicalTopN(PhysicalTopN topN, CascadesContext ctx) { } } - private Plan computeTopN(PhysicalTopN topN, CascadesContext ctx) { + private Plan computeTopN(PhysicalTopN<? extends Plan> topN, CascadesContext ctx) { if (hasMaterialized) { return topN; } if (SessionVariable.getTopNLazyMaterializationThreshold() < topN.getLimit()) { return topN; } - /* - topn(output=[x] orderkey=[b]) - ->project(a as x) - ->T(a, b) - 'x' can be lazy materialized. - materializeMap: x->(T, a) - */ + try { + List<Slot> userVisibleOutput = ImmutableList.copyOf(topN.getOutput()); + List<Slot> effectiveOutput = ImmutableList.copyOf(topN.getOutput()); + Plan result = doComputeTopN(topN, ctx, effectiveOutput); + if (result == topN) { + return topN; + } + result = new PhysicalProject(ImmutableList.copyOf(userVisibleOutput), null, result); + return result; + } catch (RuntimeException e) { + LOG.warn("lazy materialize topn failed for plan: {}", topN.shapeInfo(), e); + return topN; + } + } + + private LazySlotPruning createLazySlotPruning() { + return new LazySlotPruning() { + @Override + protected boolean shouldPruneChild(Plan child, Context context) { + return true; + } Review Comment: Overriding `shouldPruneChild()` to always return true makes `LazySlotPruning` recurse into every child branch, not only the branch that contains `context.lazySlots`. In a join-shaped TopN where only one relation has lazy slots, the visitor also reaches the sibling scan; `visitPhysicalOlapScan()`/`visitPhysicalFileScan()` then sees that the sibling output does not contain the lazy slots and throws `Lazy materialize fault`. `computeTopN()` catches that `RuntimeException` and returns the original TopN, so join cases that were otherwise eligible silently lose lazy materialization. Please keep the original output-containment guard for unrelated branches and address stale logical properties in a narrower way, and add a test with TopN above a join where only one side is lazily materialized. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
