jecsand838 commented on code in PR #8930:
URL: https://github.com/apache/arrow-rs/pull/8930#discussion_r2600697636


##########
arrow-avro/src/reader/async_reader/builder.rs:
##########
@@ -0,0 +1,149 @@
+use crate::codec::AvroFieldBuilder;
+use crate::reader::async_reader::ReaderState;
+use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::record::RecordDecoder;
+use crate::reader::{AsyncAvroReader, AsyncFileReader, Decoder};
+use crate::schema::{AvroSchema, FingerprintAlgorithm};
+use arrow_schema::{ArrowError, SchemaRef};
+use indexmap::IndexMap;
+use std::ops::Range;
+
+/// Builder for an asynchronous Avro file reader.
+pub struct AsyncAvroReaderBuilder<R: AsyncFileReader> {
+    pub(super) reader: R,
+    pub(super) file_size: u64,
+    pub(super) schema: SchemaRef,
+    pub(super) batch_size: usize,
+    pub(super) range: Option<Range<u64>>,
+    pub(super) reader_schema: Option<AvroSchema>,
+}
+
+impl<R: AsyncFileReader> AsyncAvroReaderBuilder<R> {
+    /// Specify a byte range to read from the Avro file.
+    /// If this is provided, the reader will read all the blocks within the 
specified range,
+    /// if the range ends mid-block, it will attempt to fetch the remaining 
bytes to complete the block,
+    /// but no further blocks will be read.
+    /// If this is omitted, the full file will be read.
+    pub fn with_range(self, range: Range<u64>) -> Self {
+        Self {
+            range: Some(range),
+            ..self
+        }
+    }
+
+    /// Specify a reader schema to use when reading the Avro file.
+    /// This can be useful to project specific columns or handle schema 
evolution.
+    /// If this is not provided, the schema will be derived from the Arrow 
schema provided.
+    pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
+        Self {
+            reader_schema: Some(reader_schema),
+            ..self
+        }
+    }
+
+    async fn read_header(&mut self) -> Result<(Header, u64), ArrowError> {
+        let mut decoder = HeaderDecoder::default();
+        let mut position = 0;
+        loop {
+            let range_to_fetch = position..(position + 64 * 
1024).min(self.file_size);
+            let current_data = self
+                .reader
+                .fetch_extra_range(range_to_fetch)
+                .await
+                .map_err(|err| {
+                    ArrowError::AvroError(format!(
+                        "Error fetching Avro header from object store: {err}"
+                    ))
+                })?;
+            if current_data.is_empty() {
+                break;
+            }
+            let read = current_data.len();
+            let decoded = decoder.decode(&current_data)?;
+            if decoded != read {
+                position += decoded as u64;
+                break;
+            }
+            position += read as u64;
+        }
+
+        decoder
+            .flush()
+            .map(|header| (header, position))
+            .ok_or_else(|| ArrowError::AvroError("Unexpected EOF while reading 
Avro header".into()))
+    }
+
+    /// Build the asynchronous Avro reader with the provided parameters.
+    /// This reads the header first to initialize the reader state.
+    pub async fn try_build(mut self) -> Result<AsyncAvroReader<R>, ArrowError> 
{
+        if self.file_size == 0 {
+            return Err(ArrowError::AvroError("File size cannot be 0".into()));
+        }
+
+        // Start by reading the header from the beginning of the avro file
+        let (header, header_len) = self.read_header().await?;
+        let writer_schema = header
+            .schema()
+            .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+            .ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".into())
+            })?;
+
+        let root = {
+            let field_builder = AvroFieldBuilder::new(&writer_schema);
+            match self.reader_schema.as_ref() {
+                None => {
+                    let devised_avro_schema = 
AvroSchema::try_from(self.schema.as_ref())?;
+                    let devised_reader_schema = devised_avro_schema.schema()?;
+                    field_builder
+                        .with_reader_schema(&devised_reader_schema)
+                        .build()
+                }

Review Comment:
   Shouldn't we just execute `field_builder.build` without a `reader_schema` in 
this case? 
   
   The `Reader` treats this scenario as one where the caller simply wants to 
decode an OCF file without schema resolution, purely using the `writer_schema`.
   



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1021 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::{ArrowError, SchemaRef};
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::Stream;
+use futures::future::BoxFuture;
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod object_store_reader;
+
+#[cfg(feature = "object_store")]
+pub use object_store_reader::ObjectStoreFileReader;
+
+type DataFetchFutureBoxed = BoxFuture<'static, Result<Bytes, ArrowError>>;
+
+enum ReaderState {
+    Idle,
+    Limbo,
+    FetchingData {
+        existing_data: Option<Bytes>,
+        fetch_future: DataFetchFutureBoxed,
+    },
+    DecodingBlock {
+        data: Bytes,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    reader: R,
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader> AsyncAvroReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(
+        reader: R,
+        file_size: u64,
+        schema: SchemaRef,

Review Comment:
   I really don't think the `schema` field should be required here as it 
subtracts from Avro's self-describing characteristic while effectively making 
the optional `reader_schema` required. 
   
   I'd recommend setting this up in a manner that encourages callers to use 
`with_reader_schema`. That way callers which simply want to read an OCF file 
without schema resolution are optimally supported. 
   
   If we absolutely need to support passing in an Arrow `reader_schema`, then 
I'd recommend adding an optional (and well documented) 
`with_arrow_reader_schema` method (to compliment `with_reader_schema`) that 
inputs an Arrow `SchemaRef` and runs `AvroSchema::try_new` on it.



##########
arrow-avro/src/reader/async_reader/async_file_reader.rs:
##########
@@ -0,0 +1,18 @@
+use crate::reader::async_reader::DataFetchFutureBoxed;
+use std::ops::Range;
+
+/// A broad generic trait definition allowing fetching bytes from any source 
asynchronously.
+/// This trait has very few limitations, mostly in regard to ownership and 
lifetime,
+/// but it must return a boxed Future containing [`bytes::Bytes`] or an error.
+pub trait AsyncFileReader: Send + Unpin {
+    /// Fetch a range of bytes asynchronously using a custom reading method
+    fn fetch_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed;
+
+    /// Fetch a range that is beyond the originally provided file range,
+    /// such as reading the header before reading the file,
+    /// or fetching the remainder of the block in case the range ended before 
the block's end.
+    /// By default, this will simply point to the fetch_range function.
+    fn fetch_extra_range(&mut self, range: Range<u64>) -> DataFetchFutureBoxed 
{
+        self.fetch_range(range)
+    }
+}

Review Comment:
   What's your take on aligning this a bit more with the trait used in 
`parquet` and `arrow/async_reader`?
   
   ```suggestion
   pub trait AsyncFileReader: Send {
       /// Retrieve the bytes in `range`
       fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, 
Result<Bytes>>;
   ```
   
   My thinking is this:
   1. The `get_bytes` trait method is just "fetch these bytes". It doesn't know 
or care whether the range is within some "expected" range. The out-of-band 
reads (header, partial block completion) could be a concern of the reader 
logic, not the I/O trait.
   2. Users already understand `get_bytes` / `get_byte_ranges`. Reusing that 
mental model reduces friction. Plus consistency across crates is generally a 
best practice.
   3. This would unlock a clean default `impl for AsyncRead + AsyncSeek` (like 
`tokio::fs::File`) the same way Parquet does . The  current`'static` 
requirement forces all implementations to be fully owned or `Arc`-wrapped, 
which seems unnecessarily rigid for simple file readers.



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1021 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::{ArrowError, SchemaRef};
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::Stream;
+use futures::future::BoxFuture;
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod object_store_reader;
+
+#[cfg(feature = "object_store")]
+pub use object_store_reader::ObjectStoreFileReader;
+
+type DataFetchFutureBoxed = BoxFuture<'static, Result<Bytes, ArrowError>>;
+
+enum ReaderState {
+    Idle,
+    Limbo,
+    FetchingData {
+        existing_data: Option<Bytes>,
+        fetch_future: DataFetchFutureBoxed,
+    },
+    DecodingBlock {
+        data: Bytes,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    reader: R,
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader> AsyncAvroReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(
+        reader: R,
+        file_size: u64,
+        schema: SchemaRef,
+        batch_size: usize,
+    ) -> AsyncAvroReaderBuilder<R> {
+        AsyncAvroReaderBuilder {
+            reader,
+            file_size,
+            schema,
+            batch_size,
+            range: None,
+            reader_schema: None,
+        }
+    }
+
+    /// Create a new asynchronous Avro reader for the given file location in 
the object store,
+    /// reading the specified byte range (if any), with the provided reader 
schema and batch size.
+    /// If no range is provided, the full file is read (file_size must be 
provided in this case).
+    fn new(
+        reader: R,
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState,
+    ) -> Self {
+        Self {
+            reader,
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    fn fetch_data_future(&mut self, range: Range<u64>) -> 
Result<DataFetchFutureBoxed, ArrowError> {
+        if range.start >= range.end || range.end > self.file_size {
+            return Err(ArrowError::AvroError(format!(
+                "Invalid range specified for Avro file: start {} >= end {}, 
file_size: {}",
+                range.start, range.end, self.file_size
+            )));
+        }
+
+        Ok(self.reader.fetch_range(range))
+    }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+                ReaderState::Idle => {
+                    let fetch_future = 
self.fetch_data_future(self.range.clone())?;
+                    self.reader_state = ReaderState::FetchingData {
+                        existing_data: None,
+                        fetch_future,
+                    };
+                }
+                ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be 
observed");
+                }
+                ReaderState::FetchingData {
+                    existing_data,
+                    mut fetch_future,
+                } => {
+                    let data_chunk = match fetch_future.poll_unpin(cx)? {
+                        Poll::Ready(data) => data,
+                        Poll::Pending => {
+                            // Return control to executor
+                            self.reader_state = ReaderState::FetchingData {
+                                existing_data,
+                                fetch_future,
+                            };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    if let Some(current_data) = existing_data {
+                        // If data already exists, it means we have a partial 
block,
+                        // Append the newly fetched chunk to the existing 
buffered data.
+                        let combined =
+                            Bytes::from_owner([current_data.clone(), 
data_chunk].concat());
+                        self.reader_state = ReaderState::DecodingBlock { data: 
combined };
+                    } else {
+                        let sync_marker_pos = data_chunk
+                            .windows(16)
+                            .position(|slice| slice == self.sync_marker);
+                        let block_start = match sync_marker_pos {
+                            Some(pos) => pos + 16, // Move past the sync marker
+                            None => {
+                                // Sync marker not found, this is actually 
valid if we arbitrarily split the file at its end.
+                                self.reader_state = ReaderState::Finished;
+                                return Poll::Ready(None);
+                            }
+                        };
+
+                        // This is the first time we read data, so try and 
find the sync marker.
+                        self.reader_state = ReaderState::DecodingBlock {
+                            data: data_chunk.slice(block_start..),
+                        };
+                    }
+                }
+                ReaderState::DecodingBlock { mut data } => {
+                    // Try to decode another block from the buffered reader.
+                    let consumed = self.block_decoder.decode(&data)?;
+                    if consumed == 0 {
+                        // If the last block was exactly at the end of the 
file,
+                        // we're simply done reading.
+                        if data.is_empty() {
+                            let final_batch = self.decoder.flush();
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(final_batch.transpose());
+                        }
+
+                        // If we've tried the following stage before, and 
still can't decode,
+                        // this means the file is truncated or corrupted.
+                        if self.finishing_partial_block {
+                            return Poll::Ready(Some(Err(ArrowError::AvroError(
+                                "Unexpected EOF while reading last Avro 
block".into(),
+                            ))));
+                        }
+
+                        // Avro splitting case: block is incomplete, we need 
to:
+                        // 1. Parse the length so we know how much to read
+                        // 2. Fetch more data from the object store
+                        // 3. Create a new block data from the remaining slice 
and the newly fetched data
+                        // 4. Continue decoding until end of block
+                        self.finishing_partial_block = true;
+
+                        let (size, vlq_header_len) = {
+                            let mut vlq = VLQDecoder::default();
+                            let mut vlq_buf = &data[..];
+                            let original_len = vlq_buf.len();
+
+                            let _ = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
count".into(),
+                                )
+                            })?;
+
+                            let size = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
size".into(),
+                                )
+                            })? as u64;
+
+                            // Calculate how many bytes were consumed by the 
two VLQ integers
+                            let header_len = 
original_len.checked_sub(vlq_buf.len()).unwrap();
+
+                            (size, header_len as u64)
+                        };
+
+                        // Two longs: count and size have already been read, 
but using our vlq,
+                        // meaning they were not consumed.
+                        let total_block_size = size + vlq_header_len;
+                        let remaining_to_fetch =
+                            total_block_size.checked_sub(data.len() as 
u64).unwrap();
+
+                        let range_to_fetch = self.range.end..(self.range.end + 
remaining_to_fetch);
+                        self.reader_state = ReaderState::FetchingData {
+                            existing_data: Some(data),
+                            fetch_future: 
self.fetch_data_future(range_to_fetch)?,
+                        };
+                        continue;
+                    }
+
+                    // Slice off the consumed data
+                    data = data.slice(consumed..);
+
+                    // Decompress the block if needed, prepare it for decoding.
+                    if let Some(block) = self.block_decoder.flush() {
+                        // Successfully decoded a block.
+                        let block_data = Bytes::from_owner(if let Some(ref 
codec) = self.codec {
+                            codec.decompress(&block.data)?
+                        } else {
+                            block.data
+                        });
+
+                        // Since we have an active block, move to reading 
batches
+                        self.reader_state = ReaderState::ReadingBatches {
+                            data,
+                            block_data,
+                            remaining_in_block: block.count,
+                        };
+                    } else {
+                        // Block not finished yet, try to decode more in the 
next iteration
+                        self.reader_state = ReaderState::DecodingBlock { data 
};
+                    }
+                }
+                ReaderState::ReadingBatches {
+                    data,
+                    mut block_data,
+                    mut remaining_in_block,
+                } => {
+                    let (consumed, records_decoded) =
+                        self.decoder.decode_block(&block_data, 
remaining_in_block)?;
+
+                    remaining_in_block -= records_decoded;
+
+                    if remaining_in_block == 0 {
+                        // Finished this block, move to decode next block in 
the next iteration
+                        self.reader_state = ReaderState::DecodingBlock { data 
};
+                    } else {
+                        // Still more records to decode in this block, slice 
the already-read data and stay in this state
+                        block_data = block_data.slice(consumed..);
+                        self.reader_state = ReaderState::ReadingBatches {
+                            data,
+                            block_data,
+                            remaining_in_block,
+                        };
+                    }
+
+                    // We have a full batch ready, emit it
+                    // (This is not mutually exclusive with the block being 
finished, so the state change is valid)
+                    if self.decoder.batch_is_full() {
+                        let batch_res = self.decoder.flush();
+                        return Poll::Ready(batch_res.transpose());
+                    }
+                }
+                // No more batches to emit
+                ReaderState::Finished => return Poll::Ready(None),
+            }
+        }
+    }
+}
+
+impl<R: AsyncFileReader> Stream for AsyncAvroReader<R> {
+    type Item = Result<RecordBatch, ArrowError>;
+
+    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+        self.get_mut().read_next(cx)
+    }
+}
+
+#[cfg(all(test, feature = "object_store"))]
+mod tests {
+    use super::*;
+    use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
+    use arrow_array::*;
+    use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
+    use futures::{StreamExt, TryStreamExt};
+    use object_store::ObjectStore;
+    use object_store::local::LocalFileSystem;
+    use object_store::path::Path;
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
+    fn arrow_test_data(file: &str) -> String {
+        let base =
+            std::env::var("ARROW_TEST_DATA").unwrap_or_else(|_| 
"../testing/data".to_string());
+        format!("{}/{}", base, file)
+    }
+
+    fn get_alltypes_schema() -> SchemaRef {
+        let schema = Schema::new(vec![
+            Field::new("id", DataType::Int32, true),
+            Field::new("bool_col", DataType::Boolean, true),
+            Field::new("tinyint_col", DataType::Int32, true),
+            Field::new("smallint_col", DataType::Int32, true),
+            Field::new("int_col", DataType::Int32, true),
+            Field::new("bigint_col", DataType::Int64, true),
+            Field::new("float_col", DataType::Float32, true),
+            Field::new("double_col", DataType::Float64, true),
+            Field::new("date_string_col", DataType::Binary, true),
+            Field::new("string_col", DataType::Binary, true),
+            Field::new(
+                "timestamp_col",
+                DataType::Timestamp(TimeUnit::Microsecond, 
Some("+00:00".into())),
+                true,
+            ),
+        ])
+        .with_metadata(HashMap::from([(
+            SCHEMA_METADATA_KEY.into(),
+            r#"{
+    "type": "record",
+    "name": "topLevelRecord",
+    "fields": [
+        {
+            "name": "id",
+            "type": [
+                "int",
+                "null"
+            ]
+        },
+        {
+            "name": "bool_col",
+            "type": [
+                "boolean",
+                "null"
+            ]
+        },
+        {
+            "name": "tinyint_col",
+            "type": [
+                "int",
+                "null"
+            ]
+        },
+        {
+            "name": "smallint_col",
+            "type": [
+                "int",
+                "null"
+            ]
+        },
+        {
+            "name": "int_col",
+            "type": [
+                "int",
+                "null"
+            ]
+        },
+        {
+            "name": "bigint_col",
+            "type": [
+                "long",
+                "null"
+            ]
+        },
+        {
+            "name": "float_col",
+            "type": [
+                "float",
+                "null"
+            ]
+        },
+        {
+            "name": "double_col",
+            "type": [
+                "double",
+                "null"
+            ]
+        },
+        {
+            "name": "date_string_col",
+            "type": [
+                "bytes",
+                "null"
+            ]
+        },
+        {
+            "name": "string_col",
+            "type": [
+                "bytes",
+                "null"
+            ]
+        },
+        {
+            "name": "timestamp_col",
+            "type": [
+                {
+                    "type": "long",
+                    "logicalType": "timestamp-micros"
+                },
+                "null"
+            ]
+        }
+    ]
+}
+"#
+            .into(),
+        )]));
+        Arc::new(schema)
+    }
+
+    fn get_alltypes_with_nulls_schema() -> SchemaRef {
+        let schema = Schema::new(vec![
+            Field::new("string_col", DataType::Binary, true),
+            Field::new("int_col", DataType::Int32, true),
+            Field::new("bool_col", DataType::Boolean, true),
+            Field::new("bigint_col", DataType::Int64, true),
+            Field::new("float_col", DataType::Float32, true),
+            Field::new("double_col", DataType::Float64, true),
+            Field::new("bytes_col", DataType::Binary, true),
+        ])
+        .with_metadata(HashMap::from([(
+            SCHEMA_METADATA_KEY.into(),
+            r#"{
+    "type": "record",
+    "name": "topLevelRecord",
+    "fields": [
+        {
+            "name": "string_col",
+            "type": [
+                "null",
+                "string"
+            ],
+            "default": null
+        },
+        {
+            "name": "int_col",
+            "type": [
+                "null",
+                "int"
+            ],
+            "default": null
+        },
+        {
+            "name": "bool_col",
+            "type": [
+                "null",
+                "boolean"
+            ],
+            "default": null
+        },
+        {
+            "name": "bigint_col",
+            "type": [
+                "null",
+                "long"
+            ],
+            "default": null
+        },
+        {
+            "name": "float_col",
+            "type": [
+                "null",
+                "float"
+            ],
+            "default": null
+        },
+        {
+            "name": "double_col",
+            "type": [
+                "null",
+                "double"
+            ],
+            "default": null
+        },
+        {
+            "name": "bytes_col",
+            "type": [
+                "null",
+                "bytes"
+            ],
+            "default": null
+        }
+    ]
+}"#
+            .into(),
+        )]));
+
+        Arc::new(schema)
+    }
+
+    fn get_nested_records_schema() -> SchemaRef {
+        let schema = Schema::new(vec![
+            Field::new(
+                "f1",
+                DataType::Struct(
+                    vec![
+                        Field::new("f1_1", DataType::Utf8, false),
+                        Field::new("f1_2", DataType::Int32, false),
+                        Field::new(
+                            "f1_3",
+                            DataType::Struct(
+                                vec![Field::new("f1_3_1", DataType::Float64, 
false)].into(),
+                            ),
+                            false,
+                        ),
+                    ]
+                    .into(),
+                ),
+                false,
+            ),
+            Field::new(
+                "f2",
+                DataType::List(Arc::new(Field::new(
+                    "item",
+                    DataType::Struct(
+                        vec![
+                            Field::new("f2_1", DataType::Boolean, false),
+                            Field::new("f2_2", DataType::Float32, false),
+                        ]
+                        .into(),
+                    ),
+                    false,
+                ))),
+                false,
+            ),
+            Field::new(
+                "f3",
+                DataType::Struct(vec![Field::new("f3_1", DataType::Utf8, 
false)].into()),
+                true,
+            ),
+            Field::new(
+                "f4",
+                DataType::List(Arc::new(Field::new(
+                    "item",
+                    DataType::Struct(vec![Field::new("f4_1", DataType::Int64, 
false)].into()),
+                    true,
+                ))),
+                false,
+            ),
+        ])
+        .with_metadata(HashMap::from([(
+            SCHEMA_METADATA_KEY.into(),
+            r#"{
+    "type": "record",
+    "namespace": "ns1",
+    "name": "record1",
+    "fields": [
+        {
+            "name": "f1",
+            "type": {
+                "type": "record",
+                "namespace": "ns2",
+                "name": "record2",
+                "fields": [
+                    {
+                        "name": "f1_1",
+                        "type": "string"
+                    },
+                    {
+                        "name": "f1_2",
+                        "type": "int"
+                    },
+                    {
+                        "name": "f1_3",
+                        "type": {
+                            "type": "record",
+                            "namespace": "ns3",
+                            "name": "record3",
+                            "fields": [
+                                {
+                                    "name": "f1_3_1",
+                                    "type": "double"
+                                }
+                            ]
+                        }
+                    }
+                ]
+            }
+        },
+        {
+            "name": "f2",
+            "type": {
+                "type": "array",
+                "items": {
+                    "type": "record",
+                    "namespace": "ns4",
+                    "name": "record4",
+                    "fields": [
+                        {
+                            "name": "f2_1",
+                            "type": "boolean"
+                        },
+                        {
+                            "name": "f2_2",
+                            "type": "float"
+                        }
+                    ]
+                }
+            }
+        },
+        {
+            "name": "f3",
+            "type": [
+                "null",
+                {
+                    "type": "record",
+                    "namespace": "ns5",
+                    "name": "record5",
+                    "fields": [
+                        {
+                            "name": "f3_1",
+                            "type": "string"
+                        }
+                    ]
+                }
+            ],
+            "default": null
+        },
+        {
+            "name": "f4",
+            "type": {
+                "type": "array",
+                "items": [
+                    "null",
+                    {
+                        "type": "record",
+                        "namespace": "ns6",
+                        "name": "record6",
+                        "fields": [
+                            {
+                                "name": "f4_1",
+                                "type": "long"
+                            }
+                        ]
+                    }
+                ]
+            }
+        }
+    ]
+}
+"#
+            .into(),
+        )]));
+
+        Arc::new(schema)
+    }
+
+    async fn read_async_file(
+        path: &str,
+        batch_size: usize,
+        range: Option<Range<u64>>,
+        schema: SchemaRef,
+    ) -> Result<Vec<RecordBatch>, ArrowError> {
+        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(path).unwrap();
+
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = ObjectStoreFileReader::new(store, location);
+        let reader_schema = AvroSchema::try_from(schema.as_ref())?;
+        let builder = AsyncAvroReader::builder(file_reader, file_size, schema, 
batch_size)
+            .with_reader_schema(reader_schema);
+        let reader = if let Some(range) = range {
+            builder.with_range(range)
+        } else {
+            builder
+        }
+        .try_build()
+        .await?;
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await?;
+        Ok(batches)
+    }
+
+    // 
============================================================================
+    // CORE FUNCTIONALITY TESTS
+    // 
============================================================================
+
+    #[tokio::test]
+    async fn test_full_file_read() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, None, 
schema).await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+        assert_eq!(batch.num_columns(), 11);
+
+        let id_array = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(id_array.value(0), 4);
+        assert_eq!(id_array.value(7), 1);
+    }
+
+    #[tokio::test]
+    async fn test_small_batch_size() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 2, None, schema).await.unwrap();
+        assert_eq!(batches.len(), 4);
+
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 2);
+        assert_eq!(batch.num_columns(), 11);
+    }
+
+    #[tokio::test]
+    async fn test_batch_size_one() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1, None, schema).await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batches.len(), 8);
+        assert_eq!(batch.num_rows(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_batch_size_larger_than_file() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 10000, None, 
schema).await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+    }
+
+    // 
============================================================================
+    // RANGE HANDLING TESTS
+    // 
============================================================================
+
+    #[tokio::test]
+    async fn test_empty_range() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let range = 100..100;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), schema)
+            .await
+            .unwrap();
+        assert_eq!(batches.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_range_starting_at_zero() {
+        // Tests that range starting at 0 correctly skips header
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let meta = store.head(&location).await.unwrap();
+
+        let range = 0..meta.size;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), schema)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+    }
+
+    #[tokio::test]
+    async fn test_range_after_header() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let meta = store.head(&location).await.unwrap();
+
+        let range = 100..meta.size;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), schema)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert!(batch.num_rows() > 0);
+    }
+
+    #[tokio::test]
+    async fn test_range_no_sync_marker() {
+        // Small range unlikely to contain sync marker
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let range = 50..150;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), schema)
+            .await
+            .unwrap();
+        assert_eq!(batches.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_range_starting_mid_file() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+
+        let range = 700..768; // Header ends at 675, so this should be 
mid-block
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), schema)
+            .await
+            .unwrap();
+        assert_eq!(batches.len(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_range_ending_at_file_size() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let meta = store.head(&location).await.unwrap();
+
+        let range = 200..meta.size;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), schema)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8);
+    }
+
+    // 
============================================================================
+    // INCOMPLETE BLOCK HANDLING TESTS
+    // 
============================================================================
+
+    #[tokio::test]
+    async fn test_incomplete_block_requires_fetch() {
+        // Range ends mid-block, should trigger fetching_rem_block logic
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let range = 0..1200;
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), schema)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 8)
+    }
+
+    #[cfg(feature = "snappy")]
+    #[tokio::test]
+    async fn test_snappy_compressed_with_range() {
+        {
+            let file = arrow_test_data("avro/alltypes_plain.snappy.avro");
+            let store = Arc::new(LocalFileSystem::new());
+            let location = Path::from_filesystem_path(&file).unwrap();
+            let meta = store.head(&location).await.unwrap();
+
+            let range = 200..meta.size;
+            let schema = get_alltypes_schema();
+            let batches = read_async_file(&file, 1024, Some(range), schema)
+                .await
+                .unwrap();
+            let batch = &batches[0];
+
+            assert!(batch.num_rows() > 0);
+        }
+    }
+
+    #[tokio::test]
+    async fn test_nulls() {
+        let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
+        let schema = get_alltypes_with_nulls_schema();
+        let batches = read_async_file(&file, 1024, None, 
schema).await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 1);
+        for col in batch.columns() {
+            assert!(col.is_null(0));
+        }
+    }
+
+    #[tokio::test]
+    async fn test_nested_records() {
+        let file = arrow_test_data("avro/nested_records.avro");
+        let schema = get_nested_records_schema();
+        let batches = read_async_file(&file, 1024, None, 
schema).await.unwrap();
+        let batch = &batches[0];
+
+        assert_eq!(batch.num_rows(), 2);
+        assert!(batch.num_columns() > 0);
+    }
+
+    // 
============================================================================
+    // STREAM BEHAVIOR TESTS
+    // 
============================================================================
+
+    #[tokio::test]
+    async fn test_stream_produces_multiple_batches() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = ObjectStoreFileReader::new(store, location);
+        let schema = get_alltypes_schema();
+        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
+        let reader = AsyncAvroReader::builder(
+            file_reader,
+            file_size,
+            schema,
+            2, // Small batch size to force multiple batches
+        )
+        .with_reader_schema(reader_schema)
+        .try_build()
+        .await
+        .unwrap();
+
+        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
+
+        assert!(batches.len() > 1);
+        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
+        assert_eq!(total_rows, 8);
+    }
+
+    #[tokio::test]
+    async fn test_stream_early_termination() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+
+        let file_size = store.head(&location).await.unwrap().size;
+
+        let file_reader = ObjectStoreFileReader::new(store, location);
+        let schema = get_alltypes_schema();
+        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
+        let reader = AsyncAvroReader::builder(file_reader, file_size, schema, 
1)
+            .with_reader_schema(reader_schema)
+            .try_build()
+            .await
+            .unwrap();
+
+        let first_batch = 
reader.take(1).try_collect::<Vec<_>>().await.unwrap();
+
+        assert_eq!(first_batch.len(), 1);
+        assert!(first_batch[0].num_rows() > 0);
+    }
+
+    // 
============================================================================
+    // EDGE CASE TESTS
+    // 
============================================================================
+
+    #[tokio::test]
+    async fn test_various_batch_sizes() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+
+        for batch_size in [1, 2, 3, 5, 7, 11, 100] {
+            let schema = get_alltypes_schema();
+            let batches = read_async_file(&file, batch_size, None, schema)
+                .await
+                .unwrap();
+            let batch = &batches[0];
+
+            // Size should be what was provided, to the limit of the batch in 
the file
+            assert_eq!(
+                batch.num_rows(),
+                batch_size.min(8),
+                "Failed with batch_size={}",
+                batch_size
+            );
+        }
+    }
+
+    #[tokio::test]
+    async fn test_range_larger_than_file() {
+        let file = arrow_test_data("avro/alltypes_plain.avro");
+        let store = Arc::new(LocalFileSystem::new());
+        let location = Path::from_filesystem_path(&file).unwrap();
+        let meta = store.head(&location).await.unwrap();
+
+        // Range extends beyond file size
+        let range = 100..(meta.size + 1000);
+        let schema = get_alltypes_schema();
+        let batches = read_async_file(&file, 1024, Some(range), schema)
+            .await
+            .unwrap();
+        let batch = &batches[0];
+
+        // Should clamp to file size
+        assert_eq!(batch.num_rows(), 8);
+    }

