This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new f4a49b538c feat(small): Set 'summary' level metrics for `DataSourceExec` with parquet source (#18196)
f4a49b538c is described below
commit f4a49b538c5de116758de1edf474eba874f4cb20
Author: Yongting You <[email protected]>
AuthorDate: Sat Oct 25 10:57:00 2025 +0800
feat(small): Set 'summary' level metrics for `DataSourceExec` with parquet source (#18196)
## Which issue does this PR close?
Part of https://github.com/apache/datafusion/issues/18116
## Rationale for this change
The following configuration restricts `EXPLAIN ANALYZE` output to the
important high-level insights:
```
set datafusion.explain.analyze_level = summary;
```
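The same level can also be set programmatically when building the session,
mirroring the test code in the diff below. A minimal sketch; the
`ExplainAnalyzeLevel` import path is an assumption (re-exported from
`datafusion_common::config`):
```rust
use datafusion::config::ExplainAnalyzeLevel; // assumed re-export path
use datafusion::error::Result;
use datafusion::prelude::{SessionConfig, SessionContext};

#[tokio::main]
async fn main() -> Result<()> {
    // Restrict EXPLAIN ANALYZE output to 'summary' level metrics.
    let mut config = SessionConfig::new();
    config.options_mut().explain.analyze_level = ExplainAnalyzeLevel::Summary;
    let ctx = SessionContext::new_with_config(config);

    let df = ctx
        .sql("EXPLAIN ANALYZE SELECT * FROM generate_series(10) AS t(v) ORDER BY v DESC")
        .await?;
    df.show().await?;
    Ok(())
}
```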
This PR sets `summary` level metrics for the parquet data source:
### `summary` level metrics for `DataSourceExec` with `Parquet` source
- File level pruning metrics
- Row-group level pruning metrics
- Bytes scanned
- Metadata load time
The remaining metrics, defined in
https://github.com/apache/datafusion/blob/155b56e521d75186776a65f1634ee03058899a79/datafusion/datasource-parquet/src/metrics.rs#L29,
are kept at the `dev` level. I'm not sure whether the page-level pruning
metrics should also be included at the `summary` level; I'm open to
suggestions on this, or on any other metrics that should be included.
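For context on the mechanism: a metric opts into the `summary` level via
`MetricBuilder::with_type`, as shown in the diff below, while metrics built
without it stay at the default (`dev`) level. A minimal sketch (the free
function is illustrative, not part of the PR):
```rust
use datafusion_physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType,
};

/// Build a pruning counter that is visible at the 'summary' analyze level.
/// Omitting `.with_type(...)` leaves a metric at the default ('dev') level.
fn summary_pruning_counter(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Count {
    MetricBuilder::new(metrics)
        .with_type(MetricType::SUMMARY)
        .counter("row_groups_pruned_statistics", partition)
}
```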
While implementing this, I came up with a few ideas to further improve
metrics tracking in the Parquet scanner. I’ve documented them in
https://github.com/apache/datafusion/issues/18195
## What changes are included in this PR?
Set the above metrics to the `summary` analyze level.
## Are these changes tested?
Unit tests.
## Are there any user-facing changes?
No
---
datafusion/core/tests/sql/explain_analyze.rs | 65 +++++++++++++++++++++++-----
datafusion/datasource-parquet/src/metrics.rs | 53 +++++++++++++++--------
2 files changed, 87 insertions(+), 31 deletions(-)
diff --git a/datafusion/core/tests/sql/explain_analyze.rs b/datafusion/core/tests/sql/explain_analyze.rs
index 54a57ed901..6d386cc456 100644
--- a/datafusion/core/tests/sql/explain_analyze.rs
+++ b/datafusion/core/tests/sql/explain_analyze.rs
@@ -161,22 +161,35 @@ fn nanos_from_timestamp(ts: &Timestamp) -> i64 {
}
// Test different detail level for config `datafusion.explain.analyze_level`
+
+async fn collect_plan_with_context(
+ sql_str: &str,
+ ctx: &SessionContext,
+ level: ExplainAnalyzeLevel,
+) -> String {
+ {
+ let state = ctx.state_ref();
+ let mut state = state.write();
+ state.config_mut().options_mut().explain.analyze_level = level;
+ }
+ let dataframe = ctx.sql(sql_str).await.unwrap();
+ let batches = dataframe.collect().await.unwrap();
+ arrow::util::pretty::pretty_format_batches(&batches)
+ .unwrap()
+ .to_string()
+}
+
+async fn collect_plan(sql_str: &str, level: ExplainAnalyzeLevel) -> String {
+ let ctx = SessionContext::new();
+ collect_plan_with_context(sql_str, &ctx, level).await
+}
+
#[tokio::test]
async fn explain_analyze_level() {
- async fn collect_plan(level: ExplainAnalyzeLevel) -> String {
- let mut config = SessionConfig::new();
- config.options_mut().explain.analyze_level = level;
- let ctx = SessionContext::new_with_config(config);
- let sql = "EXPLAIN ANALYZE \
+ let sql = "EXPLAIN ANALYZE \
SELECT * \
FROM generate_series(10) as t1(v1) \
ORDER BY v1 DESC";
- let dataframe = ctx.sql(sql).await.unwrap();
- let batches = dataframe.collect().await.unwrap();
- arrow::util::pretty::pretty_format_batches(&batches)
- .unwrap()
- .to_string()
- }
for (level, needle, should_contain) in [
(ExplainAnalyzeLevel::Summary, "spill_count", false),
@@ -184,7 +197,35 @@ async fn explain_analyze_level() {
(ExplainAnalyzeLevel::Dev, "spill_count", true),
(ExplainAnalyzeLevel::Dev, "output_rows", true),
] {
- let plan = collect_plan(level).await;
+ let plan = collect_plan(sql, level).await;
+ assert_eq!(
+ plan.contains(needle),
+ should_contain,
+ "plan for level {level:?} unexpected content: {plan}"
+ );
+ }
+}
+
+#[tokio::test]
+async fn explain_analyze_level_datasource_parquet() {
+ let table_name = "tpch_lineitem_small";
+ let parquet_path = "tests/data/tpch_lineitem_small.parquet";
+ let sql = format!("EXPLAIN ANALYZE SELECT * FROM {table_name}");
+
+ // Register test parquet file into context
+ let ctx = SessionContext::new();
+ ctx.register_parquet(table_name, parquet_path, ParquetReadOptions::default())
+ .await
+ .expect("register parquet table for explain analyze test");
+
+ for (level, needle, should_contain) in [
+ (ExplainAnalyzeLevel::Summary, "metadata_load_time", true),
+ (ExplainAnalyzeLevel::Summary, "page_index_eval_time", false),
+ (ExplainAnalyzeLevel::Dev, "metadata_load_time", true),
+ (ExplainAnalyzeLevel::Dev, "page_index_eval_time", true),
+ ] {
+ let plan = collect_plan_with_context(&sql, &ctx, level).await;
+
assert_eq!(
plan.contains(needle),
should_contain,
diff --git a/datafusion/datasource-parquet/src/metrics.rs b/datafusion/datasource-parquet/src/metrics.rs
index d75a979d4c..5f17fbb4b9 100644
--- a/datafusion/datasource-parquet/src/metrics.rs
+++ b/datafusion/datasource-parquet/src/metrics.rs
@@ -16,7 +16,7 @@
// under the License.
use datafusion_physical_plan::metrics::{
- Count, ExecutionPlanMetricsSet, MetricBuilder, Time,
+ Count, ExecutionPlanMetricsSet, MetricBuilder, MetricType, Time,
};
/// Stores metrics about the parquet execution for a particular parquet file.
@@ -88,30 +88,59 @@ impl ParquetFileMetrics {
filename: &str,
metrics: &ExecutionPlanMetricsSet,
) -> Self {
- let predicate_evaluation_errors = MetricBuilder::new(metrics)
- .with_new_label("filename", filename.to_string())
- .counter("predicate_evaluation_errors", partition);
-
+ // -----------------------
+ // 'summary' level metrics
+ // -----------------------
let row_groups_matched_bloom_filter = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
+ .with_type(MetricType::SUMMARY)
.counter("row_groups_matched_bloom_filter", partition);
let row_groups_pruned_bloom_filter = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
+ .with_type(MetricType::SUMMARY)
.counter("row_groups_pruned_bloom_filter", partition);
let row_groups_matched_statistics = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
+ .with_type(MetricType::SUMMARY)
.counter("row_groups_matched_statistics", partition);
let row_groups_pruned_statistics = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
+ .with_type(MetricType::SUMMARY)
.counter("row_groups_pruned_statistics", partition);
+ let page_index_rows_pruned = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .with_type(MetricType::SUMMARY)
+ .counter("page_index_rows_pruned", partition);
+ let page_index_rows_matched = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .with_type(MetricType::SUMMARY)
+ .counter("page_index_rows_matched", partition);
+
let bytes_scanned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
+ .with_type(MetricType::SUMMARY)
.counter("bytes_scanned", partition);
+ let metadata_load_time = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .with_type(MetricType::SUMMARY)
+ .subset_time("metadata_load_time", partition);
+
+ let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
+ .with_type(MetricType::SUMMARY)
+ .counter("files_ranges_pruned_statistics", partition);
+
+ // -----------------------
+ // 'dev' level metrics
+ // -----------------------
+ let predicate_evaluation_errors = MetricBuilder::new(metrics)
+ .with_new_label("filename", filename.to_string())
+ .counter("predicate_evaluation_errors", partition);
+
let pushdown_rows_pruned = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("pushdown_rows_pruned", partition);
@@ -129,24 +158,10 @@ impl ParquetFileMetrics {
.with_new_label("filename", filename.to_string())
.subset_time("bloom_filter_eval_time", partition);
- let page_index_rows_pruned = MetricBuilder::new(metrics)
- .with_new_label("filename", filename.to_string())
- .counter("page_index_rows_pruned", partition);
- let page_index_rows_matched = MetricBuilder::new(metrics)
- .with_new_label("filename", filename.to_string())
- .counter("page_index_rows_matched", partition);
-
let page_index_eval_time = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.subset_time("page_index_eval_time", partition);
- let metadata_load_time = MetricBuilder::new(metrics)
- .with_new_label("filename", filename.to_string())
- .subset_time("metadata_load_time", partition);
-
- let files_ranges_pruned_statistics = MetricBuilder::new(metrics)
- .counter("files_ranges_pruned_statistics", partition);
-
let predicate_cache_inner_records = MetricBuilder::new(metrics)
.with_new_label("filename", filename.to_string())
.counter("predicate_cache_inner_records", partition);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]