peterxcli opened a new pull request, #4727:
URL: https://github.com/apache/datafusion-comet/pull/4727

   ## Which issue does this PR close?
   
   <!--
   We generally require a GitHub issue to be filed for all bug fixes and 
enhancements and this helps us generate change logs for our releases. You can 
link an issue to this PR using the GitHub syntax. For example `Closes #123` 
indicates that this PR will close issue #123.
   -->
   
   Closes https://github.com/apache/datafusion-comet/issues/4724.
   
   ## Rationale for this change
   
   `collect_list` / `array_agg` and `collect_set` use Spark 
`TypedImperativeAggregate` buffers that Spark declares as serialized 
`BinaryType`, while Comet’s native implementations keep the real aggregate 
state as an Arrow/Spark `ArrayType`.
   
   The existing schema adjustment only handled simple two-stage `Partial -> 
Final` collect aggregates. Spark’s distinct-aggregate rewrite can introduce 
multi-stage plans with `PartialMerge` stages, for example:
   
   ```sql
   SELECT x, count(DISTINCT y), collect_list(z) FROM t GROUP BY x
   ```
   
   Without correcting the intermediate buffer schema for these stages, a 
fully-native pipeline can fail when native list state is treated as Spark 
binary state. This change makes the native array state round-trip through 
`Partial`, `PartialMerge`, and mixed `{Partial, PartialMerge}` stages so 
`collect_list` / `collect_set` can run fully native in distinct-combined 
aggregate plans.
   
   ## What changes are included in this PR?
   
   - Adds native `collect_list` / `array_agg` aggregate support:
     - Adds `CollectList` to the aggregate expression proto.
     - Adds JVM serde for Spark `CollectList`.
     - Registers `CollectList -> CometCollectList`.
     - Wires native planning to 
`datafusion_spark::function::aggregate::collect::SparkCollectList`.
   
   - Extends collect aggregate native-state schema adjustment:
     - Updates `CometObjectHashAggregateExec.adjustOutputForNativeState` to 
handle both `CollectList` and `CollectSet`.
     - Applies the rewrite to intermediate `Partial`, `PartialMerge`, and mixed 
`{Partial, PartialMerge}` stages, not only pure `Partial`.
     - Rewrites Spark’s serialized `BinaryType` buffer attributes to Comet’s 
native `ArrayType(elementType, containsNull = true)` state type.
   
   - Adds regression coverage for fully-native distinct-combined collect 
aggregates.
   
   - Updates expression support docs and aggregate audit notes for 
`collect_list` / `array_agg`.
   
   ## How are these changes tested?
   
   Ran:
   
   ```bash
   cargo fmt --manifest-path native/Cargo.toml --all
   cargo check --manifest-path native/Cargo.toml -p datafusion-comet
   make core
   ./mvnw test -Dtest=none -Dsuites="org.apache.comet.exec.CometAggregateSuite 
collect_list/collect_set combined with distinct aggregate runs fully native" 
-Dscalastyle.skip=true -Pspark-3.5
   ./mvnw spotless:check -Dscalastyle.skip=true -Pspark-3.5
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to