alamb commented on code in PR #20839:
URL: https://github.com/apache/datafusion/pull/20839#discussion_r2911079109
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -617,57 +635,76 @@ impl FileOpener for ParquetOpener {
file_metrics.predicate_cache_inner_records.clone();
let predicate_cache_records =
file_metrics.predicate_cache_records.clone();
- let stream_schema = Arc::clone(stream.schema());
- // Check if we need to replace the schema to handle things like
differing nullability or metadata.
- // See note below about file vs. output schema.
- let replace_schema = !stream_schema.eq(&output_schema);
-
// Rebase column indices to match the narrowed stream schema.
// The projection expressions have indices based on
physical_file_schema,
// but the stream only contains the columns selected by the
ProjectionMask.
+ let stream_schema =
Arc::new(physical_file_schema.project(&indices)?);
+ let replace_schema = stream_schema != output_schema;
let projection = projection
.try_map_exprs(|expr| reassign_expr_columns(expr,
&stream_schema))?;
-
let projector = projection.make_projector(&stream_schema)?;
-
- let stream = stream.map_err(DataFusionError::from).map(move |b| {
- b.and_then(|mut b| {
- copy_arrow_reader_metrics(
- &arrow_reader_metrics,
- &predicate_cache_inner_records,
- &predicate_cache_records,
- );
- b = projector.project_batch(&b)?;
- if replace_schema {
- // Ensure the output batch has the expected schema.
- // This handles things like schema level and field
level metadata, which may not be present
- // in the physical file schema.
- // It is also possible for nullability to differ; some
writers create files with
- // OPTIONAL fields even when there are no nulls in the
data.
- // In these cases it may make sense for the logical
schema to be `NOT NULL`.
- // RecordBatch::try_new_with_options checks that if
the schema is NOT NULL
- // the array cannot contain nulls, amongst other
checks.
- let (_stream_schema, arrays, num_rows) =
b.into_parts();
- let options =
-
RecordBatchOptions::new().with_row_count(Some(num_rows));
- RecordBatch::try_new_with_options(
- Arc::clone(&output_schema),
- arrays,
- &options,
- )
- .map_err(Into::into)
- } else {
- Ok(b)
+ let stream = futures::stream::unfold(
+ PushDecoderStreamState {
+ decoder,
+ reader: async_file_reader,
+ projector,
+ output_schema,
+ replace_schema,
+ arrow_reader_metrics,
+ predicate_cache_inner_records,
+ predicate_cache_records,
+ },
+ |mut state| async move {
Review Comment:
Stylistically, I think this code would be easier to understand (and less
deeply indented) if it were a method on `PushDecoderStreamState` rather than
an inline closure.
Maybe something like
```rust
impl PushDecoderStreamState {
...
fn transition(self) -> Result<Option<Self>> {...}
}
```
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -679,6 +716,33 @@ impl FileOpener for ParquetOpener {
}
}
+struct PushDecoderStreamState {
Review Comment:
I think it would be good to add an explanation of what this struct is --
basically, what state it is tracking.
(Also, can we please avoid the inline `use` qualifications and use
module-level `use` statements above instead?) That is entirely to help human
readers.
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,11 +475,27 @@ impl FileOpener for ParquetOpener {
}
if enable_bloom_filter && !row_groups.is_empty() {
+ // Use the existing reader for bloom filter I/O;
+ // replace with a fresh reader for decoding below.
+ let bf_reader = std::mem::replace(
+ &mut async_file_reader,
+ parquet_file_reader_factory.create_reader(
+ partition_index,
+ partitioned_file.clone(),
+ metadata_size_hint,
+ &metrics,
+ )?,
+ );
+ let mut bf_builder =
+ ParquetRecordBatchStreamBuilder::new_with_metadata(
Review Comment:
It is unfortunate that we need to create a new builder simply to read out
the bloom filters (it seems to be because `prune_by_bloom_filters` calls
`ParquetRecordBatchStreamBuilder::get_row_group_column_bloom_filter`).
It seems like maybe upstream we should move the bloom filter reading into
the `ParquetMetadataDecoder` 🤔 (as a follow-on PR / cleanup).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]