This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 6c785d1f42 Specialize Avg and Sum accumulators (#6842) (#7358)
6c785d1f42 is described below

commit 6c785d1f424f1dddfe616a1ef3aa139f3b52f8d5
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Aug 23 09:17:36 2023 +0100

    Specialize Avg and Sum accumulators (#6842) (#7358)
    
    * Specialize SUM and AVG (#6842)
    
    * Specialize Distinct Sum
    
    * Review feedback
    
    * Update sqllogictest
---
 datafusion/core/src/execution/context.rs           |   7 +-
 datafusion/core/tests/sql/udf.rs                   |   7 +-
 datafusion/optimizer/src/analyzer/type_coercion.rs |  15 +-
 datafusion/physical-expr/src/aggregate/average.rs  | 206 ++++++++-----
 datafusion/physical-expr/src/aggregate/sum.rs      | 327 ++++++++-------------
 .../physical-expr/src/aggregate/sum_distinct.rs    | 170 +++++------
 datafusion/physical-expr/src/aggregate/utils.rs    |  31 +-
 datafusion/physical-expr/src/expressions/mod.rs    |   3 +-
 datafusion/sqllogictest/test_files/decimal.slt     |   2 +-
 9 files changed, 356 insertions(+), 412 deletions(-)

diff --git a/datafusion/core/src/execution/context.rs 
b/datafusion/core/src/execution/context.rs
index c97f770ab3..586fff30ac 100644
--- a/datafusion/core/src/execution/context.rs
+++ b/datafusion/core/src/execution/context.rs
@@ -2452,12 +2452,7 @@ mod tests {
             vec![DataType::Float64],
             Arc::new(DataType::Float64),
             Volatility::Immutable,
-            Arc::new(|_| {
-                Ok(Box::new(AvgAccumulator::try_new(
-                    &DataType::Float64,
-                    &DataType::Float64,
-                )?))
-            }),
+            Arc::new(|_| Ok(Box::<AvgAccumulator>::default())),
             Arc::new(vec![DataType::UInt64, DataType::Float64]),
         );
 
diff --git a/datafusion/core/tests/sql/udf.rs b/datafusion/core/tests/sql/udf.rs
index 86ff6ebac2..5aa3ab3bc0 100644
--- a/datafusion/core/tests/sql/udf.rs
+++ b/datafusion/core/tests/sql/udf.rs
@@ -237,12 +237,7 @@ async fn simple_udaf() -> Result<()> {
         vec![DataType::Float64],
         Arc::new(DataType::Float64),
         Volatility::Immutable,
-        Arc::new(|_| {
-            Ok(Box::new(AvgAccumulator::try_new(
-                &DataType::Float64,
-                &DataType::Float64,
-            )?))
-        }),
+        Arc::new(|_| Ok(Box::<AvgAccumulator>::default())),
         Arc::new(vec![DataType::UInt64, DataType::Float64]),
     );
 
diff --git a/datafusion/optimizer/src/analyzer/type_coercion.rs 
b/datafusion/optimizer/src/analyzer/type_coercion.rs
index 2a6e08cd52..1ebe234840 100644
--- a/datafusion/optimizer/src/analyzer/type_coercion.rs
+++ b/datafusion/optimizer/src/analyzer/type_coercion.rs
@@ -906,12 +906,7 @@ mod test {
             vec![DataType::Float64],
             Arc::new(DataType::Float64),
             Volatility::Immutable,
-            Arc::new(|_| {
-                Ok(Box::new(AvgAccumulator::try_new(
-                    &DataType::Float64,
-                    &DataType::Float64,
-                )?))
-            }),
+            Arc::new(|_| Ok(Box::<AvgAccumulator>::default())),
             Arc::new(vec![DataType::UInt64, DataType::Float64]),
         );
         let udaf = Expr::AggregateUDF(expr::AggregateUDF::new(
@@ -932,12 +927,8 @@ mod test {
             Arc::new(move |_| Ok(Arc::new(DataType::Float64)));
         let state_type: StateTypeFunction =
             Arc::new(move |_| Ok(Arc::new(vec![DataType::UInt64, 
DataType::Float64])));
-        let accumulator: AccumulatorFactoryFunction = Arc::new(|_| {
-            Ok(Box::new(AvgAccumulator::try_new(
-                &DataType::Float64,
-                &DataType::Float64,
-            )?))
-        });
+        let accumulator: AccumulatorFactoryFunction =
+            Arc::new(|_| Ok(Box::<AvgAccumulator>::default()));
         let my_avg = AggregateUDF::new(
             "MY_AVG",
             &Signature::uniform(1, vec![DataType::Float64], 
Volatility::Immutable),
diff --git a/datafusion/physical-expr/src/aggregate/average.rs 
b/datafusion/physical-expr/src/aggregate/average.rs
index 85e3e4f07a..ce8f587402 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -21,17 +21,13 @@ use arrow::array::{AsArray, PrimitiveBuilder};
 use log::debug;
 
 use std::any::Any;
-use std::convert::TryFrom;
 use std::sync::Arc;
 
 use crate::aggregate::groups_accumulator::accumulate::NullState;
-use crate::aggregate::sum;
-use crate::aggregate::sum::sum_batch;
-use crate::aggregate::utils::calculate_result_decimal_for_avg;
 use crate::aggregate::utils::down_cast_any_ref;
 use crate::expressions::format_state_name;
 use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr};
-use arrow::compute;
+use arrow::compute::sum;
 use arrow::datatypes::{DataType, Decimal128Type, Float64Type, UInt64Type};
 use arrow::{
     array::{ArrayRef, UInt64Array},
@@ -40,9 +36,7 @@ use arrow::{
 use arrow_array::{
     Array, ArrowNativeTypeOp, ArrowNumericType, ArrowPrimitiveType, 
PrimitiveArray,
 };
-use datafusion_common::{
-    downcast_value, internal_err, not_impl_err, DataFusionError, Result, 
ScalarValue,
-};
+use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
 use datafusion_expr::type_coercion::aggregates::avg_return_type;
 use datafusion_expr::Accumulator;
 
@@ -87,11 +81,27 @@ impl AggregateExpr for Avg {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(AvgAccumulator::try_new(
-            // avg is f64 or decimal
-            &self.input_data_type,
-            &self.result_data_type,
-        )?))
+        use DataType::*;
+        // instantiate specialized accumulator based for the type
+        match (&self.input_data_type, &self.result_data_type) {
+            (Float64, Float64) => Ok(Box::<AvgAccumulator>::default()),
+            (
+                Decimal128(sum_precision, sum_scale),
+                Decimal128(target_precision, target_scale),
+            ) => Ok(Box::new(DecimalAvgAccumulator {
+                sum: None,
+                count: 0,
+                sum_scale: *sum_scale,
+                sum_precision: *sum_precision,
+                target_precision: *target_precision,
+                target_scale: *target_scale,
+            })),
+            _ => not_impl_err!(
+                "AvgAccumulator for ({} --> {})",
+                self.input_data_type,
+                self.result_data_type
+            ),
+        }
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
@@ -122,10 +132,7 @@ impl AggregateExpr for Avg {
     }
 
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(AvgAccumulator::try_new(
-            &self.input_data_type,
-            &self.result_data_type,
-        )?))
+        self.create_accumulator()
     }
 
     fn groups_accumulator_supported(&self) -> bool {
@@ -189,91 +196,141 @@ impl PartialEq<dyn Any> for Avg {
 }
 
 /// An accumulator to compute the average
-#[derive(Debug)]
+#[derive(Debug, Default)]
 pub struct AvgAccumulator {
-    // sum is used for null
-    sum: ScalarValue,
-    return_data_type: DataType,
+    sum: Option<f64>,
     count: u64,
 }
 
-impl AvgAccumulator {
-    /// Creates a new `AvgAccumulator`
-    pub fn try_new(datatype: &DataType, return_data_type: &DataType) -> 
Result<Self> {
-        Ok(Self {
-            sum: ScalarValue::try_from(datatype)?,
-            return_data_type: return_data_type.clone(),
-            count: 0,
-        })
+impl Accumulator for AvgAccumulator {
+    fn state(&self) -> Result<Vec<ScalarValue>> {
+        Ok(vec![
+            ScalarValue::from(self.count),
+            ScalarValue::Float64(self.sum),
+        ])
+    }
+
+    fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = values[0].as_primitive::<Float64Type>();
+        self.count += (values.len() - values.null_count()) as u64;
+        if let Some(x) = sum(values) {
+            let v = self.sum.get_or_insert(0.);
+            *v += x;
+        }
+        Ok(())
+    }
+
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = values[0].as_primitive::<Float64Type>();
+        self.count -= (values.len() - values.null_count()) as u64;
+        if let Some(x) = sum(values) {
+            self.sum = Some(self.sum.unwrap() - x);
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        // counts are summed
+        self.count += 
sum(states[0].as_primitive::<UInt64Type>()).unwrap_or_default();
+
+        // sums are summed
+        if let Some(x) = sum(states[1].as_primitive::<Float64Type>()) {
+            let v = self.sum.get_or_insert(0.);
+            *v += x;
+        }
+        Ok(())
+    }
+
+    fn evaluate(&self) -> Result<ScalarValue> {
+        Ok(ScalarValue::Float64(
+            self.sum.map(|f| f / self.count as f64),
+        ))
+    }
+    fn supports_retract_batch(&self) -> bool {
+        true
+    }
+
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
     }
 }
 
-impl Accumulator for AvgAccumulator {
+/// An accumulator to compute the average for decimals
+#[derive(Debug)]
+struct DecimalAvgAccumulator {
+    sum: Option<i128>,
+    count: u64,
+    sum_scale: i8,
+    sum_precision: u8,
+    target_precision: u8,
+    target_scale: i8,
+}
+
+impl Accumulator for DecimalAvgAccumulator {
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![ScalarValue::from(self.count), self.sum.clone()])
+        Ok(vec![
+            ScalarValue::from(self.count),
+            ScalarValue::Decimal128(self.sum, self.sum_precision, 
self.sum_scale),
+        ])
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        let values = &values[0];
+        let values = values[0].as_primitive::<Decimal128Type>();
 
         self.count += (values.len() - values.null_count()) as u64;
-        self.sum = self.sum.add(&sum::sum_batch(values)?)?;
+        if let Some(x) = sum(values) {
+            let v = self.sum.get_or_insert(0);
+            *v += x;
+        }
         Ok(())
     }
 
     fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        let values = &values[0];
+        let values = values[0].as_primitive::<Decimal128Type>();
         self.count -= (values.len() - values.null_count()) as u64;
-        let delta = sum_batch(values)?;
-        self.sum = self.sum.sub(&delta)?;
+        if let Some(x) = sum(values) {
+            self.sum = Some(self.sum.unwrap() - x);
+        }
         Ok(())
     }
 
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        let counts = downcast_value!(states[0], UInt64Array);
         // counts are summed
-        self.count += compute::sum(counts).unwrap_or(0);
+        self.count += 
sum(states[0].as_primitive::<UInt64Type>()).unwrap_or_default();
 
         // sums are summed
-        self.sum = self.sum.add(&sum::sum_batch(&states[1])?)?;
+        if let Some(x) = sum(states[1].as_primitive::<Decimal128Type>()) {
+            let v = self.sum.get_or_insert(0);
+            *v += x;
+        }
         Ok(())
     }
 
     fn evaluate(&self) -> Result<ScalarValue> {
-        match self.sum {
-            ScalarValue::Float64(e) => {
-                Ok(ScalarValue::Float64(e.map(|f| f / self.count as f64)))
-            }
-            ScalarValue::Decimal128(value, _, scale) => {
-                match value {
-                    None => match &self.return_data_type {
-                        DataType::Decimal128(p, s) => {
-                            Ok(ScalarValue::Decimal128(None, *p, *s))
-                        }
-                        other => internal_err!(
-                            "Error returned data type in AvgAccumulator 
{other:?}"
-                        ),
-                    },
-                    Some(value) => {
-                        // now the sum_type and return type is not the same, 
need to convert the sum type to return type
-                        calculate_result_decimal_for_avg(
-                            value,
-                            self.count as i128,
-                            scale,
-                            &self.return_data_type,
-                        )
-                    }
-                }
-            }
-            _ => internal_err!("Sum should be f64 or decimal128 on average"),
-        }
+        let v = self
+            .sum
+            .map(|v| {
+                Decimal128Averager::try_new(
+                    self.sum_scale,
+                    self.target_precision,
+                    self.target_scale,
+                )?
+                .avg(v, self.count as _)
+            })
+            .transpose()?;
+
+        Ok(ScalarValue::Decimal128(
+            v,
+            self.target_precision,
+            self.target_scale,
+        ))
     }
     fn supports_retract_batch(&self) -> bool {
         true
     }
 
     fn size(&self) -> usize {
-        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + 
self.sum.size()
+        std::mem::size_of_val(self)
     }
 }
 
@@ -484,6 +541,7 @@ mod tests {
         assert_aggregate(
             array,
             AggregateFunction::Avg,
+            false,
             ScalarValue::Decimal128(Some(35000), 14, 4),
         );
     }
@@ -500,6 +558,7 @@ mod tests {
         assert_aggregate(
             array,
             AggregateFunction::Avg,
+            false,
             ScalarValue::Decimal128(Some(32500), 14, 4),
         );
     }
@@ -517,6 +576,7 @@ mod tests {
         assert_aggregate(
             array,
             AggregateFunction::Avg,
+            false,
             ScalarValue::Decimal128(None, 14, 4),
         );
     }
@@ -524,7 +584,7 @@ mod tests {
     #[test]
     fn avg_i32() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
-        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3_f64));
+        assert_aggregate(a, AggregateFunction::Avg, false, 
ScalarValue::from(3_f64));
     }
 
     #[test]
@@ -536,33 +596,33 @@ mod tests {
             Some(4),
             Some(5),
         ]));
