This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 9630aaf55b Blockwise IO in IPC FileReader (#5153) (#5179)
9630aaf55b is described below

commit 9630aaf55bda98e2028c4f44e6a7264ec41e04d5
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Fri Dec 8 21:51:48 2023 +0000

    Blockwise IO in IPC FileReader (#5153) (#5179)
    
    * Blockwise IO in IPC FileReader (#5153)
    
    * Docs
    
    * Clippy
    
    * Update arrow-ipc/src/reader.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 arrow-ipc/src/reader.rs | 121 +++++++++++++++++++-----------------------------
 1 file changed, 48 insertions(+), 73 deletions(-)

diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 6f2cb30a16..06e53505fc 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -27,12 +27,12 @@ use std::io::{BufReader, Read, Seek, SeekFrom};
 use std::sync::Arc;
 
 use arrow_array::*;
-use arrow_buffer::{Buffer, MutableBuffer};
+use arrow_buffer::{ArrowNativeType, Buffer, MutableBuffer};
 use arrow_data::ArrayData;
 use arrow_schema::*;
 
 use crate::compression::CompressionCodec;
-use crate::{FieldNode, MetadataVersion, CONTINUATION_MARKER};
+use crate::{Block, FieldNode, Message, MetadataVersion, CONTINUATION_MARKER};
 use DataType::*;
 
 /// Read a buffer based on offset and length
@@ -498,10 +498,34 @@ pub fn read_dictionary(
     Ok(())
 }
 
+/// Read the data for a given block
+fn read_block<R: Read + Seek>(mut reader: R, block: &Block) -> Result<Buffer, ArrowError> {
+    reader.seek(SeekFrom::Start(block.offset() as u64))?;
+    let body_len = block.bodyLength().to_usize().unwrap();
+    let metadata_len = block.metaDataLength().to_usize().unwrap();
+    let total_len = body_len.checked_add(metadata_len).unwrap();
+
+    let mut buf = MutableBuffer::from_len_zeroed(total_len);
+    reader.read_exact(&mut buf)?;
+    Ok(buf.into())
+}
+
+/// Parse an encapsulated message
+///
+/// <https://arrow.apache.org/docs/format/Columnar.html#encapsulated-message-format>
+fn parse_message(buf: &[u8]) -> Result<Message, ArrowError> {
+    let buf = match buf[..4] == CONTINUATION_MARKER {
+        true => &buf[8..],
+        false => &buf[4..],
+    };
+    crate::root_as_message(buf)
+        .map_err(|err| ArrowError::ParseError(format!("Unable to get root as message: {err:?}")))
+}
+
 /// Arrow File reader
 pub struct FileReader<R: Read + Seek> {
     /// Buffered file reader that supports reading and seeking
-    reader: BufReader<R>,
+    reader: R,
 
     /// The schema that is read from the file header
     schema: SchemaRef,
@@ -535,7 +559,6 @@ pub struct FileReader<R: Read + Seek> {
 impl<R: Read + Seek> fmt::Debug for FileReader<R> {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
         f.debug_struct("FileReader<R>")
-            .field("reader", &"BufReader<..>")
             .field("schema", &self.schema)
             .field("blocks", &self.blocks)
             .field("current_block", &self.current_block)
@@ -543,37 +566,28 @@ impl<R: Read + Seek> fmt::Debug for FileReader<R> {
             .field("dictionaries_by_id", &self.dictionaries_by_id)
             .field("metadata_version", &self.metadata_version)
             .field("projection", &self.projection)
-            .finish()
+            .finish_non_exhaustive()
     }
 }
 
 impl<R: Read + Seek> FileReader<R> {
     /// Try to create a new file reader
     ///
-    /// Returns errors if the file does not meet the Arrow Format header and footer
-    /// requirements
-    pub fn try_new(reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
-        let mut reader = BufReader::new(reader);
-        // check if header and footer contain correct magic bytes
-        let mut magic_buffer: [u8; 6] = [0; 6];
-        reader.read_exact(&mut magic_buffer)?;
-        if magic_buffer != super::ARROW_MAGIC {
-            return Err(ArrowError::ParseError(
-                "Arrow file does not contain correct header".to_string(),
-            ));
-        }
-        reader.seek(SeekFrom::End(-6))?;
-        reader.read_exact(&mut magic_buffer)?;
-        if magic_buffer != super::ARROW_MAGIC {
+    /// Returns errors if the file does not meet the Arrow Format footer requirements
+    pub fn try_new(mut reader: R, projection: Option<Vec<usize>>) -> Result<Self, ArrowError> {
+        // Space for ARROW_MAGIC (6 bytes) and length (4 bytes)
+        let mut buffer = [0; 10];
+        reader.seek(SeekFrom::End(-10))?;
+        reader.read_exact(&mut buffer)?;
+
+        if buffer[4..] != super::ARROW_MAGIC {
             return Err(ArrowError::ParseError(
                 "Arrow file does not contain correct footer".to_string(),
             ));
         }
+
         // read footer length
-        let mut footer_size: [u8; 4] = [0; 4];
-        reader.seek(SeekFrom::End(-10))?;
-        reader.read_exact(&mut footer_size)?;
-        let footer_len = i32::from_le_bytes(footer_size);
+        let footer_len = i32::from_le_bytes(buffer[..4].try_into().unwrap());
 
         // read footer
         let mut footer_data = vec![0; footer_len as usize];
@@ -607,35 +621,14 @@ impl<R: Read + Seek> FileReader<R> {
         let mut dictionaries_by_id = HashMap::new();
         if let Some(dictionaries) = footer.dictionaries() {
             for block in dictionaries {
-                // read length from end of offset
-                let mut message_size: [u8; 4] = [0; 4];
-                reader.seek(SeekFrom::Start(block.offset() as u64))?;
-                reader.read_exact(&mut message_size)?;
-                if message_size == CONTINUATION_MARKER {
-                    reader.read_exact(&mut message_size)?;
-                }
-                let footer_len = i32::from_le_bytes(message_size);
-                let mut block_data = vec![0; footer_len as usize];
-
-                reader.read_exact(&mut block_data)?;
-
-                let message = crate::root_as_message(&block_data[..]).map_err(|err| {
-                    ArrowError::ParseError(format!("Unable to get root as message: {err:?}"))
-                })?;
+                let buf = read_block(&mut reader, block)?;
+                let message = parse_message(&buf)?;
 
                 match message.header_type() {
                     crate::MessageHeader::DictionaryBatch => {
                         let batch = message.header_as_dictionary_batch().unwrap();
-
-                        // read the block that makes up the dictionary batch into a buffer
-                        let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
-                        reader.seek(SeekFrom::Start(
-                            block.offset() as u64 + block.metaDataLength() as u64,
-                        ))?;
-                        reader.read_exact(&mut buf)?;
-
                         read_dictionary(
-                            &buf.into(),
+                            &buf.slice(block.metaDataLength() as _),
                             batch,
                             &schema,
                             &mut dictionaries_by_id,
@@ -702,27 +695,15 @@ impl<R: Read + Seek> FileReader<R> {
     }
 
     fn maybe_next(&mut self) -> Result<Option<RecordBatch>, ArrowError> {
-        let block = self.blocks[self.current_block];
+        let block = &self.blocks[self.current_block];
         self.current_block += 1;
 
         // read length
-        self.reader.seek(SeekFrom::Start(block.offset() as u64))?;
-        let mut meta_buf = [0; 4];
-        self.reader.read_exact(&mut meta_buf)?;
-        if meta_buf == CONTINUATION_MARKER {
-            // continuation marker encountered, read message next
-            self.reader.read_exact(&mut meta_buf)?;
-        }
-        let meta_len = i32::from_le_bytes(meta_buf);
-
-        let mut block_data = vec![0; meta_len as usize];
-        self.reader.read_exact(&mut block_data)?;
-        let message = crate::root_as_message(&block_data[..]).map_err(|err| {
-            ArrowError::ParseError(format!("Unable to get root as footer: {err:?}"))
-        })?;
+        let buffer = read_block(&mut self.reader, block)?;
+        let message = parse_message(&buffer)?;
 
         // some old test data's footer metadata is not set, so we account for that
-        if self.metadata_version != crate::MetadataVersion::V1
+        if self.metadata_version != MetadataVersion::V1
             && message.version() != self.metadata_version
         {
             return Err(ArrowError::IpcError(
@@ -739,14 +720,8 @@ impl<R: Read + Seek> FileReader<R> {
                     ArrowError::IpcError("Unable to read IPC message as record batch".to_string())
                 })?;
                 // read the block that makes up the record batch into a buffer
-                let mut buf = MutableBuffer::from_len_zeroed(message.bodyLength() as usize);
-                self.reader.seek(SeekFrom::Start(
-                    block.offset() as u64 + block.metaDataLength() as u64,
-                ))?;
-                self.reader.read_exact(&mut buf)?;
-
                 read_record_batch(
-                    &buf.into(),
+                    &buffer.slice(block.metaDataLength() as _),
                     batch,
                     self.schema(),
                     &self.dictionaries_by_id,
@@ -766,14 +741,14 @@ impl<R: Read + Seek> FileReader<R> {
     ///
     /// It is inadvisable to directly read from the underlying reader.
     pub fn get_ref(&self) -> &R {
-        self.reader.get_ref()
+        &self.reader
     }
 
     /// Gets a mutable reference to the underlying reader.
     ///
     /// It is inadvisable to directly read from the underlying reader.
     pub fn get_mut(&mut self) -> &mut R {
-        self.reader.get_mut()
+        &mut self.reader
     }
 }
 

Reply via email to