andygrove opened a new issue, #4724:
URL: https://github.com/apache/datafusion-comet/issues/4724

   ## Describe the bug
   
   `collect_list` / `collect_set` declare their aggregate buffer as 
`BinaryType` in Spark (serialized `TypedImperativeAggregate` state) but produce 
a native `ArrayType` (Arrow `List`) state in Comet. Comet bridges this only for 
the simple two-stage shape: 
`CometObjectHashAggregateExec.adjustOutputForNativeState` rewrites the buffer 
column of a **pure-`Partial`** aggregate to `ArrayType`, so `Partial -> Final` 
runs natively and correctly.
   
   It does **not** handle multi-stage aggregates that contain a `PartialMerge` 
stage, which Spark introduces for the distinct-aggregate rewrite, e.g.:
   
   ```sql
   SELECT x, count(DISTINCT y), collect_list(z) FROM t GROUP BY x
   ```
   
   This plans as `Partial(x,y) -> PartialMerge(x,y) -> [PartialMerge, 
Partial](x) -> Final(x)`. The intermediate `PartialMerge` outputs are still 
declared `BinaryType`, so a fully-native pipeline crashes at runtime:
   
   ```
   CometNativeException: Cast error: Cannot cast LIST to non-list data type 
Binary
   ```
   
   (and a related nullability drift `List(non-null T)` vs `List(nullable T)` 
for nested element types). This is a facet of the broader Arrow-type-drift 
tracked in #4515.
   
   This affects both `collect_list` (added in #4720) and the already-shipped 
`collect_set` — `collect_set` has the same latent crash on this shape.
   
   ## Current behavior (workaround in #4720)
   
   To avoid the crash, multi-stage `collect_list`/`collect_set` aggregates now 
fall back to Spark consistently:
   - `CometExecRule.tagUnsafePartialAggregates` tags the feeding pure-`Partial` 
when a `PartialMerge` stage of a `CollectList`/`CollectSet` aggregate is 
present.
   - `CometBaseAggregate.doConvert` falls back a `PartialMerge`/`Final` stage 
of these functions when no Comet partial produced the buffer (the cross-engine 
`LocalTableScan` case).
   
   The simple two-stage `collect_list`/`collect_set` cases continue to run 
natively.
   
   ## Expected behavior
   
   Enable fully-native multi-stage execution by correcting the intermediate 
buffer schema for `PartialMerge` (and multi-mode) stages of 
`CollectList`/`CollectSet` (extend `adjustOutputForNativeState` beyond 
pure-`Partial`, and fix the element-nullability drift), so the `ArrayType` 
buffer round-trips across all stages. Then remove the fallback guards added in 
#4720.
   
   ## Additional context
   
   - Related: #4515 (DataFusion / datafusion-spark functions whose Arrow return 
type drifts from Spark catalyst's declared type)
   - Introduced alongside: #4720 (native `collect_list` / `array_agg`)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to