Review Comment:
   It may also be worth adding round-trip tests using the `Writer`.



##########
arrow-avro/src/reader/async_reader/builder.rs:
##########
@@ -0,0 +1,149 @@
+use crate::codec::AvroFieldBuilder;
+use crate::reader::async_reader::ReaderState;
+use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::record::RecordDecoder;
+use crate::reader::{AsyncAvroReader, AsyncFileReader, Decoder};
+use crate::schema::{AvroSchema, FingerprintAlgorithm};
+use arrow_schema::{ArrowError, SchemaRef};
+use indexmap::IndexMap;
+use std::ops::Range;
+
+/// Builder for an asynchronous Avro file reader.
+pub struct AsyncAvroReaderBuilder<R: AsyncFileReader> {
+    pub(super) reader: R,
+    pub(super) file_size: u64,
+    pub(super) schema: SchemaRef,
+    pub(super) batch_size: usize,
+    pub(super) range: Option<Range<u64>>,
+    pub(super) reader_schema: Option<AvroSchema>,
+}
+
+impl<R: AsyncFileReader> AsyncAvroReaderBuilder<R> {

Review Comment:
   Nit, but I'd consider naming this either `AsyncFileReaderBuilder` or 
`AsyncOcfReaderBuilder`



##########
arrow-avro/src/reader/async_reader/async_file_reader.rs:
##########
@@ -0,0 +1,18 @@
+use crate::reader::async_reader::DataFetchFutureBoxed;
+use std::ops::Range;
+
+/// A broad generic trait definition allowing fetching bytes from any source 
asynchronously.
+/// This trait has very few limitations, mostly in regard to ownership and 
lifetime,
+/// but it must return a boxed Future containing [`bytes::Bytes`] or an error.

Review Comment:
   You may want to provide examples on how to use this for the docs.



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1021 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::{ArrowError, SchemaRef};
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::Stream;
+use futures::future::BoxFuture;
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod object_store_reader;
+
+#[cfg(feature = "object_store")]
+pub use object_store_reader::ObjectStoreFileReader;
+
+type DataFetchFutureBoxed = BoxFuture<'static, Result<Bytes, ArrowError>>;
+
+enum ReaderState {
+    Idle,
+    Limbo,
+    FetchingData {
+        existing_data: Option<Bytes>,
+        fetch_future: DataFetchFutureBoxed,
+    },
+    DecodingBlock {
+        data: Bytes,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroReader<R: AsyncFileReader> {

Review Comment:
   Nit: but I'd also consider calling this `AsyncFileReader ` or 
`AsyncOcfReader`.



##########
arrow-avro/Cargo.toml:
##########
@@ -45,16 +45,26 @@ sha256 = ["dep:sha2"]
 small_decimals = []
 avro_custom_types = ["dep:arrow-select"]
 
+# Enable async APIs
+async = ["futures"]
+# Enable object_store integration
+object_store = ["dep:object_store", "async"]

Review Comment:
   I'd recommend updating the `README.md` and docs with details on these new 
features.



##########
arrow-avro/src/reader/async_reader/builder.rs:
##########
@@ -0,0 +1,149 @@
+use crate::codec::AvroFieldBuilder;
+use crate::reader::async_reader::ReaderState;
+use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::record::RecordDecoder;
+use crate::reader::{AsyncAvroReader, AsyncFileReader, Decoder};
+use crate::schema::{AvroSchema, FingerprintAlgorithm};
+use arrow_schema::{ArrowError, SchemaRef};
+use indexmap::IndexMap;
+use std::ops::Range;
+
+/// Builder for an asynchronous Avro file reader.
+pub struct AsyncAvroReaderBuilder<R: AsyncFileReader> {
+    pub(super) reader: R,
+    pub(super) file_size: u64,
+    pub(super) schema: SchemaRef,
+    pub(super) batch_size: usize,
+    pub(super) range: Option<Range<u64>>,
+    pub(super) reader_schema: Option<AvroSchema>,
+}
+
+impl<R: AsyncFileReader> AsyncAvroReaderBuilder<R> {
+    /// Specify a byte range to read from the Avro file.
+    /// If this is provided, the reader will read all the blocks within the 
specified range,
+    /// if the range ends mid-block, it will attempt to fetch the remaining 
bytes to complete the block,
+    /// but no further blocks will be read.
+    /// If this is omitted, the full file will be read.
+    pub fn with_range(self, range: Range<u64>) -> Self {
+        Self {
+            range: Some(range),
+            ..self
+        }
+    }
+
+    /// Specify a reader schema to use when reading the Avro file.
+    /// This can be useful to project specific columns or handle schema 
evolution.
+    /// If this is not provided, the schema will be derived from the Arrow 
schema provided.
+    pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
+        Self {
+            reader_schema: Some(reader_schema),
+            ..self
+        }
+    }
+
+    async fn read_header(&mut self) -> Result<(Header, u64), ArrowError> {
+        let mut decoder = HeaderDecoder::default();
+        let mut position = 0;
+        loop {
+            let range_to_fetch = position..(position + 64 * 
1024).min(self.file_size);

Review Comment:
   Is there a reason for hardcoding `position + 64 * 1024`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to