github-actions[bot] commented on code in PR #63763:
URL: https://github.com/apache/doris/pull/63763#discussion_r3453030891
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -79,22 +81,29 @@ public class AggScalarSubQueryToWindowFunction extends DefaultPlanRewriter<JobCo
private static final Set<Class<? extends LogicalPlan>> OUTER_SUPPORTED_PLAN = ImmutableSet.of(
LogicalJoin.class,
+ LogicalFilter.class,
LogicalProject.class,
Review Comment:
Adding `LogicalFilter` to the supported outer shape makes the rule accept
filters that are already inside `apply.left()`, but `rewrite()` only
reclassifies conjuncts from the top filter above the `Apply`. For a shape like:
```text
Filter(f.k = d.k, f.v * 2 > sum_alias)
  Apply(correlation: d.k)
    CrossJoin
      Filter(f.v > 6)
        Scan fact f
      Scan dim d -- unique on k
    Aggregate(sum(f2.v) AS sum_alias)
      Filter(f2.k = d.k)
        Scan fact f2
```
`checkFilter()` matches `f.k = d.k`, and the new `LogicalFilter` allowance
lets the nested `f.v > 6` pass. `rewrite()` then places `apply.left()`
unchanged under the window, so `SUM(f.v) OVER (PARTITION BY d.k)` is computed
after `f.v > 6`, while the original scalar subquery sums all `fact` rows for
the key. Please either reject outer plans containing shared-side
`LogicalFilter`s, or extract and classify those nested filters so shared-table
predicates are kept above the window.
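
To make the mismatch concrete, here is a minimal, self-contained sketch that replays the plan shape above on made-up data. The class `NestedFilterWindowDemo`, the `FactRow` type, and the three sample rows are assumptions for illustration only; none of this is Doris code.

```java
import java.util.Arrays;
import java.util.List;

public class NestedFilterWindowDemo {
    /** Hypothetical row of the shared fact table: join key k and value v. */
    public static final class FactRow {
        final int k;
        final int v;

        public FactRow(int k, int v) {
            this.k = k;
            this.v = v;
        }
    }

    /** Sample data (assumed for illustration): three fact rows for key 1. */
    public static List<FactRow> sampleRows() {
        return Arrays.asList(new FactRow(1, 7), new FactRow(1, 5), new FactRow(1, 4));
    }

    /**
     * SUM(v) for one key. With filterFirst = false this models the scalar
     * subquery (all fact rows for the key); with filterFirst = true it models
     * a window evaluated on top of the nested Filter(f.v > 6).
     */
    public static int sumForKey(List<FactRow> rows, int key, boolean filterFirst) {
        int sum = 0;
        for (FactRow row : rows) {
            if (row.k != key) {
                continue;
            }
            if (filterFirst && row.v <= 6) {
                continue;
            }
            sum += row.v;
        }
        return sum;
    }

    /** The correlated predicate from the example: f.v * 2 > sum. */
    public static boolean passesSumPredicate(int v, int sum) {
        return v * 2 > sum;
    }

    public static void main(String[] args) {
        List<FactRow> rows = sampleRows();
        int subquerySum = sumForKey(rows, 1, false);
        int windowSum = sumForKey(rows, 1, true);
        // The only outer row surviving f.v > 6 has v = 7.
        System.out.println("original keeps row:  " + passesSumPredicate(7, subquerySum));
        System.out.println("rewritten keeps row: " + passesSumPredicate(7, windowSum));
    }
}
```

With these rows the scalar subquery sums 7 + 5 + 4 = 16, so the outer row with `v = 7` is dropped (14 > 16 is false). A window computed after `f.v > 6` sums only 7, so the same row is kept (14 > 7 is true), and the rewritten query returns a row the original does not.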
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -362,9 +478,81 @@ private Plan rewrite(LogicalFilter<? extends Plan> filter, LogicalApply<Plan, Pl
windowFilterConjunct = ExpressionUtils.replace(windowFilterConjunct,
ImmutableMap.of(aggOut.toSlot(), aggOutExpr));
- LogicalFilter<Plan> newFilter = filter.withConjunctsAndChild(conjuncts.get(true), apply.left());
+ // Split uncorrelated conjuncts: predicates that reference ONLY shared
+ // relation slots (tables appearing in both outer and inner plans) must
+ // stay ABOVE the window. Otherwise the window function would see a
+ // different set of rows than the original scalar subquery.
+ //
+ // For example, with fact(f) as shared table and dim(d) as outer-only:
+ // f.v > 6 → shared-only → must stay above the window
+ // d.tag > 0 → outer-only → safe below the window
+ // f.k = d.k → join cond → needed below the window
+ //
+ // We find shared tables by comparing table IDs that appear in both
+ // outer and inner plans, then collect ALL output slots of those
+ // tables (not just columns referenced in the inner query).
+ List<CatalogRelation> outerRels = outerPlans.stream()
+ .filter(CatalogRelation.class::isInstance)
+ .map(CatalogRelation.class::cast)
+ .collect(Collectors.toList());
+ List<CatalogRelation> innerRels = innerPlans.stream()
+ .filter(CatalogRelation.class::isInstance)
+ .map(CatalogRelation.class::cast)
+ .collect(Collectors.toList());
+ Set<Long> innerTableIds = innerRels.stream()
+ .map(r -> r.getTable().getId())
+ .collect(Collectors.toSet());
+ Set<ExprId> sharedOuterExprIds = outerRels.stream()
+ .filter(r -> innerTableIds.contains(r.getTable().getId()))
+ .flatMap(r -> r.getOutput().stream())
+ .map(Slot::getExprId)
+ .collect(Collectors.toSet());
+ Set<Expression> uncorrelatedConjuncts = conjuncts.get(true);
+ Set<Expression> belowWindowConjuncts = Sets.newHashSet();
+ Set<Expression> aboveWindowConjuncts = Sets.newHashSet();
+ if (uncorrelatedConjuncts != null) {
+ for (Expression conj : uncorrelatedConjuncts) {
+ // Conjuncts that were matched against inner subquery filter
+ // conjuncts (tracked by checkFilter) must stay BELOW the
+ // window. They are semantically part of the inner aggregate's
+ // filter, not extra outer-only predicates. Placing them above
+ // the window would let the window see more rows than the
+ // original scalar subquery, producing wrong aggregate results.
+ if (matchedInnerFilterConjuncts.contains(conj)) {
+ belowWindowConjuncts.add(conj);
+ continue;
+ }
+ // Any predicate that references shared-table slots (tables
+ // appearing in both outer and inner plans) must stay ABOVE
+ // the window. This handles both shared-only predicates
+ // (e.g. f.v > 6) and mixed shared+outer predicates
+ // (e.g. f.v > d.tag). Pushing them below would restrict
+ // the rows seen by the window function, producing a
+ // different aggregate than the original scalar subquery.
+ boolean hasShared = false;
+ for (ExprId id : conj.getInputSlotExprIds()) {
+ if (sharedOuterExprIds.contains(id)) {
+ hasShared = true;
+ break;
+ }
+ }
+ if (hasShared) {
+ aboveWindowConjuncts.add(conj);
+ } else {
+ // No shared-table references: the predicate involves only
Review Comment:
This branch also treats slotless predicates as safe below the window. For
example, `random() > 0.5` has an empty input-slot set, so `hasShared` remains
false and it is added to `belowWindowConjuncts`. In the original plan:
```text
Filter(random() > 0.5, f.k = d.k, f.v * 2 > sum_alias)
  Apply(...)
```
the random predicate filters outer rows, but the scalar subquery still
computes `SUM(f2.v)` over all matching inner `fact` rows. After this rewrite,
the random predicate is below `LogicalWindow`, so the window aggregate is
computed only over the randomly surviving rows in each partition.
`PushDownFilterThroughWindow.canPushDown()` has the same empty-input hazard and
explicitly rejects volatile predicates for this reason. Please reject volatile
uncorrelated conjuncts here, or keep them above the window instead of treating
them as outer-only filters.
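
One way the "keep them above the window" option could look is sketched below as a standalone toy model. `ConjunctPlacementSketch`, `Conjunct`, the integer slot ids, and the `isVolatile` flag are all hypothetical stand-ins, not Nereids types; how volatility is actually detected in the rule is left open here.

```java
import java.util.Set;

public class ConjunctPlacementSketch {
    public enum Placement { ABOVE_WINDOW, BELOW_WINDOW }

    /** Hypothetical stand-in for a conjunct: its input slot ids plus a volatility flag. */
    public static final class Conjunct {
        final Set<Integer> inputSlotIds;
        final boolean isVolatile;

        public Conjunct(Set<Integer> inputSlotIds, boolean isVolatile) {
            this.inputSlotIds = inputSlotIds;
            this.isVolatile = isVolatile;
        }
    }

    /**
     * Decide where an uncorrelated conjunct goes relative to the window.
     * The order of checks mirrors the loop in the diff, with one extra guard
     * for volatile predicates (the assumption being illustrated).
     */
    public static Placement classify(Conjunct conj, Set<Integer> sharedSlotIds,
            boolean matchedInnerFilter) {
        // Conjuncts matched against the inner subquery filter stay below.
        if (matchedInnerFilter) {
            return Placement.BELOW_WINDOW;
        }
        // Volatile predicates are checked before the slot test: random() > 0.5
        // has no input slots, so the shared-slot loop alone would miss it.
        if (conj.isVolatile) {
            return Placement.ABOVE_WINDOW;
        }
        for (Integer id : conj.inputSlotIds) {
            if (sharedSlotIds.contains(id)) {
                return Placement.ABOVE_WINDOW;
            }
        }
        // Deterministic and free of shared-table slots: safe below the window.
        return Placement.BELOW_WINDOW;
    }
}
```

Under this sketch a slotless volatile conjunct lands above the window, while a deterministic outer-only predicate such as `d.tag > 0` still goes below. Rejecting the rewrite outright when any uncorrelated conjunct is volatile would be the more conservative alternative.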
##########
fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunctionTest.java:
##########
@@ -338,6 +362,392 @@ public void testNotMatchTheRule() {
}
}
+ @Test
+ public void testWindowPartitionsByOuterOnlyRelationSlots() throws Exception {
+ // Use TPC-H Q17: correlated table is part (p_partkey is unique via constraint),
+ // fact table is lineitem. The window PARTITION BY should contain all output
+ // columns of the correlated table (part).
+ String sql = TPCHUtils.Q17;
+
+ Plan plan = PlanChecker.from(createCascadesContext(sql))
+ .analyze(sql)
+ .applyBottomUp(new PullUpProjectUnderApply())
+ .applyTopDown(new PushDownFilterThroughProject())
+ .customRewrite(new EliminateUnnecessaryProject())
+ .customRewrite(new AggScalarSubQueryToWindowFunction())
+ .getPlan();
+
+ List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+ Assertions.assertEquals(1, windows.size());
+
+ LogicalWindow<Plan> window = windows.get(0);
+ List<NamedExpression> windowExpressions = window.getWindowExpressions();
+ Assertions.assertEquals(1, windowExpressions.size());
+
+ WindowExpression windowExpression = (WindowExpression) windowExpressions.get(0).child(0);
+ Set<String> partitionKeys = windowExpression.getPartitionKeys().stream()
+ .map(Expression.class::cast)
+ .filter(Slot.class::isInstance)
+ .map(Slot.class::cast)
+ .map(Slot::getName)
+ .collect(Collectors.toSet());
+ // The window PARTITION BY should include the correlated column (p_partkey)
+ Assertions.assertTrue(partitionKeys.contains("p_partkey"),
+ "Expected partition keys to contain p_partkey, got: " + partitionKeys);
+ }
+
+ @Test
+ public void testSharedTablePredicatesStayAboveWindow() throws Exception {
+ createTable("CREATE TABLE fact_split (\n"
+ + " id INT,\n"
+ + " k INT,\n"
+ + " v INT\n"
+ + ") ENGINE=OLAP\n"
+ + "DUPLICATE KEY(id)\n"
+ + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+ + "PROPERTIES ('replication_num' = '1')");
+ createTable("CREATE TABLE dim_split (\n"
+ + " did INT,\n"
+ + " k INT NOT NULL,\n"
+ + " tag INT\n"
+ + ") ENGINE=OLAP\n"
+ + "DUPLICATE KEY(did)\n"
+ + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+ + "PROPERTIES ('replication_num' = '1')");
+ // Add UNIQUE constraint so the rule matches
+ addConstraint("alter table dim_split add constraint uq_dim_split_k unique (k)");
+
+ // Query with extra predicate on shared table (f.v > 6).
+ // This predicate must stay ABOVE the window, otherwise the window
+ // function would aggregate fewer rows than the original scalar subquery.
+ String sql = "SELECT d.did, d.k, d.tag, f.id, f.v "
+ + "FROM fact_split f, dim_split d "
+ + "WHERE f.k = d.k "
+ + " AND f.v > 6"
+ + " AND f.v * 2 > ("
+ + " SELECT SUM(f2.v) "
+ + " FROM fact_split f2 "
+ + " WHERE f2.k = d.k"
+ + " )";
+
+ Plan plan = PlanChecker.from(createCascadesContext(sql))
+ .analyze(sql)
+ .applyBottomUp(new PullUpProjectUnderApply())
+ .applyTopDown(new PushDownFilterThroughProject())
+ .customRewrite(new EliminateUnnecessaryProject())
+ .customRewrite(new AggScalarSubQueryToWindowFunction())
+ .getPlan();
+
+ // Rule should match and produce a window
+ Assertions.assertTrue(plan.anyMatch(LogicalWindow.class::isInstance),
+ "Rule should produce a window for this query");
+
+ // Collect the ExprIds of the shared table (fact_split – appears in both
+ // outer and inner plans). Predicates that reference ONLY these ExprIds
+ // (shared-table-only filters) must stay ABOVE the window, otherwise the
+ // window function would see fewer rows than the original scalar subquery.
+ List<CatalogRelation> rels = plan.collectToList(CatalogRelation.class::isInstance);
+ Set<ExprId> sharedExprIds = rels.stream()
+ .filter(r -> r.getTable().getName().equals("fact_split"))
+ .flatMap(r -> r.getOutputExprIdSet().stream())
+ .collect(Collectors.toSet());
+
+ // Verify that no filter below the window contains a predicate whose
+ // input ExprIds are all from the shared table. Such predicates
+ // (e.g. f.v > 6) must have been placed above the window.
+ List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+ LogicalWindow<Plan> window = windows.get(0);
+ Plan belowWindow = window.child(0);
+ List<LogicalFilter<Plan>> belowFilters = belowWindow
+ .collectToList(LogicalFilter.class::isInstance);
+ for (LogicalFilter<Plan> f : belowFilters) {
+ for (Expression conj : f.getConjuncts()) {
+ Set<ExprId> conjExprIds = conj.getInputSlotExprIds();
+ if (!conjExprIds.isEmpty() && sharedExprIds.containsAll(conjExprIds)) {
+ Assertions.fail(
+ "Shared-table-only predicate should not be below window: " + conj.toSql());
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testMixedSharedOuterPredicatesStayAboveWindow() throws Exception {
+ // Mixed predicates that reference both shared-table and outer-only-table
+ // columns (e.g. f.v > d.tag) must stay ABOVE the window. Pushing them
+ // below would restrict the rows seen by the window function, producing
+ // a different aggregate than the original scalar subquery.
+ //
+ // Input plan shape:
+ // Filter(f.v > d.tag, f.v * 2 > sum_alias) ← mixed + correlated
+ //   Apply(correlation: d.k)
+ //     CrossJoin
+ //       Scan fact f
+ //       Scan dim d -- d.k is unique (constraint)
+ //     Aggregate(sum(f2.v) AS sum_alias)
+ //       Filter(f2.k = d.k)
+ //         Scan fact f2
+ //
+ // Output plan shape:
+ // Filter(f.v > d.tag, f.v * 2 > sum_over_window) ← mixed stays ABOVE
+ //   Window(sum(v) OVER (PARTITION BY d.k))
+ //     Filter(f.k = d.k) ← join cond BELOW
+ //       CrossJoin
+ //         Scan fact f
+ //         Scan dim d
+ createTable("CREATE TABLE fact_mixed (\n"
+ + " id INT,\n"
+ + " k INT,\n"
+ + " v INT\n"
+ + ") ENGINE=OLAP\n"
+ + "DUPLICATE KEY(id)\n"
+ + "DISTRIBUTED BY HASH(id) BUCKETS 1\n"
+ + "PROPERTIES ('replication_num' = '1')");
+ createTable("CREATE TABLE dim_mixed (\n"
+ + " did INT,\n"
+ + " k INT NOT NULL,\n"
+ + " tag INT\n"
+ + ") ENGINE=OLAP\n"
+ + "DUPLICATE KEY(did)\n"
+ + "DISTRIBUTED BY HASH(did) BUCKETS 1\n"
+ + "PROPERTIES ('replication_num' = '1')");
+ addConstraint("alter table dim_mixed add constraint uq_dim_mixed_k unique (k)");
+
+ // Mixed predicate: f.v > d.tag references both shared (f.v) and
+ // outer-only (d.tag) columns. It must stay ABOVE the window.
+ String sql = "SELECT d.did, d.k, d.tag, f.id, f.v "
+ + "FROM fact_mixed f, dim_mixed d "
+ + "WHERE f.k = d.k "
+ + " AND f.v > d.tag"
+ + " AND f.v * 2 > ("
+ + " SELECT SUM(f2.v) "
+ + " FROM fact_mixed f2 "
+ + " WHERE f2.k = d.k"
+ + " )";
+
+ Plan plan = PlanChecker.from(createCascadesContext(sql))
+ .analyze(sql)
+ .applyBottomUp(new PullUpProjectUnderApply())
+ .applyTopDown(new PushDownFilterThroughProject())
+ .customRewrite(new EliminateUnnecessaryProject())
+ .customRewrite(new AggScalarSubQueryToWindowFunction())
+ .getPlan();
+
+ // Rule should match and produce a window
+ Assertions.assertTrue(plan.anyMatch(LogicalWindow.class::isInstance),
+ "Rule should produce a window for this query");
+
+ // Collect shared table (fact_mixed) ExprIds
+ List<CatalogRelation> rels = plan.collectToList(CatalogRelation.class::isInstance);
+ Set<ExprId> sharedExprIds = rels.stream()
+ .filter(r -> r.getTable().getName().equals("fact_mixed"))
+ .flatMap(r -> r.getOutputExprIdSet().stream())
+ .collect(Collectors.toSet());
+
+ // Verify: the mixed predicate f.v > d.tag must NOT be below the window.
+ // It should appear in a filter ABOVE the window. We verify by checking
+ // that no filter directly below the window contains predicates that
+ // reference ONLY shared table columns.
+ List<LogicalWindow<Plan>> windows = plan.collectToList(LogicalWindow.class::isInstance);
+ LogicalWindow<Plan> window = windows.get(0);
+ Plan belowWindow = window.child(0);
+ List<LogicalFilter<Plan>> belowFilters = belowWindow
+ .collectToList(LogicalFilter.class::isInstance);
+ for (LogicalFilter<Plan> f : belowFilters) {
+ for (Expression conj : f.getConjuncts()) {
+ Set<ExprId> conjExprIds = conj.getInputSlotExprIds();
+ if (!conjExprIds.isEmpty() && sharedExprIds.containsAll(conjExprIds)) {
+ Assertions.fail(
Review Comment:
This test still would not catch the mixed-predicate regression it describes.
The query uses `f.v > d.tag`, but the assertion below only fails when
`sharedExprIds.containsAll(conjExprIds)` is true. A misplaced mixed predicate
has both the shared slot `fact_mixed.v` and the outer-only slot
`dim_mixed.tag`, so its input set is not contained in `sharedExprIds` and the
test would pass even if `f.v > d.tag` stayed below the window. Please assert on
the exact mixed predicate shape or input ExprIds and verify that it is absent
from the below-window filters, and ideally present in the above-window filter.
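
The gap can be shown with plain sets. The sketch below is a toy model, not the test's real types: `MixedPredicateCheckSketch` and the integer slot ids (fact columns 1 to 3, dim columns 4 to 6) are assumed for illustration. It contrasts the `containsAll` check with a check on the exact input-slot set of the mixed predicate.

```java
import java.util.Set;

public class MixedPredicateCheckSketch {
    /** The check the test uses today: flags only conjuncts whose slots are ALL shared. */
    public static boolean flaggedByContainsAll(Set<Integer> conjSlots, Set<Integer> sharedSlots) {
        return !conjSlots.isEmpty() && sharedSlots.containsAll(conjSlots);
    }

    /**
     * A stricter check: flags a conjunct whose input slots are exactly the
     * slots of the predicate under test (here f.v and d.tag). Matching the
     * exact set avoids flagging the join condition f.k = d.k, which also
     * touches a shared slot but legitimately lives below the window.
     */
    public static boolean flaggedByExactSlots(Set<Integer> conjSlots, Set<Integer> expectedSlots) {
        return conjSlots.equals(expectedSlots);
    }
}
```

With shared slots `{1, 2, 3}`, the mixed predicate's slots `{3, 6}` are not flagged by the first method, which is why the current test passes even if `f.v > d.tag` sits below the window. The second method flags `{3, 6}` and leaves the join condition `{2, 5}` alone. Matching on the exact expression would be tighter still.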
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]