This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new b320337  feat: Add metrics in `DataSourceExec` related to spatial predicate pruning (#173)
b320337 is described below

commit b3203376bf082c19ca9c6f9515bfb0085819c649
Author: Yongting You <[email protected]>
AuthorDate: Sat Oct 4 23:49:30 2025 +0800

    feat: Add metrics in `DataSourceExec` related to spatial predicate pruning (#173)
---
 rust/sedona-geoparquet/src/file_opener.rs |  48 ++++++++++++++
 rust/sedona-geoparquet/src/format.rs      |  17 +++--
 rust/sedona-testing/src/data.rs           |  59 +++++++++++++++++
 rust/sedona/tests/metrics.rs              | 101 ++++++++++++++++++++++++++++++
 4 files changed, 221 insertions(+), 4 deletions(-)

diff --git a/rust/sedona-geoparquet/src/file_opener.rs b/rust/sedona-geoparquet/src/file_opener.rs
index d507f2f..a71c954 100644
--- a/rust/sedona-geoparquet/src/file_opener.rs
+++ b/rust/sedona-geoparquet/src/file_opener.rs
@@ -24,6 +24,7 @@ use datafusion::datasource::{
 };
 use datafusion_common::Result;
 use datafusion_physical_expr::PhysicalExpr;
+use datafusion_physical_plan::metrics::{Count, ExecutionPlanMetricsSet, 
MetricBuilder};
 use object_store::ObjectStore;
 use parquet::file::{
     metadata::{ParquetMetaData, RowGroupMetaData},
@@ -35,6 +36,40 @@ use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
 
 use crate::metadata::GeoParquetMetadata;
 
+#[derive(Clone)]
+struct GeoParquetFileOpenerMetrics {
+    /// How many file ranges are pruned by [`SpatialFilter`]
+    ///
+    /// Note on "file range": an opener may read only part of a file rather than the
+    /// entire file; that portion is referred to as the "file range". See [`PartitionedFile`]
+    /// for details.
+    files_ranges_spatial_pruned: Count,
+    /// How many file ranges are matched by [`SpatialFilter`]
+    files_ranges_spatial_matched: Count,
+    /// How many row groups are pruned by [`SpatialFilter`]
+    ///
+    /// Note: row groups skipped during the file-level pruning step are not counted
+    /// again here.
+    row_groups_spatial_pruned: Count,
+    /// How many row groups are matched by [`SpatialFilter`]
+    row_groups_spatial_matched: Count,
+}
+
+impl GeoParquetFileOpenerMetrics {
+    fn new(execution_plan_global_metrics: &ExecutionPlanMetricsSet) -> Self {
+        Self {
+            files_ranges_spatial_pruned: 
MetricBuilder::new(execution_plan_global_metrics)
+                .global_counter("files_ranges_spatial_pruned"),
+            files_ranges_spatial_matched: 
MetricBuilder::new(execution_plan_global_metrics)
+                .global_counter("files_ranges_spatial_matched"),
+            row_groups_spatial_pruned: 
MetricBuilder::new(execution_plan_global_metrics)
+                .global_counter("row_groups_spatial_pruned"),
+            row_groups_spatial_matched: 
MetricBuilder::new(execution_plan_global_metrics)
+                .global_counter("row_groups_spatial_matched"),
+        }
+    }
+}
+
 /// Geo-aware [FileOpener] implementing file and row group pruning
 ///
 /// Pruning happens (for Parquet) in the [FileOpener], so we implement
@@ -47,6 +82,7 @@ pub struct GeoParquetFileOpener {
     predicate: Arc<dyn PhysicalExpr>,
     file_schema: SchemaRef,
     enable_pruning: bool,
+    metrics: GeoParquetFileOpenerMetrics,
 }
 
 impl GeoParquetFileOpener {
@@ -58,6 +94,7 @@ impl GeoParquetFileOpener {
         predicate: Arc<dyn PhysicalExpr>,
         file_schema: SchemaRef,
         enable_pruning: bool,
+        execution_plan_global_metrics: &ExecutionPlanMetricsSet,
     ) -> Self {
         Self {
             inner,
@@ -66,6 +103,7 @@ impl GeoParquetFileOpener {
             predicate,
             file_schema,
             enable_pruning,
+            metrics: 
GeoParquetFileOpenerMetrics::new(execution_plan_global_metrics),
         }
     }
 }
@@ -96,6 +134,7 @@ impl FileOpener for GeoParquetFileOpener {
                         &mut access_plan,
                         &spatial_filter,
                         &geoparquet_metadata,
+                        &self_clone.metrics,
                     )?;
 
                     filter_access_plan_using_geoparquet_covering(
@@ -104,6 +143,7 @@ impl FileOpener for GeoParquetFileOpener {
                         &spatial_filter,
                         &geoparquet_metadata,
                         &parquet_metadata,
+                        &self_clone.metrics,
                     )?;
                 }
             }
@@ -135,12 +175,16 @@ fn filter_access_plan_using_geoparquet_file_metadata(
     access_plan: &mut ParquetAccessPlan,
     spatial_filter: &SpatialFilter,
     metadata: &GeoParquetMetadata,
+    metrics: &GeoParquetFileOpenerMetrics,
 ) -> Result<()> {
     let table_geo_stats = geoparquet_file_geo_stats(file_schema, metadata)?;
     if !spatial_filter.evaluate(&table_geo_stats) {
+        metrics.files_ranges_spatial_pruned.add(1);
         for i in access_plan.row_group_indexes() {
             access_plan.skip(i);
         }
+    } else {
+        metrics.files_ranges_spatial_matched.add(1);
     }
 
     Ok(())
@@ -156,6 +200,7 @@ fn filter_access_plan_using_geoparquet_covering(
     spatial_filter: &SpatialFilter,
     metadata: &GeoParquetMetadata,
     parquet_metadata: &ParquetMetaData,
+    metrics: &GeoParquetFileOpenerMetrics,
 ) -> Result<()> {
     let row_group_indices_to_scan = access_plan.row_group_indexes();
 
@@ -176,7 +221,10 @@ fn filter_access_plan_using_geoparquet_covering(
 
         // Evaluate predicate!
         if !spatial_filter.evaluate(&row_group_geo_stats) {
+            metrics.row_groups_spatial_pruned.add(1);
             access_plan.skip(i);
+        } else {
+            metrics.row_groups_spatial_matched.add(1);
         }
     }
 
diff --git a/rust/sedona-geoparquet/src/format.rs b/rust/sedona-geoparquet/src/format.rs
index 94576ab..bed3dd8 100644
--- a/rust/sedona-geoparquet/src/format.rs
+++ b/rust/sedona-geoparquet/src/format.rs
@@ -364,15 +364,21 @@ impl GeoParquetFileSource {
         predicate: Option<Arc<dyn PhysicalExpr>>,
     ) -> Result<Self> {
         if let Some(parquet_source) = 
inner.as_any().downcast_ref::<ParquetSource>() {
-            let mut parquet_source = parquet_source.clone();
+            let parquet_source = parquet_source.clone();
             // Extract the predicate from the existing source if it exists so we can keep a copy of it
             let new_predicate = match (parquet_source.predicate().cloned(), 
predicate) {
                 (None, None) => None,
                 (None, Some(specified_predicate)) => Some(specified_predicate),
                 (Some(inner_predicate), None) => Some(inner_predicate),
-                (Some(_), Some(specified_predicate)) => {
-                    parquet_source = 
parquet_source.with_predicate(specified_predicate.clone());
-                    Some(specified_predicate)
+                (Some(inner_predicate), Some(specified_predicate)) => {
+                    // Sanity check: the predicate in `GeoParquetFileSource` is
+                    // initialized from its inner ParquetSource's predicate, so
+                    // they should be equivalent.
+                    if Arc::ptr_eq(&inner_predicate, &specified_predicate) {
+                        Some(inner_predicate)
+                    } else {
+                        return sedona_internal_err!("Inner predicate should be 
equivalent to the predicate in `GeoParquetFileSource`");
+                    }
                 }
             };
 
@@ -452,6 +458,9 @@ impl FileSource for GeoParquetFileSource {
             self.predicate.clone().unwrap(),
             base_config.file_schema.clone(),
             self.inner.table_parquet_options().global.pruning,
+            // HACK: There is no public API to set inner's metrics, so we use
+            // inner's metrics as the ExecutionPlan-global metrics
+            self.inner.metrics(),
         ))
     }
 
diff --git a/rust/sedona-testing/src/data.rs b/rust/sedona-testing/src/data.rs
index cfcdc8b..a6bd991 100644
--- a/rust/sedona-testing/src/data.rs
+++ b/rust/sedona-testing/src/data.rs
@@ -84,6 +84,47 @@ pub fn geoarrow_data_dir() -> Result<String> {
     )
 }
 
+/// Find the most likely path to the sedona-testing directory if it exists
+///
+/// This mirrors [`geoarrow_data_dir`] but for the sedona-testing submodule.
+/// It checks the `SEDONA_TESTING_DIR` environment variable first, then
+/// falls back to the typical repository-relative locations.
+pub fn sedona_testing_dir() -> Result<String> {
+    if let Ok(from_env) = env::var("SEDONA_TESTING_DIR") {
+        if fs::exists(&from_env)? {
+            return Ok(from_env);
+        } else {
+            return sedona_internal_err!(
+                "{}\n{}{}{}",
+                "Can't resolve sedona-testing directory because",
+                "the value of the SEDONA_TESTING_DIR (",
+                from_env,
+                ") does not exist"
+            );
+        }
+    }
+
+    let likely_possibilities = [
+        "../../submodules/sedona-testing".to_string(),
+        "submodules/sedona-testing".to_string(),
+    ];
+
+    for possibility in likely_possibilities.into_iter().rev() {
+        if let Ok(exists) = fs::exists(&possibility) {
+            if exists {
+                return Ok(possibility);
+            }
+        }
+    }
+
+    sedona_internal_err!(
+        "{}\n{}\n{}",
+        "Can't resolve sedona-testing directory from the current working 
directory",
+        "You may need to run `git submodule init && git submodule update 
--recursive` or",
+        "set the SEDONA_TESTING_DIR environment variable"
+    )
+}
+
 #[cfg(test)]
 mod test {
     use super::*;
@@ -113,4 +154,22 @@ mod test {
         env::remove_var("SEDONA_GEOARROW_DATA_DIR");
         assert!(maybe_file.is_ok());
     }
+
+    #[test]
+    fn sedona_testing_dir_resolves() {
+        assert!(sedona_testing_dir().is_ok());
+
+        env::set_var("SEDONA_TESTING_DIR", "this_directory_does_not_exist");
+        let err = sedona_testing_dir();
+        env::remove_var("SEDONA_TESTING_DIR");
+        assert!(err
+            .unwrap_err()
+            .message()
+            .contains("the value of the SEDONA_TESTING_DIR"));
+
+        env::set_var("SEDONA_TESTING_DIR", sedona_testing_dir().unwrap());
+        let maybe_dir = sedona_testing_dir();
+        env::remove_var("SEDONA_TESTING_DIR");
+        assert!(maybe_dir.is_ok());
+    }
 }
diff --git a/rust/sedona/tests/metrics.rs b/rust/sedona/tests/metrics.rs
new file mode 100644
index 0000000..520fd8e
--- /dev/null
+++ b/rust/sedona/tests/metrics.rs
@@ -0,0 +1,101 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use datafusion::arrow::util::pretty::pretty_format_batches;
+use sedona::context::SedonaContext;
+use sedona_testing::data::sedona_testing_dir;
+
+#[tokio::test]
+async fn geo_parquet_metrics() {
+    // Setup and register test table
+    // -----------------------------
+    let ctx = SedonaContext::new_local_interactive()
+        .await
+        .expect("interactive context should initialize");
+
+    let geo_parquet_path = format!(
+        "{}/data/parquet/geoparquet-1.1.0.parquet",
+        sedona_testing_dir().expect("sedona-testing directory should resolve")
+    );
+    let create_table_sql =
+        format!("CREATE EXTERNAL TABLE test STORED AS PARQUET LOCATION 
'{geo_parquet_path}'");
+
+    ctx.sql(&create_table_sql)
+        .await
+        .expect("create table should succeed")
+        .collect()
+        .await
+        .expect("collecting create table result should succeed");
+
+    // Test 1: query with spatial predicate that pruned the entire file
+    // ----------------------------------------------------------------
+    let prune_query = r#"
+        EXPLAIN ANALYZE
+        SELECT *
+        FROM test
+        WHERE ST_Intersects(
+            geometry,
+            ST_SetSRID(
+                ST_GeomFromText('POLYGON((-10 84, -10 88, 10 88, 10 84, -10 
84))'),
+                4326
+            )
+        )
+    "#;
+
+    let prune_plan = run_and_format(&ctx, prune_query).await;
+    assert!(prune_plan.contains("files_ranges_spatial_pruned=1"));
+    assert!(prune_plan.contains("files_ranges_spatial_matched=0"));
+    assert!(prune_plan.contains("row_groups_spatial_pruned=0"));
+    assert!(prune_plan.contains("row_groups_spatial_matched=0"));
+
+    // Test 2: query with spatial filter that can't skip any file or row group
+    // -----------------------------------------------------------------------
+    let match_query = r#"
+        EXPLAIN ANALYZE
+        SELECT *
+        FROM test
+        WHERE ST_Intersects(
+            geometry,
+            ST_SetSRID(
+                ST_GeomFromText(
+                    'POLYGON((-180 -18.28799, -180 83.23324, 180 83.23324, 180 
-18.28799, -180 -18.28799))'
+                ),
+                4326
+            )
+        )
+    "#;
+
+    let match_plan = run_and_format(&ctx, match_query).await;
+    assert!(match_plan.contains("files_ranges_spatial_pruned=0"));
+    assert!(match_plan.contains("files_ranges_spatial_matched=1"));
+    assert!(match_plan.contains("row_groups_spatial_pruned=0"));
+    assert!(match_plan.contains("row_groups_spatial_matched=1"));
+}
+
+async fn run_and_format(ctx: &SedonaContext, sql: &str) -> String {
+    let df = ctx
+        .sql(sql.trim())
+        .await
+        .expect("explain analyze query should succeed");
+    let batches = df
+        .collect()
+        .await
+        .expect("collecting explain analyze result should succeed");
+    format!(
+        "{}",
+        pretty_format_batches(&batches).expect("formatting plan should 
succeed")
+    )
+}

Reply via email to