jecsand838 commented on code in PR #8930:
URL: https://github.com/apache/arrow-rs/pull/8930#discussion_r2600755125
##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1021 @@
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::vlq::VLQDecoder;
+use arrow_array::RecordBatch;
+use arrow_schema::{ArrowError, SchemaRef};
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::Stream;
+use futures::future::BoxFuture;
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod object_store_reader;
+
+#[cfg(feature = "object_store")]
+pub use object_store_reader::ObjectStoreFileReader;
+
+type DataFetchFutureBoxed = BoxFuture<'static, Result<Bytes, ArrowError>>;
+
+enum ReaderState {
+ Idle,
+ Limbo,
+ FetchingData {
+ existing_data: Option<Bytes>,
+ fetch_future: DataFetchFutureBoxed,
+ },
+ DecodingBlock {
+ data: Bytes,
+ },
+ ReadingBatches {
+ data: Bytes,
+ block_data: Bytes,
+ remaining_in_block: usize,
+ },
+ Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = Result<RecordBatch, ArrowError>>`.
+///
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting with the header,
+/// then reading all of the blocks in the provided range:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searches from `range.start` for the first sync marker and starts with the block that follows.
+///    (If `range.start` is less than the header length, the search starts at the header length
+///    minus the sync marker bytes.)
+/// 3. Reads blocks sequentially, decoding them into `RecordBatch`es.
+/// 4. If a block is incomplete (because the range ends mid-block), fetches the remaining bytes
+///    from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is empty, `file_size` is 0, or `range.end` is less than the header length,
+///    finishes immediately.
+pub struct AsyncAvroReader<R: AsyncFileReader> {
+ // Members required to fetch data
+ reader: R,
+ range: Range<u64>,
+ file_size: u64,
+
+ // Members required to actually decode and read data
+ decoder: Decoder,
+ block_decoder: BlockDecoder,
+ codec: Option<CompressionCodec>,
+ sync_marker: [u8; 16],
+
+ // Members keeping the current state of the reader
+ reader_state: ReaderState,
+ finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader> AsyncAvroReader<R> {
+    /// Returns a builder for a new [`Self`], allowing some optional parameters.
+ pub fn builder(
+ reader: R,
+ file_size: u64,
+ schema: SchemaRef,
Review Comment:
I really don't think the `schema` field should be required here, as it detracts from Avro's self-describing nature while effectively making the optional `reader_schema` required.

I'd recommend setting this up in a manner that encourages callers to use `with_reader_schema`. That way, callers that simply want to read an OCF file without schema resolution are optimally supported.

If we absolutely need to support passing in an Arrow `reader_schema`, then I'd recommend adding an optional (and well documented) `with_arrow_reader_schema` method (to complement `with_reader_schema`) that takes an Arrow `SchemaRef` and runs `AvroSchema::try_from` on it.
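To make that concrete, here is a rough sketch of how such an opt-in method could look on the builder. This is only an illustration: the builder's generic parameter, the exact `TryFrom` impl used (`&Schema` vs `SchemaRef`), and the shape of `with_reader_schema` are assumed from this discussion rather than taken from the PR as written.

```rust
use arrow_schema::{ArrowError, SchemaRef};

use crate::schema::AvroSchema; // assumed path for arrow-avro's AvroSchema

impl<R: AsyncFileReader> AsyncAvroReaderBuilder<R> {
    /// Optional convenience: derive an Avro reader schema from an Arrow schema
    /// and delegate to `with_reader_schema`. Callers reading a plain OCF file
    /// without schema resolution would not call this at all.
    pub fn with_arrow_reader_schema(self, schema: SchemaRef) -> Result<Self, ArrowError> {
        // `AvroSchema::try_from` is the conversion suggested above; whether it
        // takes `&Schema` or a `SchemaRef` directly may differ in the real crate.
        let reader_schema = AvroSchema::try_from(schema.as_ref())?;
        Ok(self.with_reader_schema(reader_schema))
    }
}
```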
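Separately, since the reader's doc comment above describes it as a `Stream<Item = Result<RecordBatch, ArrowError>>`, here is a small example of how such a stream can be drained. It is written against the `Stream` bound only, so it makes no assumptions about the builder's final shape; the `count_rows` helper is purely illustrative.

```rust
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use futures::{Stream, TryStreamExt};

/// Drain any stream of `Result<RecordBatch, ArrowError>` items (such as the
/// `AsyncAvroReader` in this diff) and count the total decoded rows.
async fn count_rows<S>(stream: S) -> Result<usize, ArrowError>
where
    S: Stream<Item = Result<RecordBatch, ArrowError>>,
{
    stream
        .try_fold(0usize, |rows, batch| async move {
            // Each item is one decoded `RecordBatch`; accumulate its row count.
            Ok::<usize, ArrowError>(rows + batch.num_rows())
        })
        .await
}
```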