This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new a14f77c141 Fix: ViewType gc on huge batch would produce bad output (#8694)
a14f77c141 is described below

commit a14f77c141aee2ab0be729cacad4572ca4b728bf
Author: mwish <[email protected]>
AuthorDate: Tue Nov 11 03:51:58 2025 +0800

    Fix: ViewType gc on huge batch would produce bad output (#8694)
    
    # Which issue does this PR close?
    
    - Closes #8681.
    
    # Rationale for this change
    
    Previously, `gc()` would consolidate all non-inline data into a single
    buffer. However, for data larger than 2 GiB this produced bad output,
    since a view's buffer offset is a 4-byte signed integer.
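
    A minimal standalone sketch of the failure mode (hypothetical sizes, not
    the library code): any byte position past `i32::MAX` in a single
    consolidated buffer cannot be represented by such an offset.

    ```rust
    fn main() {
        // Hypothetical: a consolidated data buffer of 3 GiB, past the 2 GiB limit.
        let consolidated_len: usize = 3 << 30;
        // A non-inline view whose data starts near the end of that buffer.
        let late_offset = consolidated_len - 16;
        // What a 4-byte signed offset field would hold: the value wraps negative.
        let stored = late_offset as i32;
        assert!(stored < 0, "offset wrapped to {stored}");
    }
    ```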
    
    # What changes are included in this PR?
    
    Add a `GcCopyGroup` type that partitions the non-inline data into groups
    of at most `i32::MAX` bytes, and gc each group into its own buffer (see
    the sketch below).
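
    A simplified sketch of the grouping strategy (not the committed code;
    `group_views` is a hypothetical helper and assumes inline views have
    already been filtered out): non-inline view lengths are packed greedily
    into groups whose cumulative size stays at or below `i32::MAX` bytes,
    with one output buffer per group.

    ```rust
    /// Greedily pack non-inline view lengths into (total_bytes, view_count)
    /// groups, each holding at most i32::MAX bytes of data.
    fn group_views(non_inline_lens: &[u32]) -> Vec<(u32, usize)> {
        const LIMIT: u32 = i32::MAX as u32;
        let mut groups = Vec::new();
        let mut bytes = 0u32;
        let mut count = 0usize;
        for &len in non_inline_lens {
            // Close the current group before it would exceed the 2 GiB limit.
            if bytes + len > LIMIT {
                groups.push((bytes, count));
                bytes = 0;
                count = 0;
            }
            bytes += len;
            count += 1;
        }
        if count != 0 {
            groups.push((bytes, count));
        }
        groups
    }

    fn main() {
        // Four 0.5 GiB views: the first three stay under the limit, the
        // fourth would push the total to 2 GiB (> i32::MAX), so it starts a
        // second group.
        let half_gib = 1u32 << 29;
        let groups = group_views(&[half_gib; 4]);
        assert_eq!(groups.len(), 2);
        assert_eq!(groups[0], (3 * half_gib, 3));
        assert_eq!(groups[1], (half_gib, 1));
    }
    ```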
    
    # Are these changes tested?
    
    Yes
    
    # Are there any user-facing changes?
    
    `gc()` may now produce more than one data buffer when the non-inline
    data exceeds `i32::MAX` bytes.
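
    For illustration, a hedged usage sketch (the tiny array here is
    hypothetical; gc only splits once the non-inline data exceeds 2 GiB):
    callers should no longer assume exactly one data buffer after `gc()`.

    ```rust
    use arrow_array::StringViewArray;

    fn main() {
        let array = StringViewArray::from(vec!["a string longer than twelve bytes"]);
        // After this change, gc() may return an array backed by more than
        // one data buffer, so don't assume data_buffers().len() == 1.
        let gced = array.gc();
        assert!(!gced.data_buffers().is_empty());
    }
    ```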
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-array/src/array/byte_view_array.rs | 146 ++++++++++++++++++++++++++++---
 1 file changed, 134 insertions(+), 12 deletions(-)

diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs
index 0319363bf5..f677c4ae67 100644
--- a/arrow-array/src/array/byte_view_array.rs
+++ b/arrow-array/src/array/byte_view_array.rs
@@ -512,18 +512,85 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
             };
         }
 
-        // 3) Allocate exactly capacity for all non-inline data
-        let mut data_buf = Vec::with_capacity(total_large);
+        let (views_buf, data_blocks) = if total_large < i32::MAX as usize {
+            // fast path, the entire data fits in a single buffer
+            // 3) Allocate exactly capacity for all non-inline data
+            let mut data_buf = Vec::with_capacity(total_large);
+
+            // 4) Iterate over views and process each inline/non-inline view
+            let views_buf: Vec<u128> = (0..len)
+                .map(|i| unsafe { self.copy_view_to_buffer(i, 0, &mut data_buf) })
+                .collect();
+            let data_block = Buffer::from_vec(data_buf);
+            let data_blocks = vec![data_block];
+            (views_buf, data_blocks)
+        } else {
+            // slow path, need to split into multiple buffers
+
+            struct GcCopyGroup {
+                total_buffer_bytes: usize,
+                total_len: usize,
+            }
+
+            impl GcCopyGroup {
+                fn new(total_buffer_bytes: u32, total_len: usize) -> Self {
+                    Self {
+                        total_buffer_bytes: total_buffer_bytes as usize,
+                        total_len,
+                    }
+                }
+            }
 
-        // 4) Iterate over views and process each inline/non-inline view
-        let views_buf: Vec<u128> = (0..len)
-            .map(|i| unsafe { self.copy_view_to_buffer(i, &mut data_buf) })
-            .collect();
+            let mut groups = Vec::new();
+            let mut current_length = 0;
+            let mut current_elements = 0;
+
+            for view in self.views() {
+                let len = *view as u32;
+                if len > MAX_INLINE_VIEW_LEN {
+                    if current_length + len > i32::MAX as u32 {
+                        // Start a new group
+                        groups.push(GcCopyGroup::new(current_length, current_elements));
+                        current_length = 0;
+                        current_elements = 0;
+                    }
+                    current_length += len;
+                    current_elements += 1;
+                }
+            }
+            if current_elements != 0 {
+                groups.push(GcCopyGroup::new(current_length, current_elements));
+            }
+            debug_assert!(groups.len() <= i32::MAX as usize);
+
+            // 3) Copy the buffers group by group
+            let mut views_buf = Vec::with_capacity(len);
+            let mut data_blocks = Vec::with_capacity(groups.len());
+
+            let mut current_view_idx = 0;
+
+            for (group_idx, gc_copy_group) in groups.iter().enumerate() {
+                let mut data_buf = Vec::with_capacity(gc_copy_group.total_buffer_bytes);
+
+                // Directly push views to avoid intermediate Vec allocation
+                let new_views = (current_view_idx..current_view_idx + gc_copy_group.total_len).map(
+                    |view_idx| {
+                        // safety: the view index came from iterating over valid range
+                        unsafe {
+                            self.copy_view_to_buffer(view_idx, group_idx as i32, &mut data_buf)
+                        }
+                    },
+                );
+                views_buf.extend(new_views);
+
+                data_blocks.push(Buffer::from_vec(data_buf));
+                current_view_idx += gc_copy_group.total_len;
+            }
+            (views_buf, data_blocks)
+        };
 
-        // 5) Wrap up buffers
-        let data_block = Buffer::from_vec(data_buf);
+        // 5) Wrap up views buffer
         let views_scalar = ScalarBuffer::from(views_buf);
-        let data_blocks = vec![data_block];
 
         // SAFETY: views_scalar, data_blocks, and nulls are correctly aligned and sized
         unsafe { GenericByteViewArray::new_unchecked(views_scalar, data_blocks, nulls) }
@@ -538,10 +605,15 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
     ///   inside one of `self.buffers`.
     /// - `data_buf` must be ready to have additional bytes appended.
     /// - After this call, the returned view will have its
-    ///   `buffer_index` reset to `0` and its `offset` updated so that it points
+    ///   `buffer_index` reset to `buffer_idx` and its `offset` updated so that it points
     ///   into the bytes just appended at the end of `data_buf`.
     #[inline(always)]
-    unsafe fn copy_view_to_buffer(&self, i: usize, data_buf: &mut Vec<u8>) -> u128 {
+    unsafe fn copy_view_to_buffer(
+        &self,
+        i: usize,
+        buffer_idx: i32,
+        data_buf: &mut Vec<u8>,
+    ) -> u128 {
         // SAFETY: `i < self.len()` ensures this is in‑bounds.
         let raw_view = unsafe { *self.views().get_unchecked(i) };
         let mut bv = ByteView::from(raw_view);
@@ -561,7 +633,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
             let new_offset = data_buf.len() as u32;
             data_buf.extend_from_slice(slice);
 
-            bv.buffer_index = 0;
+            bv.buffer_index = buffer_idx as u32;
             bv.offset = new_offset;
             bv.into()
         }
@@ -1404,6 +1476,56 @@ mod tests {
         }
     }
 
+    #[test]
+    #[cfg_attr(miri, ignore)] // Takes too long
+    fn test_gc_huge_array() {
+        // Construct multiple 128 MiB BinaryView entries so total > 4 GiB
+        let block_len: usize = 128 * 1024 * 1024; // 128 MiB per view
+        let num_views: usize = 36;
+
+        // Create a single 128 MiB data block with a simple byte pattern
+        let buffer = Buffer::from_vec(vec![0xAB; block_len]);
+        let buffer2 = Buffer::from_vec(vec![0xFF; block_len]);
+
+        // Append this block and then add many views pointing to it
+        let mut builder = BinaryViewBuilder::new();
+        let block_id = builder.append_block(buffer);
+        for _ in 0..num_views / 2 {
+            builder
+                .try_append_view(block_id, 0, block_len as u32)
+                .expect("append view into 128MiB block");
+        }
+        let block_id2 = builder.append_block(buffer2);
+        for _ in 0..num_views / 2 {
+            builder
+                .try_append_view(block_id2, 0, block_len as u32)
+                .expect("append view into 128MiB block");
+        }
+
+        let array = builder.finish();
+        let total = array.total_buffer_bytes_used();
+        assert!(
+            total > u32::MAX as usize,
+            "Expected total non-inline bytes to exceed 4 GiB, got {}",
+            total
+        );
+
+        // Run gc and verify correctness
+        let gced = array.gc();
+        assert_eq!(gced.len(), num_views, "Length mismatch after gc");
+        assert_eq!(gced.null_count(), 0, "Null count mismatch after gc");
+        assert_ne!(
+            gced.data_buffers().len(),
+            1,
+            "gc with huge buffer should not consolidate data into a single 
buffer"
+        );
+
+        // Element-wise equality check across the entire array
+        array.iter().zip(gced.iter()).for_each(|(orig, got)| {
+            assert_eq!(orig, got, "Value mismatch after gc on huge array");
+        });
+    }
+
     #[test]
     fn test_eq() {
         let test_data = [
