This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new d5302768c9 Fix `elapsed_compute` metric for Parquet DataSourceExec (#20767)
d5302768c9 is described below
commit d5302768c9cc0b0c79622985c847018bc9f5abbc
Author: Ernest Provo <[email protected]>
AuthorDate: Tue Mar 24 17:27:32 2026 -0400
Fix `elapsed_compute` metric for Parquet DataSourceExec (#20767)
## Which issue does this PR close?
Closes part of https://github.com/apache/datafusion/issues/18195 —
specifically the `elapsed_compute` baseline metric sub-item for Parquet
scans.
## Rationale
`EXPLAIN ANALYZE` on Parquet scans reports `elapsed_compute` values like
`14ns` for full table scans, which is misleading. The metric was never
being populated because no timer wrapped the per-batch compute work in
the Parquet scan path.
## What changes are included in this PR?
Follows the same pattern established in PR #18901 (CSV fix):
1. Added `BaselineMetrics` instantiation in `ParquetOpener::open()`
using the existing `metrics` and `partition_index` fields
2. Wrapped the per-batch stream `.map()` closure with an
`elapsed_compute` timer that measures projection, schema replacement,
and metrics copy work
Files changed: `datafusion/datasource-parquet/src/opener.rs` (the fix itself)
and `datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt`
(updated expected `EXPLAIN` output).
## Are these changes tested?
- All 81 existing tests in `datafusion-datasource-parquet` pass
- The metric correctness is verified by observing realistic
`elapsed_compute` values in `EXPLAIN ANALYZE` output (no longer showing
nanosecond-level values for real scans)
- Per maintainer guidance from @2010YOUY01: "Testing if we have the time
measured correct is tricky, I don't think there is a good way to do it.
But for a large parquet file scan, several nanoseconds is definitely not
reasonable."
## Are there any user-facing changes?
`EXPLAIN ANALYZE` output for Parquet scans will now show accurate
`elapsed_compute` values reflecting actual CPU time spent on per-batch
processing.
Co-authored-by: Andrew Lamb <[email protected]>
---
datafusion/datasource-parquet/src/opener.rs | 10 ++++++++--
.../sqllogictest/test_files/dynamic_filter_pushdown_config.slt | 2 +-
2 files changed, 9 insertions(+), 3 deletions(-)
diff --git a/datafusion/datasource-parquet/src/opener.rs b/datafusion/datasource-parquet/src/opener.rs
index bb330c3f4c..b960d8e6a0 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -48,7 +48,7 @@ use datafusion_physical_expr_common::physical_expr::{
PhysicalExpr, is_dynamic_physical_expr,
};
use datafusion_physical_plan::metrics::{
- Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
+ BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
};
use datafusion_pruning::{FilePruner, PruningPredicate,
build_pruning_predicate};
@@ -135,6 +135,7 @@ impl FileOpener for ParquetOpener {
let file_name = file_location.to_string();
let file_metrics =
ParquetFileMetrics::new(self.partition_index, &file_name, &self.metrics);
+ let baseline_metrics = BaselineMetrics::new(&self.metrics, self.partition_index);
let metadata_size_hint = partitioned_file
.metadata_size_hint
@@ -605,6 +606,7 @@ impl FileOpener for ParquetOpener {
arrow_reader_metrics,
predicate_cache_inner_records,
predicate_cache_records,
+ baseline_metrics,
},
|mut state| async move {
let result = state.transition().await;
@@ -646,6 +648,7 @@ struct PushDecoderStreamState {
arrow_reader_metrics: ArrowReaderMetrics,
predicate_cache_inner_records: Gauge,
predicate_cache_records: Gauge,
+ baseline_metrics: BaselineMetrics,
}
impl PushDecoderStreamState {
@@ -671,8 +674,11 @@ impl PushDecoderStreamState {
}
}
Ok(DecodeResult::Data(batch)) => {
+ let mut timer = self.baseline_metrics.elapsed_compute().timer();
self.copy_arrow_reader_metrics();
- return Some(self.project_batch(&batch));
+ let result = self.project_batch(&batch);
+ timer.stop();
+ return Some(result);
}
Ok(DecodeResult::Finished) => {
return None;
diff --git a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
index 5a1058b8a2..b0d9fceeff 100644
--- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
@@ -104,7 +104,7 @@ Plan with Metrics
03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as name], metrics=[output_rows=10, <slt:ignore>]
04)------FilterExec: value@1 > 3, metrics=[output_rows=10, <slt:ignore>, selectivity=100% (10/10)]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, metrics=[output_rows=10, <slt:ignore>]
-06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=1 [...]
+06)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]}, projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND (value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 800), required_guarantees=[], metrics=[output_rows=1 [...]
statement ok
set datafusion.explain.analyze_level = dev;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]