-        assert_aggregate(a, AggregateFunction::Avg, 
ScalarValue::from(3.25f64));
+        assert_aggregate(a, AggregateFunction::Avg, false, 
ScalarValue::from(3.25f64));
     }
 
     #[test]
     fn avg_i32_all_nulls() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
-        assert_aggregate(a, AggregateFunction::Avg, 
ScalarValue::Float64(None));
+        assert_aggregate(a, AggregateFunction::Avg, false, 
ScalarValue::Float64(None));
     }
 
     #[test]
     fn avg_u32() {
         let a: ArrayRef =
             Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 
5_u32]));
-        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3.0f64));
+        assert_aggregate(a, AggregateFunction::Avg, false, 
ScalarValue::from(3.0f64));
     }
 
     #[test]
     fn avg_f32() {
         let a: ArrayRef =
             Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 
5_f32]));
-        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3_f64));
+        assert_aggregate(a, AggregateFunction::Avg, false, 
ScalarValue::from(3_f64));
     }
 
     #[test]
     fn avg_f64() {
         let a: ArrayRef =
             Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 
5_f64]));
-        assert_aggregate(a, AggregateFunction::Avg, ScalarValue::from(3_f64));
+        assert_aggregate(a, AggregateFunction::Avg, false, 
ScalarValue::from(3_f64));
     }
 }
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs 
b/datafusion/physical-expr/src/aggregate/sum.rs
index baaebada37..5cc8e93332 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -18,30 +18,22 @@
 //! Defines `SUM` and `SUM DISTINCT` aggregate accumulators
 
 use std::any::Any;
