thinkharderdev commented on code in PR #2335:
URL: https://github.com/apache/arrow-rs/pull/2335#discussion_r939103004
##########
parquet/src/arrow/async_reader.rs:
##########
@@ -271,25 +304,122 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
             None => (0..self.metadata.row_groups().len()).collect(),
         };
+        let reader = ReaderFactory {
+            input: self.input,
+            filter: self.filter,
+            metadata: self.metadata.clone(),
+            schema: self.schema.clone(),
+        };
+
         Ok(ParquetRecordBatchStream {
+            metadata: self.metadata,
+            batch_size: self.batch_size,
             row_groups,
             projection: self.projection,
-            batch_size: self.batch_size,
-            metadata: self.metadata,
+            selection: self.selection,
             schema: self.schema,
-            input: Some(self.input),
+            reader: Some(reader),
             state: StreamState::Init,
         })
     }
 }
+type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
+
+/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
+/// [`ParquetRecordBatchReader`]
+struct ReaderFactory<T> {
+    metadata: Arc<ParquetMetaData>,
+
+    schema: SchemaRef,
+
+    input: T,
+
+    filter: Option<RowFilter>,
+}
+
+impl<T> ReaderFactory<T>
+where
+    T: AsyncFileReader + Send,
+{
+    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
+    ///
+    /// Note: this captures self so that the resulting future has a static lifetime
+    async fn read_row_group(
+        mut self,
+        row_group_idx: usize,
+        mut selection: Option<RowSelection>,
+        projection: ProjectionMask,
+        batch_size: usize,
+    ) -> ReadResult<T> {
+        // TODO: calling build_array_reader multiple times is wasteful
+        let selects_any = |selection: Option<&RowSelection>| {
+            selection.map(|x| x.selects_any()).unwrap_or(true)
+        };
+
+        let meta = self.metadata.row_group(row_group_idx);
+        let mut row_group = InMemoryRowGroup {
+            schema: meta.schema_descr_ptr(),
+            row_count: meta.num_rows() as usize,
+            column_chunks: vec![None; meta.columns().len()],
+        };
+
+        if let Some(filter) = self.filter.as_mut() {
+            for predicate in filter.predicates.iter_mut() {
+                if !selects_any(selection.as_ref()) {
+                    return Ok((self, None));
+                }
+
+                let predicate_projection = predicate.projection().clone();
+                row_group
+                    .fetch(
+                        &mut self.input,
+                        meta,
+                        &predicate_projection,
+                        selection.as_ref(),
+                    )
+                    .await?;
+
+                let array_reader = build_array_reader(
+                    self.schema.clone(),
+                    predicate_projection,
+                    &row_group,
+                )?;
+
+                selection = Some(evaluate_predicate(
+                    batch_size,
+                    array_reader,
+                    selection,
+                    predicate.as_mut(),
+                )?);
+            }
+        }
+
+        if !selects_any(selection.as_ref()) {
+            return Ok((self, None));
+        }
+
+        row_group
+            .fetch(&mut self.input, meta, &projection, selection.as_ref())
+            .await?;
+
+        let reader = ParquetRecordBatchReader::new(
+            batch_size,
+            build_array_reader(self.schema.clone(), projection, &row_group)?,
+            selection,
+        );
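
The doc comment on `read_row_group` above notes that it captures `self` so that the resulting future has a static lifetime. A minimal sketch of that ownership pattern, with hypothetical types rather than the parquet-rs ones: because the `async fn` takes `self` by value, its future owns everything it touches, borrows nothing from the caller, and can therefore be stored as a `'static` trait object; the factory is handed back when the future resolves, mirroring `ReadResult<T>`.

```rust
use std::future::Future;
use std::pin::Pin;

// Hypothetical stand-in for `ReaderFactory`; not the parquet-rs type.
struct Factory {
    input: Vec<u8>,
}

impl Factory {
    // Taking `self` by value means the returned future owns the factory:
    // it carries no borrowed lifetime, so it is `'static`. The factory is
    // handed back alongside the result, as `ReadResult<T>` does.
    async fn read(mut self, n: usize) -> (Factory, Option<usize>) {
        let take = self.input.len().min(n);
        self.input.drain(..take);
        (self, (take > 0).then_some(take))
    }
}

fn main() {
    let factory = Factory { input: vec![1, 2, 3] };
    // `Box<dyn Future>` defaults to `'static`, so this coercion compiles
    // only because `read` borrows nothing from its caller.
    let _future: Pin<Box<dyn Future<Output = (Factory, Option<usize>)>>> =
        Box::pin(factory.read(2));
}
```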
Review Comment:
This would require decoding the filter columns twice (or multiple times in the case where we have multiple predicates), right?
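
To make the accounting behind this concern concrete: each predicate pass in `read_row_group` fetches and decodes the columns in its own projection, and the final pass decodes the output projection, so a column shared between those projections is decoded once per pass it appears in. A minimal, self-contained sketch of that counting (hypothetical names, not the parquet-rs API):

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a predicate's column projection.
struct Predicate {
    projection: Vec<&'static str>,
}

/// Count how many times each column is decoded when every predicate pass,
/// plus the final output pass, decodes its own projection from scratch
/// (mirroring the per-predicate loop in `read_row_group`).
fn decode_counts(
    predicates: &[Predicate],
    output_projection: &[&'static str],
) -> HashMap<&'static str, usize> {
    let mut counts: HashMap<&'static str, usize> = HashMap::new();
    for predicate in predicates {
        for &column in &predicate.projection {
            *counts.entry(column).or_insert(0) += 1; // one decode per predicate pass
        }
    }
    for &column in output_projection {
        *counts.entry(column).or_insert(0) += 1; // final decode for the output batches
    }
    counts
}

fn main() {
    // Two predicates that both filter on "a"; the output also reads "a".
    let predicates = vec![
        Predicate { projection: vec!["a"] },
        Predicate { projection: vec!["a", "b"] },
    ];
    let counts = decode_counts(&predicates, &["a", "c"]);
    assert_eq!(counts["a"], 3); // once per predicate, plus the final pass
    assert_eq!(counts["b"], 1);
    assert_eq!(counts["c"], 1);
}
```

Caching decoded arrays for columns that appear in more than one projection would avoid the repeated work, at the cost of keeping those arrays in memory.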