This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 5d70c32a9a Change `Accumulator::evaluate` and `Accumulator::state` to take `&mut self` (#8925)
5d70c32a9a is described below
commit 5d70c32a9a4accf21e9f27ff5ed62666cbbcbe54
Author: Andrew Lamb <[email protected]>
AuthorDate: Wed Jan 24 07:01:43 2024 -0500
Change `Accumulator::evaluate` and `Accumulator::state` to take `&mut self` (#8925)
* Change `Accumulator::evaluate` and `Accumulator::state` to take `&mut self`
* improve docs
* fix signature
---
datafusion-examples/examples/advanced_udaf.rs | 4 ++--
datafusion-examples/examples/simple_udaf.rs | 4 ++--
datafusion/core/src/datasource/statistics.rs | 4 ++--
.../tests/user_defined/user_defined_aggregates.rs | 12 +++++------
.../user_defined/user_defined_scalar_functions.rs | 4 ++--
datafusion/expr/src/accumulator.rs | 23 ++++++++++++++++++----
.../physical-expr/src/aggregate/approx_distinct.rs | 4 ++--
.../src/aggregate/approx_percentile_cont.rs | 4 ++--
.../approx_percentile_cont_with_weight.rs | 4 ++--
.../physical-expr/src/aggregate/array_agg.rs | 4 ++--
.../src/aggregate/array_agg_distinct.rs | 4 ++--
.../src/aggregate/array_agg_ordered.rs | 4 ++--
datafusion/physical-expr/src/aggregate/average.rs | 8 ++++----
.../physical-expr/src/aggregate/bit_and_or_xor.rs | 16 +++++++--------
.../physical-expr/src/aggregate/bool_and_or.rs | 8 ++++----
.../physical-expr/src/aggregate/correlation.rs | 6 +++---
datafusion/physical-expr/src/aggregate/count.rs | 4 ++--
.../physical-expr/src/aggregate/count_distinct.rs | 12 +++++------
.../physical-expr/src/aggregate/covariance.rs | 6 +++---
.../physical-expr/src/aggregate/first_last.rs | 8 ++++----
.../src/aggregate/groups_accumulator/adapter.rs | 4 ++--
datafusion/physical-expr/src/aggregate/median.rs | 7 +++----
datafusion/physical-expr/src/aggregate/min_max.rs | 16 +++++++--------
.../physical-expr/src/aggregate/nth_value.rs | 4 ++--
datafusion/physical-expr/src/aggregate/regr.rs | 4 ++--
datafusion/physical-expr/src/aggregate/stddev.rs | 6 +++---
.../physical-expr/src/aggregate/string_agg.rs | 4 ++--
datafusion/physical-expr/src/aggregate/sum.rs | 8 ++++----
.../physical-expr/src/aggregate/sum_distinct.rs | 4 ++--
datafusion/physical-expr/src/aggregate/utils.rs | 2 +-
datafusion/physical-expr/src/aggregate/variance.rs | 6 +++---
datafusion/physical-plan/src/aggregates/mod.rs | 6 +++---
.../physical-plan/src/aggregates/no_grouping.rs | 13 ++++++------
.../proto/tests/cases/roundtrip_logical_plan.rs | 8 ++++----
.../proto/tests/cases/roundtrip_physical_plan.rs | 4 ++--
.../tests/cases/roundtrip_logical_plan.rs | 4 ++--
36 files changed, 129 insertions(+), 114 deletions(-)
diff --git a/datafusion-examples/examples/advanced_udaf.rs b/datafusion-examples/examples/advanced_udaf.rs
index e5433013d9..10164a850b 100644
--- a/datafusion-examples/examples/advanced_udaf.rs
+++ b/datafusion-examples/examples/advanced_udaf.rs
@@ -125,7 +125,7 @@ impl Accumulator for GeometricMean {
// This function serializes our state to `ScalarValue`, which DataFusion uses
// to pass this state between execution stages.
// Note that this can be arbitrary data.
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.prod),
ScalarValue::from(self.n),
@@ -134,7 +134,7 @@ impl Accumulator for GeometricMean {
// DataFusion expects this function to return the final value of this aggregator.
// in this case, this is the formula of the geometric mean
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let value = self.prod.powf(1.0 / self.n as f64);
Ok(ScalarValue::from(value))
}
diff --git a/datafusion-examples/examples/simple_udaf.rs b/datafusion-examples/examples/simple_udaf.rs
index 2c797f221b..0996a67245 100644
--- a/datafusion-examples/examples/simple_udaf.rs
+++ b/datafusion-examples/examples/simple_udaf.rs
@@ -72,7 +72,7 @@ impl Accumulator for GeometricMean {
// This function serializes our state to `ScalarValue`, which DataFusion uses
// to pass this state between execution stages.
// Note that this can be arbitrary data.
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.prod),
ScalarValue::from(self.n),
@@ -81,7 +81,7 @@ impl Accumulator for GeometricMean {
// DataFusion expects this function to return the final value of this aggregator.
// in this case, this is the formula of the geometric mean
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let value = self.prod.powf(1.0 / self.n as f64);
Ok(ScalarValue::from(value))
}
diff --git a/datafusion/core/src/datasource/statistics.rs b/datafusion/core/src/datasource/statistics.rs
index 695e139517..73896f8eb7 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -194,11 +194,11 @@ pub(crate) fn get_col_stats(
) -> Vec<ColumnStatistics> {
(0..schema.fields().len())
.map(|i| {
- let max_value = match &max_values[i] {
+ let max_value = match max_values.get_mut(i).unwrap() {
Some(max_value) => max_value.evaluate().ok(),
None => None,
};
- let min_value = match &min_values[i] {
+ let min_value = match min_values.get_mut(i).unwrap() {
Some(min_value) => min_value.evaluate().ok(),
None => None,
};
diff --git a/datafusion/core/tests/user_defined/user_defined_aggregates.rs b/datafusion/core/tests/user_defined/user_defined_aggregates.rs
index 5b578daa7e..5dbac0322f 100644
--- a/datafusion/core/tests/user_defined/user_defined_aggregates.rs
+++ b/datafusion/core/tests/user_defined/user_defined_aggregates.rs
@@ -456,7 +456,7 @@ impl TimeSum {
}
impl Accumulator for TimeSum {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.evaluate()?])
}
@@ -478,7 +478,7 @@ impl Accumulator for TimeSum {
self.update_batch(states)
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
println!("Evaluating to {}", self.sum);
Ok(ScalarValue::TimestampNanosecond(Some(self.sum), None))
}
@@ -603,14 +603,14 @@ impl FirstSelector {
}
impl Accumulator for FirstSelector {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
let state = self.to_state().into_iter().collect::<Vec<_>>();
Ok(state)
}
/// produce the output structure
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(self.to_scalar())
}
@@ -689,7 +689,7 @@ impl Accumulator for TestGroupsAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::from(self.result))
}
@@ -697,7 +697,7 @@ impl Accumulator for TestGroupsAccumulator {
std::mem::size_of::<u64>()
}
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![ScalarValue::from(self.result)])
}
diff --git a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
index b8573a690e..f1e32591fb 100644
--- a/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
+++ b/datafusion/core/tests/user_defined/user_defined_scalar_functions.rs
@@ -254,7 +254,7 @@ async fn udaf_as_window_func() -> Result<()> {
struct MyAccumulator;
impl Accumulator for MyAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
unimplemented!()
}
@@ -266,7 +266,7 @@ async fn udaf_as_window_func() -> Result<()> {
unimplemented!()
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
unimplemented!()
}
diff --git a/datafusion/expr/src/accumulator.rs b/datafusion/expr/src/accumulator.rs
index 32de88b3d9..523e4e21a6 100644
--- a/datafusion/expr/src/accumulator.rs
+++ b/datafusion/expr/src/accumulator.rs
@@ -56,11 +56,18 @@ pub trait Accumulator: Send + Sync + Debug {
/// running sum.
fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()>;
- /// Returns the final aggregate value.
+ /// Returns the final aggregate value, consuming the internal state.
///
/// For example, the `SUM` accumulator maintains a running sum,
/// and `evaluate` will produce that running sum as its output.
- fn evaluate(&self) -> Result<ScalarValue>;
+ ///
+ /// After this call, the accumulator's internal state should be
+ /// equivalent to when it was first created.
+ ///
+ /// This function gets `&mut self` to allow for the accumulator to build
+ /// arrow compatible internal state that can be returned without copying
+ /// when possible (for example distinct strings)
+ fn evaluate(&mut self) -> Result<ScalarValue>;
/// Returns the allocated size required for this accumulator, in
/// bytes, including `Self`.
@@ -72,7 +79,15 @@ pub trait Accumulator: Send + Sync + Debug {
/// the `capacity` should be used not the `len`.
fn size(&self) -> usize;
- /// Returns the intermediate state of the accumulator.
+ /// Returns the intermediate state of the accumulator, consuming the
+ /// intermediate state.
+ ///
+ /// After this call, the accumulator's internal state should be
+ /// equivalent to when it was first created.
+ ///
+ /// This function gets `&mut self` to allow for the accumulator to build
+ /// arrow compatible internal state that can be returned without copying
+ /// when possible (for example distinct strings).
///
/// Intermediate state is used for "multi-phase" grouping in
/// DataFusion, where an aggregate is computed in parallel with
@@ -129,7 +144,7 @@ pub trait Accumulator: Send + Sync + Debug {
/// Note that [`ScalarValue::List`] can be used to pass multiple
/// values if the number of intermediate values is not known at
/// planning time (e.g. for `MEDIAN`)
- fn state(&self) -> Result<Vec<ScalarValue>>;
+ fn state(&mut self) -> Result<Vec<ScalarValue>>;
/// Updates the accumulator's state from an `Array` containing one
/// or more intermediate values.
diff --git a/datafusion/physical-expr/src/aggregate/approx_distinct.rs b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
index b79a5611c3..66e1310695 100644
--- a/datafusion/physical-expr/src/aggregate/approx_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_distinct.rs
@@ -244,12 +244,12 @@ macro_rules! default_accumulator_impl {
Ok(())
}
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
let value = ScalarValue::from(&self.hll);
Ok(vec![value])
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::UInt64(Some(self.hll.count() as u64)))
}
diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
index 15c0fb3ace..b3de7b0b4d 100644
--- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont.rs
@@ -377,7 +377,7 @@ impl ApproxPercentileAccumulator {
}
impl Accumulator for ApproxPercentileAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(self.digest.to_scalar_state().into_iter().collect())
}
@@ -389,7 +389,7 @@ impl Accumulator for ApproxPercentileAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
if self.digest.count() == 0.0 {
return exec_err!("aggregate function needs at least one non-null element");
}
diff --git a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
index ee5ef7228f..3fa715a592 100644
--- a/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
+++ b/datafusion/physical-expr/src/aggregate/approx_percentile_cont_with_weight.rs
@@ -129,7 +129,7 @@ impl ApproxPercentileWithWeightAccumulator {
}
impl Accumulator for ApproxPercentileWithWeightAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
self.approx_percentile_cont_accumulator.state()
}
@@ -155,7 +155,7 @@ impl Accumulator for ApproxPercentileWithWeightAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
self.approx_percentile_cont_accumulator.evaluate()
}
diff --git a/datafusion/physical-expr/src/aggregate/array_agg.rs b/datafusion/physical-expr/src/aggregate/array_agg.rs
index 91d5c867d3..5dc29f834f 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg.rs
@@ -153,11 +153,11 @@ impl Accumulator for ArrayAggAccumulator {
Ok(())
}
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.evaluate()?])
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
// Transform Vec<ListArr> to ListArr
let element_arrays: Vec<&dyn Array> =
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
index 2d263a42e0..a58856e398 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_distinct.rs
@@ -129,7 +129,7 @@ impl DistinctArrayAggAccumulator {
}
impl Accumulator for DistinctArrayAggAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.evaluate()?])
}
@@ -163,7 +163,7 @@ impl Accumulator for DistinctArrayAggAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let values: Vec<ScalarValue> = self.values.iter().cloned().collect();
let arr = ScalarValue::new_list(&values, &self.datatype);
Ok(ScalarValue::List(arr))
diff --git a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
index 34f8d20628..5263fa83a6 100644
--- a/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
+++ b/datafusion/physical-expr/src/aggregate/array_agg_ordered.rs
@@ -279,13 +279,13 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
Ok(())
}
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
let mut result = vec![self.evaluate()?];
result.push(self.evaluate_orderings()?);
Ok(result)
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let values = self.values.clone();
let array = if self.reverse {
ScalarValue::new_list_from_iter(values.into_iter().rev(), &self.datatypes[0])
diff --git a/datafusion/physical-expr/src/aggregate/average.rs b/datafusion/physical-expr/src/aggregate/average.rs
index 187373e14f..57f8fa211e 100644
--- a/datafusion/physical-expr/src/aggregate/average.rs
+++ b/datafusion/physical-expr/src/aggregate/average.rs
@@ -238,7 +238,7 @@ pub struct AvgAccumulator {
}
impl Accumulator for AvgAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.count),
ScalarValue::Float64(self.sum),
@@ -276,7 +276,7 @@ impl Accumulator for AvgAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Float64(
self.sum.map(|f| f / self.count as f64),
))
@@ -314,7 +314,7 @@ impl<T: DecimalType + ArrowNumericType> Debug for DecimalAvgAccumulator<T> {
}
impl<T: DecimalType + ArrowNumericType> Accumulator for DecimalAvgAccumulator<T> {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.count),
ScalarValue::new_primitive::<T>(
@@ -356,7 +356,7 @@ impl<T: DecimalType + ArrowNumericType> Accumulator for DecimalAvgAccumulator<T>
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let v = self
.sum
.map(|v| {
diff --git a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
index 92883d8049..ad5e8a5ac7 100644
--- a/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
+++ b/datafusion/physical-expr/src/aggregate/bit_and_or_xor.rs
@@ -190,11 +190,11 @@ where
self.update_batch(states)
}
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.evaluate()?])
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}
@@ -339,7 +339,7 @@ impl<T: ArrowNumericType> Accumulator for BitOrAccumulator<T>
where
T::Native: std::ops::BitOr<Output = T::Native>,
{
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.evaluate()?])
}
@@ -355,7 +355,7 @@ where
self.update_batch(states)
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}
@@ -500,7 +500,7 @@ impl<T: ArrowNumericType> Accumulator for BitXorAccumulator<T>
where
T::Native: std::ops::BitXor<Output = T::Native>,
{
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.evaluate()?])
}
@@ -516,7 +516,7 @@ where
self.update_batch(states)
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
ScalarValue::new_primitive::<T>(self.value, &T::DATA_TYPE)
}
@@ -634,7 +634,7 @@ impl<T: ArrowNumericType> Accumulator for DistinctBitXorAccumulator<T>
where
T::Native: std::ops::BitXor<Output = T::Native> + std::hash::Hash + Eq,
{
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
// 1. Stores aggregate state in `ScalarValue::List`
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
@@ -679,7 +679,7 @@ where
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let mut acc = T::Native::usize_as(0);
for distinct_value in self.values.iter() {
acc = acc ^ *distinct_value;
diff --git a/datafusion/physical-expr/src/aggregate/bool_and_or.rs b/datafusion/physical-expr/src/aggregate/bool_and_or.rs
index ae205141b4..0a018fe086 100644
--- a/datafusion/physical-expr/src/aggregate/bool_and_or.rs
+++ b/datafusion/physical-expr/src/aggregate/bool_and_or.rs
@@ -191,11 +191,11 @@ impl Accumulator for BoolAndAccumulator {
self.update_batch(states)
}
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![ScalarValue::Boolean(self.acc)])
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Boolean(self.acc))
}
@@ -309,7 +309,7 @@ struct BoolOrAccumulator {
}
impl Accumulator for BoolOrAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![ScalarValue::Boolean(self.acc)])
}
@@ -328,7 +328,7 @@ impl Accumulator for BoolOrAccumulator {
self.update_batch(states)
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Boolean(self.acc))
}
diff --git a/datafusion/physical-expr/src/aggregate/correlation.rs b/datafusion/physical-expr/src/aggregate/correlation.rs
index 61f2db5c8e..4dca1e4a88 100644
--- a/datafusion/physical-expr/src/aggregate/correlation.rs
+++ b/datafusion/physical-expr/src/aggregate/correlation.rs
@@ -149,7 +149,7 @@ impl CorrelationAccumulator {
}
impl Accumulator for CorrelationAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.covar.get_count()),
ScalarValue::from(self.covar.get_mean1()),
@@ -215,7 +215,7 @@ impl Accumulator for CorrelationAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let covar = self.covar.evaluate()?;
let stddev1 = self.stddev1.evaluate()?;
let stddev2 = self.stddev2.evaluate()?;
@@ -519,7 +519,7 @@ mod tests {
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
- let state2 = get_accum_scalar_values_as_arrays(accum2.as_ref())?;
+ let state2 = get_accum_scalar_values_as_arrays(accum2.as_mut())?;
accum1.merge_batch(&state2)?;
accum1.evaluate()
}
diff --git a/datafusion/physical-expr/src/aggregate/count.rs b/datafusion/physical-expr/src/aggregate/count.rs
index b6d4b73004..3b0fe0efd3 100644
--- a/datafusion/physical-expr/src/aggregate/count.rs
+++ b/datafusion/physical-expr/src/aggregate/count.rs
@@ -294,7 +294,7 @@ impl CountAccumulator {
}
impl Accumulator for CountAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![ScalarValue::Int64(Some(self.count))])
}
@@ -319,7 +319,7 @@ impl Accumulator for CountAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.count)))
}
diff --git a/datafusion/physical-expr/src/aggregate/count_distinct.rs b/datafusion/physical-expr/src/aggregate/count_distinct.rs
index 021c33fb94..ef1a248d5f 100644
--- a/datafusion/physical-expr/src/aggregate/count_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/count_distinct.rs
@@ -212,7 +212,7 @@ impl DistinctCountAccumulator {
}
impl Accumulator for DistinctCountAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
let scalars = self.values.iter().cloned().collect::<Vec<_>>();
let arr = ScalarValue::new_list(scalars.as_slice(), &self.state_data_type);
Ok(vec![ScalarValue::List(arr)])
@@ -249,7 +249,7 @@ impl Accumulator for DistinctCountAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
}
@@ -288,7 +288,7 @@ where
T: ArrowPrimitiveType + Send + Debug,
T::Native: Eq + Hash,
{
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
let arr = Arc::new(PrimitiveArray::<T>::from_iter_values(
self.values.iter().cloned(),
)) as ArrayRef;
@@ -331,7 +331,7 @@ where
})
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
}
@@ -374,7 +374,7 @@ impl<T> Accumulator for FloatDistinctCountAccumulator<T>
where
T: ArrowPrimitiveType + Send + Debug,
{
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
let arr = Arc::new(PrimitiveArray::<T>::from_iter_values(
self.values.iter().map(|v| v.0),
)) as ArrayRef;
@@ -418,7 +418,7 @@ where
})
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.values.len() as i64)))
}
diff --git a/datafusion/physical-expr/src/aggregate/covariance.rs b/datafusion/physical-expr/src/aggregate/covariance.rs
index 0f838eb6fa..45f9926975 100644
--- a/datafusion/physical-expr/src/aggregate/covariance.rs
+++ b/datafusion/physical-expr/src/aggregate/covariance.rs
@@ -260,7 +260,7 @@ impl CovarianceAccumulator {
}
impl Accumulator for CovarianceAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.count),
ScalarValue::from(self.mean1),
@@ -381,7 +381,7 @@ impl Accumulator for CovarianceAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let count = match self.stats_type {
StatsType::Population => self.count,
StatsType::Sample => {
@@ -768,7 +768,7 @@ mod tests {
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
- let state2 = get_accum_scalar_values_as_arrays(accum2.as_ref())?;
+ let state2 = get_accum_scalar_values_as_arrays(accum2.as_mut())?;
accum1.merge_batch(&state2)?;
accum1.evaluate()
}
diff --git a/datafusion/physical-expr/src/aggregate/first_last.rs b/datafusion/physical-expr/src/aggregate/first_last.rs
index 4afa8d0dd5..d2bf48551f 100644
--- a/datafusion/physical-expr/src/aggregate/first_last.rs
+++ b/datafusion/physical-expr/src/aggregate/first_last.rs
@@ -270,7 +270,7 @@ impl FirstValueAccumulator {
}
impl Accumulator for FirstValueAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
let mut result = vec![self.first.clone()];
result.extend(self.orderings.iter().cloned());
result.push(ScalarValue::Boolean(Some(self.is_set)));
@@ -336,7 +336,7 @@ impl Accumulator for FirstValueAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(self.first.clone())
}
@@ -586,7 +586,7 @@ impl LastValueAccumulator {
}
impl Accumulator for LastValueAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
let mut result = vec![self.last.clone()];
result.extend(self.orderings.clone());
result.push(ScalarValue::Boolean(Some(self.is_set)));
@@ -655,7 +655,7 @@ impl Accumulator for LastValueAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(self.last.clone())
}
diff --git a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
index b4e6d2ebc5..9856e1c989 100644
--- a/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
+++ b/datafusion/physical-expr/src/aggregate/groups_accumulator/adapter.rs
@@ -271,7 +271,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
let results: Vec<ScalarValue> = states
.into_iter()
- .map(|state| {
+ .map(|mut state| {
self.free_allocation(state.size());
state.accumulator.evaluate()
})
@@ -292,7 +292,7 @@ impl GroupsAccumulator for GroupsAccumulatorAdapter {
// which we need to form into columns
let mut results: Vec<Vec<ScalarValue>> = vec![];
- for state in states {
+ for mut state in states {
self.free_allocation(state.size());
let accumulator_state = state.accumulator.state()?;
results.resize_with(accumulator_state.len(), Vec::new);
diff --git a/datafusion/physical-expr/src/aggregate/median.rs b/datafusion/physical-expr/src/aggregate/median.rs
index 691b1c1752..94cc5c7fb7 100644
--- a/datafusion/physical-expr/src/aggregate/median.rs
+++ b/datafusion/physical-expr/src/aggregate/median.rs
@@ -145,7 +145,7 @@ impl<T: ArrowNumericType> std::fmt::Debug for MedianAccumulator<T> {
}
impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
let all_values = self
.all_values
.iter()
@@ -171,9 +171,8 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
- // TODO: evaluate could pass &mut self
- let mut d = self.all_values.clone();
+ fn evaluate(&mut self) -> Result<ScalarValue> {
+ let mut d = std::mem::take(&mut self.all_values);
let cmp = |x: &T::Native, y: &T::Native| x.compare(*y);
let len = d.len();
diff --git a/datafusion/physical-expr/src/aggregate/min_max.rs b/datafusion/physical-expr/src/aggregate/min_max.rs
index ba3e708553..3573df3743 100644
--- a/datafusion/physical-expr/src/aggregate/min_max.rs
+++ b/datafusion/physical-expr/src/aggregate/min_max.rs
@@ -764,11 +764,11 @@ impl Accumulator for MaxAccumulator {
self.update_batch(states)
}
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.max.clone()])
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(self.max.clone())
}
@@ -820,11 +820,11 @@ impl Accumulator for SlidingMaxAccumulator {
self.update_batch(states)
}
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.max.clone()])
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(self.max.clone())
}
@@ -1016,7 +1016,7 @@ impl MinAccumulator {
}
impl Accumulator for MinAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.min.clone()])
}
@@ -1031,7 +1031,7 @@ impl Accumulator for MinAccumulator {
self.update_batch(states)
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(self.min.clone())
}
@@ -1058,7 +1058,7 @@ impl SlidingMinAccumulator {
}
impl Accumulator for SlidingMinAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.min.clone()])
}
@@ -1092,7 +1092,7 @@ impl Accumulator for SlidingMinAccumulator {
self.update_batch(states)
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(self.min.clone())
}
diff --git a/datafusion/physical-expr/src/aggregate/nth_value.rs b/datafusion/physical-expr/src/aggregate/nth_value.rs
index 5a1ca90b7f..26a1254858 100644
--- a/datafusion/physical-expr/src/aggregate/nth_value.rs
+++ b/datafusion/physical-expr/src/aggregate/nth_value.rs
@@ -302,7 +302,7 @@ impl Accumulator for NthValueAccumulator {
Ok(())
}
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
let mut result = vec![self.evaluate_values()];
if !self.ordering_req.is_empty() {
result.push(self.evaluate_orderings());
@@ -310,7 +310,7 @@ impl Accumulator for NthValueAccumulator {
Ok(result)
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let n_required = self.n.unsigned_abs() as usize;
let from_start = self.n > 0;
let nth_value_idx = if from_start {
diff --git a/datafusion/physical-expr/src/aggregate/regr.rs b/datafusion/physical-expr/src/aggregate/regr.rs
index 6922cb131c..36e7b7c9b3 100644
--- a/datafusion/physical-expr/src/aggregate/regr.rs
+++ b/datafusion/physical-expr/src/aggregate/regr.rs
@@ -251,7 +251,7 @@ impl RegrAccumulator {
}
impl Accumulator for RegrAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.count),
ScalarValue::from(self.mean_x),
@@ -418,7 +418,7 @@ impl Accumulator for RegrAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let cov_pop_x_y = self.algo_const / self.count as f64;
let var_pop_x = self.m2_x / self.count as f64;
let var_pop_y = self.m2_y / self.count as f64;
diff --git a/datafusion/physical-expr/src/aggregate/stddev.rs b/datafusion/physical-expr/src/aggregate/stddev.rs
index 64e19ef502..dcc2b0e69c 100644
--- a/datafusion/physical-expr/src/aggregate/stddev.rs
+++ b/datafusion/physical-expr/src/aggregate/stddev.rs
@@ -200,7 +200,7 @@ impl StddevAccumulator {
}
impl Accumulator for StddevAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.variance.get_count()),
ScalarValue::from(self.variance.get_mean()),
@@ -220,7 +220,7 @@ impl Accumulator for StddevAccumulator {
self.variance.merge_batch(states)
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let variance = self.variance.evaluate()?;
match variance {
ScalarValue::Float64(e) => {
@@ -459,7 +459,7 @@ mod tests {
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
- let state2 = get_accum_scalar_values_as_arrays(accum2.as_ref())?;
+ let state2 = get_accum_scalar_values_as_arrays(accum2.as_mut())?;
accum1.merge_batch(&state2)?;
accum1.evaluate()
}
diff --git a/datafusion/physical-expr/src/aggregate/string_agg.rs b/datafusion/physical-expr/src/aggregate/string_agg.rs
index 7adc736932..7a1da6d622 100644
--- a/datafusion/physical-expr/src/aggregate/string_agg.rs
+++ b/datafusion/physical-expr/src/aggregate/string_agg.rs
@@ -153,11 +153,11 @@ impl Accumulator for StringAggAccumulator {
Ok(())
}
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.evaluate()?])
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::LargeUtf8(self.values.clone()))
}
diff --git a/datafusion/physical-expr/src/aggregate/sum.rs b/datafusion/physical-expr/src/aggregate/sum.rs
index a770b3874c..6cf2810ce5 100644
--- a/datafusion/physical-expr/src/aggregate/sum.rs
+++ b/datafusion/physical-expr/src/aggregate/sum.rs
@@ -191,7 +191,7 @@ impl<T: ArrowNumericType> SumAccumulator<T> {
}
impl<T: ArrowNumericType> Accumulator for SumAccumulator<T> {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.evaluate()?])
}
@@ -208,7 +208,7 @@ impl<T: ArrowNumericType> Accumulator for SumAccumulator<T> {
self.update_batch(states)
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
ScalarValue::new_primitive::<T>(self.sum, &self.data_type)
}
@@ -243,7 +243,7 @@ impl<T: ArrowNumericType> SlidingSumAccumulator<T> {
}
impl<T: ArrowNumericType> Accumulator for SlidingSumAccumulator<T> {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![self.evaluate()?, self.count.into()])
}
@@ -267,7 +267,7 @@ impl<T: ArrowNumericType> Accumulator for SlidingSumAccumulator<T> {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let v = (self.count != 0).then_some(self.sum);
ScalarValue::new_primitive::<T>(v, &self.data_type)
}
diff --git a/datafusion/physical-expr/src/aggregate/sum_distinct.rs b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
index 6dbb392246..4c0f94b3a2 100644
--- a/datafusion/physical-expr/src/aggregate/sum_distinct.rs
+++ b/datafusion/physical-expr/src/aggregate/sum_distinct.rs
@@ -140,7 +140,7 @@ impl<T: ArrowPrimitiveType> DistinctSumAccumulator<T> {
}
impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
// 1. Stores aggregate state in `ScalarValue::List`
// 2. Constructs `ScalarValue::List` state from distinct numeric stored in hash set
let state_out = {
@@ -186,7 +186,7 @@ impl<T: ArrowPrimitiveType> Accumulator for DistinctSumAccumulator<T> {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let mut acc = T::Native::usize_as(0);
for distinct_value in self.values.iter() {
acc = acc.add_wrapping(distinct_value.0)
diff --git a/datafusion/physical-expr/src/aggregate/utils.rs b/datafusion/physical-expr/src/aggregate/utils.rs
index 6dd586bfb8..60d59c16be 100644
--- a/datafusion/physical-expr/src/aggregate/utils.rs
+++ b/datafusion/physical-expr/src/aggregate/utils.rs
@@ -35,7 +35,7 @@ use datafusion_expr::Accumulator;
/// Convert scalar values from an accumulator into arrays.
pub fn get_accum_scalar_values_as_arrays(
- accum: &dyn Accumulator,
+ accum: &mut dyn Accumulator,
) -> Result<Vec<ArrayRef>> {
accum
.state()?
diff --git a/datafusion/physical-expr/src/aggregate/variance.rs b/datafusion/physical-expr/src/aggregate/variance.rs
index d82c5ad562..94d7be4265 100644
--- a/datafusion/physical-expr/src/aggregate/variance.rs
+++ b/datafusion/physical-expr/src/aggregate/variance.rs
@@ -231,7 +231,7 @@ impl VarianceAccumulator {
}
impl Accumulator for VarianceAccumulator {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![
ScalarValue::from(self.count),
ScalarValue::from(self.mean),
@@ -302,7 +302,7 @@ impl Accumulator for VarianceAccumulator {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
let count = match self.stats_type {
StatsType::Population => self.count,
StatsType::Sample => {
@@ -533,7 +533,7 @@ mod tests {
.collect::<Result<Vec<_>>>()?;
accum1.update_batch(&values1)?;
accum2.update_batch(&values2)?;
- let state2 = get_accum_scalar_values_as_arrays(accum2.as_ref())?;
+ let state2 = get_accum_scalar_values_as_arrays(accum2.as_mut())?;
accum1.merge_batch(&state2)?;
accum1.evaluate()
}
diff --git a/datafusion/physical-plan/src/aggregates/mod.rs b/datafusion/physical-plan/src/aggregates/mod.rs
index d3ae0d5ce0..2d7a8cccc4 100644
--- a/datafusion/physical-plan/src/aggregates/mod.rs
+++ b/datafusion/physical-plan/src/aggregates/mod.rs
@@ -1087,14 +1087,14 @@ fn create_accumulators(
/// returns a vector of ArrayRefs, where each entry corresponds to either the
/// final value (mode = Final, FinalPartitioned and Single) or states (mode = Partial)
fn finalize_aggregation(
- accumulators: &[AccumulatorItem],
+ accumulators: &mut [AccumulatorItem],
mode: &AggregateMode,
) -> Result<Vec<ArrayRef>> {
match mode {
AggregateMode::Partial => {
// Build the vector of states
accumulators
- .iter()
+ .iter_mut()
.map(|accumulator| {
accumulator.state().and_then(|e| {
e.iter()
@@ -1111,7 +1111,7 @@ fn finalize_aggregation(
| AggregateMode::SinglePartitioned => {
// Merge the state to the final value
accumulators
- .iter()
+ .iter_mut()
.map(|accumulator| accumulator.evaluate().and_then(|v|
v.to_array()))
.collect()
}
diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs
index 90eb488a2e..5ec95bd799 100644
--- a/datafusion/physical-plan/src/aggregates/no_grouping.rs
+++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs
@@ -137,12 +137,13 @@ impl AggregateStream {
None => {
this.finished = true;
let timer =
this.baseline_metrics.elapsed_compute().timer();
- let result = finalize_aggregation(&this.accumulators,
&this.mode)
- .and_then(|columns| {
- RecordBatch::try_new(this.schema.clone(),
columns)
- .map_err(Into::into)
- })
- .record_output(&this.baseline_metrics);
+ let result =
+ finalize_aggregation(&mut this.accumulators,
&this.mode)
+ .and_then(|columns| {
+ RecordBatch::try_new(this.schema.clone(),
columns)
+ .map_err(Into::into)
+ })
+ .record_output(&this.baseline_metrics);
timer.done();
diff --git a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
index cf991e524f..17d47a65d8 100644
--- a/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_logical_plan.rs
@@ -1577,7 +1577,7 @@ fn roundtrip_aggregate_udf() {
struct Dummy {}
impl Accumulator for Dummy {
- fn state(&self) -> datafusion::error::Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> datafusion::error::Result<Vec<ScalarValue>> {
Ok(vec![])
}
@@ -1592,7 +1592,7 @@ fn roundtrip_aggregate_udf() {
Ok(())
}
- fn evaluate(&self) -> datafusion::error::Result<ScalarValue> {
+ fn evaluate(&mut self) -> datafusion::error::Result<ScalarValue> {
Ok(ScalarValue::Float64(None))
}
@@ -1764,7 +1764,7 @@ fn roundtrip_window() {
struct DummyAggr {}
impl Accumulator for DummyAggr {
- fn state(&self) -> datafusion::error::Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> datafusion::error::Result<Vec<ScalarValue>> {
Ok(vec![])
}
@@ -1779,7 +1779,7 @@ fn roundtrip_window() {
Ok(())
}
- fn evaluate(&self) -> datafusion::error::Result<ScalarValue> {
+ fn evaluate(&mut self) -> datafusion::error::Result<ScalarValue> {
Ok(ScalarValue::Float64(None))
}
diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
index 4f91713f48..9a95e103c2 100644
--- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
+++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs
@@ -353,7 +353,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> {
#[derive(Debug)]
struct Example;
impl Accumulator for Example {
- fn state(&self) -> Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> Result<Vec<ScalarValue>> {
Ok(vec![ScalarValue::Int64(Some(0))])
}
@@ -365,7 +365,7 @@ fn roundtrip_aggregate_udaf() -> Result<()> {
Ok(())
}
- fn evaluate(&self) -> Result<ScalarValue> {
+ fn evaluate(&mut self) -> Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(0)))
}
diff --git a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
index d7327caee4..79cf76de59 100644
--- a/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
+++ b/datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
@@ -712,7 +712,7 @@ async fn roundtrip_aggregate_udf() -> Result<()> {
struct Dummy {}
impl Accumulator for Dummy {
- fn state(&self) -> datafusion::error::Result<Vec<ScalarValue>> {
+ fn state(&mut self) -> datafusion::error::Result<Vec<ScalarValue>> {
Ok(vec![])
}
@@ -727,7 +727,7 @@ async fn roundtrip_aggregate_udf() -> Result<()> {
Ok(())
}
- fn evaluate(&self) -> datafusion::error::Result<ScalarValue> {
+ fn evaluate(&mut self) -> datafusion::error::Result<ScalarValue> {
Ok(ScalarValue::Float64(None))
}