Dandandan commented on code in PR #20839:
URL: https://github.com/apache/datafusion/pull/20839#discussion_r2910766140
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -617,57 +635,76 @@ impl FileOpener for ParquetOpener {
file_metrics.predicate_cache_inner_records.clone();
let predicate_cache_records =
file_metrics.predicate_cache_records.clone();
- let stream_schema = Arc::clone(stream.schema());
- // Check if we need to replace the schema to handle things like
differing nullability or metadata.
- // See note below about file vs. output schema.
- let replace_schema = !stream_schema.eq(&output_schema);
-
// Rebase column indices to match the narrowed stream schema.
// The projection expressions have indices based on
physical_file_schema,
// but the stream only contains the columns selected by the
ProjectionMask.
+ let stream_schema =
Arc::new(physical_file_schema.project(&indices)?);
+ let replace_schema = stream_schema != output_schema;
let projection = projection
.try_map_exprs(|expr| reassign_expr_columns(expr,
&stream_schema))?;
-
let projector = projection.make_projector(&stream_schema)?;
-
- let stream = stream.map_err(DataFusionError::from).map(move |b| {
- b.and_then(|mut b| {
- copy_arrow_reader_metrics(
- &arrow_reader_metrics,
- &predicate_cache_inner_records,
- &predicate_cache_records,
- );
- b = projector.project_batch(&b)?;
- if replace_schema {
- // Ensure the output batch has the expected schema.
- // This handles things like schema level and field
level metadata, which may not be present
- // in the physical file schema.
- // It is also possible for nullability to differ; some
writers create files with
- // OPTIONAL fields even when there are no nulls in the
data.
- // In these cases it may make sense for the logical
schema to be `NOT NULL`.
- // RecordBatch::try_new_with_options checks that if
the schema is NOT NULL
- // the array cannot contain nulls, amongst other
checks.
- let (_stream_schema, arrays, num_rows) =
b.into_parts();
- let options =
-
RecordBatchOptions::new().with_row_count(Some(num_rows));
- RecordBatch::try_new_with_options(
- Arc::clone(&output_schema),
- arrays,
- &options,
- )
- .map_err(Into::into)
- } else {
- Ok(b)
+ let stream = futures::stream::unfold(
+ PushDecoderStreamState {
+ decoder,
+ reader: async_file_reader,
+ projector,
+ output_schema,
+ replace_schema,
+ arrow_reader_metrics,
+ predicate_cache_inner_records,
+ predicate_cache_records,
+ },
+ |mut state| async move {
+ loop {
+ match state.decoder.try_decode() {
+ Ok(DecodeResult::NeedsData(ranges)) => {
+ match
state.reader.get_byte_ranges(ranges.clone()).await {
Review Comment:
It seems slightly faster to coalesce adjacent byte ranges (even for local
storage), since it removes some I/O requests. This is mainly beneficial for TPC-DS (see
https://github.com/apache/datafusion/pull/20839#issuecomment-4030259338), and
somewhat surprisingly for tpch_mem as well (I guess more batches end up exactly `batch_size` rows).
But I kept it out of this diff.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]