This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new b320337 feat: Add metrics in `DataSourceExec` related to spatial
predicate pruning (#173)
b320337 is described below
commit b3203376bf082c19ca9c6f9515bfb0085819c649
Author: Yongting You <[email protected]>
AuthorDate: Sat Oct 4 23:49:30 2025 +0800
feat: Add metrics in `DataSourceExec` related to spatial predicate pruning
(#173)
---
rust/sedona-geoparquet/src/file_opener.rs | 48 ++++++++++++++
rust/sedona-geoparquet/src/format.rs | 17 +++--
rust/sedona-testing/src/data.rs | 59 +++++++++++++++++
rust/sedona/tests/metrics.rs | 101 ++++++++++++++++++++++++++++++
4 files changed, 221 insertions(+), 4 deletions(-)
diff --git a/rust/sedona-geoparquet/src/file_opener.rs
b/rust/sedona-geoparquet/src/file_opener.rs
index d507f2f..a71c954 100644
--- a/rust/sedona-geoparquet/src/file_opener.rs
+++ b/rust/sedona-geoparquet/src/file_opener.rs
@@ -24,6 +24,7 @@ use datafusion::datasource::{
};
use datafusion_common::Result;
use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet,
MetricBuilder};
use object_store::ObjectStore;
use parquet::file::{
metadata::{ParquetMetaData, RowGroupMetaData},
@@ -35,6 +36,40 @@ use sedona_schema::{datatypes::SedonaType,
matchers::ArgMatcher};
use crate::metadata::GeoParquetMetadata;
+#[derive(Clone)]
+struct GeoParquetFileOpenerMetrics {
+ /// How many file ranges are pruned by [`SpatialFilter`]
+ ///
+ /// Note on "file range": an opener may read only part of a file rather
than the
+ /// entire file; that portion is referred to as the "file range". See
[`PartitionedFile`]
+ /// for details.
+ files_ranges_spatial_pruned: Count,
+ /// How many file ranges are matched by [`SpatialFilter`]
+ files_ranges_spatial_matched: Count,
+ /// How many row groups are pruned by [`SpatialFilter`]
+ ///
+ /// Note: row groups skipped during the file-level pruning step are not
counted
+ /// again here.
+ row_groups_spatial_pruned: Count,
+ /// How many row groups are matched by [`SpatialFilter`]
+ row_groups_spatial_matched: Count,
+}
+
+impl GeoParquetFileOpenerMetrics {
+ fn new(execution_plan_global_metrics: &ExecutionPlanMetricsSet) -> Self {
+ Self {
+ files_ranges_spatial_pruned:
MetricBuilder::new(execution_plan_global_metrics)
+ .global_counter("files_ranges_spatial_pruned"),
+ files_ranges_spatial_matched:
MetricBuilder::new(execution_plan_global_metrics)
+ .global_counter("files_ranges_spatial_matched"),
+ row_groups_spatial_pruned:
MetricBuilder::new(execution_plan_global_metrics)
+ .global_counter("row_groups_spatial_pruned"),
+ row_groups_spatial_matched:
MetricBuilder::new(execution_plan_global_metrics)
+ .global_counter("row_groups_spatial_matched"),
+ }
+ }
+}
+
/// Geo-aware [FileOpener] implementing file and row group pruning
///
/// Pruning happens (for Parquet) in the [FileOpener], so we implement
@@ -47,6 +82,7 @@ pub struct GeoParquetFileOpener {
predicate: Arc<dyn PhysicalExpr>,
file_schema: SchemaRef,
enable_pruning: bool,
+ metrics: GeoParquetFileOpenerMetrics,
}
impl GeoParquetFileOpener {
@@ -58,6 +94,7 @@ impl GeoParquetFileOpener {
predicate: Arc<dyn PhysicalExpr>,
file_schema: SchemaRef,
enable_pruning: bool,
+ execution_plan_global_metrics: &ExecutionPlanMetricsSet,
) -> Self {
Self {
inner,
@@ -66,6 +103,7 @@ impl GeoParquetFileOpener {
predicate,
file_schema,
enable_pruning,
+ metrics:
GeoParquetFileOpenerMetrics::new(execution_plan_global_metrics),
}
}
}
@@ -96,6 +134,7 @@ impl FileOpener for GeoParquetFileOpener {
&mut access_plan,
&spatial_filter,
&geoparquet_metadata,
+ &self_clone.metrics,
)?;
filter_access_plan_using_geoparquet_covering(
@@ -104,6 +143,7 @@ impl FileOpener for GeoParquetFileOpener {
&spatial_filter,
&geoparquet_metadata,
&parquet_metadata,
+ &self_clone.metrics,
)?;
}
}
@@ -135,12 +175,16 @@ fn filter_access_plan_using_geoparquet_file_metadata(
access_plan: &mut ParquetAccessPlan,
spatial_filter: &SpatialFilter,
metadata: &GeoParquetMetadata,
+ metrics: &GeoParquetFileOpenerMetrics,
) -> Result<()> {
let table_geo_stats = geoparquet_file_geo_stats(file_schema, metadata)?;
if !spatial_filter.evaluate(&table_geo_stats) {
+ metrics.files_ranges_spatial_pruned.add(1);
for i in access_plan.row_group_indexes() {
access_plan.skip(i);
}
+ } else {
+ metrics.files_ranges_spatial_matched.add(1);
}
Ok(())
@@ -156,6 +200,7 @@ fn filter_access_plan_using_geoparquet_covering(
spatial_filter: &SpatialFilter,
metadata: &GeoParquetMetadata,
parquet_metadata: &ParquetMetaData,
+ metrics: &GeoParquetFileOpenerMetrics,
) -> Result<()> {
let row_group_indices_to_scan = access_plan.row_group_indexes();
@@ -176,7 +221,10 @@ fn filter_access_plan_using_geoparquet_covering(
// Evaluate predicate!
if !spatial_filter.evaluate(&row_group_geo_stats) {
+ metrics.row_groups_spatial_pruned.add(1);
access_plan.skip(i);
+ } else {
+ metrics.row_groups_spatial_matched.add(1);
}
}
diff --git a/rust/sedona-geoparquet/src/format.rs
b/rust/sedona-geoparquet/src/format.rs
index 94576ab..bed3dd8 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -364,15 +364,21 @@ impl GeoParquetFileSource {
predicate: Option<Arc<dyn PhysicalExpr>>,
) -> Result<Self> {
if let Some(parquet_source) =
inner.as_any().downcast_ref::<ParquetSource>() {
- let mut parquet_source = parquet_source.clone();
+ let parquet_source = parquet_source.clone();
// Extract the predicate from the existing source if it exists so
we can keep a copy of it
let new_predicate = match (parquet_source.predicate().cloned(),
predicate) {
(None, None) => None,
(None, Some(specified_predicate)) => Some(specified_predicate),
(Some(inner_predicate), None) => Some(inner_predicate),
- (Some(_), Some(specified_predicate)) => {
- parquet_source =
parquet_source.with_predicate(specified_predicate.clone());
- Some(specified_predicate)
+ (Some(inner_predicate), Some(specified_predicate)) => {
+                    // Sanity check: the predicate in `GeoParquetFileSource`
+                    // is initialized from its inner ParquetSource's predicate,
+                    // so they should be equivalent.
+ if Arc::ptr_eq(&inner_predicate, &specified_predicate) {
+ Some(inner_predicate)
+ } else {
+ return sedona_internal_err!("Inner predicate should be
equivalent to the predicate in `GeoParquetFileSource`");
+ }
}
};
@@ -452,6 +458,9 @@ impl FileSource for GeoParquetFileSource {
self.predicate.clone().unwrap(),
base_config.file_schema.clone(),
self.inner.table_parquet_options().global.pruning,
+            // HACK: since there is no public API to set the inner source's
+            // metrics, we use
+            // inner's metrics as the ExecutionPlan-global metrics
+ self.inner.metrics(),
))
}
diff --git a/rust/sedona-testing/src/data.rs b/rust/sedona-testing/src/data.rs
index cfcdc8b..a6bd991 100644
--- a/rust/sedona-testing/src/data.rs
+++ b/rust/sedona-testing/src/data.rs
@@ -84,6 +84,47 @@ pub fn geoarrow_data_dir() -> Result<String> {
)
}
+/// Find the most likely path to the sedona-testing directory if it exists
+///
+/// This mirrors [`geoarrow_data_dir`] but for the sedona-testing submodule.
+/// It checks the `SEDONA_TESTING_DIR` environment variable first, then
+/// falls back to the typical repository-relative locations.
+pub fn sedona_testing_dir() -> Result<String> {
+ if let Ok(from_env) = env::var("SEDONA_TESTING_DIR") {
+ if fs::exists(&from_env)? {
+ return Ok(from_env);
+ } else {
+ return sedona_internal_err!(
+ "{}\n{}{}{}",
+ "Can't resolve sedona-testing directory because",
+ "the value of the SEDONA_TESTING_DIR (",
+ from_env,
+ ") does not exist"
+ );
+ }
+ }
+
+ let likely_possibilities = [
+ "../../submodules/sedona-testing".to_string(),
+ "submodules/sedona-testing".to_string(),
+ ];
+
+ for possibility in likely_possibilities.into_iter().rev() {
+ if let Ok(exists) = fs::exists(&possibility) {
+ if exists {
+ return Ok(possibility);
+ }
+ }
+ }
+
+ sedona_internal_err!(
+ "{}\n{}\n{}",
+ "Can't resolve sedona-testing directory from the current working
directory",
+ "You may need to run `git submodule init && git submodule update
--recursive` or",
+ "set the SEDONA_TESTING_DIR environment variable"
+ )
+}
+
#[cfg(test)]
mod test {
use super::*;
@@ -113,4 +154,22 @@ mod test {
env::remove_var("SEDONA_GEOARROW_DATA_DIR");
assert!(maybe_file.is_ok());
}
+
+ #[test]
+ fn sedona_testing_dir_resolves() {
+ assert!(sedona_testing_dir().is_ok());
+
+ env::set_var("SEDONA_TESTING_DIR", "this_directory_does_not_exist");
+ let err = sedona_testing_dir();
+ env::remove_var("SEDONA_TESTING_DIR");
+ assert!(err
+ .unwrap_err()
+ .message()
+ .contains("the value of the SEDONA_TESTING_DIR"));
+
+ env::set_var("SEDONA_TESTING_DIR", sedona_testing_dir().unwrap());
+ let maybe_dir = sedona_testing_dir();
+ env::remove_var("SEDONA_TESTING_DIR");
+ assert!(maybe_dir.is_ok());
+ }
}
diff --git a/rust/sedona/tests/metrics.rs b/rust/sedona/tests/metrics.rs
new file mode 100644
index 0000000..520fd8e
--- /dev/null
+++ b/rust/sedona/tests/metrics.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use datafusion::arrow::util::pretty::pretty_format_batches;
+use sedona::context::SedonaContext;
+use sedona_testing::data::sedona_testing_dir;
+
+#[tokio::test]
+async fn geo_parquet_metrics() {
+ // Setup and register test table
+ // -----------------------------
+ let ctx = SedonaContext::new_local_interactive()
+ .await
+ .expect("interactive context should initialize");
+
+ let geo_parquet_path = format!(
+ "{}/data/parquet/geoparquet-1.1.0.parquet",
+ sedona_testing_dir().expect("sedona-testing directory should resolve")
+ );
+ let create_table_sql =
+ format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION
'{geo_parquet_path}'");
+
+ ctx.sql(&create_table_sql)
+ .await
+ .expect("create table should succeed")
+ .collect()
+ .await
+ .expect("collecting create table result should succeed");
+
+ // Test 1: query with spatial predicate that pruned the entire file
+ // ----------------------------------------------------------------
+ let prune_query = r#"
+ EXPLAIN ANALYZE
+ SELECT *
+ FROM test
+ WHERE ST_Intersects(
+ geometry,
+ ST_SetSRID(
+ ST_GeomFromText('POLYGON((-10 84, -10 88, 10 88, 10 84, -10
84))'),
+ 4326
+ )
+ )
+ "#;
+
+ let prune_plan = run_and_format(&ctx, prune_query).await;
+ assert!(prune_plan.contains("files_ranges_spatial_pruned=1"));
+ assert!(prune_plan.contains("files_ranges_spatial_matched=0"));
+ assert!(prune_plan.contains("row_groups_spatial_pruned=0"));
+ assert!(prune_plan.contains("row_groups_spatial_matched=0"));
+
+ // Test 2: query with spatial filter that can't skip any file or row group
+ // -----------------------------------------------------------------------
+ let match_query = r#"
+ EXPLAIN ANALYZE
+ SELECT *
+ FROM test
+ WHERE ST_Intersects(
+ geometry,
+ ST_SetSRID(
+ ST_GeomFromText(
+ 'POLYGON((-180 -18.28799, -180 83.23324, 180 83.23324, 180
-18.28799, -180 -18.28799))'
+ ),
+ 4326
+ )
+ )
+ "#;
+
+ let match_plan = run_and_format(&ctx, match_query).await;
+ assert!(match_plan.contains("files_ranges_spatial_pruned=0"));
+ assert!(match_plan.contains("files_ranges_spatial_matched=1"));
+ assert!(match_plan.contains("row_groups_spatial_pruned=0"));
+ assert!(match_plan.contains("row_groups_spatial_matched=1"));
+}
+
+async fn run_and_format(ctx: &SedonaContext, sql: &str) -> String {
+ let df = ctx
+ .sql(sql.trim())
+ .await
+ .expect("explain analyze query should succeed");
+ let batches = df
+ .collect()
+ .await
+ .expect("collecting explain analyze result should succeed");
+ format!(
+ "{}",
+ pretty_format_batches(&batches).expect("formatting plan should
succeed")
+ )
+}