This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new 92cfd99e9a Refactor arrow-ipc: Rename `ArrayReader` to 
`RecordBatchDecoder` (#7028)
92cfd99e9a is described below

commit 92cfd99e9ab4a6c54500ec65252027b9edf1ee55
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Feb 6 14:43:59 2025 -0500

    Refactor arrow-ipc: Rename `ArrayReader` to `RecordBatchDecoder` (#7028)
    
    * Rename `ArrayReader` to `RecordBatchDecoder`
    
    * Remove alias for `self`
---
 arrow-ipc/src/reader.rs        | 110 +++++++++++++++++++++--------------------
 arrow-ipc/src/reader/stream.rs |   4 +-
 2 files changed, 58 insertions(+), 56 deletions(-)

diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index ff509c4f57..e79ab23211 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -65,7 +65,7 @@ fn read_buffer(
         (false, Some(decompressor)) => 
decompressor.decompress_to_buffer(&buf_data),
     }
 }
-impl ArrayReader<'_> {
+impl RecordBatchDecoder<'_> {
     /// Coordinates reading arrays based on data types.
     ///
     /// `variadic_counts` encodes the number of buffers to read for variadic 
types (e.g., Utf8View, BinaryView)
@@ -83,18 +83,17 @@ impl ArrayReader<'_> {
         field: &Field,
         variadic_counts: &mut VecDeque<i64>,
     ) -> Result<ArrayRef, ArrowError> {
-        let reader = self;
         let data_type = field.data_type();
         match data_type {
             Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
-                reader.next_node(field)?,
+                self.next_node(field)?,
                 data_type,
                 &[
-                    reader.next_buffer()?,
-                    reader.next_buffer()?,
-                    reader.next_buffer()?,
+                    self.next_buffer()?,
+                    self.next_buffer()?,
+                    self.next_buffer()?,
                 ],
-                reader.require_alignment,
+                self.require_alignment,
             ),
             BinaryView | Utf8View => {
                 let count = variadic_counts
@@ -104,55 +103,55 @@ impl ArrayReader<'_> {
                     )))?;
                 let count = count + 2; // view and null buffer.
                 let buffers = (0..count)
