andygrove opened a new issue, #4412:
URL: https://github.com/apache/datafusion-comet/issues/4412
## Problem
Spark's AQE rule `AQEPropagateEmptyRelation` (in
`sql/core/.../adaptive/AQEPropagateEmptyRelation.scala`) detects when a query
stage materializes 0 rows and propagates emptiness upward, eliminating
downstream operators. The key pattern for aggregates is:
```scala
case LogicalQueryStage(_, agg: BaseAggregateExec) if agg.groupingExpressions.nonEmpty &&
    agg.child.isInstanceOf[QueryStageExec] =>
  // estimate rows = child stage's actual row count
```
`CometHashAggregateExec` extends `CometUnaryExec`, not `BaseAggregateExec`,
so the pattern never matches. The optimization can't see Comet aggregates,
can't short-circuit them, and can't propagate emptiness through them to
downstream unions / joins.
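To make the failure mode concrete, here is a minimal, self-contained sketch that uses toy stand-ins rather than the real Spark classes. `Plan`, `Stage`, `BaseAggregateLike`, `SparkHashAgg`, `CometAgg`, and `estimatedRowCount` are hypothetical names; only the shape of the match mirrors the Spark snippet above. Because the match is keyed on a trait, a node that does not mix that trait in falls through to the default case, even if it carries the same data.

```scala
// Hypothetical toy stand-ins for Spark physical plan nodes (not Spark's API).
trait Plan

// A materialized query stage with its actual runtime row count.
final case class Stage(rowCount: Long) extends Plan

// Plays the role of Spark's BaseAggregateExec trait.
trait BaseAggregateLike extends Plan {
  def groupingExpressions: Seq[String]
  def child: Plan
}

// Plays the role of Spark's HashAggregateExec: it mixes in the trait.
final case class SparkHashAgg(groupingExpressions: Seq[String], child: Plan)
    extends BaseAggregateLike

// Plays the role of CometHashAggregateExec: same fields, but no trait.
final case class CometAgg(groupingExpressions: Seq[String], child: Plan) extends Plan

// Mirrors the shape of the AQE pattern: a grouped aggregate directly over a
// stage is estimated to produce as many rows as that stage.
def estimatedRowCount(plan: Plan): Option[Long] = plan match {
  case s: Stage => Some(s.rowCount)
  case agg: BaseAggregateLike if agg.groupingExpressions.nonEmpty =>
    agg.child match {
      case stage: Stage => Some(stage.rowCount)
      case _            => None
    }
  case _ => None
}

val emptyStage = Stage(rowCount = 0L)
println(estimatedRowCount(SparkHashAgg(Seq("k"), emptyStage))) // Some(0)
println(estimatedRowCount(CometAgg(Seq("k"), emptyStage)))     // None
```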
## Why this surfaced now
PR #4374 (re-enable COUNT for mixed Spark partial / Comet final aggregates)
routes COUNT's Final aggregate through Comet. Before that PR, COUNT's Final
stayed as Spark's `HashAggregateExec` (a `BaseAggregateExec`), so the
optimization fired. After it, the optimization no longer fires, and the
affected tests fail:
- `SPARK-35442: Support propagate empty relation through aggregate`
(AdaptiveQueryExecSuite)
- `SPARK-35442: Support propagate empty relation through union`
(AdaptiveQueryExecSuite)
Both tests are tagged `IgnoreComet`, with a link to this issue, in
`dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.1}.diff`.
## Impact
**Query results are correct under Comet.** With GROUP BY,
`CometHashAggregateExec` over an empty input returns no rows. What's lost is
the AQE short-circuit: instead of the plan reducing to
`LocalTableScanExec(empty)` when AQE re-optimizes after the stage materializes,
the Comet aggregate runs against an empty shuffle stage and returns no rows.
The wasted work per aggregate is small (on the order of microseconds), but it
becomes visible on plans with many post-aggregate operators (unions, joins)
that could also have collapsed.
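The "returns no rows" behavior, and the `groupingExpressions.nonEmpty` guard in the Spark pattern, both follow from SQL aggregate semantics: a grouped aggregate over zero input rows produces zero groups, while a global aggregate such as `SELECT COUNT(*)` always produces exactly one row. The sketch below models this with plain Scala collections; `groupedCount` and `globalCount` are illustrative helpers, not Comet or Spark APIs.

```scala
// Illustrative helpers (hypothetical): model SQL aggregate semantics on
// in-memory rows, where each row is a (key, value) pair.

// SELECT key, COUNT(*) FROM t GROUP BY key
def groupedCount(rows: Seq[(String, Int)]): Seq[(String, Long)] =
  rows.groupBy(_._1).toSeq.map { case (k, vs) => (k, vs.size.toLong) }

// SELECT COUNT(*) FROM t  (no GROUP BY)
def globalCount(rows: Seq[(String, Int)]): Seq[Long] =
  Seq(rows.size.toLong)

val empty = Seq.empty[(String, Int)]
println(groupedCount(empty).size)        // 0: no groups, so no output rows
println(globalCount(empty) == Seq(0L))   // true: one output row holding 0
```

This is why only the grouped case can be safely rewritten to an empty relation.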
## Options
**A. Widen the SPARK-35442 test assertions** to accept
`CometHashAggregateExec` as a valid endpoint. Test-only change. Doesn't fix the
actual missed optimization.
**B. Add a Comet AQE rule that mirrors `AQEPropagateEmptyRelation`.** When a
`CometHashAggregateExec` (with GROUP BY) sits over a materialized empty stage,
rewrite it to `LocalTableScanExec(empty)`. ~40-80 lines plus tests. Closes the
real gap; localized blast radius.
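Option B can be sketched as a bottom-up rewrite over a toy plan tree. This is a hypothetical model, not Comet's or Spark's API: `Plan`, `Stage`, `CometAgg`, `Union`, `EmptyScan`, `isKnownEmpty`, and `propagateEmpty` are illustrative names, and a real rule would operate on `SparkPlan` nodes inside AQE. It rewrites a grouped aggregate over a materialized empty stage to an empty scan, then drops empty branches from unions so the emptiness keeps propagating upward.

```scala
// Hypothetical toy plan tree; none of these are Spark or Comet classes.
trait Plan
// A query stage; rowCount becomes Some(n) once the stage has materialized.
final case class Stage(rowCount: Option[Long]) extends Plan
// Stands in for CometHashAggregateExec.
final case class CometAgg(grouping: Seq[String], child: Plan) extends Plan
final case class Union(children: Seq[Plan]) extends Plan
// Stands in for LocalTableScanExec with no rows.
case object EmptyScan extends Plan

// Known-empty means already an empty scan, or a stage that materialized with
// zero rows. Unmaterialized stages (rowCount = None) are never treated as empty.
def isKnownEmpty(p: Plan): Boolean = p match {
  case EmptyScan       => true
  case Stage(Some(0L)) => true
  case _               => false
}

// Rewrites bottom-up: children first, then the current node.
def propagateEmpty(plan: Plan): Plan = plan match {
  case CometAgg(grouping, child) =>
    val newChild = propagateEmpty(child)
    // Only grouped aggregates collapse: a global aggregate over empty
    // input still produces one row.
    if (grouping.nonEmpty && isKnownEmpty(newChild)) EmptyScan
    else CometAgg(grouping, newChild)
  case Union(children) =>
    // Drop branches that are known to be empty.
    children.map(propagateEmpty).filterNot(isKnownEmpty) match {
      case Seq()       => EmptyScan
      case Seq(single) => single
      case remaining   => Union(remaining)
    }
  case other => other
}

val plan = Union(Seq(
  CometAgg(Seq("k"), Stage(Some(0L))),  // grouped aggregate over an empty stage
  CometAgg(Seq("k"), Stage(Some(10L)))  // grouped aggregate over a non-empty stage
))
// The empty branch is removed and the single-child Union collapses to the
// remaining aggregate.
println(propagateEmpty(plan))
```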
**C. Make `CometHashAggregateExec extend BaseAggregateExec`.**
`BaseAggregateExec` is a trait, and 3 of its 8 abstract members already exist
on `CometHashAggregateExec`; the remaining 5 are one-line delegates to
`originalPlan.asInstanceOf[BaseAggregateExec]`. ~5-10 lines. The optimization
fires naturally, and any future Spark rule keyed on `BaseAggregateExec` picks
up Comet for free.
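The delegation pattern behind Option C can be modeled in a few lines. This is a toy model, not the real classes: `AggregateLike` stands in for `BaseAggregateExec` with only three of its members, and `HashAgg` / `CometAgg` are hypothetical stand-ins for `HashAggregateExec` and `CometHashAggregateExec`. Per the issue, the real Comet class already has three of the eight members; the toy delegates every modeled member for brevity. Once the Comet node mixes in the trait, a match keyed on that trait sees it.

```scala
// Hypothetical toy model; none of these are the real Spark or Comet classes.
trait Plan

// Stands in for Spark's BaseAggregateExec; only three members are modeled.
trait AggregateLike {
  def groupingExpressions: Seq[String]
  def aggregateExpressions: Seq[String]
  def resultExpressions: Seq[String]
}

// Stands in for Spark's HashAggregateExec.
final case class HashAgg(
    groupingExpressions: Seq[String],
    aggregateExpressions: Seq[String],
    resultExpressions: Seq[String])
    extends Plan with AggregateLike

// Stands in for CometHashAggregateExec. It keeps the Spark plan it replaced
// and forwards each trait member to it with a one-line delegate.
final case class CometAgg(originalPlan: Plan) extends Plan with AggregateLike {
  // Throws ClassCastException if originalPlan is not an aggregate.
  private def orig: AggregateLike = originalPlan.asInstanceOf[AggregateLike]
  override def groupingExpressions: Seq[String] = orig.groupingExpressions
  override def aggregateExpressions: Seq[String] = orig.aggregateExpressions
  override def resultExpressions: Seq[String] = orig.resultExpressions
}

// Any rule keyed on the trait now matches the Comet node too.
def isGroupedAggregate(p: Plan): Boolean = p match {
  case agg: AggregateLike => agg.groupingExpressions.nonEmpty
  case _                  => false
}

val sparkAgg = HashAgg(Seq("k"), Seq("count(1)"), Seq("k", "count(1)"))
println(isGroupedAggregate(CometAgg(sparkAgg))) // true
```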
A review of every other Spark 3.4.3 code path that pattern-matches
`BaseAggregateExec` (`ReplaceHashWithSortAgg`, `RemoveRedundantProjects`,
`AggregateCodegenSupport`, `DisableUnnecessaryBucketedScan`,
`LogicalQueryStage` stats) found none that would do anything harmful to a Comet
aggregate. Option C has two caveats: this review must be repeated on every
Spark version we support (3.4 / 3.5 / 4.0 / 4.1 / 4.2), and any future
`BaseAggregateExec`-keyed rule would automatically apply to Comet aggregates
without an explicit opt-in.
## Suggested follow-up
Prototype Option C in isolation, verify that the SPARK-35442 tests pass
unchanged on all supported Spark versions, and verify that no other rule
regresses. If C proves risky, fall back to B.