azhangd opened a new pull request, #21548:
URL: https://github.com/apache/datafusion/pull/21548

   ## Which issue does this PR close?
   
   - Part of #17964.
   
   ## Rationale for this change
   
   SparkAvg's AvgGroupsAccumulator doesn't override supports_convert_to_state 
(which defaults to false), so the skip-partial-aggregation optimization never 
kicks in for queries that use Spark's avg().
   
   I ran into this while benchmarking a Spark Connect engine built on 
DataFusion. On TPC-H q17 at SF10, the partial aggregate for avg(l_quantity) 
grouped by l_partkey (~2M groups out of 60M rows) was not triggering 
skip-aggregation:
   
   | Metric | Without convert_to_state | With convert_to_state |
   |--------|-------------------------|-----------------------|
   | Partial aggregate memory | 923 MB | 40 MB |
   | Partial aggregate elapsed | 4.75s | 109ms |
   
   The skip-aggregation probe (#11627) detects when a partial aggregate isn't 
reducing cardinality and falls back to passing input rows through directly as 
partial state. That fallback requires convert_to_state, so the accumulator can 
produce [sum, count] state arrays from raw input. The built-in Avg already 
implements this (#11734), but it wasn't carried over when SparkAvg was migrated 
from Comet in #17871.
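A simplified model of the probe logic may help show why convert_to_state is the gating requirement. This is a sketch under stated assumptions: the SkipAggProbe type, its fields, and its thresholds are hypothetical illustrations, not DataFusion's actual types or configuration. It only captures the idea described above: observe a minimum number of input rows, compare the groups-to-rows ratio against a threshold, and skip aggregation only if the accumulator can turn raw rows into state.

```rust
/// Hypothetical, simplified model of a skip-partial-aggregation probe.
/// Names and thresholds are illustrative, not DataFusion's API.
pub struct SkipAggProbe {
    rows_threshold: usize, // minimum input rows to observe before deciding
    ratio_threshold: f64,  // groups/rows ratio at or above which we skip
    input_rows: usize,
    num_groups: usize,
    is_locked: bool, // once decided, the decision is not revisited
    should_skip: bool,
}

impl SkipAggProbe {
    pub fn new(rows_threshold: usize, ratio_threshold: f64) -> Self {
        Self {
            rows_threshold,
            ratio_threshold,
            input_rows: 0,
            num_groups: 0,
            is_locked: false,
            should_skip: false,
        }
    }

    /// Record one input batch: its row count and the total number of
    /// distinct groups seen so far in the partial aggregate.
    pub fn update(&mut self, batch_rows: usize, total_groups: usize) {
        if self.is_locked {
            return;
        }
        self.input_rows += batch_rows;
        self.num_groups = total_groups;
        if self.input_rows >= self.rows_threshold {
            // High ratio => aggregation is barely reducing cardinality.
            let ratio = self.num_groups as f64 / self.input_rows as f64;
            self.should_skip = ratio >= self.ratio_threshold;
            self.is_locked = true;
        }
    }

    /// Skipping is only possible when the accumulator can convert raw
    /// input into partial state (supports_convert_to_state() == true).
    pub fn should_skip(&self, supports_convert_to_state: bool) -> bool {
        supports_convert_to_state && self.should_skip
    }
}
```

With a rows threshold of 100 and a ratio threshold of 0.8, a partition that has produced 95 groups from its first 100 rows would switch to pass-through mode, but only when supports_convert_to_state() returns true, which is exactly what SparkAvg was missing.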
   
   ## What changes are included in this PR?
   
   Adds convert_to_state() and supports_convert_to_state() to 
AvgGroupsAccumulator in datafusion-spark.
   
   Follows the same approach as the built-in Avg, adapted for SparkAvg's 
differences:
   - State order is [sum, count] (vs [count, sum] in the built-in)
   - Count type is Int64 (vs UInt64 in the built-in)
   - Null handling uses NullBuffer::union directly instead of pulling in 
datafusion-functions-aggregate-common as a dependency
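The convert_to_state semantics can be modeled without Arrow as a rough sketch. Assumptions: plain Vec<Option<T>> values stand in for Arrow arrays and their null buffers, the input is f64, and a row that is null or rejected by the filter (a false or null filter value) becomes null in both state arrays, roughly mirroring the built-in Avg's approach. The PR's actual code operates on Arrow arrays via NullBuffer::union and may differ in detail; the sketch only highlights the [sum, count] order and the i64 count.

```rust
/// Simplified model of a SparkAvg-style convert_to_state.
/// Vec<Option<T>> stands in for an Arrow array plus its null buffer
/// (an assumption for illustration). Returns state as (sum, count).
pub fn convert_to_state(
    values: &[Option<f64>],
    filter: Option<&[Option<bool>]>,
) -> (Vec<Option<f64>>, Vec<Option<i64>>) {
    let mut sums = Vec::with_capacity(values.len());
    let mut counts = Vec::with_capacity(values.len());
    for (i, v) in values.iter().enumerate() {
        // A row passes only if there is no filter or its filter value is
        // Some(true); false and null filter values both reject the row.
        let passes = filter.map_or(true, |f| f[i] == Some(true));
        match (*v, passes) {
            // Each surviving row becomes a one-row partial state:
            // sum = the value itself, count = 1.
            (Some(x), true) => {
                sums.push(Some(x));
                counts.push(Some(1));
            }
            // Null input or filtered-out row: null in both state arrays,
            // analogous to unioning the value and filter null masks.
            _ => {
                sums.push(None);
                counts.push(None);
            }
        }
    }
    (sums, counts)
}
```

For example, values [1.0, null, 3.0] with no filter yield sums [1.0, null, 3.0] and counts [1, null, 1].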
   
   Also replaced the fully qualified arrow::array::BooleanArray references in 
update_batch / merge_batch with the imported name, since adding BooleanArray to 
the import block triggered the unused_qualifications lint.
   
   ## Are these changes tested?
   
   Yes. Unit tests cover basic conversion, null propagation, filter handling, 
and a roundtrip through merge_batch that verifies the converted state produces 
correct results end-to-end.
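To illustrate the kind of property such a roundtrip checks, here is a minimal model of merging [sum, count] state rows into per-group accumulators and evaluating the average. AvgGroups and its methods are hypothetical names for this sketch, plain Vecs replace Arrow arrays, and returning None for a group with a zero count is an assumption matching avg over no non-null inputs.

```rust
/// Hypothetical model of merging [sum, count] partial state into
/// per-group accumulators. Plain Vecs stand in for Arrow arrays.
pub struct AvgGroups {
    sums: Vec<f64>,
    counts: Vec<i64>,
}

impl AvgGroups {
    pub fn new(num_groups: usize) -> Self {
        Self {
            sums: vec![0.0; num_groups],
            counts: vec![0; num_groups],
        }
    }

    /// Merge state rows; group_indices[i] is the group of state row i.
    /// Rows whose sum or count is null contribute nothing.
    pub fn merge_batch(
        &mut self,
        group_indices: &[usize],
        sums: &[Option<f64>],
        counts: &[Option<i64>],
    ) {
        for ((&g, s), c) in group_indices.iter().zip(sums).zip(counts) {
            if let (Some(s), Some(c)) = (s, c) {
                self.sums[g] += *s;
                self.counts[g] += *c;
            }
        }
    }

    /// avg = sum / count per group, or None if a group saw no rows.
    pub fn evaluate(&self) -> Vec<Option<f64>> {
        self.sums
            .iter()
            .zip(&self.counts)
            .map(|(&s, &c)| if c == 0 { None } else { Some(s / c as f64) })
            .collect()
    }
}
```

Feeding it rows shaped like converted state (sum = value, count = 1, both null for skipped rows) should give the same averages as aggregating the raw input directly, and it also accepts ordinary partial state with counts greater than 1, so both kinds of state can be merged together.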
   
   ## Are there any user-facing changes?
   
   No. Queries using avg() through the Spark function registry will 
automatically benefit from skip-partial-aggregation on high-cardinality 
groupings.
   


-- 
This is an automated message from the Apache Git Service.