This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 073174b034 feat: Show the number of matched Parquet pages in 
`DataSourceExec` (#19977)
073174b034 is described below

commit 073174b0340a59588d59ca057801c0904166929f
Author: Nuno Faria <[email protected]>
AuthorDate: Tue Jan 27 06:03:41 2026 +0000

    feat: Show the number of matched Parquet pages in `DataSourceExec` (#19977)
    
    ## Which issue does this PR close?
    
    - Closes #19875.
    
    ## Rationale for this change
    
    Show the number of matched (and pruned) pages in the explain analyze
    plan to help make decisions about file optimization.
    
    Example:
    
    ```sql
    DataSourceExec: ..., metrics=[
      ...
      page_index_rows_pruned=1.00 K total → 100 matched,
      page_index_pages_pruned=100 total → 10 matched,
      ...
    ]
    ```
    
    ## What changes are included in this PR?
    
    - Added `page_index_pages_pruned` metric to DataSourceExec.
    - Updated and extended existing tests.
    
    ## Are these changes tested?
    
    Yes.
    
    ## Are there any user-facing changes?
    
    New metric in the explain plans.
---
 .../core/src/datasource/physical_plan/parquet.rs   | 48 ++++++++++++++++++----
 datafusion/core/tests/sql/explain_analyze.rs       |  5 ++-
 datafusion/datasource-parquet/src/metrics.rs       |  8 ++++
 datafusion/datasource-parquet/src/page_filter.rs   | 25 +++++++++--
 .../physical-expr-common/src/metrics/value.rs      | 27 ++++++------
 .../test_files/dynamic_filter_pushdown_config.slt  |  2 +-
 .../sqllogictest/test_files/limit_pruning.slt      |  4 +-
 docs/source/user-guide/explain-usage.md            |  1 +
 8 files changed, 91 insertions(+), 29 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs 
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index ce2b05e6d3..9b4733dbcc 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -995,6 +995,7 @@ mod tests {
         assert_eq!(read, 1, "Expected 1 rows to match the predicate");
         assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
         assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 2);
+        assert_eq!(get_value(&metrics, "page_index_pages_pruned"), 1);
         assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 1);
         // If we filter with a value that is completely out of the range of 
the data
         // we prune at the row group level.
@@ -1168,10 +1169,16 @@ mod tests {
         // There are 4 rows pruned in each of batch2, batch3, and
         // batch4 for a total of 12. batch1 had no pruning as c2 was
         // filled in as null
-        let (page_index_pruned, page_index_matched) =
+        let (page_index_rows_pruned, page_index_rows_matched) =
             get_pruning_metric(&metrics, "page_index_rows_pruned");
-        assert_eq!(page_index_pruned, 12);
-        assert_eq!(page_index_matched, 6);
+        assert_eq!(page_index_rows_pruned, 12);
+        assert_eq!(page_index_rows_matched, 6);
+
+        // each page has 2 rows, so the num of pages is 1/2 the number of rows
+        let (page_index_pages_pruned, page_index_pages_matched) =
+            get_pruning_metric(&metrics, "page_index_pages_pruned");
+        assert_eq!(page_index_pages_pruned, 6);
+        assert_eq!(page_index_pages_matched, 3);
     }
 
     #[tokio::test]
@@ -1734,6 +1741,7 @@ mod tests {
             Some(3),
             Some(4),
             Some(5),
+            Some(6), // last page with only one row
         ]));
         let batch1 = create_batch(vec![("int", c1.clone())]);
 