-use std::convert::TryFrom;
-use std::ops::AddAssign;
 use std::sync::Arc;
 
 use super::groups_accumulator::prim_op::PrimitiveGroupsAccumulator;
 use crate::aggregate::utils::down_cast_any_ref;
 use crate::expressions::format_state_name;
 use crate::{AggregateExpr, GroupsAccumulator, PhysicalExpr};
-use arrow::array::Array;
-use arrow::array::Decimal128Array;
-use arrow::array::Decimal256Array;
-use arrow::compute;
+use arrow::compute::sum;
 use arrow::datatypes::DataType;
-use arrow::{
-    array::{ArrayRef, Float64Array, Int64Array, UInt64Array},
-    datatypes::Field,
-};
+use arrow::{array::ArrayRef, datatypes::Field};
+use arrow_array::cast::AsArray;
 use arrow_array::types::{
-    Decimal128Type, Decimal256Type, Float32Type, Float64Type, Int32Type, 
Int64Type,
-    UInt32Type, UInt64Type,
-};
-use datafusion_common::{
-    downcast_value, internal_err, not_impl_err, DataFusionError, Result, 
ScalarValue,
+    Decimal128Type, Decimal256Type, Float64Type, Int64Type, UInt64Type,
 };
+use arrow_array::{Array, ArrowNativeTypeOp, ArrowNumericType};
+use arrow_buffer::ArrowNativeType;
+use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
 use datafusion_expr::type_coercion::aggregates::sum_return_type;
 use datafusion_expr::Accumulator;
 
@@ -71,18 +63,24 @@ impl Sum {
     }
 }
 
-/// Creates a [`PrimitiveGroupsAccumulator`] with the specified
-/// [`ArrowPrimitiveType`] which applies `$FN` to each element
+/// Sum only supports a subset of numeric types, instead relying on type 
coercion
+///
+/// This macro is similar to 
[downcast_primitive](arrow_array::downcast_primitive)
 ///
