kosiew opened a new issue, #22638:
URL: https://github.com/apache/datafusion/issues/22638

   ## Summary
   
   Spark Avg and built-in Avg currently maintain parallel group-accumulator 
logic for state conversion and merge behavior. Recent regressions showed that 
these code paths can drift, especially around null-state semantics used by 
convert_to_state and merge_batch. This issue proposes consolidating shared 
behavior so null/filter handling, state layout, and merge semantics are 
implemented once and reused by both implementations.
   
   ## Problem
   
   The Spark implementation in datafusion/spark/src/function/aggregate/avg.rs 
now includes convert_to_state support that mirrors behavior already present in 
datafusion/functions-aggregate/src/average.rs. However, duplicated logic 
increases the chance of semantic divergence:
   
   - null-state rows encoded by convert_to_state may be handled differently by 
merge paths
   - shared edge-case fixes can land in one implementation but not the other
   - reviewer burden grows because behavior must be compared manually across 
two files
   
   This drift already surfaced in the [supports_convert_to_state gap during 
Comet migration](https://github.com/apache/datafusion/pull/21548) and again 
around null-aware merge correctness.
   
   ## Motivation
   
   - Correctness: eliminate divergence risks in null/filter/state semantics for 
avg aggregation.
   - Maintainability: centralize logic so bug fixes and optimizer-related 
behavior changes are implemented once.
   - Reviewability: make Spark Avg behavior easy to verify against built-in Avg.
   - Extensibility: create a reusable primitive for future aggregate 
implementations that need similar state conversion patterns.
   
   ## Goals
   
   1. Share core group-accumulator state handling between Spark Avg and 
built-in Avg.
   2. Ensure convert_to_state and merge_batch have identical null/filter 
semantics where intended.
   3. Keep state field ordering and types explicit and consistent with each 
implementation's contracts.
   4. Preserve public behavior for Spark compatibility and built-in SQL 
behavior.
   
   ## Non-Goals
   
   - Reworking all aggregate UDF implementations in one pass.
   - Changing Spark SQL semantic compatibility guarantees.
   - Performing broad optimizer rewrites unrelated to Avg state handling.
   
   ## Proposed Design
   
   ### 1. Extract a shared helper/primitive
   
   Introduce shared utilities in a common aggregate-support location (likely 
datafusion/functions-aggregate-common) that encapsulate:
   
   - null-mask derivation from input nulls plus optional filter
   - applying null masks consistently to state arrays (sum/count)
   - null-aware merge accumulation over group indices
   
   The helper should be generic over the Arrow primitive type and the count 
type where practical.
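
The first two bullets (mask derivation and applying the mask to the sum/count state) can be illustrated with a std-only model. This is a minimal sketch, not the DataFusion API: `Column`, `valid_mask`, and `convert_to_state` are hypothetical names, `Vec<Option<T>>` stands in for a nullable Arrow array, and it assumes a null filter value excludes the row just like `false`.

```rust
/// Hypothetical stand-in for a nullable Arrow array: `None` marks a null slot.
type Column<T> = Vec<Option<T>>;

/// Derive a per-row validity mask. A row is valid only if the input value is
/// non-null and the optional filter is `Some(true)` for that row.
/// Assumption: a null filter entry excludes the row, the same as `false`.
fn valid_mask(input: &[Option<f64>], filter: Option<&[Option<bool>]>) -> Vec<bool> {
    input
        .iter()
        .enumerate()
        .map(|(i, v)| {
            let passes = filter.map_or(true, |f| f[i] == Some(true));
            v.is_some() && passes
        })
        .collect()
}

/// Convert raw input rows into per-row (sum, count) state. The same mask is
/// applied to both state columns, so a masked row is null in sum and count.
fn convert_to_state(
    input: &[Option<f64>],
    filter: Option<&[Option<bool>]>,
) -> (Column<f64>, Column<i64>) {
    let mask = valid_mask(input, filter);
    let sums = input
        .iter()
        .zip(&mask)
        .map(|(v, &ok)| if ok { *v } else { None })
        .collect();
    let counts = mask
        .iter()
        .map(|&ok| if ok { Some(1) } else { None })
        .collect();
    (sums, counts)
}
```

Deriving the mask once and applying it to both columns is the point of the sketch: the sum and count state cannot disagree about which rows are null.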
   
   ### 2. Rewire built-in Avg and Spark Avg to use shared logic
   
   - Update convert_to_state in Spark Avg to reuse the same helper path 
currently used in built-in Avg (filtered_null_mask and set_nulls or a shared 
wrapper around them).
   - Update merge_batch in Spark Avg to use null-aware accumulation semantics 
equivalent to built-in Avg groups accumulator behavior.
   - Keep implementation-specific differences (type aliases, return field 
wiring, Spark-specific constraints) isolated around the shared core.
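
As a rough illustration of the null-aware accumulation described for merge_batch, the following std-only sketch skips any partial-state row whose sum or count is null. `AvgGroupState` and its methods are hypothetical names for this example only, and slices of `Option` stand in for nullable Arrow arrays; the real accumulators may structure this differently.

```rust
/// Hypothetical per-group running state for an average.
struct AvgGroupState {
    sums: Vec<f64>,
    counts: Vec<i64>,
}

impl AvgGroupState {
    fn new(num_groups: usize) -> Self {
        Self {
            sums: vec![0.0; num_groups],
            counts: vec![0; num_groups],
        }
    }

    /// Merge partial (sum, count) rows into their groups. A row contributes
    /// only when both its sum and its count are non-null, so null state
    /// entries produced for null or filtered input rows are ignored.
    fn merge_batch(
        &mut self,
        group_indices: &[usize],
        partial_sums: &[Option<f64>],
        partial_counts: &[Option<i64>],
    ) {
        let rows = group_indices.iter().zip(partial_sums).zip(partial_counts);
        for ((&group, sum), count) in rows {
            if let (Some(s), Some(c)) = (sum, count) {
                self.sums[group] += *s;
                self.counts[group] += *c;
            }
        }
    }

    /// Final average per group; a group that saw no valid rows yields null.
    fn evaluate(&self) -> Vec<Option<f64>> {
        self.sums
            .iter()
            .zip(&self.counts)
            .map(|(&s, &c)| if c > 0 { Some(s / c as f64) } else { None })
            .collect()
    }
}
```

Treating a row as invalid when either column is null is a deliberate choice in this sketch: it keeps a half-null state row from adding to the count without adding to the sum.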
   
   ### 3. Keep state contracts explicit
   
   Document and enforce expected state tuple ordering in both implementations:
   
   - Spark Avg today: [sum, count]
   - Built-in Avg today: [count, sum]
   
   If ordering remains intentionally different, codify this in helper 
interfaces and tests to avoid accidental cross-use bugs.
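
One possible way to codify the ordering in a helper interface is to make the layout an explicit argument and reject state that does not match it. The sketch below is std-only and entirely hypothetical (`StateLayout`, `StateColumn`, and `split_state` are not existing DataFusion items); it only shows the shape such a check could take.

```rust
/// Hypothetical tag for the order in which an implementation emits its state.
#[derive(Debug, Clone, Copy, PartialEq)]
enum StateLayout {
    /// [sum, count], the order listed above for Spark Avg.
    SumCount,
    /// [count, sum], the order listed above for built-in Avg.
    CountSum,
}

/// Hypothetical typed state column, so a sum cannot be mistaken for a count.
#[derive(Debug, Clone, PartialEq)]
enum StateColumn {
    Sum(Vec<Option<f64>>),
    Count(Vec<Option<i64>>),
}

/// Split a two-column state into (sums, counts), failing if the columns do
/// not arrive in the order the caller's layout declares.
fn split_state(
    layout: StateLayout,
    state: [StateColumn; 2],
) -> Result<(Vec<Option<f64>>, Vec<Option<i64>>), String> {
    match (layout, state) {
        (StateLayout::SumCount, [StateColumn::Sum(s), StateColumn::Count(c)]) => Ok((s, c)),
        (StateLayout::CountSum, [StateColumn::Count(c), StateColumn::Sum(s)]) => Ok((s, c)),
        (layout, _) => Err(format!("state columns do not match layout {:?}", layout)),
    }
}
```

With a check like this, feeding [sum, count] state into a consumer that expects [count, sum] fails loudly instead of silently swapping the columns.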
   
   ## Detailed Work Items
   
   1. Design and add a shared state-conversion helper API.
   2. Design and add a shared null-aware group merge helper API.
   3. Refactor Spark Avg to use helpers for convert_to_state and merge_batch.
   4. Optionally align built-in Avg to the same abstraction boundary (without 
behavior change).
   5. Add regression tests that round-trip convert_to_state -> merge_batch -> 
evaluate for:
      - nullable input rows
      - filtered input rows
      - mixed nullable+filtered rows
   6. Add tests validating state ordering assumptions and merge correctness.
   7. Add micro-benchmark comparison (if needed) between helper-based and 
previous custom code path for Spark Avg.
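
The round-trip regression in item 5 amounts to checking that the state path agrees with direct accumulation. A minimal std-only model of that check is sketched below, assuming a plain boolean filter; `direct_avg`, `round_trip_avg`, and `finish` are hypothetical names, and a real test would drive the actual groups accumulators instead.

```rust
/// Final average per group; groups with no valid rows yield `None`.
fn finish(sums: &[f64], counts: &[u64]) -> Vec<Option<f64>> {
    sums.iter()
        .zip(counts)
        .map(|(&s, &c)| if c > 0 { Some(s / c as f64) } else { None })
        .collect()
}

/// Direct path: accumulate raw rows per group, skipping null or filtered rows.
fn direct_avg(
    groups: &[usize],
    values: &[Option<f64>],
    filter: &[bool],
    num_groups: usize,
) -> Vec<Option<f64>> {
    let mut sums = vec![0.0_f64; num_groups];
    let mut counts = vec![0_u64; num_groups];
    for i in 0..values.len() {
        if let (Some(v), true) = (values[i], filter[i]) {
            sums[groups[i]] += v;
            counts[groups[i]] += 1;
        }
    }
    finish(&sums, &counts)
}

/// State path: convert each row to a nullable (sum, count) entry, then merge
/// the entries per group while ignoring null state.
fn round_trip_avg(
    groups: &[usize],
    values: &[Option<f64>],
    filter: &[bool],
    num_groups: usize,
) -> Vec<Option<f64>> {
    // convert_to_state step: null or filtered rows become null state entries.
    let state: Vec<Option<(f64, u64)>> = values
        .iter()
        .zip(filter)
        .map(|(v, &keep)| if keep { v.map(|x| (x, 1)) } else { None })
        .collect();

    // merge_batch step: only non-null state entries contribute.
    let mut sums = vec![0.0_f64; num_groups];
    let mut counts = vec![0_u64; num_groups];
    for (&group, entry) in groups.iter().zip(&state) {
        if let Some((s, c)) = entry {
            sums[group] += *s;
            counts[group] += *c;
        }
    }
    finish(&sums, &counts)
}
```

A regression test can then assert that both functions return the same result for nullable, filtered, and mixed inputs, including a group whose rows are all null.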
   
   ## Testing Strategy
   
   ### Unit tests
   
   - Spark Avg groups accumulator:
     - null input rows are ignored in merge when represented as null state 
entries
     - filtered rows are ignored in merge when represented as null state entries
     - all-non-null path unchanged
   
   - Built-in Avg groups accumulator:
     - no behavior regression for convert_to_state and merge_batch
   
   ### SQL logic tests
   
   Add or extend sqllogictest coverage for avg over nullable inputs and 
filter-sensitive scenarios to validate end-to-end query behavior.
   
   
   ## Acceptance Criteria
   
   1. Spark Avg and built-in Avg both pass convert_to_state/merge/evaluate 
regression tests for null and filtered inputs.
   2. Spark Avg no longer open-codes null-mask + merge logic where a shared 
helper applies.
   3. No behavior regression in existing Avg tests across datafusion-spark and 
functions-aggregate crates.
   4. Optional benchmark evidence shows no material performance regression for 
representative avg group workloads.
   
   ## References
   
   - Spark implementation: datafusion/spark/src/function/aggregate/avg.rs
   - Built-in implementation: datafusion/functions-aggregate/src/average.rs
   - Existing helper utilities: datafusion_functions_aggregate_common 
(filtered_null_mask, set_nulls)
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