@@ -1742,7 +1750,7 @@ mod tests {
         let rt = RoundTrip::new()
             .with_predicate(filter)
             .with_page_index_predicate()
-            .round_trip(vec![batch1])
+            .round_trip(vec![batch1.clone()])
             .await;
 
         let metrics = rt.parquet_exec.metrics().unwrap();
@@ -1755,14 +1763,40 @@ mod tests {
         | 5   |
         +-----+
         ");
-        let (page_index_pruned, page_index_matched) =
+        let (page_index_rows_pruned, page_index_rows_matched) =
             get_pruning_metric(&metrics, "page_index_rows_pruned");
-        assert_eq!(page_index_pruned, 4);
-        assert_eq!(page_index_matched, 2);
+        assert_eq!(page_index_rows_pruned, 5);
+        assert_eq!(page_index_rows_matched, 2);
         assert!(
             get_value(&metrics, "page_index_eval_time") > 0,
             "no eval time in metrics: {metrics:#?}"
         );
+
+        // each page has 2 rows, so the num of pages is 1/2 the number of rows
+        let (page_index_pages_pruned, page_index_pages_matched) =
+            get_pruning_metric(&metrics, "page_index_pages_pruned");
+        assert_eq!(page_index_pages_pruned, 3);
+        assert_eq!(page_index_pages_matched, 1);
+
+        // test with a filter that matches the page with one row
+        let filter = col("int").eq(lit(6_i32));
+        let rt = RoundTrip::new()
+            .with_predicate(filter)
+            .with_page_index_predicate()
+            .round_trip(vec![batch1])
+            .await;
+
+        let metrics = rt.parquet_exec.metrics().unwrap();
+
+        let (page_index_rows_pruned, page_index_rows_matched) =
+            get_pruning_metric(&metrics, "page_index_rows_pruned");
+        assert_eq!(page_index_rows_pruned, 6);
+        assert_eq!(page_index_rows_matched, 1);
+
+        let (page_index_pages_pruned, page_index_pages_matched) =
+            get_pruning_metric(&metrics, "page_index_pages_pruned");
+        assert_eq!(page_index_pages_pruned, 3);
+        assert_eq!(page_index_pages_matched, 1);
     }
 
     /// Returns a string array with contents:
diff --git a/datafusion/core/tests/sql/explain_analyze.rs 
b/datafusion/core/tests/sql/explain_analyze.rs
index fa248c4486..5f62f7204e 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -879,12 +879,13 @@ async fn parquet_explain_analyze() {
     let i_rowgroup_stat = 
formatted.find("row_groups_pruned_statistics").unwrap();
     let i_rowgroup_bloomfilter =
         formatted.find("row_groups_pruned_bloom_filter").unwrap();
-    let i_page = formatted.find("page_index_rows_pruned").unwrap();
+    let i_page_rows = formatted.find("page_index_rows_pruned").unwrap();
+    let i_page_pages = formatted.find("page_index_pages_pruned").unwrap();
 
     assert!(
         (i_file < i_rowgroup_stat)
             && (i_rowgroup_stat < i_rowgroup_bloomfilter)
-            && (i_rowgroup_bloomfilter < i_page),
+            && (i_rowgroup_bloomfilter < i_page_pages && i_page_pages < 
i_page_rows),
         "The parquet pruning metrics should be displayed in an order of: file 
range -> row group statistics -> row group bloom filter -> page index."
     );
 }
diff --git a/datafusion/datasource-parquet/src/metrics.rs 
b/datafusion/datasource-parquet/src/metrics.rs
index 1e34b324a5..317612fac1 100644
--- a/datafusion/datasource-parquet/src/metrics.rs
+++ b/datafusion/datasource-parquet/src/metrics.rs
@@ -65,6 +65,8 @@ pub struct ParquetFileMetrics {
     pub bloom_filter_eval_time: Time,
     /// Total rows filtered or matched by parquet page index
     pub page_index_rows_pruned: PruningMetrics,
+    /// Total pages filtered or matched by parquet page index
+    pub page_index_pages_pruned: PruningMetrics,
     /// Total time spent evaluating parquet page index filters
     pub page_index_eval_time: Time,
     /// Total time spent reading and parsing metadata from the footer
@@ -121,6 +123,11 @@ impl ParquetFileMetrics {
             .with_type(MetricType::SUMMARY)
             .pruning_metrics("page_index_rows_pruned", partition);
 
+        let page_index_pages_pruned = MetricBuilder::new(metrics)
+            .with_new_label("filename", filename.to_string())
+            .with_type(MetricType::SUMMARY)
+            .pruning_metrics("page_index_pages_pruned", partition);
+
         let bytes_scanned = MetricBuilder::new(metrics)
             .with_new_label("filename", filename.to_string())
             .with_type(MetricType::SUMMARY)
@@ -191,6 +198,7 @@ impl ParquetFileMetrics {
             pushdown_rows_matched,
             row_pushdown_eval_time,
             page_index_rows_pruned,
+            page_index_pages_pruned,
             statistics_eval_time,
             bloom_filter_eval_time,
             page_index_eval_time,
diff --git a/datafusion/datasource-parquet/src/page_filter.rs 
b/datafusion/datasource-parquet/src/page_filter.rs
index e25e33835f..194e6e94fb 100644
--- a/datafusion/datasource-parquet/src/page_filter.rs
+++ b/datafusion/datasource-parquet/src/page_filter.rs
@@ -189,6 +189,10 @@ impl PagePruningAccessPlanFilter {
         let mut total_skip = 0;
         // track the total number of rows that should not be skipped
         let mut total_select = 0;
+        // track the total number of pages that should be skipped
+        let mut total_pages_skip = 0;
+        // track the total number of pages that should not be skipped
+        let mut total_pages_select = 0;
 
         // for each row group specified in the access plan
         let row_group_indexes = access_plan.row_group_indexes();
@@ -226,10 +230,12 @@ impl PagePruningAccessPlanFilter {
                     file_metrics,
                 );
 
-                let Some(selection) = selection else {
+                let Some((selection, total_pages, matched_pages)) = selection 
else {
                     trace!("No pages pruned in prune_pages_in_one_row_group");
                     continue;
                 };
+                total_pages_select += matched_pages;
+                total_pages_skip += total_pages - matched_pages;
 
                 debug!(
                     "Use filter and page index to create RowSelection {:?} 
from predicate: {:?}",
@@ -278,6 +284,12 @@ impl PagePruningAccessPlanFilter {
         file_metrics
             .page_index_rows_pruned
             .add_matched(total_select);
+        file_metrics
+            .page_index_pages_pruned
+            .add_pruned(total_pages_skip);
+        file_metrics
+            .page_index_pages_pruned
+            .add_matched(total_pages_select);
         access_plan
     }
 
@@ -297,7 +309,8 @@ fn update_selection(
     }
 }
 
-/// Returns a [`RowSelection`] for the rows in this row group to scan.
+/// Returns a [`RowSelection`] for the rows in this row group to scan, in 
addition to the number of
+/// total and matched pages.
 ///
 /// This Row Selection is formed from the page index and the predicate skips 
row
 /// ranges that can be ruled out based on the predicate.
@@ -310,7 +323,7 @@ fn prune_pages_in_one_row_group(
     converter: StatisticsConverter<'_>,
     parquet_metadata: &ParquetMetaData,
     metrics: &ParquetFileMetrics,
-) -> Option<RowSelection> {
+) -> Option<(RowSelection, usize, usize)> {
     let pruning_stats =
         PagesPruningStatistics::try_new(row_group_index, converter, 
parquet_metadata)?;
 
@@ -362,7 +375,11 @@ fn prune_pages_in_one_row_group(
         RowSelector::skip(sum_row)
     };
     vec.push(selector);
-    Some(RowSelection::from(vec))
+
+    let total_pages = values.len();
+    let matched_pages = values.iter().filter(|v| **v).count();
+
+    Some((RowSelection::from(vec), total_pages, matched_pages))
 }
 
 /// Implement [`PruningStatistics`] for one column's PageIndex (column_index + 
offset_index)
diff --git a/datafusion/physical-expr-common/src/metrics/value.rs 
b/datafusion/physical-expr-common/src/metrics/value.rs
index 5ecaa86fc3..26f68980ba 100644
--- a/datafusion/physical-expr-common/src/metrics/value.rs
+++ b/datafusion/physical-expr-common/src/metrics/value.rs
@@ -984,20 +984,21 @@ impl MetricValue {
                 "files_ranges_pruned_statistics" => 4,
                 "row_groups_pruned_statistics" => 5,
                 "row_groups_pruned_bloom_filter" => 6,
-                "page_index_rows_pruned" => 7,
-                _ => 8,
+                "page_index_pages_pruned" => 7,
+                "page_index_rows_pruned" => 8,
+                _ => 9,
             },
-            Self::SpillCount(_) => 9,
-            Self::SpilledBytes(_) => 10,
-            Self::SpilledRows(_) => 11,
-            Self::CurrentMemoryUsage(_) => 12,
-            Self::Count { .. } => 13,
-            Self::Gauge { .. } => 14,
-            Self::Time { .. } => 15,
-            Self::Ratio { .. } => 16,
-            Self::StartTimestamp(_) => 17, // show timestamps last
-            Self::EndTimestamp(_) => 18,
-            Self::Custom { .. } => 19,
+            Self::SpillCount(_) => 10,
+            Self::SpilledBytes(_) => 11,
+            Self::SpilledRows(_) => 12,
+            Self::CurrentMemoryUsage(_) => 13,
+            Self::Count { .. } => 14,
+            Self::Gauge { .. } => 15,
+            Self::Time { .. } => 16,
+            Self::Ratio { .. } => 17,
+            Self::StartTimestamp(_) => 18, // show timestamps last
+            Self::EndTimestamp(_) => 19,
+            Self::Custom { .. } => 20,
         }
     }
 
diff --git 
a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt 
b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
index 38a5b11870..54418f0509 100644
--- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
@@ -104,7 +104,7 @@ Plan with Metrics
 03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as 
name], metrics=[output_rows=10, <slt:ignore>]
 04)------FilterExec: value@1 > 3, metrics=[output_rows=10, <slt:ignore>, 
selectivity=100% (10/10)]
 05)--------RepartitionExec: partitioning=RoundRobinBatch(4), 
