This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new 15249bfb19 Refactor ipc reading code into methods on `ArrayReader` (#7006)
15249bfb19 is described below
commit 15249bfb196b2cb66aad64822697313d59cf7f69
Author: Andrew Lamb <[email protected]>
AuthorDate: Mon Jan 27 05:50:08 2025 -0500
Refactor ipc reading code into methods on `ArrayReader` (#7006)
---
arrow-ipc/src/reader.rs | 610 ++++++++++++++++++++++-------------------
arrow-ipc/src/reader/stream.rs | 10 +-
2 files changed, 330 insertions(+), 290 deletions(-)
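
For context before the diff: the commit replaces the free functions `create_array` and `read_record_batch_impl` with methods on the internal `ArrayReader` struct, configured builder-style via `with_projection` and `with_require_alignment`. Below is a minimal, self-contained sketch of that call pattern; the `Reader` type here is a hypothetical stand-in, not the real `ArrayReader`, whose `try_new` also takes the IPC buffer, the flatbuffers `RecordBatch`, the schema, the dictionary map, and the metadata version (see the diff below):

    // Hypothetical, simplified stand-in for the internal `ArrayReader`.
    #[derive(Debug)]
    struct Reader {
        projection: Option<Vec<usize>>,
        require_alignment: bool,
    }

    impl Reader {
        fn try_new() -> Result<Reader, String> {
            Ok(Reader { projection: None, require_alignment: false })
        }

        // Set the projection (default: None): column indices to read.
        fn with_projection(mut self, projection: Option<Vec<usize>>) -> Reader {
            self.projection = projection;
            self
        }

        // Set require_alignment (default: false): error on misaligned
        // buffers instead of copying them to aligned memory.
        fn with_require_alignment(mut self, require_alignment: bool) -> Reader {
            self.require_alignment = require_alignment;
            self
        }

        // Consume the reader and decode, as `read_record_batch` does below.
        fn read_record_batch(self) -> Result<String, String> {
            Ok(format!("decoded with {self:?}"))
        }
    }

    fn main() {
        // Mirrors the updated call sites in FileDecoder and StreamReader:
        let batch = Reader::try_new()
            .unwrap()
            .with_projection(Some(vec![0, 2]))
            .with_require_alignment(false)
            .read_record_batch()
            .unwrap();
        println!("{batch}");
    }

The builder style lets each call site state only the options it cares about, with the defaults matching the old `None`/`false` arguments.
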
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index b72785651b..ff509c4f57 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -65,213 +65,214 @@ fn read_buffer(
    (false, Some(decompressor)) => decompressor.decompress_to_buffer(&buf_data),
}
}
-
-/// Coordinates reading arrays based on data types.
-///
-/// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView)
-/// When encounter such types, we pop from the front of the queue to get the number of buffers to read.
-///
-/// Notes:
-/// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
-/// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
-/// We thus:
-/// - check if the bit width of non-64-bit numbers is 64, and
-/// - read the buffer as 64-bit (signed integer or float), and
-/// - cast the 64-bit array to the appropriate data type
-fn create_array(
- reader: &mut ArrayReader,
- field: &Field,
- variadic_counts: &mut VecDeque<i64>,
- require_alignment: bool,
-) -> Result<ArrayRef, ArrowError> {
- let data_type = field.data_type();
- match data_type {
- Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
- reader.next_node(field)?,
- data_type,
- &[
- reader.next_buffer()?,
- reader.next_buffer()?,
- reader.next_buffer()?,
- ],
- require_alignment,
- ),
- BinaryView | Utf8View => {
- let count = variadic_counts
- .pop_front()
- .ok_or(ArrowError::IpcError(format!(
- "Missing variadic count for {data_type} column"
- )))?;
- let count = count + 2; // view and null buffer.
- let buffers = (0..count)
- .map(|_| reader.next_buffer())
- .collect::<Result<Vec<_>, _>>()?;
- create_primitive_array(
+impl ArrayReader<'_> {
+ /// Coordinates reading arrays based on data types.
+ ///
+    /// `variadic_counts` encodes the number of buffers to read for variadic types (e.g., Utf8View, BinaryView)
+    /// When encounter such types, we pop from the front of the queue to get the number of buffers to read.
+ ///
+ /// Notes:
+    /// * In the IPC format, null buffers are always set, but may be empty. We discard them if an array has 0 nulls
+    /// * Numeric values inside list arrays are often stored as 64-bit values regardless of their data type size.
+ /// We thus:
+ /// - check if the bit width of non-64-bit numbers is 64, and
+ /// - read the buffer as 64-bit (signed integer or float), and
+ /// - cast the 64-bit array to the appropriate data type
+ fn create_array(
+ &mut self,
+ field: &Field,
+ variadic_counts: &mut VecDeque<i64>,
+ ) -> Result<ArrayRef, ArrowError> {
+ let reader = self;
+ let data_type = field.data_type();
+ match data_type {
+ Utf8 | Binary | LargeBinary | LargeUtf8 => create_primitive_array(
reader.next_node(field)?,
data_type,
- &buffers,
- require_alignment,
- )
- }
- FixedSizeBinary(_) => create_primitive_array(
- reader.next_node(field)?,
- data_type,
- &[reader.next_buffer()?, reader.next_buffer()?],
- require_alignment,
- ),
-        List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
- let list_node = reader.next_node(field)?;
- let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
-            let values = create_array(reader, list_field, variadic_counts, require_alignment)?;
- create_list_array(
- list_node,
- data_type,
- &list_buffers,
- values,
- require_alignment,
- )
- }
- FixedSizeList(ref list_field, _) => {
- let list_node = reader.next_node(field)?;
- let list_buffers = [reader.next_buffer()?];
-            let values = create_array(reader, list_field, variadic_counts, require_alignment)?;
- create_list_array(
- list_node,
+ &[
+ reader.next_buffer()?,
+ reader.next_buffer()?,
+ reader.next_buffer()?,
+ ],
+ reader.require_alignment,
+ ),
+ BinaryView | Utf8View => {
+ let count = variadic_counts
+ .pop_front()
+ .ok_or(ArrowError::IpcError(format!(
+ "Missing variadic count for {data_type} column"
+ )))?;
+ let count = count + 2; // view and null buffer.
+ let buffers = (0..count)
+ .map(|_| reader.next_buffer())
+ .collect::<Result<Vec<_>, _>>()?;
+ create_primitive_array(
+ reader.next_node(field)?,
+ data_type,
+ &buffers,
+ reader.require_alignment,
+ )
+ }
+ FixedSizeBinary(_) => create_primitive_array(
+ reader.next_node(field)?,
data_type,
- &list_buffers,
- values,
- require_alignment,
- )
- }
- Struct(struct_fields) => {
- let struct_node = reader.next_node(field)?;
- let null_buffer = reader.next_buffer()?;
-
- // read the arrays for each field
- let mut struct_arrays = vec![];
-            // TODO investigate whether just knowing the number of buffers could
- // still work
- for struct_field in struct_fields {
-                let child = create_array(reader, struct_field, variadic_counts, require_alignment)?;
- struct_arrays.push(child);
+ &[reader.next_buffer()?, reader.next_buffer()?],
+ reader.require_alignment,
+ ),
+            List(ref list_field) | LargeList(ref list_field) | Map(ref list_field, _) => {
+ let list_node = reader.next_node(field)?;
+                let list_buffers = [reader.next_buffer()?, reader.next_buffer()?];
+ let values = reader.create_array(list_field, variadic_counts)?;
+ create_list_array(
+ list_node,
+ data_type,
+ &list_buffers,
+ values,
+ reader.require_alignment,
+ )
}
- let null_count = struct_node.null_count() as usize;
- let struct_array = if struct_arrays.is_empty() {
- // `StructArray::from` can't infer the correct row count
- // if we have zero fields
- let len = struct_node.length() as usize;
- StructArray::new_empty_fields(
- len,
-                    (null_count > 0).then(|| BooleanBuffer::new(null_buffer, 0, len).into()),
+ FixedSizeList(ref list_field, _) => {
+ let list_node = reader.next_node(field)?;
+ let list_buffers = [reader.next_buffer()?];
+ let values = reader.create_array(list_field, variadic_counts)?;
+ create_list_array(
+ list_node,
+ data_type,
+ &list_buffers,
+ values,
+ reader.require_alignment,
)
- } else if null_count > 0 {
- // create struct array from fields, arrays and null data
- let len = struct_node.length() as usize;
- let nulls = BooleanBuffer::new(null_buffer, 0, len).into();
-                StructArray::try_new(struct_fields.clone(), struct_arrays, Some(nulls))?
- } else {
-                StructArray::try_new(struct_fields.clone(), struct_arrays, None)?
- };
- Ok(Arc::new(struct_array))
- }
- RunEndEncoded(run_ends_field, values_field) => {
- let run_node = reader.next_node(field)?;
- let run_ends =
-                create_array(reader, run_ends_field, variadic_counts, require_alignment)?;
-            let values = create_array(reader, values_field, variadic_counts, require_alignment)?;
-
- let run_array_length = run_node.length() as usize;
- let array_data = ArrayData::builder(data_type.clone())
- .len(run_array_length)
- .offset(0)
- .add_child_data(run_ends.into_data())
- .add_child_data(values.into_data())
- .align_buffers(!require_alignment)
- .build()?;
-
- Ok(make_array(array_data))
- }
- // Create dictionary array from RecordBatch
- Dictionary(_, _) => {
- let index_node = reader.next_node(field)?;
- let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];
+ }
+ Struct(struct_fields) => {
+ let struct_node = reader.next_node(field)?;
+ let null_buffer = reader.next_buffer()?;
- #[allow(deprecated)]
- let dict_id = field.dict_id().ok_or_else(|| {
-                ArrowError::ParseError(format!("Field {field} does not have dict id"))
- })?;
-
-            let value_array = reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
- ArrowError::ParseError(format!(
- "Cannot find a dictionary batch with dict id: {dict_id}"
- ))
- })?;
-
- create_dictionary_array(
- index_node,
- data_type,
- &index_buffers,
- value_array.clone(),
- require_alignment,
- )
- }
- Union(fields, mode) => {
- let union_node = reader.next_node(field)?;
- let len = union_node.length() as usize;
-
- // In V4, union types has validity bitmap
- // In V5 and later, union types have no validity bitmap
- if reader.version < MetadataVersion::V5 {
- reader.next_buffer()?;
+ // read the arrays for each field
+ let mut struct_arrays = vec![];
+                // TODO investigate whether just knowing the number of buffers could
+ // still work
+ for struct_field in struct_fields {
+                    let child = reader.create_array(struct_field, variadic_counts)?;
+ struct_arrays.push(child);
+ }
+ let null_count = struct_node.null_count() as usize;
+ let struct_array = if struct_arrays.is_empty() {
+ // `StructArray::from` can't infer the correct row count
+ // if we have zero fields
+ let len = struct_node.length() as usize;
+ StructArray::new_empty_fields(
+ len,
+                        (null_count > 0).then(|| BooleanBuffer::new(null_buffer, 0, len).into()),
+ )
+ } else if null_count > 0 {
+ // create struct array from fields, arrays and null data
+ let len = struct_node.length() as usize;
+ let nulls = BooleanBuffer::new(null_buffer, 0, len).into();
+                    StructArray::try_new(struct_fields.clone(), struct_arrays, Some(nulls))?
+ } else {
+                    StructArray::try_new(struct_fields.clone(), struct_arrays, None)?
+ };
+ Ok(Arc::new(struct_array))
}
+ RunEndEncoded(run_ends_field, values_field) => {
+ let run_node = reader.next_node(field)?;
+                let run_ends = reader.create_array(run_ends_field, variadic_counts)?;
+                let values = reader.create_array(values_field, variadic_counts)?;
+
+ let run_array_length = run_node.length() as usize;
+ let array_data = ArrayData::builder(data_type.clone())
+ .len(run_array_length)
+ .offset(0)
+ .add_child_data(run_ends.into_data())
+ .add_child_data(values.into_data())
+ .align_buffers(!reader.require_alignment)
+ .build()?;
+
+ Ok(make_array(array_data))
+ }
+ // Create dictionary array from RecordBatch
+ Dictionary(_, _) => {
+ let index_node = reader.next_node(field)?;
+                let index_buffers = [reader.next_buffer()?, reader.next_buffer()?];
-            let type_ids: ScalarBuffer<i8> = reader.next_buffer()?.slice_with_length(0, len).into();
+ #[allow(deprecated)]
+ let dict_id = field.dict_id().ok_or_else(|| {
+                    ArrowError::ParseError(format!("Field {field} does not have dict id"))
+ })?;
+
+                let value_array = reader.dictionaries_by_id.get(&dict_id).ok_or_else(|| {
+ ArrowError::ParseError(format!(
+ "Cannot find a dictionary batch with dict id:
{dict_id}"
+ ))
+ })?;
+
+ create_dictionary_array(
+ index_node,
+ data_type,
+ &index_buffers,
+ value_array.clone(),
+ reader.require_alignment,
+ )
+ }
+ Union(fields, mode) => {
+ let union_node = reader.next_node(field)?;
+ let len = union_node.length() as usize;
- let value_offsets = match mode {
- UnionMode::Dense => {
- let offsets: ScalarBuffer<i32> =
-                        reader.next_buffer()?.slice_with_length(0, len * 4).into();
- Some(offsets)
+ // In V4, union types has validity bitmap
+ // In V5 and later, union types have no validity bitmap
+ if reader.version < MetadataVersion::V5 {
+ reader.next_buffer()?;
}
- UnionMode::Sparse => None,
- };
- let mut children = Vec::with_capacity(fields.len());
+ let type_ids: ScalarBuffer<i8> =
+ reader.next_buffer()?.slice_with_length(0, len).into();
- for (_id, field) in fields.iter() {
-                let child = create_array(reader, field, variadic_counts, require_alignment)?;
- children.push(child);
- }
+ let value_offsets = match mode {
+ UnionMode::Dense => {
+ let offsets: ScalarBuffer<i32> =
+                            reader.next_buffer()?.slice_with_length(0, len * 4).into();
+ Some(offsets)
+ }
+ UnionMode::Sparse => None,
+ };
-            let array = UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?;
- Ok(Arc::new(array))
- }
- Null => {
- let node = reader.next_node(field)?;
- let length = node.length();
- let null_count = node.null_count();
-
- if length != null_count {
- return Err(ArrowError::SchemaError(format!(
- "Field {field} of NullArray has unequal null_count
{null_count} and len {length}"
- )));
+ let mut children = Vec::with_capacity(fields.len());
+
+ for (_id, field) in fields.iter() {
+ let child = reader.create_array(field, variadic_counts)?;
+ children.push(child);
+ }
+
+                let array = UnionArray::try_new(fields.clone(), type_ids, value_offsets, children)?;
+ Ok(Arc::new(array))
}
+ Null => {
+ let node = reader.next_node(field)?;
+ let length = node.length();
+ let null_count = node.null_count();
+
+ if length != null_count {
+ return Err(ArrowError::SchemaError(format!(
+ "Field {field} of NullArray has unequal null_count
{null_count} and len {length}"
+ )));
+ }
- let array_data = ArrayData::builder(data_type.clone())
- .len(length as usize)
- .offset(0)
- .align_buffers(!require_alignment)
- .build()?;
+ let array_data = ArrayData::builder(data_type.clone())
+ .len(length as usize)
+ .offset(0)
+ .align_buffers(!reader.require_alignment)
+ .build()?;
- // no buffer increases
- Ok(Arc::new(NullArray::from(array_data)))
+ // no buffer increases
+ Ok(Arc::new(NullArray::from(array_data)))
+ }
+ _ => create_primitive_array(
+ reader.next_node(field)?,
+ data_type,
+ &[reader.next_buffer()?, reader.next_buffer()?],
+ reader.require_alignment,
+ ),
}
- _ => create_primitive_array(
- reader.next_node(field)?,
- data_type,
- &[reader.next_buffer()?, reader.next_buffer()?],
- require_alignment,
- ),
}
}
@@ -371,6 +372,10 @@ fn create_dictionary_array(
/// State for decoding arrays from an encoded [`RecordBatch`]
struct ArrayReader<'a> {
+ /// The flatbuffers encoded record batch
+ batch: crate::RecordBatch<'a>,
+ /// The output schema
+ schema: SchemaRef,
/// Decoded dictionaries indexed by dictionary id
dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
/// Optional compression codec
@@ -383,9 +388,111 @@ struct ArrayReader<'a> {
nodes: VectorIter<'a, FieldNode>,
/// The buffers comprising this array
buffers: VectorIter<'a, crate::Buffer>,
+ /// Projection (subset of columns) to read, if any
+ /// See [`ArrayReader::with_projection`] for details
+ projection: Option<&'a [usize]>,
+ /// Are buffers required to already be aligned? See
+ /// [`ArrayReader::with_require_alignment`] for details
+ require_alignment: bool,
}
impl<'a> ArrayReader<'a> {
+ /// Create a reader for decoding arrays from an encoded [`RecordBatch`]
+ fn try_new(
+ buf: &'a Buffer,
+ batch: crate::RecordBatch<'a>,
+ schema: SchemaRef,
+ dictionaries_by_id: &'a HashMap<i64, ArrayRef>,
+ metadata: &'a MetadataVersion,
+ ) -> Result<Self, ArrowError> {
+ let buffers = batch.buffers().ok_or_else(|| {
+ ArrowError::IpcError("Unable to get buffers from IPC
RecordBatch".to_string())
+ })?;
+ let field_nodes = batch.nodes().ok_or_else(|| {
+ ArrowError::IpcError("Unable to get field nodes from IPC
RecordBatch".to_string())
+ })?;
+
+ let batch_compression = batch.compression();
+ let compression = batch_compression
+ .map(|batch_compression| batch_compression.codec().try_into())
+ .transpose()?;
+
+ Ok(Self {
+ batch,
+ schema,
+ dictionaries_by_id,
+ compression,
+ version: *metadata,
+ data: buf,
+ nodes: field_nodes.iter(),
+ buffers: buffers.iter(),
+ projection: None,
+ require_alignment: false,
+ })
+ }
+
+ /// Set the projection (default: None)
+ ///
+ /// If set, the projection is the list of column indices
+ /// that will be read
+ pub fn with_projection(mut self, projection: Option<&'a [usize]>) -> Self {
+ self.projection = projection;
+ self
+ }
+
+ /// Set require_alignment (default: false)
+ ///
+ /// If true, buffers must be aligned appropriately or error will
+ /// result. If false, buffers will be copied to aligned buffers
+ /// if necessary.
+ pub fn with_require_alignment(mut self, require_alignment: bool) -> Self {
+ self.require_alignment = require_alignment;
+ self
+ }
+
+ /// Read the record batch, consuming the reader
+ fn read_record_batch(mut self) -> Result<RecordBatch, ArrowError> {
+ let mut variadic_counts: VecDeque<i64> = self
+ .batch
+ .variadicBufferCounts()
+ .into_iter()
+ .flatten()
+ .collect();
+
+        let options = RecordBatchOptions::new().with_row_count(Some(self.batch.length() as usize));
+
+ let schema = Arc::clone(&self.schema);
+ if let Some(projection) = self.projection {
+ let mut arrays = vec![];
+ // project fields
+ for (idx, field) in schema.fields().iter().enumerate() {
+ // Create array for projected field
+                if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
+                    let child = self.create_array(field, &mut variadic_counts)?;
+ arrays.push((proj_idx, child));
+ } else {
+ self.skip_field(field, &mut variadic_counts)?;
+ }
+ }
+ assert!(variadic_counts.is_empty());
+ arrays.sort_by_key(|t| t.0);
+ RecordBatch::try_new_with_options(
+ Arc::new(schema.project(projection)?),
+ arrays.into_iter().map(|t| t.1).collect(),
+ &options,
+ )
+ } else {
+ let mut children = vec![];
+ // keep track of index as lists require more than one node
+ for field in schema.fields() {
+ let child = self.create_array(field, &mut variadic_counts)?;
+ children.push(child);
+ }
+ assert!(variadic_counts.is_empty());
+ RecordBatch::try_new_with_options(schema, children, &options)
+ }
+ }
+
fn next_buffer(&mut self) -> Result<Buffer, ArrowError> {
read_buffer(self.buffers.next().unwrap(), self.data, self.compression)
}
@@ -497,15 +604,10 @@ pub fn read_record_batch(
projection: Option<&[usize]>,
metadata: &MetadataVersion,
) -> Result<RecordBatch, ArrowError> {
- read_record_batch_impl(
- buf,
- batch,
- schema,
- dictionaries_by_id,
- projection,
- metadata,
- false,
- )
+ ArrayReader::try_new(buf, batch, schema, dictionaries_by_id, metadata)?
+ .with_projection(projection)
+ .with_require_alignment(false)
+ .read_record_batch()
}
/// Read the dictionary from the buffer and provided metadata,
@@ -520,73 +622,6 @@ pub fn read_dictionary(
    read_dictionary_impl(buf, batch, schema, dictionaries_by_id, metadata, false)
}
-fn read_record_batch_impl(
- buf: &Buffer,
- batch: crate::RecordBatch,
- schema: SchemaRef,
- dictionaries_by_id: &HashMap<i64, ArrayRef>,
- projection: Option<&[usize]>,
- metadata: &MetadataVersion,
- require_alignment: bool,
-) -> Result<RecordBatch, ArrowError> {
- let buffers = batch.buffers().ok_or_else(|| {
- ArrowError::IpcError("Unable to get buffers from IPC
RecordBatch".to_string())
- })?;
- let field_nodes = batch.nodes().ok_or_else(|| {
-        ArrowError::IpcError("Unable to get field nodes from IPC RecordBatch".to_string())
- })?;
-
- let mut variadic_counts: VecDeque<i64> =
- batch.variadicBufferCounts().into_iter().flatten().collect();
-
- let batch_compression = batch.compression();
- let compression = batch_compression
- .map(|batch_compression| batch_compression.codec().try_into())
- .transpose()?;
-
- let mut reader = ArrayReader {
- dictionaries_by_id,
- compression,
- version: *metadata,
- data: buf,
- nodes: field_nodes.iter(),
- buffers: buffers.iter(),
- };
-
-    let options = RecordBatchOptions::new().with_row_count(Some(batch.length() as usize));
-
- if let Some(projection) = projection {
- let mut arrays = vec![];
- // project fields
- for (idx, field) in schema.fields().iter().enumerate() {
- // Create array for projected field
- if let Some(proj_idx) = projection.iter().position(|p| p == &idx) {
- let child =
-                    create_array(&mut reader, field, &mut variadic_counts, require_alignment)?;
- arrays.push((proj_idx, child));
- } else {
- reader.skip_field(field, &mut variadic_counts)?;
- }
- }
- assert!(variadic_counts.is_empty());
- arrays.sort_by_key(|t| t.0);
- RecordBatch::try_new_with_options(
- Arc::new(schema.project(projection)?),
- arrays.into_iter().map(|t| t.1).collect(),
- &options,
- )
- } else {
- let mut children = vec![];
- // keep track of index as lists require more than one node
- for field in schema.fields() {
-            let child = create_array(&mut reader, field, &mut variadic_counts, require_alignment)?;
- children.push(child);
- }
- assert!(variadic_counts.is_empty());
- RecordBatch::try_new_with_options(schema, children, &options)
- }
-}
-
fn read_dictionary_impl(
buf: &Buffer,
batch: crate::DictionaryBatch,
@@ -617,15 +652,16 @@ fn read_dictionary_impl(
let value = value_type.as_ref().clone();
let schema = Schema::new(vec![Field::new("", value, true)]);
// Read a single column
- let record_batch = read_record_batch_impl(
+ let record_batch = ArrayReader::try_new(
buf,
batch.data().unwrap(),
Arc::new(schema),
dictionaries_by_id,
- None,
metadata,
- require_alignment,
- )?;
+ )?
+ .with_require_alignment(require_alignment)
+ .read_record_batch()?;
+
Some(record_batch.column(0).clone())
}
_ => None,
@@ -840,15 +876,16 @@ impl FileDecoder {
ArrowError::IpcError("Unable to read IPC message as record
batch".to_string())
})?;
// read the block that makes up the record batch into a buffer
- read_record_batch_impl(
+ ArrayReader::try_new(
&buf.slice(block.metaDataLength() as _),
batch,
self.schema.clone(),
&self.dictionaries,
- self.projection.as_deref(),
&message.version(),
- self.require_alignment,
- )
+ )?
+ .with_projection(self.projection.as_deref())
+ .with_require_alignment(self.require_alignment)
+ .read_record_batch()
.map(Some)
}
crate::MessageHeader::NONE => Ok(None),
@@ -1389,15 +1426,16 @@ impl<R: Read> StreamReader<R> {
let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
self.reader.read_exact(&mut buf)?;
- read_record_batch_impl(
+ ArrayReader::try_new(
&buf.into(),
batch,
self.schema(),
&self.dictionaries_by_id,
- self.projection.as_ref().map(|x| x.0.as_ref()),
&message.version(),
- false,
- )
+ )?
+                    .with_projection(self.projection.as_ref().map(|x| x.0.as_ref()))
+ .with_require_alignment(false)
+ .read_record_batch()
.map(Some)
}
crate::MessageHeader::DictionaryBatch => {
@@ -2239,15 +2277,16 @@ mod tests {
assert_ne!(b.as_ptr().align_offset(8), 0);
let ipc_batch = message.header_as_record_batch().unwrap();
- let roundtrip = read_record_batch_impl(
+ let roundtrip = ArrayReader::try_new(
&b,
ipc_batch,
batch.schema(),
&Default::default(),
- None,
&message.version(),
- false,
)
+ .unwrap()
+ .with_require_alignment(false)
+ .read_record_batch()
.unwrap();
assert_eq!(batch, roundtrip);
}
@@ -2277,15 +2316,16 @@ mod tests {
assert_ne!(b.as_ptr().align_offset(8), 0);
let ipc_batch = message.header_as_record_batch().unwrap();
- let result = read_record_batch_impl(
+ let result = ArrayReader::try_new(
&b,
ipc_batch,
batch.schema(),
&Default::default(),
- None,
&message.version(),
- true,
- );
+ )
+ .unwrap()
+ .with_require_alignment(true)
+ .read_record_batch();
let error = result.unwrap_err();
assert_eq!(
diff --git a/arrow-ipc/src/reader/stream.rs b/arrow-ipc/src/reader/stream.rs
index 9b0eea9b61..e66896f151 100644
--- a/arrow-ipc/src/reader/stream.rs
+++ b/arrow-ipc/src/reader/stream.rs
@@ -24,7 +24,7 @@ use arrow_buffer::{Buffer, MutableBuffer};
use arrow_schema::{ArrowError, SchemaRef};
use crate::convert::MessageBuffer;
-use crate::reader::{read_dictionary_impl, read_record_batch_impl};
+use crate::reader::{read_dictionary_impl, ArrayReader};
use crate::{MessageHeader, CONTINUATION_MARKER};
/// A low-level interface for reading [`RecordBatch`] data from a stream of bytes
@@ -211,15 +211,15 @@ impl StreamDecoder {
let schema = self.schema.clone().ok_or_else(|| {
ArrowError::IpcError("Missing
schema".to_string())
})?;
- let batch = read_record_batch_impl(
+ let batch = ArrayReader::try_new(
&body,
batch,
schema,
&self.dictionaries,
- None,
&version,
- self.require_alignment,
- )?;
+ )?
+ .with_require_alignment(self.require_alignment)
+ .read_record_batch()?;
self.state = DecoderState::default();
return Ok(Some(batch));
}