This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e40460e6e fix: Panic on reencoding offsets in arrow-ipc with sliced 
nested arrays (#6998)
0e40460e6e is described below

commit 0e40460e6e12bc02c7dd0e842f0156affc2b2a15
Author: Michael Maletich <[email protected]>
AuthorDate: Sat Jan 25 08:38:53 2025 -0600

    fix: Panic on reencoding offsets in arrow-ipc with sliced nested arrays 
(#6998)
    
    * fix: Panic on reencoding offsets
    
    Code was incorrectly defining the end of the offset slice to be the start + 
slice_length * 2 because slice_with_length adds the start to the end.
    This caused the encoded batches to be larger than they needed to be and 
would result in a panic for certain slices.
    
    * Add tests for slicing larger arrays
    
    * Run rustfmt
    
    * Added end to end unit test which shows the problem is fixed.
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-ipc/src/writer.rs | 161 ++++++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 156 insertions(+), 5 deletions(-)

diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 1581df56de..451b7e02c5 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -1520,10 +1520,7 @@ fn reencode_offsets<O: OffsetSizeTrait>(
     let offsets = match start_offset.as_usize() {
         0 => {
             let size = size_of::<O>();
-            offsets.slice_with_length(
-                data.offset() * size,
-                (data.offset() + data.len() + 1) * size,
-            )
+            offsets.slice_with_length(data.offset() * size, (data.len() + 1) * 
size)
         }
         _ => offset_slice.iter().map(|x| *x - *start_offset).collect(),
     };
@@ -1840,9 +1837,9 @@ mod tests {
     use std::io::Cursor;
     use std::io::Seek;
 
-    use arrow_array::builder::GenericListBuilder;
     use arrow_array::builder::MapBuilder;
     use arrow_array::builder::UnionBuilder;
+    use arrow_array::builder::{GenericListBuilder, ListBuilder, StringBuilder};
     use arrow_array::builder::{PrimitiveRunBuilder, UInt32Builder};
     use arrow_array::types::*;
     use arrow_buffer::ScalarBuffer;
@@ -2480,6 +2477,126 @@ mod tests {
         );
     }
 
+    #[test]
+    fn test_large_slice_uint32() {
+        ensure_roundtrip(Arc::new(UInt32Array::from_iter((0..8000).map(|i| {
+            if i % 2 == 0 {
+                Some(i)
+            } else {
+                None
+            }
+        }))));
+    }
+
+    #[test]
+    fn test_large_slice_string() {
+        let strings: Vec<_> = (0..8000)
+            .map(|i| {
+                if i % 2 == 0 {
+                    Some(format!("value{}", i))
+                } else {
+                    None
+                }
+            })
+            .collect();
+
+        ensure_roundtrip(Arc::new(StringArray::from(strings)));
+    }
+
+    #[test]
+    fn test_large_slice_string_list() {
+        let mut ls = ListBuilder::new(StringBuilder::new());
+
+        let mut s = String::new();
+        for row_number in 0..8000 {
+            if row_number % 2 == 0 {
+                for list_element in 0..1000 {
+                    s.clear();
+                    use std::fmt::Write;
+                    write!(&mut s, 
"value{row_number}-{list_element}").unwrap();
+                    ls.values().append_value(&s);
+                }
+                ls.append(true)
+            } else {
+                ls.append(false); // null
+            }
+        }
+
+        ensure_roundtrip(Arc::new(ls.finish()));
+    }
+
+    #[test]
+    fn test_large_slice_string_list_of_lists() {
+        // The reason for the special test is to verify reencode_offsets which 
looks both at
+        // the starting offset and the data offset.  So need a dataset where 
the starting_offset
+        // is zero but the data offset is not.
+        let mut ls = ListBuilder::new(ListBuilder::new(StringBuilder::new()));
+
+        for _ in 0..4000 {
+            ls.values().append(true);
+            ls.append(true)
+        }
+
+        let mut s = String::new();
+        for row_number in 0..4000 {
+            if row_number % 2 == 0 {
+                for list_element in 0..1000 {
+                    s.clear();
+                    use std::fmt::Write;
+                    write!(&mut s, 
"value{row_number}-{list_element}").unwrap();
+                    ls.values().values().append_value(&s);
+                }
+                ls.values().append(true);
+                ls.append(true)
+            } else {
+                ls.append(false); // null
+            }
+        }
+
+        ensure_roundtrip(Arc::new(ls.finish()));
+    }
+
+    /// Read/write a record batch to a File and Stream and ensure it is the 
same at the outout
+    fn ensure_roundtrip(array: ArrayRef) {
+        let num_rows = array.len();
+        let orig_batch = RecordBatch::try_from_iter(vec![("a", 
array)]).unwrap();
+        // take off the first element
+        let sliced_batch = orig_batch.slice(1, num_rows - 1);
+
+        let schema = orig_batch.schema();
+        let stream_data = {
+            let mut writer = StreamWriter::try_new(vec![], &schema).unwrap();
+            writer.write(&sliced_batch).unwrap();
+            writer.into_inner().unwrap()
+        };
+        let read_batch = {
+            let projection = None;
+            let mut reader = StreamReader::try_new(Cursor::new(stream_data), 
projection).unwrap();
+            reader
+                .next()
+                .expect("expect no errors reading batch")
+                .expect("expect batch")
+        };
+        assert_eq!(sliced_batch, read_batch);
+
+        let file_data = {
+            let mut writer = FileWriter::try_new_buffered(vec![], 
&schema).unwrap();
+            writer.write(&sliced_batch).unwrap();
+            writer.into_inner().unwrap().into_inner().unwrap()
+        };
+        let read_batch = {
+            let projection = None;
+            let mut reader = FileReader::try_new(Cursor::new(file_data), 
projection).unwrap();
+            reader
+                .next()
+                .expect("expect no errors reading batch")
+                .expect("expect batch")
+        };
+        assert_eq!(sliced_batch, read_batch);
+
+        // TODO test file writer/reader
+    }
+
     #[test]
     fn encode_bools_slice() {
         // Test case for https://github.com/apache/arrow-rs/issues/3496
@@ -2662,6 +2779,40 @@ mod tests {
         builder.finish()
     }
 
+    #[test]
+    fn reencode_offsets_when_first_offset_is_not_zero() {
+        let original_list = generate_list_data::<i32>();
+        let original_data = original_list.into_data();
+        let slice_data = original_data.slice(75, 7);
+        let (new_offsets, original_start, length) =
+            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
+        assert_eq!(
+            vec![0, 3, 6, 9, 12, 15, 18, 21],
+            new_offsets.typed_data::<i32>()
+        );
+        assert_eq!(225, original_start);
+        assert_eq!(21, length);
+    }
+
+    #[test]
+    fn reencode_offsets_when_first_offset_is_zero() {
+        let mut ls = GenericListBuilder::<i32, _>::new(UInt32Builder::new());
+        // ls = [[], [35, 42]
+        ls.append(true);
+        ls.values().append_value(35);
+        ls.values().append_value(42);
+        ls.append(true);
+        let original_list = ls.finish();
+        let original_data = original_list.into_data();
+
+        let slice_data = original_data.slice(1, 1);
+        let (new_offsets, original_start, length) =
+            reencode_offsets::<i32>(&slice_data.buffers()[0], &slice_data);
+        assert_eq!(vec![0, 2], new_offsets.typed_data::<i32>());
+        assert_eq!(0, original_start);
+        assert_eq!(2, length);
+    }
+
     /// Ensure when serde full & sliced versions they are equal to original 
input.
     /// Also ensure serialized sliced version is significantly smaller than 
serialized full.
     fn roundtrip_ensure_sliced_smaller(in_batch: RecordBatch, 
expected_size_factor: usize) {

Reply via email to