This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 5d70c32a9a Change `Accumulator::evaluate` and `Accumulator::state` to 
take `&mut self` (#8925)
5d70c32a9a is described below

commit 5d70c32a9a4accf21e9f27ff5ed62666cbbcbe54
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Jan 24 07:01:43 2024 -0500

    Change `Accumulator::evaluate` and `Accumulator::state` to take `&mut self` 
(#8925)
    
    * Change `Accumulator::evaluate` and `Accumulator::state` to take `&mut 
self`
    
    * improve docs
    
    * fix signature
---
 datafusion-examples/examples/advanced_udaf.rs      |  4 ++--
 datafusion-examples/examples/simple_udaf.rs        |  4 ++--
 datafusion/core/src/datasource/statistics.rs       |  4 ++--
 .../tests/user_defined/user_defined_aggregates.rs  | 12 +++++------
 .../user_defined/user_defined_scalar_functions.rs  |  4 ++--
 datafusion/expr/src/accumulator.rs                 | 23 ++++++++++++++++++----
 .../physical-expr/src/aggregate/approx_distinct.rs |  4 ++--
 .../src/aggregate/approx_percentile_cont.rs        |  4 ++--
 .../approx_percentile_cont_with_weight.rs          |  4 ++--
 .../physical-expr/src/aggregate/array_agg.rs       |  4 ++--
 .../src/aggregate/array_agg_distinct.rs            |  4 ++--
 .../src/aggregate/array_agg_ordered.rs             |  4 ++--
 datafusion/physical-expr/src/aggregate/average.rs  |  8 ++++----
 .../physical-expr/src/aggregate/bit_and_or_xor.rs  | 16 +++++++--------
 .../physical-expr/src/aggregate/bool_and_or.rs     |  8 ++++----
 .../physical-expr/src/aggregate/correlation.rs     |  6 +++---
 datafusion/physical-expr/src/aggregate/count.rs    |  4 ++--
 .../physical-expr/src/aggregate/count_distinct.rs  | 12 +++++------
 .../physical-expr/src/aggregate/covariance.rs      |  6 +++---
 .../physical-expr/src/aggregate/first_last.rs      |  8 ++++----
 .../src/aggregate/groups_accumulator/adapter.rs    |  4 ++--
 datafusion/physical-expr/src/aggregate/median.rs   |  7 +++----
 datafusion/physical-expr/src/aggregate/min_max.rs  | 16 +++++++--------
 .../physical-expr/src/aggregate/nth_value.rs       |  4 ++--
 datafusion/physical-expr/src/aggregate/regr.rs     |  4 ++--
 datafusion/physical-expr/src/aggregate/stddev.rs   |  6 +++---
 .../physical-expr/src/aggregate/string_agg.rs      |  4 ++--
 datafusion/physical-expr/src/aggregate/sum.rs      |  8 ++++----
 .../physical-expr/src/aggregate/sum_distinct.rs    |  4 ++--
 datafusion/physical-expr/src/aggregate/utils.rs    |  2 +-
 datafusion/physical-expr/src/aggregate/variance.rs |  6 +++---
 datafusion/physical-plan/src/aggregates/mod.rs     |  6 +++---
 .../physical-plan/src/aggregates/no_grouping.rs    | 13 ++++++------
 .../proto/tests/cases/roundtrip_logical_plan.rs    |  8 ++++----
 .../proto/tests/cases/roundtrip_physical_plan.rs   |  4 ++--
 .../tests/cases/roundtrip_logical_plan.rs          |  4 ++--
 36 files changed, 129 insertions(+), 114 deletions(-)

diff --git a/datafusion-examples/examples/advanced_udaf.rs 
b/datafusion-examples/examples/advanced_udaf.rs
index e5433013d9..10164a850b 100644
--- a/datafusion-examples/examples/advanced_udaf.rs
+++ b/datafusion-examples/examples/advanced_udaf.rs
@@ -125,7 +125,7 @@ impl Accumulator for GeometricMean {
     // This function serializes our state to `ScalarValue`, which DataFusion 
uses
     // to pass this state between execution stages.
     // Note that this can be arbitrary data.
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![
             ScalarValue::from(self.prod),
             ScalarValue::from(self.n),
@@ -134,7 +134,7 @@ impl Accumulator for GeometricMean {
 
     // DataFusion expects this function to return the final value of this 
aggregator.
     // in this case, this is the formula of the geometric mean
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let value = self.prod.powf(1.0 / self.n as f64);
         Ok(ScalarValue::from(value))
     }
diff --git a/datafusion-examples/examples/simple_udaf.rs 
b/datafusion-examples/examples/simple_udaf.rs
index 2c797f221b..0996a67245 100644
--- a/datafusion-examples/examples/simple_udaf.rs
+++ b/datafusion-examples/examples/simple_udaf.rs
@@ -72,7 +72,7 @@ impl Accumulator for GeometricMean {
     // This function serializes our state to `ScalarValue`, which DataFusion 
uses
     // to pass this state between execution stages.
     // Note that this can be arbitrary data.
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![
             ScalarValue::from(self.prod),
             ScalarValue::from(self.n),
@@ -81,7 +81,7 @@ impl Accumulator for GeometricMean {
 
     // DataFusion expects this function to return the final value of this 
aggregator.
     // in this case, this is the formula of the geometric mean
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let value = self.prod.powf(1.0 / self.n as f64);
         Ok(ScalarValue::from(value))
     }
diff --git a/datafusion/core/src/datasource/statistics.rs 
b/datafusion/core/src/datasource/statistics.rs
index 695e139517..73896f8eb7 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -194,11 +194,11 @@ pub(crate) fn get_col_stats(
 ) -> Vec<ColumnStatistics> {
     (0..schema.fields().len())
         .map(|i| {
-            let max_value = match &max_values[i] {
+            let max_value = match max_values.get_mut(i).unwrap() {
                 Some(max_value) => max_value.evaluate().ok(),
                 None => None,
             };
-            let min_value = match &min_values[i] {
+            let min_value = match min_values.get_mut(i).unwrap() {
                 Some(min_value) => min_value.evaluate().ok(),
                 None => None,
             };
diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs 
b/datafusion/core/tests/user_defined/user_defined_aggregates.rs
index 5b578daa7e..5dbac0322f 100644
--- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs
+++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs
@@ -456,7 +456,7 @@ impl TimeSum {
 }
 
 impl Accumulator for TimeSum {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.evaluate()?])
     }
 
@@ -478,7 +478,7 @@ impl Accumulator for TimeSum {
         self.update_batch(states)
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         println!("Evaluating to {}", self.sum);
         Ok(ScalarValue::TimestampNanosecond(Some(self.sum), None))
     }
@@ -603,14 +603,14 @@ impl FirstSelector {
 }
 
 impl Accumulator for FirstSelector {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         let state = self.to_state().into_iter().collect::<Vec<_>>();
 
         Ok(state)
     }
 
     /// produce the output structure
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(self.to_scalar())
     }
 
@@ -689,7 +689,7 @@ impl Accumulator for TestGroupsAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(ScalarValue::from(self.result))
     }
 
@@ -697,7 +697,7 @@ impl Accumulator for TestGroupsAccumulator {
         std::mem::size_of::<u64>()
     }
 
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![ScalarValue::from(self.result)])
     }
 
diff --git 
a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs 
b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
index b8573a690e..f1e32591fb 100644
--- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
@@ -254,7 +254,7 @@ async fn udaf_as_window_func() -> Result<()> {
     struct MyAccumulator;
 
     impl Accumulator for MyAccumulator {
-        fn state(&self) -> Result<Vec<ScalarValue>> {
+        fn state(&mut self) -> Result<Vec<ScalarValue>> {
             unimplemented!()
         }
 
@@ -266,7 +266,7 @@ async fn udaf_as_window_func() -> Result<()> {
             unimplemented!()
         }
 
-        fn evaluate(&self) -> Result<ScalarValue> {
+        fn evaluate(&mut self) -> Result<ScalarValue> {
             unimplemented!()
         }
 
diff --git a/datafusion/expr/src/accumulator.rs 
b/datafusion/expr/src/accumulator.rs
index 32de88b3d9..523e4e21a6 100644
--- a/datafusion/expr/src/accumulator.rs
+++ b/datafusion/expr/src/accumulator.rs
@@ -56,11 +56,18 @@ pub trait Accumulator: Send + Sync + Debug {
     /// running sum.
     fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
 
-    /// Returns the final aggregate value.
+    /// Returns the final aggregate value, consuming the internal state.
     ///
     /// For example, the `SUM` accumulator maintains a running sum,
     /// and `evaluate` will produce that running sum as its output.
-    fn evaluate(&self) -> Result<ScalarValue>;
+    ///
+    /// After this call, the accumulator's internal state should be
+    /// equivalent to when it was first created.
+    ///
+    /// This function gets `&mut self` to allow for the accumulator to build
+    /// arrow compatible internal state that can be returned without copying
+    /// when possible (for example distinct strings)
+    fn evaluate(&mut self) -> Result<ScalarValue>;
 
     /// Returns the allocated size required for this accumulator, in
     /// bytes, including `Self`.
@@ -72,7 +79,15 @@ pub trait Accumulator: Send + Sync + Debug {
     /// the `capacity` should be used not the `len`.
     fn size(&self) -> usize;
 
-    /// Returns the intermediate state of the accumulator.
+    /// Returns the intermediate state of the accumulator, consuming the
+    /// intermediate state.
+    ///
+    /// After this call, the accumulator's internal state should be
+    /// equivalent to when it was first created.
+    ///
+    /// This function gets `&mut self` to allow for the accumulator to build
+    /// arrow compatible internal state that can be returned without copying
+    /// when possible (for example distinct strings).
     ///
     /// Intermediate state is used for "multi-phase" grouping in
     /// DataFusion, where an aggregate is computed in parallel with
@@ -129,7 +144,7 @@ pub trait Accumulator: Send + Sync + Debug {
     /// Note that [`ScalarValue::List`] can be used to pass multiple
     /// values if the number of intermediate values is not known at
     /// planning time (e.g. for `MEDIAN`)
-    fn state(&self) -> Result<Vec<ScalarValue>>;
+    fn state(&mut self) -> Result<Vec<ScalarValue>>;
 
     /// Updates the accumulator's state from an `Array` containing one
     /// or more intermediate values.
diff --git a/datafusion/physical-expr/src/aggregate/approx_distinct.rs 
b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
index b79a5611c3..66e1310695 100644
--- a/datafusion/physical-expr/src/aggregate/approx_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
@@ -244,12 +244,12 @@ macro_rules! default_accumulator_impl {
             Ok(())
         }
 
-        fn state(&self) -> Result<Vec<ScalarValue>> {
+        fn state(&mut self) -> Result<Vec<ScalarValue>> {
             let value = ScalarValue::from(&self.hll);
             Ok(vec![value])
         }
 
-        fn evaluate(&self) -> Result<ScalarValue> {
+        fn evaluate(&mut self) -> Result<ScalarValue> {
             Ok(ScalarValue::UInt64(Some(self.hll.count() as u64)))
         }
 
diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs 
b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
index 15c0fb3ace..b3de7b0b4d 100644
--- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
@@ -377,7 +377,7 @@ impl ApproxPercentileAccumulator {
 }
 
 impl Accumulator for ApproxPercentileAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(self.digest.to_scalar_state().into_iter().collect())
     }
 
@@ -389,7 +389,7 @@ impl Accumulator for ApproxPercentileAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         if self.digest.count() == 0.0 {
             return exec_err!("aggregate function needs at least one non-null 
element");
         }
diff --git 
a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs 
b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
index ee5ef7228f..3fa715a592 100644
--- 
a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
+++ 
b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
@@ -129,7 +129,7 @@ impl ApproxPercentileWithWeightAccumulator {
 }
 
 impl Accumulator for ApproxPercentileWithWeightAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         self.approx_percentile_cont_accumulator.state()
     }
 
@@ -155,7 +155,7 @@ impl Accumulator for ApproxPercentileWithWeightAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         self.approx_percentile_cont_accumulator.evaluate()
     }
 
diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs 
b/datafusion/physical-expr/src/aggregate/array_agg.rs
index 91d5c867d3..5dc29f834f 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg.rs
@@ -153,11 +153,11 @@ impl Accumulator for ArrayAggAccumulator {
         Ok(())
     }
 
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.evaluate()?])
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         // Transform Vec<ListArr> to ListArr
 
         let element_arrays: Vec<&dyn Array> =
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs 
b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
index 2d263a42e0..a58856e398 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
@@ -129,7 +129,7 @@ impl DistinctArrayAggAccumulator {
 }
 
 impl Accumulator for DistinctArrayAggAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.evaluate()?])
     }
 
@@ -163,7 +163,7 @@ impl Accumulator for DistinctArrayAggAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let values: Vec<ScalarValue> = self.values.iter().cloned().collect();
         let arr = ScalarValue::new_list(&values, &self.datatype);
         Ok(ScalarValue::List(arr))
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs 
b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
index 34f8d20628..5263fa83a6 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -279,13 +279,13 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
         Ok(())
     }
 
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         let mut result = vec![self.evaluate()?];
         result.push(self.evaluate_orderings()?);
         Ok(result)
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let values = self.values.clone();
         let array = if self.reverse {
             ScalarValue::new_list_from_iter(values.into_iter().rev(), 
&self.datatypes[0])
diff --git a/datafusion/physical-expr/src/aggregate/average.rs 
b/datafusion/physical-expr/src/aggregate/average.rs
index 187373e14f..57f8fa211e 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -238,7 +238,7 @@ pub struct AvgAccumulator {
 }
 
 impl Accumulator for AvgAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![
             ScalarValue::from(self.count),
             ScalarValue::Float64(self.sum),
@@ -276,7 +276,7 @@ impl Accumulator for AvgAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(ScalarValue::Float64(
             self.sum.map(|f| f / self.count as f64),
         ))
@@ -314,7 +314,7 @@ impl<T: DecimalType + ArrowNumericType> Debug for 
DecimalAvgAccumulator<T> {
 }
 
 impl<T: DecimalType + ArrowNumericType> Accumulator for 
DecimalAvgAccumulator<T> {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![
             ScalarValue::from(self.count),
             ScalarValue::new_primitive::<T>(
@@ -356,7 +356,7 @@ impl<T: DecimalType + ArrowNumericType> Accumulator for 
DecimalAvgAccumulator<T>
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let v = self
             .sum
             .map(|v| {
diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs 
b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
index 92883d8049..ad5e8a5ac7 100644
--- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
+++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
@@ -190,11 +190,11 @@ where
         self.update_batch(states)
     }
 
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.evaluate()?])
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
     }
 
@@ -339,7 +339,7 @@ impl<T: ArrowNumericType> Accumulator for 
BitOrAccumulator<T>
 where
     T::Native: std::ops::BitOr<Output = T::Native>,
 {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.evaluate()?])
     }
 
@@ -355,7 +355,7 @@ where
         self.update_batch(states)
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
     }
 
@@ -500,7 +500,7 @@ impl<T: ArrowNumericType> Accumulator for 
BitXorAccumulator<T>
 where
     T::Native: std::ops::BitXor<Output = T::Native>,
 {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.evaluate()?])
     }
 
@@ -516,7 +516,7 @@ where
         self.update_batch(states)
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
     }
 
@@ -634,7 +634,7 @@ impl<T: ArrowNumericType> Accumulator for 
DistinctBitXorAccumulator<T>
 where
     T::Native: std::ops::BitXor<Output = T::Native> + std::hash::Hash + Eq,
 {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         // 1. Stores aggregate state in `ScalarValue::List`
         // 2. Constructs `ScalarValue::List` state from distinct numeric 
stored in hash set
         let state_out = {
@@ -679,7 +679,7 @@ where
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let mut acc = T::Native::usize_as(0);
         for distinct_value in self.values.iter() {
             acc = acc ^ *distinct_value;
diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs 
b/datafusion/physical-expr/src/aggregate/bool_and_or.rs
index ae205141b4..0a018fe086 100644
--- a/datafusion/physical-expr/src/aggregate/bool_and_or.rs
+++ b/datafusion/physical-expr/src/aggregate/bool_and_or.rs
@@ -191,11 +191,11 @@ impl Accumulator for BoolAndAccumulator {
         self.update_batch(states)
     }
 
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![ScalarValue::Boolean(self.acc)])
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(ScalarValue::Boolean(self.acc))
     }
 
@@ -309,7 +309,7 @@ struct BoolOrAccumulator {
 }
 
 impl Accumulator for BoolOrAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![ScalarValue::Boolean(self.acc)])
     }
 
@@ -328,7 +328,7 @@ impl Accumulator for BoolOrAccumulator {
         self.update_batch(states)
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(ScalarValue::Boolean(self.acc))
     }
 
diff --git a/datafusion/physical-expr/src/aggregate/correlation.rs 
b/datafusion/physical-expr/src/aggregate/correlation.rs
index 61f2db5c8e..4dca1e4a88 100644
--- a/datafusion/physical-expr/src/aggregate/correlation.rs
+++ b/datafusion/physical-expr/src/aggregate/correlation.rs
@@ -149,7 +149,7 @@ impl CorrelationAccumulator {
 }
 
 impl Accumulator for CorrelationAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![
             ScalarValue::from(self.covar.get_count()),
             ScalarValue::from(self.covar.get_mean1()),
@@ -215,7 +215,7 @@ impl Accumulator for CorrelationAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let covar = self.covar.evaluate()?;
         let stddev1 = self.stddev1.evaluate()?;
         let stddev2 = self.stddev2.evaluate()?;
@@ -519,7 +519,7 @@ mod tests {
             .collect::<Result<Vec<_>>>()?;
         accum1.update_batch(&values1)?;
         accum2.update_batch(&values2)?;
-        let state2 = get_accum_scalar_values_as_arrays(accum2.as_ref())?;
+        let state2 = get_accum_scalar_values_as_arrays(accum2.as_mut())?;
         accum1.merge_batch(&state2)?;
         accum1.evaluate()
     }
diff --git a/datafusion/physical-expr/src/aggregate/count.rs 
b/datafusion/physical-expr/src/aggregate/count.rs
index b6d4b73004..3b0fe0efd3 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -294,7 +294,7 @@ impl CountAccumulator {
 }
 
 impl Accumulator for CountAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![ScalarValue::Int64(Some(self.count))])
     }
 
@@ -319,7 +319,7 @@ impl Accumulator for CountAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(ScalarValue::Int64(Some(self.count)))
     }
 
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs 
b/datafusion/physical-expr/src/aggregate/count_distinct.rs
index 021c33fb94..ef1a248d5f 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -212,7 +212,7 @@ impl DistinctCountAccumulator {
 }
 
 impl Accumulator for DistinctCountAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         let scalars = self.values.iter().cloned().collect::<Vec<_>>();
         let arr = ScalarValue::new_list(scalars.as_slice(), 
&self.state_data_type);
         Ok(vec![ScalarValue::List(arr)])
