Copilot commented on code in PR #562:
URL: https://github.com/apache/sedona-db/pull/562#discussion_r2793652014


##########
rust/sedona-spatial-join/src/planner/logical_plan_node.rs:
##########
@@ -0,0 +1,129 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::cmp::Ordering;
+use std::fmt;
+use std::sync::Arc;
+
+use datafusion_common::{plan_err, DFSchemaRef, NullEquality, Result};
+use datafusion_expr::logical_plan::UserDefinedLogicalNodeCore;
+use datafusion_expr::{Expr, JoinConstraint, JoinType, LogicalPlan};
+
+/// Logical extension node used as a planning hook for spatial joins.
+///
+/// Carries a join's inputs and filter expression so the physical planner can 
recognize and plan
+/// a `SpatialJoinExec`.
+#[derive(PartialEq, Eq, Hash)]
+pub(crate) struct SpatialJoinPlanNode {
+    pub left: LogicalPlan,
+    pub right: LogicalPlan,
+    pub join_type: JoinType,
+    pub filter: Expr,
+    pub schema: DFSchemaRef,
+    pub join_constraint: JoinConstraint,
+    pub null_equality: NullEquality,
+}
+
+// Manual implementation needed because of `schema` field. Comparison excludes 
this field.
+// See 
https://github.com/apache/datafusion/blob/52.1.0/datafusion/expr/src/logical_plan/plan.rs#L3886
+impl PartialOrd for SpatialJoinPlanNode {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        #[derive(PartialEq, PartialOrd)]
+        struct ComparableJoin<'a> {
+            pub left: &'a LogicalPlan,
+            pub right: &'a LogicalPlan,
+            pub filter: &'a Expr,
+            pub join_type: &'a JoinType,
+            pub join_constraint: &'a JoinConstraint,
+            pub null_equality: &'a NullEquality,
+        }
+        let comparable_self = ComparableJoin {
+            left: &self.left,
+            right: &self.right,
+            filter: &self.filter,
+            join_type: &self.join_type,
+            join_constraint: &self.join_constraint,
+            null_equality: &self.null_equality,
+        };
+        let comparable_other = ComparableJoin {
+            left: &other.left,
+            right: &other.right,
+            filter: &other.filter,
+            join_type: &other.join_type,
+            join_constraint: &other.join_constraint,
+            null_equality: &self.null_equality,

Review Comment:
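   The `ComparableJoin` built for `other` incorrectly uses 
`&self.null_equality` instead of `&other.null_equality`, so `null_equality` is 
always compared against itself and never affects the ordering. Two nodes that 
differ only in `null_equality` would compare as equal under `partial_cmp` while 
the derived `PartialEq` reports them as different, which violates the 
consistency `PartialOrd` requires with `PartialEq`. Change line 68 to reference 
`other.null_equality`.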
   ```suggestion
               null_equality: &other.null_equality,
   ```



##########
rust/sedona-spatial-join/src/utils/join_utils.rs:
##########
@@ -715,11 +778,154 @@ pub(crate) fn boundedness_from_children<'a>(
     }
 }
 
+pub(crate) fn compute_join_emission_type(
+    left: &Arc<dyn ExecutionPlan>,
+    right: &Arc<dyn ExecutionPlan>,
+    join_type: JoinType,
+    probe_side: JoinSide,
+) -> EmissionType {
+    let (build, probe) = if probe_side == JoinSide::Left {
+        (right, left)
+    } else {
+        (left, right)
+    };
+
+    if build.boundedness().is_unbounded() {
+        return EmissionType::Final;
+    }
+
+    if probe.pipeline_behavior() == EmissionType::Incremental {
+        match join_type {
+            // If we only need to generate matched rows from the probe side,
+            // we can emit rows incrementally.
+            JoinType::Inner => EmissionType::Incremental,
+            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | 
JoinType::RightMark => {
+                if probe_side == JoinSide::Right {
+                    EmissionType::Incremental
+                } else {
+                    EmissionType::Both
+                }
+            }
+            // If we need to generate unmatched rows from the *build side*,
+            // we need to emit them at the end.
+            JoinType::Left | JoinType::LeftSemi | JoinType::LeftAnti | 
JoinType::LeftMark => {
+                if probe_side == JoinSide::Left {
+                    EmissionType::Incremental
+                } else {
+                    EmissionType::Both
+                }
+            }
+            JoinType::Full => EmissionType::Both,
+        }
+    } else {
+        probe.pipeline_behavior()
+    }
+}
+
+/// Data required to push down a projection through a spatial join.
+/// This is mostly taken from 
https://github.com/apache/datafusion/blob/51.0.0/datafusion/physical-plan/src/projection.rs
+pub(crate) struct JoinPushdownData {
+    pub projected_left_child: ProjectionExec,
+    pub projected_right_child: ProjectionExec,
+    pub join_filter: Option<JoinFilter>,
+    pub join_on: SpatialPredicate,
+}
+
+/// Push down the given `projection` through the spatial join.
+/// This code is adapted from 
https://github.com/apache/datafusion/blob/51.0.0/datafusion/physical-plan/src/projection.rs
+pub(crate) fn try_pushdown_through_join(
+    projection: &ProjectionExec,
+    join_left: &Arc<dyn ExecutionPlan>,
+    join_right: &Arc<dyn ExecutionPlan>,
+    join_schema: &SchemaRef,
+    join_type: JoinType,
+    join_filter: Option<&JoinFilter>,
+    join_on: &SpatialPredicate,
+) -> Result<Option<JoinPushdownData>> {
+    let Some(projection_as_columns) = 
physical_to_column_exprs(projection.expr()) else {
+        return Ok(None);
+    };
+
+    // Mark joins produce a synthetic column that does not belong to either 
child. This synthetic
+    // `mark` column will make `new_join_children` fail, so we skip pushdown 
for such joins.
+    // This limitation if inherited from DataFusion's builtin 
`try_pushdown_through_join`.

Review Comment:
   Fix grammar in comment: change 'This limitation if inherited' to 'This 
limitation is inherited'.
   ```suggestion
       // This limitation is inherited from DataFusion's builtin 
`try_pushdown_through_join`.
   ```



##########
rust/sedona-spatial-join/src/planner/optimizer.rs:
##########
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+use std::sync::Arc;
+
+use crate::planner::logical_plan_node::SpatialJoinPlanNode;
+use crate::planner::spatial_expr_utils::collect_spatial_predicate_names;
+use crate::planner::spatial_expr_utils::is_spatial_predicate;
+use datafusion::execution::session_state::SessionStateBuilder;
+use datafusion::optimizer::{ApplyOrder, OptimizerConfig, OptimizerRule};
+use datafusion_common::tree_node::Transformed;
+use datafusion_common::NullEquality;
+use datafusion_common::Result;
+use datafusion_expr::logical_plan::Extension;
+use datafusion_expr::{BinaryExpr, Expr, Operator};
+use datafusion_expr::{Filter, Join, JoinType, LogicalPlan};
+use sedona_common::option::SedonaOptions;
+
+/// Register only the logical spatial join optimizer rule.
+///
+/// This enables building `Join(filter=...)` from patterns like 
`Filter(CrossJoin)`.
+/// It intentionally does not register any physical plan rewrite rules.
+pub fn register_spatial_join_logical_optimizer(
+    session_state_builder: SessionStateBuilder,
+) -> SessionStateBuilder {
+    session_state_builder
+        .with_optimizer_rule(Arc::new(MergeSpatialProjectionIntoJoin))
+        .with_optimizer_rule(Arc::new(SpatialJoinLogicalRewrite))
+}
+/// Logical optimizer rule that enables spatial join planning.
+///
+/// This rule turns eligible `Join(filter=...)` nodes into a 
`SpatialJoinPlanNode` extension.
+#[derive(Default, Debug)]
+struct SpatialJoinLogicalRewrite;
+
+impl OptimizerRule for SpatialJoinLogicalRewrite {
+    fn name(&self) -> &str {
+        "spatial_join_logical_rewrite"
+    }
+
+    fn apply_order(&self) -> Option<ApplyOrder> {
+        Some(ApplyOrder::BottomUp)
+    }
+
+    fn supports_rewrite(&self) -> bool {
+        true
+    }
+
+    fn rewrite(
+        &self,
+        plan: LogicalPlan,
+        config: &dyn OptimizerConfig,
+    ) -> Result<Transformed<LogicalPlan>> {
+        let options = config.options();
+        let Some(ext) = options.extensions.get::<SedonaOptions>() else {
+            return Ok(Transformed::no(plan));
+        };
+        if !ext.spatial_join.enable {
+            return Ok(Transformed::no(plan));
+        }
+
+        let LogicalPlan::Join(join) = &plan else {
+            return Ok(Transformed::no(plan));
+        };
+
+        // v1: only rewrite joins that already have a spatial predicate in 
`filter`.
+        let Some(filter) = join.filter.as_ref() else {
+            return Ok(Transformed::no(plan));
+        };
+
+        let spatial_predicate_names = collect_spatial_predicate_names(filter);
+        if spatial_predicate_names.is_empty() {
+            return Ok(Transformed::no(plan));
+        }
+
+        // Join with equi-join condition and spatial join condition. Only 
handle it
+        // when the join condition contains ST_KNN. KNN join is not a regular 
join and
+        // ST_KNN is also not a regular predicate. It must be handled by our 
spatial join exec.
+        if !join.on.is_empty() && !spatial_predicate_names.contains("st_knn") {
+            return Ok(Transformed::no(plan));
+        }
+
+        // Build new filter expression including equi-join conditions
+        let filter = filter.clone();
+        let eq_op = if join.null_equality == NullEquality::NullEqualsNothing {
+            Operator::Eq
+        } else {
+            Operator::IsNotDistinctFrom
+        };
+        let filter = join.on.iter().fold(filter, |acc, (l, r)| {
+            let eq_expr = Expr::BinaryExpr(BinaryExpr::new(
+                Box::new(l.clone()),
+                eq_op,
+                Box::new(r.clone()),
+            ));
+            Expr::and(acc, eq_expr)
+        });
+
+        let schema = Arc::clone(&join.schema);
+        let node = SpatialJoinPlanNode {
+            left: join.left.as_ref().clone(),
+            right: join.right.as_ref().clone(),
+            join_type: join.join_type,
+            filter,
+            schema,
+            join_constraint: join.join_constraint,
+            null_equality: join.null_equality,
+        };
+
+        Ok(Transformed::yes(LogicalPlan::Extension(Extension {
+            node: Arc::new(node),
+        })))
+    }
+}
+
+/// Logical optimizer rule that enables spatial join planning.
+///
+/// This rule turns eligible `Filter(Join(filter=...))` nodes into a 
`Join(filter=...)` node,
+/// so that the spatial join can be rewritten later by 
[SpatialJoinLogicalRewrite].
+#[derive(Debug, Default)]
+struct MergeSpatialProjectionIntoJoin;
+
+impl OptimizerRule for MergeSpatialProjectionIntoJoin {
+    fn name(&self) -> &str {
+        "spatial_join_optimizer"

Review Comment:
   The rule name `spatial_join_optimizer` is ambiguous and misleading here: it 
matches the old optimizer naming, whereas this rule specifically merges 
`Filter(CrossJoin)` into `Join(filter=...)`. Consider renaming it to something 
more specific, such as `merge_spatial_filter_into_join`, to make optimizer 
traces and debug output clearer.
   ```suggestion
           "merge_spatial_filter_into_join"
   ```



##########
rust/sedona-spatial-join/src/planner/physical_planner.rs:
##########
@@ -0,0 +1,263 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::fmt;
+use std::sync::Arc;
+
+use async_trait::async_trait;
+
+use arrow_schema::Schema;
+
+use datafusion::execution::context::QueryPlanner;
+use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, 
PhysicalPlanner};
+use datafusion_common::{plan_err, DFSchema, Result};
+use datafusion_expr::logical_plan::UserDefinedLogicalNode;
+use datafusion_expr::LogicalPlan;
+use datafusion_physical_expr::create_physical_expr;
+use datafusion_physical_plan::joins::utils::JoinFilter;
+use datafusion_physical_plan::joins::NestedLoopJoinExec;
+use sedona_common::sedona_internal_err;
+
+use crate::exec::SpatialJoinExec;
+use crate::planner::logical_plan_node::SpatialJoinPlanNode;
+use crate::planner::spatial_expr_utils::{is_spatial_predicate_supported, 
transform_join_filter};
+use crate::spatial_predicate::SpatialPredicate;
+use sedona_common::option::SedonaOptions;
+
+/// Registers a query planner that can produce [`SpatialJoinExec`] from a 
logical extension node.
+pub fn register_spatial_join_planner(builder: SessionStateBuilder) -> 
SessionStateBuilder {
+    builder.with_query_planner(Arc::new(SedonaSpatialQueryPlanner))
+}
+
+/// Query planner that enables Sedona's spatial join planning.
+///
+/// Installs an [`ExtensionPlanner`] that recognizes `SpatialJoinPlanNode` and 
produces
+/// `SpatialJoinExec` when supported and enabled.
+pub struct SedonaSpatialQueryPlanner;
+
+impl fmt::Debug for SedonaSpatialQueryPlanner {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("SedonaSpatialQueryPlanner").finish()
+    }
+}
+
+#[async_trait]
+impl QueryPlanner for SedonaSpatialQueryPlanner {
+    async fn create_physical_plan(
+        &self,
+        logical_plan: &LogicalPlan,
+        session_state: &SessionState,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let physical_planner = 
DefaultPhysicalPlanner::with_extension_planners(vec![Arc::new(
+            SpatialJoinExtensionPlanner {},
+        )]);
+        physical_planner
+            .create_physical_plan(logical_plan, session_state)
+            .await
+    }
+}
+
+/// Physical planner hook for `SpatialJoinPlanNode`.
+struct SpatialJoinExtensionPlanner;
+
+#[async_trait]
+impl ExtensionPlanner for SpatialJoinExtensionPlanner {
+    async fn plan_extension(
+        &self,
+        _planner: &dyn PhysicalPlanner,
+        node: &dyn UserDefinedLogicalNode,
+        logical_inputs: &[&LogicalPlan],
+        physical_inputs: &[Arc<dyn ExecutionPlan>],
+        session_state: &SessionState,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let Some(spatial_node) = 
node.as_any().downcast_ref::<SpatialJoinPlanNode>() else {
+            return Ok(None);
+        };
+
+        let Some(ext) = session_state
+            .config_options()
+            .extensions
+            .get::<SedonaOptions>()
+        else {
+            return sedona_internal_err!("SedonaOptions not found in session 
state extensions");
+        };
+
+        if !ext.spatial_join.enable {
+            return sedona_internal_err!("Spatial join is disabled in 
SedonaOptions");

Review Comment:
   Because this is a planner hook, returning an internal error here can turn a 
planning mismatch into a hard query failure. It would be more robust to return 
`Ok(None)` and let DataFusion handle the extension node, to fail with a clearer 
`plan_err!`, or to fall back to planning a regular join when `SedonaOptions` is 
missing or disabled.
   ```suggestion
           let ext = match session_state
               .config_options()
               .extensions
               .get::<SedonaOptions>()
           {
               Some(ext) => ext,
               None => {
                   // Let DataFusion handle the extension node or fail with a 
clearer error
                   return Ok(None);
               }
           };
   
           if !ext.spatial_join.enable {
               // Spatial joins are disabled; opt out of planning this extension
               return Ok(None);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to