This is an automated email from the ASF dual-hosted git repository.

github-merge-queue[bot] pushed a commit to branch 
gh-readonly-queue/main/pr-22558-7b78f0cea0409f60c79a3833781e621344d52ffd
in repository https://gitbox.apache.org/repos/asf/datafusion.git

commit c1f0d54bb9e1054e7c7212037e970b3d3b34885c
Author: Ariel Miculas-Trif <[email protected]>
AuthorDate: Fri Jun 5 04:24:01 2026 +0300

    perf: avoid unnecessary large allocations (#22558)
    
    ## Which issue does this PR close?
    Loosely related to https://github.com/apache/datafusion/issues/22526
    
    ~Needs rebasing once https://github.com/apache/datafusion/pull/22416 is
    merged~
    
    ## Rationale for this change
    The documentation for `Vec::split_off` says:
    > Returns a newly allocated vector containing the elements in the range
    [at, len). After the call, the original vector will be left containing
    the elements [0, at) with its previous capacity unchanged.
    
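    This is wasteful when the old `split_off(n)` + `swap` pattern is used to
    take a small prefix of n elements from a large Vec, for two reasons:
    * `split_off` allocates a new Vec for the remaining elements, which far
    outnumber n
    * the emitted prefix keeps the original allocation, so the returned Vec
    has a very large capacity compared to its length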
    
    split_vec_min_alloc still has some issues (see
    https://github.com/apache/datafusion/issues/22548), but when n is small
    it uses drain + collect, which is better because it allocates only for
    the first n elements and does not inflate the capacity of the returned
    Vec.
    
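    ## What changes are included in this PR?
    Replace the `split_off` + `swap` pattern with `split_vec_min_alloc` in
    `EmitTo::take_needed` and in the `EmitTo::First(n)` branch of
    `GroupValuesPrimitive::emit`, and add a regression test for each.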
    
    ## Are these changes tested?
    Yes
    
    ## Are there any user-facing changes?
    No
---
 datafusion/expr-common/src/groups_accumulator.rs   | 60 +++++++++++++++++++---
 .../group_values/single_group_by/primitive.rs      | 53 +++++++++++++++++--
 2 files changed, 102 insertions(+), 11 deletions(-)

diff --git a/datafusion/expr-common/src/groups_accumulator.rs 
b/datafusion/expr-common/src/groups_accumulator.rs
index 9053f7a8ea..da5da384c7 100644
--- a/datafusion/expr-common/src/groups_accumulator.rs
+++ b/datafusion/expr-common/src/groups_accumulator.rs
@@ -18,7 +18,7 @@
 //! Vectorized [`GroupsAccumulator`]
 
 use arrow::array::{ArrayRef, BooleanArray};
-use datafusion_common::{Result, not_impl_err};
+use datafusion_common::{Result, not_impl_err, utils::split_vec_min_alloc};
 
 /// Describes how many rows should be emitted during grouping.
 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -45,13 +45,7 @@ impl EmitTo {
                 // Take the entire vector, leave new (empty) vector
                 std::mem::take(v)
             }
-            Self::First(n) => {
-                // get end n+1,.. values into t
-                let mut t = v.split_off(*n);
-                // leave n+1,.. in v
-                std::mem::swap(v, &mut t);
-                t
-            }
+            Self::First(n) => split_vec_min_alloc(v, *n),
         }
     }
 }
@@ -254,3 +248,53 @@ pub trait GroupsAccumulator: Send + std::any::Any {
     /// compute, not `O(num_groups)`
     fn size(&self) -> usize;
 }
+
+#[cfg(test)]
+mod tests {
+    use super::EmitTo;
+
+    /// When `n` is small relative to `len`, the old `split_off(n) + swap` 
pattern had
+    /// two allocation problems:
+    ///
+    /// 1. The returned Vec kept the original large backing allocation even 
though it
+    ///    only contains `n` elements (wasted capacity on a short-lived value).
+    /// 2. `split_off` allocated a fresh Vec for the `len - n` remaining 
elements,
+    ///    even though that side is much larger than `n` — the expensive side 
to
+    ///    allocate.
+    ///
+    /// `split_vec_min_alloc` fixes both: when `n * 2 <= len` it uses
+    /// `drain(0..n).collect()`, allocating only `n` elements for the emitted 
prefix
+    /// and keeping the original large backing in the remaining accumulator.
+    #[test]
+    fn take_needed_first_small_n_allocates_minimally() {
+        let mut v: Vec<i32> = Vec::with_capacity(128);
+        v.extend(0..20i32);
+        let original_capacity = v.capacity(); // 128
+
+        // n=4, n*2=8 <= len=20 -> drain branch in split_vec_min_alloc
+        let emitted = EmitTo::First(4).take_needed(&mut v);
+
+        assert_eq!(emitted, vec![0, 1, 2, 3]);
+        assert_eq!(v, (4..20i32).collect::<Vec<_>>());
+
+        // The emitted prefix must NOT carry the original large allocation.
+        // Old split_off+swap returned a Vec with capacity=128 for only 4 
elements.
+        assert!(
+            emitted.capacity() <= 4,
+            "emitted prefix capacity {} should be ~n=4, not the original {}",
+            emitted.capacity(),
+            original_capacity,
+        );
+
+        // The remaining accumulator must retain the original large allocation 
so
+        // that incoming groups don't immediately force a realloc.
+        // Old split_off+swap left the remaining vec with a small fresh 
allocation.
+        assert_eq!(
+            v.capacity(),
+            original_capacity,
+            "remaining vec capacity {} should equal original {}",
+            v.capacity(),
+            original_capacity,
+        );
+    }
+}
diff --git 
a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
 
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
index efaf7eba0f..07535cfdaa 100644
--- 
a/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
+++ 
b/datafusion/physical-plan/src/aggregates/group_values/single_group_by/primitive.rs
@@ -24,6 +24,7 @@ use arrow::array::{
 use arrow::datatypes::{DataType, i256};
 use datafusion_common::Result;
 use datafusion_common::hash_utils::RandomState;
+use datafusion_common::utils::split_vec_min_alloc;
 use datafusion_execution::memory_pool::proxy::VecAllocExt;
 use datafusion_expr::EmitTo;
 use half::f16;
@@ -207,9 +208,7 @@ where
                     Some(_) => self.null_group.take(),
                     None => None,
                 };
-                let mut split = self.values.split_off(n);
-                std::mem::swap(&mut self.values, &mut split);
-                build_primitive(split, null_group)
+                build_primitive(split_vec_min_alloc(&mut self.values, n), 
null_group)
             }
         };
 
@@ -223,3 +222,51 @@ where
         self.map.shrink_to(num_rows, |_| 0); // hasher does not matter since 
the map is cleared
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use arrow::array::types::Int32Type;
+    use arrow::array::{ArrayRef, Int32Array};
+    use arrow::datatypes::DataType;
+    use datafusion_expr::EmitTo;
+    use std::sync::Arc;
+
+    /// Mirror of the `EmitTo::take_needed` regression test, applied to the
+    /// concrete `GroupValuesPrimitive` accumulator.
+    ///
+    /// When `n` is small, the old `split_off(n) + swap` pattern used inside
+    /// `emit(EmitTo::First(n))` left `self.values` with a small fresh 
allocation
+    /// and returned the emitted prefix carrying the original large backing.
+    ///
+    /// With `split_vec_min_alloc` and `n * 2 <= len`, the drain branch is 
taken:
+    /// the emitted prefix gets a compact allocation and `self.values` retains 
the
+    /// original large one.
+    #[test]
+    fn emit_first_small_n_allocates_minimally() -> Result<()> {
+        let mut gv = GroupValuesPrimitive::<Int32Type>::new(DataType::Int32);
+
+        // Intern 20 distinct values; `new()` pre-allocates capacity 128 for 
`values`.
+        let arr: ArrayRef = Arc::new(Int32Array::from_iter_values(0..20i32));
+        let mut groups = vec![];
+        gv.intern(&[arr], &mut groups)?;
+        let capacity_before = gv.values.capacity(); // 128
+
+        // n=4, n*2=8 <= len=20 -> drain branch
+        let emitted = gv.emit(EmitTo::First(4))?;
+
+        assert_eq!(emitted[0].len(), 4);
+
+        // `self.values` must retain its original large allocation.
+        // Old split_off+swap left it with a fresh small allocation (~16).
+        assert_eq!(
+            gv.values.capacity(),
+            capacity_before,
+            "self.values capacity {} should equal original {} after small 
First(n) emit",
+            gv.values.capacity(),
+            capacity_before,
+        );
+
+        Ok(())
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to