-/// [`ArrowPrimitiveType`]: arrow::datatypes::ArrowPrimitiveType
-macro_rules! instantiate_primitive_accumulator {
-    ($SELF:expr, $PRIMTYPE:ident, $FN:expr) => {{
-        Ok(Box::new(PrimitiveGroupsAccumulator::<$PRIMTYPE, _>::new(
-            &$SELF.data_type,
-            $FN,
-        )))
-    }};
+/// `s` is a `Sum`, `helper` is a macro accepting (ArrowPrimitiveType, 
DataType)
+macro_rules! downcast_sum {
+    ($s:ident, $helper:ident) => {
+        match $s.data_type {
+            DataType::UInt64 => $helper!(UInt64Type, $s.data_type),
+            DataType::Int64 => $helper!(Int64Type, $s.data_type),
+            DataType::Float64 => $helper!(Float64Type, $s.data_type),
+            DataType::Decimal128(_, _) => $helper!(Decimal128Type, 
$s.data_type),
+            DataType::Decimal256(_, _) => $helper!(Decimal256Type, 
$s.data_type),
+            _ => not_impl_err!("Sum not supported for {}: {}", $s.name, 
$s.data_type),
+        }
+    };
 }
+pub(crate) use downcast_sum;
 
 impl AggregateExpr for Sum {
     /// Return a reference to Any that can be used for downcasting
@@ -99,7 +97,12 @@ impl AggregateExpr for Sum {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(SumAccumulator::try_new(&self.data_type)?))
+        macro_rules! helper {
+            ($t:ty, $dt:expr) => {
+                Ok(Box::new(SumAccumulator::<$t>::new($dt.clone())))
+            };
+        }
+        downcast_sum!(self, helper)
     }
 
     fn state_fields(&self) -> Result<Vec<Field>> {
@@ -123,46 +126,15 @@ impl AggregateExpr for Sum {
     }
 
     fn create_groups_accumulator(&self) -> Result<Box<dyn GroupsAccumulator>> {
-        // instantiate specialized accumulator
-        match self.data_type {
-            DataType::UInt64 => {
-                instantiate_primitive_accumulator!(self, UInt64Type, |x, y| x
-                    .add_assign(y))
-            }
-            DataType::Int64 => {
-                instantiate_primitive_accumulator!(self, Int64Type, |x, y| x
-                    .add_assign(y))
-            }
-            DataType::UInt32 => {
-                instantiate_primitive_accumulator!(self, UInt32Type, |x, y| x
-                    .add_assign(y))
-            }
-            DataType::Int32 => {
-                instantiate_primitive_accumulator!(self, Int32Type, |x, y| x
-                    .add_assign(y))
-            }
-            DataType::Float32 => {
-                instantiate_primitive_accumulator!(self, Float32Type, |x, y| x
-                    .add_assign(y))
-            }
-            DataType::Float64 => {
-                instantiate_primitive_accumulator!(self, Float64Type, |x, y| x
-                    .add_assign(y))
-            }
-            DataType::Decimal128(_, _) => {
-                instantiate_primitive_accumulator!(self, Decimal128Type, |x, 
y| x
-                    .add_assign(y))
-            }
-            DataType::Decimal256(_, _) => {
-                instantiate_primitive_accumulator!(self, Decimal256Type, |x, 
y| *x =
-                    *x + y)
-            }
-            _ => not_impl_err!(
-                "GroupsAccumulator not supported for {}: {}",
-                self.name,
-                self.data_type
-            ),
+        macro_rules! helper {
+            ($t:ty, $dt:expr) => {
+                Ok(Box::new(PrimitiveGroupsAccumulator::<$t, _>::new(
+                    &$dt,
+                    |x, y| *x = x.add_wrapping(y),
+                )))
+            };
         }
+        downcast_sum!(self, helper)
     }
 
     fn reverse_expr(&self) -> Option<Arc<dyn AggregateExpr>> {
@@ -170,7 +142,12 @@ impl AggregateExpr for Sum {
     }
 
     fn create_sliding_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(SlidingSumAccumulator::try_new(&self.data_type)?))
+        macro_rules! helper {
+            ($t:ty, $dt:expr) => {
+                Ok(Box::new(SlidingSumAccumulator::<$t>::new($dt.clone())))
+            };
+        }
+        downcast_sum!(self, helper)
     }
 }
 
@@ -189,177 +166,135 @@ impl PartialEq<dyn Any> for Sum {
 }
 
 /// This accumulator computes SUM incrementally
-#[derive(Debug)]
-struct SumAccumulator {
-    sum: ScalarValue,
-}
-
-impl SumAccumulator {
-    /// new sum accumulator
-    pub fn try_new(data_type: &DataType) -> Result<Self> {
-        Ok(Self {
-            sum: ScalarValue::try_from(data_type)?,
-        })
-    }
-}
-
-/// This accumulator incrementally computes sums over a sliding window
-#[derive(Debug)]
-struct SlidingSumAccumulator {
-    sum: ScalarValue,
-    count: u64,
+struct SumAccumulator<T: ArrowNumericType> {
+    sum: Option<T::Native>,
+    data_type: DataType,
 }
 
-impl SlidingSumAccumulator {
-    /// new sum accumulator
-    pub fn try_new(data_type: &DataType) -> Result<Self> {
-        Ok(Self {
-            // start at zero
-            sum: ScalarValue::try_from(data_type)?,
-            count: 0,
-        })
+impl<T: ArrowNumericType> std::fmt::Debug for SumAccumulator<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "SumAccumulator({})", self.data_type)
     }
 }
 
