This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 17058c76be6 IPC format support for StringViewArray and BinaryViewArray (#5525)
17058c76be6 is described below

commit 17058c76be6206b5f87ffa3fb1aaa10c255c2e67
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Mon Apr 1 14:30:56 2024 -0400

    IPC format support for StringViewArray and BinaryViewArray (#5525)
    
    * check in ipc format for view types
    
    * update tests
    
    * fix variadic counting
    
    * fix linting, address comments
    
    * Apply suggestions from code review
    
    Co-authored-by: Benjamin Kietzman <[email protected]>
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
    
    * address some review comments
    
    * update comments
    
    * Add tests and fix bugs with dict types
    
    * make clippy happy
    
    * update test cases
    
    ---------
    
    Co-authored-by: Benjamin Kietzman <[email protected]>
    Co-authored-by: Raphael Taylor-Davies <[email protected]>
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-ipc/src/convert.rs |  15 +++-
 arrow-ipc/src/reader.rs  | 197 ++++++++++++++++++++++++++++++++++++++++++-----
 arrow-ipc/src/writer.rs  | 111 ++++++++++++++++++++++++++
 3 files changed, 304 insertions(+), 19 deletions(-)

diff --git a/arrow-ipc/src/convert.rs b/arrow-ipc/src/convert.rs
index 51e54215ea7..49da0efae3a 100644
--- a/arrow-ipc/src/convert.rs
+++ b/arrow-ipc/src/convert.rs
@@ -247,8 +247,10 @@ pub(crate) fn get_data_type(field: crate::Field, 
may_be_dictionary: bool) -> Dat
             }
         }
         crate::Type::Binary => DataType::Binary,
+        crate::Type::BinaryView => DataType::BinaryView,
         crate::Type::LargeBinary => DataType::LargeBinary,
         crate::Type::Utf8 => DataType::Utf8,
+        crate::Type::Utf8View => DataType::Utf8View,
         crate::Type::LargeUtf8 => DataType::LargeUtf8,
         crate::Type::FixedSizeBinary => {
             let fsb = field.type_as_fixed_size_binary().unwrap();
@@ -548,7 +550,16 @@ pub(crate) fn get_fb_field_type<'a>(
                 .as_union_value(),
             children: Some(fbb.create_vector(&empty_fields[..])),
         },
