github-actions[bot] commented on code in PR #63763:
URL: https://github.com/apache/doris/pull/63763#discussion_r3453030891


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -79,22 +81,29 @@ public class AggScalarSubQueryToWindowFunction extends DefaultPlanRewriter<JobCo
 
     private static final Set<Class<? extends LogicalPlan>> OUTER_SUPPORTED_PLAN = ImmutableSet.of(
             LogicalJoin.class,
+            LogicalFilter.class,
             LogicalProject.class,

Review Comment:
   Adding `LogicalFilter` to the supported outer shape makes the rule accept 
filters that are already inside `apply.left()`, but `rewrite()` only 
reclassifies conjuncts from the top filter above the `Apply`. For a shape like:
   
   ```text
   Filter(f.k = d.k, f.v * 2 > sum_alias)
     Apply(correlation: d.k)
       CrossJoin
         Filter(f.v > 6)
           Scan fact f
         Scan dim d  -- unique on k
       Aggregate(sum(f2.v) AS sum_alias)
         Filter(f2.k = d.k)
           Scan fact f2
   ```
   
   `checkFilter()` matches `f.k = d.k`, and the new `LogicalFilter` allowance 
lets the nested `f.v > 6` pass. `rewrite()` then places `apply.left()` 
unchanged under the window, so `SUM(f.v) OVER (PARTITION BY d.k)` is computed 
after `f.v > 6`, while the original scalar subquery sums all `fact` rows for 
the key. Please either reject outer plans containing shared-side 
`LogicalFilter`s, or extract and classify those nested filters so shared-table 
predicates are kept above the window.
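To make the row-set difference concrete, here is a minimal, self-contained Java sketch that evaluates the plan shape above on toy data. It is not Doris code: `Row`, `ROWS`, `originalPlan` and `windowPlan` are hypothetical names, and it assumes every `fact` key has exactly one `dim` row, so the joined rows and the rows scanned by `f2` coincide. The window is modeled as a plain per-partition sum over whatever rows reach it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class FilterBelowWindowDemo {
    // Hypothetical joined row: fact f matched to its unique dim d on f.k = d.k.
    record Row(int id, int k, int v) {}

    // Toy data (assumption): fact values for key 1 are 3, 8, 10; for key 2 are 7, 1.
    static final List<Row> ROWS = List.of(
            new Row(1, 1, 3), new Row(2, 1, 8), new Row(3, 1, 10),
            new Row(4, 2, 7), new Row(5, 2, 1));

    // Original plan: SUM(f2.v) covers ALL fact rows for the key,
    // regardless of the outer predicate f.v > 6.
    static List<Integer> originalPlan(List<Row> rows) {
        Map<Integer, Integer> subquerySum = new HashMap<>();
        for (Row r : rows) {
            subquerySum.merge(r.k(), r.v(), Integer::sum);
        }
        List<Integer> ids = new ArrayList<>();
        for (Row r : rows) {
            if (r.v() > 6 && r.v() * 2 > subquerySum.get(r.k())) {
                ids.add(r.id());
            }
        }
        return ids;
    }

    // Window rewrite: SUM(f.v) OVER (PARTITION BY d.k) only sees rows that reach
    // the window. If f.v > 6 stays inside apply.left(), it runs before the window.
    static List<Integer> windowPlan(List<Row> rows, boolean sharedFilterBelowWindow) {
        List<Row> windowInput = new ArrayList<>();
        for (Row r : rows) {
            if (!sharedFilterBelowWindow || r.v() > 6) {
                windowInput.add(r);
            }
        }
        Map<Integer, Integer> windowSum = new HashMap<>();
        for (Row r : windowInput) {
            windowSum.merge(r.k(), r.v(), Integer::sum);
        }
        // Filter above the window: f.v > 6 (a no-op re-check if it already ran
        // below) and the former subquery comparison f.v * 2 > sum.
        List<Integer> ids = new ArrayList<>();
        for (Row r : windowInput) {
            if (r.v() > 6 && r.v() * 2 > windowSum.get(r.k())) {
                ids.add(r.id());
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println("original:            " + originalPlan(ROWS));
        System.out.println("filter below window: " + windowPlan(ROWS, true));
        System.out.println("filter above window: " + windowPlan(ROWS, false));
    }
}
```

With this data the original plan returns `[4]`. Leaving `f.v > 6` below the window shrinks the key-1 sum from 21 to 18, so row 3 (`20 > 18`) is wrongly returned as well (`[3, 4]`); computing the window first and filtering above it gives `[4]` again.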



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -362,9 +478,81 @@ private Plan rewrite(LogicalFilter<? extends Plan> filter, LogicalApply<Plan, Pl
         windowFilterConjunct = ExpressionUtils.replace(windowFilterConjunct,
                 ImmutableMap.of(aggOut.toSlot(), aggOutExpr));
 
-        LogicalFilter<Plan> newFilter = filter.withConjunctsAndChild(conjuncts.get(true), apply.left());
+        // Split uncorrelated conjuncts: predicates that reference ONLY shared
+        // relation slots (tables appearing in both outer and inner plans) must
+        // stay ABOVE the window. Otherwise the window function would see a
+        // different set of rows than the original scalar subquery.
+        //
+        // For example, with fact(f) as shared table and dim(d) as outer-only:
+        //   f.v > 6        → shared-only → must stay above the window
+        //   d.tag > 0      → outer-only  → safe below the window
+        //   f.k = d.k      → join cond   → needed below the window
+        //
+        // We find shared tables by comparing table IDs that appear in both
+        // outer and inner plans, then collect ALL output slots of those
+        // tables (not just columns referenced in the inner query).
+        List<CatalogRelation> outerRels = outerPlans.stream()
+                .filter(CatalogRelation.class::isInstance)
+                .map(CatalogRelation.class::cast)
+                .collect(Collectors.toList());
+        List<CatalogRelation> innerRels = innerPlans.stream()
+                .filter(CatalogRelation.class::isInstance)
+                .map(CatalogRelation.class::cast)
+                .collect(Collectors.toList());
+        Set<Long> innerTableIds = innerRels.stream()
+                .map(r -> r.getTable().getId())
+                .collect(Collectors.toSet());
+        Set<ExprId> sharedOuterExprIds = outerRels.stream()
+                .filter(r -> innerTableIds.contains(r.getTable().getId()))
+                .flatMap(r -> r.getOutput().stream())
+                .map(Slot::getExprId)
+                .collect(Collectors.toSet());
+        Set<Expression> uncorrelatedConjuncts = conjuncts.get(true);
+        Set<Expression> belowWindowConjuncts = Sets.newHashSet();
+        Set<Expression> aboveWindowConjuncts = Sets.newHashSet();
+        if (uncorrelatedConjuncts != null) {
+            for (Expression conj : uncorrelatedConjuncts) {
+                // Conjuncts that were matched against inner subquery filter
+                // conjuncts (tracked by checkFilter) must stay BELOW the
+                // window.  They are semantically part of the inner aggregate's
+                // filter, not extra outer-only predicates.  Placing them above
+                // the window would let the window see more rows than the
+                // original scalar subquery, producing wrong aggregate results.
+                if (matchedInnerFilterConjuncts.contains(conj)) {
+                    belowWindowConjuncts.add(conj);
+                    continue;
+                }
+                // Any predicate that references shared-table slots (tables
+                // appearing in both outer and inner plans) must stay ABOVE
+                // the window.  This handles both shared-only predicates
+                // (e.g. f.v > 6) and mixed shared+outer predicates
+                // (e.g. f.v > d.tag).  Pushing them below would restrict
+                // the rows seen by the window function, producing a
+                // different aggregate than the original scalar subquery.
+                boolean hasShared = false;
+                for (ExprId id : conj.getInputSlotExprIds()) {
+                    if (sharedOuterExprIds.contains(id)) {
+                        hasShared = true;
+                        break;
+                    }
+                }
+                if (hasShared) {
+                    aboveWindowConjuncts.add(conj);
+                } else {
+                    // No shared-table references: the predicate involves only

Review Comment:
   This branch also treats slotless predicates as safe below the window. For 
example, `random() > 0.5` has an empty input-slot set, so `hasShared` remains 
false and it is added to `belowWindowConjuncts`. In the original plan:
   
   ```text
   Filter(random() > 0.5, f.k = d.k, f.v * 2 > sum_alias)
     Apply(...)
   ```
   
   the random predicate filters outer rows, but the scalar subquery still 
computes `SUM(f2.v)` over all matching inner `fact` rows. After this rewrite, 
the random predicate is below `LogicalWindow`, so the window aggregate is 
computed only over the randomly surviving rows in each partition. 
`PushDownFilterThroughWindow.canPushDown()` has the same empty-input hazard and 
explicitly rejects volatile predicates for this reason. Please reject volatile 
uncorrelated conjuncts here, or keep them above the window instead of treating 
them as outer-only filters.
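A minimal sketch of the second option (keep volatile conjuncts above the window) is to test volatility before the shared-slot check. This is plain Java with hypothetical types (`Conjunct`, `Placement`, `place`), not the Nereids API; the `isVolatile` flag is an assumption standing in for whatever determinism check the rule would actually call, and the integer slot ids stand in for `conj.getInputSlotExprIds()`.

```java
import java.util.List;
import java.util.Set;

public class ConjunctPlacementSketch {
    enum Placement { BELOW_WINDOW, ABOVE_WINDOW }

    // Hypothetical, simplified stand-in for an uncorrelated conjunct.
    record Conjunct(String sql, Set<Integer> inputSlotIds,
                    boolean isVolatile, boolean matchedInnerFilter) {}

    static Placement place(Conjunct c, Set<Integer> sharedSlotIds) {
        // Volatile predicates such as random() > 0.5 may have no input slots,
        // so the shared-slot test below would never flag them. Keeping them
        // above the window filters outer rows without shrinking the rows that
        // feed the aggregate, as in the original plan.
        if (c.isVolatile()) {
            return Placement.ABOVE_WINDOW;
        }
        // Conjuncts matched against the inner subquery filter (e.g. f.k = d.k)
        // belong to the aggregate's input and stay below the window.
        if (c.matchedInnerFilter()) {
            return Placement.BELOW_WINDOW;
        }
        // Any reference to a shared-table slot (shared-only or mixed) goes above.
        for (Integer id : c.inputSlotIds()) {
            if (sharedSlotIds.contains(id)) {
                return Placement.ABOVE_WINDOW;
            }
        }
        // Outer-only predicates, and deterministic slotless ones, are safe below.
        return Placement.BELOW_WINDOW;
    }

    public static void main(String[] args) {
        // Hypothetical slot ids: 1 = f.k, 2 = f.v (shared fact table),
        // 3 = d.k, 4 = d.tag (outer-only dim table).
        Set<Integer> shared = Set.of(1, 2);
        List<Conjunct> conjuncts = List.of(
                new Conjunct("f.k = d.k", Set.of(1, 3), false, true),
                new Conjunct("f.v > 6", Set.of(2), false, false),
                new Conjunct("f.v > d.tag", Set.of(2, 4), false, false),
                new Conjunct("d.tag > 0", Set.of(4), false, false),
                new Conjunct("random() > 0.5", Set.of(), true, false));
        for (Conjunct c : conjuncts) {
            System.out.println(c.sql() + " -> " + place(c, shared));
        }
    }
}
```

Rejecting the rewrite outright when any uncorrelated conjunct is volatile would be the simpler, more conservative alternative; the placement above keeps the rule applicable.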



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java:
##########
@@ -338,6 +362,392 @@ public void testNotMatchTheRule() {
         }
     }
 
+    @Test
+    public void testWindowPartitionsByOuterOnlyRelationSlots() throws Exception {
+        // Use TPC-H Q17: correlated table is part (p_partkey is unique via constraint),
+        // fact table is lineitem. The window PARTITION BY should contain all output
+        // columns of the correlated table (part).
+        String sql = TPCHUtils.Q17;
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .applyTopDown(new PushDownFilterThroughProject())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+        Assertions.assertEquals(1, windows.size());
+
+        LogicalWindow<Plan> window = windows.get(0);
+        List<NamedExpression> windowExpressions = window.getWindowExpressions();
+        Assertions.assertEquals(1, windowExpressions.size());
+
+        WindowExpression windowExpression = (WindowExpression) windowExpressions.get(0).child(0);
+        Set<String> partitionKeys = windowExpression.getPartitionKeys().stream()
+                .map(Expression.class::cast)
+                .filter(Slot.class::isInstance)
+                .map(Slot.class::cast)
+                .map(Slot::getName)
+                .collect(Collectors.toSet());
+        // The window PARTITION BY should include the correlated column (p_partkey)
+        Assertions.assertTrue(partitionKeys.contains("p_partkey"),
+                "Expected partition keys to contain p_partkey, got: " + partitionKeys);
+    }
+
+    @Test
+    public void testSharedTablePredicatesStayAboveWindow() throws Exception {
+        createTable("CREATE TABLE fact_split (\n"
+                + "  id INT,\n"
+                + "  k INT,\n"
+                + "  v INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(id)\n"
+                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        createTable("CREATE TABLE dim_split (\n"
+                + "  did INT,\n"
+                + "  k INT NOT NULL,\n"
+                + "  tag INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(did)\n"
+                + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        // Add UNIQUE constraint so the rule matches
+        addConstraint("alter table dim_split add constraint uq_dim_split_k unique (k)");
+
+        // Query with extra predicate on shared table (f.v > 6).
+        // This predicate must stay ABOVE the window, otherwise the window
+        // function would aggregate fewer rows than the original scalar subquery.
+        String sql = "SELECT d.did, d.k, d.tag, f.id, f.v "
+                + "FROM fact_split f, dim_split d "
+                + "WHERE f.k = d.k "
+                + "  AND f.v > 6"
+                + "  AND f.v * 2 > ("
+                + "    SELECT SUM(f2.v) "
+                + "    FROM fact_split f2 "
+                + "    WHERE f2.k = d.k"
+                + "  )";
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .applyTopDown(new PushDownFilterThroughProject())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        // Rule should match and produce a window
+        Assertions.assertTrue(plan.anyMatch(LogicalWindow.class::isInstance),
+                "Rule should produce a window for this query");
+
+        // Collect the ExprIds of the shared table (fact_split – appears in both
+        // outer and inner plans).  Predicates that reference ONLY these ExprIds
+        // (shared-table-only filters) must stay ABOVE the window, otherwise the
+        // window function would see fewer rows than the original scalar subquery.
+        List<CatalogRelation> rels = plan.collectToList(CatalogRelation.class::isInstance);
+        Set<ExprId> sharedExprIds = rels.stream()
+                .filter(r -> r.getTable().getName().equals("fact_split"))
+                .flatMap(r -> r.getOutputExprIdSet().stream())
+                .collect(Collectors.toSet());
+
+        // Verify that no filter below the window contains a predicate whose
+        // input ExprIds are all from the shared table.  Such predicates
+        // (e.g. f.v > 6) must have been placed above the window.
+        List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+        LogicalWindow<Plan> window = windows.get(0);
+        Plan belowWindow = window.child(0);
+        List<LogicalFilter<Plan>> belowFilters = belowWindow
+                .collectToList(LogicalFilter.class::isInstance);
+        for (LogicalFilter<Plan> f : belowFilters) {
+            for (Expression conj : f.getConjuncts()) {
+                Set<ExprId> conjExprIds = conj.getInputSlotExprIds();
+                if (!conjExprIds.isEmpty() && sharedExprIds.containsAll(conjExprIds)) {
+                    Assertions.fail(
+                            "Shared-table-only predicate should not be below window: " + conj.toSql());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testMixedSharedOuterPredicatesStayAboveWindow() throws Exception {
+        // Mixed predicates that reference both shared-table and outer-only-table
+        // columns (e.g. f.v > d.tag) must stay ABOVE the window.  Pushing them
+        // below would restrict the rows seen by the window function, producing
+        // a different aggregate than the original scalar subquery.
+        //
+        // Input plan shape:
+        //   Filter(f.v > d.tag, f.v * 2 > sum_alias)       ← mixed + correlated
+        //     Apply(correlation: d.k)
+        //       CrossJoin
+        //         Scan fact f
+        //         Scan dim d   -- d.k is unique (constraint)
+        //       Aggregate(sum(f2.v) AS sum_alias)
+        //         Filter(f2.k = d.k)
+        //           Scan fact f2
+        //
+        // Output plan shape:
+        //   Filter(f.v > d.tag, f.v * 2 > sum_over_window)  ← mixed stays ABOVE
+        //     Window(sum(v) OVER (PARTITION BY d.k))
+        //       Filter(f.k = d.k)                           ← join cond BELOW
+        //         CrossJoin
+        //           Scan fact f
+        //           Scan dim d
+        createTable("CREATE TABLE fact_mixed (\n"
+                + "  id INT,\n"
+                + "  k INT,\n"
+                + "  v INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(id)\n"
+                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        createTable("CREATE TABLE dim_mixed (\n"
+                + "  did INT,\n"
+                + "  k INT NOT NULL,\n"
+                + "  tag INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(did)\n"
+                + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        addConstraint("alter table dim_mixed add constraint uq_dim_mixed_k unique (k)");
+
+        // Mixed predicate: f.v > d.tag references both shared (f.v) and
+        // outer-only (d.tag) columns.  It must stay ABOVE the window.
+        String sql = "SELECT d.did, d.k, d.tag, f.id, f.v "
+                + "FROM fact_mixed f, dim_mixed d "
+                + "WHERE f.k = d.k "
+                + "  AND f.v > d.tag"
+                + "  AND f.v * 2 > ("
+                + "    SELECT SUM(f2.v) "
+                + "    FROM fact_mixed f2 "
+                + "    WHERE f2.k = d.k"
+                + "  )";
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .applyTopDown(new PushDownFilterThroughProject())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        // Rule should match and produce a window
+        Assertions.assertTrue(plan.anyMatch(LogicalWindow.class::isInstance),
+                "Rule should produce a window for this query");
+
+        // Collect shared table (fact_mixed) ExprIds
+        List<CatalogRelation> rels = plan.collectToList(CatalogRelation.class::isInstance);
+        Set<ExprId> sharedExprIds = rels.stream()
+                .filter(r -> r.getTable().getName().equals("fact_mixed"))
+                .flatMap(r -> r.getOutputExprIdSet().stream())
+                .collect(Collectors.toSet());
+
+        // Verify: the mixed predicate f.v > d.tag must NOT be below the window.
+        // It should appear in a filter ABOVE the window.  We verify by checking
+        // that no filter directly below the window contains predicates that
+        // reference ONLY shared table columns.
+        List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+        LogicalWindow<Plan> window = windows.get(0);
+        Plan belowWindow = window.child(0);
+        List<LogicalFilter<Plan>> belowFilters = belowWindow
+                .collectToList(LogicalFilter.class::isInstance);
+        for (LogicalFilter<Plan> f : belowFilters) {
+            for (Expression conj : f.getConjuncts()) {
+                Set<ExprId> conjExprIds = conj.getInputSlotExprIds();
+                if (!conjExprIds.isEmpty() && sharedExprIds.containsAll(conjExprIds)) {
+                    Assertions.fail(

Review Comment:
   This test still would not catch the mixed-predicate regression it describes. 
The query uses `f.v > d.tag`, but the assertion below only fails when 
`sharedExprIds.containsAll(conjExprIds)` is true. A misplaced mixed predicate 
has both the shared slot `fact_mixed.v` and the outer-only slot 
`dim_mixed.tag`, so its input set is not contained in `sharedExprIds` and the 
test would pass even if `f.v > d.tag` stayed below the window. Please assert on 
the exact mixed predicate shape or input ExprIds and verify that it is absent 
from the below-window filters, and ideally present in the above-window filter.
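In the real test, the target set would be the `ExprId`s of `fact_mixed.v` and `dim_mixed.tag` (for example found by filtering relation outputs by `Slot::getName`), compared against each conjunct's `conj.getInputSlotExprIds()`. The sketch below models ExprIds as plain integers (hypothetical values; `currentCheckFails` and `exactCheckFails` are illustrative helpers, not existing test utilities) to show why `containsAll` misses the mixed predicate while an exact input-set comparison catches it. A looser "touches any shared slot" check would also flag the legitimate join condition `f.k = d.k`, which is why the comparison targets the exact set.

```java
import java.util.List;
import java.util.Set;

public class MixedPredicateAssertionSketch {
    // Check used by the test today: flag a below-window conjunct only if ALL
    // of its input ExprIds belong to the shared table.
    static boolean currentCheckFails(List<Set<Integer>> belowWindow, Set<Integer> sharedIds) {
        for (Set<Integer> ids : belowWindow) {
            if (!ids.isEmpty() && sharedIds.containsAll(ids)) {
                return true;
            }
        }
        return false;
    }

    // Targeted check: the exact input set of f.v > d.tag must be absent below
    // the window and present above it.
    static boolean exactCheckFails(List<Set<Integer>> belowWindow,
                                   List<Set<Integer>> aboveWindow,
                                   Set<Integer> mixedIds) {
        return belowWindow.contains(mixedIds) || !aboveWindow.contains(mixedIds);
    }

    public static void main(String[] args) {
        // Hypothetical ExprIds: 1 = fact_mixed.k, 2 = fact_mixed.v,
        // 3 = dim_mixed.k, 4 = dim_mixed.tag, 5 = window output (sum).
        Set<Integer> shared = Set.of(1, 2);
        Set<Integer> mixed = Set.of(2, 4); // f.v > d.tag

        // Regression: f.v > d.tag left below the window next to f.k = d.k.
        List<Set<Integer>> badBelow = List.of(Set.of(1, 3), Set.of(2, 4));
        List<Set<Integer>> badAbove = List.of(Set.of(2, 5));
        // Expected plan: only the join condition below, mixed predicate above.
        List<Set<Integer>> goodBelow = List.of(Set.of(1, 3));
        List<Set<Integer>> goodAbove = List.of(Set.of(2, 4), Set.of(2, 5));

        System.out.println("current check catches regression: " + currentCheckFails(badBelow, shared));
        System.out.println("exact check catches regression:   " + exactCheckFails(badBelow, badAbove, mixed));
        System.out.println("exact check passes correct plan:  " + !exactCheckFails(goodBelow, goodAbove, mixed));
    }
}
```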



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
