ariel-miculas commented on code in PR #9632:
URL: https://github.com/apache/arrow-rs/pull/9632#discussion_r3021941480


##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -46,39 +46,214 @@ use crate::errors::AvroError;
 #[cfg(feature = "object_store")]
 pub use store::AvroObjectReader;
 
-enum FetchNextBehaviour {
-    /// Initial read: scan for sync marker, then move to decoding blocks
-    ReadSyncMarker,
-    /// Parse VLQ header bytes one at a time until Data state, then continue decoding
-    DecodeVLQHeader,
-    /// Continue decoding the current block with the fetched data
-    ContinueDecoding,
-}
-
-enum ReaderState<R> {
-    /// Intermediate state to fix ownership issues
-    InvalidState,
-    /// Initial state, fetch initial range
-    Idle { reader: R },
-    /// Fetching data from the reader
-    FetchingData {
-        future: BoxFuture<'static, Result<(R, Bytes), AvroError>>,
-        next_behaviour: FetchNextBehaviour,
+/// State of the block-reading loop inside [`make_stream`].
+///
+/// The active `stream` lives alongside `reader` as a local variable in the async block;
+/// the compiler's async state machine (via `Pin`) handles that self-referential borrow
+/// automatically — no `unsafe` required.
+enum Phase {
+    ReadSyncMarker {
+        accumulated: BytesMut,
+    },
+    DecodingBlock {
+        buffered: Bytes,
     },
-    /// Decode a block in a loop until completion
-    DecodingBlock { data: Bytes, reader: R },
-    /// Output batches from a decoded block
     ReadingBatches {
-        data: Bytes,
         block_data: Bytes,
-        remaining_in_block: usize,
-        reader: R,
+        remaining: usize,
+        /// Bytes left over in the stream chunk after the completed block.
+        next_buffered: Bytes,
     },
-    /// Successfully finished reading file contents; drain any remaining buffered records
-    /// from the decoder into (possibly partial) output batches.
-    Flushing,
-    /// Done, flush decoder and return
-    Finished,
+}
+
+/// The non-reader fields of [`AvroReaderState`], split out so that
+/// `remaining_block_range` can borrow them while `reader` is mutably borrowed by the active stream.
+struct AvroReaderMeta {
+    range: Range<u64>,
+    file_size: u64,
+    decoder: Decoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+}
+
+impl AvroReaderMeta {
+    /// Calculate the byte range needed to complete the current block.
+    /// Only valid when `block_decoder` is in `Data` or `Sync` state.
+    fn remaining_block_range(
+        &self,
+        block_decoder: &BlockDecoder,
+    ) -> Result<Range<u64>, ArrowError> {
+        let remaining = block_decoder.bytes_remaining() as u64
+            + match block_decoder.state() {
+                BlockDecoderState::Data => 16, // Include sync marker
+                BlockDecoderState::Sync => 0,
+                state => {
+                    return Err(ArrowError::ParseError(format!(
+                        "remaining_block_range called in unexpected state: {state:?}"
+                    )));
+                }
+            };
+        let fetch_end = self.range.end + remaining;
+        if fetch_end > self.file_size {
+            return Err(ArrowError::ParseError(format!(
+                "Avro block requires more bytes than what exists in the file: \
+                 fetch_end {fetch_end}, remaining {remaining}, file_size {}",
+                self.file_size
+            )));
+        }
+        Ok(self.range.end..fetch_end)
+    }
+}
+
+/// Owns all the state needed to drive the Avro block-reading loop.
+struct AvroReaderState<R> {
+    reader: R,
+    meta: AvroReaderMeta,
+}
+
+/// Drives all reading and decoding as a natural async generator.
+///
+/// `state.reader` is owned (moved in). `stream` is a local borrow of `state.reader` inside the
+/// `async_stream::try_stream!` block; the compiler's generated async state machine stores
+/// both and manages the lifetime relationship safely via `Pin`.
+fn make_stream<R: AsyncFileReader + Unpin + Send + 'static>(
+    mut state: AvroReaderState<R>,
+) -> impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static {
+    async_stream::try_stream! {
+        if state.meta.range.start >= state.meta.range.end {

Review Comment:
   In my end-to-end benchmark, prefetching is done by the object-store 
implementation, specifically in the call to `get_opts`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to