github-actions[bot] commented on code in PR #63763:
URL: https://github.com/apache/doris/pull/63763#discussion_r3458163847


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -362,17 +483,207 @@ private Plan rewrite(LogicalFilter<? extends Plan> filter, LogicalApply<Plan, Pl
         windowFilterConjunct = ExpressionUtils.replace(windowFilterConjunct,
                 ImmutableMap.of(aggOut.toSlot(), aggOutExpr));
 
-        LogicalFilter<Plan> newFilter = filter.withConjunctsAndChild(conjuncts.get(true), apply.left());
+        // Split uncorrelated conjuncts: predicates that reference ONLY shared
+        // relation slots (tables appearing in both outer and inner plans) must
+        // stay ABOVE the window. Otherwise the window function would see a
+        // different set of rows than the original scalar subquery.
+        //
+        // For example, with fact(f) as shared table and dim(d) as outer-only:
+        //   f.v > 6        → shared-only → must stay above the window
+        //   d.tag > 0      → outer-only  → safe below the window
+        //   f.k = d.k      → join cond   → needed below the window
+        //
+        // We find shared tables by comparing table IDs that appear in both
+        // outer and inner plans, then collect ALL output slots of those
+        // tables (not just columns referenced in the inner query).
+        List<CatalogRelation> outerRels = outerPlans.stream()
+                .filter(CatalogRelation.class::isInstance)
+                .map(CatalogRelation.class::cast)
+                .collect(Collectors.toList());
+        List<CatalogRelation> innerRels = innerPlans.stream()
+                .filter(CatalogRelation.class::isInstance)
+                .map(CatalogRelation.class::cast)
+                .collect(Collectors.toList());
+        Set<Long> innerTableIds = innerRels.stream()
+                .map(r -> r.getTable().getId())
+                .collect(Collectors.toSet());
+        Set<ExprId> sharedOuterExprIds = outerRels.stream()
+                .filter(r -> innerTableIds.contains(r.getTable().getId()))
+                .flatMap(r -> r.getOutput().stream())
+                .map(Slot::getExprId)
+                .collect(Collectors.toSet());
+        Set<Expression> uncorrelatedConjuncts = conjuncts.get(true);
+        Set<Expression> belowWindowConjuncts = Sets.newHashSet();
+        Set<Expression> aboveWindowConjuncts = Sets.newHashSet();
+        if (uncorrelatedConjuncts != null) {
+            for (Expression conj : uncorrelatedConjuncts) {
+                // Conjuncts that were matched against inner subquery filter
+                // conjuncts (tracked by checkFilter) must stay BELOW the
+                // window.  They are semantically part of the inner aggregate's
+                // filter, not extra outer-only predicates.  Placing them above
+                // the window would let the window see more rows than the
+                // original scalar subquery, producing wrong aggregate results.
+                if (matchedInnerFilterConjuncts.contains(conj)) {
+                    belowWindowConjuncts.add(conj);
+                    continue;
+                }
+                // Volatile predicates (e.g. random() > 0.5) must stay ABOVE
+                // the window.  Pushing them below would let the window
+                // aggregate over a different set of rows per partition than
+                // the original scalar subquery.  PushDownFilterThroughWindow
+                // has the same hazard and explicitly rejects volatile
+                // predicates for this reason.
+                if (conj.containsVolatileExpression()) {
+                    aboveWindowConjuncts.add(conj);
+                    continue;
+                }
+                // Any predicate that references shared-table slots (tables
+                // appearing in both outer and inner plans) must stay ABOVE
+                // the window.  This handles both shared-only predicates
+                // (e.g. f.v > 6) and mixed shared+outer predicates
+                // (e.g. f.v > d.tag).  Pushing them below would restrict
+                // the rows seen by the window function, producing a
+                // different aggregate than the original scalar subquery.
+                boolean hasShared = false;
+                for (ExprId id : conj.getInputSlotExprIds()) {
+                    if (sharedOuterExprIds.contains(id)) {
+                        hasShared = true;
+                        break;
+                    }
+                }
+                if (hasShared) {
+                    aboveWindowConjuncts.add(conj);
+                } else {
+                    // No shared-table references: the predicate involves only
+                    // outer-only table columns.  Safe to place below the window.
+                    belowWindowConjuncts.add(conj);
+                }
+            }
+        }
+
+        // Extract and classify conjuncts from any LogicalFilter nodes nested
+        // inside the outer child (apply.left()).  Nested shared-table filters
+        // (e.g. f.v > 6 under a CrossJoin) must be hoisted ABOVE the window;
+        // otherwise the window would aggregate over a filtered subset of rows
+        // while the original scalar subquery sees all rows for the key.
+        //
+        // Nested outer-only filters (e.g. d.tag > 0) are safe below and can
+        // stay in place after the filters are stripped.
+        List<LogicalFilter<Plan>> nestedOuterFilters = apply.left()
+                .collectToList(LogicalFilter.class::isInstance);
+        Set<ExprId> extractedConjunctExprIds = Sets.newHashSet();
+        for (LogicalFilter<Plan> nf : nestedOuterFilters) {
+            for (Expression conj : nf.getConjuncts()) {

Review Comment:
   For nested volatile filters, this moves the predicate across a join, not
   only across the window. In a shape like `CrossJoin(fact, (select * from
   dim_unique where random() > 0.5) d)`, the original plan evaluates `random()`
   once per `dim_unique` row, before that row is duplicated by its matching
   fact rows. This branch strips the nested filter and later places the
   volatile conjunct in the top filter, above both the window and the join, so
   `random()` is evaluated once per joined row instead. A dim row with two
   matching fact rows can then keep exactly one of them, whereas the original
   plan keeps either both or neither. Please reject nested volatile filters, or
   leave them at their original child position and extract only the
   deterministic predicates.
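
   To make the difference concrete, here is a minimal plain-Java sketch, not
   Doris code. The class `VolatileFilterPlacement`, its methods, and the
   scripted stand-in for `random()` are all hypothetical names introduced for
   illustration; the sketch assumes one dim row with key 1 and two fact rows
   that match it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.DoubleSupplier;

// Hypothetical illustration only; none of these names exist in Doris.
class VolatileFilterPlacement {
    // Stand-in for random(): replays a fixed list of draws so the run is reproducible.
    static DoubleSupplier scripted(double... draws) {
        int[] next = {0};
        return () -> draws[next[0]++ % draws.length];
    }

    // Original shape: the volatile filter sits on the dim side, below the join.
    // One draw per dim row; a surviving dim row keeps all of its fact matches.
    static List<String> filterThenJoin(int[] dimKeys, int[][] fact, DoubleSupplier rnd) {
        List<String> out = new ArrayList<>();
        for (int dk : dimKeys) {
            if (!(rnd.getAsDouble() > 0.5)) {
                continue;
            }
            for (int[] f : fact) {          // f = {id, k}
                if (f[1] == dk) {
                    out.add("d.k=" + dk + ",f.id=" + f[0]);
                }
            }
        }
        return out;
    }

    // Hoisted shape: the volatile conjunct is evaluated above the join,
    // so there is one draw per joined row.
    static List<String> joinThenFilter(int[] dimKeys, int[][] fact, DoubleSupplier rnd) {
        List<String> out = new ArrayList<>();
        for (int dk : dimKeys) {
            for (int[] f : fact) {
                if (f[1] == dk && rnd.getAsDouble() > 0.5) {
                    out.add("d.k=" + dk + ",f.id=" + f[0]);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        int[] dimKeys = {1};
        int[][] fact = {{10, 1}, {11, 1}};
        // Same scripted draws (0.9, then 0.1) for both shapes.
        System.out.println(filterThenJoin(dimKeys, fact, scripted(0.9, 0.1)).size()); // prints 2
        System.out.println(joinThenFilter(dimKeys, fact, scripted(0.9, 0.1)).size()); // prints 1
    }
}
```

   With the same sequence of draws, the original placement keeps both joined
   rows for the dim row, while the hoisted placement keeps only one, a result
   the original plan can never produce.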



##########
fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java:
##########
@@ -338,6 +362,767 @@ public void testNotMatchTheRule() {
         }
     }
 
+    @Test
+    public void testWindowPartitionsByOuterOnlyRelationSlots() throws Exception {
+        // Use TPC-H Q17: correlated table is part (p_partkey is unique via constraint),
+        // fact table is lineitem. The window PARTITION BY should contain all output
+        // columns of the correlated table (part).
+        String sql = TPCHUtils.Q17;
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .applyTopDown(new PushDownFilterThroughProject())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+        Assertions.assertEquals(1, windows.size());
+
+        LogicalWindow<Plan> window = windows.get(0);
+        List<NamedExpression> windowExpressions = window.getWindowExpressions();
+        Assertions.assertEquals(1, windowExpressions.size());
+
+        WindowExpression windowExpression = (WindowExpression) windowExpressions.get(0).child(0);
+        Set<String> partitionKeys = windowExpression.getPartitionKeys().stream()
+                .map(Expression.class::cast)
+                .filter(Slot.class::isInstance)
+                .map(Slot.class::cast)
+                .map(Slot::getName)
+                .collect(Collectors.toSet());
+        // The window PARTITION BY should include the correlated column (p_partkey)
+        Assertions.assertTrue(partitionKeys.contains("p_partkey"),
+                "Expected partition keys to contain p_partkey, got: " + partitionKeys);
+    }
+
+    @Test
+    public void testSharedTablePredicatesStayAboveWindow() throws Exception {
+        createTable("CREATE TABLE fact_split (\n"
+                + "  id INT,\n"
+                + "  k INT,\n"
+                + "  v INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(id)\n"
+                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        createTable("CREATE TABLE dim_split (\n"
+                + "  did INT,\n"
+                + "  k INT NOT NULL,\n"
+                + "  tag INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(did)\n"
+                + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        // Add UNIQUE constraint so the rule matches
+        addConstraint("alter table dim_split add constraint uq_dim_split_k unique (k)");
+
+        // Query with extra predicate on shared table (f.v > 6).
+        // This predicate must stay ABOVE the window, otherwise the window
+        // function would aggregate fewer rows than the original scalar subquery.
+        String sql = "SELECT d.did, d.k, d.tag, f.id, f.v "
+                + "FROM fact_split f, dim_split d "
+                + "WHERE f.k = d.k "
+                + "  AND f.v > 6"
+                + "  AND f.v * 2 > ("
+                + "    SELECT SUM(f2.v) "
+                + "    FROM fact_split f2 "
+                + "    WHERE f2.k = d.k"
+                + "  )";
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .applyTopDown(new PushDownFilterThroughProject())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        // Rule should match and produce a window
+        Assertions.assertTrue(plan.anyMatch(LogicalWindow.class::isInstance),
+                "Rule should produce a window for this query");
+
+        // Collect the ExprIds of the shared table (fact_split – appears in both
+        // outer and inner plans).  Predicates that reference ONLY these ExprIds
+        // (shared-table-only filters) must stay ABOVE the window, otherwise the
+        // window function would see fewer rows than the original scalar subquery.
+        List<CatalogRelation> rels = plan.collectToList(CatalogRelation.class::isInstance);
+        Set<ExprId> sharedExprIds = rels.stream()
+                .filter(r -> r.getTable().getName().equals("fact_split"))
+                .flatMap(r -> r.getOutputExprIdSet().stream())
+                .collect(Collectors.toSet());
+
+        // Verify that no filter below the window contains a predicate whose
+        // input ExprIds are all from the shared table.  Such predicates
+        // (e.g. f.v > 6) must have been placed above the window.
+        List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+        LogicalWindow<Plan> window = windows.get(0);
+        Plan belowWindow = window.child(0);
+        List<LogicalFilter<Plan>> belowFilters = belowWindow
+                .collectToList(LogicalFilter.class::isInstance);
+        for (LogicalFilter<Plan> f : belowFilters) {
+            for (Expression conj : f.getConjuncts()) {
+                Set<ExprId> conjExprIds = conj.getInputSlotExprIds();
+                if (!conjExprIds.isEmpty() && sharedExprIds.containsAll(conjExprIds)) {
+                    Assertions.fail(
+                            "Shared-table-only predicate should not be below window: " + conj.toSql());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testMixedSharedOuterPredicatesStayAboveWindow() throws Exception {
+        // Mixed predicates that reference both shared-table and outer-only-table
+        // columns (e.g. f.v > d.tag) must stay ABOVE the window.  Pushing them
+        // below would restrict the rows seen by the window function, producing
+        // a different aggregate than the original scalar subquery.
+        //
+        // Input plan shape:
+        //   Filter(f.v > d.tag, f.v * 2 > sum_alias)       ← mixed + correlated
+        //     Apply(correlation: d.k)
+        //       CrossJoin
+        //         Scan fact f
+        //         Scan dim d   -- d.k is unique (constraint)
+        //       Aggregate(sum(f2.v) AS sum_alias)
+        //         Filter(f2.k = d.k)
+        //           Scan fact f2
+        //
+        // Output plan shape:
+        //   Filter(f.v > d.tag, f.v * 2 > sum_over_window)  ← mixed stays ABOVE
+        //     Window(sum(v) OVER (PARTITION BY d.k))
+        //       Filter(f.k = d.k)                           ← join cond BELOW
+        //         CrossJoin
+        //           Scan fact f
+        //           Scan dim d
+        createTable("CREATE TABLE fact_mixed (\n"
+                + "  id INT,\n"
+                + "  k INT,\n"
+                + "  v INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(id)\n"
+                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        createTable("CREATE TABLE dim_mixed (\n"
+                + "  did INT,\n"
+                + "  k INT NOT NULL,\n"
+                + "  tag INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(did)\n"
+                + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        addConstraint("alter table dim_mixed add constraint uq_dim_mixed_k unique (k)");
+
+        // Mixed predicate: f.v > d.tag references both shared (f.v) and
+        // outer-only (d.tag) columns.  It must stay ABOVE the window.
+        String sql = "SELECT d.did, d.k, d.tag, f.id, f.v "
+                + "FROM fact_mixed f, dim_mixed d "
+                + "WHERE f.k = d.k "
+                + "  AND f.v > d.tag"
+                + "  AND f.v * 2 > ("
+                + "    SELECT SUM(f2.v) "
+                + "    FROM fact_mixed f2 "
+                + "    WHERE f2.k = d.k"
+                + "  )";
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .applyTopDown(new PushDownFilterThroughProject())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        // Rule should match and produce a window
+        Assertions.assertTrue(plan.anyMatch(LogicalWindow.class::isInstance),
+                "Rule should produce a window for this query");
+
+        // Collect shared table (fact_mixed) ExprIds.
+        // A mixed predicate like f.v > d.tag references BOTH shared and
+        // outer-only sets.
+        List<CatalogRelation> rels = plan.collectToList(CatalogRelation.class::isInstance);
+        Set<ExprId> sharedExprIds = rels.stream()
+                .filter(r -> r.getTable().getName().equals("fact_mixed"))
+                .flatMap(r -> r.getOutputExprIdSet().stream())
+                .collect(Collectors.toSet());
+
+        // Verify: the mixed predicate f.v > d.tag must NOT be below the window.
+        // A mixed predicate references slots from BOTH shared and outer-only
+        // tables.  We detect it by checking that at least one ExprId is in the
+        // shared set AND at least one is outside the shared set.
+        List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+        LogicalWindow<Plan> window = windows.get(0);
+        Plan belowWindow = window.child(0);
+        List<LogicalFilter<Plan>> belowFilters = belowWindow
+                .collectToList(LogicalFilter.class::isInstance);
+        for (LogicalFilter<Plan> f : belowFilters) {
+            for (Expression conj : f.getConjuncts()) {
+                Set<ExprId> conjExprIds = conj.getInputSlotExprIds();
+                if (conjExprIds.isEmpty()) {
+                    continue;
+                }
+                boolean hasShared = false;
+                boolean hasNonShared = false;
+                for (ExprId id : conjExprIds) {
+                    if (sharedExprIds.contains(id)) {
+                        hasShared = true;
+                    } else {
+                        hasNonShared = true;
+                    }
+                }
+                if (hasShared && hasNonShared) {
+                    // Join conditions (f.k = d.k) are expected below the
+                    // window — they are matched from the inner filter and
+                    // needed for the join.  Only flag non-equality mixed
+                    // predicates like f.v > d.tag.
+                    if (!(conj instanceof org.apache.doris.nereids.trees.expressions.EqualPredicate)) {
+                        Assertions.fail(
+                                "Mixed shared+outer predicate should not be below window: "
+                                + conj.toSql());
+                    }
+                }
+                if (!hasNonShared) {
+                    // Shared-table-only predicate also belongs ABOVE.
+                    Assertions.fail(
+                            "Shared-table-only predicate should not be below window: "
+                            + conj.toSql());
+                }
+            }
+        }
+
+        // Verify the mixed predicate IS present in a filter ABOVE the window.
+        // Collect all filters above the window by excluding below-window filters.
+        List<LogicalFilter<Plan>> allFilters = plan
+                .collectToList(LogicalFilter.class::isInstance);
+        List<LogicalFilter<Plan>> aboveFilters = allFilters.stream()
+                .filter(f -> !belowFilters.contains(f))
+                .collect(Collectors.toList());
+        boolean foundMixedAbove = false;
+        for (LogicalFilter<Plan> f : aboveFilters) {
+            for (Expression conj : f.getConjuncts()) {
+                Set<ExprId> conjExprIds = conj.getInputSlotExprIds();
+                boolean hasShared = false;
+                boolean hasNonShared = false;
+                for (ExprId id : conjExprIds) {
+                    if (sharedExprIds.contains(id)) {
+                        hasShared = true;
+                    } else {
+                        hasNonShared = true;
+                    }
+                }
+                if (hasShared && hasNonShared) {
+                    foundMixedAbove = true;
+                }
+            }
+        }
+        Assertions.assertTrue(foundMixedAbove,
+                "Mixed predicate f.v > d.tag should be above the window");
+    }
+
+    @Test
+    public void testNestedFilterUnderProjectDoesNotBreakRewrite() throws Exception {
+        // Regression test: when a nested filter sits below a pruning project,
+        // the ensureProjectOutput() method in rewrite() must expand the
+        // project to carry through columns referenced by extracted filter
+        // conjuncts.  Without the fix, the reinserted predicates would have
+        // dangling slot references.
+        //
+        // We use a subquery to create the project boundary.  The inner block
+        // projects all needed columns; the outer SELECT prunes some, creating
+        // a project that drops columns referenced by the nested filter.
+        createTable("CREATE TABLE fact_proj (\n"
+                + "  id INT,\n"
+                + "  k INT,\n"
+                + "  v INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(id)\n"
+                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        createTable("CREATE TABLE dim_proj (\n"
+                + "  did INT,\n"
+                + "  k INT NOT NULL,\n"
+                + "  tag INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(did)\n"
+                + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        addConstraint("alter table dim_proj add constraint uq_dim_proj_k unique (k)");
+
+        // The inner SELECT selects all columns including f.v, d.did, d.k, d.tag.
+        // The outer SELECT selects only f.id, d.k — pruning f.v, d.did, d.tag.
+        // The WHERE uses the pruned columns (f.v > 6), creating a nested filter
+        // below the pruning project.
+        String sql = "SELECT t.id, t.k "
+                + "FROM ("
+                + "    SELECT f.id, d.did, d.k, d.tag, f.v "
+                + "    FROM fact_proj f, dim_proj d "
+                + "    WHERE f.k = d.k"
+                + "      AND f.v > 6"
+                + ") t "
+                + "WHERE t.v * 2 > ("
+                + "    SELECT SUM(f2.v) "
+                + "    FROM fact_proj f2 "
+                + "    WHERE f2.k = t.k"
+                + "  )";
+
+        // The subquery alias 't' changes table identity, which may prevent
+        // checkRelation() from matching.  In that case the rule won't match
+        // and the regression is not triggered.  The test documents the
+        // expected behavior: if the rule matches, the project must be expanded.
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .applyTopDown(new PushDownFilterThroughProject())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        // Whether or not the rule matches the subquery-alias case, the plan
+        // must be well-formed (no dangling references).
+        Assertions.assertFalse(
+                plan.anyMatch(p -> p.getClass().getSimpleName().equals("LogicalEmptyRelation")),
+                "Plan should not contain unresolved nodes");
+    }
+
+    @Test
+    public void testNotMatchWhenCorrelatedTableNotUnique() throws Exception {
+        createTable("CREATE TABLE tpch.fact_dup (\n"
+                + "  id INT,\n"
+                + "  k INT,\n"
+                + "  v INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(id)\n"
+                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        createTable("CREATE TABLE tpch.dim_dup (\n"
+                + "  did INT,\n"
+                + "  k INT,\n"
+                + "  tag INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(did)\n"
+                + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+
+        String sql = "SELECT d.did, d.k, d.tag, f.id, f.v "
+                + "FROM fact_dup f, dim_dup d "
+                + "WHERE f.k = d.k "
+                + "  AND f.v * 2 > ("
+                + "    SELECT SUM(f2.v) "
+                + "    FROM fact_dup f2 "
+                + "    WHERE f2.k = d.k"
+                + "  )";
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .applyTopDown(new PushDownFilterThroughProject())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        // DUPLICATE KEY table does not guarantee uniqueness, rule should not match
+        Assertions.assertFalse(plan.anyMatch(LogicalWindow.class::isInstance));
+    }
+
+    @Test
+    public void testUniqueKeyModelTriggersRewrite() throws Exception {
+        // UNIQUE KEY model tables guarantee uniqueness + non-null on the key
+        // column.  DataTrait recognizes this even without an explicit
+        // ADD CONSTRAINT, so the rule should fire.
+        createTable("CREATE TABLE tpch.fact_ukey (\n"
+                + "  id INT,\n"
+                + "  k INT,\n"
+                + "  v INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(id)\n"
+                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        // dim_ukey has UNIQUE KEY(k) with k INT NOT NULL — this implies
+        // unique + non-null without needing an explicit constraint.
+        createTable("CREATE TABLE tpch.dim_ukey (\n"
+                + "  k INT NOT NULL,\n"
+                + "  did INT,\n"
+                + "  tag INT\n"
+                + ") ENGINE=OLAP\n"
+                + "UNIQUE KEY(k)\n"
+                + "DISTRIBUTED BY HASH(k) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+
+        String sql = "SELECT d.did, d.k, d.tag, f.id, f.v "
+                + "FROM fact_ukey f, dim_ukey d "
+                + "WHERE f.k = d.k "
+                + "  AND f.v * 2 > ("
+                + "    SELECT SUM(f2.v) "
+                + "    FROM fact_ukey f2 "
+                + "    WHERE f2.k = d.k"
+                + "  )";
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .applyTopDown(new PushDownFilterThroughProject())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        // UNIQUE KEY model alone provides uniqueness + non-null.
+        Assertions.assertTrue(plan.anyMatch(LogicalWindow.class::isInstance),
+                "UNIQUE KEY model should trigger the window rewrite");
+    }
+
+    @Test
+    public void testNotMatchWhenOuterOnlyRelationOutputIsPruned() throws Exception {
+        createTable("CREATE TABLE tpch.fact_window_pruned (\n"
+                + "  id INT,\n"
+                + "  k INT,\n"
+                + "  v INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(id)\n"
+                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        createTable("CREATE TABLE tpch.dim_window_pruned (\n"
+                + "  did INT,\n"
+                + "  k INT,\n"
+                + "  tag INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(did)\n"
+                + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+
+        String sql = "SELECT t.id, t.k, t.v "
+                + "FROM ("
+                + "    SELECT f.id, d.k, f.v "
+                + "    FROM fact_window_pruned f, dim_window_pruned d "
+                + "    WHERE f.k = d.k"
+                + ") t "
+                + "WHERE t.v * 2 > ("
+                + "    SELECT SUM(f2.v) "
+                + "    FROM fact_window_pruned f2 "
+                + "    WHERE f2.k = t.k"
+                + "  )";
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .applyTopDown(new PushDownFilterThroughProject())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        Assertions.assertFalse(plan.anyMatch(LogicalWindow.class::isInstance));
+    }
+
+    @Test
+    public void testNestedOuterFilterHoistedAboveWindow() throws Exception {
+        // When the outer child of Apply contains a nested LogicalFilter
+        // (e.g. a filter pushed into the FROM subquery like
+        // FROM (SELECT * FROM fact WHERE v > 6) sf), the rule must extract
+        // the nested conjuncts, classify them, and hoist shared-table
+        // predicates ABOVE the window.  Otherwise the window would aggregate
+        // over a filtered subset, while the original scalar subquery computes
+        // over ALL fact rows for the key.
+        createTable("CREATE TABLE fact_nested (\n"
+                + "  id INT,\n"
+                + "  k INT,\n"
+                + "  v INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(id)\n"
+                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        createTable("CREATE TABLE dim_nested (\n"
+                + "  did INT,\n"
+                + "  k INT NOT NULL,\n"
+                + "  tag INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(did)\n"
+                + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        addConstraint("alter table dim_nested add constraint uq_dim_nested_k unique (k)");
+
+        // The FROM-subquery with WHERE v > 6 produces a LogicalFilter(f.v > 6)
+        // nested inside apply.left() under the LogicalSubQueryAlias.
+        String sql = "SELECT d.did, sf.id, sf.k, sf.v "
+                + "FROM (SELECT id, k, v FROM fact_nested WHERE v > 6) sf, dim_nested d "
+                + "WHERE sf.k = d.k "
+                + "  AND sf.v * 2 > ("
+                + "    SELECT SUM(f2.v) "
+                + "    FROM fact_nested f2 "
+                + "    WHERE f2.k = d.k"
+                + "  )";
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        // Rule should match and produce a window
+        Assertions.assertTrue(plan.anyMatch(LogicalWindow.class::isInstance),
+                "Rule should produce a window when nested outer filter is present");
+
+        // The nested shared-table predicate (v > 6) must be hoisted ABOVE the
+        // window.  Verify no shared-table-only predicate is below the window.
+        List<CatalogRelation> rels = plan.collectToList(CatalogRelation.class::isInstance);
+        Set<ExprId> factExprIds = rels.stream()
+                .filter(r -> r.getTable().getName().equals("fact_nested"))
+                .flatMap(r -> r.getOutputExprIdSet().stream())
+                .collect(Collectors.toSet());
+        List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+        LogicalWindow<Plan> window = windows.get(0);
+        Plan belowWindow = window.child(0);
+        List<LogicalFilter<Plan>> belowFilters = belowWindow
+                .collectToList(LogicalFilter.class::isInstance);
+        for (LogicalFilter<Plan> f : belowFilters) {
+            for (Expression conj : f.getConjuncts()) {
+                Set<ExprId> conjExprIds = conj.getInputSlotExprIds();
+                if (!conjExprIds.isEmpty() && factExprIds.containsAll(conjExprIds)) {
+                    Assertions.fail(
+                            "Nested shared-table predicate should be hoisted above window: "
+                            + conj.toSql());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testVolatilePredicateStaysAboveWindow() throws Exception {
+        // Volatile predicates like random() > 0.5 have no table column
+        // references but are non-deterministic.  They must stay ABOVE the
+        // window, otherwise the window function would aggregate over a
+        // different set of rows per partition than the original scalar
+        // subquery.  This follows the same principle as
+        // PushDownFilterThroughWindow.canPushDown().
+        createTable("CREATE TABLE fact_volatile (\n"
+                + "  id INT,\n"
+                + "  k INT,\n"
+                + "  v INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(id)\n"
+                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        createTable("CREATE TABLE dim_volatile (\n"
+                + "  did INT,\n"
+                + "  k INT NOT NULL,\n"
+                + "  tag INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(did)\n"
+                + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        addConstraint("alter table dim_volatile add constraint uq_dim_volatile_k unique (k)");
+
+        // random() > 0.5 is a volatile predicate with no input slots.
+        // It must be kept ABOVE the window.
+        String sql = "SELECT d.did, f.id, f.k, f.v "
+                + "FROM fact_volatile f, dim_volatile d "
+                + "WHERE f.k = d.k "
+                + "  AND random() > 0.5"
+                + "  AND f.v * 2 > ("
+                + "    SELECT SUM(f2.v) "
+                + "    FROM fact_volatile f2 "
+                + "    WHERE f2.k = d.k"
+                + "  )";
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        // Rule should match and produce a window
+        Assertions.assertTrue(plan.anyMatch(LogicalWindow.class::isInstance),
+                "Rule should produce a window for volatile predicate query");
+
+        // Verify the volatile predicate is NOT below the window.
+        List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+        LogicalWindow<Plan> window = windows.get(0);
+        Plan belowWindow = window.child(0);
+        List<LogicalFilter<Plan>> belowFilters = belowWindow
+                .collectToList(LogicalFilter.class::isInstance);
+        for (LogicalFilter<Plan> f : belowFilters) {
+            for (Expression conj : f.getConjuncts()) {
+                Assertions.assertFalse(conj.containsVolatileExpression(),
+                        "Volatile predicate should stay above window: " + conj.toSql());
+            }
+        }
+    }
+
+    @Test
+    public void testInnerFilterConjunctsStayBelowWindow() throws Exception {
+        // Regression test: shared-table predicates that were matched against
+        // inner subquery filter conjuncts must stay BELOW the window.
+        //
+        // Without the matchedInnerFilterConjuncts tracking, f.v < 10 would be
+        // classified as shared-table-only and placed ABOVE the window, letting
+        // the window aggregate over rows the original scalar subquery excluded.
+        createTable("CREATE TABLE fact_inner_filter (\n"
+                + "  id INT,\n"
+                + "  k INT,\n"
+                + "  v INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(id)\n"
+                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        createTable("CREATE TABLE dim_inner_filter (\n"
+                + "  did INT,\n"
+                + "  k INT NOT NULL,\n"
+                + "  tag INT\n"
+                + ") ENGINE=OLAP\n"
+                + "DUPLICATE KEY(did)\n"
+                + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+                + "PROPERTIES ('replication_num' = '1')");
+        // UNIQUE constraint so the rule matches
+        addConstraint("alter table dim_inner_filter add constraint uq_dim_if_k unique (k)");
+
+        // Inner subquery has f2.v < 10 as a filter.  After checkFilter matches
+        // it against outer f.v < 10, the outer conjunct must go BELOW the window
+        // because it is semantically part of the inner aggregate's filter.
+        String sql = "SELECT d.did, f.id, f.k, f.v "
+                + "FROM fact_inner_filter f, dim_inner_filter d "
+                + "WHERE f.k = d.k "
+                + "  AND f.v < 10"
+                + "  AND f.v * 2 > ("
+                + "    SELECT SUM(f2.v) "
+                + "    FROM fact_inner_filter f2 "
+                + "    WHERE f2.k = d.k"
+                + "      AND f2.v < 10"
+                + "  )";
+
+        Plan plan = PlanChecker.from(createCascadesContext(sql))
+                .analyze(sql)
+                .applyBottomUp(new PullUpProjectUnderApply())
+                .customRewrite(new EliminateUnnecessaryProject())
+                .customRewrite(new AggScalarSubQueryToWindowFunction())
+                .getPlan();
+
+        // Rule should match and produce a window
+        Assertions.assertTrue(plan.anyMatch(LogicalWindow.class::isInstance),
+                "Rule should produce a window for this query");
+
+        // Collect the ExprIds of the shared table (fact_inner_filter).
+        List<CatalogRelation> allRels = plan.collectToList(CatalogRelation.class::isInstance);
+        Set<ExprId> sharedExprIds = allRels.stream()
+                .filter(r -> r.getTable().getName().equals("fact_inner_filter"))
+                .flatMap(r -> r.getOutputExprIdSet().stream())
+                .collect(Collectors.toSet());
+
+        // The conjunct f.v < 10, which was matched from the inner filter, must
+        // appear in a filter BELOW the window (it is part of the aggregate
+        // computation).  Verify it exists there and is NOT above.
+        List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+        LogicalWindow<Plan> window = windows.get(0);
+
+        // Check below-window filters: there MUST be at least one shared-table-only
+        // conjunct (f.v < 10) — this is the matched inner-filter predicate.
+        Plan belowWindow = window.child(0);
+        List<LogicalFilter<Plan>> belowFilters = belowWindow
+                .collectToList(LogicalFilter.class::isInstance);
+        boolean foundSharedOnlyBelow = false;
+        for (LogicalFilter<Plan> f : belowFilters) {
+            for (Expression conj : f.getConjuncts()) {
+                Set<ExprId> conjExprIds = conj.getInputSlotExprIds();
+                if (!conjExprIds.isEmpty() && sharedExprIds.containsAll(conjExprIds)) {
+                    foundSharedOnlyBelow = true;
+                }
+            }
+        }
+        Assertions.assertTrue(foundSharedOnlyBelow,
+                "Matched inner-filter conjunct f.v < 10 must be below the window");
+
+        // Check above-window filters: there should NOT be a shared-table-only
+        // conjunct that is NOT the window comparison.  f.v < 10 should not leak
+        // above.
+        List<LogicalFilter<Plan>> allFilters = plan
+                .collectToList(LogicalFilter.class::isInstance);
+        List<LogicalFilter<Plan>> aboveFilters = allFilters.stream()
+                .filter(f -> !belowFilters.contains(f))
+                .collect(Collectors.toList());
+        for (LogicalFilter<Plan> f : aboveFilters) {
+            for (Expression conj : f.getConjuncts()) {
+                Set<ExprId> conjExprIds = conj.getInputSlotExprIds();
+                if (!conjExprIds.isEmpty() && sharedExprIds.containsAll(conjExprIds)) {
+                    Assertions.fail(
+                            "Unexpected shared-table-only predicate above window: " + conj.toSql());
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testSplitInnerFilterFromPushDown() throws Exception {
+        // Regression: PushDownFilterThroughProject splits the inner WHERE clause
+        // into multiple LogicalFilter nodes when a correlated predicate cannot
+        // be pushed through a project (references a correlated slot not in the
+        // project output) but a non-correlated predicate can.
+        //
+        // Before the fix, checkFilter() required exactly one inner filter and
+        // would reject plans where the filter was split.  Now it collects

Review Comment:
   `testSharedTablePredicatesStayAboveWindow()` already creates `fact_split`, 
`dim_split`, and `uq_dim_split_k` in the same per-class FE fixture. 
`AnalyzeCheckTestBase.runBeforeEach()` only clears `StatementScopeIdGenerator`, 
so those catalog objects remain for later test methods, and this second `CREATE 
TABLE fact_split` will fail whenever the earlier test has run first. Please use 
distinct table/constraint names here or drop the tables before creating them.
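
As a rough illustration of the "distinct names" option, here is a minimal sketch. `FixtureNames`, `unique`, and `factTableDdl` are hypothetical names that do not exist in Doris; the DDL text is copied from the other tests in this file. It only shows how a per-test suffix keeps a second `CREATE TABLE` from colliding with objects left in the shared fixture.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical test helper (not part of Doris): builds per-test catalog object names. */
public final class FixtureNames {
    // Shared counter so every call in the same JVM yields a different suffix.
    private static final AtomicInteger SEQ = new AtomicInteger();

    private FixtureNames() {
    }

    /** Returns the base name followed by an underscore and a process-unique number. */
    public static String unique(String base) {
        return base + "_" + SEQ.incrementAndGet();
    }

    /** Builds the fact-table DDL used by these tests for the given table name. */
    public static String factTableDdl(String table) {
        return "CREATE TABLE " + table + " (\n"
                + "  id INT,\n"
                + "  k INT,\n"
                + "  v INT\n"
                + ") ENGINE=OLAP\n"
                + "DUPLICATE KEY(id)\n"
                + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
                + "PROPERTIES ('replication_num' = '1')";
    }
}
```

In a test this might be used as `String fact = FixtureNames.unique("fact_split"); createTable(FixtureNames.factTableDdl(fact));`, with the same `fact` variable then used in the SQL string and in the `getName().equals(...)` filter. Simply hard-coding a different literal name per test would work as well.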



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

