This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/main by this push:
new c4b43bb916 feat(arrow-avro): `HeaderInfo` to expose OCF header (#9548)
c4b43bb916 is described below
commit c4b43bb916aa91d366d17013d867992d931aae70
Author: Mikhail Zabaluev <[email protected]>
AuthorDate: Wed Mar 18 16:11:40 2026 +0200
feat(arrow-avro): `HeaderInfo` to expose OCF header (#9548)
# Which issue does this PR close?
- Closes #9460.
# Rationale for this change
Rework of #9462 along the lines proposed in
https://github.com/apache/arrow-rs/pull/9462#issuecomment-3995541243.
# What changes are included in this PR?
Add `HeaderInfo` as a cheaply cloneable value to expose header
information parsed from an Avro OCF file.
Add `read_header_info` function to the `reader` module, and its async
counterpart to the `reader::async_reader` module, to read the header
from the file reader and return `HeaderInfo`.
Add `build_with_header` method to async reader builder to enable reuse
of the header with multiple readers.
# Are these changes tested?
Added a test for the async reader.
# Are there any user-facing changes?
New API in arrow-avro:
* `reader::HeaderInfo`
* `reader::read_header_info` and
`reader::async_reader::read_header_info`
* `build_with_header` method of `AvroAsyncFileReader`'s builder.
---------
Co-authored-by: Connor Sanders <[email protected]>
---
arrow-avro/src/reader/async_reader/builder.rs | 135 ++++++++++++++++----------
arrow-avro/src/reader/async_reader/mod.rs | 45 ++++++++-
arrow-avro/src/reader/header.rs | 67 ++++++++++++-
arrow-avro/src/reader/mod.rs | 8 +-
4 files changed, 195 insertions(+), 60 deletions(-)
diff --git a/arrow-avro/src/reader/async_reader/builder.rs b/arrow-avro/src/reader/async_reader/builder.rs
index 9e979c7566..d3cca70425 100644
--- a/arrow-avro/src/reader/async_reader/builder.rs
+++ b/arrow-avro/src/reader/async_reader/builder.rs
@@ -18,10 +18,10 @@
use crate::codec::{AvroFieldBuilder, Tz};
use crate::errors::AvroError;
use crate::reader::async_reader::ReaderState;
-use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::header::{Header, HeaderDecoder, HeaderInfo};
use crate::reader::record::RecordDecoder;
use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
-use crate::schema::{AvroSchema, FingerprintAlgorithm, SCHEMA_METADATA_KEY};
+use crate::schema::{AvroSchema, FingerprintAlgorithm};
use indexmap::IndexMap;
use std::ops::Range;
@@ -119,50 +119,71 @@ impl<R> ReaderBuilder<R> {
}
}
-impl<R: AsyncFileReader> ReaderBuilder<R> {
- async fn read_header(&mut self) -> Result<(Header, u64), AvroError> {
- let mut decoder = HeaderDecoder::default();
- let mut position = 0;
- loop {
- let range_to_fetch = position
- ..(position + self.header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT))
- .min(self.file_size);
+/// Reads the Avro file header (magic, metadata, sync marker) asynchronously from `reader`.
+///
+/// On success, returns the parsed [`HeaderInfo`] containing the header and its length in bytes.
+pub async fn read_header_info<R>(
+ reader: &mut R,
+ file_size: u64,
+ header_size_hint: Option<u64>,
+) -> Result<HeaderInfo, AvroError>
+where
+ R: AsyncFileReader,
+{
+ read_header(reader, file_size, header_size_hint)
+ .await
+ .map(|(header, header_len)| HeaderInfo::new(header, header_len))
+}
- // Maybe EOF after the header, no actual data
- if range_to_fetch.is_empty() {
- break;
- }
+async fn read_header<R>(
+ reader: &mut R,
+ file_size: u64,
+ header_size_hint: Option<u64>,
+) -> Result<(Header, u64), AvroError>
+where
+ R: AsyncFileReader,
+{
+ let mut decoder = HeaderDecoder::default();
+ let mut position = 0;
+ loop {
+ let range_to_fetch = position
+ ..(position + header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT)).min(file_size);
- let current_data = self
- .reader
- .get_bytes(range_to_fetch.clone())
- .await
- .map_err(|err| {
- AvroError::General(format!(
- "Error fetching Avro header from file reader: {err}"
- ))
- })?;
- if current_data.is_empty() {
- return Err(AvroError::EOF(
- "Unexpected EOF while fetching header data".into(),
- ));
- }
+ // Maybe EOF after the header, no actual data
+ if range_to_fetch.is_empty() {
+ break;
+ }
- let read = current_data.len();
- let decoded = decoder.decode(&current_data)?;
- if decoded != read {
- position += decoded as u64;
- break;
- }
- position += read as u64;
+ let current_data = reader
+ .get_bytes(range_to_fetch.clone())
+ .await
+ .map_err(|err| {
+ AvroError::General(format!(
+ "Error fetching Avro header from file reader: {err}"
+ ))
+ })?;
+ if current_data.is_empty() {
+ return Err(AvroError::EOF(
+ "Unexpected EOF while fetching header data".into(),
+ ));
}
- decoder
- .flush()
- .map(|header| (header, position))
- .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
+ let read = current_data.len();
+ let decoded = decoder.decode(&current_data)?;
+ if decoded != read {
+ position += decoded as u64;
+ break;
+ }
+ position += read as u64;
}
+ decoder
+ .flush()
+ .map(|header| (header, position))
+ .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".into()))
+}
+
+impl<R: AsyncFileReader> ReaderBuilder<R> {
/// Build the asynchronous Avro reader with the provided parameters.
/// This reads the header first to initialize the reader state.
pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, AvroError> {
@@ -172,18 +193,24 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
// Start by reading the header from the beginning of the avro file
// take the writer schema from the header
- let (header, header_len) = self.read_header().await?;
- let writer_schema = {
- let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
- AvroError::ParseError("No Avro schema present in file header".to_string())
- })?;
- let json_string = std::str::from_utf8(raw)
- .map_err(|e| {
- AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}"))
- })?
- .to_string();
- AvroSchema::new(json_string)
- };
+ let header_info =
+ read_header_info(&mut self.reader, self.file_size, self.header_size_hint).await?;
+
+ self.build_with_header(header_info)
+ }
+
+ /// Build the asynchronous Avro reader with the provided header.
+ ///
+ /// This allows initializing the reader with pre-parsed header information.
+ /// Note that this method is not async because it does not need to perform any I/O operations.
+ ///
+ /// Note: Any `header_size_hint` set via [`Self::with_header_size_hint`] is not used
+ /// when building with a pre-parsed header, since no header fetching occurs.
+ pub fn build_with_header(
+ self,
+ header_info: HeaderInfo,
+ ) -> Result<AsyncAvroFileReader<R>, AvroError> {
+ let writer_schema = header_info.writer_schema()?;
// If projection exists, project the reader schema,
// if no reader schema is provided, parse it from the header(get the raw writer schema), and project that
@@ -230,6 +257,7 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
IndexMap::new(),
FingerprintAlgorithm::Rabin,
);
+ let header_len = header_info.header_len();
let range = match self.range {
Some(r) => {
// If this PartitionedFile's range starts at 0, we need to skip the header bytes.
@@ -252,8 +280,9 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
reader: self.reader,
}
};
- let codec = header.compression()?;
- let sync_marker = header.sync();
+
+ let codec = header_info.compression()?;
+ let sync_marker = header_info.sync();
Ok(AsyncAvroFileReader::new(
range,
diff --git a/arrow-avro/src/reader/async_reader/mod.rs b/arrow-avro/src/reader/async_reader/mod.rs
index 0aaa739eef..c034411edb 100644
--- a/arrow-avro/src/reader/async_reader/mod.rs
+++ b/arrow-avro/src/reader/async_reader/mod.rs
@@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.
+//! Asynchronous implementation of Avro file reader.
+//!
+//! This module provides [`AsyncAvroFileReader`], which supports reading and decoding
+//! the Avro OCF format from any source that implements [`AsyncFileReader`].
+
use crate::compression::CompressionCodec;
use crate::reader::Decoder;
use crate::reader::block::{BlockDecoder, BlockDecoderState};
@@ -32,7 +37,7 @@ mod async_file_reader;
mod builder;
pub use async_file_reader::AsyncFileReader;
-pub use builder::ReaderBuilder;
+pub use builder::{ReaderBuilder, read_header_info};
#[cfg(feature = "object_store")]
mod store;
@@ -1286,6 +1291,44 @@ mod tests {
assert_eq!(batch.num_rows(), 8);
}
+ #[tokio::test]
+ async fn test_builder_with_header_info() {
+ let file = arrow_test_data("avro/alltypes_plain.avro");
+ let store = Arc::new(LocalFileSystem::new());
+ let location = Path::from_filesystem_path(&file).unwrap();
+
+ let file_size = store.head(&location).await.unwrap().size;
+
+ let mut file_reader = AvroObjectReader::new(store, location);
+
+ let header_info = read_header_info(&mut file_reader, file_size, None)
+ .await
+ .unwrap();
+
+ assert_eq!(header_info.header_len(), 675);
+
+ let writer_schema = header_info.writer_schema().unwrap();
+ let expected_avro_json: serde_json::Value = serde_json::from_str(
+ get_alltypes_schema()
+ .metadata()
+ .get(SCHEMA_METADATA_KEY)
+ .unwrap(),
+ )
+ .unwrap();
+ let actual_avro_json: serde_json::Value =
+ serde_json::from_str(&writer_schema.json_string).unwrap();
+ assert_eq!(actual_avro_json, expected_avro_json);
+
+ let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+ .build_with_header(header_info)
+ .unwrap();
+
+ let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+
+ let batch = &batches[0];
+ assert_eq!(batch.num_rows(), 8)
+ }
+
#[tokio::test]
async fn test_roundtrip_write_then_async_read() {
use crate::writer::AvroWriter;
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
index b5efd8bcdb..c5593ba0ad 100644
--- a/arrow-avro/src/reader/header.rs
+++ b/arrow-avro/src/reader/header.rs
@@ -20,12 +20,17 @@
use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
use crate::errors::AvroError;
use crate::reader::vlq::VLQDecoder;
-use crate::schema::{SCHEMA_METADATA_KEY, Schema};
+use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY, Schema};
use std::io::BufRead;
+use std::str;
+use std::sync::Arc;
/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
-pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<Header, AvroError> {
+///
+/// On success, returns the parsed [`Header`] and the number of bytes read from `reader`.
+pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<(Header, u64), AvroError> {
let mut decoder = HeaderDecoder::default();
+ let mut position = 0;
loop {
let buf = reader.fill_buf()?;
if buf.is_empty() {
@@ -34,12 +39,14 @@ pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<Header, AvroError
let read = buf.len();
let decoded = decoder.decode(buf)?;
reader.consume(decoded);
+ position += decoded as u64;
if decoded != read {
break;
}
}
decoder
.flush()
+ .map(|header| (header, position))
.ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro header".to_string()))
}
@@ -124,6 +131,60 @@ impl Header {
}
}
+/// Header information for an Avro OCF file.
+///
+/// The header can be parsed once and shared to construct multiple readers
+/// for the same file, and so this struct is designed to be cheaply clonable.
+#[derive(Clone)]
+pub struct HeaderInfo(Arc<HeaderInfoInner>);
+
+struct HeaderInfoInner {
+ header: Header,
+ header_len: u64,
+}
+
+/// Reads the Avro file header (magic, metadata, sync marker) from `reader`.
+///
+/// On success, returns the parsed [`HeaderInfo`] containing the header and its length in bytes.
+pub fn read_header_info<R: BufRead>(reader: R) -> Result<HeaderInfo, AvroError> {
+ let (header, header_len) = read_header(reader)?;
+ Ok(HeaderInfo::new(header, header_len))
+}
+
+impl HeaderInfo {
+ pub(crate) fn new(header: Header, header_len: u64) -> Self {
+ Self(Arc::new(HeaderInfoInner { header, header_len }))
+ }
+
+ /// Returns the writer schema for this file.
+ pub fn writer_schema(&self) -> Result<AvroSchema, AvroError> {
+ let raw = self.0.header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
+ AvroError::ParseError("No Avro schema present in file header".to_string())
+ })?;
+ let json_string = str::from_utf8(raw)
+ .map_err(|e| {
+ AvroError::ParseError(format!("Invalid UTF-8 in Avro schema header: {e}"))
+ })?
+ .to_string();
+ Ok(AvroSchema::new(json_string))
+ }
+
+ /// Returns the [`CompressionCodec`] if any
+ pub fn compression(&self) -> Result<Option<CompressionCodec>, AvroError> {
+ self.0.header.compression()
+ }
+
+ /// Returns the length of the header in bytes.
+ pub fn header_len(&self) -> u64 {
+ self.0.header_len
+ }
+
+ /// Returns the sync token for this file.
+ pub fn sync(&self) -> [u8; 16] {
+ self.0.header.sync()
+ }
+}
+
/// A decoder for [`Header`]
///
/// The avro file format does not encode the length of the header, and so it
@@ -315,7 +376,7 @@ mod test {
fn decode_file(file: &str) -> Header {
let file = File::open(file).unwrap();
- read_header(BufReader::with_capacity(1000, file)).unwrap()
+ read_header(BufReader::with_capacity(1000, file)).unwrap().0
}
#[test]
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 84d41cf9c6..070204f2bc 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -500,7 +500,9 @@ mod record;
mod vlq;
#[cfg(feature = "async")]
-mod async_reader;
+pub mod async_reader;
+
+pub use header::{HeaderInfo, read_header_info};
#[cfg(feature = "object_store")]
pub use async_reader::AvroObjectReader;
@@ -1285,7 +1287,7 @@ impl ReaderBuilder {
/// the discovered writer (and optional reader) schema, and prepares to iterate blocks,
/// decompressing if necessary.
pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, ArrowError> {
- let header = read_header(&mut reader)?;
+ let (header, _) = read_header(&mut reader)?;
let decoder = self.make_decoder(Some(&header), self.reader_schema.as_ref())?;
Ok(Reader {
reader,
@@ -1644,7 +1646,7 @@ mod test {
fn load_writer_schema_json(path: &str) -> Value {
let file = File::open(path).unwrap();
- let header = super::read_header(BufReader::new(file)).unwrap();
+ let (header, _) = super::read_header(BufReader::new(file)).unwrap();
let schema = header.schema().unwrap().unwrap();
serde_json::to_value(&schema).unwrap()
}