thinkharderdev commented on code in PR #2335:
URL: https://github.com/apache/arrow-rs/pull/2335#discussion_r939540049


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -271,25 +304,122 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
             None => (0..self.metadata.row_groups().len()).collect(),
         };
 
+        let reader = ReaderFactory {
+            input: self.input,
+            filter: self.filter,
+            metadata: self.metadata.clone(),
+            schema: self.schema.clone(),
+        };
+
         Ok(ParquetRecordBatchStream {
+            metadata: self.metadata,
+            batch_size: self.batch_size,
             row_groups,
             projection: self.projection,
-            batch_size: self.batch_size,
-            metadata: self.metadata,
+            selection: self.selection,
             schema: self.schema,
-            input: Some(self.input),
+            reader: Some(reader),
             state: StreamState::Init,
         })
     }
 }
 
+type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
+
+/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
+/// [`ParquetRecordBatchReader`]
+struct ReaderFactory<T> {
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    input: T,
+
+    filter: Option<RowFilter>,
+}
+
+impl<T> ReaderFactory<T>
+where
+    T: AsyncFileReader + Send,
+{
+    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
+    ///
+    /// Note: this captures self so that the resulting future has a static lifetime
+    async fn read_row_group(
+        mut self,
+        row_group_idx: usize,
+        mut selection: Option<RowSelection>,
+        projection: ProjectionMask,
+        batch_size: usize,
+    ) -> ReadResult<T> {
+        // TODO: calling build_array_reader multiple times is wasteful
+        let selects_any = |selection: Option<&RowSelection>| {
+            selection.map(|x| x.selects_any()).unwrap_or(true)
+        };
+
+        let meta = self.metadata.row_group(row_group_idx);
+        let mut row_group = InMemoryRowGroup {
+            schema: meta.schema_descr_ptr(),
+            row_count: meta.num_rows() as usize,
+            column_chunks: vec![None; meta.columns().len()],
+        };
+
+        if let Some(filter) = self.filter.as_mut() {
+            for predicate in filter.predicates.iter_mut() {
+                if !selects_any(selection.as_ref()) {
+                    return Ok((self, None));
+                }
+
+                let predicate_projection = predicate.projection().clone();
+                row_group
+                    .fetch(
+                        &mut self.input,
+                        meta,
+                        &predicate_projection,
+                        selection.as_ref(),
+                    )
+                    .await?;
+
+                let array_reader = build_array_reader(
+                    self.schema.clone(),
+                    predicate_projection,
+                    &row_group,
+                )?;
+
+                selection = Some(evaluate_predicate(
+                    batch_size,
+                    array_reader,
+                    selection,
+                    predicate.as_mut(),
+                )?);
+            }
+        }
+
+        if !selects_any(selection.as_ref()) {
+            return Ok((self, None));
+        }
+
+        row_group
+            .fetch(&mut self.input, meta, &projection, selection.as_ref())
+            .await?;
+
+        let reader = ParquetRecordBatchReader::new(
+            batch_size,
+            build_array_reader(self.schema.clone(), projection, &row_group)?,
+            selection,
+        );

Review Comment:
   > Conveniently, it seems like nothing in the API of this PR requires
   > decoding multiple times, so I think we could also potentially implement
   > the 'use take rather than redundant decode' in a follow on PR as well.
   
   I agree. As @tustvold points out, we need to be careful about memory
   overhead, so the best course is probably to go with the current approach
   and tackle the redundant decoding in a follow-up. I prototyped the
   DataFusion integration (based on my other draft PR, but it should be
   roughly similar) to see how it affected our internal benchmarks, and saw
   roughly a 50% improvement on queries with reasonably selective predicates.
   We went from being mostly CPU bound on parquet decoding to being mostly IO
   bound, which means I expect there is even more room for improvement once
   we use the selection and the page index to avoid that IO altogether.
   That's all to say I'm super excited about this work and think it will be
   a huge step forward!
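   
   For anyone following along, here's roughly how I'd expect this to be
   consumed from the user side. This is a minimal, untested sketch assuming
   the `with_row_filter` builder method, `RowFilter`, and the
   `ArrowPredicateFn` helper from this PR; the file, the leaf column index,
   and the Int64 comparison are purely illustrative:
   
   ```rust
   use futures::TryStreamExt;
   use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
   use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};
   use parquet::errors::Result;
   
   async fn filtered_scan(file: tokio::fs::File) -> Result<()> {
       let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
       let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
   
       // Decode only leaf column 0 while evaluating the predicate
       // (assumes that column is Int64).
       let predicate = ArrowPredicateFn::new(
           ProjectionMask::leaves(&schema_descr, [0]),
           |batch| arrow::compute::gt_dyn_scalar(batch.column(0), 0i64),
       );
   
       let mut stream = builder
           .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
           .build()?;
   
       while let Some(batch) = stream.try_next().await? {
           // Rows failing the predicate never reach this point.
           println!("got {} rows", batch.num_rows());
       }
       Ok(())
   }
   ```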
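   
   And to make the follow-up idea concrete, the 'use take rather than
   redundant decode' direction could look something like the sketch below:
   keep the batch that was already decoded for the predicate and mask it
   with the filter kernel, instead of decoding those columns a second time.
   The helper name and signature here are hypothetical, not part of this PR:
   
   ```rust
   use arrow::array::BooleanArray;
   use arrow::compute::filter_record_batch;
   use arrow::error::Result as ArrowResult;
   use arrow::record_batch::RecordBatch;
   
   /// Hypothetical helper: reuse a batch decoded during predicate
   /// evaluation by masking it, rather than re-decoding the same columns
   /// from the row group.
   fn mask_decoded_batch(
       decoded: &RecordBatch,  // decoded once while evaluating the predicate
       mask: &BooleanArray,    // the predicate's output for the same rows
   ) -> ArrowResult<RecordBatch> {
       // Copies only the selected rows; the upfront decode is not repeated.
       filter_record_batch(decoded, mask)
   }
   ```
   
   The trade-off is exactly the memory overhead mentioned above: the decoded
   batch has to stay alive until the final projection is assembled.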