-/// Sums the contents of the `$VALUES` array using the arrow compute
-/// kernel, and return a `ScalarValue::$SCALAR`.
-///
-/// Handles nullability
-macro_rules! typed_sum_delta_batch {
-    ($VALUES:expr, $ARRAYTYPE:ident, $SCALAR:ident) => {{
-        let array = downcast_value!($VALUES, $ARRAYTYPE);
-        let delta = compute::sum(array);
-        ScalarValue::$SCALAR(delta)
-    }};
-}
-
-fn sum_decimal_batch(values: &ArrayRef, precision: u8, scale: i8) -> 
Result<ScalarValue> {
-    let array = downcast_value!(values, Decimal128Array);
-    let result = compute::sum(array);
-    Ok(ScalarValue::Decimal128(result, precision, scale))
-}
-
-fn sum_decimal256_batch(
-    values: &ArrayRef,
-    precision: u8,
-    scale: i8,
-) -> Result<ScalarValue> {
-    let array = downcast_value!(values, Decimal256Array);
-    let result = compute::sum(array);
-    Ok(ScalarValue::Decimal256(result, precision, scale))
-}
-
-// sums the array and returns a ScalarValue of its corresponding type.
-pub(crate) fn sum_batch(values: &ArrayRef) -> Result<ScalarValue> {
-    Ok(match values.data_type() {
-        DataType::Decimal128(precision, scale) => {
-            sum_decimal_batch(values, *precision, *scale)?
-        }
-        DataType::Decimal256(precision, scale) => {
-            sum_decimal256_batch(values, *precision, *scale)?
-        }
-        DataType::Float64 => typed_sum_delta_batch!(values, Float64Array, 
Float64),
-        DataType::Int64 => typed_sum_delta_batch!(values, Int64Array, Int64),
-        DataType::UInt64 => typed_sum_delta_batch!(values, UInt64Array, 
UInt64),
-        e => {
-            return internal_err!("Sum is not expected to receive the type 
{e:?}");
+impl<T: ArrowNumericType> SumAccumulator<T> {
+    fn new(data_type: DataType) -> Self {
+        Self {
+            sum: None,
+            data_type,
         }
-    })
+    }
 }
 
-impl Accumulator for SumAccumulator {
+impl<T: ArrowNumericType> Accumulator for SumAccumulator<T> {
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![self.sum.clone()])
+        Ok(vec![self.evaluate()?])
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        let values = &values[0];
-        let delta = sum_batch(values)?;
-        self.sum = self.sum.add(&delta)?;
+        let values = values[0].as_primitive::<T>();
+        if let Some(x) = sum(values) {
+            let v = self.sum.get_or_insert(T::Native::usize_as(0));
+            *v = v.add_wrapping(x);
+        }
         Ok(())
     }
 
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        // sum(sum1, sum2, sum3, ...) = sum1 + sum2 + sum3 + ...
         self.update_batch(states)
     }
 
     fn evaluate(&self) -> Result<ScalarValue> {
-        // TODO: add the checker for overflow
-        // For the decimal(precision,_) data type, the absolute of value must 
be less than 10^precision.
-        Ok(self.sum.clone())
+        Ok(ScalarValue::new_primitive::<T>(self.sum, &self.data_type))
     }
 
     fn size(&self) -> usize {
-        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + 
self.sum.size()
+        std::mem::size_of_val(self)
+    }
+}
+
+/// This accumulator incrementally computes sums over a sliding window
+///
+/// This is separate from [`SumAccumulator`] as requires additional state
+struct SlidingSumAccumulator<T: ArrowNumericType> {
+    sum: T::Native,
+    count: u64,
+    data_type: DataType,
+}
+
+impl<T: ArrowNumericType> std::fmt::Debug for SlidingSumAccumulator<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "SlidingSumAccumulator({})", self.data_type)
+    }
+}
+
+impl<T: ArrowNumericType> SlidingSumAccumulator<T> {
+    fn new(data_type: DataType) -> Self {
+        Self {
+            sum: T::Native::usize_as(0),
+            count: 0,
+            data_type,
+        }
     }
 }
 
-impl Accumulator for SlidingSumAccumulator {
+impl<T: ArrowNumericType> Accumulator for SlidingSumAccumulator<T> {
     fn state(&self) -> Result<Vec<ScalarValue>> {
-        Ok(vec![self.sum.clone(), ScalarValue::from(self.count)])
+        Ok(vec![self.evaluate()?, self.count.into()])
     }
 
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        let values = &values[0];
+        let values = values[0].as_primitive::<T>();
         self.count += (values.len() - values.null_count()) as u64;
-        let delta = sum_batch(values)?;
-        self.sum = self.sum.add(&delta)?;
+        if let Some(x) = sum(values) {
+            self.sum = self.sum.add_wrapping(x)
+        }
         Ok(())
     }
 
-    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
-        let values = &values[0];
-        self.count -= (values.len() - values.null_count()) as u64;
-        let delta = sum_batch(values)?;
-        self.sum = self.sum.sub(&delta)?;
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
+        let values = states[0].as_primitive::<T>();
+        if let Some(x) = sum(values) {
+            self.sum = self.sum.add_wrapping(x)
+        }
+        if let Some(x) = sum(states[1].as_primitive::<UInt64Type>()) {
+            self.count += x;
+        }
         Ok(())
     }
 
