This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new 8a2d61821a Fix array_agg memory over use (#16346) 8a2d61821a is described below commit 8a2d61821a6a9097dfabe2758642fd75dcc73aaa Author: Gabriel <45515538+gabote...@users.noreply.github.com> AuthorDate: Wed Jun 11 15:04:09 2025 +0200 Fix array_agg memory over use (#16346) * Fix array_agg memory over accounting * Add comment --- datafusion/common/src/scalar/mod.rs | 6 ++ datafusion/functions-aggregate/src/array_agg.rs | 83 +++++++++++++++++++++++-- 2 files changed, 84 insertions(+), 5 deletions(-) diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs index 6316444dad..750b78d59e 100644 --- a/datafusion/common/src/scalar/mod.rs +++ b/datafusion/common/src/scalar/mod.rs @@ -3525,6 +3525,12 @@ impl ScalarValue { } } } + + /// Compacts ([ScalarValue::compact]) the current [ScalarValue] and returns it. + pub fn compacted(mut self) -> Self { + self.compact(); + self + } } /// Compacts the data of an `ArrayData` into a new `ArrayData`. diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs index 8c426d1f88..4ec73e306e 100644 --- a/datafusion/functions-aggregate/src/array_agg.rs +++ b/datafusion/functions-aggregate/src/array_agg.rs @@ -23,12 +23,14 @@ use std::mem::{size_of, size_of_val}; use std::sync::Arc; use arrow::array::{ - new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray, StructArray, + make_array, new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray, + StructArray, }; use arrow::compute::{filter, SortOptions}; use arrow::datatypes::{DataType, Field, FieldRef, Fields}; use datafusion_common::cast::as_list_array; +use datafusion_common::scalar::copy_array_data; use datafusion_common::utils::{get_row_at_idx, SingleRowListArrayBuilder}; use datafusion_common::{exec_err, internal_err, Result, ScalarValue}; use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs}; @@ -313,7 +315,11 @@ impl Accumulator for ArrayAggAccumulator { }; if !val.is_empty() { - self.values.push(val); + // The ArrayRef might be holding a reference to its original input buffer, so + // storing it here directly copied/compacted avoids over accounting memory + // not used here. + self.values + .push(make_array(copy_array_data(&val.to_data()))); } Ok(()) @@ -423,7 +429,8 @@ impl Accumulator for DistinctArrayAggAccumulator { if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) { for i in 0..val.len() { if nulls.is_none_or(|nulls| nulls.is_valid(i)) { - self.values.insert(ScalarValue::try_from_array(val, i)?); + self.values + .insert(ScalarValue::try_from_array(val, i)?.compacted()); } } } @@ -577,8 +584,14 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) { for i in 0..val.len() { if nulls.is_none_or(|nulls| nulls.is_valid(i)) { - self.values.push(ScalarValue::try_from_array(val, i)?); - self.ordering_values.push(get_row_at_idx(ord, i)?) + self.values + .push(ScalarValue::try_from_array(val, i)?.compacted()); + self.ordering_values.push( + get_row_at_idx(ord, i)? + .into_iter() + .map(|v| v.compacted()) + .collect(), + ) } } } @@ -714,6 +727,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator { #[cfg(test)] mod tests { use super::*; + use arrow::array::{ListBuilder, StringBuilder}; use arrow::datatypes::{FieldRef, Schema}; use datafusion_common::cast::as_generic_string_array; use datafusion_common::internal_err; @@ -980,6 +994,56 @@ mod tests { Ok(()) } + #[test] + fn does_not_over_account_memory() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?; + + acc1.update_batch(&[data(["a", "c", "b"])])?; + acc2.update_batch(&[data(["b", "c", "a"])])?; + acc1 = merge(acc1, acc2)?; + + // without compaction, the size is 2652. + assert_eq!(acc1.size(), 732); + + Ok(()) + } + #[test] + fn does_not_over_account_memory_distinct() -> Result<()> { + let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string() + .distinct() + .build_two()?; + + acc1.update_batch(&[string_list_data([ + vec!["a", "b", "c"], + vec!["d", "e", "f"], + ])])?; + acc2.update_batch(&[string_list_data([vec!["e", "f", "g"]])])?; + acc1 = merge(acc1, acc2)?; + + // without compaction, the size is 16660 + assert_eq!(acc1.size(), 1660); + + Ok(()) + } + + #[test] + fn does_not_over_account_memory_ordered() -> Result<()> { + let mut acc = ArrayAggAccumulatorBuilder::string() + .order_by_col("col", SortOptions::new(false, false)) + .build()?; + + acc.update_batch(&[string_list_data([ + vec!["a", "b", "c"], + vec!["c", "d", "e"], + vec!["b", "c", "d"], + ])])?; + + // without compaction, the size is 17112 + assert_eq!(acc.size(), 2112); + + Ok(()) + } + struct ArrayAggAccumulatorBuilder { return_field: FieldRef, distinct: bool, @@ -1059,6 +1123,15 @@ mod tests { .collect() } + fn string_list_data<'a>(data: impl IntoIterator<Item = Vec<&'a str>>) -> ArrayRef { + let mut builder = ListBuilder::new(StringBuilder::new()); + for string_list in data.into_iter() { + builder.append_value(string_list.iter().map(Some).collect::<Vec<_>>()); + } + + Arc::new(builder.finish()) + } + fn data<T, const N: usize>(list: [T; N]) -> ArrayRef where ScalarValue: From<T>, --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org