andygrove opened a new issue, #3154:
URL: https://github.com/apache/datafusion-comet/issues/3154

   ## What is the problem the feature request solves?
   
   > **Note:** This issue was generated with AI assistance. The specification 
details have been extracted from Spark documentation and may need verification.
   
   Comet does not currently support Spark's `aggregate` higher-order function (the Catalyst expression `ArrayAggregate`), so queries that use it fall back to Spark's JVM execution instead of running natively on DataFusion.
   
   ArrayAggregate is a higher-order function that reduces an array to a single value: it applies a merge function to each element together with an accumulator, then applies a finish function to the final accumulator. In other words, it is a fold over the array followed by a final transformation.
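   This fold-then-finish behavior can be sketched in plain Rust. This is a hypothetical model over slices for illustration only, not Comet's actual Arrow-based implementation; the `aggregate` helper name is invented here, and a null input array is modeled as `None`:

   ```rust
   // Hypothetical model of Spark's aggregate(array, init, merge, finish)
   // over a plain slice; None stands in for a SQL NULL array.
   fn aggregate<T, A, R>(
       arr: Option<&[T]>,
       init: A,
       merge: impl Fn(A, &T) -> A,
       finish: impl Fn(A) -> R,
   ) -> Option<R> {
       let arr = arr?; // null array -> null result
       // Fold the merge function over every element, then apply finish once.
       Some(finish(arr.iter().fold(init, |acc, x| merge(acc, x))))
   }

   fn main() {
       // Mirrors: aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x, acc -> acc * 10)
       let result = aggregate(Some(&[1, 2, 3][..]), 0, |acc, x| acc + x, |acc| acc * 10);
       println!("{result:?}"); // Some(60)
   }
   ```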
   
   Supporting this expression would allow more Spark workloads to benefit from 
Comet's native acceleration.
   
   ## Describe the potential solution
   
   ### Spark Specification
   
   **Syntax:**
   ```sql
   aggregate(array_expr, initial_value, merge_function[, finish_function])
   ```

   `finish_function` is optional; when omitted, the final accumulator is returned unchanged.
   
   **Arguments:**
   | Argument | Type | Description |
   |----------|------|-------------|
   | array_expr | ArrayType | The input array to aggregate |
   | initial_value | AnyDataType | The initial accumulator value |
   | merge_function | LambdaFunction | Function that takes (accumulator, element) and returns the new accumulator |
   | finish_function | LambdaFunction | Optional function that transforms the final accumulator into the result |
   
   **Return Type:** The data type returned by the `finish_function` expression.
   
   **Supported Data Types:**
   - **Array**: Any ArrayType for the input array
   - **Initial Value**: Any data type for the initial accumulator
   - **Accumulator**: Must maintain consistent type throughout merge operations
   - **Result**: Determined by the finish function's return type
   
   **Edge Cases:**
   - **Null Array**: Returns null if the input array is null
   - **Empty Array**: Processes zero elements, applies finish function to 
initial value
   - **Null Elements**: Null array elements are passed to the merge function 
as-is
   - **Type Consistency**: The merge function output type must match the 
initial value type structurally
   - **Nullable Accumulator**: Always treats accumulator as nullable for safety
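   The edge cases above can be exercised with a small hedged sketch (plain Rust, with `Option<i64>` standing in for a nullable SQL BIGINT; the `aggregate_nullable` helper is illustrative, not Comet code):

   ```rust
   // Illustrative model only: Option<i64> plays the role of a nullable value.
   fn aggregate_nullable(
       arr: Option<&[Option<i64>]>,
       init: Option<i64>,
       merge: impl Fn(Option<i64>, Option<i64>) -> Option<i64>,
       finish: impl Fn(Option<i64>) -> Option<i64>,
   ) -> Option<i64> {
       let arr = arr?; // null array -> null result
       let acc = arr.iter().fold(init, |a, x| merge(a, *x)); // null elements passed as-is
       finish(acc) // empty array: finish is applied directly to init
   }

   fn main() {
       // SQL-style `acc + x`: any NULL operand makes the sum NULL.
       let add = |a: Option<i64>, x: Option<i64>| Some(a? + x?);
       let empty: &[Option<i64>] = &[];
       println!("{:?}", aggregate_nullable(Some(empty), Some(5), add, |a| a)); // Some(5)
       println!("{:?}", aggregate_nullable(Some(&[Some(1), None][..]), Some(0), add, |a| a)); // None
   }
   ```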
   
   **Examples:**
   ```sql
   -- Sum array elements with final multiplication
   SELECT aggregate(array(1, 2, 3), 0, (acc, x) -> acc + x, acc -> acc * 10);
   -- Result: 60
   
   -- Concatenate strings with prefix
   SELECT aggregate(array('a', 'b', 'c'), '', (acc, x) -> concat(acc, x), acc 
-> concat('prefix:', acc));
   -- Result: 'prefix:abc'
   
   -- Find maximum with default handling
   SELECT aggregate(array(5, 2, 8, 1), 0, (acc, x) -> greatest(acc, x), acc -> 
acc);
   -- Result: 8
   ```
   
   ```scala
   // Example DataFrame API usage
   import org.apache.spark.sql.functions._
   
   df.select(
     expr("aggregate(numbers, 0, (acc, x) -> acc + x, acc -> acc * 2)")
   ).show()
   
   // Using higher-order functions API
   df.select(
     aggregate(
       col("array_col"), 
       lit(0), 
       (acc, x) => acc + x,
       acc => acc * 2
     )
   ).show()
   ```
   
   ### Implementation Approach
   
   See the [Comet guide on adding new 
expressions](https://datafusion.apache.org/comet/contributor-guide/adding_a_new_expression.html)
 for detailed instructions.
   
   1. **Scala Serde**: Add expression handler in 
`spark/src/main/scala/org/apache/comet/serde/`
   2. **Register**: Add to appropriate map in `QueryPlanSerde.scala`
   3. **Protobuf**: Add message type in `native/proto/src/proto/expr.proto` if 
needed
   4. **Rust**: Implement in `native/spark-expr/src/` (check if DataFusion has 
built-in support first)
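   If a new protobuf message turns out to be needed for step 3, it might look roughly like the following. The message and field names here are hypothetical and would need to be aligned with the existing conventions in `expr.proto`:

   ```proto
   // Hypothetical message shape; match naming and numbering to expr.proto.
   message ArrayAggregate {
     Expr array = 1;        // input array expression
     Expr initial = 2;      // initial accumulator value
     Expr merge_func = 3;   // lambda: (accumulator, element) -> accumulator
     Expr finish_func = 4;  // lambda: accumulator -> result
   }
   ```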
   
   
   ## Additional context
   
   **Difficulty:** Medium
   **Spark Expression Class:** 
`org.apache.spark.sql.catalyst.expressions.ArrayAggregate`
   
   **Related:**
   - `reduce` - Alias of `aggregate` (Spark 3.4+)
   - `transform` - Apply function to each array element
   - `filter` - Filter array elements with predicate
   - `exists` - Check if any array element satisfies condition
   
   ---
   *This issue was auto-generated from Spark reference documentation.*
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

