EmilyMatt commented on code in PR #8930:
URL: https://github.com/apache/arrow-rs/pull/8930#discussion_r2702557621


##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum ReaderState<R: AsyncFileReader> {
+    Idle {
+        reader: R,
+    },
+    FirstFetch {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Limbo,
+    DecodingBlock {
+        data: Bytes,
+        reader: R,
+    },
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    ReadingFinalBlock {
+        current_data: Bytes,
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+    },
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = 
Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting 
with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting 
with the following block.
+///    (If `range.start` is less than the header length, we start at the 
header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the 
remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the 
header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional 
parameters.
+    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> 
AsyncAvroFileReaderBuilder<R> {
+        AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+    }
+
+    fn new(
+        range: Range<u64>,
+        file_size: u64,
+        decoder: Decoder,
+        codec: Option<CompressionCodec>,
+        sync_marker: [u8; 16],
+        reader_state: ReaderState<R>,
+    ) -> Self {
+        Self {
+            range,
+            file_size,
+
+            decoder,
+            block_decoder: Default::default(),
+            codec,
+            sync_marker,
+
+            reader_state,
+            finishing_partial_block: false,
+        }
+    }
+
+    fn read_next(&mut self, cx: &mut Context<'_>) -> 
Poll<Option<Result<RecordBatch, ArrowError>>> {
+        loop {
+            match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+                ReaderState::Idle { mut reader } => {
+                    let range = self.range.clone();
+                    if range.start >= range.end || range.end > self.file_size {
+                        return 
Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+                            "Invalid range specified for Avro file: start {} 
>= end {}, file_size: {}",
+                            range.start, range.end, self.file_size
+                        )))));
+                    }
+
+                    let future = async move {
+                        let data = reader.get_bytes(range).await?;
+                        Ok((reader, data))
+                    }
+                    .boxed();
+
+                    self.reader_state = ReaderState::FirstFetch { future };
+                }
+                ReaderState::FirstFetch { mut future } => {
+                    let (reader, data_chunk) = match future.poll_unpin(cx) {
+                        Poll::Ready(Ok(data)) => data,
+                        Poll::Ready(Err(e)) => {
+                            return Poll::Ready(Some(Err(e)));
+                        }
+                        Poll::Pending => {
+                            self.reader_state = ReaderState::FirstFetch { 
future };
+                            return Poll::Pending;
+                        }
+                    };
+
+                    let sync_marker_pos = data_chunk
+                        .windows(16)
+                        .position(|slice| slice == self.sync_marker);
+                    let block_start = match sync_marker_pos {
+                        Some(pos) => pos + 16, // Move past the sync marker
+                        None => {
+                            // Sync marker not found, this is actually valid 
if we arbitrarily split the file at its end.
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(None);
+                        }
+                    };
+
+                    // This is the first time we read data, so try and find 
the sync marker.
+                    self.reader_state = ReaderState::DecodingBlock {
+                        reader,
+                        data: data_chunk.slice(block_start..),
+                    };
+                }
+                ReaderState::Limbo => {
+                    unreachable!("ReaderState::Limbo should never be 
observed");
+                }
+                ReaderState::DecodingBlock {
+                    mut reader,
+                    mut data,
+                } => {
+                    // Try to decode another block from the buffered reader.
+                    let consumed = self.block_decoder.decode(&data)?;
+                    if consumed == 0 {
+                        // If the last block was exactly at the end of the 
file,
+                        // we're simply done reading.
+                        if data.is_empty() {
+                            let final_batch = self.decoder.flush();
+                            self.reader_state = ReaderState::Finished;
+                            return Poll::Ready(final_batch.transpose());
+                        }
+
+                        // If we've tried the following stage before, and 
still can't decode,
+                        // this means the file is truncated or corrupted.
+                        if self.finishing_partial_block {
+                            return Poll::Ready(Some(Err(ArrowError::AvroError(
+                                "Unexpected EOF while reading last Avro 
block".into(),
+                            ))));
+                        }
+
+                        // Avro splitting case: block is incomplete, we need 
to:
+                        // 1. Parse the length so we know how much to read
+                        // 2. Fetch more data from the object store
+                        // 3. Create a new block data from the remaining slice 
and the newly fetched data
+                        // 4. Continue decoding until end of block
+                        self.finishing_partial_block = true;
+
+                        let (size, vlq_header_len) = {
+                            let mut vlq = VLQDecoder::default();
+                            let mut vlq_buf = &data[..];
+                            let original_len = vlq_buf.len();
+
+                            let _ = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
count".into(),
+                                )
+                            })?;
+
+                            let size = vlq.long(&mut vlq_buf).ok_or_else(|| {
+                                ArrowError::AvroError(
+                                    "Unexpected EOF while reading Avro block 
size".into(),
+                                )
+                            })? as u64;
+
+                            // Calculate how many bytes were consumed by the 
two VLQ integers
+                            let header_len = 
original_len.checked_sub(vlq_buf.len()).unwrap();
+
+                            (size, header_len as u64)
+                        };
+
+                        // Two longs: count and size have already been read, 
but using our vlq,
+                        // meaning they were not consumed.
+                        let total_block_size = size + vlq_header_len;

Review Comment:
   Indeed there was; I fixed it with a common utility function, because I needed 
this in two places.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to