This is an automated email from the ASF dual-hosted git repository.

jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 9226f66  perf: Using spatial join to run range query with query window 
specified as subquery (#91)
9226f66 is described below

commit 9226f662320a5e09177bfe2b4d4a72b552cc4bcb
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Sep 17 12:50:15 2025 +0800

    perf: Using spatial join to run range query with query window specified as 
subquery (#91)
---
 python/sedonadb/tests/test_sjoin.py       |  87 ++++++++++++
 rust/sedona-spatial-join/src/exec.rs      |  10 ++
 rust/sedona-spatial-join/src/optimizer.rs | 212 +++++++++++++++++++++++++++++-
 3 files changed, 308 insertions(+), 1 deletion(-)

diff --git a/python/sedonadb/tests/test_sjoin.py 
b/python/sedonadb/tests/test_sjoin.py
index 4aa90fa..acf2c41 100644
--- a/python/sedonadb/tests/test_sjoin.py
+++ b/python/sedonadb/tests/test_sjoin.py
@@ -140,3 +140,90 @@ def test_spatial_join_geography(join_type, on):
 
         sedonadb_results = eng_sedonadb.execute_and_collect(sql).to_pandas()
         eng_postgis.assert_query_result(sql, sedonadb_results)
+
+
+def test_query_window_in_subquery():
+    with (
+        SedonaDB.create_or_skip() as eng_sedonadb,
+        PostGIS.create_or_skip() as eng_postgis,
+    ):
+        options = json.dumps(
+            {
+                "geom_type": "Point",
+                "seed": 42,
+            }
+        )
+        df_point = eng_sedonadb.execute_and_collect(
+            f"SELECT * FROM sd_random_geometry('{options}') LIMIT 100"
+        )
+        options = json.dumps(
+            {
+                "geom_type": "Polygon",
+                "polygon_hole_rate": 0.5,
+                "num_parts_range": [2, 10],
+                "vertices_per_linestring_range": [2, 10],
+                "size_range": [50, 60],
+                "seed": 43,
+            }
+        )
+        df_polygon = eng_sedonadb.execute_and_collect(
+            f"SELECT * FROM sd_random_geometry('{options}') LIMIT 100"
+        )
+        eng_sedonadb.create_table_arrow("sjoin_point", df_point)
+        eng_sedonadb.create_table_arrow("sjoin_polygon", df_polygon)
+        eng_postgis.create_table_arrow("sjoin_point", df_point)
+        eng_postgis.create_table_arrow("sjoin_polygon", df_polygon)
+
+        # This should be optimized to a spatial join
+        sql = """
+               SELECT id FROM sjoin_point AS L
+               WHERE ST_Intersects(L.geometry, (SELECT R.geometry FROM 
sjoin_polygon AS R WHERE R.id = 1))
+               ORDER BY id
+               """
+
+        # Verify that the physical query plan contains a SpatialJoinExec
+        query_plan = eng_sedonadb.execute_and_collect(f"EXPLAIN 
{sql}").to_pandas()
+        assert "SpatialJoinExec" in query_plan.iloc[1, 1]
+
+        sedonadb_results = eng_sedonadb.execute_and_collect(sql).to_pandas()
+        assert len(sedonadb_results) > 0
+        eng_postgis.assert_query_result(sql, sedonadb_results)
+
+
+def test_non_optimizable_subquery():
+    with (
+        SedonaDB.create_or_skip() as eng_sedonadb,
+        PostGIS.create_or_skip() as eng_postgis,
+    ):
+        options = json.dumps(
+            {
+                "geom_type": "Point",
+                "seed": 42,
+            }
+        )
+        df_main = eng_sedonadb.execute_and_collect(
+            f"SELECT * FROM sd_random_geometry('{options}') LIMIT 100"
+        )
+        options = json.dumps(
+            {
+                "geom_type": "Point",
+                "seed": 43,
+            }
+        )
+        df_subquery = eng_sedonadb.execute_and_collect(
+            f"SELECT * FROM sd_random_geometry('{options}') LIMIT 100"
+        )
+        eng_sedonadb.create_table_arrow("sjoin_main", df_main)
+        eng_sedonadb.create_table_arrow("sjoin_subquery", df_subquery)
+        eng_postgis.create_table_arrow("sjoin_main", df_main)
+        eng_postgis.create_table_arrow("sjoin_subquery", df_subquery)
+
+        # This cannot be optimized to a spatial join, but the query result 
should still be correct
+        sql = """
+               SELECT id FROM sjoin_main AS L
+               WHERE ST_DWithin(L.geometry, ST_Point(10, 10), (SELECT R.dist 
FROM sjoin_subquery AS R WHERE R.id = 1))
+               ORDER BY id
+               """
+        sedonadb_results = eng_sedonadb.execute_and_collect(sql).to_pandas()
+        assert len(sedonadb_results) > 0
+        eng_postgis.assert_query_result(sql, sedonadb_results)
diff --git a/rust/sedona-spatial-join/src/exec.rs 
b/rust/sedona-spatial-join/src/exec.rs
index fdbef75..effc895 100644
--- a/rust/sedona-spatial-join/src/exec.rs
+++ b/rust/sedona-spatial-join/src/exec.rs
@@ -1028,6 +1028,16 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_query_window_in_subquery() -> Result<()> {
+        let ((left_schema, left_partitions), (right_schema, right_partitions)) 
=
+            create_test_data_with_size_range((50.0, 60.0), WKB_GEOMETRY)?;
+        let options = SpatialJoinOptions::default();
+        test_spatial_join_query(&left_schema, &right_schema, 
left_partitions.clone(), right_partitions.clone(), &options, 10,
+                "SELECT id FROM L WHERE ST_Intersects(L.geometry, (SELECT 
R.geometry FROM R WHERE R.id = 1))").await?;
+        Ok(())
+    }
+
     async fn test_with_join_types(join_type: JoinType) -> Result<RecordBatch> {
         let ((left_schema, left_partitions), (right_schema, right_partitions)) 
=
             create_test_data_with_empty_partitions()?;
diff --git a/rust/sedona-spatial-join/src/optimizer.rs 
b/rust/sedona-spatial-join/src/optimizer.rs
index 0a0ea54..db81641 100644
--- a/rust/sedona-spatial-join/src/optimizer.rs
+++ b/rust/sedona-spatial-join/src/optimizer.rs
@@ -21,6 +21,7 @@ use crate::spatial_predicate::{
     DistancePredicate, KNNPredicate, RelationPredicate, SpatialPredicate, 
SpatialRelationType,
 };
 use arrow_schema::{Schema, SchemaRef};
+use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
 use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
 use datafusion::{
     config::ConfigOptions, execution::session_state::SessionStateBuilder,
@@ -32,7 +33,7 @@ use datafusion_common::{
     JoinSide,
 };
 use datafusion_common::{HashMap, Result};
-use datafusion_expr::Operator;
+use datafusion_expr::{Expr, Filter, Join, JoinType, LogicalPlan, Operator};
 use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
 use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
 use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -89,6 +90,144 @@ impl PhysicalOptimizerRule for SpatialJoinOptimizer {
     }
 }
 
+impl OptimizerRule for SpatialJoinOptimizer {
+    fn name(&self) -> &str {
+        "spatial_join_optimizer"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::BottomUp)
+    }
+
+    /// Try to rewrite the plan containing a spatial Filter on top of a cross 
join without on or filter
+    /// to a theta-join with filter. For instance, the following query plan:
+    ///
+    /// ```text
+    /// Filter: st_intersects(l.geom, __scalar_sq_1.geom)
+    ///   Left Join (no on, no filter):
+    ///     TableScan: l projection=[id, geom]
+    ///     SubqueryAlias: __scalar_sq_1
+    ///       Projection: r.geom
+    ///         Filter: r.id = Int32(1)
+    ///           TableScan: r projection=[id, geom]
+    /// ```
+    ///
+    /// will be rewritten to
+    ///
+    /// ```text
+    /// Inner Join: Filter: st_intersects(l.geom, __scalar_sq_1.geom)
+    ///   TableScan: l projection=[id, geom]
+    ///   SubqueryAlias: __scalar_sq_1
+    ///     Projection: r.geom
+    ///       Filter: r.id = Int32(1)
+    ///         TableScan: r projection=[id, geom]
+    /// ```
+    ///
+    /// This is for enabling this logical join operator to be converted to a 
NestedLoopJoin physical
+    /// node with a spatial predicate, so that it could subsequently be 
optimized to a SpatialJoin
+    /// physical node. Please refer to the `PhysicalOptimizerRule` 
implementation of this struct
+    /// and [SpatialJoinOptimizer::try_optimize_join] for details.
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        let Some(extension) = 
config.options().extensions.get::<SedonaOptions>() else {
+            return Ok(Transformed::no(plan));
+        };
+        if !extension.spatial_join.enable {
+            return Ok(Transformed::no(plan));
+        }
+
+        let LogicalPlan::Filter(Filter {
+            predicate, input, ..
+        }) = &plan
+        else {
+            return Ok(Transformed::no(plan));
+        };
+        if !is_spatial_predicate(predicate) {
+            return Ok(Transformed::no(plan));
+        }
+
+        let LogicalPlan::Join(Join {
+            ref left,
+            ref right,
+            ref on,
+            ref filter,
+            join_type,
+            ref join_constraint,
+            ref null_equality,
+            ..
+        }) = input.as_ref()
+        else {
+            return Ok(Transformed::no(plan));
+        };
+
+        // Check if this is a suitable join for rewriting
+        if !matches!(
+            join_type,
+            JoinType::Inner | JoinType::Left | JoinType::Right
+        ) || !on.is_empty()
+            || filter.is_some()
+        {
+            return Ok(Transformed::no(plan));
+        }
+
+        let rewritten_plan = Join::try_new(
+            Arc::clone(left),
+            Arc::clone(right),
+            on.clone(),
+            Some(predicate.clone()),
+            JoinType::Inner,
+            *join_constraint,
+            *null_equality,
+        )?;
+
+        Ok(Transformed::yes(LogicalPlan::Join(rewritten_plan)))
+    }
+}
+
+/// Check if a given logical expression contains a spatial predicate component 
or not. We assume that the given
+/// `expr` evaluates to a boolean value and originates from a filter logical 
node.
+fn is_spatial_predicate(expr: &Expr) -> bool {
+    fn is_distance_expr(expr: &Expr) -> bool {
+        let Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction { func, 
.. }) = expr else {
+            return false;
+        };
+        func.name().to_lowercase() == "st_distance"
+    }
+
+    match expr {
+        Expr::BinaryExpr(datafusion_expr::expr::BinaryExpr {
+            left, right, op, ..
+        }) => match op {
+            Operator::And => is_spatial_predicate(left) || 
is_spatial_predicate(right),
+            Operator::Lt | Operator::LtEq => is_distance_expr(left),
+            Operator::Gt | Operator::GtEq => is_distance_expr(right),
+            _ => false,
+        },
+        Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction { func, .. 
}) => {
+            let func_name = func.name().to_lowercase();
+            matches!(
+                func_name.as_str(),
+                "st_intersects"
+                    | "st_contains"
+                    | "st_within"
+                    | "st_covers"
+                    | "st_covered_by"
+                    | "st_coveredby"
+                    | "st_touches"
+                    | "st_crosses"
+                    | "st_overlaps"
+                    | "st_equals"
+                    | "st_dwithin"
+                    | "st_knn"
+            )
+        }
+        _ => false,
+    }
+}
+
 impl SpatialJoinOptimizer {
     /// Rewrite `plan` containing NestedLoopJoinExec or HashJoinExec with 
spatial predicates to SpatialJoinExec.
     fn try_optimize_join(
@@ -419,6 +558,7 @@ pub fn register_spatial_join_optimizer(
     session_state_builder: SessionStateBuilder,
 ) -> SessionStateBuilder {
     session_state_builder
+        .with_optimizer_rule(Arc::new(SpatialJoinOptimizer::new()))
         .with_physical_optimizer_rule(Arc::new(SpatialJoinOptimizer::new()))
         .with_physical_optimizer_rule(Arc::new(SanityCheckPlan::new()))
 }
@@ -2570,4 +2710,74 @@ mod tests {
         )
         .unwrap());
     }
