github-actions[bot] commented on code in PR #63763:
URL: https://github.com/apache/doris/pull/63763#discussion_r3489124383
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -207,6 +226,9 @@ private boolean checkFilter(LogicalFilter<? extends Plan> outerFilter) {
if (ExpressionIdenticalChecker.INSTANCE.check(innerExpr, outerExpr)) {
Review Comment:
This match should reject volatile predicates before
`ExpressionIdenticalChecker` runs. A direct boolean volatile UDF does not go
through the `ComparisonPredicate` special case: `JavaUdf.accept()`/`PythonUdf.accept()`
reach the generic visitor, which only compares the node class and recursively
checks the children. That path bypasses the UDF `equals()` methods that compare
`VolatileIdentity`.
For example, with `volatile_bool_udf` declared `VOLATILE`:
```text
Filter(f.k = d.k, volatile_bool_udf(f.k), f.v * 2 > sum_alias)
Apply(correlation: d.k)
CrossJoin(Scan fact f, Scan dim_unique d)
Aggregate(sum(f2.v) AS sum_alias)
Filter(f2.k = d.k, volatile_bool_udf(f2.k))
Scan fact f2
```
After slot replacement, the two UDF conjuncts have the same class and the same
child slot. This check can therefore mark the outer UDF as a matched inner
filter, even though the parser/builder gave each volatile call a fresh identity.
In `rewrite()`, the `matchedInnerFilterConjuncts` branch runs before the
volatile guard, so that matched conjunct is then sent below the window. This
collapses the independent outer-row filter and the inner-aggregate filter into
one predicate. Please reject volatile inner/outer conjuncts here, or make this
checker respect `VolatileIdentity` for UDF/volatile nodes.
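The following minimal Java sketch makes the failure mode concrete. It rests on stated assumptions. `Expr`, `SlotRef`, `EqualTo`, `VolatileUdfCall` and `IdenticalCheck` are hypothetical stand-ins, not the Nereids classes. `structural()` only approximates the generic-visitor behaviour described above: it checks for the same class, then compares children recursively. The sketch shows two volatile calls with different identities comparing as identical. It also shows one possible guard, which refuses to match any conjunct that contains a volatile node.

```java
import java.util.List;

// Hypothetical, minimal expression model for illustration only (NOT Nereids).
interface Expr {
    List<Expr> children();

    // True only for a node that is itself volatile, e.g. a VOLATILE UDF call.
    default boolean isVolatileNode() {
        return false;
    }

    // True if this node or any descendant is volatile.
    default boolean containsVolatile() {
        if (isVolatileNode()) {
            return true;
        }
        for (Expr child : children()) {
            if (child.containsVolatile()) {
                return true;
            }
        }
        return false;
    }
}

// Column reference; after slot replacement both sides name the same slot.
final class SlotRef implements Expr {
    final String name;
    SlotRef(String name) { this.name = name; }
    public List<Expr> children() { return List.of(); }
    @Override public boolean equals(Object o) {
        return o instanceof SlotRef && ((SlotRef) o).name.equals(name);
    }
    @Override public int hashCode() { return name.hashCode(); }
}

// Deterministic predicate such as f.k = d.k.
final class EqualTo implements Expr {
    final Expr left;
    final Expr right;
    EqualTo(Expr left, Expr right) { this.left = left; this.right = right; }
    public List<Expr> children() { return List.of(left, right); }
}

// Stand-in for a VOLATILE UDF call; `identity` plays the role of VolatileIdentity.
final class VolatileUdfCall implements Expr {
    final long identity;
    final List<Expr> args;
    VolatileUdfCall(long identity, List<Expr> args) { this.identity = identity; this.args = args; }
    public List<Expr> children() { return args; }
    @Override public boolean isVolatileNode() { return true; }
}

final class IdenticalCheck {
    // Approximates the generic visitor: same class, then children recursively.
    // Node fields such as `identity` are never consulted; leaves use equals().
    static boolean structural(Expr a, Expr b) {
        if (a.getClass() != b.getClass()) {
            return false;
        }
        List<Expr> ac = a.children();
        List<Expr> bc = b.children();
        if (ac.isEmpty() && bc.isEmpty()) {
            return a.equals(b);
        }
        if (ac.size() != bc.size()) {
            return false;
        }
        for (int i = 0; i < ac.size(); i++) {
            if (!structural(ac.get(i), bc.get(i))) {
                return false;
            }
        }
        return true;
    }

    // Suggested guard: a conjunct containing a volatile node never matches.
    static boolean volatileAware(Expr a, Expr b) {
        return !a.containsVolatile() && !b.containsVolatile() && structural(a, b);
    }
}
```

Here `volatile_bool_udf(f.k)` and `volatile_bool_udf(f2.k)` (after replacement) are conflated by `structural()`. The guarded variant still matches deterministic conjuncts such as `f.k = d.k`.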
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -362,17 +483,228 @@ private Plan rewrite(LogicalFilter<? extends Plan> filter, LogicalApply<Plan, Pl
windowFilterConjunct = ExpressionUtils.replace(windowFilterConjunct,
ImmutableMap.of(aggOut.toSlot(), aggOutExpr));
- LogicalFilter<Plan> newFilter = filter.withConjunctsAndChild(conjuncts.get(true), apply.left());
+ // Split uncorrelated conjuncts: predicates that reference ONLY shared
+ // relation slots (tables appearing in both outer and inner plans) must
+ // stay ABOVE the window. Otherwise the window function would see a
+ // different set of rows than the original scalar subquery.
+ //
+ // For example, with fact(f) as shared table and dim(d) as outer-only:
+ // f.v > 6 → shared-only → must stay above the window
+ // d.tag > 0 → outer-only → safe below the window
+ // f.k = d.k → join cond → needed below the window
+ //
+ // We find shared tables by comparing table IDs that appear in both
+ // outer and inner plans, then collect ALL output slots of those
+ // tables (not just columns referenced in the inner query).
+ List<CatalogRelation> outerRels = outerPlans.stream()
+ .filter(CatalogRelation.class::isInstance)
+ .map(CatalogRelation.class::cast)
+ .collect(Collectors.toList());
+ List<CatalogRelation> innerRels = innerPlans.stream()
+ .filter(CatalogRelation.class::isInstance)
+ .map(CatalogRelation.class::cast)
+ .collect(Collectors.toList());
+ Set<Long> innerTableIds = innerRels.stream()
+ .map(r -> r.getTable().getId())
+ .collect(Collectors.toSet());
+ Set<ExprId> sharedOuterExprIds = outerRels.stream()
+ .filter(r -> innerTableIds.contains(r.getTable().getId()))
+ .flatMap(r -> r.getOutput().stream())
+ .map(Slot::getExprId)
+ .collect(Collectors.toSet());
+ Set<Expression> uncorrelatedConjuncts = conjuncts.get(true);
+ Set<Expression> belowWindowConjuncts = Sets.newHashSet();
+ Set<Expression> aboveWindowConjuncts = Sets.newHashSet();
+ if (uncorrelatedConjuncts != null) {
+ for (Expression conj : uncorrelatedConjuncts) {
+ // Conjuncts that were matched against inner subquery filter
+ // conjuncts (tracked by checkFilter) must stay BELOW the
+ // window. They are semantically part of the inner aggregate's
+ // filter, not extra outer-only predicates. Placing them above
+ // the window would let the window see more rows than the
+ // original scalar subquery, producing wrong aggregate results.
+ if (matchedInnerFilterConjuncts.contains(conj)) {
+ belowWindowConjuncts.add(conj);
+ continue;
+ }
+ // Volatile predicates (e.g. random() > 0.5) must stay ABOVE
+ // the window. Pushing them below would let the window
+ // aggregate over a different set of rows per partition than
+ // the original scalar subquery. PushDownFilterThroughWindow
+ // has the same hazard and explicitly rejects volatile
+ // predicates for this reason.
+ if (conj.containsVolatileExpression()) {
+ aboveWindowConjuncts.add(conj);
+ continue;
+ }
+ // Any predicate that references shared-table slots (tables
+ // appearing in both outer and inner plans) must stay ABOVE
+ // the window. This handles both shared-only predicates
+ // (e.g. f.v > 6) and mixed shared+outer predicates
+ // (e.g. f.v > d.tag). Pushing them below would restrict
+ // the rows seen by the window function, producing a
+ // different aggregate than the original scalar subquery.
+ boolean hasShared = false;
+ for (ExprId id : conj.getInputSlotExprIds()) {
+ if (sharedOuterExprIds.contains(id)) {
+ hasShared = true;
+ break;
+ }
+ }
+ if (hasShared) {
+ aboveWindowConjuncts.add(conj);
+ } else {
+ // No shared-table references: the predicate involves only
+ // outer-only table columns. Safe to place below the window.
+ belowWindowConjuncts.add(conj);
+ }
+ }
+ }
+
+ // Extract and classify conjuncts from any LogicalFilter nodes nested
+ // inside the outer child (apply.left()). Nested shared-table filters
+ // (e.g. f.v > 6 under a CrossJoin) must be hoisted ABOVE the window;
+ // otherwise the window would aggregate over a filtered subset of rows
+ // while the original scalar subquery sees all rows for the key.
+ //
+ // Nested outer-only filters (e.g. d.tag > 0) are safe below and can
+ // stay in place after the filters are stripped.
+ List<LogicalFilter<Plan>> nestedOuterFilters = apply.left()
+ .collectToList(LogicalFilter.class::isInstance);
+ Set<ExprId> extractedConjunctExprIds = Sets.newHashSet();
+ for (LogicalFilter<Plan> nf : nestedOuterFilters) {
+ for (Expression conj : nf.getConjuncts()) {
+ // Matched inner-filter conjuncts always go below.
+ if (matchedInnerFilterConjuncts.contains(conj)) {
+ belowWindowConjuncts.add(conj);
+ extractedConjunctExprIds.addAll(conj.getInputSlotExprIds());
+ continue;
+ }
+ // Volatile predicates (e.g. random() > 0.5) in nested filters
+ // must NOT be hoisted: moving them across a join changes their
+ // evaluation frequency. For a shape like
+ // CrossJoin(fact, (select * from dim where random()>0.5) d)
+ // random() is evaluated once per dim row before the join;
+ // hoisting it above the window/join evaluates random() once per
+ // joined fact row, which can change which dim rows survive.
+ // Keep volatile nested-filter predicates at their original
+ // position by skipping them here.
+ if (conj.containsVolatileExpression()) {
Review Comment:
Keeping nested volatile filters in place is still unsafe when the nested
filter is on a shared-table branch. For example:
```text
Filter(sf.k = d.k, sf.v * 2 > sum_alias)
Apply(correlation: d.k)
CrossJoin
Filter(random() > 0.5)
Scan fact sf -- same table as the scalar subquery
Scan dim_unique d -- d.k is unique/non-null
Aggregate(sum(f2.v) AS sum_alias)
Filter(f2.k = d.k)
Scan fact f2
```
`checkFilter()` can match `f2.k = d.k`, and the unique-key check can pass.
This branch then skips the volatile conjunct, while `stripOuterFilters()`
preserves it in the child that is placed below `LogicalWindow`. The rewritten
window therefore computes `SUM(sf.v) OVER (PARTITION BY d.k)` only over the
fact rows that survive `random()`. The original scalar subquery still sums all
`fact` rows for the key; there, `random()` only decides which outer fact rows
are tested. Please reject nested volatile filters on shared-table paths. The
alternative is to keep a nested volatile predicate in place only after proving
that it belongs exclusively to the outer-only relation.
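A small hypothetical simulation of the plan above, for a single key, may help show the divergence. It assumes three `fact` rows with `v = 1, 2, 3`. It also replaces `random() > 0.5` with a fixed mask so the result is reproducible. `mustRejectRewrite()` is an illustrative guard; its name and signature are invented here. It bails out when a nested filter is volatile and sits over a table that the subquery also reads.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical simulation of the reviewed plan shape for one key (k = 1).
// A fixed boolean mask stands in for random() > 0.5 on each outer fact row.
final class NestedVolatileDemo {
    static final int[] FACT_V = {1, 2, 3};                  // fact rows with k = 1
    static final boolean[] SURVIVES = {false, false, true}; // assumed random() outcomes

    // Original plan: the scalar subquery sums ALL fact rows for the key;
    // random() only decides which outer fact rows are tested.
    static int originalMatches() {
        int subquerySum = 0;
        for (int v : FACT_V) {
            subquerySum += v;
        }
        return countMatches(subquerySum);
    }

    // Rewritten plan with the volatile filter kept below LogicalWindow:
    // SUM(sf.v) OVER (PARTITION BY d.k) only sees the surviving rows.
    static int rewrittenMatches() {
        int windowSum = 0;
        for (int i = 0; i < FACT_V.length; i++) {
            if (SURVIVES[i]) {
                windowSum += FACT_V[i];
            }
        }
        return countMatches(windowSum);
    }

    // Counts surviving outer rows that satisfy sf.v * 2 > sum_alias.
    private static int countMatches(int sumAlias) {
        int matches = 0;
        for (int i = 0; i < FACT_V.length; i++) {
            if (SURVIVES[i] && FACT_V[i] * 2 > sumAlias) {
                matches++;
            }
        }
        return matches;
    }

    // Hypothetical guard: reject when a nested filter is volatile and the tables
    // scanned beneath it overlap the tables read by the scalar subquery.
    static boolean mustRejectRewrite(boolean containsVolatile,
            Set<Long> tableIdsUnderFilter, Set<Long> innerTableIds) {
        return containsVolatile && !Collections.disjoint(tableIdsUnderFilter, innerTableIds);
    }
}
```

With this data the original plan returns no rows, because `3 * 2 > 6` is false. The rewritten plan returns one row, because `3 * 2 > 3` is true.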
##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/AggScalarSubQueryToWindowFunction.java:
##########
@@ -362,17 +483,228 @@ private Plan rewrite(LogicalFilter<? extends Plan> filter, LogicalApply<Plan, Pl
+ if (conj.containsVolatileExpression()) {
+ continue;
+ }
+ boolean nfHasShared = false;
+ for (ExprId id : conj.getInputSlotExprIds()) {
+ if (sharedOuterExprIds.contains(id)) {
+ nfHasShared = true;
+ break;
+ }
+ }
+ if (nfHasShared) {
Review Comment:
This extraction also needs to respect `NoneMovableFunction`, not just
volatile expressions. A nested deterministic predicate such as
`assert_true(d.tag > 0, 'bad')` is not volatile, so it reaches this
shared/outer-only classification and is later stripped from its original branch
by `stripOuterFilters()`. For example:
```text
Filter(sf.k = d.k, sf.v * 2 > sum_alias)
Apply(correlation: d.k)
CrossJoin
Scan fact sf
Filter(assert_true(d.tag > 0, 'bad'))
Scan dim_unique d
Aggregate(sum(f2.v) AS sum_alias)
Filter(f2.k = d.k)
Scan fact f2
```
After the rewrite, the `assert_true` predicate is reinserted above the whole
`CrossJoin` (still below the window). As a result, a failing `dim` row with no
matching fact row may no longer raise an error, and matching rows can evaluate
the assertion once per joined fact row. `AssertTrue` implements
`NoneMovableFunction`, whose contract says such functions should not be moved or
pruned, and other filter-pushdown code checks that marker. Please reject nested
filters containing `NoneMovableFunction`, or preserve those predicates at their
original branch position instead of extracting them.
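One way to express the suggested check is sketched below with hypothetical types. `NoneMovableMarker` is a local stand-in for `NoneMovableFunction`, and `AssertTrueCall` stands in for `AssertTrue`. Neither is the Doris class. The guard walks each nested conjunct and refuses extraction if any node carries the marker, in the spirit of the marker checks mentioned for other filter-pushdown code.

```java
import java.util.List;

// Hypothetical expression nodes for illustration only (NOT Nereids classes).
interface Node {
    List<Node> children();
}

// Local marker playing the role of NoneMovableFunction.
interface NoneMovableMarker {
}

final class Column implements Node {
    final String name;
    Column(String name) { this.name = name; }
    public List<Node> children() { return List.of(); }
}

final class IntLiteral implements Node {
    final int value;
    IntLiteral(int value) { this.value = value; }
    public List<Node> children() { return List.of(); }
}

final class GreaterThan implements Node {
    final Node left;
    final Node right;
    GreaterThan(Node left, Node right) { this.left = left; this.right = right; }
    public List<Node> children() { return List.of(left, right); }
}

// Stand-in for assert_true(cond, msg): pinned to the branch where it was written.
final class AssertTrueCall implements Node, NoneMovableMarker {
    final Node condition;
    final String message;
    AssertTrueCall(Node condition, String message) { this.condition = condition; this.message = message; }
    public List<Node> children() { return List.of(condition); }
}

final class NestedFilterExtraction {
    // True if this node or any descendant carries the none-movable marker.
    static boolean containsNoneMovable(Node node) {
        if (node instanceof NoneMovableMarker) {
            return true;
        }
        for (Node child : node.children()) {
            if (containsNoneMovable(child)) {
                return true;
            }
        }
        return false;
    }

    // A nested conjunct may be extracted/stripped only if nothing in it is pinned;
    // otherwise it stays at its original branch (or the rewrite is rejected).
    static boolean canExtract(Node conjunct) {
        return !containsNoneMovable(conjunct);
    }
}
```

Checking the whole conjunct tree, rather than only its root, also catches a pinned call nested inside an otherwise movable predicate.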
--
This is an automated message from the Apache Git Service.