This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 3a23bb2531 perf: Optimize `array_agg()` using `GroupsAccumulator` (#20504)
3a23bb2531 is described below

commit 3a23bb2531e9276607e71836e1c78ae2214085e9
Author: Neil Conway <[email protected]>
AuthorDate: Sat Feb 28 08:21:11 2026 -0500

    perf: Optimize `array_agg()` using `GroupsAccumulator` (#20504)
    
    ## Which issue does this PR close?
    
    - Closes #20465.
    - Closes #17446.
    
    ## Rationale for this change
    
    This PR optimizes the performance of `array_agg()` by adding support for
    the `GroupsAccumulator` API.
    
    The design tries to minimize the amount of per-batch work done in
    `update_batch()`: we store a reference to the batch, and a `(group_idx,
    row_idx)` pair for each row. In `evaluate()`, we assemble all the
    requested output with a single `interleave` call.
    
    This turns out to be significantly faster, because we copy much less
    data and assembling the results can be vectorized more effectively. For
    example, on a benchmark with 5000 groups and 5000 int64 values per
    group, this approach is roughly 190x faster than the previous approach.
    
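    Releasing memory after a partial emit is a little more involved than the
    previous approach, but it is still possible: the retained state is rebuilt
    so that fully emitted batches are dropped and mixed batches are compacted
    down to the surviving rows.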
    
    ## What changes are included in this PR?
    
    * Implement the `GroupsAccumulator` API for `array_agg()`
    * Add benchmark for `array_agg` of a named struct over a dict, following
    the workload in #17446
    * Add unit tests
    * Improve SLT test coverage
    * Remove a redundant SLT test
    
    ## Are these changes tested?
    
    Yes, and benchmarked.
    
    ## Are there any user-facing changes?
    
    No.
    
    ## AI usage
    
    Iterated with the help of multiple AI tools; I've reviewed and
    understand the resulting code.
---
 datafusion/core/benches/aggregate_query_sql.rs     |  11 +
 datafusion/core/benches/data_utils/mod.rs          |  18 +-
 .../src/aggregate/groups_accumulator/nulls.rs      |   2 +-
 datafusion/functions-aggregate/src/array_agg.rs    | 718 ++++++++++++++++++++-
 .../test_files/aggregate_skip_partial.slt          |  41 +-
 5 files changed, 773 insertions(+), 17 deletions(-)

diff --git a/datafusion/core/benches/aggregate_query_sql.rs b/datafusion/core/benches/aggregate_query_sql.rs
index b47512e5e9..402ac9c717 100644
--- a/datafusion/core/benches/aggregate_query_sql.rs
+++ b/datafusion/core/benches/aggregate_query_sql.rs
@@ -284,6 +284,17 @@ fn criterion_benchmark(c: &mut Criterion) {
             )
         })
     });
+
+    c.bench_function("array_agg_struct_query_group_by_mid_groups", |b| {
+        b.iter(|| {
+            query(
+                ctx.clone(),
+                &rt,
+                "SELECT u64_mid, array_agg(named_struct('market', dict10, 'price', f64)) \
+                 FROM t GROUP BY u64_mid",
+            )
+        })
+    });
 }
 
 criterion_group!(benches, criterion_benchmark);
diff --git a/datafusion/core/benches/data_utils/mod.rs b/datafusion/core/benches/data_utils/mod.rs
index a30ada4205..728c6490c7 100644
--- a/datafusion/core/benches/data_utils/mod.rs
+++ b/datafusion/core/benches/data_utils/mod.rs
@@ -20,8 +20,9 @@
 use arrow::array::{
     ArrayRef, Float32Array, Float64Array, RecordBatch, StringArray, StringViewBuilder,
     UInt64Array,
-    builder::{Int64Builder, StringBuilder},
+    builder::{Int64Builder, StringBuilder, StringDictionaryBuilder},
 };
+use arrow::datatypes::Int32Type;
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 use datafusion::datasource::MemTable;
 use datafusion::error::Result;
@@ -65,6 +66,11 @@ pub fn create_schema() -> Schema {
         // Integers randomly selected from a narrow range of values such that
         // there are a few distinct values, but they are repeated often.
         Field::new("u64_narrow", DataType::UInt64, false),
+        Field::new(
+            "dict10",
+            DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
+            true,
+        ),
     ])
 }
 
@@ -109,6 +115,15 @@ fn create_record_batch(
         .map(|_| rng.random_range(0..10))
         .collect::<Vec<_>>();
 
+    let mut dict_builder = StringDictionaryBuilder::<Int32Type>::new();
+    for _ in 0..batch_size {
+        if rng.random::<f64>() > 0.9 {
+            dict_builder.append_null();
+        } else {
+            dict_builder.append_value(format!("market_{}", rng.random_range(0..10)));
+        }
+    }
+
     RecordBatch::try_new(
         schema,
         vec![
@@ -118,6 +133,7 @@ fn create_record_batch(
             Arc::new(UInt64Array::from(integer_values_wide)),
             Arc::new(UInt64Array::from(integer_values_mid)),
             Arc::new(UInt64Array::from(integer_values_narrow)),
+            Arc::new(dict_builder.finish()),
         ],
     )
     .unwrap()
diff --git a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs
index 74d361cf25..435560721c 100644
--- a/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs
+++ b/datafusion/functions-aggregate-common/src/aggregate/groups_accumulator/nulls.rs
@@ -44,7 +44,7 @@ pub fn set_nulls<T: ArrowNumericType + Send>(
 /// The `NullBuffer` is
 /// * `true` (representing valid) for values that were `true` in filter
 /// * `false` (representing null) for values that were `false` or `null` in filter
-fn filter_to_nulls(filter: &BooleanArray) -> Option<NullBuffer> {
+pub fn filter_to_nulls(filter: &BooleanArray) -> Option<NullBuffer> {
     let (filter_bools, filter_nulls) = filter.clone().into_parts();
     let filter_bools = NullBuffer::from(filter_bools);
     NullBuffer::union(Some(&filter_bools), filter_nulls.as_ref())
diff --git a/datafusion/functions-aggregate/src/array_agg.rs b/datafusion/functions-aggregate/src/array_agg.rs
index c07958a858..cd4cb9b19f 100644
--- a/datafusion/functions-aggregate/src/array_agg.rs
+++ b/datafusion/functions-aggregate/src/array_agg.rs
@@ -23,8 +23,10 @@ use std::mem::{size_of, size_of_val, take};
 use std::sync::Arc;
 
 use arrow::array::{
-    Array, ArrayRef, AsArray, BooleanArray, ListArray, StructArray, new_empty_array,
+    Array, ArrayRef, AsArray, BooleanArray, ListArray, NullBufferBuilder, StructArray,
+    UInt32Array, new_empty_array,
 };
+use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
 use arrow::compute::{SortOptions, filter};
 use arrow::datatypes::{DataType, Field, FieldRef, Fields};
 
@@ -36,8 +38,10 @@ use datafusion_common::{Result, ScalarValue, assert_eq_or_internal_err, exec_err
 use datafusion_expr::function::{AccumulatorArgs, StateFieldsArgs};
 use datafusion_expr::utils::format_state_name;
 use datafusion_expr::{
-    Accumulator, AggregateUDFImpl, Documentation, Signature, Volatility,
+    Accumulator, AggregateUDFImpl, Documentation, EmitTo, GroupsAccumulator, Signature,
+    Volatility,
 };
+use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls::filter_to_nulls;
 use datafusion_functions_aggregate_common::merge_arrays::merge_ordered_arrays;
 use datafusion_functions_aggregate_common::order::AggregateOrderSensitivity;
 use datafusion_functions_aggregate_common::utils::ordering_fields;
@@ -228,6 +232,23 @@ impl AggregateUDFImpl for ArrayAgg {
         datafusion_expr::ReversedUDAF::Reversed(array_agg_udaf())
     }
 
+    fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool {
+        !args.is_distinct && args.order_bys.is_empty()
+    }
+
+    fn create_groups_accumulator(
+        &self,
+        args: AccumulatorArgs,
+    ) -> Result<Box<dyn GroupsAccumulator>> {
+        let field = &args.expr_fields[0];
+        let data_type = field.data_type().clone();
+        let ignore_nulls = args.ignore_nulls && field.is_nullable();
+        Ok(Box::new(ArrayAggGroupsAccumulator::new(
+            data_type,
+            ignore_nulls,
+        )))
+    }
+
     fn supports_null_handling_clause(&self) -> bool {
         true
     }
@@ -414,6 +435,331 @@ impl Accumulator for ArrayAggAccumulator {
     }
 }
 
+#[derive(Debug)]
+struct ArrayAggGroupsAccumulator {
+    datatype: DataType,
+    ignore_nulls: bool,
+    /// Source arrays — input arrays (from update_batch) or list backing
+    /// arrays (from merge_batch).
+    batches: Vec<ArrayRef>,
+    /// Per-batch list of (group_idx, row_idx) pairs.
+    batch_entries: Vec<Vec<(u32, u32)>>,
+    /// Total number of groups tracked.
+    num_groups: usize,
+}
+
+impl ArrayAggGroupsAccumulator {
+    fn new(datatype: DataType, ignore_nulls: bool) -> Self {
+        Self {
+            datatype,
+            ignore_nulls,
+            batches: Vec::new(),
+            batch_entries: Vec::new(),
+            num_groups: 0,
+        }
+    }
+
+    fn clear_state(&mut self) {
+        // `size()` measures Vec capacity rather than len, so allocate new
+        // buffers instead of using `clear()`.
+        self.batches = Vec::new();
+        self.batch_entries = Vec::new();
+        self.num_groups = 0;
+    }
+
+    fn compact_retained_state(&mut self, emit_groups: usize) -> Result<()> {
+        // EmitTo::First is used to recover from memory pressure. Simply
+        // removing emitted entries in place is not enough because mixed batches
+        // would continue to pin their original arrays, even if only a few
+        // retained rows remain.
+        //
+        // Rebuild the retained state from scratch so fully emitted batches are
+        // dropped, mixed batches are compacted to arrays containing only the
+        // surviving rows, and retained metadata is right-sized.
+        let emit_groups = emit_groups as u32;
+        let old_batches = take(&mut self.batches);
+        let old_batch_entries = take(&mut self.batch_entries);
+
+        let mut batches = Vec::new();
+        let mut batch_entries = Vec::new();
+
+        for (batch, entries) in old_batches.into_iter().zip(old_batch_entries) {
+            let retained_len = entries.iter().filter(|(g, _)| *g >= emit_groups).count();
+
+            if retained_len == 0 {
+                continue;
+            }
+
+            if retained_len == entries.len() {
+                // Nothing was emitted from this batch, so we keep the existing
+                // array and only renumber the remaining group IDs so that they
+                // start from 0.
+                let mut retained_entries = entries;
+                for (g, _) in &mut retained_entries {
+                    *g -= emit_groups;
+                }
+                retained_entries.shrink_to_fit();
+                batches.push(batch);
+                batch_entries.push(retained_entries);
+                continue;
+            }
+
+            let mut retained_entries = Vec::with_capacity(retained_len);
+            let mut retained_rows = Vec::with_capacity(retained_len);
+
+            for (g, r) in entries {
+                if g >= emit_groups {
+                    // Compute the new `(group_idx, row_idx)` pair for a
+                    // retained row. `group_idx` is renumbered to start from
+                    // 0, and `row_idx` points into the new dense batch we are
+                    // building.
+                    retained_entries.push((g - emit_groups, retained_rows.len() as u32));
+                    retained_rows.push(r);
+                }
+            }
+
+            debug_assert_eq!(retained_entries.len(), retained_len);
+            debug_assert_eq!(retained_rows.len(), retained_len);
+
+            let batch = if retained_len == batch.len() {
+                batch
+            } else {
+                // Compact mixed batches so retained rows no longer pin the
+                // original array.
+                let retained_rows = UInt32Array::from(retained_rows);
+                arrow::compute::take(batch.as_ref(), &retained_rows, None)?
+            };
+
+            batches.push(batch);
+            batch_entries.push(retained_entries);
+        }
+
+        self.batches = batches;
+        self.batch_entries = batch_entries;
+        self.num_groups -= emit_groups as usize;
+
+        Ok(())
+    }
+}
+
+impl GroupsAccumulator for ArrayAggGroupsAccumulator {
+    /// Store a reference to the input batch, plus a `(group_idx, row_idx)` pair
+    /// for every row.
+    fn update_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "single argument to update_batch");
+        let input = &values[0];
+
+        self.num_groups = self.num_groups.max(total_num_groups);
+
+        let nulls = if self.ignore_nulls {
+            input.logical_nulls()
+        } else {
+            None
+        };
+
+        let mut entries = Vec::new();
+
+        for (row_idx, &group_idx) in group_indices.iter().enumerate() {
+            // Skip filtered rows
+            if let Some(filter) = opt_filter
+                && (filter.is_null(row_idx) || !filter.value(row_idx))
+            {
+                continue;
+            }
+
+            // Skip null values when ignore_nulls is set
+            if let Some(ref nulls) = nulls
+                && nulls.is_null(row_idx)
+            {
+                continue;
+            }
+
+            entries.push((group_idx as u32, row_idx as u32));
+        }
+
+        // We only need to record the batch if it was non-empty.
+        if !entries.is_empty() {
+            self.batches.push(Arc::clone(input));
+            self.batch_entries.push(entries);
+        }
+
+        Ok(())
+    }
+
+    /// Produce a `ListArray` ordered by group index: the list at
+    /// position N contains the aggregated values for group N.
+    ///
+    /// Uses a counting sort to rearrange the stored `(group, row)`
+    /// entries into group order, then calls `interleave` to gather
+    /// the values into a flat array that backs the output `ListArray`.
+    fn evaluate(&mut self, emit_to: EmitTo) -> Result<ArrayRef> {
+        let emit_groups = match emit_to {
+            EmitTo::All => self.num_groups,
+            EmitTo::First(n) => n,
+        };
+
+        // Step 1: Count entries per group. For EmitTo::First(n), only groups
+        // 0..n are counted; the rest are retained to be emitted in the future.
+        let mut counts = vec![0u32; emit_groups];
+        for entries in &self.batch_entries {
+            for &(g, _) in entries {
+                let g = g as usize;
+                if g < emit_groups {
+                    counts[g] += 1;
+                }
+            }
+        }
+
+        // Step 2: Do a prefix sum over the counts and use it to build ListArray
+        // offsets, null buffer, and write positions for the counting sort.
+        let mut offsets = Vec::<i32>::with_capacity(emit_groups + 1);
+        offsets.push(0);
+        let mut nulls_builder = NullBufferBuilder::new(emit_groups);
+        let mut write_positions = Vec::with_capacity(emit_groups);
+        let mut cur_offset = 0u32;
+        for &count in &counts {
+            if count == 0 {
+                nulls_builder.append_null();
+            } else {
+                nulls_builder.append_non_null();
+            }
+            write_positions.push(cur_offset);
+            cur_offset += count;
+            offsets.push(cur_offset as i32);
+        }
+        let total_rows = cur_offset as usize;
+
+        // Step 3: Scatter entries into group order using the counting sort. The
+        // batch index is implicit from the outer loop position.
+        let flat_values = if total_rows == 0 {
+            new_empty_array(&self.datatype)
+        } else {
+            let mut interleave_indices = vec![(0usize, 0usize); total_rows];
+            for (batch_idx, entries) in self.batch_entries.iter().enumerate() {
+                for &(g, r) in entries {
+                    let g = g as usize;
+                    if g < emit_groups {
+                        let wp = write_positions[g] as usize;
+                        interleave_indices[wp] = (batch_idx, r as usize);
+                        write_positions[g] += 1;
+                    }
+                }
+            }
+
+            let sources: Vec<&dyn Array> =
+                self.batches.iter().map(|b| b.as_ref()).collect();
+            arrow::compute::interleave(&sources, &interleave_indices)?
+        };
+
+        // Step 4: Release state for emitted groups.
+        match emit_to {
+            EmitTo::All => self.clear_state(),
+            EmitTo::First(_) => self.compact_retained_state(emit_groups)?,
+        }
+
+        let offsets = OffsetBuffer::new(ScalarBuffer::from(offsets));
+        let field = Arc::new(Field::new_list_field(self.datatype.clone(), true));
+        let result = ListArray::new(field, offsets, flat_values, nulls_builder.finish());
+
+        Ok(Arc::new(result))
+    }
+
+    fn state(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
+        Ok(vec![self.evaluate(emit_to)?])
+    }
+
+    fn merge_batch(
+        &mut self,
+        values: &[ArrayRef],
+        group_indices: &[usize],
+        _opt_filter: Option<&BooleanArray>,
+        total_num_groups: usize,
+    ) -> Result<()> {
+        assert_eq!(values.len(), 1, "one argument to merge_batch");
+        let input_list = values[0].as_list::<i32>();
+
+        self.num_groups = self.num_groups.max(total_num_groups);
+
+        // Push the ListArray's backing values array as a single batch.
+        let list_values = input_list.values();
+        let list_offsets = input_list.offsets();
+
+        let mut entries = Vec::new();
+
+        for (row_idx, &group_idx) in group_indices.iter().enumerate() {
+            if input_list.is_null(row_idx) {
+                continue;
+            }
+            let start = list_offsets[row_idx] as u32;
+            let end = list_offsets[row_idx + 1] as u32;
+            for pos in start..end {
+                entries.push((group_idx as u32, pos));
+            }
+        }
+
+        if !entries.is_empty() {
+            self.batches.push(Arc::clone(list_values));
+            self.batch_entries.push(entries);
+        }
+
+        Ok(())
+    }
+
+    fn convert_to_state(
+        &self,
+        values: &[ArrayRef],
+        opt_filter: Option<&BooleanArray>,
+    ) -> Result<Vec<ArrayRef>> {
+        assert_eq!(values.len(), 1, "one argument to convert_to_state");
+
+        let input = &values[0];
+
+        // Each row becomes a 1-element list: offsets are [0, 1, 2, ..., n].
+        let offsets = OffsetBuffer::from_repeated_length(1, input.len());
+
+        // Filtered rows become null list entries, which merge_batch will skip.
+        let filter_nulls = opt_filter.and_then(filter_to_nulls);
+
+        // With ignore_nulls, null values also become null list entries. Without
+        // ignore_nulls, null values stay as [NULL] so merge_batch retains them.
+        let nulls = if self.ignore_nulls {
+            let logical = input.logical_nulls();
+            NullBuffer::union(filter_nulls.as_ref(), logical.as_ref())
+        } else {
+            filter_nulls
+        };
+
+        let field = Arc::new(Field::new_list_field(self.datatype.clone(), true));
+        let list_array = ListArray::new(field, offsets, Arc::clone(input), nulls);
+
+        Ok(vec![Arc::new(list_array)])
+    }
+
+    fn supports_convert_to_state(&self) -> bool {
+        true
+    }
+
+    fn size(&self) -> usize {
+        self.batches
+            .iter()
+            .map(|arr| arr.to_data().get_slice_memory_size().unwrap_or_default())
+            .sum::<usize>()
+            + self.batches.capacity() * size_of::<ArrayRef>()
+            + self
+                .batch_entries
+                .iter()
+                .map(|e| e.capacity() * size_of::<(u32, u32)>())
+                .sum::<usize>()
+            + self.batch_entries.capacity() * size_of::<Vec<(u32, u32)>>()
+    }
+}
+
 #[derive(Debug)]
 pub struct DistinctArrayAggAccumulator {
     values: HashSet<ScalarValue>,
@@ -1227,4 +1573,372 @@ mod tests {
         acc1.merge_batch(&intermediate_state)?;
         Ok(acc1)
     }
+
+    // ---- GroupsAccumulator tests ----
+
+    use arrow::array::Int32Array;
+
+    fn list_array_to_i32_vecs(list: &ListArray) -> Vec<Option<Vec<Option<i32>>>> {
+        (0..list.len())
+            .map(|i| {
+                if list.is_null(i) {
+                    None
+                } else {
+                    let arr = list.value(i);
+                    let vals: Vec<Option<i32>> = arr
+                        .as_any()
+                        .downcast_ref::<Int32Array>()
+                        .unwrap()
+                        .iter()
+                        .collect();
+                    Some(vals)
+                }
+            })
+            .collect()
+    }
+
+    fn eval_i32_lists(
+        acc: &mut ArrayAggGroupsAccumulator,
+        emit_to: EmitTo,
+    ) -> Result<Vec<Option<Vec<Option<i32>>>>> {
+        let result = acc.evaluate(emit_to)?;
+        Ok(list_array_to_i32_vecs(result.as_list::<i32>()))
+    }
+
+    #[test]
+    fn groups_accumulator_multiple_batches() -> Result<()> {
+        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+        // First batch
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
+        acc.update_batch(&[values], &[0, 1, 0], None, 2)?;
+
+        // Second batch
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![4, 5]));
+        acc.update_batch(&[values], &[1, 0], None, 2)?;
+
+        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+        assert_eq!(vals[0], Some(vec![Some(1), Some(3), Some(5)]));
+        assert_eq!(vals[1], Some(vec![Some(2), Some(4)]));
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_emit_first() -> Result<()> {
+        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
+        acc.update_batch(&[values], &[0, 1, 2], None, 3)?;
+
+        // Emit first 2 groups
+        let vals = eval_i32_lists(&mut acc, EmitTo::First(2))?;
+        assert_eq!(vals.len(), 2);
+        assert_eq!(vals[0], Some(vec![Some(10)]));
+        assert_eq!(vals[1], Some(vec![Some(20)]));
+
+        // Remaining group (was index 2, now shifted to 0)
+        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+        assert_eq!(vals.len(), 1);
+        assert_eq!(vals[0], Some(vec![Some(30)]));
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_emit_first_frees_batches() -> Result<()> {
+        // Batch 0 has rows only for group 0; batch 1 has rows for
+        // both groups. After emitting group 0, batch 0 should be
+        // dropped entirely and batch 1 should be compacted to the
+        // retained row(s).
+        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+        let batch0: ArrayRef = Arc::new(Int32Array::from(vec![10, 20]));
+        acc.update_batch(&[batch0], &[0, 0], None, 2)?;
+
+        let batch1: ArrayRef = Arc::new(Int32Array::from(vec![30, 40]));
+        acc.update_batch(&[batch1], &[0, 1], None, 2)?;
+
+        assert_eq!(acc.batches.len(), 2);
+        assert!(!acc.batches[0].is_empty());
+        assert!(!acc.batches[1].is_empty());
+
+        // Emit group 0. Batch 0 is only referenced by group 0, so it
+        // should be removed. Batch 1 is mixed, so it should be compacted
+        // to contain only the retained row for group 1.
+        let vals = eval_i32_lists(&mut acc, EmitTo::First(1))?;
+        assert_eq!(vals[0], Some(vec![Some(10), Some(20), Some(30)]));
+
+        assert_eq!(acc.batches.len(), 1);
+        let retained = acc.batches[0]
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(retained.values(), &[40]);
+        assert_eq!(acc.batch_entries, vec![vec![(0, 0)]]);
+
+        // Emit remaining group 1
+        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+        assert_eq!(vals[0], Some(vec![Some(40)]));
+
+        assert!(acc.batches.is_empty());
+        assert_eq!(acc.size(), 0);
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_emit_first_compacts_mixed_batches() -> Result<()> {
+        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+        let batch: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));
+        acc.update_batch(&[batch], &[0, 1, 0, 1], None, 2)?;
+
+        let size_before = acc.size();
+        let vals = eval_i32_lists(&mut acc, EmitTo::First(1))?;
+        assert_eq!(vals[0], Some(vec![Some(10), Some(30)]));
+
+        assert_eq!(acc.num_groups, 1);
+        assert_eq!(acc.batches.len(), 1);
+        let retained = acc.batches[0]
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(retained.values(), &[20, 40]);
+        assert_eq!(acc.batch_entries, vec![vec![(0, 0), (0, 1)]]);
+        assert!(acc.size() < size_before);
+
+        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+        assert_eq!(vals[0], Some(vec![Some(20), Some(40)]));
+        assert_eq!(acc.size(), 0);
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_emit_all_releases_capacity() -> Result<()> {
+        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+        let batch: ArrayRef = Arc::new(Int32Array::from_iter_values(0..64));
+        acc.update_batch(
+            &[batch],
+            &(0..64).map(|i| i % 4).collect::<Vec<_>>(),
+            None,
+            4,
+        )?;
+
+        assert!(acc.size() > 0);
+        let _ = eval_i32_lists(&mut acc, EmitTo::All)?;
+
+        assert_eq!(acc.size(), 0);
+        assert_eq!(acc.batches.capacity(), 0);
+        assert_eq!(acc.batch_entries.capacity(), 0);
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_null_groups() -> Result<()> {
+        // Groups that never receive values should produce null
+        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![1]));
+        // Only group 0 gets a value, groups 1 and 2 are empty
+        acc.update_batch(&[values], &[0], None, 3)?;
+
+        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+        assert_eq!(vals, vec![Some(vec![Some(1)]), None, None]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_ignore_nulls() -> Result<()> {
+        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
+
+        let values: ArrayRef =
+            Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None]));
+        acc.update_batch(&[values], &[0, 0, 1, 1], None, 2)?;
+
+        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+        // Group 0: only non-null value is 1
+        assert_eq!(vals[0], Some(vec![Some(1)]));
+        // Group 1: only non-null value is 3
+        assert_eq!(vals[1], Some(vec![Some(3)]));
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_opt_filter() -> Result<()> {
+        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
+        // Use a mix of false and null to filter out rows — both should
+        // be skipped.
+        let filter = BooleanArray::from(vec![Some(true), None, Some(true), Some(false)]);
+        acc.update_batch(&[values], &[0, 0, 1, 1], Some(&filter), 2)?;
+
+        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+        assert_eq!(vals[0], Some(vec![Some(1)])); // row 1 filtered (null)
+        assert_eq!(vals[1], Some(vec![Some(3)])); // row 3 filtered (false)
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_state_merge_roundtrip() -> Result<()> {
+        // Accumulator 1: update_batch, then merge, then update_batch again.
+        // Verifies that values appear in chronological insertion order.
+        let mut acc1 = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
+        acc1.update_batch(&[values], &[0, 1], None, 2)?;
+
+        // Accumulator 2
+        let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![3, 4]));
+        acc2.update_batch(&[values], &[0, 1], None, 2)?;
+
+        // Merge acc2's state into acc1
+        let state = acc2.state(EmitTo::All)?;
+        acc1.merge_batch(&state, &[0, 1], None, 2)?;
+
+        // Another update_batch on acc1 after the merge
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![5, 6]));
+        acc1.update_batch(&[values], &[0, 1], None, 2)?;
+
+        // Each group's values in insertion order:
+        // group 0: update(1), merge(3), update(5) → [1, 3, 5]
+        // group 1: update(2), merge(4), update(6) → [2, 4, 6]
+        let vals = eval_i32_lists(&mut acc1, EmitTo::All)?;
+        assert_eq!(vals[0], Some(vec![Some(1), Some(3), Some(5)]));
+        assert_eq!(vals[1], Some(vec![Some(2), Some(4), Some(6)]));
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_convert_to_state() -> Result<()> {
+        let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(10), None, Some(30)]));
+        let state = acc.convert_to_state(&[values], None)?;
+
+        assert_eq!(state.len(), 1);
+        let vals = list_array_to_i32_vecs(state[0].as_list::<i32>());
+        assert_eq!(
+            vals,
+            vec![
+                Some(vec![Some(10)]),
+                Some(vec![None]), // null preserved inside list, not promoted
+                Some(vec![Some(30)]),
+            ]
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_convert_to_state_with_filter() -> Result<()> {
+        let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30]));
+        let filter = BooleanArray::from(vec![true, false, true]);
+        let state = acc.convert_to_state(&[values], Some(&filter))?;
+
+        let vals = list_array_to_i32_vecs(state[0].as_list::<i32>());
+        assert_eq!(
+            vals,
+            vec![
+                Some(vec![Some(10)]),
+                None, // filtered
+                Some(vec![Some(30)]),
+            ]
+        );
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_convert_to_state_merge_preserves_nulls() -> Result<()> {
+        // Verifies that null values survive the convert_to_state -> merge_batch
+        // round-trip when ignore_nulls is false (default null handling).
+        let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
+        let state = acc.convert_to_state(&[values], None)?;
+
+        // Feed state into a new accumulator via merge_batch
+        let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+        acc2.merge_batch(&state, &[0, 0, 1], None, 2)?;
+
+        // Group 0 received rows 0 ([1]) and 1 ([NULL]) → [1, NULL]
+        let vals = eval_i32_lists(&mut acc2, EmitTo::All)?;
+        assert_eq!(vals[0], Some(vec![Some(1), None]));
+        // Group 1 received row 2 ([3]) → [3]
+        assert_eq!(vals[1], Some(vec![Some(3)]));
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_convert_to_state_merge_ignore_nulls() -> Result<()> {
+        // Verifies that null values are dropped in the convert_to_state ->
+        // merge_batch round-trip when ignore_nulls is true.
+        let acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
+
+        let values: ArrayRef =
+            Arc::new(Int32Array::from(vec![Some(1), None, Some(3), None]));
+        let state = acc.convert_to_state(&[values], None)?;
+
+        let list = state[0].as_list::<i32>();
+        // Rows 0 and 2 are valid lists; rows 1 and 3 are null list entries
+        assert!(!list.is_null(0));
+        assert!(list.is_null(1));
+        assert!(!list.is_null(2));
+        assert!(list.is_null(3));
+
+        // Feed state into a new accumulator via merge_batch
+        let mut acc2 = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
+        acc2.merge_batch(&state, &[0, 0, 1, 1], None, 2)?;
+
+        // Group 0: received [1] and null (skipped) → [1]
+        let vals = eval_i32_lists(&mut acc2, EmitTo::All)?;
+        assert_eq!(vals[0], Some(vec![Some(1)]));
+        // Group 1: received [3] and null (skipped) → [3]
+        assert_eq!(vals[1], Some(vec![Some(3)]));
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_all_groups_empty() -> Result<()> {
+        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, false);
+
+        // Create groups but don't add any values (all filtered out)
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2]));
+        let filter = BooleanArray::from(vec![false, false]);
+        acc.update_batch(&[values], &[0, 1], Some(&filter), 2)?;
+
+        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+        assert_eq!(vals, vec![None, None]);
+
+        Ok(())
+    }
+
+    #[test]
+    fn groups_accumulator_ignore_nulls_all_null_group() -> Result<()> {
+        // When ignore_nulls is true and a group receives only nulls,
+        // it should produce a null output
+        let mut acc = ArrayAggGroupsAccumulator::new(DataType::Int32, true);
+
+        let values: ArrayRef = Arc::new(Int32Array::from(vec![None, Some(1), None]));
+        acc.update_batch(&[values], &[0, 1, 0], None, 2)?;
+
+        let vals = eval_i32_lists(&mut acc, EmitTo::All)?;
+        assert_eq!(vals[0], None); // group 0 got only nulls, all filtered
+        assert_eq!(vals[1], Some(vec![Some(1)])); // group 1 got value 1
+
+        Ok(())
+    }
 }
diff --git a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
index 0885a6a7d6..c16a6f4424 100644
--- a/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
+++ b/datafusion/sqllogictest/test_files/aggregate_skip_partial.slt
@@ -175,6 +175,21 @@ GROUP BY 1, 2 ORDER BY 1 LIMIT 5;
 -2117946883 d 1 0 0 0
 -2098805236 c 1 0 0 0
 
+query IT????
+SELECT c5, c1,
+       ARRAY_AGG(c3),
+       ARRAY_AGG(CASE WHEN c1 = 'a' THEN c3 ELSE NULL END),
+       ARRAY_AGG(c3) FILTER (WHERE c1 = 'b'),
+       ARRAY_AGG(CASE WHEN c1 = 'a' THEN c3 ELSE NULL END) FILTER (WHERE c1 = 'b')
+FROM aggregate_test_100
+GROUP BY 1, 2 ORDER BY 1 LIMIT 5;
+----
+-2141999138 c [-2] [NULL] NULL NULL
+-2141451704 a [-72] [-72] NULL NULL
+-2138770630 b [63] [NULL] [63] [NULL]
+-2117946883 d [-59] [NULL] NULL NULL
+-2098805236 c [22] [NULL] NULL NULL
+
 # Regression test for https://github.com/apache/datafusion/issues/11846
 query TBBBB rowsort
 select v1, bool_or(v2), bool_and(v2), bool_or(v3), bool_and(v3)
@@ -244,6 +259,19 @@ SELECT c2, count(c1), count(c5), count(c11) FROM aggregate_test_100 GROUP BY c2
 4 23 23 23
 5 14 14 14
 
+# Test array_agg; we sort the output to ensure deterministic results
+query I??
+SELECT c2,
+       array_sort(array_agg(c5)),
+       array_sort(array_agg(c3) FILTER (WHERE c3 > 0))
+FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
+----
+1 [-1991133944, -1882293856, -1448995523, -1383162419, -1339586153, -1331533190, -1176490478, -1143802338, -928766616, -644225469, -335410409, 383352709, 431378678, 794623392, 994303988, 1171968280, 1188089983, 1213926989, 1325868318, 1413111008, 2106705285, 2143473091] [12, 29, 36, 38, 41, 54, 57, 70, 71, 83, 103, 120, 125]
+2 [-2138770630, -1927628110, -1908480893, -1899175111, -1808210365, -1660426473, -1222533990, -1090239422, -1011669561, -800561771, -587831330, -537142430, -168758331, -108973366, 49866617, 370975815, 439738328, 715235348, 1354539333, 1593800404, 2033001162, 2053379412] [1, 29, 31, 45, 49, 52, 52, 63, 68, 93, 97, 113, 122]
+3 [-2141999138, -2141451704, -2098805236, -1302295658, -903316089, -421042466, -382483011, -346989627, 141218956, 240273900, 397430452, 670497898, 912707948, 1299719633, 1337043149, 1436496767, 1489733240, 1738331255, 2030965207] [13, 13, 14, 17, 17, 22, 71, 73, 77, 97, 104, 112, 123]
+4 [-1885422396, -1813935549, -1009656194, -673237643, -237425046, -4229382, 61035129, 427197269, 434021400, 659422734, 702611616, 762932956, 852509237, 1282464673, 1423957796, 1544188174, 1579876740, 1902023838, 1991172974, 1993193190, 2047637360, 2051224722, 2064155045] [3, 5, 17, 30, 47, 55, 65, 73, 74, 96, 97, 102, 123]
+5 [-2117946883, -842693467, -629486480, -467659022, -134213907, 41423756, 586844478, 623103518, 706441268, 1188285940, 1689098844, 1824882165, 1955646088, 2025611582] [36, 62, 64, 68, 118]
+
 # Test min / max for int / float
 query IIIRR
 SELECT c2, min(c5), max(c5), min(c11), max(c11) FROM aggregate_test_100 GROUP BY c2 ORDER BY c2;
@@ -389,19 +417,6 @@ c 2.666666666667 0.425241138254
 d 2.444444444444 0.541519476308
 e 3 0.505440263521
 
-# FIXME: add bool_and(v3) column when issue fixed
-# ISSUE https://github.com/apache/datafusion/issues/11846
-query TBBB rowsort
-select v1, bool_or(v2), bool_and(v2), bool_or(v3)
-from aggregate_test_100_bool
-group by v1
-----
-a true false true
-b true false true
-c true false false
-d true false false
-e true false NULL
-
 query TBBB rowsort
 select v1,
       bool_or(v2) FILTER (WHERE v1 = 'a' OR v1 = 'c' OR v1 = 'e'),


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to