alamb commented on code in PR #8159:
URL: https://github.com/apache/arrow-rs/pull/8159#discussion_r2492146121


##########
parquet/src/arrow/async_reader/mod.rs:
##########
@@ -483,300 +476,114 @@ impl<T: AsyncFileReader + Send + 'static> ParquetRecordBatchStreamBuilder<T> {
     ///
     /// See examples on [`ParquetRecordBatchStreamBuilder::new`]
     pub fn build(self) -> Result<ParquetRecordBatchStream<T>> {
-        let num_row_groups = self.metadata.row_groups().len();
-
-        let row_groups = match self.row_groups {
-            Some(row_groups) => {
-                if let Some(col) = row_groups.iter().find(|x| **x >= num_row_groups) {
-                    return Err(general_err!(
-                        "row group {} out of bounds 0..{}",
-                        col,
-                        num_row_groups
-                    ));
-                }
-                row_groups.into()
-            }
-            None => (0..self.metadata.row_groups().len()).collect(),
-        };
-
-        // Try to avoid allocating a large buffer
-        let batch_size = self
-            .batch_size
-            .min(self.metadata.file_metadata().num_rows() as usize);
-        let reader_factory = ReaderFactory {
-            input: self.input.0,
-            filter: self.filter,
-            metadata: self.metadata.clone(),
-            fields: self.fields,
-            limit: self.limit,
-            offset: self.offset,
-            metrics: self.metrics,
-            max_predicate_cache_size: self.max_predicate_cache_size,
-        };
+        let Self {
+            input,
+            metadata,
+            schema,
+            fields,
+            batch_size,
+            row_groups,
+            projection,
+            filter,
+            selection,
+            limit,
+            offset,
+            metrics,
+            max_predicate_cache_size,
+        } = self;
 
         // Ensure schema of ParquetRecordBatchStream respects projection, and does
         // not store metadata (same as for ParquetRecordBatchReader and emitted RecordBatches)
-        let projected_fields = match reader_factory.fields.as_deref().map(|pf| &pf.arrow_type) {
-            Some(DataType::Struct(fields)) => {
-                fields.filter_leaves(|idx, _| self.projection.leaf_included(idx))
-            }
-            None => Fields::empty(),
-            _ => unreachable!("Must be Struct for root type"),
-        };
-        let schema = Arc::new(Schema::new(projected_fields));
-
-        Ok(ParquetRecordBatchStream {
-            metadata: self.metadata,
+        let projected_fields = schema
+            .fields
+            .filter_leaves(|idx, _| projection.leaf_included(idx));
+        let projected_schema = Arc::new(Schema::new(projected_fields));
+
+        let decoder = ParquetPushDecoderBuilder {
+            // Async reader doesn't know the overall size of the input, but it
+            // is not required for decoding as we will already have the metadata
+            input: 0,

Review Comment:
   I actually removed the `u64` from the push decoder builder, and I think the code is much nicer now.
   - dd683bdcd4
   
   Thank you for the suggestion, @vustef!
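   
   For anyone following along, here is a minimal sketch of the shape of that change (hypothetical types and names, not the actual arrow-rs API): once the builder is constructed from the parquet metadata, the file length adds no information, so the length field (and the dummy `input: 0` above) can go away entirely.
   
   ```rust
   /// Hypothetical stand-in for parquet metadata; not the arrow-rs type.
   struct Metadata {
       num_rows: u64,
   }
   
   /// Before: the builder carried a file length that async callers had to
   /// fake with 0, because decoding never consulted it once metadata was set.
   #[allow(dead_code)]
   struct DecoderBuilderWithLen {
       input: u64,
       metadata: Metadata,
   }
   
   /// After: the metadata alone drives decoding, so the length field is gone.
   struct DecoderBuilder {
       metadata: Metadata,
   }
   
   fn main() {
       let metadata = Metadata { num_rows: 100 };
       // No placeholder length needed any more.
       let builder = DecoderBuilder { metadata };
       println!("rows to decode: {}", builder.metadata.num_rows);
   }
   ```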


