This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 0e40460e6e fix: Panic on reencoding offsets in arrow-ipc with sliced nested arrays (#6998)
0e40460e6e is described below
commit 0e40460e6e12bc02c7dd0e842f0156affc2b2a15
Author: Michael Maletich <[email protected]>
AuthorDate: Sat Jan 25 08:38:53 2025 -0600
fix: Panic on reencoding offsets in arrow-ipc with sliced nested arrays (#6998)
* fix: Panic on reencoding offsets
The code incorrectly passed the slice's end position (start + length) as the
length argument of slice_with_length, which already adds the start to the
length it is given, so the end of the offset slice landed at start * 2 +
length. This caused the encoded batches to be larger than they needed to be
and would result in a panic for certain slices (a minimal sketch of the
mistaken call follows the diffstat below).
* Add tests for slicing larger arrays
* Run rustfmt
* Added end to end unit test which shows the problem is fixed.
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-ipc/src/writer.rs | 161 ++++++++++++++++++++++++++++++++++++++++++++++--
1 file changed, 156 insertions(+), 5 deletions(-)
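To make the mistake concrete, here is a minimal sketch (not part of the patch)
of the broken and fixed calls, assuming arrow_buffer::Buffer with its
slice_with_length(offset, length) and typed_data APIs; the offset values and
variable names below are illustrative only:

    use arrow_buffer::Buffer;

    fn main() {
        let size = std::mem::size_of::<i32>();
        // Offsets of a list array: [0, 2, 5, 9, 14]. Suppose the array was
        // sliced so that data.offset() == 2 and data.len() == 2; the slice
        // then needs the 3 offsets [5, 9, 14].
        let offsets = Buffer::from_slice_ref([0i32, 2, 5, 9, 14]);
        let (data_offset, data_len) = (2usize, 2usize);

        // Broken call: slice_with_length expects a length as its second
        // argument, but an end position was passed, so the slice would extend
        // to data_offset * size + (data_offset + data_len + 1) * size bytes
        // and run past the buffer for certain slices, panicking:
        // offsets.slice_with_length(data_offset * size, (data_offset + data_len + 1) * size);

        // Fixed call: only data_len + 1 offsets are needed for the slice.
        let sliced = offsets.slice_with_length(data_offset * size, (data_len + 1) * size);
        assert_eq!(vec![5, 9, 14], sliced.typed_data::<i32>());
    }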
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 1581df56de..451b7e02c5 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -1520,10 +1520,7 @@ fn reencode_offsets<O: OffsetSizeTrait>(
let offsets = match start_offset.as_usize() {
0 => {
let size = size_of::<O>();
- offsets.slice_with_length(
- data.offset() * size,
- (data.offset() + data.len() + 1) * size,
- )
+ offsets.slice_with_length(data.offset() * size, (data.len() + 1) * size)
}
_ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
};
@@ -1840,9 +1837,9 @@ mod tests {
use std::io::Cursor;
use std::io::Seek;
- use arrow_array::builder::GenericListBuilder;
use arrow_array::builder::MapBuilder;
use arrow_array::builder::UnionBuilder;
+ use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
use arrow_array::types::*;
use arrow_buffer::ScalarBuffer;
@@ -2480,6 +2477,126 @@ mod tests {
);
}
+ #[test]
+ fn test_large_slice_uint32() {
+ ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
+ if i % 2 == 0 {
+ Some(i)
+ } else {
+ None
+ }
+ }))));
+ }
+
+ #[test]
+ fn test_large_slice_string() {
+ let strings: Vec<_> = (0..8000)
+ .map(|i| {
+ if i % 2 == 0 {
+ Some(format!("value{}", i))
+ } else {
+ None
+ }
+ })
+ .collect();
+
+ ensure_roundtrip(Arc::new(StringArray::from(strings)));
+ }
+
+ #[test]
+ fn test_large_slice_string_list() {
+ let mut ls = ListBuilder::new(StringBuilder::new());
+
+ let mut s = String::new();
+ for row_number in 0..8000 {
+ if row_number % 2 == 0 {
+ for list_element in 0..1000 {
+ s.clear();
+ use std::fmt::Write;
+ write!(&mut s, "value{row_number}-{list_element}").unwrap();
+ ls.values().append_value(&s);
+ }
+ ls.append(true)
+ } else {
+ ls.append(false); // null
+ }
+ }
+
+ ensure_roundtrip(Arc::new(ls.finish()));
+ }
+
+ #[test]
+ fn test_large_slice_string_list_of_lists() {
+ // The reason for this special test is to verify reencode_offsets, which
+ // looks both at the starting offset and the data offset. So we need a
+ // dataset where the starting_offset is zero but the data offset is not.
+ let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
+
+ for _ in 0..4000 {
+ ls.values().append(true);
+ ls.append(true)
+ }
+
+ let mut s = String::new();
+ for row_number in 0..4000 {
+ if row_number % 2 == 0 {
+ for list_element in 0..1000 {
+ s.clear();
+ use std::fmt::Write;
+ write!(&mut s, "value{row_number}-{list_element}").unwrap();
+ ls.values().values().append_value(&s);
+ }
+ ls.values().append(true);
+ ls.append(true)
+ } else {
+ ls.append(false); // null
+ }
+ }
+
+ ensure_roundtrip(Arc::new(ls.finish()));
+ }
+
+ /// Read/write a record batch to a File and Stream and ensure it is the same at the output
+ fn ensure_roundtrip(array: ArrayRef) {
+ let num_rows = array.len();
+ let orig_batch = RecordBatch::try_from_iter(vec![("a", array)]).unwrap();
+ // take off the first element
+ let sliced_batch = orig_batch.slice(1, num_rows - 1);
+
+ let schema = orig_batch.schema();
+ let stream_data = {
+ let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
+ writer.write(&sliced_batch).unwrap();
+ writer.into_inner().unwrap()
+ };
+ let read_batch = {
+ let projection = None;
+ let mut reader = StreamReader::try_new(Cursor::new(stream_data), projection).unwrap();
+ reader
+ .next()
+ .expect("expect no errors reading batch")
+ .expect("expect batch")
+ };
+ assert_eq!(sliced_batch, read_batch);
+
+ let file_data = {
+ let mut writer = FileWriter::try_new_buffered(vec![], &schema).unwrap();
+ writer.write(&sliced_batch).unwrap();
+ writer.into_inner().unwrap().into_inner().unwrap()
+ };
+ let read_batch = {
+ let projection = None;
+ let mut reader = FileReader::try_new(Cursor::new(file_data), projection).unwrap();
+ reader
+ .next()
+ .expect("expect no errors reading batch")
+ .expect("expect batch")
+ };
+ assert_eq!(sliced_batch, read_batch);
+
+ // TODO test file writer/reader
+ }
+
#[test]
fn encode_bools_slice() {
// Test case for https://github.com/apache/arrow-rs/issues/3496
@@ -2662,6 +2779,40 @@ mod tests {
builder.finish()
}
+ #[test]
+ fn reencode_offsets_when_first_offset_is_not_zero() {
+ let original_list = generate_list_data::<i32>();
+ let original_data = original_list.into_data();
+ let slice_data = original_data.slice(75, 7);
+ let (new_offsets, original_start, length) =
+ reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
+ assert_eq!(
+ vec![0, 3, 6, 9, 12, 15, 18, 21],
+ new_offsets.typed_data::<i32>()
+ );
+ assert_eq!(225, original_start);
+ assert_eq!(21, length);
+ }
+
+ #[test]
+ fn reencode_offsets_when_first_offset_is_zero() {
+ let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
+ // ls = [[], [35, 42]]
+ ls.append(true);
+ ls.values().append_value(35);
+ ls.values().append_value(42);
+ ls.append(true);
+ let original_list = ls.finish();
+ let original_data = original_list.into_data();
+
+ let slice_data = original_data.slice(1, 1);
+ let (new_offsets, original_start, length) =
+ reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
+ assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
+ assert_eq!(0, original_start);
+ assert_eq!(2, length);
+ }
+
/// Ensure when serde full & sliced versions they are equal to original input.
/// Also ensure serialized sliced version is significantly smaller than serialized full.
fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, expected_size_factor: usize) {