+
+    #[test]
+    fn test_is_spatial_predicate() {
+        use datafusion_expr::ColumnarValue;
+        use datafusion_expr::{col, lit, Expr, ScalarUDF, SimpleScalarUDF};
+
+        // Spatial relation functions such as ST_Intersects should return true
+        let st_intersects_udf = create_dummy_st_intersects_udf();
+        let st_intersects_expr = 
Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction {
+            func: st_intersects_udf,
+            args: vec![col("geom1"), col("geom2")],
+        });
+        assert!(super::is_spatial_predicate(&st_intersects_expr));
+
+        // ST_Distance(geom1, geom2) < 100 should return true
+        let st_distance_udf = create_dummy_st_distance_udf();
+        let st_distance_expr = 
Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction {
+            func: st_distance_udf,
+            args: vec![col("geom1"), col("geom2")],
+        });
+        let distance_lt_expr = 
Expr::BinaryExpr(datafusion_expr::expr::BinaryExpr {
+            left: Box::new(st_distance_expr.clone()),
+            op: Operator::Lt,
+            right: Box::new(lit(100.0)),
+        });
+        assert!(super::is_spatial_predicate(&distance_lt_expr));
+
+        // ST_Distance(geom1, geom2) > 100 should return false
+        let distance_gt_expr = 
Expr::BinaryExpr(datafusion_expr::expr::BinaryExpr {
+            left: Box::new(st_distance_expr.clone()),
+            op: Operator::Gt,
+            right: Box::new(lit(100.0)),
+        });
+        assert!(!super::is_spatial_predicate(&distance_gt_expr));
+
+        // AND expressions with spatial predicates should return true
+        let and_expr = Expr::BinaryExpr(datafusion_expr::expr::BinaryExpr {
+            left: Box::new(st_intersects_expr.clone()),
+            op: Operator::And,
+            right: Box::new(col("id").eq(lit(1))),
+        });
+        assert!(super::is_spatial_predicate(&and_expr));
+
+        // Non-spatial expressions should return false
+
+        // Simple column comparison
+        let non_spatial_expr = col("id").eq(lit(1));
+        assert!(!super::is_spatial_predicate(&non_spatial_expr));
+
+        // Not a spatial relationship function
+        let non_st_func = 
Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction {
+            func: Arc::new(ScalarUDF::from(SimpleScalarUDF::new(
+                "st_non_spatial_relation_func",
+                vec![DataType::Int32],
+                DataType::Boolean,
+                datafusion_expr::Volatility::Immutable,
+                Arc::new(|_| 
Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))))),
+            ))),
+            args: vec![col("id")],
+        });
+        assert!(!super::is_spatial_predicate(&non_st_func));
+
+        // AND expression with no spatial predicates
+        let non_spatial_and = 
Expr::BinaryExpr(datafusion_expr::expr::BinaryExpr {
+            left: Box::new(col("id").eq(lit(1))),
+            op: Operator::And,
+            right: Box::new(col("name").eq(lit("test"))),
+        });
+        assert!(!super::is_spatial_predicate(&non_spatial_and));
+    }
 }

Reply via email to