-        BinaryView | Utf8View => unimplemented!("unimplemented"),
+        BinaryView => FBFieldType {
+            type_type: crate::Type::BinaryView,
+            type_: 
crate::BinaryViewBuilder::new(fbb).finish().as_union_value(),
+            children: Some(fbb.create_vector(&empty_fields[..])),
+        },
+        Utf8View => FBFieldType {
+            type_type: crate::Type::Utf8View,
+            type_: crate::Utf8ViewBuilder::new(fbb).finish().as_union_value(),
+            children: Some(fbb.create_vector(&empty_fields[..])),
+        },
         Utf8 => FBFieldType {
             type_type: crate::Type::Utf8,
             type_: crate::Utf8Builder::new(fbb).finish().as_union_value(),
@@ -921,7 +932,9 @@ mod tests {
                     true,
                 ),
                 Field::new("utf8", DataType::Utf8, false),
+                Field::new("utf8_view", DataType::Utf8View, false),
                 Field::new("binary", DataType::Binary, false),
+                Field::new("binary_view", DataType::BinaryView, false),
                 Field::new_list("list[u8]", Field::new("item", 
DataType::UInt8, false), true),
                 Field::new_fixed_size_list(
                     "fixed_size_list[u8]",
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index dd0365da4bc..4591777c1e3 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -25,7 +25,7 @@ mod stream;
 pub use stream::*;
 
 use flatbuffers::{VectorIter, VerifierOptions};
-use std::collections::HashMap;
+use std::collections::{HashMap, VecDeque};
 use std::fmt;
 use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
@@ -64,6 +64,9 @@ fn read_buffer(
 
 /// Coordinates reading arrays based on data types.
 ///
+/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView).
+/// When we encounter such types, we pop from the front of the queue to get the number of buffers to read.
+///
 /// Notes:
 /// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
 /// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
@@ -71,7 +74,11 @@ fn read_buffer(
 ///     - check if the bit width of non-64-bit numbers is 64, and
 ///     - read the buffer as 64-bit (signed integer or float), and
 ///     - cast the 64-bit array to the appropriate data type
-fn create_array(reader: &mut ArrayReader, field: &Field) -> Result<ArrayRef, 
ArrowError> {
+fn create_array(
+    reader: &mut ArrayReader,
+    field: &Field,
+    variadic_counts: &mut VecDeque<i64>,
+) -> Result<ArrayRef, ArrowError> {
     let data_type = field.data_type();
     match data_type {
         Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
@@ -83,6 +90,18 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> 
Result<ArrayRef, Arr
                 reader.next_buffer()?,
             ],
         ),
+        BinaryView | Utf8View => {
+            let count = variadic_counts
+                .pop_front()
+                .ok_or(ArrowError::IpcError(format!(
+                    "Missing variadic count for {data_type} column"
+                )))?;
+            let count = count + 2; // view and null buffer.
+            let buffers = (0..count)
+                .map(|_| reader.next_buffer())
+                .collect::<Result<Vec<_>, _>>()?;
+            create_primitive_array(reader.next_node(field)?, data_type, 
&buffers)
+        }
         FixedSizeBinary(_) => create_primitive_array(
             reader.next_node(field)?,
             data_type,
@@ -91,13 +110,13 @@ fn create_array(reader: &mut ArrayReader, field: &Field) 
-> Result<ArrayRef, Arr
         List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, 
_) => {
             let list_node = reader.next_node(field)?;
             let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
-            let values = create_array(reader, list_field)?;
+            let values = create_array(reader, list_field, variadic_counts)?;
             create_list_array(list_node, data_type, &list_buffers, values)
         }
         FixedSizeList(ref list_field, _) => {
             let list_node = reader.next_node(field)?;
             let list_buffers = [reader.next_buffer()?];
-            let values = create_array(reader, list_field)?;
+            let values = create_array(reader, list_field, variadic_counts)?;
             create_list_array(list_node, data_type, &list_buffers, values)
         }
         Struct(struct_fields) => {
@@ -109,7 +128,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> 
Result<ArrayRef, Arr
             // TODO investigate whether just knowing the number of buffers could
             // still work
             for struct_field in struct_fields {
-                let child = create_array(reader, struct_field)?;
+                let child = create_array(reader, struct_field, 
variadic_counts)?;
                 struct_arrays.push((struct_field.clone(), child));
             }
             let null_count = struct_node.null_count() as usize;
@@ -123,8 +142,8 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> 
Result<ArrayRef, Arr
         }
         RunEndEncoded(run_ends_field, values_field) => {
             let run_node = reader.next_node(field)?;
-            let run_ends = create_array(reader, run_ends_field)?;
-            let values = create_array(reader, values_field)?;
+            let run_ends = create_array(reader, run_ends_field, 
variadic_counts)?;
+            let values = create_array(reader, values_field, variadic_counts)?;
 
             let run_array_length = run_node.length() as usize;
             let data = ArrayData::builder(data_type.clone())
@@ -177,7 +196,7 @@ fn create_array(reader: &mut ArrayReader, field: &Field) -> 
Result<ArrayRef, Arr
             let mut ids = Vec::with_capacity(fields.len());
 
             for (id, field) in fields.iter() {
-                let child = create_array(reader, field)?;
+                let child = create_array(reader, field, variadic_counts)?;
                 children.push((field.as_ref().clone(), child));
                 ids.push(id);
             }
@@ -230,6 +249,11 @@ fn create_primitive_array(
                 .null_bit_buffer(null_buffer)
                 .build_aligned()?
         }
+        BinaryView | Utf8View => ArrayData::builder(data_type.clone())
+            .len(length)
+            .buffers(buffers[1..].to_vec())
+            .null_bit_buffer(null_buffer)
+            .build_aligned()?,
         _ if data_type.is_primitive() || matches!(data_type, Boolean | 
FixedSizeBinary(_)) => {
             // read 2 buffers: null buffer (optional) and data buffer
             ArrayData::builder(data_type.clone())
@@ -328,7 +352,11 @@ impl<'a> ArrayReader<'a> {
         })
     }
 
-    fn skip_field(&mut self, field: &Field) -> Result<(), ArrowError> {
+    fn skip_field(
+        &mut self,
+        field: &Field,
+        variadic_count: &mut VecDeque<i64>,
+    ) -> Result<(), ArrowError> {
         self.next_node(field)?;
 
         match field.data_type() {
@@ -337,6 +365,18 @@ impl<'a> ArrayReader<'a> {
                     self.skip_buffer()
                 }
             }
+            Utf8View | BinaryView => {
+                let count = variadic_count
+                    .pop_front()
+                    .ok_or(ArrowError::IpcError(format!(
+                        "Missing variadic count for {} column",
+                        field.data_type()
+                    )))?;
+                let count = count + 2; // view and null buffer.
+                for _i in 0..count {
+                    self.skip_buffer()
+                }
+            }
             FixedSizeBinary(_) => {
                 self.skip_buffer();
                 self.skip_buffer();
@@ -344,23 +384,23 @@ impl<'a> ArrayReader<'a> {
             List(list_field) | LargeList(list_field) | Map(list_field, _) => {
                 self.skip_buffer();
                 self.skip_buffer();
-                self.skip_field(list_field)?;
+                self.skip_field(list_field, variadic_count)?;
             }
             FixedSizeList(list_field, _) => {
                 self.skip_buffer();
-                self.skip_field(list_field)?;
+                self.skip_field(list_field, variadic_count)?;
             }
             Struct(struct_fields) => {
                 self.skip_buffer();
 
                 // skip for each field
                 for struct_field in struct_fields {
-                    self.skip_field(struct_field)?
+                    self.skip_field(struct_field, variadic_count)?
                 }
             }
             RunEndEncoded(run_ends_field, values_field) => {
-                self.skip_field(run_ends_field)?;
-                self.skip_field(values_field)?;
+                self.skip_field(run_ends_field, variadic_count)?;
+                self.skip_field(values_field, variadic_count)?;
             }
             Dictionary(_, _) => {
                 self.skip_buffer(); // Nulls
@@ -375,7 +415,7 @@ impl<'a> ArrayReader<'a> {
                 };
 
                 for (_, field) in fields.iter() {
-                    self.skip_field(field)?
+                    self.skip_field(field, variadic_count)?
                 }
             }
             Null => {} // No buffer increases
@@ -403,6 +443,10 @@ pub fn read_record_batch(
     let field_nodes = batch.nodes().ok_or_else(|| {
         ArrowError::IpcError("Unable to get field nodes from IPC 
RecordBatch".to_string())
     })?;
+
+    let mut variadic_counts: VecDeque<i64> =
+        batch.variadicBufferCounts().into_iter().flatten().collect();
+
     let batch_compression = batch.compression();
     let compression = batch_compression
         .map(|batch_compression| batch_compression.codec().try_into())
@@ -425,12 +469,13 @@ pub fn read_record_batch(
         for (idx, field) in schema.fields().iter().enumerate() {
             // Create array for projected field
             if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
-                let child = create_array(&mut reader, field)?;
+                let child = create_array(&mut reader, field, &mut 
variadic_counts)?;
                 arrays.push((proj_idx, child));
             } else {
-                reader.skip_field(field)?;
+                reader.skip_field(field, &mut variadic_counts)?;
             }
         }
+        assert!(variadic_counts.is_empty());
         arrays.sort_by_key(|t| t.0);
         RecordBatch::try_new_with_options(
             Arc::new(schema.project(projection)?),
@@ -441,9 +486,10 @@ pub fn read_record_batch(
         let mut children = vec![];
         // keep track of index as lists require more than one node
         for field in schema.fields() {
-            let child = create_array(&mut reader, field)?;
+            let child = create_array(&mut reader, field, &mut 
variadic_counts)?;
             children.push(child);
         }
+        assert!(variadic_counts.is_empty());
         RecordBatch::try_new_with_options(schema, children, &options)
     }
 }
@@ -1759,6 +1805,121 @@ mod tests {
         assert_eq!(input_batch, output_batch);
     }
 
+    const LONG_TEST_STRING: &str =
+        "This is a long string to make sure binary view array handles it";
+
+    #[test]
+    fn test_roundtrip_view_types() {
+        let schema = Schema::new(vec![
+            Field::new("field_1", DataType::BinaryView, true),
+            Field::new("field_2", DataType::Utf8, true),
+            Field::new("field_3", DataType::Utf8View, true),
+        ]);
+        let bin_values: Vec<Option<&[u8]>> = vec![
+            Some(b"foo"),
+            None,
+            Some(b"bar"),
+            Some(LONG_TEST_STRING.as_bytes()),
+        ];
+        let utf8_values: Vec<Option<&str>> =
+            vec![Some("foo"), None, Some("bar"), Some(LONG_TEST_STRING)];
+        let bin_view_array = BinaryViewArray::from_iter(bin_values);
+        let utf8_array = StringArray::from_iter(utf8_values.iter());
+        let utf8_view_array = StringViewArray::from_iter(utf8_values);
+        let record_batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![
+                Arc::new(bin_view_array),
+                Arc::new(utf8_array),
+                Arc::new(utf8_view_array),
+            ],
+        )
+        .unwrap();
+
+        assert_eq!(record_batch, roundtrip_ipc(&record_batch));
+        assert_eq!(record_batch, roundtrip_ipc_stream(&record_batch));
+
+        let sliced_batch = record_batch.slice(1, 2);
+        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
+        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
+    }
+
+    #[test]
+    fn test_roundtrip_view_types_nested_dict() {
+        let bin_values: Vec<Option<&[u8]>> = vec![
+            Some(b"foo"),
+            None,
+            Some(b"bar"),
+            Some(LONG_TEST_STRING.as_bytes()),
+            Some(b"field"),
+        ];
+        let utf8_values: Vec<Option<&str>> = vec![
+            Some("foo"),
+            None,
+            Some("bar"),
+            Some(LONG_TEST_STRING),
+            Some("field"),
+        ];
+        let bin_view_array = Arc::new(BinaryViewArray::from_iter(bin_values));
+        let utf8_view_array = 
Arc::new(StringViewArray::from_iter(utf8_values));
+
+        let key_dict_keys = Int8Array::from_iter_values([0, 0, 1, 2, 0, 1, 3]);
+        let key_dict_array = DictionaryArray::new(key_dict_keys, 
utf8_view_array.clone());
+        let keys_field = Arc::new(Field::new_dict(
+            "keys",
+            DataType::Dictionary(Box::new(DataType::Int8), 
Box::new(DataType::Utf8View)),
+            true,
+            1,
+            false,
+        ));
+
+        let value_dict_keys = Int8Array::from_iter_values([0, 3, 0, 1, 2, 0, 
1]);
+        let value_dict_array = DictionaryArray::new(value_dict_keys, 
bin_view_array);
+        let values_field = Arc::new(Field::new_dict(
+            "values",
+            DataType::Dictionary(Box::new(DataType::Int8), 
Box::new(DataType::BinaryView)),
+            true,
+            2,
+            false,
+        ));
+        let entry_struct = StructArray::from(vec![
+            (keys_field, make_array(key_dict_array.into_data())),
+            (values_field, make_array(value_dict_array.into_data())),
+        ]);
+
+        let map_data_type = DataType::Map(
+            Arc::new(Field::new(
+                "entries",
+                entry_struct.data_type().clone(),
+                false,
+            )),
+            false,
+        );
+        let entry_offsets = Buffer::from_slice_ref([0, 2, 4, 7]);
+        let map_data = ArrayData::builder(map_data_type)
+            .len(3)
+            .add_buffer(entry_offsets)
+            .add_child_data(entry_struct.into_data())
+            .build()
+            .unwrap();
+        let map_array = MapArray::from(map_data);
+
+        let dict_keys = Int8Array::from_iter_values([0, 1, 0, 1, 1, 2, 0, 1, 
2]);
+        let dict_dict_array = DictionaryArray::new(dict_keys, 
Arc::new(map_array));
+        let schema = Arc::new(Schema::new(vec![Field::new(
+            "f1",
+            dict_dict_array.data_type().clone(),
+            false,
+        )]));
+        let batch = RecordBatch::try_new(schema, 
vec![Arc::new(dict_dict_array)]).unwrap();
+        assert_eq!(batch, roundtrip_ipc(&batch));
+        assert_eq!(batch, roundtrip_ipc_stream(&batch));
+
+        let sliced_batch = batch.slice(1, 2);
+        assert_eq!(sliced_batch, roundtrip_ipc(&sliced_batch));
+        assert_eq!(sliced_batch, roundtrip_ipc_stream(&sliced_batch));
+    }
+
     #[test]
     fn test_no_columns_batch() {
         let schema = Arc::new(Schema::empty());
diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index 22edfbc2454..2a3474fe0fc 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -412,6 +412,8 @@ impl IpcDataGenerator {
         let compression_codec: Option<CompressionCodec> =
             batch_compression_type.map(TryInto::try_into).transpose()?;
 
+        let mut variadic_buffer_counts = vec![];
+
         for array in batch.columns() {
             let array_data = array.to_data();
             offset = write_array_data(
@@ -425,6 +427,8 @@ impl IpcDataGenerator {
                 compression_codec,
                 write_options,
             )?;
+
+            append_variadic_buffer_counts(&mut variadic_buffer_counts, 
&array_data);
         }
         // pad the tail of body data
         let len = arrow_data.len();
@@ -434,6 +438,12 @@ impl IpcDataGenerator {
         // write data
         let buffers = fbb.create_vector(&buffers);
         let nodes = fbb.create_vector(&nodes);
+        let variadic_buffer = if variadic_buffer_counts.is_empty() {
+            None
+        } else {
+            Some(fbb.create_vector(&variadic_buffer_counts))
+        };
+
         let root = {
             let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
             batch_builder.add_length(batch.num_rows() as i64);
@@ -442,6 +452,10 @@ impl IpcDataGenerator {
             if let Some(c) = compression {
                 batch_builder.add_compression(c);
             }
+
+            if let Some(v) = variadic_buffer {
+                batch_builder.add_variadicBufferCounts(v);
+            }
             let b = batch_builder.finish();
             b.as_union_value()
         };
@@ -501,6 +515,9 @@ impl IpcDataGenerator {
             write_options,
         )?;
 
+        let mut variadic_buffer_counts = vec![];
+        append_variadic_buffer_counts(&mut variadic_buffer_counts, array_data);
+
         // pad the tail of body data
         let len = arrow_data.len();
         let pad_len = pad_to_8(len as u32);
@@ -509,6 +526,11 @@ impl IpcDataGenerator {
         // write data
         let buffers = fbb.create_vector(&buffers);
         let nodes = fbb.create_vector(&nodes);
+        let variadic_buffer = if variadic_buffer_counts.is_empty() {
+            None
+        } else {
+            Some(fbb.create_vector(&variadic_buffer_counts))
+        };
 
         let root = {
             let mut batch_builder = crate::RecordBatchBuilder::new(&mut fbb);
@@ -518,6 +540,9 @@ impl IpcDataGenerator {
             if let Some(c) = compression {
                 batch_builder.add_compression(c);
             }
+            if let Some(v) = variadic_buffer {
+                batch_builder.add_variadicBufferCounts(v);
+            }
             batch_builder.finish()
         };
 
@@ -547,6 +572,25 @@ impl IpcDataGenerator {
     }
 }
 
+fn append_variadic_buffer_counts(counts: &mut Vec<i64>, array: &ArrayData) {
+    match array.data_type() {
+        DataType::BinaryView | DataType::Utf8View => {
+            // Per the spec, the counts include only the variadic buffers, not the view/null buffers.
+            // https://arrow.apache.org/docs/format/Columnar.html#variadic-buffers
+            counts.push(array.buffers().len() as i64 - 1);
+        }
+        DataType::Dictionary(_, _) => {
+            // Do nothing
+            // Dictionary types are handled in `encode_dictionaries`.
+        }
+        _ => {
+            for child in array.child_data() {
+                append_variadic_buffer_counts(counts, child)
+            }
+        }
+    }
+}
+
 pub(crate) fn unslice_run_array(arr: ArrayData) -> Result<ArrayData, 
ArrowError> {
     match arr.data_type() {
         DataType::RunEndEncoded(k, _) => match k.data_type() {
@@ -1249,6 +1293,22 @@ fn write_array_data(
                 compression_codec,
             )?;
         }
+    } else if matches!(data_type, DataType::BinaryView | DataType::Utf8View) {
+        // Slicing the views buffer is safe and easy,
+        // but pruning unneeded data buffers is much more nuanced since it's complicated to prove that no views reference the pruned buffers.
+        //
+        // The current implementation just serializes the raw arrays as given and does not try to optimize anything.
+        // If users want to "compact" the arrays prior to sending them over IPC,
+        // they should consider the gc API suggested in #5513.
+        for buffer in array_data.buffers() {
+            offset = write_buffer(
+                buffer.as_slice(),
+                buffers,
+                arrow_data,
+                offset,
+                compression_codec,
+            )?;
+        }
     } else if matches!(data_type, DataType::LargeBinary | DataType::LargeUtf8) 
{
         let (offsets, values) = get_byte_array_buffers::<i64>(array_data);
         for buffer in [offsets, values] {
@@ -1804,6 +1864,57 @@ mod tests {
         write_union_file(IpcWriteOptions::try_new(8, false, 
MetadataVersion::V5).unwrap());
     }
 
+    #[test]
+    fn test_write_view_types() {
+        const LONG_TEST_STRING: &str =
+            "This is a long string to make sure binary view array handles it";
+        let schema = Schema::new(vec![
+            Field::new("field1", DataType::BinaryView, true),
+            Field::new("field2", DataType::Utf8View, true),
+        ]);
+        let values: Vec<Option<&[u8]>> = vec![
+            Some(b"foo"),
+            Some(b"bar"),
+            Some(LONG_TEST_STRING.as_bytes()),
+        ];
+        let binary_array = BinaryViewArray::from_iter(values);
+        let utf8_array =
+            StringViewArray::from_iter(vec![Some("foo"), Some("bar"), 
Some(LONG_TEST_STRING)]);
+        let record_batch = RecordBatch::try_new(
+            Arc::new(schema.clone()),
+            vec![Arc::new(binary_array), Arc::new(utf8_array)],
+        )
+        .unwrap();
+
+        let mut file = tempfile::tempfile().unwrap();
+        {
+            let mut writer = FileWriter::try_new(&mut file, &schema).unwrap();
+            writer.write(&record_batch).unwrap();
+            writer.finish().unwrap();
+        }
+        file.rewind().unwrap();
+        {
+            let mut reader = FileReader::try_new(&file, None).unwrap();
+            let read_batch = reader.next().unwrap().unwrap();
+            read_batch
+                .columns()
+                .iter()
+                .zip(record_batch.columns())
+                .for_each(|(a, b)| {
+                    assert_eq!(a, b);
+                });
+        }
+        file.rewind().unwrap();
+        {
+            let mut reader = FileReader::try_new(&file, 
Some(vec![0])).unwrap();
+            let read_batch = reader.next().unwrap().unwrap();
+            assert_eq!(read_batch.num_columns(), 1);
+            let read_array = read_batch.column(0);
+            let write_array = record_batch.column(0);
+            assert_eq!(read_array, write_array);
+        }
+    }
+
     #[test]
     fn truncate_ipc_record_batch() {
         fn create_batch(rows: usize) -> RecordBatch {

Reply via email to