This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/main by this push:
     new c4b43bb916 feat(arrow-avro): `HeaderInfo` to expose OCF header (#9548)
c4b43bb916 is described below

commit c4b43bb916aa91d366d17013d867992d931aae70
Author: Mikhail Zabaluev <[email protected]>
AuthorDate: Wed Mar 18 16:11:40 2026 +0200

    feat(arrow-avro): `HeaderInfo` to expose OCF header (#9548)
    
    # Which issue does this PR close?
    
    - Closes #9460.
    
    # Rationale for this change
    
    Rework of #9462 along the lines proposed in
    https://github.com/apache/arrow-rs/pull/9462#issuecomment-3995541243.
    
    # What changes are included in this PR?
    
    Add `HeaderInfo` as a cheaply cloneable value to expose header
    information parsed from an Avro OCF file.
    
    Add `read_header_info` function to the `reader` module, and its async
    counterpart to the `reader::async_reader` module, to read the header
    from the file reader and return `HeaderInfo`.
    
    Add a `build_with_header` method to the async reader builder so that a
    single parsed header can be reused to construct multiple readers.
    
    # Are these changes tested?
    
    Added a test for the async reader.
    
    # Are there any user-facing changes?
    
    New API in arrow-avro:
    
    * `reader::HeaderInfo`
    * `reader::read_header_info` and
    `reader::async_reader::read_header_info`
    * `build_with_header` method of `AsyncAvroFileReader`'s builder.
    
    ---------
    
    Co-authored-by: Connor Sanders <[email protected]>
---
 arrow-avro/src/reader/async_reader/builder.rs | 135 ++++++++++++++++----------
 arrow-avro/src/reader/async_reader/mod.rs     |  45 ++++++++-
 arrow-avro/src/reader/header.rs               |  67 ++++++++++++-
 arrow-avro/src/reader/mod.rs                  |   8 +-
 4 files changed, 195 insertions(+), 60 deletions(-)

diff --git a/arrow-avro/src/reader/async_reader/builder.rs 
b/arrow-avro/src/reader/async_reader/builder.rs
index 9e979c7566..d3cca70425 100644
--- a/arrow-avro/src/reader/async_reader/builder.rs
+++ b/arrow-avro/src/reader/async_reader/builder.rs
@@ -18,10 +18,10 @@
 use crate::codec::{AvroFieldBuilder, Tz};
 use crate::errors::AvroError;
 use crate::reader::async_reader::ReaderState;
-use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::header::{Header, HeaderDecoder, HeaderInfo};
 use crate::reader::record::RecordDecoder;
 use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
-use crate::schema::{AvroSchema, FingerprintAlgorithm, SCHEMA_METADATA_KEY};
+use crate::schema::{AvroSchema, FingerprintAlgorithm};
 use indexmap::IndexMap;
 use std::ops::Range;
 
@@ -119,50 +119,71 @@ impl<R> ReaderBuilder<R> {
     }
 }
 
-impl<R: AsyncFileReader> ReaderBuilder<R> {
-    async fn read_header(&mut self) -> Result<(Header, u64), AvroError> {
-        let mut decoder = HeaderDecoder::default();
-        let mut position = 0;
-        loop {
-            let range_to_fetch = position
-                ..(position + 
self.header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT))
-                    .min(self.file_size);
+/// Reads the Avro file header (magic, metadata, sync marker) asynchronously 
from `reader`.
+///
+/// On success, returns the parsed [`HeaderInfo`] containing the header and 
its length in bytes.
+pub async fn read_header_info<R>(
+    reader: &mut R,
+    file_size: u64,
+    header_size_hint: Option<u64>,
+) -> Result<HeaderInfo, AvroError>
+where
+    R: AsyncFileReader,
+{
+    read_header(reader, file_size, header_size_hint)
+        .await
+        .map(|(header, header_len)| HeaderInfo::new(header, header_len))
+}
 
-            // Maybe EOF after the header, no actual data
-            if range_to_fetch.is_empty() {
-                break;
-            }
+async fn read_header<R>(
+    reader: &mut R,
+    file_size: u64,
+    header_size_hint: Option<u64>,
+) -> Result<(Header, u64), AvroError>
+where
+    R: AsyncFileReader,
+{
+    let mut decoder = HeaderDecoder::default();
+    let mut position = 0;
+    loop {
+        let range_to_fetch = position
+            ..(position + 
header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT)).min(file_size);
 
-            let current_data = self
-                .reader
-                .get_bytes(range_to_fetch.clone())
-                .await
-                .map_err(|err| {
-                    AvroError::General(format!(
-                        "Error fetching Avro header from file reader: {err}"
-                    ))
-                })?;
-            if current_data.is_empty() {
-                return Err(AvroError::EOF(
-                    "Unexpected EOF while fetching header data".into(),
-                ));
-            }
+        // Maybe EOF after the header, no actual data
+        if range_to_fetch.is_empty() {
+            break;
+        }
 
-            let read = current_data.len();
-            let decoded = decoder.decode(&current_data)?;
-            if decoded != read {
-                position += decoded as u64;
-                break;
-            }
-            position += read as u64;
+        let current_data = reader
+            .get_bytes(range_to_fetch.clone())
+            .await
+            .map_err(|err| {
+                AvroError::General(format!(
+                    "Error fetching Avro header from file reader: {err}"
+                ))
+            })?;
+        if current_data.is_empty() {
+            return Err(AvroError::EOF(
+                "Unexpected EOF while fetching header data".into(),
+            ));
         }
 
-        decoder
-            .flush()
-            .map(|header| (header, position))
-            .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro 
header".into()))
+        let read = current_data.len();
+        let decoded = decoder.decode(&current_data)?;
+        if decoded != read {
+            position += decoded as u64;
+            break;
+        }
+        position += read as u64;
     }
 
+    decoder
+        .flush()
+        .map(|header| (header, position))
+        .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro 
header".into()))
+}
+
+impl<R: AsyncFileReader> ReaderBuilder<R> {
     /// Build the asynchronous Avro reader with the provided parameters.
     /// This reads the header first to initialize the reader state.
     pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, 
AvroError> {
@@ -172,18 +193,24 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
 
         // Start by reading the header from the beginning of the avro file
         // take the writer schema from the header
-        let (header, header_len) = self.read_header().await?;
-        let writer_schema = {
-            let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
-                AvroError::ParseError("No Avro schema present in file 
header".to_string())
-            })?;
-            let json_string = std::str::from_utf8(raw)
-                .map_err(|e| {
-                    AvroError::ParseError(format!("Invalid UTF-8 in Avro 
schema header: {e}"))
-                })?
-                .to_string();
-            AvroSchema::new(json_string)
-        };
+        let header_info =
+            read_header_info(&mut self.reader, self.file_size, 
self.header_size_hint).await?;
+
+        self.build_with_header(header_info)
+    }
+
+    /// Build the asynchronous Avro reader with the provided header.
+    ///
+    /// This allows initializing the reader with pre-parsed header information.
+    /// Note that this method is not async because it does not need to perform 
any I/O operations.
+    ///
+    /// Note: Any `header_size_hint` set via [`Self::with_header_size_hint`] 
is not used
+    /// when building with a pre-parsed header, since no header fetching 
occurs.
+    pub fn build_with_header(
+        self,
+        header_info: HeaderInfo,
+    ) -> Result<AsyncAvroFileReader<R>, AvroError> {
+        let writer_schema = header_info.writer_schema()?;
 
         // If projection exists, project the reader schema,
         // if no reader schema is provided, parse it from the header(get the 
raw writer schema), and project that
@@ -230,6 +257,7 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
             IndexMap::new(),
             FingerprintAlgorithm::Rabin,
         );
+        let header_len = header_info.header_len();
         let range = match self.range {
             Some(r) => {
                 // If this PartitionedFile's range starts at 0, we need to 
skip the header bytes.
@@ -252,8 +280,9 @@ impl<R: AsyncFileReader> ReaderBuilder<R> {
                 reader: self.reader,
             }
         };
-        let codec = header.compression()?;
-        let sync_marker = header.sync();
+
+        let codec = header_info.compression()?;
+        let sync_marker = header_info.sync();
 
         Ok(AsyncAvroFileReader::new(
             range,
diff --git a/arrow-avro/src/reader/async_reader/mod.rs 
b/arrow-avro/src/reader/async_reader/mod.rs
index 0aaa739eef..c034411edb 100644
--- a/arrow-avro/src/reader/async_reader/mod.rs
+++ b/arrow-avro/src/reader/async_reader/mod.rs
@@ -15,6 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+//! Asynchronous implementation of Avro file reader.
+//!
+//! This module provides [`AsyncAvroFileReader`], which supports reading and 
decoding
+//! the Avro OCF format from any source that implements [`AsyncFileReader`].
+
 use crate::compression::CompressionCodec;
 use crate::reader::Decoder;
 use crate::reader::block::{BlockDecoder, BlockDecoderState};
@@ -32,7 +37,7 @@ mod async_file_reader;
 mod builder;
 
 pub use async_file_reader::AsyncFileReader;
-pub use builder::ReaderBuilder;
+pub use builder::{ReaderBuilder, read_header_info};
 
 #[cfg(feature = "object_store")]
 mod store;
@@ -1286,6 +1291,44 @@ mod tests {
         assert_eq!(batch.num_rows(), 8);
     }
 
+    #[tokio::test]
+    async fn test_builder_with_header_info() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let mut file_reader = AvroObjectReader::new(store, location);
+
+        let header_info = read_header_info(&mut file_reader, file_size, None)
+            .await
+            .unwrap();
+
+        assert_eq!(header_info.header_len(), 675);
+
+        let writer_schema = header_info.writer_schema().unwrap();
+        let expected_avro_json: serde_json::Value = serde_json::from_str(
+            get_alltypes_schema()
+                .metadata()
+                .get(SCHEMA_METADATA_KEY)
+                .unwrap(),
+        )
+        .unwrap();
+        let actual_avro_json: serde_json::Value =
+            serde_json::from_str(&writer_schema.json_string).unwrap();
+        assert_eq!(actual_avro_json, expected_avro_json);
+
+        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
+            .build_with_header(header_info)
+            .unwrap();
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+
+        let batch = &batches[0];
+        assert_eq!(batch.num_rows(), 8)
+    }
+
     #[tokio::test]
     async fn test_roundtrip_write_then_async_read() {
         use crate::writer::AvroWriter;
diff --git a/arrow-avro/src/reader/header.rs b/arrow-avro/src/reader/header.rs
index b5efd8bcdb..c5593ba0ad 100644
--- a/arrow-avro/src/reader/header.rs
+++ b/arrow-avro/src/reader/header.rs
@@ -20,12 +20,17 @@
 use crate::compression::{CODEC_METADATA_KEY, CompressionCodec};
 use crate::errors::AvroError;
 use crate::reader::vlq::VLQDecoder;
-use crate::schema::{SCHEMA_METADATA_KEY, Schema};
+use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY, Schema};
 use std::io::BufRead;
+use std::str;
+use std::sync::Arc;
 
 /// Read the Avro file header (magic, metadata, sync marker) from `reader`.
-pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<Header, 
AvroError> {
+///
+/// On success, returns the parsed [`Header`] and the number of bytes read 
from `reader`.
+pub(crate) fn read_header<R: BufRead>(mut reader: R) -> Result<(Header, u64), 
AvroError> {
     let mut decoder = HeaderDecoder::default();
+    let mut position = 0;
     loop {
         let buf = reader.fill_buf()?;
         if buf.is_empty() {
@@ -34,12 +39,14 @@ pub(crate) fn read_header<R: BufRead>(mut reader: R) -> 
Result<Header, AvroError
         let read = buf.len();
         let decoded = decoder.decode(buf)?;
         reader.consume(decoded);
+        position += decoded as u64;
         if decoded != read {
             break;
         }
     }
     decoder
         .flush()
+        .map(|header| (header, position))
         .ok_or_else(|| AvroError::EOF("Unexpected EOF while reading Avro 
header".to_string()))
 }
 
@@ -124,6 +131,60 @@ impl Header {
     }
 }
 
+/// Header information for an Avro OCF file.
+///
+/// The header can be parsed once and shared to construct multiple readers
+/// for the same file; cloning is cheap because it only bumps an `Arc` refcount.
+#[derive(Clone)]
+pub struct HeaderInfo(Arc<HeaderInfoInner>);
+
+struct HeaderInfoInner {
+    header: Header,
+    header_len: u64,
+}
+
+/// Reads the Avro file header (magic, metadata, sync marker) from `reader`.
+///
+/// On success, returns the parsed [`HeaderInfo`] containing the header and 
its length in bytes.
+pub fn read_header_info<R: BufRead>(reader: R) -> Result<HeaderInfo, 
AvroError> {
+    let (header, header_len) = read_header(reader)?;
+    Ok(HeaderInfo::new(header, header_len))
+}
+
+impl HeaderInfo {
+    pub(crate) fn new(header: Header, header_len: u64) -> Self {
+        Self(Arc::new(HeaderInfoInner { header, header_len }))
+    }
+
+    /// Returns the writer schema from the header; errors if it is absent or not valid UTF-8.
+    pub fn writer_schema(&self) -> Result<AvroSchema, AvroError> {
+        let raw = self.0.header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
+            AvroError::ParseError("No Avro schema present in file 
header".to_string())
+        })?;
+        let json_string = str::from_utf8(raw)
+            .map_err(|e| {
+                AvroError::ParseError(format!("Invalid UTF-8 in Avro schema 
header: {e}"))
+            })?
+            .to_string();
+        Ok(AvroSchema::new(json_string))
+    }
+
+    /// Returns the [`CompressionCodec`] recorded in the header, or `None` if there is none.
+    pub fn compression(&self) -> Result<Option<CompressionCodec>, AvroError> {
+        self.0.header.compression()
+    }
+
+    /// Returns the length of the encoded header in bytes, i.e. how many bytes precede block data.
+    pub fn header_len(&self) -> u64 {
+        self.0.header_len
+    }
+
+    /// Returns the 16-byte sync marker for this file.
+    pub fn sync(&self) -> [u8; 16] {
+        self.0.header.sync()
+    }
+}
+
 /// A decoder for [`Header`]
 ///
 /// The avro file format does not encode the length of the header, and so it
@@ -315,7 +376,7 @@ mod test {
 
     fn decode_file(file: &str) -> Header {
         let file = File::open(file).unwrap();
-        read_header(BufReader::with_capacity(1000, file)).unwrap()
+        read_header(BufReader::with_capacity(1000, file)).unwrap().0
     }
 
     #[test]
diff --git a/arrow-avro/src/reader/mod.rs b/arrow-avro/src/reader/mod.rs
index 84d41cf9c6..070204f2bc 100644
--- a/arrow-avro/src/reader/mod.rs
+++ b/arrow-avro/src/reader/mod.rs
@@ -500,7 +500,9 @@ mod record;
 mod vlq;
 
 #[cfg(feature = "async")]
-mod async_reader;
+pub mod async_reader;
+
+pub use header::{HeaderInfo, read_header_info};
 
 #[cfg(feature = "object_store")]
 pub use async_reader::AvroObjectReader;
@@ -1285,7 +1287,7 @@ impl ReaderBuilder {
     /// the discovered writer (and optional reader) schema, and prepares to 
iterate blocks,
     /// decompressing if necessary.
     pub fn build<R: BufRead>(self, mut reader: R) -> Result<Reader<R>, 
ArrowError> {
-        let header = read_header(&mut reader)?;
+        let (header, _) = read_header(&mut reader)?;
         let decoder = self.make_decoder(Some(&header), 
self.reader_schema.as_ref())?;
         Ok(Reader {
             reader,
@@ -1644,7 +1646,7 @@ mod test {
 
     fn load_writer_schema_json(path: &str) -> Value {
         let file = File::open(path).unwrap();
-        let header = super::read_header(BufReader::new(file)).unwrap();
+        let (header, _) = super::read_header(BufReader::new(file)).unwrap();
         let schema = header.schema().unwrap().unwrap();
         serde_json::to_value(&schema).unwrap()
     }

Reply via email to