This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new 9630aaf55b Blockwise IO in IPC FileReader (#5153) (#5179)
9630aaf55b is described below
commit 9630aaf55bda98e2028c4f44e6a7264ec41e04d5
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 8 21:51:48 2023 +0000
Blockwise IO in IPC FileReader (#5153) (#5179)
* Blockwise IO in IPC FileReader (#5153)
* Docs
* Clippy
* Update arrow-ipc/src/reader.rs
Co-authored-by: Andrew Lamb <[email protected]>
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
arrow-ipc/src/reader.rs | 121 +++++++++++++++++++-----------------------------
1 file changed, 48 insertions(+), 73 deletions(-)
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 6f2cb30a16..06e53505fc 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -27,12 +27,12 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
use std::sync::Arc;
use arrow_array::*;
-use arrow_buffer::{Buffer, MutableBuffer};
+use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
use arrow_data::ArrayData;
use arrow_schema::*;
use crate::compression::CompressionCodec;
-use crate::{FieldNode, MetadataVersion, CONTINUATION_MARKER};
+use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER};
use DataType::*;
/// Read a buffer based on offset and length
@@ -498,10 +498,34 @@ pub fn read_dictionary(
Ok(())
}
+/// Read the data for a given block
+///
+/// Seeks to `block.offset()` and reads the block's metadata followed by its body
+/// (`metaDataLength + bodyLength` bytes) into a single contiguous [`Buffer`].
+///
+/// # Errors
+///
+/// Returns [`ArrowError::IpcError`] if the block's offset or lengths are negative, or
+/// if their sum overflows `usize`. Such values can only come from a corrupt or
+/// untrusted file footer, so they are reported rather than panicking. Any I/O error
+/// from the seek or read is propagated.
+fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
+    let offset = u64::try_from(block.offset())
+        .map_err(|_| ArrowError::IpcError(format!("Invalid block offset: {}", block.offset())))?;
+    let body_len = block.bodyLength().to_usize().ok_or_else(|| {
+        ArrowError::IpcError(format!("Invalid block body length: {}", block.bodyLength()))
+    })?;
+    let metadata_len = block.metaDataLength().to_usize().ok_or_else(|| {
+        ArrowError::IpcError(format!("Invalid block metadata length: {}", block.metaDataLength()))
+    })?;
+    let total_len = body_len
+        .checked_add(metadata_len)
+        .ok_or_else(|| ArrowError::IpcError("Block length overflows usize".to_string()))?;
+
+    reader.seek(SeekFrom::Start(offset))?;
+    let mut buf = MutableBuffer::from_len_zeroed(total_len);
+    reader.read_exact(&mut buf)?;
+    Ok(buf.into())
+}
+
+/// Parse an encapsulated message
+///
+/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
+///
+/// `buf` starts with either the `CONTINUATION_MARKER` followed by a 4-byte metadata
+/// length, or just the 4-byte length (files written without the marker). The
+/// flatbuffer `Message` follows that prefix.
+///
+/// # Errors
+///
+/// Returns [`ArrowError::ParseError`] if `buf` is too short to hold the prefix, or if
+/// the remaining bytes cannot be parsed as a flatbuffer `Message`.
+fn parse_message(buf: &[u8]) -> Result<Message, ArrowError> {
+    // Length of the prefix to skip: marker + length, or length only
+    let prefix_len = match buf.get(..4) {
+        Some(marker) if *marker == CONTINUATION_MARKER => 8,
+        _ => 4,
+    };
+    // Checked slicing: a corrupt block may be shorter than the prefix
+    let message = buf.get(prefix_len..).ok_or_else(|| {
+        ArrowError::ParseError(format!(
+            "Encapsulated message too short: expected at least {prefix_len} bytes, got {}",
+            buf.len()
+        ))
+    })?;
+    crate::root_as_message(message)
+        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
+}
+
/// Arrow File reader
pub struct FileReader<R: Read + Seek> {
/// Buffered file reader that supports reading and seeking
- reader: BufReader<R>,
+ reader: R,
/// The schema that is read from the file header
schema: SchemaRef,
@@ -535,7 +559,6 @@ pub struct FileReader<R: Read + Seek> {
impl<R: Read + Seek> fmt::Debug for FileReader<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(),
fmt::Error> {
f.debug_struct("FileReader<R>")
- .field("reader", &"BufReader<..>")
.field("schema", &self.schema)
.field("blocks", &self.blocks)
.field("current_block", &self.current_block)
@@ -543,37 +566,28 @@ impl<R: Read + Seek> fmt::Debug for FileReader<R> {
.field("dictionaries_by_id", &self.dictionaries_by_id)
.field("metadata_version", &self.metadata_version)
.field("projection", &self.projection)
- .finish()
+ .finish_non_exhaustive()
}
}
impl<R: Read + Seek> FileReader<R> {
/// Try to create a new file reader
///
- /// Returns errors if the file does not meet the Arrow Format header and
footer
- /// requirements
- pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self,
ArrowError> {
- let mut reader = BufReader::new(reader);
- // check if header and footer contain correct magic bytes
- let mut magic_buffer: [u8; 6] = [0; 6];
- reader.read_exact(&mut magic_buffer)?;
- if magic_buffer != super::ARROW_MAGIC {
- return Err(ArrowError::ParseError(
- "Arrow file does not contain correct header".to_string(),
- ));
- }
- reader.seek(SeekFrom::End(-6))?;
- reader.read_exact(&mut magic_buffer)?;
- if magic_buffer != super::ARROW_MAGIC {
+ /// Returns errors if the file does not meet the Arrow Format footer
requirements
+ pub fn try_new(mut reader: R, projection: Option<Vec<usize>>) ->
Result<Self, ArrowError> {
+ // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
+ let mut buffer = [0; 10];
+ reader.seek(SeekFrom::End(-10))?;
+ reader.read_exact(&mut buffer)?;
+
+ if buffer[4..] != super::ARROW_MAGIC {
return Err(ArrowError::ParseError(
"Arrow file does not contain correct footer".to_string(),
));
}
+
// read footer length
- let mut footer_size: [u8; 4] = [0; 4];
- reader.seek(SeekFrom::End(-10))?;
- reader.read_exact(&mut footer_size)?;
- let footer_len = i32::from_le_bytes(footer_size);
+ let footer_len = i32::from_le_bytes(buffer[..4].try_into().unwrap());
// read footer
let mut footer_data = vec![0; footer_len as usize];
@@ -607,35 +621,14 @@ impl<R: Read + Seek> FileReader<R> {
let mut dictionaries_by_id = HashMap::new();
if let Some(dictionaries) = footer.dictionaries() {
for block in dictionaries {
- // read length from end of offset
- let mut message_size: [u8; 4] = [0; 4];
- reader.seek(SeekFrom::Start(block.offset() as u64))?;
- reader.read_exact(&mut message_size)?;
- if message_size == CONTINUATION_MARKER {
- reader.read_exact(&mut message_size)?;
- }
- let footer_len = i32::from_le_bytes(message_size);
- let mut block_data = vec![0; footer_len as usize];
-
- reader.read_exact(&mut block_data)?;
-
- let message =
crate::root_as_message(&block_data[..]).map_err(|err| {
- ArrowError::ParseError(format!("Unable to get root as
message: {err:?}"))
- })?;
+ let buf = read_block(&mut reader, block)?;
+ let message = parse_message(&buf)?;
match message.header_type() {
crate::MessageHeader::DictionaryBatch => {
let batch =
message.header_as_dictionary_batch().unwrap();
-
- // read the block that makes up the dictionary batch
into a buffer
- let mut buf =
MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
- reader.seek(SeekFrom::Start(
- block.offset() as u64 + block.metaDataLength() as
u64,
- ))?;
- reader.read_exact(&mut buf)?;
-
read_dictionary(
- &buf.into(),
+ &buf.slice(block.metaDataLength() as _),
batch,
&schema,
&mut dictionaries_by_id,
@@ -702,27 +695,15 @@ impl<R: Read + Seek> FileReader<R> {
}
fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
- let block = self.blocks[self.current_block];
+ let block = &self.blocks[self.current_block];
self.current_block += 1;
// read length
- self.reader.seek(SeekFrom::Start(block.offset() as u64))?;
- let mut meta_buf = [0; 4];
- self.reader.read_exact(&mut meta_buf)?;
- if meta_buf == CONTINUATION_MARKER {
- // continuation marker encountered, read message next
- self.reader.read_exact(&mut meta_buf)?;
- }
- let meta_len = i32::from_le_bytes(meta_buf);
-
- let mut block_data = vec![0; meta_len as usize];
- self.reader.read_exact(&mut block_data)?;
- let message = crate::root_as_message(&block_data[..]).map_err(|err| {
- ArrowError::ParseError(format!("Unable to get root as footer:
{err:?}"))
- })?;
+ let buffer = read_block(&mut self.reader, block)?;
+ let message = parse_message(&buffer)?;
// some old test data's footer metadata is not set, so we account for
that
- if self.metadata_version != crate::MetadataVersion::V1
+ if self.metadata_version != MetadataVersion::V1
&& message.version() != self.metadata_version
{
return Err(ArrowError::IpcError(
@@ -739,14 +720,8 @@ impl<R: Read + Seek> FileReader<R> {
ArrowError::IpcError("Unable to read IPC message as record
batch".to_string())
})?;
// read the block that makes up the record batch into a buffer
- let mut buf =
MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
- self.reader.seek(SeekFrom::Start(
- block.offset() as u64 + block.metaDataLength() as u64,
- ))?;
- self.reader.read_exact(&mut buf)?;
-
read_record_batch(
- &buf.into(),
+ &buffer.slice(block.metaDataLength() as _),
batch,
self.schema(),
&self.dictionaries_by_id,
@@ -766,14 +741,14 @@ impl<R: Read + Seek> FileReader<R> {
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_ref(&self) -> &R {
- self.reader.get_ref()
+ &self.reader
}
/// Gets a mutable reference to the underlying reader.
///
/// It is inadvisable to directly read from the underlying reader.
pub fn get_mut(&mut self) -> &mut R {
- self.reader.get_mut()
+ &mut self.reader
}
}