github-actions[bot] commented on code in PR #63736:
URL: https://github.com/apache/doris/pull/63736#discussion_r3355611667


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/PullUpProjectExprUnderTopN.java:
##########
@@ -0,0 +1,710 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.properties.OrderKey;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.ExprId;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import 
org.apache.doris.nereids.trees.expressions.functions.NoneMovableFunction;
+import org.apache.doris.nereids.trees.expressions.functions.scalar.Score;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.plans.JoinType;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCTEProducer;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
+import org.apache.doris.nereids.trees.plans.logical.LogicalSetOperation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTopN;
+import org.apache.doris.nereids.trees.plans.logical.LogicalWindow;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.nereids.util.ExpressionUtils;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Pull up non-trivial expressions from Projects below TopN to above TopN,
+ * exposing their input base columns as lazy materialization candidates.
+ *
+ * <p>Two-pass CustomRewriter:
+ * <ol>
+ * <li><b>Collector (top-down)</b>: walk the plan tree, find qualifying TopNs,
+ *     walk into their descendants to find Projects with pull-able expressions.
+ *     Any operator that references a slot blocks pulling up expressions that
+ *     output that slot past it. Boundary nodes (Aggregate, Window, Repeat,
+ *     Relation, CTEProducer) stop the walk.
+ *     Set operators are treated as blockers for the current TopN but their
+ *     children are still traversed so nested TopNs inside them are 
visited.</li>
+ * <li><b>Replacer (bottom-up)</b>: simplify found Projects and add upper
+ *     Projects above TopN to restore pulled-up expressions.</li>
+ * </ol>
+ */
+public class PullUpProjectExprUnderTopN implements CustomRewriter {
+
+    @Override
+    public Plan rewriteRoot(Plan plan, JobContext jobContext) {
+        ConnectContext ctx = jobContext.getCascadesContext()
+                .getStatementContext().getConnectContext();
+        if (ctx != null && !ctx.getSessionVariable().enableTopnExprPullup) {
+            return plan;
+        }
+
+        // Pass 1: Collect pull-up info
+        CollectorContext collectorCtx = new CollectorContext();
+        plan.accept(new Collector(), collectorCtx);
+
+        if (collectorCtx.topNToPullUpInfo.isEmpty()) {
+            return plan;
+        }
+
+        // Deduplicate: when nested TopNs both try to pull up the same 
expression
+        // from the same Project, keep it only in the outermost TopN.
+        deduplicatePullUps(collectorCtx);
+
+        // Pass 2: Replace/restructure
+        return plan.accept(new Replacer(), collectorCtx);
+    }
+
+    // 
=========================================================================
+    // Data structures
+    // 
=========================================================================
+
+    /** Info collected per TopN about which expressions to pull up from which 
Projects. */
+    static class PullUpInfo {
+        final LogicalTopN topN;
+        final List<Slot> originalTopNOutput;
+        final List<NamedExpression> allPulledUpExprs = new ArrayList<>();
+        final Map<LogicalProject<? extends Plan>, List<NamedExpression>> 
projectToPulledUpExprs
+                = new LinkedHashMap<>();
+        final Map<ExprId, List<Slot>> baseSlotsByExpr = new HashMap<>();
+        final Map<ExprId, NamedExpression> passThroughExprByDeduplicatedExpr = 
new HashMap<>();
+
+        PullUpInfo(LogicalTopN topN) {
+            this.topN = topN;
+            this.originalTopNOutput = ImmutableList.copyOf(topN.getOutput());
+        }
+
+        void addPulledUpExpr(LogicalProject<? extends Plan> project, 
NamedExpression expr) {
+            allPulledUpExprs.add(expr);
+            projectToPulledUpExprs.computeIfAbsent(project, k -> new 
ArrayList<>()).add(expr);
+            baseSlotsByExpr.put(expr.getExprId(), 
ImmutableList.copyOf(expr.getInputSlots()));
+        }
+
+        void addPassThroughExprForDeduplicatedExpr(NamedExpression expr) {
+            passThroughExprByDeduplicatedExpr.put(expr.getExprId(), expr);
+        }
+    }
+
+    /** Context shared between collector and replacer passes. */
+    static class CollectorContext {
+        /**
+         * Use IdentityHashMap so that two different TopN nodes with the same
+         * content (orderKeys, limit, offset) are treated as distinct keys.
+         * LogicalTopN.equals() is content-based, which would cause unrelated
+         * TopN nodes to collide in a regular HashMap/LinkedHashMap.
+         */
+        final Map<LogicalTopN, PullUpInfo> topNToPullUpInfo = new 
IdentityHashMap<>();
+        /**
+         * Maintain insertion order for deterministic outer-to-inner iteration
+         * in dedup and other passes. The Collector visits the plan top-down,
+         * so the order is naturally outer-before-inner.
+         */
+        final List<LogicalTopN> topNOrder = new ArrayList<>();
+        final Map<Slot, Expression> pullUpExprReplaceMap = new 
LinkedHashMap<>();
+        /**
+         * When collectFromNode encounters a nested TopN, it saves the current
+         * blockedExprIds (accumulated from outer nodes) so that 
visitLogicalTopN
+         * for the inner TopN can merge them into its fresh blocked set.
+         */
+        final Map<LogicalTopN, Set<ExprId>> outerBlockedByTopN = new 
IdentityHashMap<>();
+        int cteProducerDepth = 0;
+
+        boolean hasPullUpInfo(LogicalTopN topN) {
+            return topNToPullUpInfo.containsKey(topN);
+        }
+
+        PullUpInfo getPullUpInfo(LogicalTopN topN) {
+            return topNToPullUpInfo.get(topN);
+        }
+
+        void addPullUpInfo(LogicalTopN topN, PullUpInfo info) {
+            topNToPullUpInfo.put(topN, info);
+            topNOrder.add(topN);
+        }
+
+        void addPullUpExprReplace(NamedExpression expr) {
+            if (expr instanceof Alias) {
+                pullUpExprReplaceMap.putIfAbsent(expr.toSlot(), expr.child(0));
+            }
+        }
+    }
+
+    // 
=========================================================================
+    // Pass 1: Collector (top-down)
+    // 
=========================================================================
+
+    private static boolean qualifiesForLazyMatThreshold(LogicalTopN topN) {
+        long limit = topN.getLimit();
+        if (limit <= 0) {
+            return false;
+        }
+        long threshold = SessionVariable.getTopNLazyMaterializationThreshold();
+        return threshold >= limit;
+    }
+
+    static class Collector extends DefaultPlanRewriter<CollectorContext> {
+
+        @Override
+        public Plan visitLogicalCTEProducer(
+                LogicalCTEProducer<? extends Plan> cteProducer, 
CollectorContext context) {
+            context.cteProducerDepth++;
+            try {
+                return visit(cteProducer, context);
+            } finally {
+                context.cteProducerDepth--;
+            }
+        }
+
+        @Override
+        public Plan visitLogicalTopN(LogicalTopN topN, CollectorContext 
context) {
+            if (context.cteProducerDepth > 0
+                    || !qualifiesForLazyMatThreshold(topN)) {
+                return visit(topN, context);
+            }
+            PullUpInfo info = new PullUpInfo(topN);
+            // Seed blockedExprIds with this TopN's order key ExprIds so that
+            // expressions used by order keys are not pulled up past this TopN.
+            Set<ExprId> blockedExprIds = buildOrderKeyExprIds(topN);
+            // If this is a nested TopN, merge in the outer blocked set that 
was
+            // saved by collectFromNode when it encountered this TopN. This
+            // ensures that slots consumed by outer operators (e.g. join
+            // conditions above this TopN) also block pull-up from projects
+            // under this TopN.
+            Set<ExprId> outerBlocked = context.outerBlockedByTopN.remove(topN);
+            if (outerBlocked != null) {
+                blockedExprIds.addAll(outerBlocked);
+            }
+            collectFromNode((Plan) topN.child(0), info, blockedExprIds, 
context);
+            if (!info.allPulledUpExprs.isEmpty()) {
+                for (NamedExpression expr : info.allPulledUpExprs) {
+                    context.addPullUpExprReplace(expr);
+                }
+                context.addPullUpInfo(topN, info);
+            }
+            return visit(topN, context);
+        }
+    }
+
+    /**
+     * Recursively walk down from a TopN's child to find Projects with 
pull-able expressions.
+     *
+     * <p>{@code blockedExprIds} contains ExprIds of slots that are referenced 
by operators
+     * along the path from the TopN to the current node. An expression whose 
output ExprId
+     * is in this set cannot be pulled up past the operators that reference it.
+     */
+    private static void collectFromNode(Plan node, PullUpInfo info, 
Set<ExprId> blockedExprIds,
+            CollectorContext context) {
+        if (node instanceof LogicalProject) {
+            LogicalProject<? extends Plan> project = (LogicalProject<? extends 
Plan>) node;
+            for (NamedExpression ne : project.getProjects()) {
+                if (canPullUp(ne) && !blockedExprIds.contains(ne.getExprId())) 
{
+                    info.addPulledUpExpr(project, ne);
+                }
+            }
+            // Continue into the project's child. Chained projects are all 
visited.
+            collectFromNode((Plan) project.child(0), info, blockedExprIds, 
context);
+            return;
+        }
+
+        if (node instanceof LogicalTopN) {
+            LogicalTopN inner = (LogicalTopN) node;
+            // Save the current blockedExprIds (accumulated from outer nodes
+            // such as outer TopN + intermediate Joins) so that the inner
+            // TopN's own visitLogicalTopN can merge them into its fresh
+            // blocked set. Without this, outer join condition slots would
+            // not block pull-up from projects under the inner TopN.
+            context.outerBlockedByTopN.put(inner, new 
HashSet<>(blockedExprIds));
+            // TopN preserves all input columns, so it doesn't block by itself.
+            // However, its order keys consume slots, so add them to blocked 
set.
+            // Do NOT reset blockedExprIds — intermediate operators between the
+            // outer and inner TopN must still block expressions.
+            Set<ExprId> newBlocked = new HashSet<>(blockedExprIds);
+            newBlocked.addAll(buildOrderKeyExprIds(inner));
+            collectFromNode((Plan) inner.child(0), info, newBlocked, context);
+            return;
+        }
+
+        // Stop at boundary nodes that transform the schema or are data 
sources.
+        if (node instanceof LogicalRelation || node instanceof 
LogicalCTEProducer
+                || isBlockingNode(node)) {
+            return;
+        }
+
+        // Set operations are a boundary for the current TopN: do NOT collect
+        // expressions from below them. UNION ALL children may compute the same
+        // output column with different expressions (e.g. a+1 vs a+2), and a
+        // single pull-up Project above the TopN cannot represent 
branch-specific
+        // semantics. The normal visitor will still traverse into the children,
+        // so nested TopNs inside set operations are handled independently.
+        if (node instanceof LogicalSetOperation) {
+            return;
+        }
+
+        // For null-generating outer joins, block all output slots from the
+        // nullable side(s). Expressions inside a nullable side are protected
+        // by join null-extension: when there is no match, the entire nullable
+        // tuple is set to NULL. Pulling such an expression above the join
+        // would break this, e.g. ifnull(r.b, 0) inside the right side of a
+        // LEFT JOIN would see individual column NULLs and convert them to 0,
+        // changing the NULL that null-extension produced.
+        // Example: SELECT l.id, sub.x FROM l LEFT JOIN (
+        //            SELECT id, ifnull(b, 0) AS x FROM r) sub ON l.id = sub.id
+        //          ORDER BY l.id LIMIT 3;
+        // Here x=ifnull(b,0) is in a Project on the nullable (right) side.
+        // Pulling it above the join turns unmatched-row x from NULL to 0.
+        if (node instanceof LogicalJoin) {
+            LogicalJoin<?, ?> join = (LogicalJoin<?, ?>) node;
+            JoinType joinType = join.getJoinType();
+            Set<ExprId> newBlocked = new HashSet<>(blockedExprIds);
+            // add join expression slots (same as default branch)
+            for (Expression expr : node.getExpressions()) {
+                newBlocked.addAll(expr.getInputSlotExprIds());
+                if (expr instanceof NamedExpression) {
+                    newBlocked.add(((NamedExpression) expr).getExprId());
+                }
+            }
+            // block all output slots from the nullable side(s)
+            if (joinType.isLeftOuterJoin() || joinType.isFullOuterJoin()) {

Review Comment:
   The nullable-side guard still misses ASOF outer joins.
`ASOF_LEFT_OUTER_JOIN` null-extends the right side and
`ASOF_RIGHT_OUTER_JOIN` null-extends the left side, but the checks here only
cover the ordinary `LEFT_OUTER_JOIN`/`RIGHT_OUTER_JOIN`/`FULL_OUTER_JOIN` types.
A plan such as
`TopN -> Project(l.ts, x) -> ASOF LEFT OUTER JOIN(left=l, right=Project(ifnull(r.v, 0) AS x, r.ts))`
can still collect `x` from the right child. Once the lower Project has been
simplified and `x` is restored in a Project above the TopN, unmatched rows
evaluate `ifnull(NULL, 0)` above the join and produce `0` instead of preserving
the null-extended `x = NULL`. This is distinct from the existing review thread
about ordinary outer joins, because the current fix does not include the ASOF
outer join types. Please include `joinType.isAsofLeftOuterJoin()` in the
right-side blocking condition and `joinType.isAsofRightOuterJoin()` in the
left-side blocking condition, and add a negative test for an ASOF outer join.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to