This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 073174b034 feat: Show the number of matched Parquet pages in
`DataSourceExec` (#19977)
073174b034 is described below
commit 073174b0340a59588d59ca057801c0904166929f
Author: Nuno Faria <[email protected]>
AuthorDate: Tue Jan 27 06:03:41 2026 +0000
feat: Show the number of matched Parquet pages in `DataSourceExec` (#19977)
## Which issue does this PR close?
- Closes #19875.
## Rationale for this change
Show the number of matched (and pruned) pages in the explain analyze
plan to help make decisions about file optimization.
Example:
```sql
DataSourceExec: ..., metrics=[
...
page_index_pages_pruned=100 total → 10 matched,
page_index_rows_pruned=1.00 K total → 100 matched,
...
]
```
## What changes are included in this PR?
- Added `page_index_pages_pruned` metric to DataSourceExec.
- Updated and extended existing tests.
## Are these changes tested?
Yes.
## Are there any user-facing changes?
New metric in the explain plans.
---
.../core/src/datasource/physical_plan/parquet.rs | 48 ++++++++++++++++++----
datafusion/core/tests/sql/explain_analyze.rs | 5 ++-
datafusion/datasource-parquet/src/metrics.rs | 8 ++++
datafusion/datasource-parquet/src/page_filter.rs | 25 +++++++++--
.../physical-expr-common/src/metrics/value.rs | 27 ++++++------
.../test_files/dynamic_filter_pushdown_config.slt | 2 +-
.../sqllogictest/test_files/limit_pruning.slt | 4 +-
docs/source/user-guide/explain-usage.md | 1 +
8 files changed, 91 insertions(+), 29 deletions(-)
diff --git a/datafusion/core/src/datasource/physical_plan/parquet.rs
b/datafusion/core/src/datasource/physical_plan/parquet.rs
index ce2b05e6d3..9b4733dbcc 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -995,6 +995,7 @@ mod tests {
assert_eq!(read, 1, "Expected 1 rows to match the predicate");
assert_eq!(get_value(&metrics, "row_groups_pruned_statistics"), 0);
assert_eq!(get_value(&metrics, "page_index_rows_pruned"), 2);
+ assert_eq!(get_value(&metrics, "page_index_pages_pruned"), 1);
assert_eq!(get_value(&metrics, "pushdown_rows_pruned"), 1);
// If we filter with a value that is completely out of the range of
the data
// we prune at the row group level.
@@ -1168,10 +1169,16 @@ mod tests {
// There are 4 rows pruned in each of batch2, batch3, and
// batch4 for a total of 12. batch1 had no pruning as c2 was
// filled in as null
- let (page_index_pruned, page_index_matched) =
+ let (page_index_rows_pruned, page_index_rows_matched) =
get_pruning_metric(&metrics, "page_index_rows_pruned");
- assert_eq!(page_index_pruned, 12);
- assert_eq!(page_index_matched, 6);
+ assert_eq!(page_index_rows_pruned, 12);
+ assert_eq!(page_index_rows_matched, 6);
+
+ // each page has 2 rows, so the num of pages is 1/2 the number of rows
+ let (page_index_pages_pruned, page_index_pages_matched) =
+ get_pruning_metric(&metrics, "page_index_pages_pruned");
+ assert_eq!(page_index_pages_pruned, 6);
+ assert_eq!(page_index_pages_matched, 3);
}
#[tokio::test]
@@ -1734,6 +1741,7 @@ mod tests {
Some(3),
Some(4),
Some(5),
+ Some(6), // last page with only one row
]));
let batch1 = create_batch(vec![("int", c1.clone())]);
@@ -1742,7 +1750,7 @@ mod tests {
let rt = RoundTrip::new()
.with_predicate(filter)
.with_page_index_predicate()
- .round_trip(vec![batch1])
+ .round_trip(vec![batch1.clone()])
.await;
let metrics = rt.parquet_exec.metrics().unwrap();
@@ -1755,14 +1763,40 @@ mod tests {
| 5 |
+-----+
");
- let (page_index_pruned, page_index_matched) =
+ let (page_index_rows_pruned, page_index_rows_matched) =
get_pruning_metric(&metrics, "page_index_rows_pruned");
- assert_eq!(page_index_pruned, 4);
- assert_eq!(page_index_matched, 2);
+ assert_eq!(page_index_rows_pruned, 5);
+ assert_eq!(page_index_rows_matched, 2);
assert!(
get_value(&metrics, "page_index_eval_time") > 0,
"no eval time in metrics: {metrics:#?}"
);
+
+ // each page has 2 rows (except the last, which has 1), so pages ≈ rows / 2
+ let (page_index_pages_pruned, page_index_pages_matched) =
+ get_pruning_metric(&metrics, "page_index_pages_pruned");
+ assert_eq!(page_index_pages_pruned, 3);
+ assert_eq!(page_index_pages_matched, 1);
+
+ // test with a filter that matches the page with one row
+ let filter = col("int").eq(lit(6_i32));
+ let rt = RoundTrip::new()
+ .with_predicate(filter)
+ .with_page_index_predicate()
+ .round_trip(vec![batch1])
+ .await;
+
+ let metrics = rt.parquet_exec.metrics().unwrap();
+
+ let (page_index_rows_pruned, page_index_rows_matched) =
+ get_pruning_metric(&metrics, "page_index_rows_pruned");
+ assert_eq!(page_index_rows_pruned, 6);
+ assert_eq!(page_index_rows_matched, 1);
+
+ let (page_index_pages_pruned, page_index_pages_matched) =
+ get_pruning_metric(&metrics, "page_index_pages_pruned");
+ assert_eq!(page_index_pages_pruned, 3);
+ assert_eq!(page_index_pages_matched, 1);
}
/// Returns a string array with contents:
diff --git a/datafusion/core/tests/sql/explain_analyze.rs
b/datafusion/core/tests/sql/explain_analyze.rs
index fa248c4486..5f62f7204e 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -879,12 +879,13 @@ async fn parquet_explain_analyze() {
let i_rowgroup_stat =
formatted.find("row_groups_pruned_statistics").unwrap();
let i_rowgroup_bloomfilter =
formatted.find("row_groups_pruned_bloom_filter").unwrap();
- let i_page = formatted.find("page_index_rows_pruned").unwrap();
+ let i_page_rows = formatted.find("page_index_rows_pruned").unwrap();
+ let i_page_pages = formatted.find("page_index_pages_pruned").unwrap();
assert!(
(i_file < i_rowgroup_stat)
&& (i_rowgroup_stat < i_rowgroup_bloomfilter)
- && (i_rowgroup_bloomfilter < i_page),
+ && (i_rowgroup_bloomfilter < i_page_pages && i_page_pages <
i_page_rows),
"The parquet pruning metrics should be displayed in an order of: file
range -> row group statistics -> row group bloom filter -> page index."
);
}
diff --git a/datafusion/datasource-parquet/src/metrics.rs
b/datafusion/datasource-parquet/src/metrics.rs
index 1e34b324a5..317612fac1 100644
--- a/datafusion/datasource-parquet/src/metrics.rs
+++ b/datafusion/datasource-parquet/src/metrics.rs
@@ -65,6 +65,8 @@ pub struct ParquetFileMetrics {
pub bloom_filter_eval_time: Time,
/// Total rows filtered or matched by parquet page index
pub page_index_rows_pruned: PruningMetrics,
+ /// Total pages filtered or matched by parquet page index
+ pub page_index_pages_pruned: PruningMetrics,
/// Total time spent evaluating parquet page index filters
pub page_index_eval_time: Time,
/// Total time spent reading and parsing metadata from the footer
@@ -121,6 +123,11 @@ impl ParquetFileMetrics {
.with_type(MetricType::SUMMARY)
.pruning_metrics("page_index_rows_pruned", partition);
+ let page_index_pages_pruned = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .with_type(MetricType::SUMMARY)
+ .pruning_metrics("page_index_pages_pruned", partition);
+
let bytes_scanned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.with_type(MetricType::SUMMARY)
@@ -191,6 +198,7 @@ impl ParquetFileMetrics {
pushdown_rows_matched,
row_pushdown_eval_time,
page_index_rows_pruned,
+ page_index_pages_pruned,
statistics_eval_time,
bloom_filter_eval_time,
page_index_eval_time,
diff --git a/datafusion/datasource-parquet/src/page_filter.rs
b/datafusion/datasource-parquet/src/page_filter.rs
index e25e33835f..194e6e94fb 100644
--- a/datafusion/datasource-parquet/src/page_filter.rs
+++ b/datafusion/datasource-parquet/src/page_filter.rs
@@ -189,6 +189,10 @@ impl PagePruningAccessPlanFilter {
let mut total_skip = 0;
// track the total number of rows that should not be skipped
let mut total_select = 0;
+ // track the total number of pages that should be skipped
+ let mut total_pages_skip = 0;
+ // track the total number of pages that should not be skipped
+ let mut total_pages_select = 0;
// for each row group specified in the access plan
let row_group_indexes = access_plan.row_group_indexes();
@@ -226,10 +230,12 @@ impl PagePruningAccessPlanFilter {
file_metrics,
);
- let Some(selection) = selection else {
+ let Some((selection, total_pages, matched_pages)) = selection
else {
trace!("No pages pruned in prune_pages_in_one_row_group");
continue;
};
+ total_pages_select += matched_pages;
+ total_pages_skip += total_pages - matched_pages;
debug!(
"Use filter and page index to create RowSelection {:?}
from predicate: {:?}",
@@ -278,6 +284,12 @@ impl PagePruningAccessPlanFilter {
file_metrics
.page_index_rows_pruned
.add_matched(total_select);
+ file_metrics
+ .page_index_pages_pruned
+ .add_pruned(total_pages_skip);
+ file_metrics
+ .page_index_pages_pruned
+ .add_matched(total_pages_select);
access_plan
}
@@ -297,7 +309,8 @@ fn update_selection(
}
}
-/// Returns a [`RowSelection`] for the rows in this row group to scan.
+/// Returns a [`RowSelection`] for the rows in this row group to scan, in
addition to the number of
+/// total and matched pages.
///
/// This Row Selection is formed from the page index and the predicate skips
row
/// ranges that can be ruled out based on the predicate.
@@ -310,7 +323,7 @@ fn prune_pages_in_one_row_group(
converter: StatisticsConverter<'_>,
parquet_metadata: &ParquetMetaData,
metrics: &ParquetFileMetrics,
-) -> Option<RowSelection> {
+) -> Option<(RowSelection, usize, usize)> {
let pruning_stats =
PagesPruningStatistics::try_new(row_group_index, converter,
parquet_metadata)?;
@@ -362,7 +375,11 @@ fn prune_pages_in_one_row_group(
RowSelector::skip(sum_row)
};
vec.push(selector);
- Some(RowSelection::from(vec))
+
+ let total_pages = values.len();
+ let matched_pages = values.iter().filter(|v| **v).count();
+
+ Some((RowSelection::from(vec), total_pages, matched_pages))
}
/// Implement [`PruningStatistics`] for one column's PageIndex (column_index +
offset_index)
diff --git a/datafusion/physical-expr-common/src/metrics/value.rs
b/datafusion/physical-expr-common/src/metrics/value.rs
index 5ecaa86fc3..26f68980ba 100644
--- a/datafusion/physical-expr-common/src/metrics/value.rs
+++ b/datafusion/physical-expr-common/src/metrics/value.rs
@@ -984,20 +984,21 @@ impl MetricValue {
"files_ranges_pruned_statistics" => 4,
"row_groups_pruned_statistics" => 5,
"row_groups_pruned_bloom_filter" => 6,
- "page_index_rows_pruned" => 7,
- _ => 8,
+ "page_index_pages_pruned" => 7,
+ "page_index_rows_pruned" => 8,
+ _ => 9,
},
- Self::SpillCount(_) => 9,
- Self::SpilledBytes(_) => 10,
- Self::SpilledRows(_) => 11,
- Self::CurrentMemoryUsage(_) => 12,
- Self::Count { .. } => 13,
- Self::Gauge { .. } => 14,
- Self::Time { .. } => 15,
- Self::Ratio { .. } => 16,
- Self::StartTimestamp(_) => 17, // show timestamps last
- Self::EndTimestamp(_) => 18,
- Self::Custom { .. } => 19,
+ Self::SpillCount(_) => 10,
+ Self::SpilledBytes(_) => 11,
+ Self::SpilledRows(_) => 12,
+ Self::CurrentMemoryUsage(_) => 13,
+ Self::Count { .. } => 14,
+ Self::Gauge { .. } => 15,
+ Self::Time { .. } => 16,
+ Self::Ratio { .. } => 17,
+ Self::StartTimestamp(_) => 18, // show timestamps last
+ Self::EndTimestamp(_) => 19,
+ Self::Custom { .. } => 20,
}
}
diff --git
a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
index 38a5b11870..54418f0509 100644
--- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
@@ -104,7 +104,7 @@ Plan with Metrics
03)----ProjectionExec: expr=[id@0 as id, value@1 as v, value@1 + id@0 as
name], metrics=[output_rows=10, <slt:ignore>]
04)------FilterExec: value@1 > 3, metrics=[output_rows=10, <slt:ignore>,
selectivity=100% (10/10)]
05)--------RepartitionExec: partitioning=RoundRobinBatch(4),
input_partitions=1, metrics=[output_rows=10, <slt:ignore>]
-06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]},
projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND
DynamicFilter [ value@1 IS NULL OR value@1 > 800 ],
pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND
(value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 >
800), required_guarantees=[], metrics=[output_rows=1 [...]
+06)----------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/test_data.parquet]]},
projection=[id, value], file_type=parquet, predicate=value@1 > 3 AND
DynamicFilter [ value@1 IS NULL OR value@1 > 800 ],
pruning_predicate=value_null_count@1 != row_count@2 AND value_max@0 > 3 AND
(value_null_count@1 > 0 OR value_null_count@1 != row_count@2 AND value_max@0 >
800), required_guarantees=[], metrics=[output_rows=1 [...]
statement ok
set datafusion.explain.analyze_level = dev;
diff --git a/datafusion/sqllogictest/test_files/limit_pruning.slt
b/datafusion/sqllogictest/test_files/limit_pruning.slt
index 8a94bf8adc..5dae82516d 100644
--- a/datafusion/sqllogictest/test_files/limit_pruning.slt
+++ b/datafusion/sqllogictest/test_files/limit_pruning.slt
@@ -63,7 +63,7 @@ set datafusion.explain.analyze_level = summary;
query TT
explain analyze select * from tracking_data where species > 'M' AND s >= 50
limit 3;
----
-Plan with Metrics DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M
AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND
species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50,
required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>,
output_bytes=<slt:ignore>, files_ranges_pruned [...]
+Plan with Metrics DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
projection=[species, s], limit=3, file_type=parquet, predicate=species@0 > M
AND s@1 >= 50, pruning_predicate=species_null_count@1 != row_count@2 AND
species_max@0 > M AND s_null_count@4 != row_count@2 AND s_max@3 >= 50,
required_guarantees=[], metrics=[output_rows=3, elapsed_compute=<slt:ignore>,
output_bytes=<slt:ignore>, files_ranges_pruned [...]
# limit_pruned_row_groups=0 total → 0 matched
# because of order by, scan needs to preserve sort, so limit pruning is
disabled
@@ -72,7 +72,7 @@ explain analyze select * from tracking_data where species >
'M' AND s >= 50 orde
----
Plan with Metrics
01)SortExec: TopK(fetch=3), expr=[species@0 ASC NULLS LAST],
preserve_partitioning=[false], filter=[species@0 < Nlpine Sheep],
metrics=[output_rows=3, elapsed_compute=<slt:ignore>, output_bytes=<slt:ignore>]
-02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >=
50 AND DynamicFilter [ species@0 < Nlpine Sheep ],
pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND
s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 !=
row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], me [...]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/limit_pruning/data.parquet]]},
projection=[species, s], file_type=parquet, predicate=species@0 > M AND s@1 >=
50 AND DynamicFilter [ species@0 < Nlpine Sheep ],
pruning_predicate=species_null_count@1 != row_count@2 AND species_max@0 > M AND
s_null_count@4 != row_count@2 AND s_max@3 >= 50 AND species_null_count@1 !=
row_count@2 AND species_min@5 < Nlpine Sheep, required_guarantees=[], me [...]
statement ok
drop table tracking_data;
diff --git a/docs/source/user-guide/explain-usage.md
b/docs/source/user-guide/explain-usage.md
index 8fe8316381..c047659e99 100644
--- a/docs/source/user-guide/explain-usage.md
+++ b/docs/source/user-guide/explain-usage.md
@@ -226,6 +226,7 @@ Again, reading from bottom up:
When predicate pushdown is enabled, `DataSourceExec` with `ParquetSource`
gains the following metrics:
- `page_index_rows_pruned`: number of rows evaluated by page index filters.
The metric reports both how many rows were considered in total and how many
matched (were not pruned).
+- `page_index_pages_pruned`: number of pages evaluated by page index filters.
The metric reports both how many pages were considered in total and how many
matched (were not pruned).
- `row_groups_pruned_bloom_filter`: number of row groups evaluated by Bloom
Filters, reporting both total checked groups and groups that matched.
- `row_groups_pruned_statistics`: number of row groups evaluated by row-group
statistics (min/max), reporting both total checked groups and groups that
matched.
- `limit_pruned_row_groups`: number of row groups pruned by the limit.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]