-                    .map(|_| reader.next_buffer())
+                    .map(|_| self.next_buffer())
                     .collect::<Result<Vec<_>, _>>()?;
                 create_primitive_array(
-                    reader.next_node(field)?,
+                    self.next_node(field)?,
                     data_type,
                     &buffers,
-                    reader.require_alignment,
+                    self.require_alignment,
                 )
             }
             FixedSizeBinary(_) => create_primitive_array(
-                reader.next_node(field)?,
+                self.next_node(field)?,
                 data_type,
-                &[reader.next_buffer()?, reader.next_buffer()?],
-                reader.require_alignment,
+                &[self.next_buffer()?, self.next_buffer()?],
+                self.require_alignment,
             ),
             List(ref list_field) | LargeList(ref list_field) | Map(ref 
list_field, _) => {
-                let list_node = reader.next_node(field)?;
-                let list_buffers = [reader.next_buffer()?, 
reader.next_buffer()?];
-                let values = reader.create_array(list_field, variadic_counts)?;
+                let list_node = self.next_node(field)?;
+                let list_buffers = [self.next_buffer()?, self.next_buffer()?];
+                let values = self.create_array(list_field, variadic_counts)?;
                 create_list_array(
                     list_node,
                     data_type,
                     &list_buffers,
                     values,
-                    reader.require_alignment,
+                    self.require_alignment,
                 )
             }
             FixedSizeList(ref list_field, _) => {
-                let list_node = reader.next_node(field)?;
-                let list_buffers = [reader.next_buffer()?];
-                let values = reader.create_array(list_field, variadic_counts)?;
+                let list_node = self.next_node(field)?;
+                let list_buffers = [self.next_buffer()?];
+                let values = self.create_array(list_field, variadic_counts)?;
                 create_list_array(
                     list_node,
                     data_type,
                     &list_buffers,
                     values,
-                    reader.require_alignment,
+                    self.require_alignment,
                 )
             }
             Struct(struct_fields) => {
-                let struct_node = reader.next_node(field)?;
-                let null_buffer = reader.next_buffer()?;
+                let struct_node = self.next_node(field)?;
+                let null_buffer = self.next_buffer()?;
 
                 // read the arrays for each field
                 let mut struct_arrays = vec![];
                 // TODO investigate whether just knowing the number of buffers 
could
                 // still work
                 for struct_field in struct_fields {
-                    let child = reader.create_array(struct_field, 
variadic_counts)?;
+                    let child = self.create_array(struct_field, 
variadic_counts)?;
                     struct_arrays.push(child);
                 }
                 let null_count = struct_node.null_count() as usize;
@@ -175,9 +174,9 @@ impl ArrayReader<'_> {
                 Ok(Arc::new(struct_array))
             }
             RunEndEncoded(run_ends_field, values_field) => {
-                let run_node = reader.next_node(field)?;
-                let run_ends = reader.create_array(run_ends_field, 
variadic_counts)?;
-                let values = reader.create_array(values_field, 
variadic_counts)?;
+                let run_node = self.next_node(field)?;
+                let run_ends = self.create_array(run_ends_field, 
variadic_counts)?;
+                let values = self.create_array(values_field, variadic_counts)?;
 
                 let run_array_length = run_node.length() as usize;
                 let array_data = ArrayData::builder(data_type.clone())
@@ -185,22 +184,22 @@ impl ArrayReader<'_> {
                     .offset(0)
                     .add_child_data(run_ends.into_data())
                     .add_child_data(values.into_data())
-                    .align_buffers(!reader.require_alignment)
+                    .align_buffers(!self.require_alignment)
                     .build()?;
 
                 Ok(make_array(array_data))
             }
             // Create dictionary array from RecordBatch
             Dictionary(_, _) => {
-                let index_node = reader.next_node(field)?;
-                let index_buffers = [reader.next_buffer()?, 
reader.next_buffer()?];
+                let index_node = self.next_node(field)?;
+                let index_buffers = [self.next_buffer()?, self.next_buffer()?];
 
                 #[allow(deprecated)]
                 let dict_id = field.dict_id().ok_or_else(|| {
                     ArrowError::ParseError(format!("Field {field} does not 
have dict id"))
                 })?;
 
-                let value_array = 
reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
+                let value_array = 
self.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
                     ArrowError::ParseError(format!(
                         "Cannot find a dictionary batch with dict id: 
{dict_id}"
                     ))
@@ -211,26 +210,26 @@ impl ArrayReader<'_> {
                     data_type,
                     &index_buffers,
                     value_array.clone(),
-                    reader.require_alignment,
+                    self.require_alignment,
                 )
             }
             Union(fields, mode) => {
-                let union_node = reader.next_node(field)?;
+                let union_node = self.next_node(field)?;
                 let len = union_node.length() as usize;
 
                 // In V4, union types has validity bitmap
                 // In V5 and later, union types have no validity bitmap
-                if reader.version < MetadataVersion::V5 {
-                    reader.next_buffer()?;
+                if self.version < MetadataVersion::V5 {
+                    self.next_buffer()?;
                 }
 
                 let type_ids: ScalarBuffer<i8> =
-                    reader.next_buffer()?.slice_with_length(0, len).into();
+                    self.next_buffer()?.slice_with_length(0, len).into();
 
                 let value_offsets = match mode {
                     UnionMode::Dense => {
                         let offsets: ScalarBuffer<i32> =
-                            reader.next_buffer()?.slice_with_length(0, len * 
4).into();
+                            self.next_buffer()?.slice_with_length(0, len * 
4).into();
                         Some(offsets)
                     }
                     UnionMode::Sparse => None,
@@ -239,7 +238,7 @@ impl ArrayReader<'_> {
                 let mut children = Vec::with_capacity(fields.len());
 
                 for (_id, field) in fields.iter() {
-                    let child = reader.create_array(field, variadic_counts)?;
+                    let child = self.create_array(field, variadic_counts)?;
                     children.push(child);
                 }
 
@@ -247,7 +246,7 @@ impl ArrayReader<'_> {
                 Ok(Arc::new(array))
             }
             Null => {
-                let node = reader.next_node(field)?;
+                let node = self.next_node(field)?;
                 let length = node.length();
                 let null_count = node.null_count();
 
@@ -260,17 +259,17 @@ impl ArrayReader<'_> {
                 let array_data = ArrayData::builder(data_type.clone())
                     .len(length as usize)
                     .offset(0)
-                    .align_buffers(!reader.require_alignment)
+                    .align_buffers(!self.require_alignment)
                     .build()?;
 
                 // no buffer increases
                 Ok(Arc::new(NullArray::from(array_data)))
             }
             _ => create_primitive_array(
-                reader.next_node(field)?,
+                self.next_node(field)?,
                 data_type,
-                &[reader.next_buffer()?, reader.next_buffer()?],
-                reader.require_alignment,
+                &[self.next_buffer()?, self.next_buffer()?],
+                self.require_alignment,
             ),
         }
     }
@@ -370,8 +369,11 @@ fn create_dictionary_array(
     }
 }
 
