This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 15249bfb19 Refactor ipc reading code into methods on `ArrayReader` 
(#7006)
15249bfb19 is described below

commit 15249bfb196b2cb66aad64822697313d59cf7f69
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Jan 27 05:50:08 2025 -0500

    Refactor ipc reading code into methods on `ArrayReader` (#7006)
---
 arrow-ipc/src/reader.rs        | 610 ++++++++++++++++++++++-------------------
 arrow-ipc/src/reader/stream.rs |  10 +-
 2 files changed, 330 insertions(+), 290 deletions(-)

diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index b72785651b..ff509c4f57 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -65,213 +65,214 @@ fn read_buffer(
         (false, Some(decompressor)) => 
decompressor.decompress_to_buffer(&buf_data),
     }
 }
-
-/// Coordinates reading arrays based on data types.
-///
-/// `variadic_counts` encodes the number of buffers to read for variadic types 
(e.g., Utf8View, BinaryView)
-/// When encounter such types, we pop from the front of the queue to get the 
number of buffers to read.
-///
-/// Notes:
-/// * In the IPC format, null buffers are always set, but may be empty. We 
discard them if an array has 0 nulls
-/// * Numeric values inside list arrays are often stored as 64-bit values 
regardless of their data type size.
-///   We thus:
-///     - check if the bit width of non-64-bit numbers is 64, and
-///     - read the buffer as 64-bit (signed integer or float), and
-///     - cast the 64-bit array to the appropriate data type
-fn create_array(
-    reader: &mut ArrayReader,
-    field: &Field,
-    variadic_counts: &mut VecDeque<i64>,
-    require_alignment: bool,
-) -> Result<ArrayRef, ArrowError> {
-    let data_type = field.data_type();
-    match data_type {
-        Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
-            reader.next_node(field)?,
-            data_type,
-            &[
-                reader.next_buffer()?,
-                reader.next_buffer()?,
-                reader.next_buffer()?,
-            ],
-            require_alignment,
-        ),
-        BinaryView | Utf8View => {
-            let count = variadic_counts
-                .pop_front()
-                .ok_or(ArrowError::IpcError(format!(
-                    "Missing variadic count for {data_type} column"
-                )))?;
-            let count = count + 2; // view and null buffer.
-            let buffers = (0..count)
-                .map(|_| reader.next_buffer())
-                .collect::<Result<Vec<_>, _>>()?;
-            create_primitive_array(
+impl ArrayReader<'_> {
+    /// Coordinates reading arrays based on data types.
+    ///
+    /// `variadic_counts` encodes the number of buffers to read for variadic 
types (e.g., Utf8View, BinaryView)
+    /// When encounter such types, we pop from the front of the queue to get 
the number of buffers to read.
+    ///
+    /// Notes:
+    /// * In the IPC format, null buffers are always set, but may be empty. We 
discard them if an array has 0 nulls
+    /// * Numeric values inside list arrays are often stored as 64-bit values 
regardless of their data type size.
+    ///   We thus:
+    ///     - check if the bit width of non-64-bit numbers is 64, and
+    ///     - read the buffer as 64-bit (signed integer or float), and
+    ///     - cast the 64-bit array to the appropriate data type
+    fn create_array(
+        &mut self,
+        field: &Field,
+        variadic_counts: &mut VecDeque<i64>,
+    ) -> Result<ArrayRef, ArrowError> {
+        let reader = self;
+        let data_type = field.data_type();
+        match data_type {
+            Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
                 reader.next_node(field)?,
                 data_type,
-                &buffers,
-                require_alignment,
-            )
-        }
-        FixedSizeBinary(_) => create_primitive_array(
-            reader.next_node(field)?,
-            data_type,
-            &[reader.next_buffer()?, reader.next_buffer()?],
-            require_alignment,
-        ),
-        List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, 
_) => {
-            let list_node = reader.next_node(field)?;
-            let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
-            let values = create_array(reader, list_field, variadic_counts, 
require_alignment)?;
-            create_list_array(
-                list_node,
-                data_type,
-                &list_buffers,
-                values,
-                require_alignment,
-            )
-        }
-        FixedSizeList(ref list_field, _) => {
-            let list_node = reader.next_node(field)?;
-            let list_buffers = [reader.next_buffer()?];
-            let values = create_array(reader, list_field, variadic_counts, 
require_alignment)?;
-            create_list_array(
-                list_node,
+                &[
+                    reader.next_buffer()?,
+                    reader.next_buffer()?,
+                    reader.next_buffer()?,
+                ],
+                reader.require_alignment,
+            ),
+            BinaryView | Utf8View => {
+                let count = variadic_counts
+                    .pop_front()
+                    .ok_or(ArrowError::IpcError(format!(
+                        "Missing variadic count for {data_type} column"
+                    )))?;
+                let count = count + 2; // view and null buffer.
+                let buffers = (0..count)
+                    .map(|_| reader.next_buffer())
+                    .collect::<Result<Vec<_>, _>>()?;
+                create_primitive_array(
+                    reader.next_node(field)?,
+                    data_type,
+                    &buffers,
+                    reader.require_alignment,
+                )
+            }
+            FixedSizeBinary(_) => create_primitive_array(
+                reader.next_node(field)?,
                 data_type,
-                &list_buffers,
-                values,
-                require_alignment,
-            )
-        }
-        Struct(struct_fields) => {
-            let struct_node = reader.next_node(field)?;
-            let null_buffer = reader.next_buffer()?;
-
-            // read the arrays for each field
-            let mut struct_arrays = vec![];
-            // TODO investigate whether just knowing the number of buffers 
could
-            // still work
-            for struct_field in struct_fields {
-                let child = create_array(reader, struct_field, 
variadic_counts, require_alignment)?;
-                struct_arrays.push(child);
+                &[reader.next_buffer()?, reader.next_buffer()?],
+                reader.require_alignment,
+            ),
+            List(ref list_field) | LargeList(ref list_field) | Map(ref 
list_field, _) => {
+                let list_node = reader.next_node(field)?;
+                let list_buffers = [reader.next_buffer()?, 
reader.next_buffer()?];
+                let values = reader.create_array(list_field, variadic_counts)?;
+                create_list_array(
+                    list_node,
+                    data_type,
+                    &list_buffers,
+                    values,
+                    reader.require_alignment,
+                )
             }
-            let null_count = struct_node.null_count() as usize;
-            let struct_array = if struct_arrays.is_empty() {
-                // `StructArray::from` can't infer the correct row count
-                // if we have zero fields
-                let len = struct_node.length() as usize;
-                StructArray::new_empty_fields(
-                    len,
-                    (null_count > 0).then(|| BooleanBuffer::new(null_buffer, 
0, len).into()),
+            FixedSizeList(ref list_field, _) => {
+                let list_node = reader.next_node(field)?;
+                let list_buffers = [reader.next_buffer()?];
+                let values = reader.create_array(list_field, variadic_counts)?;
+                create_list_array(
+                    list_node,
+                    data_type,
+                    &list_buffers,
+                    values,
+                    reader.require_alignment,
                 )
-            } else if null_count > 0 {
-                // create struct array from fields, arrays and null data
-                let len = struct_node.length() as usize;
-                let nulls = BooleanBuffer::new(null_buffer, 0, len).into();
-                StructArray::try_new(struct_fields.clone(), struct_arrays, 
Some(nulls))?
-            } else {
-                StructArray::try_new(struct_fields.clone(), struct_arrays, 
None)?
-            };
-            Ok(Arc::new(struct_array))
-        }
-        RunEndEncoded(run_ends_field, values_field) => {
-            let run_node = reader.next_node(field)?;
-            let run_ends =
-                create_array(reader, run_ends_field, variadic_counts, 
require_alignment)?;
-            let values = create_array(reader, values_field, variadic_counts, 
require_alignment)?;
-
-            let run_array_length = run_node.length() as usize;
-            let array_data = ArrayData::builder(data_type.clone())
-                .len(run_array_length)
-                .offset(0)
-                .add_child_data(run_ends.into_data())
-                .add_child_data(values.into_data())
-                .align_buffers(!require_alignment)
-                .build()?;
-
-            Ok(make_array(array_data))
-        }
-        // Create dictionary array from RecordBatch
-        Dictionary(_, _) => {
-            let index_node = reader.next_node(field)?;
-            let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];
+            }
+            Struct(struct_fields) => {
+                let struct_node = reader.next_node(field)?;
+                let null_buffer = reader.next_buffer()?;
 
-            #[allow(deprecated)]
-            let dict_id = field.dict_id().ok_or_else(|| {
-                ArrowError::ParseError(format!("Field {field} does not have 
dict id"))
-            })?;
-
-            let value_array = 
reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
-                ArrowError::ParseError(format!(
-                    "Cannot find a dictionary batch with dict id: {dict_id}"
-                ))
-            })?;
-
-            create_dictionary_array(
-                index_node,
-                data_type,
-                &index_buffers,
-                value_array.clone(),
-                require_alignment,
-            )
-        }
-        Union(fields, mode) => {
-            let union_node = reader.next_node(field)?;
-            let len = union_node.length() as usize;
-
-            // In V4, union types has validity bitmap
-            // In V5 and later, union types have no validity bitmap
-            if reader.version < MetadataVersion::V5 {
-                reader.next_buffer()?;
+                // read the arrays for each field
+                let mut struct_arrays = vec![];
+                // TODO investigate whether just knowing the number of buffers 
could
+                // still work
+                for struct_field in struct_fields {
+                    let child = reader.create_array(struct_field, 
variadic_counts)?;
+                    struct_arrays.push(child);
+                }
+                let null_count = struct_node.null_count() as usize;
+                let struct_array = if struct_arrays.is_empty() {
+                    // `StructArray::from` can't infer the correct row count
+                    // if we have zero fields
+                    let len = struct_node.length() as usize;
+                    StructArray::new_empty_fields(
+                        len,
+                        (null_count > 0).then(|| 
BooleanBuffer::new(null_buffer, 0, len).into()),
+                    )
+                } else if null_count > 0 {
+                    // create struct array from fields, arrays and null data
+                    let len = struct_node.length() as usize;
+                    let nulls = BooleanBuffer::new(null_buffer, 0, len).into();
+                    StructArray::try_new(struct_fields.clone(), struct_arrays, 
Some(nulls))?
+                } else {
+                    StructArray::try_new(struct_fields.clone(), struct_arrays, 
None)?
+                };
+                Ok(Arc::new(struct_array))
             }
+            RunEndEncoded(run_ends_field, values_field) => {
+                let run_node = reader.next_node(field)?;
+                let run_ends = reader.create_array(run_ends_field, 
variadic_counts)?;
+                let values = reader.create_array(values_field, 
variadic_counts)?;
+
+                let run_array_length = run_node.length() as usize;
+                let array_data = ArrayData::builder(data_type.clone())
+                    .len(run_array_length)
+                    .offset(0)
+                    .add_child_data(run_ends.into_data())
+                    .add_child_data(values.into_data())
+                    .align_buffers(!reader.require_alignment)
+                    .build()?;
+
+                Ok(make_array(array_data))
+            }
+            // Create dictionary array from RecordBatch
+            Dictionary(_, _) => {
+                let index_node = reader.next_node(field)?;
+                let index_buffers = [reader.next_buffer()?, 
reader.next_buffer()?];
 
-            let type_ids: ScalarBuffer<i8> = 
reader.next_buffer()?.slice_with_length(0, len).into();
+                #[allow(deprecated)]
+                let dict_id = field.dict_id().ok_or_else(|| {
+                    ArrowError::ParseError(format!("Field {field} does not 
have dict id"))
+                })?;
+
+                let value_array = 
reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
+                    ArrowError::ParseError(format!(
+                        "Cannot find a dictionary batch with dict id: 
{dict_id}"
+                    ))
+                })?;
+
+                create_dictionary_array(
+                    index_node,
+                    data_type,
+                    &index_buffers,
+                    value_array.clone(),
+                    reader.require_alignment,
+                )
+            }
+            Union(fields, mode) => {
+                let union_node = reader.next_node(field)?;
+                let len = union_node.length() as usize;
 
-            let value_offsets = match mode {
-                UnionMode::Dense => {
-                    let offsets: ScalarBuffer<i32> =
-                        reader.next_buffer()?.slice_with_length(0, len * 
4).into();
-                    Some(offsets)
+                // In V4, union types has validity bitmap
+                // In V5 and later, union types have no validity bitmap
+                if reader.version < MetadataVersion::V5 {
+                    reader.next_buffer()?;
                 }
-                UnionMode::Sparse => None,
-            };
 
-            let mut children = Vec::with_capacity(fields.len());
+                let type_ids: ScalarBuffer<i8> =
+                    reader.next_buffer()?.slice_with_length(0, len).into();
 
-            for (_id, field) in fields.iter() {
-                let child = create_array(reader, field, variadic_counts, 
require_alignment)?;
-                children.push(child);
-            }
+                let value_offsets = match mode {
+                    UnionMode::Dense => {
+                        let offsets: ScalarBuffer<i32> =
+                            reader.next_buffer()?.slice_with_length(0, len * 
4).into();
+                        Some(offsets)
+                    }
+                    UnionMode::Sparse => None,
+                };
 
-            let array = UnionArray::try_new(fields.clone(), type_ids, 
value_offsets, children)?;
-            Ok(Arc::new(array))
-        }
-        Null => {
-            let node = reader.next_node(field)?;
-            let length = node.length();
-            let null_count = node.null_count();
-
-            if length != null_count {
-                return Err(ArrowError::SchemaError(format!(
-                    "Field {field} of NullArray has unequal null_count 
{null_count} and len {length}"
-                )));
+                let mut children = Vec::with_capacity(fields.len());
+
+                for (_id, field) in fields.iter() {
+                    let child = reader.create_array(field, variadic_counts)?;
+                    children.push(child);
+                }
+
+                let array = UnionArray::try_new(fields.clone(), type_ids, 
value_offsets, children)?;
+                Ok(Arc::new(array))
             }
+            Null => {
+                let node = reader.next_node(field)?;
+                let length = node.length();
+                let null_count = node.null_count();
+
+                if length != null_count {
+                    return Err(ArrowError::SchemaError(format!(
+                        "Field {field} of NullArray has unequal null_count 
{null_count} and len {length}"
+                    )));
+                }
 
-            let array_data = ArrayData::builder(data_type.clone())
-                .len(length as usize)
-                .offset(0)
-                .align_buffers(!require_alignment)
-                .build()?;
+                let array_data = ArrayData::builder(data_type.clone())
+                    .len(length as usize)
+                    .offset(0)
+                    .align_buffers(!reader.require_alignment)
+                    .build()?;
 
-            // no buffer increases
-            Ok(Arc::new(NullArray::from(array_data)))
+                // no buffer increases
+                Ok(Arc::new(NullArray::from(array_data)))
+            }
+            _ => create_primitive_array(
+                reader.next_node(field)?,
+                data_type,
+                &[reader.next_buffer()?, reader.next_buffer()?],
+                reader.require_alignment,
+            ),
         }
-        _ => create_primitive_array(
-            reader.next_node(field)?,
-            data_type,
-            &[reader.next_buffer()?, reader.next_buffer()?],
-            require_alignment,
-        ),
     }
 }
 
@@ -371,6 +372,10 @@ fn create_dictionary_array(
 
 /// State for decoding arrays from an encoded [`RecordBatch`]
 struct ArrayReader<'a> {
+    /// The flatbuffers encoded record batch
+    batch: crate::RecordBatch<'a>,
+    /// The output schema
+    schema: SchemaRef,
     /// Decoded dictionaries indexed by dictionary id
     dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
     /// Optional compression codec
@@ -383,9 +388,111 @@ struct ArrayReader<'a> {
     nodes: VectorIter<'a, FieldNode>,
     /// The buffers comprising this array
     buffers: VectorIter<'a, crate::Buffer>,
+    /// Projection (subset of columns) to read, if any
+    /// See [`ArrayReader::with_projection`] for details
+    projection: Option<&'a [usize]>,
+    /// Are buffers required to already be aligned? See
+    /// [`ArrayReader::with_require_alignment`] for details
+    require_alignment: bool,
 }
 
 impl<'a> ArrayReader<'a> {
+    /// Create a reader for decoding arrays from an encoded [`RecordBatch`]
+    fn try_new(
+        buf: &'a Buffer,
+        batch: crate::RecordBatch<'a>,
+        schema: SchemaRef,
+        dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
+        metadata: &'a MetadataVersion,
+    ) -> Result<Self, ArrowError> {
+        let buffers = batch.buffers().ok_or_else(|| {
+            ArrowError::IpcError("Unable to get buffers from IPC 
RecordBatch".to_string())
+        })?;
+        let field_nodes = batch.nodes().ok_or_else(|| {
+            ArrowError::IpcError("Unable to get field nodes from IPC 
RecordBatch".to_string())
+        })?;
+
+        let batch_compression = batch.compression();
+        let compression = batch_compression
+            .map(|batch_compression| batch_compression.codec().try_into())
+            .transpose()?;
+
+        Ok(Self {
+            batch,
+            schema,
+            dictionaries_by_id,
+            compression,
+            version: *metadata,
+            data: buf,
+            nodes: field_nodes.iter(),
+            buffers: buffers.iter(),
+            projection: None,
+            require_alignment: false,
+        })
+    }
+
+    /// Set the projection (default: None)
+    ///
+    /// If set, the projection is the list  of column indices
+    /// that will be read
+    pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
+        self.projection = projection;
+        self
+    }
+
+    /// Set require_alignment (default: false)
+    ///
+    /// If true, buffers must be aligned appropriately or error will
+    /// result. If false, buffers will be copied to aligned buffers
+    /// if necessary.
+    pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
+        self.require_alignment = require_alignment;
+        self
+    }
+
+    /// Read the record batch, consuming the reader
+    fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
+        let mut variadic_counts: VecDeque<i64> = self
+            .batch
+            .variadicBufferCounts()
+            .into_iter()
+            .flatten()
+            .collect();
+
+        let options = 
RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));
+
+        let schema = Arc::clone(&self.schema);
+        if let Some(projection) = self.projection {
+            let mut arrays = vec![];
+            // project fields
+            for (idx, field) in schema.fields().iter().enumerate() {
+                // Create array for projected field
+                if let Some(proj_idx) = projection.iter().position(|p| p == 
&idx) {
+                    let child = self.create_array(field, &mut 
variadic_counts)?;
+                    arrays.push((proj_idx, child));
+                } else {
+                    self.skip_field(field, &mut variadic_counts)?;
+                }
+            }
+            assert!(variadic_counts.is_empty());
+            arrays.sort_by_key(|t| t.0);
+            RecordBatch::try_new_with_options(
+                Arc::new(schema.project(projection)?),
+                arrays.into_iter().map(|t| t.1).collect(),
+                &options,
+            )
+        } else {
+            let mut children = vec![];
+            // keep track of index as lists require more than one node
+            for field in schema.fields() {
+                let child = self.create_array(field, &mut variadic_counts)?;
+                children.push(child);
+            }
+            assert!(variadic_counts.is_empty());
+            RecordBatch::try_new_with_options(schema, children, &options)
+        }
+    }
+
     fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
         read_buffer(self.buffers.next().unwrap(), self.data, self.compression)
     }
@@ -497,15 +604,10 @@ pub fn read_record_batch(
     projection: Option<&[usize]>,
     metadata: &MetadataVersion,
 ) -> Result<RecordBatch, ArrowError> {
-    read_record_batch_impl(
-        buf,
-        batch,
-        schema,
-        dictionaries_by_id,
-        projection,
-        metadata,
-        false,
-    )
+    ArrayReader::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
+        .with_projection(projection)
+        .with_require_alignment(false)
+        .read_record_batch()
 }
 
 /// Read the dictionary from the buffer and provided metadata,
@@ -520,73 +622,6 @@ pub fn read_dictionary(
     read_dictionary_impl(buf, batch, schema, dictionaries_by_id, metadata, 
false)
 }
 
-fn read_record_batch_impl(
-    buf: &Buffer,
-    batch: crate::RecordBatch,
-    schema: SchemaRef,
-    dictionaries_by_id: &HashMap<i64, ArrayRef>,
-    projection: Option<&[usize]>,
-    metadata: &MetadataVersion,
-    require_alignment: bool,
-) -> Result<RecordBatch, ArrowError> {
-    let buffers = batch.buffers().ok_or_else(|| {
-        ArrowError::IpcError("Unable to get buffers from IPC 
RecordBatch".to_string())
-    })?;
-    let field_nodes = batch.nodes().ok_or_else(|| {
-        ArrowError::IpcError("Unable to get field nodes from IPC 
RecordBatch".to_string())
-    })?;
-
-    let mut variadic_counts: VecDeque<i64> =
-        batch.variadicBufferCounts().into_iter().flatten().collect();
-
-    let batch_compression = batch.compression();
-    let compression = batch_compression
-        .map(|batch_compression| batch_compression.codec().try_into())
-        .transpose()?;
-
-    let mut reader = ArrayReader {
-        dictionaries_by_id,
-        compression,
-        version: *metadata,
-        data: buf,
-        nodes: field_nodes.iter(),
-        buffers: buffers.iter(),
-    };
-
-    let options = RecordBatchOptions::new().with_row_count(Some(batch.length() 
as usize));
-
-    if let Some(projection) = projection {
-        let mut arrays = vec![];
-        // project fields
-        for (idx, field) in schema.fields().iter().enumerate() {
-            // Create array for projected field
-            if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
-                let child =
-                    create_array(&mut reader, field, &mut variadic_counts, 
require_alignment)?;
-                arrays.push((proj_idx, child));
-            } else {
-                reader.skip_field(field, &mut variadic_counts)?;
-            }
-        }
-        assert!(variadic_counts.is_empty());
-        arrays.sort_by_key(|t| t.0);
-        RecordBatch::try_new_with_options(
-            Arc::new(schema.project(projection)?),
-            arrays.into_iter().map(|t| t.1).collect(),
-            &options,
-        )
-    } else {
-        let mut children = vec![];
-        // keep track of index as lists require more than one node
-        for field in schema.fields() {
-            let child = create_array(&mut reader, field, &mut variadic_counts, 
require_alignment)?;
-            children.push(child);
-        }
-        assert!(variadic_counts.is_empty());
-        RecordBatch::try_new_with_options(schema, children, &options)
-    }
-}
-
 fn read_dictionary_impl(
     buf: &Buffer,
     batch: crate::DictionaryBatch,
@@ -617,15 +652,16 @@ fn read_dictionary_impl(
             let value = value_type.as_ref().clone();
             let schema = Schema::new(vec![Field::new("", value, true)]);
             // Read a single column
-            let record_batch = read_record_batch_impl(
+            let record_batch = ArrayReader::try_new(
                 buf,
                 batch.data().unwrap(),
                 Arc::new(schema),
                 dictionaries_by_id,
-                None,
                 metadata,
-                require_alignment,
-            )?;
+            )?
+            .with_require_alignment(require_alignment)
+            .read_record_batch()?;
+
             Some(record_batch.column(0).clone())
         }
         _ => None,
@@ -840,15 +876,16 @@ impl FileDecoder {
                     ArrowError::IpcError("Unable to read IPC message as record 
batch".to_string())
                 })?;
                 // read the block that makes up the record batch into a buffer
-                read_record_batch_impl(
+                ArrayReader::try_new(
                     &buf.slice(block.metaDataLength() as _),
                     batch,
                     self.schema.clone(),
                     &self.dictionaries,
-                    self.projection.as_deref(),
                     &message.version(),
-                    self.require_alignment,
-                )
+                )?
+                .with_projection(self.projection.as_deref())
+                .with_require_alignment(self.require_alignment)
+                .read_record_batch()
                 .map(Some)
             }
             crate::MessageHeader::NONE => Ok(None),
@@ -1389,15 +1426,16 @@ impl<R: Read> StreamReader<R> {
                 let mut buf = 
MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                 self.reader.read_exact(&mut buf)?;
 
-                read_record_batch_impl(
+                ArrayReader::try_new(
                     &buf.into(),
                     batch,
                     self.schema(),
                     &self.dictionaries_by_id,
-                    self.projection.as_ref().map(|x| x.0.as_ref()),
                     &message.version(),
-                    false,
-                )
+                )?
+                .with_projection(self.projection.as_ref().map(|x| 
x.0.as_ref()))
+                .with_require_alignment(false)
+                .read_record_batch()
                 .map(Some)
             }
             crate::MessageHeader::DictionaryBatch => {
@@ -2239,15 +2277,16 @@ mod tests {
         assert_ne!(b.as_ptr().align_offset(8), 0);
 
         let ipc_batch = message.header_as_record_batch().unwrap();
-        let roundtrip = read_record_batch_impl(
+        let roundtrip = ArrayReader::try_new(
             &b,
             ipc_batch,
             batch.schema(),
             &Default::default(),
-            None,
             &message.version(),
-            false,
         )
+        .unwrap()
+        .with_require_alignment(false)
+        .read_record_batch()
         .unwrap();
         assert_eq!(batch, roundtrip);
     }
@@ -2277,15 +2316,16 @@ mod tests {
         assert_ne!(b.as_ptr().align_offset(8), 0);
 
         let ipc_batch = message.header_as_record_batch().unwrap();
-        let result = read_record_batch_impl(
+        let result = ArrayReader::try_new(
             &b,
             ipc_batch,
             batch.schema(),
             &Default::default(),
-            None,
             &message.version(),
-            true,
-        );
+        )
+        .unwrap()
+        .with_require_alignment(true)
+        .read_record_batch();
 
         let error = result.unwrap_err();
         assert_eq!(
diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs
index 9b0eea9b61..e66896f151 100644
--- a/arrow-ipc/src/reader/stream.rs
+++ b/arrow-ipc/src/reader/stream.rs
@@ -24,7 +24,7 @@ use arrow_buffer::{Buffer, MutableBuffer};
 use arrow_schema::{ArrowError, SchemaRef};
 
 use crate::convert::MessageBuffer;
-use crate::reader::{read_dictionary_impl, read_record_batch_impl};
+use crate::reader::{read_dictionary_impl, ArrayReader};
 use crate::{MessageHeader, CONTINUATION_MARKER};
 
 /// A low-level interface for reading [`RecordBatch`] data from a stream of 
bytes
@@ -211,15 +211,15 @@ impl StreamDecoder {
                             let schema = self.schema.clone().ok_or_else(|| {
                                 ArrowError::IpcError("Missing 
schema".to_string())
                             })?;
-                            let batch = read_record_batch_impl(
+                            let batch = ArrayReader::try_new(
                                 &body,
                                 batch,
                                 schema,
                                 &self.dictionaries,
-                                None,
                                 &version,
-                                self.require_alignment,
-                            )?;
+                            )?
+                            .with_require_alignment(self.require_alignment)
+                            .read_record_batch()?;
                             self.state = DecoderState::default();
                             return Ok(Some(batch));
                         }

Reply via email to