This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new b8b2f21f6a Add example reading data from an `mmap`ed IPC file (#6986)
b8b2f21f6a is described below
commit b8b2f21f6a8254224d37a1e2d231b6b1e1767648
Author: Andrew Lamb <[email protected]>
AuthorDate: Tue Jan 21 06:21:44 2025 -0500
Add example reading data from an `mmap`ed IPC file (#6986)
* Add example reading data from an `mmap`ed IPC file
* update readme
---
arrow-ipc/src/reader.rs | 8 ++
arrow/Cargo.toml | 8 ++
arrow/examples/README.md | 1 +
arrow/examples/zero_copy_ipc.rs | 157 ++++++++++++++++++++++++++++++++++++++++
4 files changed, 174 insertions(+)
diff --git a/arrow-ipc/src/reader.rs b/arrow-ipc/src/reader.rs
index 5cf208c8eb..4dcd56156e 100644
--- a/arrow-ipc/src/reader.rs
+++ b/arrow-ipc/src/reader.rs
@@ -683,6 +683,10 @@ pub fn read_footer_length(buf: [u8; 10]) -> Result<usize,
ArrowError> {
///
/// For a higher-level interface see [`FileReader`]
///
+/// For an example of using this API with `mmap`, see the [`zero_copy_ipc`] example.
+///
+/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
+///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::*;
@@ -994,6 +998,10 @@ impl FileReaderBuilder {
}
/// Arrow File reader
+///
+/// For an example of creating Arrays from memory mapped (`mmap`) files, see the [`zero_copy_ipc`] example.
+///
+/// [`zero_copy_ipc`]: https://github.com/apache/arrow-rs/blob/main/arrow/examples/zero_copy_ipc.rs
pub struct FileReader<R> {
/// File reader that supports reading and seeking
reader: R,
diff --git a/arrow/Cargo.toml b/arrow/Cargo.toml
index 76119ec4ab..1b01dcd25e 100644
--- a/arrow/Cargo.toml
+++ b/arrow/Cargo.toml
@@ -87,6 +87,9 @@ criterion = { version = "0.5", default-features = false }
half = { version = "2.1", default-features = false }
rand = { version = "0.8", default-features = false, features = ["std",
"std_rng"] }
serde = { version = "1.0", default-features = false, features = ["derive"] }
+# used in examples
+memmap2 = "0.9.3"
+bytes = "1.9"
[build-dependencies]
@@ -105,6 +108,11 @@ name = "read_csv_infer_schema"
required-features = ["prettyprint", "csv"]
path = "./examples/read_csv_infer_schema.rs"
+[[example]]
+name = "zero_copy_ipc"
+required-features = ["prettyprint"]
+path = "examples/zero_copy_ipc.rs"
+
[[bench]]
name = "aggregate_kernels"
harness = false
diff --git a/arrow/examples/README.md b/arrow/examples/README.md
index 87aa6ee047..9d896cd117 100644
--- a/arrow/examples/README.md
+++ b/arrow/examples/README.md
@@ -24,5 +24,6 @@
- [`dynamic_types.rs`](dynamic_types.rs): Dealing with mixed types dynamically
at runtime
- [`read_csv.rs`](read_csv.rs): Reading CSV files with explicit schema, pretty
printing Arrays
- [`read_csv_infer_schema.rs`](read_csv_infer_schema.rs): Reading CSV files,
pretty printing Arrays
+- [`zero_copy_ipc.rs`](zero_copy_ipc.rs): Zero copy read of an Arrow IPC file using `mmap`
- [`tensor_builder.rs`](tensor_builder.rs): Using tensor builder
- [`version.rs`](version.rs): Print the arrow version and exit
diff --git a/arrow/examples/zero_copy_ipc.rs b/arrow/examples/zero_copy_ipc.rs
new file mode 100644
index 0000000000..15fc477c59
--- /dev/null
+++ b/arrow/examples/zero_copy_ipc.rs
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Example of reading Arrow IPC files using the "zero copy" API
+//!
+//! Zero copy in this case means the Arrow arrays refer directly to a
+//! user-provided buffer or memory region instead of copying the data.
+
+use arrow::array::{record_batch, RecordBatch};
+use arrow::error::Result;
+use arrow_buffer::Buffer;
+use arrow_cast::pretty::pretty_format_batches;
+use arrow_ipc::convert::fb_to_schema;
+use arrow_ipc::reader::{read_footer_length, FileDecoder};
+use arrow_ipc::writer::FileWriter;
+use arrow_ipc::{root_as_footer, Block};
+use std::path::PathBuf;
+use std::sync::Arc;
+
+/// This example shows how to read data from an Arrow IPC file without copying
+/// using `mmap` and the [`FileDecoder`] API
+fn main() {
+ // Create a temporary file with 3 record batches
+ let ipc_path = ipc_file();
+ // Open the file and memory map it using the mmap2 crate:
+ let ipc_file = std::fs::File::open(&ipc_path.path).expect("failed to open
file");
+ let mmap = unsafe { memmap2::Mmap::map(&ipc_file).expect("failed to mmap
file") };
+
+ // Convert the mmap region to an Arrow `Buffer` to back the arrow arrays.
We
+ // do this by first creating a `bytes::Bytes` (which is zero copy) and then
+ // creating a Buffer from the `Bytes` (which is also zero copy)
+ let bytes = bytes::Bytes::from_owner(mmap);
+ let buffer = Buffer::from(bytes);
+
+ // Now, use the FileDecoder API (wrapped by `IPCBufferDecoder` for
+ // convenience) to crate Arrays re-using the data in the underlying buffer
+ let decoder = IPCBufferDecoder::new(buffer);
+ assert_eq!(decoder.num_batches(), 3);
+
+ // Create the Arrays and print them
+ for i in 0..decoder.num_batches() {
+ let batch = decoder.get_batch(i).unwrap().expect("failed to read
batch");
+ assert_eq!(3, batch.num_rows());
+ println!("Batch {i}\n{}", pretty_format_batches(&[batch]).unwrap());
+ }
+}
+
+/// Return 3 [`RecordBatch`]es with a single column of type `Int32`
+fn example_data() -> Vec<RecordBatch> {
+ vec![
+ record_batch!(("my_column", Int32, [1, 2, 3])).unwrap(),
+ record_batch!(("my_column", Int32, [4, 5, 6])).unwrap(),
+ record_batch!(("my_column", Int32, [7, 8, 9])).unwrap(),
+ ]
+}
+
+/// Return a temporary file that contains an IPC file with 3 [`RecordBatch`]es
+fn ipc_file() -> TempFile {
+ let path = PathBuf::from("example.arrow");
+ let file = std::fs::File::create(&path).unwrap();
+ let data = example_data();
+ let mut writer = FileWriter::try_new(file, &data[0].schema()).unwrap();
+ for batch in &data {
+ writer.write(batch).unwrap();
+ }
+ writer.finish().unwrap();
+ TempFile { path }
+}
+
+/// Incrementally decodes [`RecordBatch`]es from an IPC file stored in a Arrow
+/// [`Buffer`] using the [`FileDecoder`] API.
+///
+/// This is a wrapper around the example in the `FileDecoder` which handles the
+/// low level interaction with the Arrow IPC format.
+struct IPCBufferDecoder {
+ /// Memory (or memory mapped) Buffer with the data
+ buffer: Buffer,
+ /// Decoder that reads Arrays that refers to the underlying buffers
+ decoder: FileDecoder,
+ /// Location of the batches within the buffer
+ batches: Vec<Block>,
+}
+
+impl IPCBufferDecoder {
+ fn new(buffer: Buffer) -> Self {
+ let trailer_start = buffer.len() - 10;
+ let footer_len =
read_footer_length(buffer[trailer_start..].try_into().unwrap()).unwrap();
+ let footer = root_as_footer(&buffer[trailer_start -
footer_len..trailer_start]).unwrap();
+
+ let schema = fb_to_schema(footer.schema().unwrap());
+
+ let mut decoder = FileDecoder::new(Arc::new(schema), footer.version());
+
+ // Read dictionaries
+ for block in footer.dictionaries().iter().flatten() {
+ let block_len = block.bodyLength() as usize +
block.metaDataLength() as usize;
+ let data = buffer.slice_with_length(block.offset() as _,
block_len);
+ decoder.read_dictionary(block, &data).unwrap();
+ }
+
+ // convert to Vec from the flatbuffers Vector to avoid having a direct
dependency on flatbuffers
+ let batches = footer
+ .recordBatches()
+ .map(|b| b.iter().copied().collect())
+ .unwrap_or_default();
+
+ Self {
+ buffer,
+ decoder,
+ batches,
+ }
+ }
+
+ /// Return the number of [`RecordBatch`]es in this buffer
+ fn num_batches(&self) -> usize {
+ self.batches.len()
+ }
+
+ /// Return the [`RecordBatch`] at message index `i`.
+ ///
+ /// This may return `None` if the IPC message was None
+ fn get_batch(&self, i: usize) -> Result<Option<RecordBatch>> {
+ let block = &self.batches[i];
+ let block_len = block.bodyLength() as usize + block.metaDataLength()
as usize;
+ let data = self
+ .buffer
+ .slice_with_length(block.offset() as _, block_len);
+ self.decoder.read_record_batch(block, &data)
+ }
+}
+
/// Guard that deletes the file at `path` when it is dropped.
///
/// Deletion is best effort: a failure is reported on stderr but never panics,
/// since panicking in `drop` during an unwind would abort the process.
struct TempFile {
    /// Location of the file to delete on drop
    path: PathBuf,
}

impl Drop for TempFile {
    fn drop(&mut self) {
        if let Err(e) = std::fs::remove_file(&self.path) {
            // Use `Display` for both values: `{:?}` on a path adds its own
            // quotes, which doubled up with the surrounding '...'.
            eprintln!("Error deleting '{}': {e}", self.path.display());
        }
    }
}