-    fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        // sum(sum1, sum2, sum3, ...) = sum1 + sum2 + sum3 + ...
-        self.update_batch(states)
+    fn evaluate(&self) -> Result<ScalarValue> {
+        let v = (self.count != 0).then_some(self.sum);
+        Ok(ScalarValue::new_primitive::<T>(v, &self.data_type))
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
-        // TODO: add the checker for overflow
-        // For the decimal(precision,_) data type, the absolute of value must 
be less than 10^precision.
-        if self.count == 0 {
-            ScalarValue::try_from(&self.sum.get_datatype())
-        } else {
-            Ok(self.sum.clone())
+    fn size(&self) -> usize {
+        std::mem::size_of_val(self)
+    }
+
+    fn retract_batch(&mut self, values: &[ArrayRef]) -> Result<()> {
+        let values = values[0].as_primitive::<T>();
+        if let Some(x) = sum(values) {
+            self.sum = self.sum.sub_wrapping(x)
         }
+        self.count -= (values.len() - values.null_count()) as u64;
+        Ok(())
     }
 
     fn supports_retract_batch(&self) -> bool {
         true
     }
-
-    fn size(&self) -> usize {
-        std::mem::size_of_val(self) - std::mem::size_of_val(&self.sum) + 
self.sum.size()
-    }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
     use crate::expressions::tests::assert_aggregate;
-    use arrow_array::{Float32Array, Int32Array, UInt32Array};
+    use arrow_array::*;
     use datafusion_expr::AggregateFunction;
 
     #[test]
     fn sum_decimal() {
-        // test sum batch
-        let array: ArrayRef = Arc::new(
-            (1..6)
-                .map(Some)
-                .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)
-                .unwrap(),
-        );
-        let result = sum_batch(&array).unwrap();
-        assert_eq!(ScalarValue::Decimal128(Some(15), 10, 0), result);
-
         // test agg
         let array: ArrayRef = Arc::new(
             (1..6)
@@ -372,23 +307,13 @@ mod tests {
         assert_aggregate(
             array,
             AggregateFunction::Sum,
+            false,
             ScalarValue::Decimal128(Some(15), 20, 0),
         );
     }
 
     #[test]
     fn sum_decimal_with_nulls() {
-        // test with batch
-        let array: ArrayRef = Arc::new(
-            (1..6)
-                .map(|i| if i == 2 { None } else { Some(i) })
-                .collect::<Decimal128Array>()
-                .with_precision_and_scale(10, 0)
-                .unwrap(),
-        );
-        let result = sum_batch(&array).unwrap();
-        assert_eq!(ScalarValue::Decimal128(Some(13), 10, 0), result);
-
         // test agg
         let array: ArrayRef = Arc::new(
             (1..6)
@@ -401,6 +326,7 @@ mod tests {
         assert_aggregate(
             array,
             AggregateFunction::Sum,
+            false,
             ScalarValue::Decimal128(Some(13), 38, 0),
         );
     }
@@ -415,13 +341,12 @@ mod tests {
                 .with_precision_and_scale(10, 0)
                 .unwrap(),
         );
-        let result = sum_batch(&array).unwrap();
-        assert_eq!(ScalarValue::Decimal128(None, 10, 0), result);
 
         // test agg
         assert_aggregate(
             array,
             AggregateFunction::Sum,
+            false,
             ScalarValue::Decimal128(None, 20, 0),
         );
     }
@@ -429,7 +354,7 @@ mod tests {
     #[test]
     fn sum_i32() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
-        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15i64));
+        assert_aggregate(a, AggregateFunction::Sum, false, 
ScalarValue::from(15i64));
     }
 
     #[test]
@@ -441,33 +366,33 @@ mod tests {
             Some(4),
             Some(5),
         ]));
-        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(13i64));
+        assert_aggregate(a, AggregateFunction::Sum, false, 
ScalarValue::from(13i64));
     }
 
     #[test]
     fn sum_i32_all_nulls() {
         let a: ArrayRef = Arc::new(Int32Array::from(vec![None, None]));
-        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::Int64(None));
+        assert_aggregate(a, AggregateFunction::Sum, false, 
ScalarValue::Int64(None));
     }
 
     #[test]
     fn sum_u32() {
         let a: ArrayRef =
             Arc::new(UInt32Array::from(vec![1_u32, 2_u32, 3_u32, 4_u32, 
5_u32]));
-        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15u64));
+        assert_aggregate(a, AggregateFunction::Sum, false, 
ScalarValue::from(15u64));
     }
 
     #[test]
     fn sum_f32() {
         let a: ArrayRef =
             Arc::new(Float32Array::from(vec![1_f32, 2_f32, 3_f32, 4_f32, 
5_f32]));
-        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15_f64));
+        assert_aggregate(a, AggregateFunction::Sum, false, 
ScalarValue::from(15_f64));
     }
 
     #[test]
     fn sum_f64() {
         let a: ArrayRef =
             Arc::new(Float64Array::from(vec![1_f64, 2_f64, 3_f64, 4_f64, 
5_f64]));
-        assert_aggregate(a, AggregateFunction::Sum, ScalarValue::from(15_f64));
+        assert_aggregate(a, AggregateFunction::Sum, false, 
ScalarValue::from(15_f64));
     }
 }
diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs 
b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
index 366b875c23..c3d8d5e870 100644
--- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
@@ -18,17 +18,21 @@
 use crate::expressions::format_state_name;
 use arrow::datatypes::{DataType, Field};
 use std::any::Any;
-use std::fmt::Debug;
 use std::sync::Arc;
 
 use ahash::RandomState;
 use arrow::array::{Array, ArrayRef};
+use arrow_array::cast::AsArray;
+use arrow_array::types::*;
+use arrow_array::{ArrowNativeTypeOp, ArrowPrimitiveType};
+use arrow_buffer::{ArrowNativeType, ToByteSlice};
 use std::collections::HashSet;
 
+use crate::aggregate::sum::downcast_sum;
 use crate::aggregate::utils::down_cast_any_ref;
 use crate::{AggregateExpr, PhysicalExpr};
-use datafusion_common::ScalarValue;
-use datafusion_common::{internal_err, DataFusionError, Result};
+use datafusion_common::{not_impl_err, DataFusionError, Result, ScalarValue};
+use datafusion_expr::type_coercion::aggregates::sum_return_type;
 use datafusion_expr::Accumulator;
 
 /// Expression for a SUM(DISTINCT) aggregation.
@@ -49,6 +53,7 @@ impl DistinctSum {
         name: String,
         data_type: DataType,
     ) -> Self {
+        let data_type = sum_return_type(&data_type).unwrap();
         Self {
             name,
             data_type,
@@ -84,7 +89,12 @@ impl AggregateExpr for DistinctSum {
     }
 
     fn create_accumulator(&self) -> Result<Box<dyn Accumulator>> {
-        Ok(Box::new(DistinctSumAccumulator::try_new(&self.data_type)?))
+        macro_rules! helper {
+            ($t:ty, $dt:expr) => {
+                Ok(Box::new(DistinctSumAccumulator::<$t>::try_new(&$dt)?))
+            };
+        }
+        downcast_sum!(self, helper)
     }
 }
 
@@ -106,29 +116,56 @@ impl PartialEq<dyn Any> for DistinctSum {
     }
 }
 
