This is an automated email from the ASF dual-hosted git repository. jeffreyvo pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 241e47d1c9 Support `avg(distinct)` for `float64` type (#17255) 241e47d1c9 is described below commit 241e47d1c939fb2b6839c1c87c436b7405bd30af Author: Jeffrey Vo <jeffrey.vo.austra...@gmail.com> AuthorDate: Sun Aug 24 11:03:23 2025 +1000 Support `avg(distinct)` for `float64` type (#17255) * chore: mv `DistinctSumAccumulator` to common * feat: add avg distinct support for float64 type * chore: fmt * refactor: update import for DataType in Float64DistinctAvgAccumulator and remove unused sum_distinct module * update * Revert "update" This reverts commit 1fa2db8a8051796f68f5dc747565b93e4d671007. * Fix submodules * Fix broken import * Fix state_fields for distinct avg * Update SLT test files * Added null case for SLT test * Disable group accumulator support for avg(distinct) and add group by test case * Fix state field name for avg distinct Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> * Mix up tests * Update datafusion-tesitng pin --------- Co-authored-by: YuNing Chen <ad...@ynchen.me> Co-authored-by: Andrew Lamb <and...@nerdnetworks.org> --- datafusion-testing | 2 +- .../functions-aggregate-common/src/aggregate.rs | 2 + .../{aggregate.rs => aggregate/avg_distinct.rs} | 5 +- .../src/aggregate/avg_distinct/numeric.rs | 78 ++++++++++++ .../{aggregate.rs => aggregate/sum_distinct.rs} | 7 +- .../src/aggregate/sum_distinct/numeric.rs | 123 +++++++++++++++++++ datafusion/functions-aggregate/src/average.rs | 131 ++++++++++++--------- datafusion/functions-aggregate/src/sum.rs | 89 +------------- datafusion/sqllogictest/test_files/aggregate.slt | 60 +++++++++- 9 files changed, 348 insertions(+), 149 deletions(-) diff --git a/datafusion-testing b/datafusion-testing index aed98a3bd7..f72ac4075a 160000 --- a/datafusion-testing +++ b/datafusion-testing @@ -1 +1 @@ -Subproject commit aed98a3bd7b7b9dc82da514ec876e8fe6fa7e10e +Subproject commit f72ac4075ada5ea9810551bc0c3e3161c61204a2 diff --git 
a/datafusion/functions-aggregate-common/src/aggregate.rs b/datafusion/functions-aggregate-common/src/aggregate.rs index c9cbaa8396..aadce907e7 100644 --- a/datafusion/functions-aggregate-common/src/aggregate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate.rs @@ -15,5 +15,7 @@ // specific language governing permissions and limitations // under the License. +pub mod avg_distinct; pub mod count_distinct; pub mod groups_accumulator; +pub mod sum_distinct; diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs b/datafusion/functions-aggregate-common/src/aggregate/avg_distinct.rs similarity index 92% copy from datafusion/functions-aggregate-common/src/aggregate.rs copy to datafusion/functions-aggregate-common/src/aggregate/avg_distinct.rs index c9cbaa8396..3d6889431d 100644 --- a/datafusion/functions-aggregate-common/src/aggregate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/avg_distinct.rs @@ -15,5 +15,6 @@ // specific language governing permissions and limitations // under the License. -pub mod count_distinct; -pub mod groups_accumulator; +mod numeric; + +pub use numeric::Float64DistinctAvgAccumulator; diff --git a/datafusion/functions-aggregate-common/src/aggregate/avg_distinct/numeric.rs b/datafusion/functions-aggregate-common/src/aggregate/avg_distinct/numeric.rs new file mode 100644 index 0000000000..bb43acc261 --- /dev/null +++ b/datafusion/functions-aggregate-common/src/aggregate/avg_distinct/numeric.rs @@ -0,0 +1,78 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::fmt::Debug; + +use arrow::array::ArrayRef; +use arrow::datatypes::{DataType, Float64Type}; +use datafusion_common::{Result, ScalarValue}; +use datafusion_expr_common::accumulator::Accumulator; + +use crate::aggregate::sum_distinct::DistinctSumAccumulator; + +/// Specialized implementation of `AVG DISTINCT` for Float64 values, leveraging +/// the existing DistinctSumAccumulator implementation. +#[derive(Debug)] +pub struct Float64DistinctAvgAccumulator { + // We use the DistinctSumAccumulator to handle the set of distinct values + sum_accumulator: DistinctSumAccumulator<Float64Type>, +} + +impl Default for Float64DistinctAvgAccumulator { + fn default() -> Self { + Self { + sum_accumulator: DistinctSumAccumulator::<Float64Type>::new( + &DataType::Float64, + ), + } + } +} + +impl Accumulator for Float64DistinctAvgAccumulator { + fn state(&mut self) -> Result<Vec<ScalarValue>> { + self.sum_accumulator.state() + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + self.sum_accumulator.update_batch(values) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + self.sum_accumulator.merge_batch(states) + } + + fn evaluate(&mut self) -> Result<ScalarValue> { + // Get the sum from the DistinctSumAccumulator + let sum_result = self.sum_accumulator.evaluate()?; + + // Extract the sum value + if let ScalarValue::Float64(Some(sum)) = sum_result { + // Get the count of distinct values + let count = self.sum_accumulator.distinct_count() as f64; + // Calculate average + let avg = sum / count; + 
Ok(ScalarValue::Float64(Some(avg))) + } else { + // If sum is None, return None (null) + Ok(ScalarValue::Float64(None)) + } + } + + fn size(&self) -> usize { + self.sum_accumulator.size() + } +} diff --git a/datafusion/functions-aggregate-common/src/aggregate.rs b/datafusion/functions-aggregate-common/src/aggregate/sum_distinct.rs similarity index 88% copy from datafusion/functions-aggregate-common/src/aggregate.rs copy to datafusion/functions-aggregate-common/src/aggregate/sum_distinct.rs index c9cbaa8396..932bfba0bf 100644 --- a/datafusion/functions-aggregate-common/src/aggregate.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/sum_distinct.rs @@ -15,5 +15,8 @@ // specific language governing permissions and limitations // under the License. -pub mod count_distinct; -pub mod groups_accumulator; +//! Sum distinct accumulator implementations + +pub mod numeric; + +pub use numeric::DistinctSumAccumulator; diff --git a/datafusion/functions-aggregate-common/src/aggregate/sum_distinct/numeric.rs b/datafusion/functions-aggregate-common/src/aggregate/sum_distinct/numeric.rs new file mode 100644 index 0000000000..3021783a2a --- /dev/null +++ b/datafusion/functions-aggregate-common/src/aggregate/sum_distinct/numeric.rs @@ -0,0 +1,123 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Defines the accumulator for `SUM DISTINCT` for primitive numeric types + +use std::collections::HashSet; +use std::fmt::Debug; +use std::mem::{size_of, size_of_val}; + +use ahash::RandomState; +use arrow::array::Array; +use arrow::array::ArrayRef; +use arrow::array::ArrowNativeTypeOp; +use arrow::array::ArrowPrimitiveType; +use arrow::array::AsArray; +use arrow::datatypes::ArrowNativeType; +use arrow::datatypes::DataType; + +use datafusion_common::Result; +use datafusion_common::ScalarValue; +use datafusion_expr_common::accumulator::Accumulator; + +use crate::utils::Hashable; + +/// Accumulator for computing SUM(DISTINCT expr) +pub struct DistinctSumAccumulator<T: ArrowPrimitiveType> { + values: HashSet<Hashable<T::Native>, RandomState>, + data_type: DataType, +} + +impl<T: ArrowPrimitiveType> Debug for DistinctSumAccumulator<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "DistinctSumAccumulator({})", self.data_type) + } +} + +impl<T: ArrowPrimitiveType> DistinctSumAccumulator<T> { + pub fn new(data_type: &DataType) -> Self { + Self { + values: HashSet::default(), + data_type: data_type.clone(), + } + } + + pub fn distinct_count(&self) -> usize { + self.values.len() + } +} + +impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> { + fn state(&mut self) -> Result<Vec<ScalarValue>> { + // 1. Stores aggregate state in `ScalarValue::List` + // 2. 
Constructs `ScalarValue::List` state from distinct numeric stored in hash set + let state_out = { + let distinct_values = self + .values + .iter() + .map(|value| { + ScalarValue::new_primitive::<T>(Some(value.0), &self.data_type) + }) + .collect::<Result<Vec<_>>>()?; + + vec![ScalarValue::List(ScalarValue::new_list_nullable( + &distinct_values, + &self.data_type, + ))] + }; + Ok(state_out) + } + + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + if values.is_empty() { + return Ok(()); + } + + let array = values[0].as_primitive::<T>(); + match array.nulls().filter(|x| x.null_count() > 0) { + Some(n) => { + for idx in n.valid_indices() { + self.values.insert(Hashable(array.value(idx))); + } + } + None => array.values().iter().for_each(|x| { + self.values.insert(Hashable(*x)); + }), + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + for x in states[0].as_list::<i32>().iter().flatten() { + self.update_batch(&[x])? + } + Ok(()) + } + + fn evaluate(&mut self) -> Result<ScalarValue> { + let mut acc = T::Native::usize_as(0); + for distinct_value in self.values.iter() { + acc = acc.add_wrapping(distinct_value.0) + } + let v = (!self.values.is_empty()).then_some(acc); + ScalarValue::new_primitive::<T>(v, &self.data_type) + } + + fn size(&self) -> usize { + size_of_val(self) + self.values.capacity() * size_of::<T::Native>() + } +} diff --git a/datafusion/functions-aggregate/src/average.rs b/datafusion/functions-aggregate/src/average.rs index 71272fc8ca..f7cb74fd55 100644 --- a/datafusion/functions-aggregate/src/average.rs +++ b/datafusion/functions-aggregate/src/average.rs @@ -40,6 +40,7 @@ use datafusion_expr::{ ReversedUDAF, Signature, }; +use datafusion_functions_aggregate_common::aggregate::avg_distinct::Float64DistinctAvgAccumulator; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::accumulate::NullState; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::{ 
filtered_null_mask, set_nulls, @@ -114,79 +115,95 @@ impl AggregateUDFImpl for Avg { } fn accumulator(&self, acc_args: AccumulatorArgs) -> Result<Box<dyn Accumulator>> { - if acc_args.is_distinct { - return exec_err!("avg(DISTINCT) aggregations are not available"); - } + let data_type = acc_args.exprs[0].data_type(acc_args.schema)?; use DataType::*; - let data_type = acc_args.exprs[0].data_type(acc_args.schema)?; // instantiate specialized accumulator based for the type - match (&data_type, acc_args.return_field.data_type()) { - (Float64, Float64) => Ok(Box::<AvgAccumulator>::default()), - ( - Decimal128(sum_precision, sum_scale), - Decimal128(target_precision, target_scale), - ) => Ok(Box::new(DecimalAvgAccumulator::<Decimal128Type> { - sum: None, - count: 0, - sum_scale: *sum_scale, - sum_precision: *sum_precision, - target_precision: *target_precision, - target_scale: *target_scale, - })), - - ( - Decimal256(sum_precision, sum_scale), - Decimal256(target_precision, target_scale), - ) => Ok(Box::new(DecimalAvgAccumulator::<Decimal256Type> { - sum: None, - count: 0, - sum_scale: *sum_scale, - sum_precision: *sum_precision, - target_precision: *target_precision, - target_scale: *target_scale, - })), - - (Duration(time_unit), Duration(result_unit)) => { - Ok(Box::new(DurationAvgAccumulator { + if acc_args.is_distinct { + match &data_type { + // Numeric types are converted to Float64 via `coerce_avg_type` during logical plan creation + Float64 => Ok(Box::new(Float64DistinctAvgAccumulator::default())), + _ => exec_err!("AVG(DISTINCT) for {} not supported", data_type), + } + } else { + match (&data_type, acc_args.return_field.data_type()) { + (Float64, Float64) => Ok(Box::<AvgAccumulator>::default()), + ( + Decimal128(sum_precision, sum_scale), + Decimal128(target_precision, target_scale), + ) => Ok(Box::new(DecimalAvgAccumulator::<Decimal128Type> { sum: None, count: 0, - time_unit: *time_unit, - result_unit: *result_unit, - })) - } + sum_scale: *sum_scale, + 
sum_precision: *sum_precision, + target_precision: *target_precision, + target_scale: *target_scale, + })), + + ( + Decimal256(sum_precision, sum_scale), + Decimal256(target_precision, target_scale), + ) => Ok(Box::new(DecimalAvgAccumulator::<Decimal256Type> { + sum: None, + count: 0, + sum_scale: *sum_scale, + sum_precision: *sum_precision, + target_precision: *target_precision, + target_scale: *target_scale, + })), + + (Duration(time_unit), Duration(result_unit)) => { + Ok(Box::new(DurationAvgAccumulator { + sum: None, + count: 0, + time_unit: *time_unit, + result_unit: *result_unit, + })) + } - _ => exec_err!( - "AvgAccumulator for ({} --> {})", - &data_type, - acc_args.return_field.data_type() - ), + _ => exec_err!( + "AvgAccumulator for ({} --> {})", + &data_type, + acc_args.return_field.data_type() + ), + } } } fn state_fields(&self, args: StateFieldsArgs) -> Result<Vec<FieldRef>> { - Ok(vec![ - Field::new( - format_state_name(args.name, "count"), - DataType::UInt64, - true, - ), - Field::new( - format_state_name(args.name, "sum"), - args.input_fields[0].data_type().clone(), - true, - ), - ] - .into_iter() - .map(Arc::new) - .collect()) + if args.is_distinct { + // Copied from datafusion_functions_aggregate::sum::Sum::state_fields + // since the accumulator uses DistinctSumAccumulator internally. 
+ Ok(vec![Field::new_list( + format_state_name(args.name, "avg distinct"), + Field::new_list_field(args.return_type().clone(), true), + false, + ) + .into()]) + } else { + Ok(vec![ + Field::new( + format_state_name(args.name, "count"), + DataType::UInt64, + true, + ), + Field::new( + format_state_name(args.name, "sum"), + args.input_fields[0].data_type().clone(), + true, + ), + ] + .into_iter() + .map(Arc::new) + .collect()) + } } fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool { matches!( args.return_field.data_type(), DataType::Float64 | DataType::Decimal128(_, _) | DataType::Duration(_) - ) + ) && !args.is_distinct } fn create_groups_accumulator( diff --git a/datafusion/functions-aggregate/src/sum.rs b/datafusion/functions-aggregate/src/sum.rs index 1e19543a16..445c7dfe6b 100644 --- a/datafusion/functions-aggregate/src/sum.rs +++ b/datafusion/functions-aggregate/src/sum.rs @@ -20,13 +20,11 @@ use ahash::RandomState; use datafusion_expr::utils::AggregateOrderSensitivity; use std::any::Any; -use std::collections::HashSet; -use std::mem::{size_of, size_of_val}; +use std::mem::size_of_val; use arrow::array::Array; use arrow::array::ArrowNativeTypeOp; use arrow::array::{ArrowNumericType, AsArray}; -use arrow::datatypes::ArrowPrimitiveType; use arrow::datatypes::{ArrowNativeType, FieldRef}; use arrow::datatypes::{ DataType, Decimal128Type, Decimal256Type, Float64Type, Int64Type, UInt64Type, @@ -44,7 +42,7 @@ use datafusion_expr::{ SetMonotonicity, Signature, Volatility, }; use datafusion_functions_aggregate_common::aggregate::groups_accumulator::prim_op::PrimitiveGroupsAccumulator; -use datafusion_functions_aggregate_common::utils::Hashable; +use datafusion_functions_aggregate_common::aggregate::sum_distinct::DistinctSumAccumulator; use datafusion_macros::user_doc; make_udaf_expr_and_func!( @@ -187,7 +185,7 @@ impl AggregateUDFImpl for Sum { if args.is_distinct { macro_rules! 
helper { ($t:ty, $dt:expr) => { - Ok(Box::new(DistinctSumAccumulator::<$t>::try_new(&$dt)?)) + Ok(Box::new(DistinctSumAccumulator::<$t>::new(&$dt))) }; } downcast_sum!(args, helper) @@ -408,87 +406,6 @@ impl<T: ArrowNumericType> Accumulator for SlidingSumAccumulator<T> { } } -struct DistinctSumAccumulator<T: ArrowPrimitiveType> { - values: HashSet<Hashable<T::Native>, RandomState>, - data_type: DataType, -} - -impl<T: ArrowPrimitiveType> std::fmt::Debug for DistinctSumAccumulator<T> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "DistinctSumAccumulator({})", self.data_type) - } -} - -impl<T: ArrowPrimitiveType> DistinctSumAccumulator<T> { - pub fn try_new(data_type: &DataType) -> Result<Self> { - Ok(Self { - values: HashSet::default(), - data_type: data_type.clone(), - }) - } -} - -impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> { - fn state(&mut self) -> Result<Vec<ScalarValue>> { - // 1. Stores aggregate state in `ScalarValue::List` - // 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set - let state_out = { - let distinct_values = self - .values - .iter() - .map(|value| { - ScalarValue::new_primitive::<T>(Some(value.0), &self.data_type) - }) - .collect::<Result<Vec<_>>>()?; - - vec![ScalarValue::List(ScalarValue::new_list_nullable( - &distinct_values, - &self.data_type, - ))] - }; - Ok(state_out) - } - - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - if values.is_empty() { - return Ok(()); - } - - let array = values[0].as_primitive::<T>(); - match array.nulls().filter(|x| x.null_count() > 0) { - Some(n) => { - for idx in n.valid_indices() { - self.values.insert(Hashable(array.value(idx))); - } - } - None => array.values().iter().for_each(|x| { - self.values.insert(Hashable(*x)); - }), - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - for x in states[0].as_list::<i32>().iter().flatten() { - self.update_batch(&[x])? 
- } - Ok(()) - } - - fn evaluate(&mut self) -> Result<ScalarValue> { - let mut acc = T::Native::usize_as(0); - for distinct_value in self.values.iter() { - acc = acc.add_wrapping(distinct_value.0) - } - let v = (!self.values.is_empty()).then_some(acc); - ScalarValue::new_primitive::<T>(v, &self.data_type) - } - - fn size(&self) -> usize { - size_of_val(self) + self.values.capacity() * size_of::<T::Native>() - } -} - /// A sliding-window accumulator for `SUM(DISTINCT)` over Int64 columns. /// Maintains a running sum so that `evaluate()` is O(1). #[derive(Debug)] diff --git a/datafusion/sqllogictest/test_files/aggregate.slt b/datafusion/sqllogictest/test_files/aggregate.slt index 8a4f0f4374..35b2a6c03b 100644 --- a/datafusion/sqllogictest/test_files/aggregate.slt +++ b/datafusion/sqllogictest/test_files/aggregate.slt @@ -5421,8 +5421,10 @@ select avg(distinct x_dict) from value_dict; ---- 3 -query error +query RR select avg(x_dict), avg(distinct x_dict) from value_dict; +---- +2.625 3 query I select min(x_dict) from value_dict; @@ -7387,3 +7389,59 @@ FROM (VALUES ('a'), ('d'), ('c'), ('a')) t(a_varchar); query error Error during planning: ORDER BY and WITHIN GROUP clauses cannot be used together in the same aggregate function SELECT array_agg(a_varchar order by a_varchar) WITHIN GROUP (ORDER BY a_varchar) FROM (VALUES ('a'), ('d'), ('c'), ('a')) t(a_varchar); + +# distinct average +statement ok +create table distinct_avg (a int, b double) as values + (3, null), + (2, null), + (5, 100.5), + (5, 1.0), + (5, 44.112), + (null, 1.0), + (5, 100.5), + (1, 4.09), + (5, 100.5), + (5, 100.5), + (4, null), + (null, null) +; + +# Need two columns to ensure single_distinct_to_group_by rule doesn't kick in, so we know our actual avg(distinct) code is being tested +query RTRTRR +select + avg(distinct a), + arrow_typeof(avg(distinct a)), + avg(distinct b), + arrow_typeof(avg(distinct b)), + avg(a), + avg(b) +from distinct_avg; +---- +3 Float64 37.4255 Float64 4 56.52525 + +query RR 
rowsort +select + avg(distinct a), + avg(distinct b) +from distinct_avg +group by b; +---- +1 4.09 +3 NULL +5 1 +5 100.5 +5 44.112 + +query RR +select + avg(distinct a), + avg(distinct b) +from distinct_avg +where a is null and b is null; +---- +NULL NULL + +statement ok +drop table distinct_avg; + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org