alamb commented on code in PR #8159:
URL: https://github.com/apache/arrow-rs/pull/8159#discussion_r2478112777


##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -483,300 +476,113 @@ impl<T: AsyncFileReader + Send + 'static> 
ParquetRecordBatchStreamBuilder<T> {
     ///
     /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
     pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
-        let num_row_groups = self.metadata.row_groups().len();
-
-        let row_groups = match self.row_groups {
-            Some(row_groups) => {
-                if let Some(col) = row_groups.iter().find(|x| **x >= 
num_row_groups) {
-                    return Err(general_err!(
-                        "row group {} out of bounds 0..{}",
-                        col,
-                        num_row_groups
-                    ));
-                }
-                row_groups.into()
-            }
-            None => (0..self.metadata.row_groups().len()).collect(),
-        };
-
-        // Try to avoid allocate large buffer
-        let batch_size = self
-            .batch_size
-            .min(self.metadata.file_metadata().num_rows() as usize);
-        let reader_factory = ReaderFactory {
-            input: self.input.0,
-            filter: self.filter,
-            metadata: self.metadata.clone(),
-            fields: self.fields,
-            limit: self.limit,
-            offset: self.offset,
-            metrics: self.metrics,
-            max_predicate_cache_size: self.max_predicate_cache_size,
-        };
+        let Self {
+            input,
+            metadata,
+            schema,
+            fields,
+            batch_size,
+            row_groups,
+            projection,
+            filter,
+            selection,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+        } = self;
 
         // Ensure schema of ParquetRecordBatchStream respects projection, and 
does
         // not store metadata (same as for ParquetRecordBatchReader and 
emitted RecordBatches)
-        let projected_fields = match reader_factory.fields.as_deref().map(|pf| 
&pf.arrow_type) {
-            Some(DataType::Struct(fields)) => {
-                fields.filter_leaves(|idx, _| 
self.projection.leaf_included(idx))
-            }
-            None => Fields::empty(),
-            _ => unreachable!("Must be Struct for root type"),
-        };
-        let schema = Arc::new(Schema::new(projected_fields));
-
-        Ok(ParquetRecordBatchStream {
-            metadata: self.metadata,
+        let projected_fields = schema
+            .fields
+            .filter_leaves(|idx, _| projection.leaf_included(idx));
+        let projected_schema = Arc::new(Schema::new(projected_fields));
+
+        let decoder = ParquetPushDecoderBuilder {
+            // Async reader doesn't know the overall size of the input, so 
hard code to 0
+            input: 0,
+            metadata,
+            schema,
+            fields,
+            projection,
+            filter,
+            selection,
             batch_size,
             row_groups,
-            projection: self.projection,
-            selection: self.selection,
-            schema,
-            reader_factory: Some(reader_factory),
-            state: StreamState::Init,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+        }
+        .build()?;
+
+        let request_state = RequestState::None { input: input.0 };
+
+        Ok(ParquetRecordBatchStream {
+            schema: projected_schema,
+            decoder,
+            request_state,
         })
     }
 }
 
-/// Returns a [`ReaderFactory`] and an optional [`ParquetRecordBatchReader`] 
for the next row group
+/// State machine that tracks outstanding requests to fetch data
 ///
