This is an automated email from the ASF dual-hosted git repository. ytyou pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
     new 79c4c057e6 Address memory over-accounting in array_agg (#16816)
79c4c057e6 is described below

commit 79c4c057e67623e86a7cbe2abb1210fc71fa94e3
Author: Gabriel <45515538+gabote...@users.noreply.github.com>
AuthorDate: Wed Aug 6 04:04:31 2025 +0200

    Address memory over-accounting in array_agg (#16816)

    * Use get_slice_memory_size() instead of get_array_memory_size() for measuring array_agg accumulator size

    * Add comment explaining the rationale for using `.get_slice_memory_size()`
---
 datafusion/functions-aggregate/src/array_agg.rs | 26 ++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs
index 5f5738d153..3ada331040 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -23,14 +23,12 @@ use std::mem::{size_of, size_of_val, take};
 use std::sync::Arc;
 
 use arrow::array::{
-    make_array, new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray,
-    StructArray,
+    new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray, StructArray,
 };
 use arrow::compute::{filter, SortOptions};
 use arrow::datatypes::{DataType, Field, FieldRef, Fields};
 
 use datafusion_common::cast::as_list_array;
-use datafusion_common::scalar::copy_array_data;
 use datafusion_common::utils::{
     compare_rows, get_row_at_idx, take_function_args, SingleRowListArrayBuilder,
 };
@@ -335,11 +333,7 @@ impl Accumulator for ArrayAggAccumulator {
         };
 
         if !val.is_empty() {
-            // The ArrayRef might be holding a reference to its original input buffer, so
-            // storing it here directly copied/compacted avoids over accounting memory
-            // not used here.
-            self.values
-                .push(make_array(copy_array_data(&val.to_data())));
+            self.values.push(val)
         }
 
         Ok(())
@@ -398,7 +392,18 @@ impl Accumulator for ArrayAggAccumulator {
             + self
                 .values
                 .iter()
-                .map(|arr| arr.get_array_memory_size())
+                // Each ArrayRef might be just a reference to a bigger array, and many
+                // ArrayRefs here might be referencing exactly the same array, so if we
+                // were to call `arr.get_array_memory_size()`, we would be double-counting
+                // the same underlying data many times.
+                //
+                // Instead, we do an approximation by estimating how much memory each
+                // ArrayRef would occupy if its underlying data was fully owned by this
+                // accumulator.
+                //
+                // Note that this is just an estimation, but the reality is that this
+                // accumulator might not own any data.
+                .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
                 .sum::<usize>()
             + self.datatype.size()
             - size_of_val(&self.datatype)
@@ -1064,8 +1069,7 @@ mod tests {
         acc2.update_batch(&[data(["b", "c", "a"])])?;
         acc1 = merge(acc1, acc2)?;
 
-        // without compaction, the size is 2652.
-        assert_eq!(acc1.size(), 732);
+        assert_eq!(acc1.size(), 266);
 
         Ok(())
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org