This is an automated email from the ASF dual-hosted git repository.

ytyou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 79c4c057e6 Address memory over-accounting in array_agg (#16816)
79c4c057e6 is described below

commit 79c4c057e67623e86a7cbe2abb1210fc71fa94e3
Author: Gabriel <45515538+gabote...@users.noreply.github.com>
AuthorDate: Wed Aug 6 04:04:31 2025 +0200

    Address memory over-accounting in array_agg (#16816)
    
    * Use get_slice_memory_size() instead of get_array_memory_size() for measuring array_agg accumulator size
    
    * Add comment explaining the rationale for using `.get_slice_memory_size()`
---
 datafusion/functions-aggregate/src/array_agg.rs | 26 ++++++++++++++-----------
 1 file changed, 15 insertions(+), 11 deletions(-)

diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs
index 5f5738d153..3ada331040 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -23,14 +23,12 @@ use std::mem::{size_of, size_of_val, take};
 use std::sync::Arc;
 
 use arrow::array::{
-    make_array, new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray,
-    StructArray,
+    new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray, StructArray,
 };
 use arrow::compute::{filter, SortOptions};
 use arrow::datatypes::{DataType, Field, FieldRef, Fields};
 
 use datafusion_common::cast::as_list_array;
-use datafusion_common::scalar::copy_array_data;
 use datafusion_common::utils::{
     compare_rows, get_row_at_idx, take_function_args, SingleRowListArrayBuilder,
 };
@@ -335,11 +333,7 @@ impl Accumulator for ArrayAggAccumulator {
         };
 
         if !val.is_empty() {
-            // The ArrayRef might be holding a reference to its original input buffer, so
-            // storing it here directly copied/compacted avoids over accounting memory
-            // not used here.
-            self.values
-                .push(make_array(copy_array_data(&val.to_data())));
+            self.values.push(val)
         }
 
         Ok(())
@@ -398,7 +392,18 @@ impl Accumulator for ArrayAggAccumulator {
             + self
                 .values
                 .iter()
-                .map(|arr| arr.get_array_memory_size())
+                // Each ArrayRef might be just a reference to a bigger array, and many
+                // ArrayRefs here might be referencing exactly the same array, so if we
+                // were to call `arr.get_array_memory_size()`, we would be double-counting
+                // the same underlying data many times.
+                //
+                // Instead, we do an approximation by estimating how much memory each
+                // ArrayRef would occupy if its underlying data was fully owned by this
+                // accumulator.
+                //
+                // Note that this is just an estimation, but the reality is that this
+                // accumulator might not own any data.
+                .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
                 .sum::<usize>()
             + self.datatype.size()
             - size_of_val(&self.datatype)
@@ -1064,8 +1069,7 @@ mod tests {
         acc2.update_batch(&[data(["b", "c", "a"])])?;
         acc1 = merge(acc1, acc2)?;
 
-        // without compaction, the size is 2652.
-        assert_eq!(acc1.size(), 732);
+        assert_eq!(acc1.size(), 266);
 
         Ok(())
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to