-/// Note: If all rows are filtered out in the row group (e.g by filters, limit 
or
-/// offset), returns `None` for the reader.
-type ReadResult<T> = Result<(ReaderFactory<T>, 
Option<ParquetRecordBatchReader>)>;
-
-/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
-/// [`ParquetRecordBatchReader`]
-struct ReaderFactory<T> {
-    metadata: Arc<ParquetMetaData>,
-
-    /// Top level parquet schema
-    fields: Option<Arc<ParquetField>>,
-
-    input: T,
-
-    /// Optional filter
-    filter: Option<RowFilter>,
-
-    /// Limit to apply to remaining row groups.  
-    limit: Option<usize>,
-
-    /// Offset to apply to the next
-    offset: Option<usize>,
-
-    /// Metrics
-    metrics: ArrowReaderMetrics,
-
-    /// Maximum size of the predicate cache
-    ///
-    /// See [`RowGroupCache`] for details.
-    max_predicate_cache_size: usize,
+/// The parameter `T` is the input, typically a `AsyncFileReader`
+enum RequestState<T> {
+    /// No outstanding requests
+    None {
+        input: T,
+    },
+    /// There is an outstanding request for data
+    Outstanding {
+        /// Ranges that have been requested
+        ranges: Vec<Range<u64>>,
+        /// Future that will resolve (input, requested_ranges)
+        ///
+        /// Note the future owns the reader while the request it outstanding
+        /// and returns it upon completion
+        future: BoxFuture<'static, Result<(T, Vec<Bytes>)>>,
+    },
+    Done,
 }
 
-impl<T> ReaderFactory<T>
+impl<T> RequestState<T>
 where
-    T: AsyncFileReader + Send,
+    T: AsyncFileReader + Unpin + Send + 'static,
 {
-    /// Reads the next row group with the provided `selection`, `projection` 
and `batch_size`
-    ///
-    /// Updates the `limit` and `offset` of the reader factory
-    ///
-    /// Note: this captures self so that the resulting future has a static 
lifetime
-    async fn read_row_group(
-        mut self,
-        row_group_idx: usize,
-        selection: Option<RowSelection>,
-        projection: ProjectionMask,
-        batch_size: usize,
-    ) -> ReadResult<T> {
-        // TODO: calling build_array multiple times is wasteful
-
-        let meta = self.metadata.row_group(row_group_idx);

Review Comment:
   All this code now exists as an explicit state machine in the push decoder



##########
parquet/src/arrow/push_decoder/mod.rs:
##########
@@ -268,7 +268,53 @@ impl ParquetPushDecoder {
     ///```
     pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, 
ParquetError> {
         let current_state = std::mem::replace(&mut self.state, 
ParquetDecoderState::Finished);
-        let (new_state, decode_result) = current_state.try_transition()?;
+        let (new_state, decode_result) = current_state.try_next_batch()?;
+        self.state = new_state;
+        Ok(decode_result)
+    }
+
+    /// Attempt to return the next [`ParquetRecordBatchReader`] or return what 
data is needed

Review Comment:
   this is a new API on the ParquetPushDecoder that is needed to implement the 
existing `next_row_group` API: 
https://docs.rs/parquet/latest/parquet/arrow/async_reader/struct.ParquetRecordBatchStream.html#method.next_row_group



##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -858,45 +653,33 @@ where
     /// - `Ok(Some(reader))` which holds all the data for the row group.
     pub async fn next_row_group(&mut self) -> 
Result<Option<ParquetRecordBatchReader>> {
         loop {
-            match &mut self.state {
-                StreamState::Decoding(_) | StreamState::Reading(_) => {
-                    return Err(ParquetError::General(
-                        "Cannot combine the use of next_row_group with the 
Stream API".to_string(),
-                    ));
-                }
-                StreamState::Init => {
-                    let row_group_idx = match self.row_groups.pop_front() {
-                        Some(idx) => idx,
-                        None => return Ok(None),
-                    };
-
-                    let row_count = 
self.metadata.row_group(row_group_idx).num_rows() as usize;
-
-                    let selection = self.selection.as_mut().map(|s| 
s.split_off(row_count));
-
-                    let reader_factory = 
self.reader_factory.take().expect("lost reader factory");
-
-                    let (reader_factory, maybe_reader) = reader_factory
-                        .read_row_group(
-                            row_group_idx,
-                            selection,
-                            self.projection.clone(),
-                            self.batch_size,
-                        )
-                        .await
-                        .inspect_err(|_| {
-                            self.state = StreamState::Error;
-                        })?;
-                    self.reader_factory = Some(reader_factory);
-
-                    if let Some(reader) = maybe_reader {
-                        return Ok(Some(reader));
-                    } else {
-                        // All rows skipped, read next row group
-                        continue;
+            let request_state = std::mem::replace(&mut self.request_state, 
RequestState::Done);
+            match request_state {
+                // No outstanding requests, proceed to setup next row group
+                RequestState::None { input } => {

Review Comment:
   This is now the core state machine of ParquetRecordBatchStreamBuilder, and I 
think it is fairly straightforward now



##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -1724,98 +1487,6 @@ mod tests {
         assert_eq!(total_rows, 730);
     }
 
-    #[tokio::test]
-    #[allow(deprecated)]
-    async fn test_in_memory_row_group_sparse() {

Review Comment:
   This test was introduced in the following PR by @thinkharderdev 
   - https://github.com/apache/arrow-rs/pull/2473
   
   I believe it is meant to verify the PageIndex is used to prune IO, 
   
   The reason I propose deleting this test is:
   1. IO pruning based on PageIndex is covered in the newer `io` tests,  for 
example 
https://github.com/apache/arrow-rs/blob/2eabb595d20e691cf0c9c3ccf6a5e1b67472b07b/parquet/tests/arrow_reader/io/async_reader.rs#L119
   2. This test is in terms of non public APIs (the ReaderFactory and 
InMemoryRowGroup) which don't reflect the requests that are actually made (the 
ranges are coalesced, for example, for each column's pages)



##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -796,35 +603,23 @@ impl<T> std::fmt::Debug for StreamState<T> {
 /// required, which is especially important for object stores, where IO 
operations
 /// have latencies in the hundreds of milliseconds
 ///
+/// See [`ParquetPushDecoderBuilder`] for an API with lower level control over
+/// buffering.
 ///
 /// [`Stream`]: https://docs.rs/futures/latest/futures/stream/trait.Stream.html
 pub struct ParquetRecordBatchStream<T> {

Review Comment:
   I am quite pleased with this -- `ParquetRecordBatchStreamBuilder` is now 
clearly separated into the IO handling piece `request_state` and the decoding 
piece, `decoder`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to