-/// State for decoding arrays from an encoded [`RecordBatch`]
-struct ArrayReader<'a> {
+/// State for decoding Arrow arrays from an [IPC RecordBatch] structure to
+/// [`RecordBatch`]
+///
+/// [IPC RecordBatch]: crate::RecordBatch
+struct RecordBatchDecoder<'a> {
     /// The flatbuffers encoded record batch
     batch: crate::RecordBatch<'a>,
     /// The output schema
@@ -389,14 +391,14 @@ struct ArrayReader<'a> {
     /// The buffers comprising this array
     buffers: VectorIter<'a, crate::Buffer>,
     /// Projection (subset of columns) to read, if any
-    /// See [`ArrayReader::with_projection`] for details
+    /// See [`RecordBatchDecoder::with_projection`] for details
     projection: Option<&'a [usize]>,
     /// Are buffers required to already be aligned? See
-    /// [`ArrayReader::with_require_alignment`] for details
+    /// [`RecordBatchDecoder::with_require_alignment`] for details
     require_alignment: bool,
 }
 
-impl<'a> ArrayReader<'a> {
+impl<'a> RecordBatchDecoder<'a> {
     /// Create a reader for decoding arrays from an encoded [`RecordBatch`]
     fn try_new(
         buf: &'a Buffer,
@@ -604,7 +606,7 @@ pub fn read_record_batch(
     projection: Option<&[usize]>,
     metadata: &MetadataVersion,
 ) -> Result<RecordBatch, ArrowError> {
-    ArrayReader::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
+    RecordBatchDecoder::try_new(buf, batch, schema, dictionaries_by_id, 
metadata)?
         .with_projection(projection)
         .with_require_alignment(false)
         .read_record_batch()
@@ -652,7 +654,7 @@ fn read_dictionary_impl(
             let value = value_type.as_ref().clone();
             let schema = Schema::new(vec![Field::new("", value, true)]);
             // Read a single column
-            let record_batch = ArrayReader::try_new(
+            let record_batch = RecordBatchDecoder::try_new(
                 buf,
                 batch.data().unwrap(),
                 Arc::new(schema),
@@ -876,7 +878,7 @@ impl FileDecoder {
                     ArrowError::IpcError("Unable to read IPC message as record 
batch".to_string())
                 })?;
                 // read the block that makes up the record batch into a buffer
-                ArrayReader::try_new(
+                RecordBatchDecoder::try_new(
                     &buf.slice(block.metaDataLength() as _),
                     batch,
                     self.schema.clone(),
@@ -1426,7 +1428,7 @@ impl<R: Read> StreamReader<R> {
                 let mut buf = 
MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
                 self.reader.read_exact(&mut buf)?;
 
-                ArrayReader::try_new(
+                RecordBatchDecoder::try_new(
                     &buf.into(),
                     batch,
                     self.schema(),
@@ -2277,7 +2279,7 @@ mod tests {
         assert_ne!(b.as_ptr().align_offset(8), 0);
 
         let ipc_batch = message.header_as_record_batch().unwrap();
-        let roundtrip = ArrayReader::try_new(
+        let roundtrip = RecordBatchDecoder::try_new(
             &b,
             ipc_batch,
             batch.schema(),
@@ -2316,7 +2318,7 @@ mod tests {
         assert_ne!(b.as_ptr().align_offset(8), 0);
 
         let ipc_batch = message.header_as_record_batch().unwrap();
-        let result = ArrayReader::try_new(
+        let result = RecordBatchDecoder::try_new(
             &b,
             ipc_batch,
             batch.schema(),
diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs
index e66896f151..174e69c1f6 100644
--- a/arrow-ipc/src/reader/stream.rs
+++ b/arrow-ipc/src/reader/stream.rs
@@ -24,7 +24,7 @@ use arrow_buffer::{Buffer, MutableBuffer};
 use arrow_schema::{ArrowError, SchemaRef};
 
 use crate::convert::MessageBuffer;
-use crate::reader::{read_dictionary_impl, ArrayReader};
+use crate::reader::{read_dictionary_impl, RecordBatchDecoder};
 use crate::{MessageHeader, CONTINUATION_MARKER};
 
 /// A low-level interface for reading [`RecordBatch`] data from a stream of 
bytes
@@ -211,7 +211,7 @@ impl StreamDecoder {
                             let schema = self.schema.clone().ok_or_else(|| {
                                 ArrowError::IpcError("Missing 
schema".to_string())
                             })?;
-                            let batch = ArrayReader::try_new(
+                            let batch = RecordBatchDecoder::try_new(
                                 &body,
                                 batch,
                                 schema,

Reply via email to