-#[derive(Debug)]
-struct DistinctSumAccumulator {
-    hash_values: HashSet<ScalarValue, RandomState>,
+/// A wrapper around a type to provide hash for floats
+#[derive(Copy, Clone)]
+struct Hashable<T>(T);
+
+impl<T: ToByteSlice> std::hash::Hash for Hashable<T> {
+    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
+        self.0.to_byte_slice().hash(state)
+    }
+}
+
+impl<T: ArrowNativeTypeOp> PartialEq for Hashable<T> {
+    fn eq(&self, other: &Self) -> bool {
+        self.0.is_eq(other.0)
+    }
+}
+
+impl<T: ArrowNativeTypeOp> Eq for Hashable<T> {}
+
+struct DistinctSumAccumulator<T: ArrowPrimitiveType> {
+    values: HashSet<Hashable<T::Native>, RandomState>,
     data_type: DataType,
 }
-impl DistinctSumAccumulator {
+
+impl<T: ArrowPrimitiveType> std::fmt::Debug for DistinctSumAccumulator<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "DistinctSumAccumulator({})", self.data_type)
+    }
+}
+
+impl<T: ArrowPrimitiveType> DistinctSumAccumulator<T> {
     pub fn try_new(data_type: &DataType) -> Result<Self> {
         Ok(Self {
-            hash_values: HashSet::default(),
+            values: HashSet::default(),
             data_type: data_type.clone(),
         })
     }
 }
 
