jecsand838 commented on code in PR #8930:
URL: https://github.com/apache/arrow-rs/pull/8930#discussion_r2566495381
##########
arrow-avro/Cargo.toml:
##########
@@ -36,14 +36,15 @@ bench = false
all-features = true
[features]
-default = ["deflate", "snappy", "zstd", "bzip2", "xz"]
+default = ["deflate", "snappy", "zstd", "bzip2", "xz", "object_store"]
Review Comment:
I'm not sure about making `object_store` a default feature; it seems like a fairly heavy
dependency to enable for everyone.
##########
arrow-avro/src/reader/async_reader.rs:
##########
@@ -0,0 +1,1074 @@
+use crate::codec::AvroFieldBuilder;
+use crate::reader::Decoder;
+use crate::reader::block::BlockDecoder;
+use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::record::RecordDecoder;
+use crate::reader::vlq::VLQDecoder;
+use crate::schema::{AvroSchema, FingerprintAlgorithm};
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::FutureExt;
+use futures::Stream;
+use indexmap::IndexMap;
+use object_store::path::Path;
+use std::future::Future;
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+type DataFetchFutureBoxed = Pin<Box<dyn Future<Output = Result<Vec<Bytes>, ArrowError>> + Send>>;
+
+/// Read the Avro file header (magic, metadata, sync marker) from `reader`.
+async fn read_header(
+ store: &Arc<dyn object_store::ObjectStore>,
+ location: &Path,
+ file_size: u64,
+) -> Result<(Header, u64), ArrowError> {
+ let mut decoder = HeaderDecoder::default();
+ let mut position = 0;
+ loop {
+ let range_to_fetch = position..(position + 64 * 1024).min(file_size);
+ let current_data = store
+ .get_range(location, range_to_fetch)
+ .await
+ .map_err(|err| {
+ ArrowError::AvroError(format!(
+ "Error fetching Avro header from object store: {err}"
+ ))
+ })?;
+ if current_data.is_empty() {
+ break;
+ }
+ let read = current_data.len();
+ let decoded = decoder.decode(&current_data)?;
+ if decoded != read {
+ position += decoded as u64;
+ break;
+ }
+ position += read as u64;
+ }
+
+ decoder
+ .flush()
+ .map(|header| (header, position))
+ .ok_or_else(|| ArrowError::AvroError("Unexpected EOF while reading Avro header".into()))
+}
+
+enum ReaderState {
+ Idle,
+ Limbo,
+ FetchingData(Pin<Box<dyn Future<Output = Result<Vec<Bytes>, ArrowError>> + Send>>),
+ DecodingBlock,
+ ReadingBatches,
+ Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = Result<RecordBatch, ArrowError>>`.
+/// This uses ObjectStore to fetch data ranges as needed, starting with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting with the following block.
+/// (If `range.start` is less than the header length, we start at the header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the remaining bytes from ObjectStore.
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the header length, finish immediately.
+pub struct AsyncAvroReader {
+ store: Arc<dyn object_store::ObjectStore>,
Review Comment:
My biggest high-level concern is the hardwiring to `object_store`. I think we'd be
better off making a generic `AsyncFileReader<T: AsyncRead + AsyncSeek>` (or a similar
trait) the primary abstraction, with `object_store` provided as one feature-flagged
adapter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]