github-actions[bot] commented on code in PR #63763:
URL: https://github.com/apache/doris/pull/63763#discussion_r3491863006


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -362,17 +510,253 @@ private Plan rewrite(LogicalFilter<? extends Plan> filter, LogicalApply<Plan, Pl
         windowFilterConjunct = ExpressionUtils.replace(windowFilterConjunct,
                 ImmutableMap.of(aggOut.toSlot(), aggOutExpr));
 
-        LogicalFilter<Plan> newFilter = filter.withConjunctsAndChild(conjuncts.get(true), apply.left());
+        // Split uncorrelated conjuncts: predicates that reference ONLY shared
+        // relation slots (tables appearing in both outer and inner plans) must
+        // stay ABOVE the window. Otherwise the window function would see a
+        // different set of rows than the original scalar subquery.
+        //
+        // For example, with fact(f) as shared table and dim(d) as outer-only:
+        //   f.v > 6        → shared-only → must stay above the window
+        //   d.tag > 0      → outer-only  → safe below the window
+        //   f.k = d.k      → join cond   → needed below the window
+        //
+        // We find shared tables by comparing table IDs that appear in both
+        // outer and inner plans, then collect ALL output slots of those
+        // tables (not just columns referenced in the inner query).
+        List<CatalogRelation> outerRels = outerPlans.stream()
+                .filter(CatalogRelation.class::isInstance)
+                .map(CatalogRelation.class::cast)
+                .collect(Collectors.toList());
+        List<CatalogRelation> innerRels = innerPlans.stream()
+                .filter(CatalogRelation.class::isInstance)
+                .map(CatalogRelation.class::cast)
+                .collect(Collectors.toList());
+        Set<Long> innerTableIds = innerRels.stream()
+                .map(r -> r.getTable().getId())
+                .collect(Collectors.toSet());
+        Set<ExprId> sharedOuterExprIds = outerRels.stream()
+                .filter(r -> innerTableIds.contains(r.getTable().getId()))
+                .flatMap(r -> r.getOutput().stream())
+                .map(Slot::getExprId)
+                .collect(Collectors.toSet());
+        Set<Expression> uncorrelatedConjuncts = conjuncts.get(true);
+        Set<Expression> belowWindowConjuncts = Sets.newHashSet();
+        Set<Expression> aboveWindowConjuncts = Sets.newHashSet();
+        if (uncorrelatedConjuncts != null) {
+            for (Expression conj : uncorrelatedConjuncts) {
+                // Conjuncts that were matched against inner subquery filter
+                // conjuncts (tracked by checkFilter) must stay BELOW the
+                // window.  They are semantically part of the inner aggregate's
+                // filter, not extra outer-only predicates.  Placing them above
+                // the window would let the window see more rows than the
+                // original scalar subquery, producing wrong aggregate results.
+                if (matchedInnerFilterConjuncts.contains(conj)) {
+                    belowWindowConjuncts.add(conj);
+                    continue;
+                }
+                // Volatile predicates (e.g. random() > 0.5) must stay ABOVE
+                // the window.  Pushing them below would let the window
+                // aggregate over a different set of rows per partition than
+                // the original scalar subquery.  PushDownFilterThroughWindow
+                // has the same hazard and explicitly rejects volatile
+                // predicates for this reason.
+                if (conj.containsVolatileExpression()) {
+                    aboveWindowConjuncts.add(conj);
+                    continue;
+                }
+                // Any predicate that references shared-table slots (tables
+                // appearing in both outer and inner plans) must stay ABOVE
+                // the window.  This handles both shared-only predicates
+                // (e.g. f.v > 6) and mixed shared+outer predicates
+                // (e.g. f.v > d.tag).  Pushing them below would restrict
+                // the rows seen by the window function, producing a
+                // different aggregate than the original scalar subquery.
+                boolean hasShared = false;
+                for (ExprId id : conj.getInputSlotExprIds()) {
+                    if (sharedOuterExprIds.contains(id)) {
+                        hasShared = true;
+                        break;
+                    }
+                }
+                if (hasShared) {
+                    aboveWindowConjuncts.add(conj);
+                } else {
+                    // No shared-table references: the predicate involves only
+                    // outer-only table columns.  Safe to place below the window.
+                    belowWindowConjuncts.add(conj);
+                }
+            }
+        }
+
+        // Extract and classify conjuncts from any LogicalFilter nodes nested
+        // inside the outer child (apply.left()).  Nested shared-table filters
+        // (e.g. f.v > 6 under a CrossJoin) must be hoisted ABOVE the window;
+        // otherwise the window would aggregate over a filtered subset of rows
+        // while the original scalar subquery sees all rows for the key.
+        //
+        // Nested outer-only filters (e.g. d.tag > 0) are safe below and can
+        // stay in place after the filters are stripped.
+        List<LogicalFilter<Plan>> nestedOuterFilters = apply.left()
+                .collectToList(LogicalFilter.class::isInstance);
+        Set<ExprId> extractedConjunctExprIds = Sets.newHashSet();
+        for (LogicalFilter<Plan> nf : nestedOuterFilters) {
+            for (Expression conj : nf.getConjuncts()) {
+                // Matched inner-filter conjuncts always go below.
+                if (matchedInnerFilterConjuncts.contains(conj)) {
+                    belowWindowConjuncts.add(conj);
+                    extractedConjunctExprIds.addAll(conj.getInputSlotExprIds());
+                    continue;
+                }
+                // Volatile (e.g. random()) and NoneMovableFunction
+                // (e.g. assert_true()) predicates in nested filters must NOT
+                // be hoisted: moving them across a join or window changes
+                // their evaluation context.  Keeping them in place is only
+                // safe when the predicate belongs exclusively to the
+                // outer-only relation.  A volatile / side-effecting predicate
+                // on a shared table would restrict the window's input relative
+                // to the original scalar subquery, producing wrong results.
+                // Reject such rewrites.
+                if (conj.containsVolatileExpression()
+                        || conj.containsType(NoneMovableFunction.class)) {
+                    Set<Long> tablesUnderFilter = nf.collect(
+                            CatalogRelation.class::isInstance).stream()
+                            .map(r -> ((CatalogRelation) r).getTable().getId())
+                            .collect(Collectors.toSet());
+                    if (!Sets.intersection(tablesUnderFilter, innerTableIds).isEmpty()) {
+                        return filter;
+                    }
+                    continue;
+                }
+                boolean nfHasShared = false;
+                for (ExprId id : conj.getInputSlotExprIds()) {
+                    if (sharedOuterExprIds.contains(id)) {
+                        nfHasShared = true;
+                        break;
+                    }
+                }
+                if (nfHasShared) {
+                    // Shared-table predicate → hoist above the window.
+                    aboveWindowConjuncts.add(conj);
+                } else {
+                    // Outer-only predicate → safe below, keep it there.
+                    belowWindowConjuncts.add(conj);
+                }
+                extractedConjunctExprIds.addAll(conj.getInputSlotExprIds());
+            }
+        }
+        // Strip all nested LogicalFilter nodes from the outer child so the
+        // window operates on the unfiltered scan/join.
+        Plan strippedOuterChild = stripOuterFilters(apply.left());
+
+        // The window function's aggregate references shared-table slots
+        // (e.g. f.v for SUM(f.v) after slot replacement).  A pruning
+        // project inside apply.left() may have dropped those slots even
+        // when there are no nested filters to extract.  Collect the
+        // replaced aggregate's input slot ExprIds and include them in
+        // the project-expansion set so ensureProjectOutput carries them
+        // through.
+        Set<ExprId> allNeededExprIds = Sets.newHashSet(extractedConjunctExprIds);
+        allNeededExprIds.addAll(ExpressionUtils.replace(function, innerOuterSlotMap)
+                .getInputSlotExprIds());
+        strippedOuterChild = ensureProjectOutput(strippedOuterChild,
+                allNeededExprIds);
+
+        LogicalFilter<Plan> newFilter = filter.withConjunctsAndChild(
+                belowWindowConjuncts, strippedOuterChild);
         LogicalWindow<Plan> newWindow = new LogicalWindow<>(ImmutableList.of(windowFunctionAlias), newFilter);
-        LogicalFilter<Plan> windowFilter = new LogicalFilter<>(ImmutableSet.of(windowFilterConjunct), newWindow);
+
+        // Combine shared-table predicates with the window comparison predicate above the window
+        Set<Expression> topConjuncts = Sets.newHashSet(windowFilterConjunct);
+        topConjuncts.addAll(aboveWindowConjuncts);
+        LogicalFilter<Plan> windowFilter = new LogicalFilter<>(ImmutableSet.copyOf(topConjuncts), newWindow);
         return windowFilter;
     }
 
+    /**
+     * Ensure that every LogicalProject in the plan tree outputs all slots
+     * referenced by extracted conjuncts (ExprIds in {@code neededExprIds}).
+     * If a project prunes away a needed slot, the reinserted filter
+     * predicates would have dangling references.
+     *
+     * <p>Recursively walks the tree; expands each project to include any
+     * missing slots as simple identity projections (SlotReference →
+     * SlotReference).
+     */
+    private Plan ensureProjectOutput(Plan plan, Set<ExprId> neededExprIds) {
+        if (neededExprIds.isEmpty()) {
+            return plan;
+        }
+        if (plan instanceof LogicalProject) {
+            LogicalProject<Plan> project = (LogicalProject<Plan>) plan;
+            Set<ExprId> projectOutputExprIds = project.getOutputExprIdSet();
+            Set<Slot> childOutput = project.child().getOutputSet();

Review Comment:
   `ensureProjectOutput()` still has a dangling-slot path when the shared table 
is behind stacked pruning projects:
   
   ```text
   SubQueryAlias sf
     Project(k)
       Project(k)
         Scan fact(k, v)
   ```
   
   `allNeededExprIds` correctly includes `v`, but this method reads the outer 
project's `childOutput` before recursively expanding the inner project. At that 
point the inner project still outputs only `k`, so the outer project cannot add 
`v`. After the inner child is expanded, line 711 rebuilds the outer project 
with only `k`, so the window/filter above can still reference `sf.v` over a 
child that does not output it. Please recurse into the child before computing 
the current `childOutput`, or otherwise iterate to a fixpoint, and add coverage 
with at least two slot-only projects between the shared-table scan and the 
apply left branch.
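
A minimal sketch of the bottom-up order requested above. It does not use the Doris classes: `Node`, `scan`, `project`, and the string slot names are hypothetical stand-ins for `Plan`, `LogicalProject`, and `ExprId`, chosen only to make the ordering issue visible. The point is that the child is rewritten first, and the current project reads the output of the rewritten child rather than the original one.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

/** Hypothetical stand-in for a plan tree; NOT the Doris Plan API. */
class ProjectExpansionSketch {
    /** A node exposes a set of slot names; scans have no child. */
    static final class Node {
        final String kind;        // "scan" or "project" (assumed node kinds)
        final Set<String> output; // slots this node outputs
        final Node child;         // null for scans

        Node(String kind, Set<String> output, Node child) {
            this.kind = kind;
            this.output = output;
            this.child = child;
        }
    }

    static Node scan(String... slots) {
        return new Node("scan", new LinkedHashSet<>(Arrays.asList(slots)), null);
    }

    static Node project(Node child, String... slots) {
        return new Node("project", new LinkedHashSet<>(Arrays.asList(slots)), child);
    }

    /**
     * Expands every project so that it passes through each needed slot
     * its child can provide. The recursion happens BEFORE the child's
     * output is read, so an inner expansion is visible to the outer project.
     */
    static Node ensureProjectOutput(Node plan, Set<String> needed) {
        if (needed.isEmpty() || plan.child == null) {
            return plan; // nothing to add, or a leaf scan
        }
        // Step 1: rewrite the child first.
        Node newChild = ensureProjectOutput(plan.child, needed);
        // Step 2: compute the additions from the REWRITTEN child's output.
        Set<String> expanded = new LinkedHashSet<>(plan.output);
        for (String slot : needed) {
            if (newChild.output.contains(slot)) {
                expanded.add(slot);
            }
        }
        return new Node(plan.kind, expanded, newChild);
    }
}
```

Applied to the stacked shape `Project(k) -> Project(k) -> Scan(k, v)` with `v` needed, both projects end up outputting `v`. Reading `plan.child.output` before the recursive call instead would leave the outer project at `k` only, which is the dangling-slot case described in this comment.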



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

