alamb commented on code in PR #2335:
URL: https://github.com/apache/arrow-rs/pull/2335#discussion_r940678795


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -271,25 +304,122 @@ impl<T: AsyncFileReader> 
ParquetRecordBatchStreamBuilder<T> {
             None => (0..self.metadata.row_groups().len()).collect(),
         };
 
+        let reader = ReaderFactory {
+            input: self.input,
+            filter: self.filter,
+            metadata: self.metadata.clone(),
+            schema: self.schema.clone(),
+        };
+
         Ok(ParquetRecordBatchStream {
+            metadata: self.metadata,
+            batch_size: self.batch_size,
             row_groups,
             projection: self.projection,
-            batch_size: self.batch_size,
-            metadata: self.metadata,
+            selection: self.selection,
             schema: self.schema,
-            input: Some(self.input),
+            reader: Some(reader),
             state: StreamState::Init,
         })
     }
 }
 
+type ReadResult<T> = Result<(ReaderFactory<T>, 
Option<ParquetRecordBatchReader>)>;
+
+/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
+/// [`ParquetRecordBatchReader`]
+struct ReaderFactory<T> {
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    input: T,
+
+    filter: Option<RowFilter>,
+}
+
+impl<T> ReaderFactory<T>
+where
+    T: AsyncFileReader + Send,
+{
+    /// Reads the next row group with the provided `selection`, `projection` 
and `batch_size`
+    ///
+    /// Note: this captures self so that the resulting future has a static 
lifetime
+    async fn read_row_group(
+        mut self,
+        row_group_idx: usize,
+        mut selection: Option<RowSelection>,
+        projection: ProjectionMask,
+        batch_size: usize,
+    ) -> ReadResult<T> {
+        // TODO: calling build_array multiple times is wasteful
+        let selects_any = |selection: Option<&RowSelection>| {
+            selection.map(|x| x.selects_any()).unwrap_or(true)
+        };
+
+        let meta = self.metadata.row_group(row_group_idx);
+        let mut row_group = InMemoryRowGroup {
+            schema: meta.schema_descr_ptr(),
+            row_count: meta.num_rows() as usize,
+            column_chunks: vec![None; meta.columns().len()],
+        };
+
+        if let Some(filter) = self.filter.as_mut() {
+            for predicate in filter.predicates.iter_mut() {
+                if !selects_any(selection.as_ref()) {
+                    return Ok((self, None));
+                }
+
+                let predicate_projection = predicate.projection().clone();
+                row_group
+                    .fetch(
+                        &mut self.input,
+                        meta,
+                        &predicate_projection,
+                        selection.as_ref(),
+                    )
+                    .await?;
+
+                let array_reader = build_array_reader(
+                    self.schema.clone(),
+                    predicate_projection,
+                    &row_group,
+                )?;
+
+                selection = Some(evaluate_predicate(
+                    batch_size,
+                    array_reader,
+                    selection,
+                    predicate.as_mut(),
+                )?);
+            }
+        }
+
+        if !selects_any(selection.as_ref()) {
+            return Ok((self, None));
+        }
+
+        row_group
+            .fetch(&mut self.input, meta, &projection, selection.as_ref())
+            .await?;
+
+        let reader = ParquetRecordBatchReader::new(
+            batch_size,
+            build_array_reader(self.schema.clone(), projection, &row_group)?,
+            selection,
+        );

Review Comment:
   > We went from being mostly CPU bound with parquet decoding to being mostly 
IO bound which means I expect there is even more room for improvement once we 
are using the selection and the page index to avoid IO altogether. That's all 
to say I'm super excited about this work and think it will be a huge step 
forward!
   
   That is terrific news 🎉 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to