tustvold commented on code in PR #2335:
URL: https://github.com/apache/arrow-rs/pull/2335#discussion_r939508363
##########
parquet/src/arrow/async_reader.rs:
##########
@@ -271,25 +304,122 @@ impl<T: AsyncFileReader> ParquetRecordBatchStreamBuilder<T> {
None => (0..self.metadata.row_groups().len()).collect(),
};
+ let reader = ReaderFactory {
+ input: self.input,
+ filter: self.filter,
+ metadata: self.metadata.clone(),
+ schema: self.schema.clone(),
+ };
+
Ok(ParquetRecordBatchStream {
+ metadata: self.metadata,
+ batch_size: self.batch_size,
row_groups,
projection: self.projection,
- batch_size: self.batch_size,
- metadata: self.metadata,
+ selection: self.selection,
schema: self.schema,
- input: Some(self.input),
+ reader: Some(reader),
state: StreamState::Init,
})
}
}
+type ReadResult<T> = Result<(ReaderFactory<T>, Option<ParquetRecordBatchReader>)>;
+
+/// [`ReaderFactory`] is used by [`ParquetRecordBatchStream`] to create
+/// [`ParquetRecordBatchReader`]
+struct ReaderFactory<T> {
+ metadata: Arc<ParquetMetaData>,
+
+ schema: SchemaRef,
+
+ input: T,
+
+ filter: Option<RowFilter>,
+}
+
+impl<T> ReaderFactory<T>
+where
+ T: AsyncFileReader + Send,
+{
+    /// Reads the next row group with the provided `selection`, `projection` and `batch_size`
+ ///
+    /// Note: this captures self so that the resulting future has a static lifetime
+ async fn read_row_group(
+ mut self,
+ row_group_idx: usize,
+ mut selection: Option<RowSelection>,
+ projection: ProjectionMask,
+ batch_size: usize,
+ ) -> ReadResult<T> {
+ // TODO: calling build_array multiple times is wasteful
+ let selects_any = |selection: Option<&RowSelection>| {
+ selection.map(|x| x.selects_any()).unwrap_or(true)
+ };
+
+ let meta = self.metadata.row_group(row_group_idx);
+ let mut row_group = InMemoryRowGroup {
+ schema: meta.schema_descr_ptr(),
+ row_count: meta.num_rows() as usize,
+ column_chunks: vec![None; meta.columns().len()],
+ };
+
+ if let Some(filter) = self.filter.as_mut() {
+ for predicate in filter.predicates.iter_mut() {
+ if !selects_any(selection.as_ref()) {
+ return Ok((self, None));
+ }
+
+ let predicate_projection = predicate.projection().clone();
+ row_group
+ .fetch(
+ &mut self.input,
+ meta,
+ &predicate_projection,
+ selection.as_ref(),
+ )
+ .await?;
+
+ let array_reader = build_array_reader(
+ self.schema.clone(),
+ predicate_projection,
+ &row_group,
+ )?;
+
+ selection = Some(evaluate_predicate(
+ batch_size,
+ array_reader,
+ selection,
+ predicate.as_mut(),
+ )?);
+ }
+ }
+
+ if !selects_any(selection.as_ref()) {
+ return Ok((self, None));
+ }
+
+ row_group
+ .fetch(&mut self.input, meta, &projection, selection.as_ref())
+ .await?;
+
+ let reader = ParquetRecordBatchReader::new(
+ batch_size,
+ build_array_reader(self.schema.clone(), projection, &row_group)?,
+ selection,
+ );
Review Comment:
If a column appears in multiple predicates and/or the final projection, it
will need to be decoded multiple times. I don't really see a way around this:
keeping the decoded data around and reusing it with `take + concat` would add
significant complexity, and it is unclear that it would necessarily be faster.
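
To make the cost concrete, here is a minimal sketch of the overlap scenario
against the `with_row_filter` builder API this PR wires up; the file name,
column indices, and predicates are hypothetical. Both predicates and the final
projection include leaf column 0, so with this design that column is decoded
once per predicate and a third time for the output batches:

```rust
use arrow::array::{BooleanArray, Int64Array};
use futures::TryStreamExt;
use parquet::arrow::arrow_reader::{ArrowPredicateFn, RowFilter};
use parquet::arrow::{ParquetRecordBatchStreamBuilder, ProjectionMask};

async fn filtered_scan() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical file whose leaf column 0 is an Int64 column "a"
    let file = tokio::fs::File::open("data.parquet").await?;
    let builder = ParquetRecordBatchStreamBuilder::new(file).await?;
    let schema = builder.metadata().file_metadata().schema_descr_ptr();

    // Both predicates project leaf column 0, as does the final projection,
    // so "a" is decoded once per predicate and again for the output
    let gt_zero = ArrowPredicateFn::new(
        ProjectionMask::leaves(&schema, [0]),
        |batch| {
            let a = batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
            Ok(BooleanArray::from_iter(a.iter().map(|v| v.map(|v| v > 0))))
        },
    );
    let lt_hundred = ArrowPredicateFn::new(
        ProjectionMask::leaves(&schema, [0]),
        |batch| {
            let a = batch.column(0).as_any().downcast_ref::<Int64Array>().unwrap();
            Ok(BooleanArray::from_iter(a.iter().map(|v| v.map(|v| v < 100))))
        },
    );

    let stream = builder
        .with_row_filter(RowFilter::new(vec![Box::new(gt_zero), Box::new(lt_hundred)]))
        .with_projection(ProjectionMask::leaves(&schema, [0, 1]))
        .build()?;

    let _batches: Vec<_> = stream.try_collect().await?;
    Ok(())
}
```

And a rough illustration of what the `take + concat` alternative would
involve; translating a `RowSelection` into the per-batch `indices` below is
where the complexity lives, and it is elided here (`reuse_decoded`, `cached`,
and `surviving` are hypothetical names):

```rust
use arrow::array::{Array, ArrayRef, UInt32Array};
use arrow::compute::{concat, take};
use arrow::error::ArrowError;

/// Reuse arrays already decoded while evaluating a predicate: gather the
/// rows that survived filtering from each cached batch, then stitch the
/// pieces back into a single output array
fn reuse_decoded(
    cached: &[ArrayRef],
    surviving: &[UInt32Array],
) -> Result<ArrayRef, ArrowError> {
    let taken = cached
        .iter()
        .zip(surviving)
        .map(|(array, indices)| take(array.as_ref(), indices, None))
        .collect::<Result<Vec<_>, ArrowError>>()?;
    let pieces: Vec<&dyn Array> = taken.iter().map(|a| a.as_ref()).collect();
    concat(&pieces)
}
```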
Eventually it might be possible to push the predicates down to operate
directly on the encoded data, which would eliminate this redundant decoding.
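
As a loose sketch of one flavor of that idea (not part of this PR): for a
dictionary-encoded column, a predicate could be evaluated once per distinct
dictionary value and mapped through the keys, instead of decoding every row
(`dictionary_predicate` is a hypothetical helper):

```rust
use arrow::array::{BooleanArray, DictionaryArray, StringArray};
use arrow::datatypes::Int32Type;

/// Hypothetical sketch: evaluate a predicate against the (small) dictionary
/// of a dictionary-encoded column, then translate the per-key results
/// through the keys rather than materializing every row's value
fn dictionary_predicate(
    column: &DictionaryArray<Int32Type>,
    predicate: impl Fn(&str) -> bool,
) -> BooleanArray {
    let values = column
        .values()
        .as_any()
        .downcast_ref::<StringArray>()
        .unwrap();
    // Evaluate once per dictionary entry, not once per row
    let key_hits: Vec<bool> = (0..values.len())
        .map(|i| predicate(values.value(i)))
        .collect();
    column
        .keys()
        .iter()
        .map(|key| key.map(|k| key_hits[k as usize]))
        .collect()
}
```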
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]