jecsand838 commented on code in PR #8930:
URL: https://github.com/apache/arrow-rs/pull/8930#discussion_r2710516335
##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1157 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+/// States of the hand-written `Stream` state machine driven by
+/// `AsyncAvroFileReader::read_next`.
+///
+/// Each loop iteration of `read_next` moves the current state out with
+/// `mem::replace(&mut self.reader_state, ReaderState::Limbo)` and is expected to
+/// store the next state back before continuing or returning.
+enum ReaderState<R: AsyncFileReader> {
+ /// Nothing has been fetched yet. On the first poll the range is validated and
+ /// the fetch of that range is started.
+ Idle {
+ reader: R,
+ },
+ /// The fetch of the configured byte range is in flight. It resolves to the
+ /// reader, handed back together with the fetched bytes.
+ FirstFetch {
+ future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+ },
+ /// Placeholder left behind by `mem::replace` while a state is being processed.
+ /// `read_next` treats observing it as a bug (`unreachable!`).
+ // NOTE(review): the `Err` returns in the `Idle` (invalid range) and `FirstFetch`
+ // (fetch error) arms exit without restoring a state. This leaves `Limbo` behind,
+ // so polling the stream again after such an error reaches the `unreachable!`.
+ Limbo,
+ /// `data` starts immediately after the first sync marker found in the fetched
+ /// range.
+ DecodingBlock {
+ data: Bytes,
+ reader: R,
+ },
+ // NOTE(review): handling of this state is not visible in this excerpt.
+ // Presumably `block_data` holds the current (decoded) block and
+ // `remaining_in_block` counts the rows not yet emitted. Confirm.
+ ReadingBatches {
+ data: Bytes,
+ block_data: Bytes,
+ remaining_in_block: usize,
+ reader: R,
+ },
+ // NOTE(review): handling of this state is not visible in this excerpt.
+ // Presumably it fetches the rest of a block that crosses the end of the range,
+ // keeping the bytes already read in `current_data`. Confirm.
+ ReadingFinalBlock {
+ current_data: Bytes,
+ future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+ },
+ /// Entered, for example, when no sync marker is found in the first fetched range
+ /// (the stream then yields `None`). Presumably terminal.
+ Finished,
+}
+
+/// An asynchronous Avro file reader that implements
+/// `Stream<Item = Result<RecordBatch, ArrowError>>`.
+///
+/// It uses an [`AsyncFileReader`] to fetch byte ranges as needed, starting with the
+/// header, and then reads all the blocks in the provided range:
+/// 1. Read and decode data until the header is fully decoded.
+/// 2. Search from `range.start` for the first sync marker, and start with the block
+///    that follows it. (If `range.start` is less than the header length, start at the
+///    header length minus the sync marker bytes.)
+/// 3. Read blocks sequentially, decoding them into `RecordBatch`es.
+/// 4. If a block is incomplete (because the range ends mid-block), fetch the remaining
+///    bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, read the full file.
+/// 6. If the range is empty, `file_size` is 0, or `range.end` is less than the header
+///    length, finish immediately.
+// NOTE(review): in `read_next`, the `Idle` state returns an error, rather than
+// finishing, when `range.start >= range.end` or `range.end > file_size`. Presumably
+// the builder short-circuits the cases in item 6 before that point. Confirm.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+ // Members required to fetch data
+ /// Byte range of the file to read; validated against `file_size` before the first
+ /// fetch.
+ range: Range<u64>,
+ /// Size of the underlying file in bytes; `range.end` must not exceed it.
+ file_size: u64,
+
+ // Members required to actually decode and read data
+ decoder: Decoder,
+ /// Initialised with `Default::default()` in `new`.
+ block_decoder: BlockDecoder,
+ /// Optional compression codec (`None` when no codec is configured).
+ codec: Option<CompressionCodec>,
+ /// Sync marker separating blocks. The first fetch scans for it (16-byte windows)
+ /// to locate the first block boundary.
+ sync_marker: [u8; 16],
+
+ // Members keeping the current state of the reader
+ /// Current state of the `read_next` state machine.
+ reader_state: ReaderState<R>,
+ // NOTE(review): initialised to `false` in `new`; its use is not visible in this
+ // excerpt. Presumably it is set while completing a block that crosses `range.end`.
+ finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
+ /// Returns a builder for a new [`Self`], allowing some optional
parameters.
+ pub fn builder(reader: R, file_size: u64, batch_size: usize) ->
AsyncAvroFileReaderBuilder<R> {
+ AsyncAvroFileReaderBuilder::new(reader, file_size, batch_size)
+ }
+
+ fn new(
+ range: Range<u64>,
+ file_size: u64,
+ decoder: Decoder,
+ codec: Option<CompressionCodec>,
+ sync_marker: [u8; 16],
+ reader_state: ReaderState<R>,
+ ) -> Self {
+ Self {
+ range,
+ file_size,
+
+ decoder,
+ block_decoder: Default::default(),
+ codec,
+ sync_marker,
+
+ reader_state,
+ finishing_partial_block: false,
+ }
+ }
+
+ fn read_next(&mut self, cx: &mut Context<'_>) ->
Poll<Option<Result<RecordBatch, ArrowError>>> {
+ loop {
+ match mem::replace(&mut self.reader_state, ReaderState::Limbo) {
+ ReaderState::Idle { mut reader } => {
+ let range = self.range.clone();
+ if range.start >= range.end || range.end > self.file_size {
+ return
Poll::Ready(Some(Err(ArrowError::AvroError(format!(
+ "Invalid range specified for Avro file: start {}
>= end {}, file_size: {}",
+ range.start, range.end, self.file_size
+ )))));
+ }
+
+ let future = async move {
+ let data = reader.get_bytes(range).await?;
+ Ok((reader, data))
+ }
+ .boxed();
+
+ self.reader_state = ReaderState::FirstFetch { future };
+ }
+ ReaderState::FirstFetch { mut future } => {
+ let (reader, data_chunk) = match future.poll_unpin(cx) {
+ Poll::Ready(Ok(data)) => data,
+ Poll::Ready(Err(e)) => {
+ return Poll::Ready(Some(Err(e)));
+ }
+ Poll::Pending => {
+ self.reader_state = ReaderState::FirstFetch {
future };
+ return Poll::Pending;
+ }
+ };
+
+ let sync_marker_pos = data_chunk
+ .windows(16)
+ .position(|slice| slice == self.sync_marker);
+ let block_start = match sync_marker_pos {
+ Some(pos) => pos + 16, // Move past the sync marker
+ None => {
+ // Sync marker not found, this is actually valid
if we arbitrarily split the file at its end.
+ self.reader_state = ReaderState::Finished;
+ return Poll::Ready(None);
+ }
+ };
+
+ // This is the first time we read data, so try and find
the sync marker.
+ self.reader_state = ReaderState::DecodingBlock {
+ reader,
+ data: data_chunk.slice(block_start..),
+ };
+ }
+ ReaderState::Limbo => {
+ unreachable!("ReaderState::Limbo should never be
observed");
+ }
Review Comment:
@EmilyMatt
> Regarding 1. I was sure polling again after error violates the stream
> contract, but perhaps I was mistaken, in which case I apologize for speaking
> too confidently about there being no such code paths^^

All good! Polling again after an *error item* is not a `Stream` contract
violation that I'm aware of. The `Stream` contract only explicitly warns about
polling again after the stream has *terminated* (i.e. after `Ready(None)`).
With `Stream<Item = Result<_, _>>`, an `Err` is just another yielded item, and
it's valid for callers to keep polling until the stream returns `None`. So every
error path needs to leave the reader in a state that is safe to poll again.
> I believe 2 is impossible, due to ownership semantics of async behaviour,
> this can be observed in the other async readers as well, they all usually do
> something similar when implementing streams manually.

I agree that removing the whole-enum `mem::replace` isn't always practical in
these hand-written state machines -- it can be done, but I understand it would
require a large refactor (pin-projection, splitting the state, `Option` fields,
etc.). The `mem::replace` pattern is totally reasonable here; I just wanted to
make sure we're careful about panics in the hot path of a widely used upstream
project like this one.
> Do you think an implementation such as the one I've pushed now is viable?
> or perhaps it is best to merge the Error state and invalid state?
> I feel like using Finished is too risky for the reasons I've previously
> explained

On the current implementation: I glanced over the latest changes, and I think
the updated `InvalidState` sentinel approach, together with removing the
`?`/early returns inside the replaced-state `match`, is viable. I'll do a
follow-up code review right now, but I'm pretty much inclined to approve this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]