This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8a2d61821a Fix array_agg memory over use (#16346)
8a2d61821a is described below

commit 8a2d61821a6a9097dfabe2758642fd75dcc73aaa
Author: Gabriel <45515538+gabote...@users.noreply.github.com>
AuthorDate: Wed Jun 11 15:04:09 2025 +0200

    Fix array_agg memory over use (#16346)
    
    * Fix array_agg memory over accounting
    
    * Add comment
---
 datafusion/common/src/scalar/mod.rs             |  6 ++
 datafusion/functions-aggregate/src/array_agg.rs | 83 +++++++++++++++++++++++--
 2 files changed, 84 insertions(+), 5 deletions(-)

diff --git a/datafusion/common/src/scalar/mod.rs b/datafusion/common/src/scalar/mod.rs
index 6316444dad..750b78d59e 100644
--- a/datafusion/common/src/scalar/mod.rs
+++ b/datafusion/common/src/scalar/mod.rs
@@ -3525,6 +3525,12 @@ impl ScalarValue {
             }
         }
     }
+
+    /// Compacts ([ScalarValue::compact]) the current [ScalarValue] and returns it.
+    pub fn compacted(mut self) -> Self {
+        self.compact();
+        self
+    }
 }
 
 /// Compacts the data of an `ArrayData` into a new `ArrayData`.
diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs
index 8c426d1f88..4ec73e306e 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -23,12 +23,14 @@ use std::mem::{size_of, size_of_val};
 use std::sync::Arc;
 
 use arrow::array::{
-    new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray, StructArray,
+    make_array, new_empty_array, Array, ArrayRef, AsArray, BooleanArray, ListArray,
+    StructArray,
 };
 use arrow::compute::{filter, SortOptions};
 use arrow::datatypes::{DataType, Field, FieldRef, Fields};
 
 use datafusion_common::cast::as_list_array;
+use datafusion_common::scalar::copy_array_data;
 use datafusion_common::utils::{get_row_at_idx, SingleRowListArrayBuilder};
 use datafusion_common::{exec_err, internal_err, Result, ScalarValue};
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
@@ -313,7 +315,11 @@ impl Accumulator for ArrayAggAccumulator {
         };
 
         if !val.is_empty() {
-            self.values.push(val);
+            // The ArrayRef might be holding a reference to its original input buffer, so
+            // storing it here directly copied/compacted avoids over accounting memory
+            // not used here.
+            self.values
+                .push(make_array(copy_array_data(&val.to_data())));
         }
 
         Ok(())
@@ -423,7 +429,8 @@ impl Accumulator for DistinctArrayAggAccumulator {
         if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) {
             for i in 0..val.len() {
                 if nulls.is_none_or(|nulls| nulls.is_valid(i)) {
-                    self.values.insert(ScalarValue::try_from_array(val, i)?);
+                    self.values
+                        .insert(ScalarValue::try_from_array(val, i)?.compacted());
                 }
             }
         }
@@ -577,8 +584,14 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
         if nulls.is_none_or(|nulls| nulls.null_count() < val.len()) {
             for i in 0..val.len() {
                 if nulls.is_none_or(|nulls| nulls.is_valid(i)) {
-                    self.values.push(ScalarValue::try_from_array(val, i)?);
-                    self.ordering_values.push(get_row_at_idx(ord, i)?)
+                    self.values
+                        .push(ScalarValue::try_from_array(val, i)?.compacted());
+                    self.ordering_values.push(
+                        get_row_at_idx(ord, i)?
+                            .into_iter()
+                            .map(|v| v.compacted())
+                            .collect(),
+                    )
                 }
             }
         }
@@ -714,6 +727,7 @@ impl Accumulator for OrderSensitiveArrayAggAccumulator {
 #[cfg(test)]
 mod tests {
     use super::*;
+    use arrow::array::{ListBuilder, StringBuilder};
     use arrow::datatypes::{FieldRef, Schema};
     use datafusion_common::cast::as_generic_string_array;
     use datafusion_common::internal_err;
@@ -980,6 +994,56 @@ mod tests {
         Ok(())
     }
 
+    #[test]
+    fn does_not_over_account_memory() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string().build_two()?;
+
+        acc1.update_batch(&[data(["a", "c", "b"])])?;
+        acc2.update_batch(&[data(["b", "c", "a"])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        // without compaction, the size is 2652.
+        assert_eq!(acc1.size(), 732);
+
+        Ok(())
+    }
+    #[test]
+    fn does_not_over_account_memory_distinct() -> Result<()> {
+        let (mut acc1, mut acc2) = ArrayAggAccumulatorBuilder::string()
+            .distinct()
+            .build_two()?;
+
+        acc1.update_batch(&[string_list_data([
+            vec!["a", "b", "c"],
+            vec!["d", "e", "f"],
+        ])])?;
+        acc2.update_batch(&[string_list_data([vec!["e", "f", "g"]])])?;
+        acc1 = merge(acc1, acc2)?;
+
+        // without compaction, the size is 16660
+        assert_eq!(acc1.size(), 1660);
+
+        Ok(())
+    }
+
+    #[test]
+    fn does_not_over_account_memory_ordered() -> Result<()> {
+        let mut acc = ArrayAggAccumulatorBuilder::string()
+            .order_by_col("col", SortOptions::new(false, false))
+            .build()?;
+
+        acc.update_batch(&[string_list_data([
+            vec!["a", "b", "c"],
+            vec!["c", "d", "e"],
+            vec!["b", "c", "d"],
+        ])])?;
+
+        // without compaction, the size is 17112
+        assert_eq!(acc.size(), 2112);
+
+        Ok(())
+    }
+
     struct ArrayAggAccumulatorBuilder {
         return_field: FieldRef,
         distinct: bool,
@@ -1059,6 +1123,15 @@ mod tests {
             .collect()
     }
 
+    fn string_list_data<'a>(data: impl IntoIterator<Item = Vec<&'a str>>) -> ArrayRef {
+        let mut builder = ListBuilder::new(StringBuilder::new());
+        for string_list in data.into_iter() {
+            builder.append_value(string_list.iter().map(Some).collect::<Vec<_>>());
+        }
+
+        Arc::new(builder.finish())
+    }
+
     fn data<T, const N: usize>(list: [T; N]) -> ArrayRef
     where
         ScalarValue: From<T>,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to