andygrove opened a new issue, #4412:
URL: https://github.com/apache/datafusion-comet/issues/4412

   ## Problem
   
   Spark's AQE rule `AQEPropagateEmptyRelation` (in 
`sql/core/.../adaptive/AQEPropagateEmptyRelation.scala`) detects when a query 
stage materializes 0 rows and propagates emptiness upward, eliminating 
downstream operators. The key pattern for aggregates is:
   
   ```scala
   case LogicalQueryStage(_, agg: BaseAggregateExec) if 
agg.groupingExpressions.nonEmpty &&
     agg.child.isInstanceOf[QueryStageExec] =>
     // estimate rows = child stage's actual row count
   ```
   
   `CometHashAggregateExec` extends `CometUnaryExec`, not `BaseAggregateExec`, 
so the pattern never matches. The optimization can't see Comet aggregates, 
can't short-circuit them, and can't propagate emptiness through them to 
downstream unions / joins.
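   
   The miss can be reproduced in a toy model. This is a minimal sketch, not 
Spark or Comet code: `Plan`, `Stage`, `BaseAgg`, `SparkAgg`, `CometAgg` and 
`seesEmptyAggregate` are hypothetical stand-ins for the real classes and for 
the rule's pattern.
   
   ```scala
   // Toy model only: every name here is a hypothetical stand-in, not a Spark or Comet class.
   object PatternMissDemo {
     sealed trait Plan

     // Stand-in for a materialized query stage that reports its actual row count.
     final case class Stage(rowCount: Long) extends Plan

     // Stand-in for Spark's BaseAggregateExec trait.
     trait BaseAgg extends Plan {
       def groupingExpressions: Seq[String]
       def child: Plan
     }

     // Stand-in for Spark's HashAggregateExec: extends the trait the rule matches on.
     final case class SparkAgg(groupingExpressions: Seq[String], child: Plan) extends BaseAgg

     // Stand-in for CometHashAggregateExec today: same fields, but not a BaseAgg.
     final case class CometAgg(groupingExpressions: Seq[String], child: Plan) extends Plan

     // Same shape as the rule's pattern: a type test on the trait plus a guard on grouping keys.
     def seesEmptyAggregate(plan: Plan): Boolean = plan match {
       case agg: BaseAgg if agg.groupingExpressions.nonEmpty =>
         agg.child match {
           case Stage(0L) => true
           case _         => false
         }
       case _ => false
     }
   }
   ```
   
   In this model `SparkAgg` over `Stage(0L)` is recognized and `CometAgg` over 
the same stage is not, even though both carry identical fields: the type test 
decides, not the shape of the node.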
   
   ## Why this surfaced now
   
   PR #4374 (re-enable COUNT for mixed Spark partial / Comet final aggregates) 
routes COUNT's Final aggregate through Comet. Pre-PR, COUNT's Final stayed as 
Spark `HashAggregate` (= `BaseAggregateExec`), so the optimization fired. 
Post-PR it doesn't, and the affected tests fail:
   
   - `SPARK-35442: Support propagate empty relation through aggregate` 
(AdaptiveQueryExecSuite)
   - `SPARK-35442: Support propagate empty relation through union` 
(AdaptiveQueryExecSuite)
   
   Both are tagged `IgnoreComet` linking to this issue in 
`dev/diffs/{3.4.3,3.5.8,4.0.2,4.1.1}.diff`.
   
   ## Impact
   
   **Query results are correct under Comet.** `CometHashAggregateExec` over an 
empty input returns empty. What's lost is the AQE short-circuit: instead of the 
plan reducing to `LocalTableScanExec(empty)` at planning time, the Comet 
aggregate runs against an empty shuffle stage and returns empty. The wasted 
work per aggregate is minor (on the order of microseconds), but it becomes 
visible on plans with many post-aggregate operators (unions, joins) that could 
also collapse.
   
   ## Options
   
   **A. Widen the SPARK-35442 test assertions** to accept 
`CometHashAggregateExec` as a valid endpoint. Test-only change. Doesn't fix the 
actual missed optimization.
   
   **B. Add a Comet AQE rule that mirrors `AQEPropagateEmptyRelation`.** When a 
`CometHashAggregateExec` (with GROUP BY) sits over a materialized empty stage, 
rewrite it to `LocalTableScanExec(empty)`. ~40-80 lines plus tests. Closes the 
real gap; localized blast radius.
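   
   The rewrite in B could look roughly like the following, again on a toy plan 
model rather than Spark's `SparkPlan` API. All names (`Stage`, `CometAgg`, 
`Union`, `LocalEmpty`, `propagateEmpty`) are hypothetical; a real rule would 
have to be written against Spark's and Comet's actual classes. The GROUP BY 
guard matters because a global aggregate over empty input still returns one 
row (for example `COUNT(*)` returns 0), so only grouped aggregates may collapse.
   
   ```scala
   // Toy model only: every name here is a hypothetical stand-in, not a Spark or Comet class.
   object CometEmptyRuleSketch {
     sealed trait Plan

     // rowCount is None until the stage has materialized.
     final case class Stage(rowCount: Option[Long]) extends Plan
     final case class CometAgg(grouping: Seq[String], child: Plan) extends Plan
     final case class Union(children: Seq[Plan]) extends Plan

     // Stand-in for LocalTableScanExec with no rows.
     case object LocalEmpty extends Plan

     // Bottom-up rewrite: collapse grouped aggregates over known-empty input,
     // then let the emptiness flow through unions above them.
     def propagateEmpty(plan: Plan): Plan = plan match {
       case CometAgg(grouping, child) =>
         propagateEmpty(child) match {
           case Stage(Some(0L)) | LocalEmpty if grouping.nonEmpty => LocalEmpty
           case newChild => CometAgg(grouping, newChild)
         }
       case Union(children) =>
         val kept = children.map(propagateEmpty).filterNot(_ == LocalEmpty)
         if (kept.isEmpty) LocalEmpty else Union(kept)
       case other => other
     }
   }
   ```
   
   In this sketch an unmaterialized stage (`Stage(None)`) and an aggregate 
without grouping keys are both left untouched.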
   
   **C. Make `CometHashAggregateExec extend BaseAggregateExec`.** 
`BaseAggregateExec` is a trait, and 3 of its 8 abstract members already exist 
on `CometHashAggregateExec`; the remaining 5 are one-line delegates to 
`originalPlan.asInstanceOf[BaseAggregateExec]`. ~5-10 lines. The optimization 
fires naturally, and any future Spark rule keyed on `BaseAggregateExec` picks 
up Comet for free.
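   
   The delegation in C can be sketched on a toy trait. This is illustrative 
only: `BaseAgg` has four members instead of the real trait's eight, the member 
names are chosen for the example, and `originalPlan` is typed as `BaseAgg` 
directly so no cast is needed.
   
   ```scala
   // Toy model only: a reduced stand-in for BaseAggregateExec and its implementers.
   object DelegationSketch {
     trait BaseAgg {
       def groupingExpressions: Seq[String]
       def aggregateExpressions: Seq[String]
       def isStreaming: Boolean
       def numShufflePartitions: Option[Int]
     }

     // Stand-in for the Spark aggregate that the Comet node replaced.
     final case class SparkAgg(
         groupingExpressions: Seq[String],
         aggregateExpressions: Seq[String],
         isStreaming: Boolean,
         numShufflePartitions: Option[Int]) extends BaseAgg

     // Members the Comet node already has are constructor fields;
     // the remaining ones are one-line delegates to the original plan.
     final case class CometAgg(
         groupingExpressions: Seq[String],
         aggregateExpressions: Seq[String],
         originalPlan: BaseAgg) extends BaseAgg {
       def isStreaming: Boolean = originalPlan.isStreaming
       def numShufflePartitions: Option[Int] = originalPlan.numShufflePartitions
     }

     // Any rule keyed on the trait now matches the Comet node as well.
     def matchedByAggRule(plan: Any): Boolean = plan match {
       case agg: BaseAgg => agg.groupingExpressions.nonEmpty
       case _            => false
     }
   }
   ```
   
   The last method is the whole point of C and also its risk: once the node 
implements the trait, every trait-keyed match applies to it with no further 
change.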
   
   Today's review of every other Spark 3.4.3 rule that pattern-matches 
`BaseAggregateExec` (`ReplaceHashWithSortAgg`, `RemoveRedundantProjects`, 
`AggregateCodegenSupport`, `DisableUnnecessaryBucketedScan`, 
`LogicalQueryStage` stats) shows none would do anything harmful to a Comet 
aggregate. C has two caveats: that review covers 3.4.3 only and must be 
repeated on every Spark version we support (3.4 / 3.5 / 4.0 / 4.1 / 4.2), and 
any future `BaseAggregateExec`-keyed rule would apply to Comet automatically, 
without an explicit opt-in.
   
   ## Suggested follow-up
   
   Prototype Option C in isolation, verify SPARK-35442 passes unchanged on all 
supported Spark versions, and verify no other rule regresses. If C proves 
risky, fall back to B.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