@@ -249,7 +249,7 @@ impl Accumulator for DistinctCountAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
     }
 
@@ -288,7 +288,7 @@ where
     T: ArrowPrimitiveType + Send + Debug,
     T::Native: Eq + Hash,
 {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         let arr = Arc::new(PrimitiveArray::<T>::from_iter_values(
             self.values.iter().cloned(),
         )) as ArrayRef;
@@ -331,7 +331,7 @@ where
         })
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
     }
 
@@ -374,7 +374,7 @@ impl<T> Accumulator for FloatDistinctCountAccumulator<T>
 where
     T: ArrowPrimitiveType + Send + Debug,
 {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         let arr = Arc::new(PrimitiveArray::<T>::from_iter_values(
             self.values.iter().map(|v| v.0),
         )) as ArrayRef;
@@ -418,7 +418,7 @@ where
         })
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
     }
 
diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs 
b/datafusion/physical-expr/src/aggregate/covariance.rs
index 0f838eb6fa..45f9926975 100644
--- a/datafusion/physical-expr/src/aggregate/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -260,7 +260,7 @@ impl CovarianceAccumulator {
 }
 
 impl Accumulator for CovarianceAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![
             ScalarValue::from(self.count),
             ScalarValue::from(self.mean1),
@@ -381,7 +381,7 @@ impl Accumulator for CovarianceAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let count = match self.stats_type {
             StatsType::Population => self.count,
             StatsType::Sample => {
@@ -768,7 +768,7 @@ mod tests {
             .collect::<Result<Vec<_>>>()?;
         accum1.update_batch(&values1)?;
         accum2.update_batch(&values2)?;
-        let state2 = get_accum_scalar_values_as_arrays(accum2.as_ref())?;
+        let state2 = get_accum_scalar_values_as_arrays(accum2.as_mut())?;
         accum1.merge_batch(&state2)?;
         accum1.evaluate()
     }
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs 
b/datafusion/physical-expr/src/aggregate/first_last.rs
index 4afa8d0dd5..d2bf48551f 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -270,7 +270,7 @@ impl FirstValueAccumulator {
 }
 
 impl Accumulator for FirstValueAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         let mut result = vec![self.first.clone()];
         result.extend(self.orderings.iter().cloned());
         result.push(ScalarValue::Boolean(Some(self.is_set)));
@@ -336,7 +336,7 @@ impl Accumulator for FirstValueAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(self.first.clone())
     }
 
@@ -586,7 +586,7 @@ impl LastValueAccumulator {
 }
 
 impl Accumulator for LastValueAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         let mut result = vec![self.last.clone()];
         result.extend(self.orderings.clone());
         result.push(ScalarValue::Boolean(Some(self.is_set)));
@@ -655,7 +655,7 @@ impl Accumulator for LastValueAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(self.last.clone())
     }
 
diff --git 
a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs 
b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
index b4e6d2ebc5..9856e1c989 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
@@ -271,7 +271,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
 
         let results: Vec<ScalarValue> = states
             .into_iter()
-            .map(|state| {
+            .map(|mut state| {
                 self.free_allocation(state.size());
                 state.accumulator.evaluate()
             })
@@ -292,7 +292,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
         // which we need to form into columns
         let mut results: Vec<Vec<ScalarValue>> = vec![];
 
-        for state in states {
+        for mut state in states {
             self.free_allocation(state.size());
             let accumulator_state = state.accumulator.state()?;
             results.resize_with(accumulator_state.len(), Vec::new);
diff --git a/datafusion/physical-expr/src/aggregate/median.rs 
b/datafusion/physical-expr/src/aggregate/median.rs
index 691b1c1752..94cc5c7fb7 100644
--- a/datafusion/physical-expr/src/aggregate/median.rs
+++ b/datafusion/physical-expr/src/aggregate/median.rs
@@ -145,7 +145,7 @@ impl<T: ArrowNumericType> std::fmt::Debug for 
MedianAccumulator<T> {
 }
 
 impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         let all_values = self
             .all_values
             .iter()
@@ -171,9 +171,8 @@ impl<T: ArrowNumericType> Accumulator for 
MedianAccumulator<T> {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
-        // TODO: evaluate could pass &mut self
-        let mut d = self.all_values.clone();
+    fn evaluate(&mut self) -> Result<ScalarValue> {
+        let mut d = std::mem::take(&mut self.all_values);
         let cmp = |x: &T::Native, y: &T::Native| x.compare(*y);
 
         let len = d.len();
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs 
b/datafusion/physical-expr/src/aggregate/min_max.rs
index ba3e708553..3573df3743 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -764,11 +764,11 @@ impl Accumulator for MaxAccumulator {
         self.update_batch(states)
     }
 
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.max.clone()])
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(self.max.clone())
     }
 
@@ -820,11 +820,11 @@ impl Accumulator for SlidingMaxAccumulator {
         self.update_batch(states)
     }
 
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.max.clone()])
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(self.max.clone())
     }
 
@@ -1016,7 +1016,7 @@ impl MinAccumulator {
 }
 
 impl Accumulator for MinAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.min.clone()])
     }
 
