thinkharderdev commented on code in PR #2335:
URL: https://github.com/apache/arrow-rs/pull/2335#discussion_r939516218


##########
parquet/src/arrow/async_reader.rs:
##########
@@ -271,25 +304,122 @@ impl<T: AsyncFileReader> 
ParquetRecordBatchStreamBuilder<T> {
             None => (0..self.metadata.row_groups().len()).collect(),
         };
 
+        let reader = ReaderFactory {
+            input: self.input,
+            filter: self.filter,
+            metadata: self.metadata.clone(),
+            schema: self.schema.clone(),
+        };
+
         Ok(ParquetRecordBatchStream {
+            metadata: self.metadata,
+            batch_size: self.batch_size,
             row_groups,
             projection: self.projection,
-            batch_size: self.batch_size,
-            metadata: self.metadata,
+            selection: self.selection,
             schema: self.schema,
-            input: Some(self.input),
+            reader: Some(reader),
             state: StreamState::Init,
         })
     }
 }
 
+type ReadResult<T> = Result<(ReaderFactory<T>, 
Option<ParquetRecordBatchReader>)>;
+
+/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
+/// [`ParquetRecordBatchReader`]
+struct ReaderFactory<T> {
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    input: T,
+
+    filter: Option<RowFilter>,
+}
+
+impl<T> ReaderFactory<T>
+where
+    T: AsyncFileReader + Send,
+{
+    /// Reads the next row group with the provided `selection`, `projection` 
and `batch_size`
+    ///
+    /// Note: this captures self so that the resulting future has a static 
lifetime
+    async fn read_row_group(
+        mut self,
+        row_group_idx: usize,
+        mut selection: Option<RowSelection>,
+        projection: ProjectionMask,
+        batch_size: usize,
+    ) -> ReadResult<T> {
+        // TODO: calling build_array multiple times is wasteful
+        let selects_any = |selection: Option<&RowSelection>| {
+            selection.map(|x| x.selects_any()).unwrap_or(true)
+        };
+
+        let meta = self.metadata.row_group(row_group_idx);
+        let mut row_group = InMemoryRowGroup {
+            schema: meta.schema_descr_ptr(),
+            row_count: meta.num_rows() as usize,
+            column_chunks: vec![None; meta.columns().len()],
+        };
+
+        if let Some(filter) = self.filter.as_mut() {
+            for predicate in filter.predicates.iter_mut() {
+                if !selects_any(selection.as_ref()) {
+                    return Ok((self, None));
+                }
+
+                let predicate_projection = predicate.projection().clone();
+                row_group
+                    .fetch(
+                        &mut self.input,
+                        meta,
+                        &predicate_projection,
+                        selection.as_ref(),
+                    )
+                    .await?;
+
+                let array_reader = build_array_reader(
+                    self.schema.clone(),
+                    predicate_projection,
+                    &row_group,
+                )?;
+
+                selection = Some(evaluate_predicate(
+                    batch_size,
+                    array_reader,
+                    selection,
+                    predicate.as_mut(),
+                )?);
+            }
+        }
+
+        if !selects_any(selection.as_ref()) {
+            return Ok((self, None));
+        }
+
+        row_group
+            .fetch(&mut self.input, meta, &projection, selection.as_ref())
+            .await?;
+
+        let reader = ParquetRecordBatchReader::new(
+            batch_size,
+            build_array_reader(self.schema.clone(), projection, &row_group)?,
+            selection,
+        );

Review Comment:
   Yeah, I think in many cases the difference would be negligible but in the 
degenerate cases (lots of predicates, filters which don't do much filtering, 
etc) I think it could potentially add up. The reason I worry about it in 
general is that we would have to rely on the engine to determine which 
predicates to apply and in which order. And in a situation where all we have is 
row group metadata we don't have a ton to go on. 
   
   I took a crack at seeing what it might look like preserving the decoded 
arrays and came up with https://github.com/tustvold/arrow-rs/pull/24. It 
certainly involves a lot of array slicing and dicing but the complexity seems 
manageable and would help ensure that applying filters doesn't ever come with a 
significant performance cost. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to