This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new b8b2f21f6a Add example reading data from an `mmap`ed IPC file (#6986)
b8b2f21f6a is described below

commit b8b2f21f6a8254224d37a1e2d231b6b1e1767648
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Jan 21 06:21:44 2025 -0500

    Add example reading data from an `mmap`ed IPC file (#6986)
    
    * Add example reading data from an `mmap`ed IPC file
    
    * update readme
---
 arrow-ipc/src/reader.rs         |   8 ++
 arrow/Cargo.toml                |   8 ++
 arrow/examples/README.md        |   1 +
 arrow/examples/zero_copy_ipc.rs | 157 ++++++++++++++++++++++++++++++++++++++++
 4 files changed, 174 insertions(+)

diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 5cf208c8eb..4dcd56156e 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -683,6 +683,10 @@ pub fn read_footer_length(buf: [u8; 10]) -> Result<usize, 
ArrowError> {
 ///
 /// For a higher-level interface see [`FileReader`]
 ///
+/// For an example of using this API with `mmap` see the [`zero_copy_ipc`] 
example.
+///
+/// [`zero_copy_ipc`]: 
https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
+///
 /// ```
 /// # use std::sync::Arc;
 /// # use arrow_array::*;
@@ -994,6 +998,10 @@ impl FileReaderBuilder {
 }
 
 /// Arrow File reader
+///
+/// For an example creating Arrays with memory mapped (`mmap`) files see the 
[`zero_copy_ipc`] example.
+///
+/// [`zero_copy_ipc`]: 
https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
 pub struct FileReader<R> {
     /// File reader that supports reading and seeking
     reader: R,
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 76119ec4ab..1b01dcd25e 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -87,6 +87,9 @@ criterion = { version = "0.5", default-features = false }
 half = { version = "2.1", default-features = false }
 rand = { version = "0.8", default-features = false, features = ["std", 
"std_rng"] }
 serde = { version = "1.0", default-features = false, features = ["derive"] }
+# used in examples
+memmap2 = "0.9.3"
+bytes = "1.9"
 
 [build-dependencies]
 
@@ -105,6 +108,11 @@ name = "read_csv_infer_schema"
 required-features = ["prettyprint", "csv"]
 path = "./examples/read_csv_infer_schema.rs"
 
+[[example]]
+name = "zero_copy_ipc"
+required-features = ["prettyprint"]
+path = "examples/zero_copy_ipc.rs"
+
 [[bench]]
 name = "aggregate_kernels"
 harness = false
diff --git a/arrow/examples/README.md b/arrow/examples/README.md
index 87aa6ee047..9d896cd117 100644
--- a/arrow/examples/README.md
+++ b/arrow/examples/README.md
@@ -24,5 +24,6 @@
 - [`dynamic_types.rs`](dynamic_types.rs): Dealing with mixed types dynamically 
at runtime
 - [`read_csv.rs`](read_csv.rs): Reading CSV files with explicit schema, pretty 
printing Arrays
 - [`read_csv_infer_schema.rs`](read_csv_infer_schema.rs): Reading CSV files, 
pretty printing Arrays
+- [`zero_copy_ipc`](zero_copy_ipc): Zero copy read of Arrow IPC file using 
`mmap`
 - [`tensor_builder.rs`](tensor_builder.rs): Using tensor builder
 - [`version.rs`](version.rs): Print the arrow version and exit
diff --git a/arrow/examples/zero_copy_ipc.rs b/arrow/examples/zero_copy_ipc.rs
new file mode 100644
index 0000000000..15fc477c59
--- /dev/null
+++ b/arrow/examples/zero_copy_ipc.rs
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Example of reading arrow IPC files and streams using "zero copy" API
+//!
+//! Zero copy in this case means the Arrow arrays refer directly to a user
+//! provided buffer or memory region.
+
+use arrow::array::{record_batch, RecordBatch};
+use arrow::error::Result;
+use arrow_buffer::Buffer;
+use arrow_cast::pretty::pretty_format_batches;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::{read_footer_length, FileDecoder};
+use arrow_ipc::writer::FileWriter;
+use arrow_ipc::{root_as_footer, Block};
+use std::path::PathBuf;
+use std::sync::Arc;
+
+/// This example shows how to read data from an Arrow IPC file without copying
+/// using `mmap` and the [`FileDecoder`] API
+fn main() {
+    // Create a temporary file with 3 record batches
+    let ipc_path = ipc_file();
+    // Open the file and memory map it using the mmap2 crate:
+    let ipc_file = std::fs::File::open(&ipc_path.path).expect("failed to open 
file");
+    let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap 
file") };
+
+    // Convert the mmap region to an Arrow `Buffer` to back the arrow arrays. 
We
+    // do this by first creating a `bytes::Bytes` (which is zero copy) and then
+    // creating a Buffer from the `Bytes` (which is also zero copy)
+    let bytes = bytes::Bytes::from_owner(mmap);
+    let buffer = Buffer::from(bytes);
+
+    // Now, use the FileDecoder API (wrapped by `IPCBufferDecoder` for
+    // convenience) to crate Arrays re-using the data in the underlying buffer
+    let decoder = IPCBufferDecoder::new(buffer);
+    assert_eq!(decoder.num_batches(), 3);
+
+    // Create the Arrays and print them
+    for i in 0..decoder.num_batches() {
+        let batch = decoder.get_batch(i).unwrap().expect("failed to read 
batch");
+        assert_eq!(3, batch.num_rows());
+        println!("Batch {i}\n{}", pretty_format_batches(&[batch]).unwrap());
+    }
+}
+
+/// Return 3 [`RecordBatch`]es with a single column of type `Int32`
+fn example_data() -> Vec<RecordBatch> {
+    vec![
+        record_batch!(("my_column", Int32, [1, 2, 3])).unwrap(),
+        record_batch!(("my_column", Int32, [4, 5, 6])).unwrap(),
+        record_batch!(("my_column", Int32, [7, 8, 9])).unwrap(),
+    ]
+}
+
+/// Return a temporary file that contains an IPC file with 3 [`RecordBatch`]es
+fn ipc_file() -> TempFile {
+    let path = PathBuf::from("example.arrow");
+    let file = std::fs::File::create(&path).unwrap();
+    let data = example_data();
+    let mut writer = FileWriter::try_new(file, &data[0].schema()).unwrap();
+    for batch in &data {
+        writer.write(batch).unwrap();
+    }
+    writer.finish().unwrap();
+    TempFile { path }
+}
+
+/// Incrementally decodes [`RecordBatch`]es from an IPC file stored in a Arrow
+/// [`Buffer`] using the [`FileDecoder`] API.
+///
+/// This is a wrapper around the example in the `FileDecoder` which handles the
+/// low level interaction with the Arrow IPC format.
+struct IPCBufferDecoder {
+    /// Memory (or memory mapped) Buffer with the data
+    buffer: Buffer,
+    /// Decoder that reads Arrays that refers to the underlying buffers
+    decoder: FileDecoder,
+    /// Location of the batches within the buffer
+    batches: Vec<Block>,
+}
+
+impl IPCBufferDecoder {
+    fn new(buffer: Buffer) -> Self {
+        let trailer_start = buffer.len() - 10;
+        let footer_len = 
read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
+        let footer = root_as_footer(&buffer[trailer_start - 
footer_len..trailer_start]).unwrap();
+
+        let schema = fb_to_schema(footer.schema().unwrap());
+
+        let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
+
+        // Read dictionaries
+        for block in footer.dictionaries().iter().flatten() {
+            let block_len = block.bodyLength() as usize + 
block.metaDataLength() as usize;
+            let data = buffer.slice_with_length(block.offset() as _, 
block_len);
+            decoder.read_dictionary(block, &data).unwrap();
+        }
+
+        // convert to Vec from the flatbuffers Vector to avoid having a direct 
dependency on flatbuffers
+        let batches = footer
+            .recordBatches()
+            .map(|b| b.iter().copied().collect())
+            .unwrap_or_default();
+
+        Self {
+            buffer,
+            decoder,
+            batches,
+        }
+    }
+
+    /// Return the number of [`RecordBatch`]es in this buffer
+    fn num_batches(&self) -> usize {
+        self.batches.len()
+    }
+
+    /// Return the [`RecordBatch`] at message index `i`.
+    ///
+    /// This may return `None` if the IPC message was None
+    fn get_batch(&self, i: usize) -> Result<Option<RecordBatch>> {
+        let block = &self.batches[i];
+        let block_len = block.bodyLength() as usize + block.metaDataLength() 
as usize;
+        let data = self
+            .buffer
+            .slice_with_length(block.offset() as _, block_len);
+        self.decoder.read_record_batch(block, &data)
+    }
+}
+
+/// This structure deletes the file when it is dropped
+struct TempFile {
+    path: PathBuf,
+}
+
+impl Drop for TempFile {
+    fn drop(&mut self) {
+        if let Err(e) = std::fs::remove_file(&self.path) {
+            println!("Error deleting '{:?}': {:?}", self.path, e);
+        }
+    }
+}

Reply via email to