This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new a14f77c141 Fix: ViewType gc on huge batch would produce bad output (#8694)
a14f77c141 is described below
commit a14f77c141aee2ab0be729cacad4572ca4b728bf
Author: mwish <[email protected]>
AuthorDate: Tue Nov 11 03:51:58 2025 +0800
Fix: ViewType gc on huge batch would produce bad output (#8694)
# Which issue does this PR close?
- Closes #8681.
# Rationale for this change
Previously, `gc()` produced a single consolidated data buffer. For more than 2 GiB of
non-inline data this was buggy, because the buffer offset stored in each view is a
4-byte signed integer and cannot address bytes beyond `i32::MAX`.
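The failure mode can be reproduced in isolation (a minimal standalone sketch, independent of arrow-rs, only illustrating the 4-byte offset limit):

```rust
// Illustration only: a byte position past i32::MAX inside one consolidated
// data buffer cannot be stored in a 4-byte signed offset field.
fn main() {
    let offset_past_2gib: usize = (i32::MAX as usize) + 10;
    let narrowed = offset_past_2gib as i32; // what the view's offset field would hold
    assert!(narrowed < 0); // wraps negative, so the view no longer points at valid data
    assert_ne!(narrowed as i64, offset_past_2gib as i64);
}
```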
# What changes are included in this PR?
Add a `GcCopyGroup` type that partitions the non-inline data into groups of at most
`i32::MAX` bytes, and run gc group by group so that each group gets its own output buffer.
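As a rough illustration of the grouping strategy (a simplified sketch with a hypothetical `group_lengths` helper, not the code in the diff below):

```rust
/// Greedily pack non-inline view lengths into groups of at most `limit` bytes;
/// each group later becomes its own data buffer. Assumes every individual
/// length is itself <= `limit`.
fn group_lengths(lengths: &[u32], limit: u64) -> Vec<(u64, usize)> {
    let mut groups = Vec::new(); // (bytes in group, number of views in group)
    let (mut bytes, mut count) = (0u64, 0usize);
    for &len in lengths {
        if bytes + len as u64 > limit {
            groups.push((bytes, count));
            bytes = 0;
            count = 0;
        }
        bytes += len as u64;
        count += 1;
    }
    if count != 0 {
        groups.push((bytes, count));
    }
    groups
}

fn main() {
    // Three 1.5 GiB values cannot share a single buffer capped at i32::MAX bytes.
    let gib_1_5: u32 = 3 << 29; // 1_610_612_736 bytes
    assert_eq!(group_lengths(&[gib_1_5; 3], i32::MAX as u64).len(), 3);
}
```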
# Are these changes tested?
Yes, the new `test_gc_huge_array` test below exercises gc on an array whose non-inline data exceeds 4 GiB.
# Are there any user-facing changes?
On arrays whose non-inline data exceeds the 32-bit offset range (about 2 GiB), `gc` now
produces more than one data buffer instead of a single consolidated buffer.
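A minimal usage sketch of what that means for callers (assuming only the `gc()` and `data_buffers()` methods that already appear in this diff):

```rust
use arrow_array::BinaryViewArray;

/// Compact an array and report how many data buffers the result holds.
/// Callers should no longer assume the count is always 1 after gc.
fn compact_and_count(array: &BinaryViewArray) -> (BinaryViewArray, usize) {
    let compacted = array.gc();
    let num_buffers = compacted.data_buffers().len();
    (compacted, num_buffers)
}
```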
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-array/src/array/byte_view_array.rs | 146 ++++++++++++++++++++++++++++---
1 file changed, 134 insertions(+), 12 deletions(-)
diff --git a/arrow-array/src/array/byte_view_array.rs b/arrow-array/src/array/byte_view_array.rs
index 0319363bf5..f677c4ae67 100644
--- a/arrow-array/src/array/byte_view_array.rs
+++ b/arrow-array/src/array/byte_view_array.rs
@@ -512,18 +512,85 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
};
}
- // 3) Allocate exactly capacity for all non-inline data
- let mut data_buf = Vec::with_capacity(total_large);
+ let (views_buf, data_blocks) = if total_large < i32::MAX as usize {
+ // fast path, the entire data fits in a single buffer
+ // 3) Allocate exactly capacity for all non-inline data
+ let mut data_buf = Vec::with_capacity(total_large);
+
+ // 4) Iterate over views and process each inline/non-inline view
+ let views_buf: Vec<u128> = (0..len)
+ .map(|i| unsafe { self.copy_view_to_buffer(i, 0, &mut data_buf) })
+ .collect();
+ let data_block = Buffer::from_vec(data_buf);
+ let data_blocks = vec![data_block];
+ (views_buf, data_blocks)
+ } else {
+ // slow path, need to split into multiple buffers
+
+ struct GcCopyGroup {
+ total_buffer_bytes: usize,
+ total_len: usize,
+ }
+
+ impl GcCopyGroup {
+ fn new(total_buffer_bytes: u32, total_len: usize) -> Self {
+ Self {
+ total_buffer_bytes: total_buffer_bytes as usize,
+ total_len,
+ }
+ }
+ }
- // 4) Iterate over views and process each inline/non-inline view
- let views_buf: Vec<u128> = (0..len)
- .map(|i| unsafe { self.copy_view_to_buffer(i, &mut data_buf) })
- .collect();
+ let mut groups = Vec::new();
+ let mut current_length = 0;
+ let mut current_elements = 0;
+
+ for view in self.views() {
+ let len = *view as u32;
+ if len > MAX_INLINE_VIEW_LEN {
+ if current_length + len > i32::MAX as u32 {
+ // Start a new group
+ groups.push(GcCopyGroup::new(current_length, current_elements));
+ current_length = 0;
+ current_elements = 0;
+ }
+ current_length += len;
+ current_elements += 1;
+ }
+ }
+ if current_elements != 0 {
+ groups.push(GcCopyGroup::new(current_length, current_elements));
+ }
+ debug_assert!(groups.len() <= i32::MAX as usize);
+
+ // 3) Copy the buffers group by group
+ let mut views_buf = Vec::with_capacity(len);
+ let mut data_blocks = Vec::with_capacity(groups.len());
+
+ let mut current_view_idx = 0;
+
+ for (group_idx, gc_copy_group) in groups.iter().enumerate() {
+ let mut data_buf = Vec::with_capacity(gc_copy_group.total_buffer_bytes);
+
+ // Directly push views to avoid intermediate Vec allocation
+ let new_views = (current_view_idx..current_view_idx + gc_copy_group.total_len).map(
+ |view_idx| {
+ // safety: the view index came from iterating over valid range
+ unsafe {
+ self.copy_view_to_buffer(view_idx, group_idx as i32, &mut data_buf)
+ }
+ },
+ );
+ views_buf.extend(new_views);
+
+ data_blocks.push(Buffer::from_vec(data_buf));
+ current_view_idx += gc_copy_group.total_len;
+ }
+ (views_buf, data_blocks)
+ };
- // 5) Wrap up buffers
- let data_block = Buffer::from_vec(data_buf);
+ // 5) Wrap up views buffer
let views_scalar = ScalarBuffer::from(views_buf);
- let data_blocks = vec![data_block];
// SAFETY: views_scalar, data_blocks, and nulls are correctly aligned and sized
unsafe { GenericByteViewArray::new_unchecked(views_scalar, data_blocks, nulls) }
@@ -538,10 +605,15 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
/// inside one of `self.buffers`.
/// - `data_buf` must be ready to have additional bytes appended.
/// - After this call, the returned view will have its
- /// `buffer_index` reset to `0` and its `offset` updated so that it points
+ /// `buffer_index` reset to `buffer_idx` and its `offset` updated so that it points
/// into the bytes just appended at the end of `data_buf`.
#[inline(always)]
- unsafe fn copy_view_to_buffer(&self, i: usize, data_buf: &mut Vec<u8>) -> u128 {
+ unsafe fn copy_view_to_buffer(
+ &self,
+ i: usize,
+ buffer_idx: i32,
+ data_buf: &mut Vec<u8>,
+ ) -> u128 {
// SAFETY: `i < self.len()` ensures this is in‑bounds.
let raw_view = unsafe { *self.views().get_unchecked(i) };
let mut bv = ByteView::from(raw_view);
@@ -561,7 +633,7 @@ impl<T: ByteViewType + ?Sized> GenericByteViewArray<T> {
let new_offset = data_buf.len() as u32;
data_buf.extend_from_slice(slice);
- bv.buffer_index = 0;
+ bv.buffer_index = buffer_idx as u32;
bv.offset = new_offset;
bv.into()
}
@@ -1404,6 +1476,56 @@ mod tests {
}
}
+ #[test]
+ #[cfg_attr(miri, ignore)] // Takes too long
+ fn test_gc_huge_array() {
+ // Construct multiple 128 MiB BinaryView entries so total > 4 GiB
+ let block_len: usize = 128 * 1024 * 1024; // 128 MiB per view
+ let num_views: usize = 36;
+
+ // Create a single 128 MiB data block with a simple byte pattern
+ let buffer = Buffer::from_vec(vec![0xAB; block_len]);
+ let buffer2 = Buffer::from_vec(vec![0xFF; block_len]);
+
+ // Append this block and then add many views pointing to it
+ let mut builder = BinaryViewBuilder::new();
+ let block_id = builder.append_block(buffer);
+ for _ in 0..num_views / 2 {
+ builder
+ .try_append_view(block_id, 0, block_len as u32)
+ .expect("append view into 128MiB block");
+ }
+ let block_id2 = builder.append_block(buffer2);
+ for _ in 0..num_views / 2 {
+ builder
+ .try_append_view(block_id2, 0, block_len as u32)
+ .expect("append view into 128MiB block");
+ }
+
+ let array = builder.finish();
+ let total = array.total_buffer_bytes_used();
+ assert!(
+ total > u32::MAX as usize,
+ "Expected total non-inline bytes to exceed 4 GiB, got {}",
+ total
+ );
+
+ // Run gc and verify correctness
+ let gced = array.gc();
+ assert_eq!(gced.len(), num_views, "Length mismatch after gc");
+ assert_eq!(gced.null_count(), 0, "Null count mismatch after gc");
+ assert_ne!(
+ gced.data_buffers().len(),
+ 1,
+ "gc with huge buffer should not consolidate data into a single buffer"
+ );
+
+ // Element-wise equality check across the entire array
+ array.iter().zip(gced.iter()).for_each(|(orig, got)| {
+ assert_eq!(orig, got, "Value mismatch after gc on huge array");
+ });
+ }
+
#[test]
fn test_eq() {
let test_data = [