andygrove opened a new pull request, #4565: URL: https://github.com/apache/datafusion-comet/pull/4565
## Which issue does this PR close?

N/A. This is proactive coverage for an aggregate operator that Spark sometimes plans and that Comet previously fell back on.

## Rationale for this change

Spark's planner picks `SortAggregateExec` when neither `HashAggregateExec` nor `ObjectHashAggregateExec` fits. This typically happens when `spark.sql.execution.useObjectHashAggregateExec=false` is set, or for `TypedImperativeAggregate` functions whose buffer state can't be handled by hash aggregation. Comet currently doesn't recognize the operator at all, so any such Partial→Final pair runs on Spark and tends to drag the surrounding shuffle off Comet too. `CollectSet` with `spark.sql.execution.useObjectHashAggregateExec=false` is the motivating example.

## What changes are included in this PR?

- Map `SortAggregateExec` to a new `CometSortAggregateExec` serde object that reuses the existing `CometBaseAggregate.doConvert` path. It applies the same Comet-shuffle gate as `CometObjectHashAggregateExec`: `TypedImperativeAggregate` buffer formats differ between Spark and Comet, so a Comet Partial / Spark Final split would crash.
- `createExec` produces a `CometHashAggregateExec` wrapper, the same approach `CometObjectHashAggregateExec` already uses. `CometExec.outputOrdering` defaults to `originalPlan.outputOrdering`. As a result, `SortAggregateExec`'s grouping-key ordering flows through unchanged to any downstream operator that relied on it to elide a sort.
- No proto or Rust changes: DataFusion's `AggregateExec::try_new` auto-detects `InputOrderMode::Sorted` from the child's output ordering. Sorted input therefore naturally produces sorted output, and the streaming-aggregate optimization applies without extra work.
- Cleanups made in passing: lifted `getSupportLevel` (previously duplicated three times) and `adjustOutputForNativeState` (previously duplicated verbatim twice) onto the `CometBaseAggregate` trait. Also collapsed three Spark-side aggregate enumeration sites (`isAggregate`, `findCometPartialAgg`, and the shuffle guard in `canAggregateBeConverted`) so each matches `BaseAggregateExec` once instead of listing each subclass.

## What's NOT included

Arbitrary user-defined `TypedImperativeAggregate` UDAFs still fall back, because their `update`/`merge`/`serialize` methods are JVM code that native execution can't invoke. This PR only enables `SortAggregateExec` for aggregate functions Comet already implements natively. Among the `TypedImperativeAggregate` set, that is currently `CollectSet` and `BloomFilterAggregate`; future native implementations such as `CollectList` will benefit automatically.

## How are these changes tested?

Added the `CometAggregateSuite` test "SortAggregate with collect_set is converted to native". The test:

- forces `useObjectHashAggregateExec=false`;
- asserts that Spark plans `SortAggregateExec` for the query;
- runs the query with Comet and asserts that the entire plan executes natively and that the results match Spark.

Verified locally with the full `CometAggregateSuite` (80 tests pass) and `CometExecRuleSuite`.
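
The rationale above depends on how Spark chooses among its three aggregate operators. Below is a simplified model of that choice, roughly what Spark's planner does in `AggUtils.createAggregate`. It is an illustration, not Spark's code. `AggOperator`, `AggInfo`, and their fields are hypothetical names, and the two boolean flags stand in for Spark's more detailed checks on buffer data types and aggregate functions.

```rust
/// The three physical aggregate operators Spark can plan (simplified model).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AggOperator {
    Hash,
    ObjectHash,
    Sort,
}

/// Hypothetical summary of one aggregate. These flags stand in for Spark's
/// real buffer-type and aggregate-function checks.
pub struct AggInfo {
    /// Buffer fields are all types that `HashAggregateExec` can update in place.
    pub buffer_is_hash_compatible: bool,
    /// `ObjectHashAggregateExec` can run these functions (e.g. a `TypedImperativeAggregate`).
    pub object_hash_compatible: bool,
}

/// Simplified operator choice. Try hash aggregation first, then object-hash
/// aggregation if the config allows it. Otherwise fall back to sort-based
/// aggregation.
pub fn choose_operator(info: &AggInfo, use_object_hash_aggregate_exec: bool) -> AggOperator {
    if info.buffer_is_hash_compatible {
        AggOperator::Hash
    } else if use_object_hash_aggregate_exec && info.object_hash_compatible {
        AggOperator::ObjectHash
    } else {
        AggOperator::Sort
    }
}
```

Take `collect_set`, a `TypedImperativeAggregate` whose buffer is not hash-compatible. This model yields `ObjectHash` when `use_object_hash_aggregate_exec` is `true` and `Sort` when it is `false`, which is the scenario the new test exercises.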
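
Why sorted input gives a streaming aggregate with sorted output can be seen in a minimal, std-only sketch of a sort-based `collect_set`. It is not DataFusion's implementation, and `sorted_collect_set` is a hypothetical helper. It assumes rows arrive already sorted by grouping key, which is what `SortAggregateExec` requires of its child. A group is complete as soon as the key changes, so only one group's state is held at a time, and groups are emitted in input key order.

```rust
use std::collections::BTreeSet;

/// Sort-based `collect_set` over rows already sorted by key (hypothetical helper).
/// A group is finished as soon as the key changes, so only one group's state is
/// held at a time and groups come out in the same key order as the input.
pub fn sorted_collect_set<K, V, I>(rows: I) -> Vec<(K, BTreeSet<V>)>
where
    K: PartialEq,
    V: Ord,
    I: IntoIterator<Item = (K, V)>,
{
    let mut out = Vec::new();
    let mut current: Option<(K, BTreeSet<V>)> = None;

    for (key, value) in rows {
        let same_group = matches!(&current, Some((cur_key, _)) if *cur_key == key);
        if !same_group {
            // Key changed: the previous group is complete, so emit it now.
            if let Some(done) = current.take() {
                out.push(done);
            }
            current = Some((key, BTreeSet::new()));
        }
        // `current` is always Some at this point; add the value to its set.
        if let Some((_, set)) = current.as_mut() {
            set.insert(value);
        }
    }

    // Flush the final group once the input is exhausted.
    if let Some(done) = current {
        out.push(done);
    }
    out
}
```

For example, `sorted_collect_set(vec![(1, "a"), (1, "b"), (2, "c")])` yields two groups in key order. If the input is not sorted, equal keys that are not adjacent end up in separate groups. This mode is therefore only correct when the child's output ordering guarantees sortedness.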
