alamb commented on code in PR #20839:
URL: https://github.com/apache/datafusion/pull/20839#discussion_r2911079109


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -617,57 +635,76 @@ impl FileOpener for ParquetOpener {
                 file_metrics.predicate_cache_inner_records.clone();
             let predicate_cache_records = 
file_metrics.predicate_cache_records.clone();
 
-            let stream_schema = Arc::clone(stream.schema());
-            // Check if we need to replace the schema to handle things like 
differing nullability or metadata.
-            // See note below about file vs. output schema.
-            let replace_schema = !stream_schema.eq(&output_schema);
-
             // Rebase column indices to match the narrowed stream schema.
             // The projection expressions have indices based on 
physical_file_schema,
             // but the stream only contains the columns selected by the 
ProjectionMask.
+            let stream_schema = 
Arc::new(physical_file_schema.project(&indices)?);
+            let replace_schema = stream_schema != output_schema;
             let projection = projection
                 .try_map_exprs(|expr| reassign_expr_columns(expr, 
&stream_schema))?;
-
             let projector = projection.make_projector(&stream_schema)?;
-
-            let stream = stream.map_err(DataFusionError::from).map(move |b| {
-                b.and_then(|mut b| {
-                    copy_arrow_reader_metrics(
-                        &arrow_reader_metrics,
-                        &predicate_cache_inner_records,
-                        &predicate_cache_records,
-                    );
-                    b = projector.project_batch(&b)?;
-                    if replace_schema {
-                        // Ensure the output batch has the expected schema.
-                        // This handles things like schema level and field 
level metadata, which may not be present
-                        // in the physical file schema.
-                        // It is also possible for nullability to differ; some 
writers create files with
-                        // OPTIONAL fields even when there are no nulls in the 
data.
-                        // In these cases it may make sense for the logical 
schema to be `NOT NULL`.
-                        // RecordBatch::try_new_with_options checks that if 
the schema is NOT NULL
-                        // the array cannot contain nulls, amongst other 
checks.
-                        let (_stream_schema, arrays, num_rows) = 
b.into_parts();
-                        let options =
-                            
RecordBatchOptions::new().with_row_count(Some(num_rows));
-                        RecordBatch::try_new_with_options(
-                            Arc::clone(&output_schema),
-                            arrays,
-                            &options,
-                        )
-                        .map_err(Into::into)
-                    } else {
-                        Ok(b)
+            let stream = futures::stream::unfold(
+                PushDecoderStreamState {
+                    decoder,
+                    reader: async_file_reader,
+                    projector,
+                    output_schema,
+                    replace_schema,
+                    arrow_reader_metrics,
+                    predicate_cache_inner_records,
+                    predicate_cache_records,
+                },
+                |mut state| async move {

Review Comment:
   Stylistically, I think this code would be easier to understand (and less 
deeply indented) if it were a method on `PushDecoderStreamState` rather than 
an inline closure.
   
   Maybe something like
   ```rust
   impl PushDecoderStreamState {
   ...
     fn transition(self) -> Result<Option<Self>> {...}
   }
   ```
   



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -679,6 +716,33 @@ impl FileOpener for ParquetOpener {
     }
 }
 
+struct PushDecoderStreamState {

Review Comment:
   I think it would be good to add an explanation of what this is -- basically 
it is the state carried across each poll of the unfolded stream.
   
   (Also, can we please avoid the inline `use` qualifications and use 
module-level `use` statements above instead?) That is entirely to help human 
readers.



##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -524,11 +475,27 @@ impl FileOpener for ParquetOpener {
                 }
 
                 if enable_bloom_filter && !row_groups.is_empty() {
+                    // Use the existing reader for bloom filter I/O;
+                    // replace with a fresh reader for decoding below.
+                    let bf_reader = std::mem::replace(
+                        &mut async_file_reader,
+                        parquet_file_reader_factory.create_reader(
+                            partition_index,
+                            partitioned_file.clone(),
+                            metadata_size_hint,
+                            &metrics,
+                        )?,
+                    );
+                    let mut bf_builder =
+                        ParquetRecordBatchStreamBuilder::new_with_metadata(

Review Comment:
   It is unfortunate that we need to create a new builder simply to read out 
the bloom filters (it seems this is because `prune_by_bloom_filters` calls 
`ParquetRecordBatchStreamBuilder::get_row_group_column_bloom_filter`).
   
   It seems like maybe upstream we should move the bloom filter reading into 
the `ParquetMetadataDecoder` 🤔 (as a follow-on PR / cleanup).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to