@@ -1031,7 +1031,7 @@ impl Accumulator for MinAccumulator {
         self.update_batch(states)
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(self.min.clone())
     }
 
@@ -1058,7 +1058,7 @@ impl SlidingMinAccumulator {
 }
 
 impl Accumulator for SlidingMinAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.min.clone()])
     }
 
@@ -1092,7 +1092,7 @@ impl Accumulator for SlidingMinAccumulator {
         self.update_batch(states)
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(self.min.clone())
     }
 
diff --git a/datafusion/physical-expr/src/aggregate/nth_value.rs 
b/datafusion/physical-expr/src/aggregate/nth_value.rs
index 5a1ca90b7f..26a1254858 100644
--- a/datafusion/physical-expr/src/aggregate/nth_value.rs
+++ b/datafusion/physical-expr/src/aggregate/nth_value.rs
@@ -302,7 +302,7 @@ impl Accumulator for NthValueAccumulator {
         Ok(())
     }
 
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         let mut result = vec![self.evaluate_values()];
         if !self.ordering_req.is_empty() {
             result.push(self.evaluate_orderings());
@@ -310,7 +310,7 @@ impl Accumulator for NthValueAccumulator {
         Ok(result)
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let n_required = self.n.unsigned_abs() as usize;
         let from_start = self.n > 0;
         let nth_value_idx = if from_start {
diff --git a/datafusion/physical-expr/src/aggregate/regr.rs 
b/datafusion/physical-expr/src/aggregate/regr.rs
index 6922cb131c..36e7b7c9b3 100644
--- a/datafusion/physical-expr/src/aggregate/regr.rs
+++ b/datafusion/physical-expr/src/aggregate/regr.rs
@@ -251,7 +251,7 @@ impl RegrAccumulator {
 }
 
 impl Accumulator for RegrAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![
             ScalarValue::from(self.count),
             ScalarValue::from(self.mean_x),
@@ -418,7 +418,7 @@ impl Accumulator for RegrAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let cov_pop_x_y = self.algo_const / self.count as f64;
         let var_pop_x = self.m2_x / self.count as f64;
         let var_pop_y = self.m2_y / self.count as f64;
diff --git a/datafusion/physical-expr/src/aggregate/stddev.rs 
b/datafusion/physical-expr/src/aggregate/stddev.rs
index 64e19ef502..dcc2b0e69c 100644
--- a/datafusion/physical-expr/src/aggregate/stddev.rs
+++ b/datafusion/physical-expr/src/aggregate/stddev.rs
@@ -200,7 +200,7 @@ impl StddevAccumulator {
 }
 
 impl Accumulator for StddevAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![
             ScalarValue::from(self.variance.get_count()),
             ScalarValue::from(self.variance.get_mean()),
@@ -220,7 +220,7 @@ impl Accumulator for StddevAccumulator {
         self.variance.merge_batch(states)
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let variance = self.variance.evaluate()?;
         match variance {
             ScalarValue::Float64(e) => {
@@ -459,7 +459,7 @@ mod tests {
             .collect::<Result<Vec<_>>>()?;
         accum1.update_batch(&values1)?;
         accum2.update_batch(&values2)?;
-        let state2 = get_accum_scalar_values_as_arrays(accum2.as_ref())?;
+        let state2 = get_accum_scalar_values_as_arrays(accum2.as_mut())?;
         accum1.merge_batch(&state2)?;
         accum1.evaluate()
     }
diff --git a/datafusion/physical-expr/src/aggregate/string_agg.rs 
b/datafusion/physical-expr/src/aggregate/string_agg.rs
index 7adc736932..7a1da6d622 100644
--- a/datafusion/physical-expr/src/aggregate/string_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/string_agg.rs
@@ -153,11 +153,11 @@ impl Accumulator for StringAggAccumulator {
         Ok(())
     }
 
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.evaluate()?])
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         Ok(ScalarValue::LargeUtf8(self.values.clone()))
     }
 
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs 
b/datafusion/physical-expr/src/aggregate/sum.rs
index a770b3874c..6cf2810ce5 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -191,7 +191,7 @@ impl<T: ArrowNumericType> SumAccumulator<T> {
 }
 
 impl<T: ArrowNumericType> Accumulator for SumAccumulator<T> {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.evaluate()?])
     }
 
@@ -208,7 +208,7 @@ impl<T: ArrowNumericType> Accumulator for SumAccumulator<T> 
{
         self.update_batch(states)
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         ScalarValue::new_primitive::<T>(self.sum, &self.data_type)
     }
 
@@ -243,7 +243,7 @@ impl<T: ArrowNumericType> SlidingSumAccumulator<T> {
 }
 
 impl<T: ArrowNumericType> Accumulator for SlidingSumAccumulator<T> {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![self.evaluate()?, self.count.into()])
     }
 
@@ -267,7 +267,7 @@ impl<T: ArrowNumericType> Accumulator for 
SlidingSumAccumulator<T> {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let v = (self.count != 0).then_some(self.sum);
         ScalarValue::new_primitive::<T>(v, &self.data_type)
     }
diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs 
b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
index 6dbb392246..4c0f94b3a2 100644
--- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
@@ -140,7 +140,7 @@ impl<T: ArrowPrimitiveType> DistinctSumAccumulator<T> {
 }
 
 impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         // 1. Stores aggregate state in `ScalarValue::List`
         // 2. Constructs `ScalarValue::List` state from distinct numeric 
stored in hash set
         let state_out = {
@@ -186,7 +186,7 @@ impl<T: ArrowPrimitiveType> Accumulator for 
DistinctSumAccumulator<T> {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let mut acc = T::Native::usize_as(0);
         for distinct_value in self.values.iter() {
             acc = acc.add_wrapping(distinct_value.0)
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs 
b/datafusion/physical-expr/src/aggregate/utils.rs
index 6dd586bfb8..60d59c16be 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -35,7 +35,7 @@ use datafusion_expr::Accumulator;
 
 /// Convert scalar values from an accumulator into arrays.
 pub fn get_accum_scalar_values_as_arrays(
-    accum: &dyn Accumulator,
+    accum: &mut dyn Accumulator,
 ) -> Result<Vec<ArrayRef>> {
     accum
         .state()?
diff --git a/datafusion/physical-expr/src/aggregate/variance.rs 
b/datafusion/physical-expr/src/aggregate/variance.rs
index d82c5ad562..94d7be4265 100644
--- a/datafusion/physical-expr/src/aggregate/variance.rs
+++ b/datafusion/physical-expr/src/aggregate/variance.rs
@@ -231,7 +231,7 @@ impl VarianceAccumulator {
 }
 
 impl Accumulator for VarianceAccumulator {
-    fn state(&self) -> Result<Vec<ScalarValue>> {
+    fn state(&mut self) -> Result<Vec<ScalarValue>> {
         Ok(vec![
             ScalarValue::from(self.count),
             ScalarValue::from(self.mean),
@@ -302,7 +302,7 @@ impl Accumulator for VarianceAccumulator {
         Ok(())
     }
 
-    fn evaluate(&self) -> Result<ScalarValue> {
+    fn evaluate(&mut self) -> Result<ScalarValue> {
         let count = match self.stats_type {
             StatsType::Population => self.count,
             StatsType::Sample => {
@@ -533,7 +533,7 @@ mod tests {
             .collect::<Result<Vec<_>>>()?;
         accum1.update_batch(&values1)?;
         accum2.update_batch(&values2)?;
-        let state2 = get_accum_scalar_values_as_arrays(accum2.as_ref())?;
+        let state2 = get_accum_scalar_values_as_arrays(accum2.as_mut())?;
         accum1.merge_batch(&state2)?;
         accum1.evaluate()
     }
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs 
b/datafusion/physical-plan/src/aggregates/mod.rs
index d3ae0d5ce0..2d7a8cccc4 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1087,14 +1087,14 @@ fn create_accumulators(
 /// returns a vector of ArrayRefs, where each entry corresponds to either the
 /// final value (mode = Final, FinalPartitioned and Single) or states (mode = 
Partial)
 fn finalize_aggregation(
-    accumulators: &[AccumulatorItem],
+    accumulators: &mut [AccumulatorItem],
     mode: &AggregateMode,
 ) -> Result<Vec<ArrayRef>> {
     match mode {
         AggregateMode::Partial => {
             // Build the vector of states
             accumulators
-                .iter()
+                .iter_mut()
                 .map(|accumulator| {
                     accumulator.state().and_then(|e| {
                         e.iter()
@@ -1111,7 +1111,7 @@ fn finalize_aggregation(
         | AggregateMode::SinglePartitioned => {
             // Merge the state to the final value
             accumulators
-                .iter()
+                .iter_mut()
                 .map(|accumulator| accumulator.evaluate().and_then(|v| 
v.to_array()))
                 .collect()
         }
diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs 
b/datafusion/physical-plan/src/aggregates/no_grouping.rs
index 90eb488a2e..5ec95bd799 100644
--- a/datafusion/physical-plan/src/aggregates/no_grouping.rs
+++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -137,12 +137,13 @@ impl AggregateStream {
                     None => {
                         this.finished = true;
                         let timer = 
this.baseline_metrics.elapsed_compute().timer();
-                        let result = finalize_aggregation(&this.accumulators, 
&this.mode)
-                            .and_then(|columns| {
-                                RecordBatch::try_new(this.schema.clone(), 
columns)
-                                    .map_err(Into::into)
-                            })
-                            .record_output(&this.baseline_metrics);
+                        let result =
+                            finalize_aggregation(&mut this.accumulators, 
&this.mode)
+                                .and_then(|columns| {
+                                    RecordBatch::try_new(this.schema.clone(), 
columns)
+                                        .map_err(Into::into)
+                                })
+                                .record_output(&this.baseline_metrics);
 
                         timer.done();
 
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index cf991e524f..17d47a65d8 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -1577,7 +1577,7 @@ fn roundtrip_aggregate_udf() {
     struct Dummy {}
 
     impl Accumulator for Dummy {
-        fn state(&self) -> datafusion::error::Result<Vec<ScalarValue>> {
+        fn state(&mut self) -> datafusion::error::Result<Vec<ScalarValue>> {
             Ok(vec![])
         }
 
@@ -1592,7 +1592,7 @@ fn roundtrip_aggregate_udf() {
             Ok(())
         }
 
-        fn evaluate(&self) -> datafusion::error::Result<ScalarValue> {
+        fn evaluate(&mut self) -> datafusion::error::Result<ScalarValue> {
             Ok(ScalarValue::Float64(None))
         }
 
@@ -1764,7 +1764,7 @@ fn roundtrip_window() {
     struct DummyAggr {}
 
     impl Accumulator for DummyAggr {
-        fn state(&self) -> datafusion::error::Result<Vec<ScalarValue>> {
+        fn state(&mut self) -> datafusion::error::Result<Vec<ScalarValue>> {
             Ok(vec![])
         }
 
@@ -1779,7 +1779,7 @@ fn roundtrip_window() {
             Ok(())
         }
 
-        fn evaluate(&self) -> datafusion::error::Result<ScalarValue> {
+        fn evaluate(&mut self) -> datafusion::error::Result<ScalarValue> {
             Ok(ScalarValue::Float64(None))
         }
 
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs 
b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 4f91713f48..9a95e103c2 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -353,7 +353,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> {
     #[derive(Debug)]
     struct Example;
     impl Accumulator for Example {
-        fn state(&self) -> Result<Vec<ScalarValue>> {
+        fn state(&mut self) -> Result<Vec<ScalarValue>> {
             Ok(vec![ScalarValue::Int64(Some(0))])
         }
 
@@ -365,7 +365,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> {
             Ok(())
         }
 
-        fn evaluate(&self) -> Result<ScalarValue> {
+        fn evaluate(&mut self) -> Result<ScalarValue> {
             Ok(ScalarValue::Int64(Some(0)))
         }
 
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs 
b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index d7327caee4..79cf76de59 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -712,7 +712,7 @@ async fn roundtrip_aggregate_udf() -> Result<()> {
     struct Dummy {}
 
     impl Accumulator for Dummy {
-        fn state(&self) -> datafusion::error::Result<Vec<ScalarValue>> {
+        fn state(&mut self) -> datafusion::error::Result<Vec<ScalarValue>> {
             Ok(vec![])
         }
 
@@ -727,7 +727,7 @@ async fn roundtrip_aggregate_udf() -> Result<()> {
             Ok(())
         }
 
-        fn evaluate(&self) -> datafusion::error::Result<ScalarValue> {
+        fn evaluate(&mut self) -> datafusion::error::Result<ScalarValue> {
             Ok(ScalarValue::Float64(None))
         }
 


Reply via email to