tustvold commented on code in PR #2335:
URL: https://github.com/apache/arrow-rs/pull/2335#discussion_r940088468
##########
parquet/src/arrow/async_reader.rs:
##########
@@ -271,25 +304,122 @@ impl<T: AsyncFileReader>
ParquetRecordBatchStreamBuilder<T> {
None => (0..self.metadata.row_groups().len()).collect(),
};
+ let reader = ReaderFactory {
+ input: self.input,
+ filter: self.filter,
+ metadata: self.metadata.clone(),
+ schema: self.schema.clone(),
+ };
+
Ok(ParquetRecordBatchStream {
+ metadata: self.metadata,
+ batch_size: self.batch_size,
row_groups,
projection: self.projection,
- batch_size: self.batch_size,
- metadata: self.metadata,
+ selection: self.selection,
schema: self.schema,
- input: Some(self.input),
+ reader: Some(reader),
state: StreamState::Init,
})
}
}
+type ReadResult<T> = Result<(ReaderFactory<T>,
Option<ParquetRecordBatchReader>)>; // return type of `read_row_group`: the factory is handed back with the reader, which is `None` when the selection selects no rows
+
+/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create a
+/// [`ParquetRecordBatchReader`] for a single row group at a time
+struct ReaderFactory<T> {
+ metadata: Arc<ParquetMetaData>, // file metadata; `read_row_group` looks up the requested row group's metadata here
+
+ schema: SchemaRef, // schema passed to `build_array_reader` for both predicate and final reads
+
+ input: T, // source the row group's column data is fetched from (`T: AsyncFileReader` on the impl)
+
+ filter: Option<RowFilter>, // optional filter; its predicates are evaluated in order to narrow the row selection before the final read
+}
+
+impl<T> ReaderFactory<T>
+where
+ T: AsyncFileReader + Send,
+{
+ /// Reads the next row group with the provided `selection`, `projection`
and `batch_size`
+ ///
+ /// Note: this captures self so that the resulting future has a static
lifetime
+ async fn read_row_group(
+ mut self,
+ row_group_idx: usize,
+ mut selection: Option<RowSelection>,
+ projection: ProjectionMask,
+ batch_size: usize,
+ ) -> ReadResult<T> {
+ // TODO: calling build_array multiple times is wasteful
+ let selects_any = |selection: Option<&RowSelection>| {
+ selection.map(|x| x.selects_any()).unwrap_or(true)
+ };
+
+ let meta = self.metadata.row_group(row_group_idx);
+ let mut row_group = InMemoryRowGroup {
+ schema: meta.schema_descr_ptr(),
+ row_count: meta.num_rows() as usize,
+ column_chunks: vec![None; meta.columns().len()],
+ };
+
+ if let Some(filter) = self.filter.as_mut() {
+ for predicate in filter.predicates.iter_mut() {
+ if !selects_any(selection.as_ref()) {
+ return Ok((self, None));
+ }
+
+ let predicate_projection = predicate.projection().clone();
+ row_group
+ .fetch(
+ &mut self.input,
+ meta,
+ &predicate_projection,
+ selection.as_ref(),
+ )
+ .await?;
+
+ let array_reader = build_array_reader(
+ self.schema.clone(),
+ predicate_projection,
+ &row_group,
+ )?;
+
+ selection = Some(evaluate_predicate(
+ batch_size,
+ array_reader,
+ selection,
+ predicate.as_mut(),
+ )?);
+ }
+ }
+
+ if !selects_any(selection.as_ref()) {
+ return Ok((self, None));
+ }
+
+ row_group
+ .fetch(&mut self.input, meta, &projection, selection.as_ref())
+ .await?;
+
+ let reader = ParquetRecordBatchReader::new(
+ batch_size,
+ build_array_reader(self.schema.clone(), projection, &row_group)?,
+ selection,
+ );
Review Comment:
I will create a follow-up ticket to investigate this :+1:
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: github-unsubscribe@arrow.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org