This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new d5302768c9 Fix `elapsed_compute` metric for Parquet DataSourceExec 
(#20767)
d5302768c9 is described below

commit d5302768c9cc0b0c79622985c847018bc9f5abbc
Author: Ernest Provo <[email protected]>
AuthorDate: Tue Mar 24 17:27:32 2026 -0400

    Fix `elapsed_compute` metric for Parquet DataSourceExec (#20767)
    
    ## Which issue does this PR close?
    
    Closes part of https://github.com/apache/datafusion/issues/18195 —
    specifically the `elapsed_compute` baseline metric sub-item for Parquet
    scans.
    
    ## Rationale
    
    `EXPLAIN ANALYZE` on Parquet scans reports `elapsed_compute` values like
    `14ns` for full table scans, which is misleading. The metric was never
    being populated because no timer wrapped the per-batch compute work in
    the Parquet scan path.
    
    ## What changes are included in this PR?
    
    Follows the same pattern established in PR #18901 (CSV fix):
    
    1. Added `BaselineMetrics` instantiation in `ParquetOpener::open()`
    using the existing `metrics` and `partition_index` fields
    2. Wrapped the per-batch handling of decoded data (the
    `DecodeResult::Data` arm in `PushDecoderStreamState`) with an
    `elapsed_compute` timer that measures projection, schema replacement,
    and metrics copy work
    
    Two files changed: `datafusion/datasource-parquet/src/opener.rs` (+8,
    -2 lines) and
    `datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt`
    (+1, -1 lines)
    
    ## Are these changes tested?
    
    - All 81 existing tests in `datafusion-datasource-parquet` pass
    - The metric correctness is verified by observing realistic
    `elapsed_compute` values in `EXPLAIN ANALYZE` output (no longer showing
    nanosecond-level values for real scans)
    - Per maintainer guidance from @2010YOUY01: "Testing if we have the time
    measured correct is tricky, I don't think there is a good way to do it.
    But for a large parquet file scan, several nanoseconds is definitely not
    reasonable."
    
    ## Are there any user-facing changes?
    
    `EXPLAIN ANALYZE` output for Parquet scans will now show accurate
    `elapsed_compute` values reflecting actual CPU time spent on per-batch
    processing.
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 datafusion/datasource-parquet/src/opener.rs                    | 10 ++++++++--
 .../sqllogictest/test_files/dynamic_filter_pushdown_config.slt |  2 +-
 2 files changed, 9 insertions(+), 3 deletions(-)

diff --git a/datafusion/datasource-parquet/src/opener.rs 
b/datafusion/datasource-parquet/src/opener.rs
index bb330c3f4c..b960d8e6a0 100644
--- a/datafusion/datasource-parquet/src/opener.rs
+++ b/datafusion/datasource-parquet/src/opener.rs
@@ -48,7 +48,7 @@ use datafusion_physical_expr_common::physical_expr::{
     PhysicalExpr, is_dynamic_physical_expr,
 };
 use datafusion_physical_plan::metrics::{
-    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, PruningMetrics,
+    BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, 
PruningMetrics,
 };
 use datafusion_pruning::{FilePruner, PruningPredicate, 
build_pruning_predicate};
 
@@ -135,6 +135,7 @@ impl FileOpener for ParquetOpener {
         let file_name = file_location.to_string();
         let file_metrics =
             ParquetFileMetrics::new(self.partition_index, &file_name, 
&self.metrics);
+        let baseline_metrics = BaselineMetrics::new(&self.metrics, 
self.partition_index);
 
         let metadata_size_hint = partitioned_file
             .metadata_size_hint
@@ -605,6 +606,7 @@ impl FileOpener for ParquetOpener {
                     arrow_reader_metrics,
                     predicate_cache_inner_records,
                     predicate_cache_records,
+                    baseline_metrics,
                 },
                 |mut state| async move {
                     let result = state.transition().await;
@@ -646,6 +648,7 @@ struct PushDecoderStreamState {
     arrow_reader_metrics: ArrowReaderMetrics,
     predicate_cache_inner_records: Gauge,
     predicate_cache_records: Gauge,
+    baseline_metrics: BaselineMetrics,
 }
 
 impl PushDecoderStreamState {
@@ -671,8 +674,11 @@ impl PushDecoderStreamState {
                     }
                 }
                 Ok(DecodeResult::Data(batch)) => {
+                    let mut timer = 
self.baseline_metrics.elapsed_compute().timer();
                     self.copy_arrow_reader_metrics();
-                    return Some(self.project_batch(&batch));
+                    let result = self.project_batch(&batch);
+                    timer.stop();
+                    return Some(result);
                 }
                 Ok(DecodeResult::Finished) => {
                     return None;
diff --git 
a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt 
b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
index 5a1058b8a2..b0d9fceeff 100644
--- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
@@ -104,7 +104,7 @@ Plan with Metrics
 03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as 
name], metrics=[output_rows=10, <slt:ignore>]
 04)------FilterExec: value@1 > 3, metrics=[output_rows=10, <slt:ignore>, 
selectivity=100% (10/10)]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1, metrics=[output_rows=10, <slt:ignore>]
-06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]},
 projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND 
DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], 
pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND 
(value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 
800), required_guarantees=[], metrics=[output_rows=1 [...]
+06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]},
 projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND 
DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], 
pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND 
(value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 
800), required_guarantees=[], metrics=[output_rows=1 [...]
 
 statement ok
 set datafusion.explain.analyze_level = dev;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to