andygrove opened a new pull request, #4565:
URL: https://github.com/apache/datafusion-comet/pull/4565

   ## Which issue does this PR close?
   
   N/A. This is proactive coverage for an aggregate operator that Spark 
sometimes plans and for which Comet previously fell back to Spark.
   
   ## Rationale for this change
   
   Spark's planner picks `SortAggregateExec` when neither `HashAggregateExec` 
nor `ObjectHashAggregateExec` fits. A typical case is a 
`TypedImperativeAggregate` function, whose buffer state cannot be stored in 
`HashAggregateExec`'s buffer, in a session with 
`spark.sql.execution.useObjectHashAggregateExec=false`. Today Comet does not 
recognize the operator at all, so any such Partial→Final pair runs on Spark 
and often pulls the surrounding shuffle off Comet as well. `CollectSet` with 
`useObjectHashAggregateExec=false` is the motivating example.
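
The operator choice described above can be modelled as a small decision function. This is a minimal Rust sketch under stated assumptions, not Spark's code: Spark's real selection logic is Scala in the planner and considers additional conditions, `AggOperator`, `AggFacts` and `choose_operator` are hypothetical names, and each boolean stands in for a check Spark performs on the aggregate expressions and session config.

```rust
/// The three physical aggregate operators Spark can plan (simplified model).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggOperator {
    Hash,
    ObjectHash,
    Sort,
}

/// Hypothetical summary of the facts the planner inspects for one aggregate.
#[derive(Debug, Clone, Copy)]
pub struct AggFacts {
    /// Every aggregation-buffer field can be updated in place in an
    /// `UnsafeRow` (what `HashAggregateExec` requires).
    pub buffer_is_mutable: bool,
    /// At least one aggregate function is a `TypedImperativeAggregate`
    /// (for example `CollectSet`).
    pub has_typed_imperative: bool,
    /// Value of `spark.sql.execution.useObjectHashAggregateExec`.
    pub object_hash_enabled: bool,
}

/// Simplified operator choice: hash first, then object hash, else sort.
pub fn choose_operator(facts: &AggFacts) -> AggOperator {
    if facts.buffer_is_mutable {
        AggOperator::Hash
    } else if facts.object_hash_enabled && facts.has_typed_imperative {
        AggOperator::ObjectHash
    } else {
        // Nothing else fits: fall back to sort-based aggregation.
        AggOperator::Sort
    }
}
```

With `object_hash_enabled` set to false, a `collect_set`-style aggregate (opaque buffer, typed-imperative) falls through to `AggOperator::Sort`, which is the case this PR targets.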
   
   ## What changes are included in this PR?
   
   - Map `SortAggregateExec` to a new `CometSortAggregateExec` serde object 
that reuses the existing `CometBaseAggregate.doConvert` path. It applies the 
same Comet-shuffle gate as `CometObjectHashAggregateExec`: Spark and Comet use 
different buffer formats for `TypedImperativeAggregate` functions, so a Comet 
Partial feeding a Spark Final would crash.
   - `createExec` produces a `CometHashAggregateExec` wrapper, the same 
approach `CometObjectHashAggregateExec` already uses. 
`CometExec.outputOrdering` defaults to `originalPlan.outputOrdering`, so 
`SortAggregateExec`'s grouping-key ordering is preserved for any downstream 
operator whose sort was elided because of it.
   - No proto or Rust changes: DataFusion's `AggregateExec::try_new` 
auto-detects `InputOrderMode::Sorted` from the child's output ordering, so 
sorted input produces sorted output and the streaming-aggregation path is used 
without any further changes.
   - Incidental cleanups: moved `getSupportLevel` (previously duplicated in 
three places) and `adjustOutputForNativeState` (previously duplicated verbatim 
in two) onto the `CometBaseAggregate` trait, and changed three Spark-side sites 
that enumerated aggregate operators (`isAggregate`, `findCometPartialAgg`, and 
the shuffle guard in `canAggregateBeConverted`) to match `BaseAggregateExec` 
once instead of listing each subclass.
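
DataFusion's `InputOrderMode` has three variants: `Linear`, `PartiallySorted(Vec<usize>)` and `Sorted`. The Rust sketch below is a simplified, hypothetical model of the sorted-input point above, not DataFusion's implementation. `detect_order_mode` approximates how the mode follows from the child's ordering (the real detection works on physical sort expressions and equivalence properties), and `streaming_collect_set` shows why `Sorted` input allows streaming: with rows arriving in key order, each group can be emitted as soon as the key changes, so only one group is buffered and the output is itself ordered by key.

```rust
use std::collections::BTreeSet;

/// Simplified stand-in for DataFusion's `InputOrderMode`; the real
/// `PartiallySorted` variant also carries the indices of the sorted columns.
#[derive(Debug, PartialEq, Eq)]
pub enum OrderMode {
    Linear,
    PartiallySorted,
    Sorted,
}

/// Hypothetical detection: collect the leading child sort columns that are
/// also group-by columns. All group-by columns covered -> Sorted,
/// some -> PartiallySorted, none -> Linear.
pub fn detect_order_mode(child_ordering: &[&str], group_by: &[&str]) -> OrderMode {
    let prefix: BTreeSet<&str> = child_ordering
        .iter()
        .copied()
        .take_while(|col| group_by.contains(col))
        .collect();
    if prefix.is_empty() {
        OrderMode::Linear
    } else if group_by.iter().all(|g| prefix.contains(g)) {
        OrderMode::Sorted
    } else {
        OrderMode::PartiallySorted
    }
}

/// Streaming collect_set over (key, value) rows already sorted by key.
/// Only the current group is buffered; it is flushed when the key changes,
/// so groups come out in key order.
pub fn streaming_collect_set(sorted_rows: &[(i32, i32)]) -> Vec<(i32, BTreeSet<i32>)> {
    let mut out = Vec::new();
    let mut current: Option<(i32, BTreeSet<i32>)> = None;
    for &(key, value) in sorted_rows {
        if let Some((k, set)) = current.as_mut() {
            if *k == key {
                set.insert(value);
                continue;
            }
        }
        // First row or a new key: the previous group is complete, emit it.
        if let Some(done) = current.take() {
            out.push(done);
        }
        current = Some((key, BTreeSet::from([value])));
    }
    if let Some(done) = current {
        out.push(done);
    }
    out
}
```

A hash-based aggregate would instead keep every group in memory until its input is exhausted; the sorted variant trades that for the requirement that the child already delivers rows in grouping-key order.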
   
   ## What's NOT included
   
   Arbitrary user-defined `TypedImperativeAggregate` UDAFs still fall back, 
because their `update`/`merge`/`serialize` methods are JVM code that native 
execution cannot invoke. This PR enables `SortAggregateExec` only for 
aggregate functions that Comet already implements natively (currently 
`CollectSet` and `BloomFilterAggregate` among the `TypedImperativeAggregate` 
functions); future native implementations such as `CollectList` will benefit 
automatically.
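
The two conditions that keep an aggregate on Spark (a function without a native implementation, or a non-Comet shuffle between Partial and Final) can be sketched as a predicate. This is a hypothetical Rust model, not Comet's Scala code: `Engine`, `NATIVE_TYPED_IMPERATIVE`, `choose_engine` and `is_valid_split` are illustrative names, and the allowlist contains only the two functions named above.

```rust
/// Which engine executes an operator (simplified).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Engine {
    Spark,
    Comet,
}

/// Hypothetical allowlist of `TypedImperativeAggregate` functions with a
/// native implementation (only the two named in this PR).
pub const NATIVE_TYPED_IMPERATIVE: &[&str] = &["CollectSet", "BloomFilterAggregate"];

/// Hypothetical gate: run the aggregate natively only if every function is
/// natively implemented and the shuffle feeding the Final stage is a Comet
/// shuffle (so the Partial stage also ran in Comet).
pub fn choose_engine(agg_functions: &[&str], shuffle_is_comet: bool) -> Engine {
    let all_native = agg_functions
        .iter()
        .all(|f| NATIVE_TYPED_IMPERATIVE.contains(f));
    if all_native && shuffle_is_comet {
        Engine::Comet
    } else {
        // JVM-only UDAFs or a Spark shuffle: keep the aggregate on Spark.
        Engine::Spark
    }
}

/// Spark and Comet use different typed-imperative buffer formats, so a
/// Partial -> Final pair is only safe when both halves use the same engine.
pub fn is_valid_split(partial: Engine, final_stage: Engine) -> bool {
    partial == final_stage
}
```

Keying the decision on the shuffle, rather than on each aggregate in isolation, is what rules out the Comet-Partial / Spark-Final combination.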
   
   ## How are these changes tested?
   
   Added a `CometAggregateSuite` test, "SortAggregate with collect_set is 
converted to native". It sets `useObjectHashAggregateExec=false`, asserts that 
Spark plans `SortAggregateExec` for the query, then runs the query with Comet 
and asserts that the entire plan executes natively and that the results match 
Spark. Verified locally with the full `CometAggregateSuite` (80 tests pass) and 
`CometExecRuleSuite`.