-impl Accumulator for DistinctSumAccumulator {
+impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
     fn state(&self) -> Result<Vec<ScalarValue>> {
         // 1. Stores aggregate state in `ScalarValue::List`
         // 2. Constructs `ScalarValue::List` state from distinct numeric 
stored in hash set
         let state_out = {
             let mut distinct_values = Vec::new();
-            self.hash_values
-                .iter()
-                .for_each(|distinct_value| 
distinct_values.push(distinct_value.clone()));
+            self.values.iter().for_each(|distinct_value| {
+                distinct_values.push(ScalarValue::new_primitive::<T>(
+                    Some(distinct_value.0),
+                    &self.data_type,
+                ))
+            });
             vec![ScalarValue::new_list(
                 Some(distinct_values),
                 self.data_type.clone(),
@@ -142,62 +179,49 @@ impl Accumulator for DistinctSumAccumulator {
             return Ok(());
         }
 
-        let arr = &values[0];
-        (0..values[0].len()).try_for_each(|index| {
-            if !arr.is_null(index) {
-                let v = ScalarValue::try_from_array(arr, index)?;
-                self.hash_values.insert(v);
+        let array = values[0].as_primitive::<T>();
+        match array.nulls().filter(|x| x.null_count() > 0) {
+            Some(n) => {
+                for idx in n.valid_indices() {
+                    self.values.insert(Hashable(array.value(idx)));
+                }
             }
-            Ok(())
-        })
+            None => array.values().iter().for_each(|x| {
+                self.values.insert(Hashable(*x));
+            }),
+        }
+        Ok(())
     }
 
     fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> {
-        if states.is_empty() {
-            return Ok(());
+        for x in states[0].as_list::<i32>().iter().flatten() {
+            self.update_batch(&[x])?
         }
-
-        let arr = &states[0];
-        (0..arr.len()).try_for_each(|index| {
-            let scalar = ScalarValue::try_from_array(arr, index)?;
-
-            if let ScalarValue::List(Some(scalar), _) = scalar {
-                scalar.iter().for_each(|scalar| {
-                    if !ScalarValue::is_null(scalar) {
-                        self.hash_values.insert(scalar.clone());
-                    }
-                });
-            } else {
-                return internal_err!("Unexpected accumulator state");
-            }
-            Ok(())
-        })
+        Ok(())
     }
 
     fn evaluate(&self) -> Result<ScalarValue> {
-        let mut sum_value = ScalarValue::try_from(&self.data_type)?;
-        for distinct_value in self.hash_values.iter() {
-            sum_value = sum_value.add(distinct_value)?;
+        let mut acc = T::Native::usize_as(0);
+        for distinct_value in self.values.iter() {
+            acc = acc.add_wrapping(distinct_value.0)
         }
-        Ok(sum_value)
+        let v = (!self.values.is_empty()).then_some(acc);
+        Ok(ScalarValue::new_primitive::<T>(v, &self.data_type))
     }
 
     fn size(&self) -> usize {
-        std::mem::size_of_val(self) + 
ScalarValue::size_of_hashset(&self.hash_values)
-            - std::mem::size_of_val(&self.hash_values)
-            + self.data_type.size()
-            - std::mem::size_of_val(&self.data_type)
+        std::mem::size_of_val(self)
+            + self.values.capacity() * std::mem::size_of::<T::Native>()
     }
 }
 
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::expressions::col;
-    use crate::expressions::tests::aggregate;
-    use arrow::record_batch::RecordBatch;
-    use arrow::{array::*, datatypes::*};
+    use crate::expressions::tests::assert_aggregate;
+    use arrow::array::*;
     use datafusion_common::Result;
+    use datafusion_expr::AggregateFunction;
 
     fn run_update_batch(
         return_type: DataType,
@@ -211,26 +235,6 @@ mod tests {
         Ok((accum.state()?, accum.evaluate()?))
     }
 
-    macro_rules! generic_test_sum_distinct {
-        ($ARRAY:expr, $DATATYPE:expr, $EXPECTED:expr) => {{
-            let schema = Schema::new(vec![Field::new("a", $DATATYPE, true)]);
-
-            let batch = RecordBatch::try_new(Arc::new(schema.clone()), 
vec![$ARRAY])?;
-
-            let agg = Arc::new(DistinctSum::new(
-                vec![col("a", &schema)?],
-                "count_distinct_a".to_string(),
-                $EXPECTED.get_datatype(),
-            ));
-            let actual = aggregate(&batch, agg)?;
-            let expected = ScalarValue::from($EXPECTED);
-
-            assert_eq!(expected, actual);
-
-            Ok(())
-        }};
-    }
-
     #[test]
     fn sum_distinct_update_batch() -> Result<()> {
         let array_int64: ArrayRef = Arc::new(Int64Array::from(vec![1, 1, 3]));
@@ -244,7 +248,7 @@ mod tests {
     }
 
     #[test]
-    fn sum_distinct_i32_with_nulls() -> Result<()> {
+    fn sum_distinct_i32_with_nulls() {
         let array = Arc::new(Int32Array::from(vec![
             Some(1),
             Some(1),
@@ -253,11 +257,11 @@ mod tests {
             Some(2),
             Some(3),
         ]));
-        generic_test_sum_distinct!(array, DataType::Int32, 
ScalarValue::from(6_i32))
+        assert_aggregate(array, AggregateFunction::Sum, true, 6_i64.into());
     }
 
     #[test]
-    fn sum_distinct_u32_with_nulls() -> Result<()> {
+    fn sum_distinct_u32_with_nulls() {
         let array: ArrayRef = Arc::new(UInt32Array::from(vec![
             Some(1_u32),
             Some(1_u32),
@@ -265,28 +269,30 @@ mod tests {
             Some(3_u32),
             None,
         ]));
-        generic_test_sum_distinct!(array, DataType::UInt32, 
ScalarValue::from(4_u32))
+        assert_aggregate(array, AggregateFunction::Sum, true, 4_u64.into());
     }
 
     #[test]
-    fn sum_distinct_f64() -> Result<()> {
+    fn sum_distinct_f64() {
         let array: ArrayRef =
             Arc::new(Float64Array::from(vec![1_f64, 1_f64, 3_f64, 3_f64, 
3_f64]));
-        generic_test_sum_distinct!(array, DataType::Float64, 
ScalarValue::from(4_f64))
+        assert_aggregate(array, AggregateFunction::Sum, true, 4_f64.into());
     }
 
     #[test]
-    fn sum_distinct_decimal_with_nulls() -> Result<()> {
+    fn sum_distinct_decimal_with_nulls() {
         let array: ArrayRef = Arc::new(
             (1..6)
                 .map(|i| if i == 2 { None } else { Some(i % 2) })
                 .collect::<Decimal128Array>()
-                .with_precision_and_scale(35, 0)?,
+                .with_precision_and_scale(35, 0)
+                .unwrap(),
         );
-        generic_test_sum_distinct!(
+        assert_aggregate(
             array,
-            DataType::Decimal128(35, 0),
-            ScalarValue::Decimal128(Some(1), 38, 0)
-        )
+            AggregateFunction::Sum,
+            true,
+            ScalarValue::Decimal128(Some(1), 38, 0),
+        );
     }
 }
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs 
b/datafusion/physical-expr/src/aggregate/utils.rs
index e86eb1dc1f..463d8fec18 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -23,8 +23,7 @@ use arrow::datatypes::{MAX_DECIMAL_FOR_EACH_PRECISION, 
MIN_DECIMAL_FOR_EACH_PREC
 use arrow_array::cast::AsArray;
 use arrow_array::types::Decimal128Type;
 use arrow_schema::{DataType, Field};
-use datafusion_common::internal_err;
-use datafusion_common::{DataFusionError, Result, ScalarValue};
+use datafusion_common::{DataFusionError, Result};
 use datafusion_expr::Accumulator;
 use std::any::Any;
 use std::sync::Arc;
@@ -118,34 +117,6 @@ impl Decimal128Averager {
     }
 }
 
-/// Returns `sum`/`count` for decimal values, detecting and reporting overflow.
-///
-/// * sum:  stored as Decimal128 with `sum_scale` scale
-/// * count: stored as a i128 (*NOT* a Decimal128 value)
-/// * sum_scale: the scale of `sum`
-/// * target_type: the output decimal type
-pub fn calculate_result_decimal_for_avg(
-    sum: i128,
-    count: i128,
-    sum_scale: i8,
-    target_type: &DataType,
-) -> Result<ScalarValue> {
-    match target_type {
-        DataType::Decimal128(target_precision, target_scale) => {
-            let new_value =
-                Decimal128Averager::try_new(sum_scale, *target_precision, 
*target_scale)?
-                    .avg(sum, count)?;
-
-            Ok(ScalarValue::Decimal128(
-                Some(new_value),
-                *target_precision,
-                *target_scale,
-            ))
-        }
-        other => internal_err!("Invalid target type in AvgAccumulator 
{other:?}"),
-    }
-}
-
 /// Adjust array type metadata if needed
 ///
 /// Since `Decimal128Arrays` created from `Vec<NativeType>` have
diff --git a/datafusion/physical-expr/src/expressions/mod.rs 
b/datafusion/physical-expr/src/expressions/mod.rs
index 4a6d52834d..bce1240e50 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -140,6 +140,7 @@ pub(crate) mod tests {
     pub fn assert_aggregate(
         array: ArrayRef,
         function: AggregateFunction,
+        distinct: bool,
         expected: ScalarValue,
     ) {
         let data_type = array.data_type();
@@ -159,7 +160,7 @@ pub(crate) mod tests {
 
         let schema = Schema::new(vec![Field::new("a", coerced[0].clone(), 
true)]);
         let agg =
-            create_aggregate_expr(&function, false, &[input], &[], &schema, 
"aggregate")
+            create_aggregate_expr(&function, distinct, &[input], &[], &schema, 
"agg")
                 .unwrap();
 
         let result = aggregate(&batch, agg).unwrap();
diff --git a/datafusion/sqllogictest/test_files/decimal.slt 
b/datafusion/sqllogictest/test_files/decimal.slt
index a2a1df55e5..76743e444e 100644
--- a/datafusion/sqllogictest/test_files/decimal.slt
+++ b/datafusion/sqllogictest/test_files/decimal.slt
@@ -618,7 +618,7 @@ select a / b from foo;
 statement ok
 create table t as values (arrow_cast(123, 'Decimal256(5,2)'));
 
-query error DataFusion error: Internal error: Operator \+ is not implemented 
for types Decimal256\(None,5,2\) and Decimal256\(Some\(12300\),5,2\)\. This was 
likely caused by a bug in DataFusion's code and we would welcome that you file 
an bug report in our issue tracker
+query error DataFusion error: This feature is not implemented: AvgAccumulator 
for \(Decimal256\(5, 2\) \-\-> Decimal256\(9, 6\)\)
 select AVG(column1) from t;
 
 statement ok


Reply via email to