jecsand838 commented on code in PR #8930:
URL: https://github.com/apache/arrow-rs/pull/8930#discussion_r2710612882


##########
arrow-avro/src/reader/async_reader/builder.rs:
##########
@@ -0,0 +1,226 @@
+use crate::codec::AvroFieldBuilder;
+use crate::reader::async_reader::ReaderState;
+use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::record::RecordDecoder;
+use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
+use crate::schema::{AvroSchema, FingerprintAlgorithm, SCHEMA_METADATA_KEY};
+use arrow_schema::ArrowError;
+use indexmap::IndexMap;
+use std::ops::Range;
+
+const DEFAULT_HEADER_SIZE_HINT: u64 = 16 * 1024; // 16 KB
+
+/// Builder for an asynchronous Avro file reader.
+pub struct AsyncAvroFileReaderBuilder<R: AsyncFileReader> {
+    reader: R,
+    file_size: u64,
+    batch_size: usize,
+    range: Option<Range<u64>>,
+    reader_schema: Option<AvroSchema>,
+    projection: Option<Vec<usize>>,
+    header_size_hint: Option<u64>,
+    utf8_view: bool,
+    strict_mode: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReaderBuilder<R> {
+    pub(super) fn new(reader: R, file_size: u64, batch_size: usize) -> Self {
+        Self {
+            reader,
+            file_size,
+            batch_size,
+            range: None,
+            reader_schema: None,
+            projection: None,
+            header_size_hint: None,
+            utf8_view: false,
+            strict_mode: false,
+        }
+    }
+
+    /// Specify a byte range to read from the Avro file.
+    /// If this is provided, the reader will read all the blocks within the 
specified range,
+    /// if the range ends mid-block, it will attempt to fetch the remaining 
bytes to complete the block,
+    /// but no further blocks will be read.
+    /// If this is omitted, the full file will be read.
+    pub fn with_range(self, range: Range<u64>) -> Self {
+        Self {
+            range: Some(range),
+            ..self
+        }
+    }
+
+    /// Specify a reader schema to use when reading the Avro file.
+    /// This can be useful to project specific columns or handle schema 
evolution.
+    /// If this is not provided, the schema will be derived from the Arrow 
schema provided.
+    pub fn with_reader_schema(self, reader_schema: AvroSchema) -> Self {
+        Self {
+            reader_schema: Some(reader_schema),
+            ..self
+        }
+    }
+
+    /// Specify a projection of column indices to read from the Avro file.
+    /// This can help optimize reading by only fetching the necessary columns.
+    pub fn with_projection(self, projection: Vec<usize>) -> Self {
+        Self {
+            projection: Some(projection),
+            ..self
+        }
+    }
+
+    /// Provide a hint for the expected size of the Avro header in bytes.
+    /// This can help optimize the initial read operation when fetching the 
header.
+    pub fn with_header_size_hint(self, hint: u64) -> Self {
+        Self {
+            header_size_hint: Some(hint),
+            ..self
+        }
+    }
+
+    /// Enable usage of Utf8View types when reading string data.
+    pub fn with_utf8_view(self, utf8_view: bool) -> Self {
+        Self { utf8_view, ..self }
+    }
+
+    /// Enable strict mode for schema validation and data reading.
+    pub fn with_strict_mode(self, strict_mode: bool) -> Self {
+        Self {
+            strict_mode,
+            ..self
+        }
+    }
+
+    async fn read_header(&mut self) -> Result<(Header, u64), ArrowError> {
+        let mut decoder = HeaderDecoder::default();
+        let mut position = 0;
+        loop {
+            let range_to_fetch = position
+                ..(position + 
self.header_size_hint.unwrap_or(DEFAULT_HEADER_SIZE_HINT))
+                    .min(self.file_size);
+            let current_data = 
self.reader.get_bytes(range_to_fetch).await.map_err(|err| {
+                ArrowError::AvroError(format!(
+                    "Error fetching Avro header from object store: {err}"
+                ))
+            })?;
+            if current_data.is_empty() {
+                break;
+            }
+            let read = current_data.len();
+            let decoded = decoder.decode(&current_data)?;
+            if decoded != read {
+                position += decoded as u64;
+                break;
+            }
+            position += read as u64;
+        }
+
+        decoder
+            .flush()
+            .map(|header| (header, position))
+            .ok_or_else(|| ArrowError::AvroError("Unexpected EOF while reading 
Avro header".into()))
+    }
+
+    /// Build the asynchronous Avro reader with the provided parameters.
+    /// This reads the header first to initialize the reader state.
+    pub async fn try_build(mut self) -> Result<AsyncAvroFileReader<R>, 
ArrowError> {
+        if self.file_size == 0 {
+            return Err(ArrowError::AvroError("File size cannot be 0".into()));
+        }
+
+        // Start by reading the header from the beginning of the avro file
+        // take the writer schema from the header
+        let (header, header_len) = self.read_header().await?;
+        let writer_schema = header
+            .schema()
+            .map_err(|e| ArrowError::ExternalError(Box::new(e)))?
+            .ok_or_else(|| {
+                ArrowError::ParseError("No Avro schema present in file 
header".into())
+            })?;
+
+        // If projection exists, project the reader schema,
+        // if no reader schema is provided, parse it from the header(get the 
raw writer schema), and project that
+        // this projected schema will be the schema used for reading.
+        let projected_reader_schema = self
+            .projection
+            .as_deref()
+            .map(|projection| {
+                let base_schema = if let Some(reader_schema) = 
&self.reader_schema {
+                    reader_schema.clone()
+                } else {
+                    let raw = header.get(SCHEMA_METADATA_KEY).ok_or_else(|| {
+                        ArrowError::ParseError("No Avro schema present in file 
header".to_string())
+                    })?;
+                    let json_string = std::str::from_utf8(raw)
+                        .map_err(|e| {
+                            ArrowError::ParseError(format!(
+                                "Invalid UTF-8 in Avro schema header: {e}"
+                            ))
+                        })?
+                        .to_string();
+                    AvroSchema::new(json_string)
+                };
+                base_schema.project(projection)
+            })
+            .transpose()?;

Review Comment:
   We should probably add more test coverage to the 
`arrow-avro/src/reader/async_reader/builder.rs` file.
   
   <img width="868" height="1016" alt="Screenshot 2026-01-20 at 7 14 41 PM" 
src="https://github.com/user-attachments/assets/8a316420-4116-4e02-a680-1cebdb28732b"
 />
   
   



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1330 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::{BlockDecoder, BlockDecoderState};
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum FetchNextBehaviour {
+    /// Initial read: scan for sync marker, then move to decoding blocks
+    ReadSyncMarker,
+    /// Parse VLQ header bytes one at a time until Data state, then continue 
decoding
+    DecodeVLQHeader,
+    /// Continue decoding the current block with the fetched data
+    ContinueDecoding,
+}
+
+enum ReaderState<R: AsyncFileReader> {
+    /// Intermediate state to fix ownership issues
+    InvalidState,
+    /// Initial state, fetch initial range
+    Idle { reader: R },
+    /// Fetching data from the reader
+    FetchingData {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+        next_behaviour: FetchNextBehaviour,
+    },
+    /// Decode a block in a loop until completion
+    DecodingBlock { data: Bytes, reader: R },
+    /// Output batches from a decoded block
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    /// An error occurred, should not have been polled again.
+    Error,
+    /// Done, flush decoder and return
+    Finished,
+}

Review Comment:
   After looking through the code, I strongly recommend making the stream 
"fused" so that it terminates after the first error, rather than yielding an 
error on every subsequent poll.
   
   My concern is that currently, if the reader enters `ReaderState::Error` (or 
hits `InvalidState`), it returns `Some(Err(...))` indefinitely. This causes 
consumers like `StreamExt::collect` to hang forever.
   
   I recommend fixing this by transitioning to `ReaderState::Finished` as soon 
as any error is returned, so that every later poll yields `None`. This is 
similar to how the Parquet async reader behaves.
   
   What do you think about these updates below?
   
   ```suggestion
   enum ReaderState<R: AsyncFileReader> {
       /// Intermediate state to fix ownership issues
       InvalidState,
       /// Initial state, fetch initial range
       Idle { reader: R },
       /// Fetching data from the reader
       FetchingData {
           future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
           next_behaviour: FetchNextBehaviour,
       },
       /// Decode a block in a loop until completion
       DecodingBlock { data: Bytes, reader: R },
       /// Output batches from a decoded block
       ReadingBatches {
           data: Bytes,
           block_data: Bytes,
           remaining_in_block: usize,
           reader: R,
       },
       /// Successfully finished reading file contents; drain any remaining 
buffered records
       /// from the decoder into (possibly partial) output batches.
       Flushing,
       /// Terminal state. Always yields `None` and never returns items again.
       Finished,
   }
   ```
   
   Then we can do something like this for `AsyncAvroFileReader::read_next`:
   
   ```rust
   
       /// Terminate the stream after returning this error once.
       #[inline]
       fn finish_with_error(
           &mut self,
           error: ArrowError,
       ) -> Poll<Option<Result<RecordBatch, ArrowError>>> {
           self.reader_state = ReaderState::Finished;
           Poll::Ready(Some(Err(error)))
       }
   
       #[inline]
       fn start_flushing(&mut self) {
           self.reader_state = ReaderState::Flushing;
       }
   
       /// Drain any remaining buffered records from the decoder.
       #[inline]
       fn poll_flush(&mut self) -> Poll<Option<Result<RecordBatch, 
ArrowError>>> {
           match self.decoder.flush() {
               Ok(Some(batch)) => {
                   self.reader_state = ReaderState::Flushing;
                   Poll::Ready(Some(Ok(batch)))
               }
               Ok(None) => {
                   self.reader_state = ReaderState::Finished;
                   Poll::Ready(None)
               }
               Err(e) => self.finish_with_error(e),
           }
       }
   
       fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
           loop {
               match mem::replace(&mut self.reader_state, 
ReaderState::InvalidState) {
                   ReaderState::Idle { mut reader } => {
                       let range = self.range.clone();
                       if range.start >= range.end {
                           return 
self.finish_with_error(ArrowError::AvroError(format!(
                               "Invalid range specified for Avro file: start {} 
>= end {}, file_size: {}",
                               range.start, range.end, self.file_size
                           )));
                       }
                       let future = async move {
                           let data = reader.get_bytes(range).await?;
                           Ok((reader, data))
                       }
                       .boxed();
                       self.reader_state = ReaderState::FetchingData {
                           future,
                           next_behaviour: FetchNextBehaviour::ReadSyncMarker,
                       };
                   }
                   ReaderState::FetchingData {
                       mut future,
                       next_behaviour,
                   } => {
                       let (reader, data_chunk) = match future.poll_unpin(cx) {
                           Poll::Ready(Ok(data)) => data,
                           Poll::Ready(Err(e)) => return 
self.finish_with_error(e),
                           Poll::Pending => {
                               self.reader_state = ReaderState::FetchingData {
                                   future,
                                   next_behaviour,
                               };
                               return Poll::Pending;
                           }
                       };
                       match next_behaviour {
                           FetchNextBehaviour::ReadSyncMarker => {
                               let sync_marker_pos = data_chunk
                                   .windows(16)
                                   .position(|slice| slice == self.sync_marker);
                               let block_start = match sync_marker_pos {
                                   Some(pos) => pos + 16, // Move past the sync 
marker
                                   None => {
                                       // Sync marker not found, valid if we 
arbitrarily split the file at its end.
                                       self.reader_state = 
ReaderState::Finished;
                                       return Poll::Ready(None);
                                   }
                               };
                               self.reader_state = ReaderState::DecodingBlock {
                                   reader,
                                   data: data_chunk.slice(block_start..),
                               };
                           }
                           FetchNextBehaviour::DecodeVLQHeader => {
                               let mut reader = reader;
                               let mut data = data_chunk;
                               // Feed bytes one at a time until we reach Data 
state (VLQ header complete)
                               while !matches!(self.block_decoder.state(), 
BlockDecoderState::Data) {
                                   if data.is_empty() {
                                       return 
self.finish_with_error(ArrowError::AvroError(
                                           "Unexpected EOF while reading Avro 
block header".into(),
                                       ));
                                   }
                                   let consumed = match 
self.block_decoder.decode(&data[..1]) {
                                       Ok(consumed) => consumed,
                                       Err(e) => return 
self.finish_with_error(e),
                                   };
                                   if consumed == 0 {
                                       return 
self.finish_with_error(ArrowError::AvroError(
                                           "BlockDecoder failed to consume byte 
during VLQ header parsing"
                                               .into(),
                                       ));
                                   }
                                   data = data.slice(consumed..);
                               }
                               // Now we know the block size. Slice remaining 
data to what we need.
                               let bytes_remaining = 
self.block_decoder.bytes_remaining();
                               let data_to_use = 
data.slice(..data.len().min(bytes_remaining));
                               let consumed = match 
self.block_decoder.decode(&data_to_use) {
                                   Ok(consumed) => consumed,
                                   Err(e) => return self.finish_with_error(e),
                               };
                               if consumed != data_to_use.len() {
                                   return 
self.finish_with_error(ArrowError::AvroError(
                                       "BlockDecoder failed to consume all 
bytes after VLQ header parsing"
                                           .into(),
                                   ));
                               }
                               // May need more data to finish the block.
                               let range_to_fetch = match 
self.remaining_block_range() {
                                   Ok(range) if range.is_empty() => {
                                       // All bytes fetched, move to decoding 
block directly
                                       self.reader_state = 
ReaderState::DecodingBlock {
                                           reader,
                                           data: Bytes::new(),
                                       };
                                       continue;
                                   }
                                   Ok(range) => range,
                                   Err(e) => return self.finish_with_error(e),
                               };
                               let future = async move {
                                   let data = 
reader.get_bytes(range_to_fetch).await?;
                                   Ok((reader, data))
                               }
                               .boxed();
                               self.reader_state = ReaderState::FetchingData {
                                   future,
                                   next_behaviour: 
FetchNextBehaviour::ContinueDecoding,
                               };
                               continue;
                           }
                           FetchNextBehaviour::ContinueDecoding => {
                               self.reader_state = ReaderState::DecodingBlock {
                                   reader,
                                   data: data_chunk,
                               };
                           }
                       }
                   }
                   ReaderState::InvalidState => {
                       return self.finish_with_error(ArrowError::AvroError(
                           "AsyncAvroFileReader in invalid state".into(),
                       ));
                   }
                   ReaderState::DecodingBlock {
                       mut reader,
                       mut data,
                   } => {
                       // Try to decode another block from the buffered reader.
                       let consumed = match self.block_decoder.decode(&data) {
                           Ok(consumed) => consumed,
                           Err(e) => return self.finish_with_error(e),
                       };
                       data = data.slice(consumed..);
                       // If we reached the end of the block, flush it, and 
move to read batches.
                       if let Some(block) = self.block_decoder.flush() {
                           // Successfully decoded a block.
                           let block_count = block.count;
                           // We completed (or resumed and completed) a block 
successfully.
                           self.finishing_partial_block = false;
                           let block_data = Bytes::from_owner(if let Some(ref 
codec) = self.codec {
                               match codec.decompress(&block.data) {
                                   Ok(decompressed) => decompressed,
                                   Err(e) => {
                                       return 
self.finish_with_error(ArrowError::AvroError(format!(
                                           "Error decompressing Avro block with 
codec {codec:?}: {e}"
                                       )));
                                   }
                               }
                           } else {
                               block.data
                           });
                           // Since we have an active block, move to reading 
batches
                           self.reader_state = ReaderState::ReadingBatches {
                               reader,
                               data,
                               block_data,
                               remaining_in_block: block_count,
                           };
                           continue;
                       }
                       // data should always be consumed unless Finished, if it 
wasn't, something went wrong
                       if !data.is_empty() {
                           return self.finish_with_error(ArrowError::AvroError(
                               "Unable to make progress decoding Avro block, 
data may be corrupted"
                                   .into(),
                           ));
                       }
                       if matches!(self.block_decoder.state(), 
BlockDecoderState::Finished) {
                           // We've already flushed, so if no batch was 
produced, we are simply done.
                           self.finishing_partial_block = false;
                           self.start_flushing();
                           continue;
                       }
                       // If we've tried the following stage before, and still 
can't decode,
                       // this means the file is truncated or corrupted.
                       if self.finishing_partial_block {
                           return self.finish_with_error(ArrowError::AvroError(
                               "Unexpected EOF while reading last Avro 
block".into(),
                           ));
                       }
                       // Avro splitting case: block is incomplete, we need to:
                       // 1. Parse the length so we know how much to read
                       // 2. Fetch more data from the object store
                       // 3. Create a new block data from the remaining slice 
and the newly fetched data
                       // 4. Continue decoding until end of block
                       self.finishing_partial_block = true;
                       // Mid-block, but we don't know how many bytes are 
missing yet
                       if matches!(
                           self.block_decoder.state(),
                           BlockDecoderState::Count | BlockDecoderState::Size
                       ) {
                           // Max VLQ header is 20 bytes (10 bytes each for 
count and size).
                           // Fetch just enough to complete it.
                           const MAX_VLQ_HEADER_SIZE: u64 = 20;
                           let fetch_end = (self.range.end + 
MAX_VLQ_HEADER_SIZE).min(self.file_size);
                           // If there is nothing more to fetch, error out
                           if fetch_end == self.range.end {
                               return 
self.finish_with_error(ArrowError::AvroError(
                                   "Unexpected EOF while reading Avro block 
header".into(),
                               ));
                           }
                           let range_to_fetch = self.range.end..fetch_end;
                           self.range.end = fetch_end; // Track that we've 
fetched these bytes
                           let future = async move {
                               let data = 
reader.get_bytes(range_to_fetch).await?;
                               Ok((reader, data))
                           }
                           .boxed();
                           self.reader_state = ReaderState::FetchingData {
                               future,
                               next_behaviour: 
FetchNextBehaviour::DecodeVLQHeader,
                           };
                           continue;
                       }
                       // Otherwise, we're mid-block but know how many bytes 
are remaining to fetch.
                       let range_to_fetch = match self.remaining_block_range() {
                           Ok(range) => range,
                           Err(e) => return self.finish_with_error(e),
                       };
                       let future = async move {
                           let data = reader.get_bytes(range_to_fetch).await?;
                           Ok((reader, data))
                       }
                       .boxed();
                       self.reader_state = ReaderState::FetchingData {
                           future,
                           next_behaviour: FetchNextBehaviour::ContinueDecoding,
                       };
                       continue;
                   }
                   ReaderState::ReadingBatches {
                       reader,
                       data,
                       mut block_data,
                       mut remaining_in_block,
                   } => {
                       let (consumed, records_decoded) =
                           match self.decoder.decode_block(&block_data, 
remaining_in_block) {
                               Ok((consumed, records_decoded)) => (consumed, 
records_decoded),
                               Err(e) => return self.finish_with_error(e),
                           };
                       remaining_in_block -= records_decoded;
                       if remaining_in_block == 0 {
                           if data.is_empty() {
                               // No more data to read, drain remaining 
buffered records
                               self.start_flushing();
                           } else {
                               // Finished this block, move to decode next 
block in the next iteration
                               self.reader_state = ReaderState::DecodingBlock { 
reader, data };
                           }
                       } else {
                           // Still more records to decode in this block, slice 
the already-read data and stay in this state
                           block_data = block_data.slice(consumed..);
                           self.reader_state = ReaderState::ReadingBatches {
                               reader,
                               data,
                               block_data,
                               remaining_in_block,
                           };
                       }
                       // We have a full batch ready, emit it
                       // (This is not mutually exclusive with the block being 
finished, so the state change is valid)
                       if self.decoder.batch_is_full() {
                           match self.decoder.flush() {
                               Ok(Some(batch)) => return 
Poll::Ready(Some(Ok(batch))),
                               Ok(None) => {
                                   return 
self.finish_with_error(ArrowError::AvroError(
                                       "Decoder reported a full batch, but 
flush returned None".into(),
                                   ));
                               }
                               Err(e) => return self.finish_with_error(e),
                           }
                       }
                   }
                   ReaderState::Flushing => {
                       return self.poll_flush();
                   }
                   ReaderState::Finished => {
                       // Terminal: once finished (including after an error), 
always yield None
                       self.reader_state = ReaderState::Finished;
                       return Poll::Ready(None);
                   }
               }
           }
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to