This is an automated email from the ASF dual-hosted git repository.
jiayu pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 9226f66 perf: Using spatial join to run range query with query window
specified as subquery (#91)
9226f66 is described below
commit 9226f662320a5e09177bfe2b4d4a72b552cc4bcb
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Sep 17 12:50:15 2025 +0800
perf: Using spatial join to run range query with query window specified as
subquery (#91)
---
python/sedonadb/tests/test_sjoin.py | 87 ++++++++++++
rust/sedona-spatial-join/src/exec.rs | 10 ++
rust/sedona-spatial-join/src/optimizer.rs | 212 +++++++++++++++++++++++++++++-
3 files changed, 308 insertions(+), 1 deletion(-)
diff --git a/python/sedonadb/tests/test_sjoin.py
b/python/sedonadb/tests/test_sjoin.py
index 4aa90fa..acf2c41 100644
--- a/python/sedonadb/tests/test_sjoin.py
+++ b/python/sedonadb/tests/test_sjoin.py
@@ -140,3 +140,90 @@ def test_spatial_join_geography(join_type, on):
sedonadb_results = eng_sedonadb.execute_and_collect(sql).to_pandas()
eng_postgis.assert_query_result(sql, sedonadb_results)
+
+
+def test_query_window_in_subquery():
+ with (
+ SedonaDB.create_or_skip() as eng_sedonadb,
+ PostGIS.create_or_skip() as eng_postgis,
+ ):
+ options = json.dumps(
+ {
+ "geom_type": "Point",
+ "seed": 42,
+ }
+ )
+ df_point = eng_sedonadb.execute_and_collect(
+ f"SELECT * FROM sd_random_geometry('{options}') LIMIT 100"
+ )
+ options = json.dumps(
+ {
+ "geom_type": "Polygon",
+ "polygon_hole_rate": 0.5,
+ "num_parts_range": [2, 10],
+ "vertices_per_linestring_range": [2, 10],
+ "size_range": [50, 60],
+ "seed": 43,
+ }
+ )
+ df_polygon = eng_sedonadb.execute_and_collect(
+ f"SELECT * FROM sd_random_geometry('{options}') LIMIT 100"
+ )
+ eng_sedonadb.create_table_arrow("sjoin_point", df_point)
+ eng_sedonadb.create_table_arrow("sjoin_polygon", df_polygon)
+ eng_postgis.create_table_arrow("sjoin_point", df_point)
+ eng_postgis.create_table_arrow("sjoin_polygon", df_polygon)
+
+ # This should be optimized to a spatial join
+ sql = """
+ SELECT id FROM sjoin_point AS L
+ WHERE ST_Intersects(L.geometry, (SELECT R.geometry FROM
sjoin_polygon AS R WHERE R.id = 1))
+ ORDER BY id
+ """
+
+ # Verify that the physical query plan should contain a SpatialJoinExec
+ query_plan = eng_sedonadb.execute_and_collect(f"EXPLAIN
{sql}").to_pandas()
+ assert "SpatialJoinExec" in query_plan.iloc[1, 1]
+
+ sedonadb_results = eng_sedonadb.execute_and_collect(sql).to_pandas()
+ assert len(sedonadb_results) > 0
+ eng_postgis.assert_query_result(sql, sedonadb_results)
+
+
+def test_non_optimizable_subquery():
+ with (
+ SedonaDB.create_or_skip() as eng_sedonadb,
+ PostGIS.create_or_skip() as eng_postgis,
+ ):
+ options = json.dumps(
+ {
+ "geom_type": "Point",
+ "seed": 42,
+ }
+ )
+ df_main = eng_sedonadb.execute_and_collect(
+ f"SELECT * FROM sd_random_geometry('{options}') LIMIT 100"
+ )
+ options = json.dumps(
+ {
+ "geom_type": "Point",
+ "seed": 43,
+ }
+ )
+ df_subquery = eng_sedonadb.execute_and_collect(
+ f"SELECT * FROM sd_random_geometry('{options}') LIMIT 100"
+ )
+ eng_sedonadb.create_table_arrow("sjoin_main", df_main)
+ eng_sedonadb.create_table_arrow("sjoin_subquery", df_subquery)
+ eng_postgis.create_table_arrow("sjoin_main", df_main)
+ eng_postgis.create_table_arrow("sjoin_subquery", df_subquery)
+
+ # This cannot be optimized to a spatial join, but the query result
should still be correct
+ sql = """
+ SELECT id FROM sjoin_main AS L
+ WHERE ST_DWithin(L.geometry, ST_Point(10, 10), (SELECT R.dist
FROM sjoin_subquery AS R WHERE R.id = 1))
+ ORDER BY id
+ """
+ sedonadb_results = eng_sedonadb.execute_and_collect(sql).to_pandas()
+ assert len(sedonadb_results) > 0
+ eng_postgis.assert_query_result(sql, sedonadb_results)
diff --git a/rust/sedona-spatial-join/src/exec.rs
b/rust/sedona-spatial-join/src/exec.rs
index fdbef75..effc895 100644
--- a/rust/sedona-spatial-join/src/exec.rs
+++ b/rust/sedona-spatial-join/src/exec.rs
@@ -1028,6 +1028,16 @@ mod tests {
Ok(())
}
+    /// A scalar subquery used as the query window must produce the same result with and
+    /// without the spatial join optimization.
+    #[tokio::test]
+    async fn test_query_window_in_subquery() -> Result<()> {
+        let ((left_schema, left_partitions), (right_schema, right_partitions)) =
+            create_test_data_with_size_range((50.0, 60.0), WKB_GEOMETRY)?;
+        let options = SpatialJoinOptions::default();
+        test_spatial_join_query(
+            &left_schema,
+            &right_schema,
+            left_partitions.clone(),
+            right_partitions.clone(),
+            &options,
+            10,
+            "SELECT id FROM L WHERE ST_Intersects(L.geometry, (SELECT R.geometry FROM R WHERE R.id = 1))",
+        )
+        .await?;
+        Ok(())
+    }
+
async fn test_with_join_types(join_type: JoinType) -> Result<RecordBatch> {
let ((left_schema, left_partitions), (right_schema, right_partitions))
=
create_test_data_with_empty_partitions()?;
diff --git a/rust/sedona-spatial-join/src/optimizer.rs
b/rust/sedona-spatial-join/src/optimizer.rs
index 0a0ea54..db81641 100644
--- a/rust/sedona-spatial-join/src/optimizer.rs
+++ b/rust/sedona-spatial-join/src/optimizer.rs
@@ -21,6 +21,7 @@ use crate::spatial_predicate::{
DistancePredicate, KNNPredicate, RelationPredicate, SpatialPredicate,
SpatialRelationType,
};
use arrow_schema::{Schema, SchemaRef};
+use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
use datafusion::physical_optimizer::sanity_checker::SanityCheckPlan;
use datafusion::{
config::ConfigOptions, execution::session_state::SessionStateBuilder,
@@ -32,7 +33,7 @@ use datafusion_common::{
JoinSide,
};
use datafusion_common::{HashMap, Result};
-use datafusion_expr::Operator;
+use datafusion_expr::{Expr, Filter, Join, JoinType, LogicalPlan, Operator};
use datafusion_physical_expr::expressions::{BinaryExpr, Column, Literal};
use datafusion_physical_expr::{PhysicalExpr, ScalarFunctionExpr};
use datafusion_physical_plan::coalesce_partitions::CoalescePartitionsExec;
@@ -89,6 +90,144 @@ impl PhysicalOptimizerRule for SpatialJoinOptimizer {
}
}
+impl OptimizerRule for SpatialJoinOptimizer {
+    fn name(&self) -> &str {
+        "spatial_join_optimizer"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::BottomUp)
+    }
+
+    /// Try to rewrite a spatial `Filter` on top of a join without `on` or `filter` conditions
+    /// into an inner theta-join that carries the predicate as its join filter. For instance,
+    /// the following query plan:
+    ///
+    /// ```text
+    /// Filter: st_intersects(l.geom, __scalar_sq_1.geom)
+    ///   Left Join (no on, no filter):
+    ///     TableScan: l projection=[id, geom]
+    ///     SubqueryAlias: __scalar_sq_1
+    ///       Projection: r.geom
+    ///         Filter: r.id = Int32(1)
+    ///           TableScan: r projection=[id, geom]
+    /// ```
+    ///
+    /// will be rewritten to
+    ///
+    /// ```text
+    /// Inner Join: Filter: st_intersects(l.geom, __scalar_sq_1.geom)
+    ///   TableScan: l projection=[id, geom]
+    ///   SubqueryAlias: __scalar_sq_1
+    ///     Projection: r.geom
+    ///       Filter: r.id = Int32(1)
+    ///         TableScan: r projection=[id, geom]
+    /// ```
+    ///
+    /// An inner join can always absorb the filter. A left (right) join is only rewritten when
+    /// [rejects_nulls_from] shows that the predicate cannot be TRUE for the NULL-padded rows
+    /// the outer join emits when its right (left) input has no rows (e.g. a scalar subquery
+    /// that returns nothing). Otherwise the inner join would drop rows the original plan keeps.
+    ///
+    /// This is for enabling this logical join operator to be converted to a NestedLoopJoin
+    /// physical node with a spatial predicate, so that it could subsequently be optimized to a
+    /// SpatialJoin physical node. Please refer to the `PhysicalOptimizerRule` implementation
+    /// of this struct and [SpatialJoinOptimizer::try_optimize_join] for details.
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        let Some(extension) = config.options().extensions.get::<SedonaOptions>() else {
+            return Ok(Transformed::no(plan));
+        };
+        if !extension.spatial_join.enable {
+            return Ok(Transformed::no(plan));
+        }
+
+        let LogicalPlan::Filter(Filter {
+            predicate, input, ..
+        }) = &plan
+        else {
+            return Ok(Transformed::no(plan));
+        };
+        if !is_spatial_predicate(predicate) {
+            return Ok(Transformed::no(plan));
+        }
+
+        let LogicalPlan::Join(Join {
+            left,
+            right,
+            on,
+            filter,
+            join_type,
+            join_constraint,
+            null_equality,
+            ..
+        }) = input.as_ref()
+        else {
+            return Ok(Transformed::no(plan));
+        };
+
+        // Only a join without any condition can take the filter over as its join filter.
+        if !on.is_empty() || filter.is_some() {
+            return Ok(Transformed::no(plan));
+        }
+
+        // Converting an outer join into an inner join is only sound when the predicate
+        // discards the NULL-padded rows of the outer join's null-supplying side.
+        let can_rewrite = match join_type {
+            JoinType::Inner => true,
+            JoinType::Left => rejects_nulls_from(predicate, right, left),
+            JoinType::Right => rejects_nulls_from(predicate, left, right),
+            _ => false,
+        };
+        if !can_rewrite {
+            return Ok(Transformed::no(plan));
+        }
+
+        let rewritten_plan = Join::try_new(
+            Arc::clone(left),
+            Arc::clone(right),
+            on.clone(),
+            Some(predicate.clone()),
+            JoinType::Inner,
+            *join_constraint,
+            *null_equality,
+        )?;
+
+        Ok(Transformed::yes(LogicalPlan::Join(rewritten_plan)))
+    }
+}
+
+/// Returns `true` when `predicate` can never evaluate to TRUE for a row in which every column
+/// produced by `null_side` is NULL, given that `preserved_side` is the other join input.
+///
+/// The check is deliberately conservative. It looks for a top-level `AND` conjunct that is a
+/// spatial predicate (as recognized by [is_spatial_predicate]) taking a bare, optionally cast,
+/// column of `null_side` directly as an argument or as a comparison operand.
+///
+/// Anything else is rejected, for example `ST_Intersects(l.geom, COALESCE(r.geom, ...))`,
+/// because the wrapping expression may turn the NULL into a non-NULL value.
+///
+/// NOTE(review): assumes the recognized spatial functions (and `st_distance`) return NULL when
+/// one of their arguments is NULL; confirm for every function listed in [is_spatial_predicate].
+fn rejects_nulls_from(
+    predicate: &Expr,
+    null_side: &LogicalPlan,
+    preserved_side: &LogicalPlan,
+) -> bool {
+    /// A column that belongs only to `null_side`, possibly wrapped in casts (a cast of NULL is
+    /// NULL).
+    fn is_null_side_column(
+        expr: &Expr,
+        null_side: &LogicalPlan,
+        preserved_side: &LogicalPlan,
+    ) -> bool {
+        match expr {
+            // Also require absence from the preserved side so that an ambiguous unqualified
+            // name is never mistaken for a null-side column.
+            Expr::Column(column) => {
+                null_side.schema().has_column(column)
+                    && !preserved_side.schema().has_column(column)
+            }
+            Expr::Cast(datafusion_expr::expr::Cast { expr, .. })
+            | Expr::TryCast(datafusion_expr::expr::TryCast { expr, .. }) => {
+                is_null_side_column(expr, null_side, preserved_side)
+            }
+            _ => false,
+        }
+    }
+
+    /// A distance-comparison operand that is NULL whenever the null side is NULL: either such
+    /// a column itself, or an `st_distance` call taking such a column as an argument.
+    fn is_null_side_operand(
+        expr: &Expr,
+        null_side: &LogicalPlan,
+        preserved_side: &LogicalPlan,
+    ) -> bool {
+        if is_null_side_column(expr, null_side, preserved_side) {
+            return true;
+        }
+        match expr {
+            Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction { func, args, .. }) => {
+                func.name().to_lowercase() == "st_distance"
+                    && args
+                        .iter()
+                        .any(|arg| is_null_side_column(arg, null_side, preserved_side))
+            }
+            _ => false,
+        }
+    }
+
+    match predicate {
+        Expr::BinaryExpr(datafusion_expr::expr::BinaryExpr {
+            left, op, right, ..
+        }) => match op {
+            // `x AND y` is only TRUE when both sides are, so one null-rejecting conjunct suffices.
+            Operator::And => {
+                rejects_nulls_from(left, null_side, preserved_side)
+                    || rejects_nulls_from(right, null_side, preserved_side)
+            }
+            // A comparison with a NULL operand evaluates to NULL.
+            Operator::Lt | Operator::LtEq | Operator::Gt | Operator::GtEq => {
+                is_spatial_predicate(predicate)
+                    && (is_null_side_operand(left, null_side, preserved_side)
+                        || is_null_side_operand(right, null_side, preserved_side))
+            }
+            _ => false,
+        },
+        Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction { args, .. }) => {
+            is_spatial_predicate(predicate)
+                && args
+                    .iter()
+                    .any(|arg| is_null_side_column(arg, null_side, preserved_side))
+        }
+        _ => false,
+    }
+}
+
+/// Check whether a logical expression contains a spatial predicate component. `expr` is
+/// assumed to evaluate to a boolean value and to originate from a filter logical node.
+///
+/// The following forms are recognized:
+/// - a call to one of the spatial relationship functions listed below;
+/// - an upper distance bound `ST_Distance(..) < d` / `ST_Distance(..) <= d`, or the mirrored
+///   `d > ST_Distance(..)` / `d >= ST_Distance(..)`;
+/// - an `AND` in which either side is recognized.
+///
+/// Function names are compared after lower-casing.
+fn is_spatial_predicate(expr: &Expr) -> bool {
+    const SPATIAL_RELATION_FUNCTIONS: [&str; 12] = [
+        "st_intersects",
+        "st_contains",
+        "st_within",
+        "st_covers",
+        "st_covered_by",
+        "st_coveredby",
+        "st_touches",
+        "st_crosses",
+        "st_overlaps",
+        "st_equals",
+        "st_dwithin",
+        "st_knn",
+    ];
+
+    /// Lower-cased function name of `expr` when it is a scalar function call.
+    fn lowercase_function_name(expr: &Expr) -> Option<String> {
+        match expr {
+            Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction { func, .. }) => {
+                Some(func.name().to_lowercase())
+            }
+            _ => None,
+        }
+    }
+
+    /// Whether `expr` is a call to `st_distance`.
+    fn is_distance_call(expr: &Expr) -> bool {
+        lowercase_function_name(expr).is_some_and(|name| name == "st_distance")
+    }
+
+    if let Expr::BinaryExpr(datafusion_expr::expr::BinaryExpr {
+        left, op, right, ..
+    }) = expr
+    {
+        return match op {
+            Operator::And => is_spatial_predicate(left) || is_spatial_predicate(right),
+            // The distance must be the smaller operand for this to be a range predicate.
+            Operator::Lt | Operator::LtEq => is_distance_call(left),
+            Operator::Gt | Operator::GtEq => is_distance_call(right),
+            _ => false,
+        };
+    }
+
+    lowercase_function_name(expr).is_some_and(|name| {
+        SPATIAL_RELATION_FUNCTIONS
+            .iter()
+            .any(|&candidate| candidate == name)
+    })
+}
+
impl SpatialJoinOptimizer {
/// Rewrite `plan` containing NestedLoopJoinExec or HashJoinExec with
spatial predicates to SpatialJoinExec.
fn try_optimize_join(
@@ -419,6 +558,7 @@ pub fn register_spatial_join_optimizer(
session_state_builder: SessionStateBuilder,
) -> SessionStateBuilder {
session_state_builder
+        // Logical rule: rewrites a spatial Filter over a condition-less join into an inner join
+        // with a join filter, so the physical rule registered next sees a NestedLoopJoin whose
+        // spatial predicate it can convert to a SpatialJoin.
+ .with_optimizer_rule(Arc::new(SpatialJoinOptimizer::new()))
.with_physical_optimizer_rule(Arc::new(SpatialJoinOptimizer::new()))
.with_physical_optimizer_rule(Arc::new(SanityCheckPlan::new()))
}
@@ -2570,4 +2710,74 @@ mod tests {
)
.unwrap());
}
+
+    #[test]
+    fn test_is_spatial_predicate() {
+        use datafusion_expr::ColumnarValue;
+        use datafusion_expr::{col, lit, Expr, ScalarUDF, SimpleScalarUDF};
+
+        /// Builds the binary expression `left <op> right`.
+        fn binary(left: Expr, op: Operator, right: Expr) -> Expr {
+            Expr::BinaryExpr(datafusion_expr::expr::BinaryExpr {
+                left: Box::new(left),
+                op,
+                right: Box::new(right),
+            })
+        }
+
+        // ST_ relationship functions are spatial predicates
+        let st_intersects_expr = Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction {
+            func: create_dummy_st_intersects_udf(),
+            args: vec![col("geom1"), col("geom2")],
+        });
+        assert!(super::is_spatial_predicate(&st_intersects_expr));
+
+        let st_distance_expr = Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction {
+            func: create_dummy_st_distance_udf(),
+            args: vec![col("geom1"), col("geom2")],
+        });
+
+        // Distance comparisons are spatial predicates only when ST_Distance is bounded from
+        // above, on whichever side of the operator it appears.
+        for op in [Operator::Lt, Operator::LtEq] {
+            let upper_bound = binary(st_distance_expr.clone(), op, lit(100.0));
+            assert!(super::is_spatial_predicate(&upper_bound));
+            let lower_bound = binary(lit(100.0), op, st_distance_expr.clone());
+            assert!(!super::is_spatial_predicate(&lower_bound));
+        }
+        for op in [Operator::Gt, Operator::GtEq] {
+            let upper_bound = binary(lit(100.0), op, st_distance_expr.clone());
+            assert!(super::is_spatial_predicate(&upper_bound));
+            let lower_bound = binary(st_distance_expr.clone(), op, lit(100.0));
+            assert!(!super::is_spatial_predicate(&lower_bound));
+        }
+
+        // AND expressions with a spatial predicate on either side are spatial predicates
+        let and_expr = binary(
+            st_intersects_expr.clone(),
+            Operator::And,
+            col("id").eq(lit(1)),
+        );
+        assert!(super::is_spatial_predicate(&and_expr));
+        let and_expr_swapped = binary(
+            col("id").eq(lit(1)),
+            Operator::And,
+            st_intersects_expr.clone(),
+        );
+        assert!(super::is_spatial_predicate(&and_expr_swapped));
+
+        // OR is not recognized, even with a spatial operand
+        let or_expr = binary(
+            st_intersects_expr.clone(),
+            Operator::Or,
+            col("id").eq(lit(1)),
+        );
+        assert!(!super::is_spatial_predicate(&or_expr));
+
+        // Non-spatial expressions are not spatial predicates
+
+        // Simple column comparison
+        let non_spatial_expr = col("id").eq(lit(1));
+        assert!(!super::is_spatial_predicate(&non_spatial_expr));
+
+        // Not a spatial relationship function
+        let non_st_func = Expr::ScalarFunction(datafusion_expr::expr::ScalarFunction {
+            func: Arc::new(ScalarUDF::from(SimpleScalarUDF::new(
+                "st_non_spatial_relation_func",
+                vec![DataType::Int32],
+                DataType::Boolean,
+                datafusion_expr::Volatility::Immutable,
+                Arc::new(|_| Ok(ColumnarValue::Scalar(ScalarValue::Boolean(Some(true))))),
+            ))),
+            args: vec![col("id")],
+        });
+        assert!(!super::is_spatial_predicate(&non_st_func));
+
+        // AND expression with no spatial predicates
+        let non_spatial_and = binary(
+            col("id").eq(lit(1)),
+            Operator::And,
+            col("name").eq(lit("test")),
+        );
+        assert!(!super::is_spatial_predicate(&non_spatial_and));
+    }
}