input_partitions=1, metrics=[output_rows=10, <slt:ignore>]
-06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]},
 projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND 
DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], 
pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND 
(value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 
800), required_guarantees=[], metrics=[output_rows=1 [...]
+06)----------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]},
 projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND 
DynamicFilter [ value@1 IS NULL OR value@1 > 800 ], 
pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND 
(value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 > 
800), required_guarantees=[], metrics=[output_rows=1 [...]
 
 statement ok
 set datafusion.explain.analyze_level = dev;
diff --git a/datafusion/sqllogictest/test_files/limit_pruning.slt 
b/datafusion/sqllogictest/test_files/limit_pruning.slt
index 8a94bf8adc..5dae82516d 100644
--- a/datafusion/sqllogictest/test_files/limit_pruning.slt
+++ b/datafusion/sqllogictest/test_files/limit_pruning.slt
@@ -63,7 +63,7 @@ set datafusion.explain.analyze_level = summary;
 query TT
 explain analyze select * from tracking_data where species > 'M' AND s >= 50 
limit 3;
 ----
-Plan with Metrics DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
 projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M 
AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND 
species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, 
required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, 
output_bytes=<slt:ignore>, files_ranges_pruned [...]
+Plan with Metrics DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
 projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M 
AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND 
species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50, 
required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>, 
output_bytes=<slt:ignore>, files_ranges_pruned [...]
 
 # limit_pruned_row_groups=0 total → 0 matched
 # because of order by, scan needs to preserve sort, so limit pruning is 
disabled
@@ -72,7 +72,7 @@ explain analyze select * from tracking_data where species > 
'M' AND s >= 50 orde
 ----
 Plan with Metrics
 01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST], 
preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep], 
metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>]
-02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
 projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 
50 AND DynamicFilter [ species@0 < Nlpine Sheep ], 
pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND 
s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != 
row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], me [...]
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
 projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >= 
50 AND DynamicFilter [ species@0 < Nlpine Sheep ], 
pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND 
s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 != 
row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], me [...]
 
 statement ok
 drop table tracking_data;
diff --git a/docs/source/user-guide/explain-usage.md 
b/docs/source/user-guide/explain-usage.md
index 8fe8316381..c047659e99 100644
--- a/docs/source/user-guide/explain-usage.md
+++ b/docs/source/user-guide/explain-usage.md
@@ -226,6 +226,7 @@ Again, reading from bottom up:
 When predicate pushdown is enabled, `DataSourceExec` with `ParquetSource` 
gains the following metrics:
 
 - `page_index_rows_pruned`: number of rows evaluated by page index filters. 
The metric reports both how many rows were considered in total and how many 
matched (were not pruned).
+- `page_index_pages_pruned`: number of pages evaluated by page index filters. 
The metric reports both how many pages were considered in total and how many 
matched (were not pruned).
 - `row_groups_pruned_bloom_filter`: number of row groups evaluated by Bloom 
Filters, reporting both total checked groups and groups that matched.
 - `row_groups_pruned_statistics`: number of row groups evaluated by row-group 
statistics (min/max), reporting both total checked groups and groups that 
matched.
 - `limit_pruned_row_groups`: number of row groups pruned by the limit.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to