mzabaluev commented on code in PR #8930:
URL: https://github.com/apache/arrow-rs/pull/8930#discussion_r2725958122


##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1717 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::{BlockDecoder, BlockDecoderState};
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum FetchNextBehaviour {
+    /// Initial read: scan for sync marker, then move to decoding blocks
+    ReadSyncMarker,
+    /// Parse VLQ header bytes one at a time until Data state, then continue decoding
+    DecodeVLQHeader,
+    /// Continue decoding the current block with the fetched data
+    ContinueDecoding,
+}
+
+enum ReaderState<R: AsyncFileReader> {
+    /// Intermediate state to fix ownership issues
+    InvalidState,
+    /// Initial state, fetch initial range
+    Idle { reader: R },
+    /// Fetching data from the reader
+    FetchingData {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+        next_behaviour: FetchNextBehaviour,
+    },
+    /// Decode a block in a loop until completion
+    DecodingBlock { data: Bytes, reader: R },
+    /// Output batches from a decoded block
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    /// Successfully finished reading file contents; drain any remaining buffered records
+    /// from the decoder into (possibly partial) output batches.
+    Flushing,
+    /// Done, flush decoder and return
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting with the following block.
+///    (If `range.start` is less than the header length, we start at the header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {
+    // Members required to fetch data
+    range: Range<u64>,
+    file_size: u64,
+
+    // Members required to actually decode and read data
+    decoder: Decoder,
+    block_decoder: BlockDecoder,
+    codec: Option<CompressionCodec>,
+    sync_marker: [u8; 16],
+
+    // Members keeping the current state of the reader
+    reader_state: ReaderState<R>,
+    finishing_partial_block: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {

Review Comment:
   The `Unpin` bound should not be necessary.



##########
arrow-avro/src/reader/async_reader/builder.rs:
##########
@@ -0,0 +1,239 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::codec::AvroFieldBuilder;
+use crate::reader::async_reader::ReaderState;
+use crate::reader::header::{Header, HeaderDecoder};
+use crate::reader::record::RecordDecoder;
+use crate::reader::{AsyncAvroFileReader, AsyncFileReader, Decoder};
+use crate::schema::{AvroSchema, FingerprintAlgorithm, SCHEMA_METADATA_KEY};
+use arrow_schema::ArrowError;
+use indexmap::IndexMap;
+use std::ops::Range;
+
+const DEFAULT_HEADER_SIZE_HINT: u64 = 16 * 1024; // 16 KB
+
+/// Builder for an asynchronous Avro file reader.
+pub struct AsyncAvroFileReaderBuilder<R: AsyncFileReader> {
+    reader: R,
+    file_size: u64,
+    batch_size: usize,
+    range: Option<Range<u64>>,
+    reader_schema: Option<AvroSchema>,
+    projection: Option<Vec<usize>>,
+    header_size_hint: Option<u64>,
+    utf8_view: bool,
+    strict_mode: bool,
+}
+
+impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReaderBuilder<R> {

Review Comment:
   I don't think the `Unpin` and `'static` bounds need to be specified 
explicitly here. In fact, most of the methods, such as `new` and the builder 
setters, don't need any bounds on the impl; only `try_build` does. The struct 
definition could also do without the `AsyncFileReader` bound on the parameter, 
to cut down on the boilerplate.



##########
arrow-avro/src/reader/async_reader/mod.rs:
##########
@@ -0,0 +1,1717 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::compression::CompressionCodec;
+use crate::reader::Decoder;
+use crate::reader::block::{BlockDecoder, BlockDecoderState};
+use arrow_array::RecordBatch;
+use arrow_schema::ArrowError;
+use bytes::Bytes;
+use futures::future::BoxFuture;
+use futures::{FutureExt, Stream};
+use std::mem;
+use std::ops::Range;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+
+mod async_file_reader;
+mod builder;
+
+pub use async_file_reader::AsyncFileReader;
+pub use builder::AsyncAvroFileReaderBuilder;
+
+#[cfg(feature = "object_store")]
+mod store;
+
+#[cfg(feature = "object_store")]
+pub use store::AvroObjectReader;
+
+enum FetchNextBehaviour {
+    /// Initial read: scan for sync marker, then move to decoding blocks
+    ReadSyncMarker,
+    /// Parse VLQ header bytes one at a time until Data state, then continue decoding
+    DecodeVLQHeader,
+    /// Continue decoding the current block with the fetched data
+    ContinueDecoding,
+}
+
+enum ReaderState<R: AsyncFileReader> {
+    /// Intermediate state to fix ownership issues
+    InvalidState,
+    /// Initial state, fetch initial range
+    Idle { reader: R },
+    /// Fetching data from the reader
+    FetchingData {
+        future: BoxFuture<'static, Result<(R, Bytes), ArrowError>>,
+        next_behaviour: FetchNextBehaviour,
+    },
+    /// Decode a block in a loop until completion
+    DecodingBlock { data: Bytes, reader: R },
+    /// Output batches from a decoded block
+    ReadingBatches {
+        data: Bytes,
+        block_data: Bytes,
+        remaining_in_block: usize,
+        reader: R,
+    },
+    /// Successfully finished reading file contents; drain any remaining buffered records
+    /// from the decoder into (possibly partial) output batches.
+    Flushing,
+    /// Done, flush decoder and return
+    Finished,
+}
+
+/// An asynchronous Avro file reader that implements `Stream<Item = Result<RecordBatch, ArrowError>>`.
+/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting with fetching the header,
+/// then reading all the blocks in the provided range where:
+/// 1. Reads and decodes data until the header is fully decoded.
+/// 2. Searching from `range.start` for the first sync marker, and starting with the following block.
+///    (If `range.start` is less than the header length, we start at the header length minus the sync marker bytes)
+/// 3. Reading blocks sequentially, decoding them into RecordBatches.
+/// 4. If a block is incomplete (due to range ending mid-block), fetching the remaining bytes from the [`AsyncFileReader`].
+/// 5. If no range was originally provided, reads the full file.
+/// 6. If the range is 0, file_size is 0, or `range.end` is less than the header length, finish immediately.
+pub struct AsyncAvroFileReader<R: AsyncFileReader> {

Review Comment:
   The parameter bound is dragged into every type definition that contains this 
struct. A better style, IMO, is to specify bounds only on the impl blocks whose 
methods really need them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to