paleolimbot commented on code in PR #465:
URL: https://github.com/apache/sedona-db/pull/465#discussion_r2663410606


##########
rust/sedona-spatial-join-gpu/src/config.rs:
##########
@@ -0,0 +1,89 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion::logical_expr::JoinType;
+use datafusion_physical_plan::joins::utils::JoinFilter;
+
+#[derive(Debug, Clone)]
+pub struct GpuSpatialJoinConfig {
+    /// Join type (Inner, Left, Right, Full)
+    pub join_type: JoinType,
+
+    /// Left geometry column information
+    pub left_geom_column: GeometryColumnInfo,
+
+    /// Right geometry column information
+    pub right_geom_column: GeometryColumnInfo,
+
+    /// Spatial predicate for the join
+    pub predicate: GpuSpatialPredicate,
+
+    /// GPU device ID to use
+    pub device_id: i32,
+
+    /// Batch size for GPU processing
+    pub batch_size: usize,
+
+    /// Additional join filters (from WHERE clause)
+    pub additional_filters: Option<JoinFilter>,
+
+    /// Maximum GPU memory to use (bytes, None = unlimited)
+    pub max_memory: Option<usize>,
+
+    /// Fall back to CPU if GPU fails
+    pub fallback_to_cpu: bool,
+}
+
+#[derive(Debug, Clone)]
+pub struct GeometryColumnInfo {
+    /// Column name
+    pub name: String,
+
+    /// Column index in schema
+    pub index: usize,
+}
+
+#[derive(Debug, Clone, Copy)]
+pub enum GpuSpatialPredicate {
+    /// Relation predicate (Intersects, Contains, etc.)
+    Relation(sedona_libgpuspatial::SpatialPredicate),
+    // Future extensions: Distance, KNN
+}
+
+impl Default for GpuSpatialJoinConfig {
+    fn default() -> Self {
+        Self {
+            join_type: JoinType::Inner,
+            left_geom_column: GeometryColumnInfo {
+                name: "geometry".to_string(),
+                index: 0,
+            },
+            right_geom_column: GeometryColumnInfo {
+                name: "geometry".to_string(),
+                index: 0,
+            },
+            predicate: GpuSpatialPredicate::Relation(
+                sedona_libgpuspatial::SpatialPredicate::Intersects,
+            ),
+            device_id: 0,
+            batch_size: 8192,

Review Comment:
   Should this be `Option<usize>` so that it can default to the 
`datafusion.batch_size` setting?
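   
   A minimal sketch of how that fallback could look (hypothetical names, not code from this PR; assumes DataFusion's `TaskContext::session_config()` and `SessionConfig::batch_size()`):
   
   ```rust
   use datafusion::execution::context::TaskContext;
   
   /// Hypothetical: `None` means "inherit the session's batch size".
   pub struct GpuJoinBatchSize(pub Option<usize>);
   
   impl GpuJoinBatchSize {
       /// Resolve the effective batch size, falling back to the session's
       /// `datafusion.execution.batch_size` when unset.
       pub fn resolve(&self, context: &TaskContext) -> usize {
           self.0.unwrap_or_else(|| context.session_config().batch_size())
       }
   }
   ```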



##########
rust/sedona-spatial-join-gpu/benches/gpu_spatial_join.rs:
##########
@@ -0,0 +1,360 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow_array::{Int32Array, RecordBatch};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+use futures::StreamExt;
+use sedona_schema::crs::lnglat;
+use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY};
+use sedona_spatial_join_gpu::{
+    GeometryColumnInfo, GpuSpatialJoinConfig, GpuSpatialJoinExec, GpuSpatialPredicate,
+    SpatialPredicate,
+};
+use sedona_testing::create::create_array_storage;
+use std::sync::Arc;
+use tokio::runtime::Runtime;
+
+// Helper execution plan that returns a single pre-loaded batch
+struct SingleBatchExec {
+    schema: Arc<Schema>,
+    batch: RecordBatch,
+    props: datafusion::physical_plan::PlanProperties,
+}

Review Comment:
   This seems very similar to `SessionContext::register_batch()` and is a lot 
of lines of code. Do we need this?
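   
   For reference, a sketch of the `register_batch()` route (only DataFusion APIs are assumed; the table name is arbitrary):
   
   ```rust
   use std::sync::Arc;
   use arrow_array::RecordBatch;
   use datafusion::error::Result;
   use datafusion::physical_plan::ExecutionPlan;
   use datafusion::prelude::SessionContext;
   
   /// Build an ExecutionPlan over a pre-loaded batch without a custom operator.
   async fn plan_for_batch(batch: RecordBatch) -> Result<Arc<dyn ExecutionPlan>> {
       let ctx = SessionContext::new();
       ctx.register_batch("bench_input", batch)?;
       ctx.table("bench_input").await?.create_physical_plan().await
   }
   ```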



##########
rust/sedona-spatial-join-gpu/benches/gpu_spatial_join.rs:
##########
@@ -0,0 +1,360 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow_array::{Int32Array, RecordBatch};
+use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+use futures::StreamExt;
+use sedona_schema::crs::lnglat;
+use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY};
+use sedona_spatial_join_gpu::{
+    GeometryColumnInfo, GpuSpatialJoinConfig, GpuSpatialJoinExec, GpuSpatialPredicate,
+    SpatialPredicate,
+};
+use sedona_testing::create::create_array_storage;
+use std::sync::Arc;
+use tokio::runtime::Runtime;
+
+// Helper execution plan that returns a single pre-loaded batch
+struct SingleBatchExec {
+    schema: Arc<Schema>,
+    batch: RecordBatch,
+    props: datafusion::physical_plan::PlanProperties,
+}
+
+impl SingleBatchExec {
+    fn new(batch: RecordBatch) -> Self {
+        let schema = batch.schema();
+        let eq_props = datafusion::physical_expr::EquivalenceProperties::new(schema.clone());
+        let partitioning = datafusion::physical_plan::Partitioning::UnknownPartitioning(1);
+        let props = datafusion::physical_plan::PlanProperties::new(
+            eq_props,
+            partitioning,
+            datafusion::physical_plan::execution_plan::EmissionType::Final,
+            datafusion::physical_plan::execution_plan::Boundedness::Bounded,
+        );
+        Self {
+            schema,
+            batch,
+            props,
+        }
+    }
+}
+
+impl std::fmt::Debug for SingleBatchExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "SingleBatchExec")
+    }
+}
+
+impl datafusion::physical_plan::DisplayAs for SingleBatchExec {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "SingleBatchExec")
+    }
+}
+
+impl datafusion::physical_plan::ExecutionPlan for SingleBatchExec {
+    fn name(&self) -> &str {
+        "SingleBatchExec"
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+        &self.props
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn datafusion::physical_plan::ExecutionPlan>>,
+    ) -> datafusion_common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<datafusion::execution::context::TaskContext>,
+    ) -> datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> {
+        use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+        use futures::Stream;
+        use std::pin::Pin;
+        use std::task::{Context, Poll};
+
+        struct OnceBatchStream {
+            schema: Arc<Schema>,
+            batch: Option<RecordBatch>,
+        }
+
+        impl Stream for OnceBatchStream {
+            type Item = datafusion_common::Result<RecordBatch>;
+
+            fn poll_next(
+                mut self: Pin<&mut Self>,
+                _cx: &mut Context<'_>,
+            ) -> Poll<Option<Self::Item>> {
+                Poll::Ready(self.batch.take().map(Ok))
+            }
+        }
+
+        impl RecordBatchStream for OnceBatchStream {
+            fn schema(&self) -> Arc<Schema> {
+                self.schema.clone()
+            }
+        }
+
+        Ok(Box::pin(OnceBatchStream {
+            schema: self.schema.clone(),
+            batch: Some(self.batch.clone()),
+        }) as SendableRecordBatchStream)
+    }
+}
+
+/// Generate random points within a bounding box
+fn generate_random_points(count: usize) -> Vec<String> {
+    use rand::Rng;
+    let mut rng = rand::thread_rng();
+    (0..count)
+        .map(|_| {
+            let x: f64 = rng.gen_range(-180.0..180.0);
+            let y: f64 = rng.gen_range(-90.0..90.0);
+            format!("POINT ({} {})", x, y)
+        })
+        .collect()
+}

Review Comment:
   We have a random geometry generator in sedona-testing (that is used in the 
non-GPU join tests and elsewhere) that I think we should be using here!



##########
rust/sedona-spatial-join-gpu/src/once_fut.rs:
##########
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+/// This module contains the OnceAsync and OnceFut types, which are used to
+/// run an async closure once. The source code was copied from DataFusion
+/// https://github.com/apache/datafusion/blob/48.0.0/datafusion/physical-plan/src/joins/utils.rs
+use std::task::{Context, Poll};
+use std::{
+    fmt::{self, Debug},
+    future::Future,
+    sync::Arc,
+};
+
+use datafusion::error::{DataFusionError, Result};
+use datafusion_common::SharedResult;
+use futures::{
+    future::{BoxFuture, Shared},
+    ready, FutureExt,
+};
+use parking_lot::Mutex;
+
+/// A [`OnceAsync`] runs an `async` closure once, where multiple calls to
+/// [`OnceAsync::try_once`] return a [`OnceFut`] that resolves to the result of the
+/// same computation.
+///
+/// This is useful for joins where the results of one child are needed to proceed
+/// with multiple output stream
+///
+///
+/// For example, in a hash join, one input is buffered and shared across
+/// potentially multiple output partitions. Each output partition must wait for
+/// the hash table to be built before proceeding.
+///
+/// Each output partition waits on the same `OnceAsync` before proceeding.
+pub(crate) struct OnceAsync<T> {
+    fut: Mutex<Option<SharedResult<OnceFut<T>>>>,
+}
+
+impl<T> Default for OnceAsync<T> {
+    fn default() -> Self {
+        Self {
+            fut: Mutex::new(None),
+        }
+    }
+}
+
+impl<T> Debug for OnceAsync<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "OnceAsync")
+    }
+}
+
+impl<T: 'static> OnceAsync<T> {
+    /// If this is the first call to this function on this object, will invoke
+    /// `f` to obtain a future and return a [`OnceFut`] referring to this. `f`
+    /// may fail, in which case its error is returned.
+    ///
+    /// If this is not the first call, will return a [`OnceFut`] referring
+    /// to the same future as was returned by the first call - or the same
+    /// error if the initial call to `f` failed.
+    pub(crate) fn try_once<F, Fut>(&self, f: F) -> Result<OnceFut<T>>
+    where
+        F: FnOnce() -> Result<Fut>,
+        Fut: Future<Output = Result<T>> + Send + 'static,
+    {
+        self.fut
+            .lock()
+            .get_or_insert_with(|| f().map(OnceFut::new).map_err(Arc::new))
+            .clone()
+            .map_err(DataFusionError::Shared)
+    }
+}
+
+/// The shared future type used internally within [`OnceAsync`]
+type OnceFutPending<T> = Shared<BoxFuture<'static, SharedResult<Arc<T>>>>;
+
+/// A [`OnceFut`] represents a shared asynchronous computation, that will be evaluated
+/// once for all [`Clone`]'s, with [`OnceFut::get`] providing a non-consuming interface
+/// to drive the underlying [`Future`] to completion
+pub(crate) struct OnceFut<T> {
+    state: OnceFutState<T>,
+}
+
+impl<T> Clone for OnceFut<T> {
+    fn clone(&self) -> Self {
+        Self {
+            state: self.state.clone(),
+        }
+    }
+}
+
+enum OnceFutState<T> {
+    Pending(OnceFutPending<T>),
+    Ready(SharedResult<Arc<T>>),
+}
+
+impl<T> Clone for OnceFutState<T> {
+    fn clone(&self) -> Self {
+        match self {
+            Self::Pending(p) => Self::Pending(p.clone()),
+            Self::Ready(r) => Self::Ready(r.clone()),
+        }
+    }
+}
+
+impl<T: 'static> OnceFut<T> {
+    /// Create a new [`OnceFut`] from a [`Future`]
+    pub(crate) fn new<Fut>(fut: Fut) -> Self
+    where
+        Fut: Future<Output = Result<T>> + Send + 'static,
+    {
+        Self {
+            state: OnceFutState::Pending(
+                fut.map(|res| res.map(Arc::new).map_err(Arc::new))
+                    .boxed()
+                    .shared(),
+            ),
+        }
+    }
+
+    /// Get the result of the computation if it is ready, without consuming it
+    #[allow(unused)]

Review Comment:
   Can we remove this if it is not used?



##########
rust/sedona-spatial-join/src/optimizer.rs:
##########
@@ -1054,6 +1080,282 @@ fn is_spatial_predicate_supported(
     }
 }
 
+// ============================================================================
+// GPU Optimizer Module
+// ============================================================================
+
+/// GPU optimizer module - conditionally compiled when GPU feature is enabled
+#[cfg(feature = "gpu")]
+mod gpu_optimizer {
+    use super::*;
+    use datafusion_common::DataFusionError;
+    use sedona_spatial_join_gpu::{
+        GeometryColumnInfo, GpuSpatialJoinConfig, GpuSpatialJoinExec, GpuSpatialPredicate,
+    };
+
+    /// Attempt to create a GPU-accelerated spatial join.
+    /// Returns None if GPU path is not applicable for this query.
+    pub fn try_create_gpu_spatial_join(
+        spatial_join: &SpatialJoinExec,
+        config: &ConfigOptions,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let sedona_options = config
+            .extensions
+            .get::<SedonaOptions>()
+            .ok_or_else(|| DataFusionError::Internal("SedonaOptions not found".into()))?;
+
+        // Check if GPU is enabled
+        if !sedona_options.spatial_join.gpu.enable {
+            return Ok(None);
+        }
+
+        // Check if predicate is supported on GPU
+        if !is_gpu_supported_predicate(&spatial_join.on) {
+            log::debug!("Predicate {:?} not supported on GPU", spatial_join.on);
+            return Ok(None);
+        }
+
+        // Get child plans
+        let left = spatial_join.left.clone();
+        let right = spatial_join.right.clone();
+
+        // Get schemas from child plans
+        let left_schema = left.schema();
+        let right_schema = right.schema();
+
+        // Find geometry columns in schemas
+        let left_geom_col = find_geometry_column(&left_schema)?;
+        let right_geom_col = find_geometry_column(&right_schema)?;
+
+        // Convert spatial predicate to GPU predicate
+        let gpu_predicate = convert_to_gpu_predicate(&spatial_join.on)?;
+
+        // Create GPU spatial join configuration
+        let gpu_config = GpuSpatialJoinConfig {
+            join_type: *spatial_join.join_type(),
+            left_geom_column: left_geom_col,
+            right_geom_column: right_geom_col,
+            predicate: gpu_predicate,
+            device_id: sedona_options.spatial_join.gpu.device_id as i32,
+            batch_size: sedona_options.spatial_join.gpu.batch_size,
+            additional_filters: spatial_join.filter.clone(),
+            max_memory: if sedona_options.spatial_join.gpu.max_memory_mb > 0 {
+                Some(sedona_options.spatial_join.gpu.max_memory_mb * 1024 * 1024)
+            } else {
+                None
+            },
+            fallback_to_cpu: sedona_options.spatial_join.gpu.fallback_to_cpu,
+        };
+
+        log::info!(
+            "Creating GPU spatial join: predicate: {:?}, left geom: {}, right 
geom: {}",
+            gpu_config.predicate,
+            gpu_config.left_geom_column.name,
+            gpu_config.right_geom_column.name,
+        );
+
+        let gpu_join = Arc::new(GpuSpatialJoinExec::new(left, right, gpu_config)?);
+
+        // If the original SpatialJoinExec had a projection, wrap the GPU join with a ProjectionExec
+        if spatial_join.contains_projection() {
+            use datafusion_physical_expr::expressions::Column;
+            use datafusion_physical_plan::projection::ProjectionExec;
+
+            // Get the projection indices from the SpatialJoinExec
+            let projection_indices = spatial_join
+                .projection()
+                .expect("contains_projection() was true but projection() returned None");
+
+            // Create projection expressions that map from GPU join output to desired output
+            let mut projection_exprs = Vec::new();
+            let gpu_schema = gpu_join.schema();
+
+            for &idx in projection_indices {
+                let field = gpu_schema.field(idx);
+                let col_expr = Arc::new(Column::new(field.name(), idx))
+                    as Arc<dyn datafusion_physical_expr::PhysicalExpr>;
+                projection_exprs.push((col_expr, field.name().clone()));
+            }
+
+            let projection_exec = ProjectionExec::try_new(projection_exprs, gpu_join)?;
+            Ok(Some(Arc::new(projection_exec)))
+        } else {
+            Ok(Some(gpu_join))
+        }
+    }
+
+    /// Check if spatial predicate is supported on GPU
+    pub(crate) fn is_gpu_supported_predicate(predicate: &SpatialPredicate) -> bool {
+        match predicate {
+            SpatialPredicate::Relation(rel) => {
+                use crate::spatial_predicate::SpatialRelationType;
+                matches!(
+                    rel.relation_type,
+                    SpatialRelationType::Intersects
+                        | SpatialRelationType::Contains
+                        | SpatialRelationType::Covers
+                        | SpatialRelationType::Within
+                        | SpatialRelationType::CoveredBy
+                        | SpatialRelationType::Touches
+                        | SpatialRelationType::Equals
+                )
+            }
+            // Distance predicates not yet supported on GPU
+            SpatialPredicate::Distance(_) => false,
+            // KNN not yet supported on GPU
+            SpatialPredicate::KNearestNeighbors(_) => false,
+        }
+    }
+
+    /// Find geometry column in schema
+    pub(crate) fn find_geometry_column(schema: &SchemaRef) -> Result<GeometryColumnInfo> {
+        use arrow_schema::DataType;
+
+        for (idx, field) in schema.fields().iter().enumerate() {
+            // Check if this is a WKB geometry column (Binary, LargeBinary, or BinaryView)
+            if matches!(
+                field.data_type(),
+                DataType::Binary | DataType::LargeBinary | DataType::BinaryView
+            ) {
+                // Check metadata for geometry type
+                if let Some(meta) = field.metadata().get("ARROW:extension:name") {
+                    if meta.contains("geoarrow.wkb") || meta.contains("geometry") {
+                        return Ok(GeometryColumnInfo {
+                            name: field.name().clone(),
+                            index: idx,
+                        });
+                    }
+                }
+
+                // If no metadata, assume first binary column is geometry
+                // This is a fallback for files without proper GeoArrow metadata
+                if idx == schema.fields().len() - 1
+                    || schema.fields().iter().skip(idx + 1).all(|f| {
+                        !matches!(
+                            f.data_type(),
+                            DataType::Binary | DataType::LargeBinary | DataType::BinaryView
+                        )
+                    })
+                {
+                    log::warn!(
+                        "Geometry column '{}' has no GeoArrow metadata, 
assuming it's WKB",
+                        field.name()
+                    );
+                    return Ok(GeometryColumnInfo {
+                        name: field.name().clone(),
+                        index: idx,
+                    });
+                }
+            }
+        }
+
+        Err(DataFusionError::Plan(
+            "No geometry column found in schema".into(),
+        ))
+    }
+
+    /// Convert SpatialPredicate to GPU predicate
+    pub(crate) fn convert_to_gpu_predicate(
+        predicate: &SpatialPredicate,
+    ) -> Result<GpuSpatialPredicate> {
+        use crate::spatial_predicate::SpatialRelationType;
+        use sedona_libgpuspatial::SpatialPredicate as LibGpuPred;
+
+        match predicate {
+            SpatialPredicate::Relation(rel) => {
+                let gpu_pred = match rel.relation_type {
+                    SpatialRelationType::Intersects => LibGpuPred::Intersects,
+                    SpatialRelationType::Contains => LibGpuPred::Contains,
+                    SpatialRelationType::Covers => LibGpuPred::Covers,
+                    SpatialRelationType::Within => LibGpuPred::Within,
+                    SpatialRelationType::CoveredBy => LibGpuPred::CoveredBy,
+                    SpatialRelationType::Touches => LibGpuPred::Touches,
+                    SpatialRelationType::Equals => LibGpuPred::Equals,

Review Comment:
   Can we move `SpatialRelationType` to `sedona-geometry` or `sedona-common` to 
avoid two copies?
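   
   A sketch of what the shared definition could look like (crate placement and the conversion helper are suggestions, not code from this PR):
   
   ```rust
   /// Single relation-type enum that both the CPU and GPU join crates could
   /// depend on (e.g. living in sedona-common or sedona-geometry).
   #[derive(Debug, Clone, Copy, PartialEq, Eq)]
   pub enum SpatialRelationType {
       Intersects,
       Contains,
       Covers,
       Within,
       CoveredBy,
       Touches,
       Equals,
   }
   
   /// Conversion the GPU crate could own instead of redefining the enum.
   pub fn to_gpu_predicate(rel: SpatialRelationType) -> sedona_libgpuspatial::SpatialPredicate {
       use sedona_libgpuspatial::SpatialPredicate as P;
       match rel {
           SpatialRelationType::Intersects => P::Intersects,
           SpatialRelationType::Contains => P::Contains,
           SpatialRelationType::Covers => P::Covers,
           SpatialRelationType::Within => P::Within,
           SpatialRelationType::CoveredBy => P::CoveredBy,
           SpatialRelationType::Touches => P::Touches,
           SpatialRelationType::Equals => P::Equals,
       }
   }
   ```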



##########
python/sedonadb/Cargo.toml:
##########
@@ -29,6 +29,7 @@ crate-type = ["cdylib"]
 default = ["mimalloc"]
 mimalloc = ["dep:mimalloc", "dep:libmimalloc-sys"]
 s2geography = ["sedona/s2geography"]
+gpu = ["sedona/gpu"]

Review Comment:
   Because we don't have any tests in Python for this feature I suggest leaving 
this out for now (a follow-up PR could add Python support + a test)



##########
rust/sedona-spatial-join-gpu/tests/gpu_functional_test.rs:
##########
@@ -0,0 +1,588 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! GPU Functional Tests
+//!
+//! These tests require actual GPU hardware and CUDA toolkit.
+//! They verify the correctness and performance of actual GPU computation.
+//!
+//! **Prerequisites:**
+//! - CUDA-capable GPU (compute capability 6.0+)
+//! - CUDA Toolkit 11.0+ installed
+//! - Linux or Windows OS
+//! - Build with --features gpu
+//!
+//! **Running:**
+//! ```bash
+//! # Run all GPU functional tests
+//! cargo test --package sedona-spatial-join-gpu --features gpu gpu_functional_tests
+//!
+//! # Run ignored tests (requires GPU)
+//! cargo test --package sedona-spatial-join-gpu --features gpu -- --ignored
+//! ```
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::ipc::reader::StreamReader;
+use arrow_array::{Int32Array, RecordBatch};
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+use futures::StreamExt;
+use sedona_spatial_join_gpu::{
+    GeometryColumnInfo, GpuSpatialJoinConfig, GpuSpatialJoinExec, GpuSpatialPredicate,
+    SpatialPredicate,
+};
+use std::fs::File;
+use std::sync::Arc;
+
+/// Helper to create test geometry data
+#[allow(dead_code)]
+fn create_point_wkb(x: f64, y: f64) -> Vec<u8> {
+    let mut wkb = vec![0x01, 0x01, 0x00, 0x00, 0x00]; // Little endian point type
+    wkb.extend_from_slice(&x.to_le_bytes());
+    wkb.extend_from_slice(&y.to_le_bytes());
+    wkb
+}
+
+/// Check if GPU is actually available
+fn is_gpu_available() -> bool {
+    use sedona_libgpuspatial::GpuSpatialContext;
+
+    match GpuSpatialContext::new() {
+        Ok(mut ctx) => ctx.init().is_ok(),
+        Err(_) => false,
+    }
+}
+
+/// Mock execution plan that produces geometry data
+#[allow(dead_code)]

Review Comment:
   Would something like:
   
   ```rust
   #[cfg(feature = "gpu")]
   mod gpu_functional_test;
   ```
   
   enable you to remove all these unused/dead code bits?



##########
rust/sedona-spatial-join-gpu/src/exec.rs:
##########
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::any::Any;
+use std::fmt::{Debug, Formatter};
+use std::sync::Arc;
+
+use arrow::datatypes::SchemaRef;
+use datafusion::error::{DataFusionError, Result};
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_expr::EquivalenceProperties;
+use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
+use datafusion::physical_plan::{
+    joins::utils::build_join_schema, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
+    SendableRecordBatchStream,
+};
+use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+use datafusion_physical_plan::ExecutionPlanProperties;
+use futures::stream::StreamExt;
+use parking_lot::Mutex;
+
+use crate::config::GpuSpatialJoinConfig;
+use crate::once_fut::OnceAsync;
+
+/// GPU-accelerated spatial join execution plan
+///
+/// This execution plan accepts two child inputs (e.g., ParquetExec) and performs:
+/// 1. Reading data from child streams
+/// 2. Data transfer to GPU memory
+/// 3. GPU spatial join execution
+/// 4. Result materialization
+pub struct GpuSpatialJoinExec {
+    /// Left child execution plan (build side)
+    left: Arc<dyn ExecutionPlan>,
+
+    /// Right child execution plan (probe side)
+    right: Arc<dyn ExecutionPlan>,
+
+    /// Join configuration
+    config: GpuSpatialJoinConfig,
+
+    /// Combined output schema
+    schema: SchemaRef,
+
+    /// Execution properties
+    properties: PlanProperties,
+
+    /// Metrics for this join operation
+    metrics: datafusion_physical_plan::metrics::ExecutionPlanMetricsSet,
+
+    /// Shared build data computed once and reused across all output partitions
+    once_async_build_data: Arc<Mutex<Option<OnceAsync<crate::build_data::GpuBuildData>>>>,
+}
+
+impl GpuSpatialJoinExec {
+    pub fn new(
+        left: Arc<dyn ExecutionPlan>,
+        right: Arc<dyn ExecutionPlan>,
+        config: GpuSpatialJoinConfig,
+    ) -> Result<Self> {
+        // Build join schema using DataFusion's utility to handle duplicate column names
+        let left_schema = left.schema();
+        let right_schema = right.schema();
+        let (join_schema, _column_indices) =
+            build_join_schema(&left_schema, &right_schema, &config.join_type);
+        let schema = Arc::new(join_schema);
+
+        // Create execution properties
+        // Output partitioning matches right side to enable parallelism
+        let eq_props = EquivalenceProperties::new(schema.clone());
+        let partitioning = right.output_partitioning().clone();
+        let properties = PlanProperties::new(
+            eq_props,
+            partitioning,
+            EmissionType::Final, // GPU join produces all results at once

Review Comment:
   Just checking that this is correct (I thought that because one side is 
streaming the output might be incremental?)
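   
   If results are in fact produced per probe-side batch, a sketch of the alternative declaration (whether `EmissionType::Incremental` applies depends on the DataFusion version in use and on how the operator actually emits batches):
   
   ```rust
   use arrow::datatypes::SchemaRef;
   use datafusion::physical_expr::EquivalenceProperties;
   use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
   use datafusion::physical_plan::{Partitioning, PlanProperties};
   
   /// Declare incremental emission when output batches stream out as the probe
   /// side is consumed, rather than being materialized all at once.
   fn incremental_properties(schema: SchemaRef, partitioning: Partitioning) -> PlanProperties {
       PlanProperties::new(
           EquivalenceProperties::new(schema),
           partitioning,
           EmissionType::Incremental,
           Boundedness::Bounded,
       )
   }
   ```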



##########
rust/sedona-spatial-join-gpu/src/gpu_backend.rs:
##########
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Result;
+use arrow::compute::take;
+use arrow_array::{Array, ArrayRef, BinaryArray, RecordBatch, UInt32Array};
+use arrow_schema::{DataType, Schema};
+use sedona_libgpuspatial::{GpuSpatialContext, SpatialPredicate};
+use std::sync::Arc;
+use std::time::Instant;
+
+/// GPU backend for spatial operations
+#[allow(dead_code)]
+pub struct GpuBackend {
+    device_id: i32,
+    gpu_context: Option<GpuSpatialContext>,
+}
+
+#[allow(dead_code)]
+impl GpuBackend {

Review Comment:
   Can these dead code markers be removed?



##########
rust/sedona-spatial-join-gpu/src/gpu_backend.rs:
##########
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Result;
+use arrow::compute::take;
+use arrow_array::{Array, ArrayRef, BinaryArray, RecordBatch, UInt32Array};
+use arrow_schema::{DataType, Schema};
+use sedona_libgpuspatial::{GpuSpatialContext, SpatialPredicate};
+use std::sync::Arc;
+use std::time::Instant;
+
+/// GPU backend for spatial operations
+#[allow(dead_code)]
+pub struct GpuBackend {
+    device_id: i32,
+    gpu_context: Option<GpuSpatialContext>,
+}
+
+#[allow(dead_code)]
+impl GpuBackend {
+    pub fn new(device_id: i32) -> Result<Self> {
+        Ok(Self {
+            device_id,
+            gpu_context: None,
+        })
+    }
+
+    pub fn init(&mut self) -> Result<()> {
+        // Initialize GPU context
+        log::info!(
+            "[GPU Join] Initializing GPU context (device {})",
+            self.device_id
+        );
+        match GpuSpatialContext::new() {
+            Ok(mut ctx) => {
+                ctx.init().map_err(|e| {
+                    crate::Error::GpuInit(format!("Failed to initialize GPU context: {e:?}"))
+                })?;
+                self.gpu_context = Some(ctx);
+                log::info!("[GPU Join] GPU context initialized successfully");
+                Ok(())
+            }
+            Err(e) => {
+                log::warn!("[GPU Join] GPU not available: {e:?}");
+                // Gracefully handle GPU not being available
+                Ok(())
+            }
+        }
+    }
+
+    /// Convert BinaryView array to Binary array for GPU processing
+    /// OPTIMIZATION: Use Arrow's optimized cast instead of manual iteration
+    fn ensure_binary_array(array: &ArrayRef) -> Result<ArrayRef> {
+        match array.data_type() {
+            DataType::BinaryView => {
+                // OPTIMIZATION: Use Arrow's cast which is much faster than manual iteration
+                use arrow::compute::cast;
+                cast(array.as_ref(), &DataType::Binary).map_err(crate::Error::Arrow)
+            }

Review Comment:
   This feels like it should be fixed on the C side in a follow-up 
(nanoarrow/geoarrow support view types and should be able to ingest them).



##########
c/sedona-s2geography/s2geography:
##########


Review Comment:
   `git submodule update --recursive` should remove this diff
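   
   For example, from the repository root (`--init` only matters if the submodule has never been initialized locally):
   
   ```bash
   git submodule update --init --recursive c/sedona-s2geography/s2geography
   ```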



##########
rust/sedona-spatial-join-gpu/tests/gpu_functional_test.rs:
##########
@@ -0,0 +1,588 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! GPU Functional Tests
+//!
+//! These tests require actual GPU hardware and CUDA toolkit.
+//! They verify the correctness and performance of actual GPU computation.
+//!
+//! **Prerequisites:**
+//! - CUDA-capable GPU (compute capability 6.0+)
+//! - CUDA Toolkit 11.0+ installed
+//! - Linux or Windows OS
+//! - Build with --features gpu
+//!
+//! **Running:**
+//! ```bash
+//! # Run all GPU functional tests
+//! cargo test --package sedona-spatial-join-gpu --features gpu gpu_functional_tests
+//!
+//! # Run ignored tests (requires GPU)
+//! cargo test --package sedona-spatial-join-gpu --features gpu -- --ignored
+//! ```
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::ipc::reader::StreamReader;
+use arrow_array::{Int32Array, RecordBatch};
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+use futures::StreamExt;
+use sedona_spatial_join_gpu::{
+    GeometryColumnInfo, GpuSpatialJoinConfig, GpuSpatialJoinExec, GpuSpatialPredicate,
+    SpatialPredicate,
+};
+use std::fs::File;
+use std::sync::Arc;
+
+/// Helper to create test geometry data
+#[allow(dead_code)]
+fn create_point_wkb(x: f64, y: f64) -> Vec<u8> {
+    let mut wkb = vec![0x01, 0x01, 0x00, 0x00, 0x00]; // Little endian point type
+    wkb.extend_from_slice(&x.to_le_bytes());
+    wkb.extend_from_slice(&y.to_le_bytes());
+    wkb
+}

Review Comment:
   We have a helper for this in sedona-testing (and one specifically for points 
in sedona-geometry)



##########
rust/sedona-spatial-join/src/exec.rs:
##########
@@ -1151,26 +1156,195 @@ mod tests {
         let df = ctx.sql(sql).await?;
         let actual_schema = df.schema().as_arrow().clone();
         let plan = df.clone().create_physical_plan().await?;
-        let spatial_join_execs = collect_spatial_join_exec(&plan)?;
+        let spatial_join_count = collect_spatial_join_exec(&plan)?;
         if is_optimized_spatial_join {
-            assert_eq!(spatial_join_execs.len(), 1);
+            assert_eq!(spatial_join_count, 1);
         } else {
-            assert!(spatial_join_execs.is_empty());
+            assert_eq!(spatial_join_count, 0);
         }
         let result_batches = df.collect().await?;
         let result_batch =
             arrow::compute::concat_batches(&Arc::new(actual_schema), &result_batches)?;
         Ok(result_batch)
     }
 
-    fn collect_spatial_join_exec(plan: &Arc<dyn ExecutionPlan>) -> Result<Vec<&SpatialJoinExec>> {
-        let mut spatial_join_execs = Vec::new();
+    fn collect_spatial_join_exec(plan: &Arc<dyn ExecutionPlan>) -> Result<usize> {
+        let mut count = 0;
         plan.apply(|node| {
-            if let Some(spatial_join_exec) = node.as_any().downcast_ref::<SpatialJoinExec>() {
-                spatial_join_execs.push(spatial_join_exec);
+            if node.as_any().downcast_ref::<SpatialJoinExec>().is_some() {
+                count += 1;
+            }
+            #[cfg(feature = "gpu")]
+            if node
+                .as_any()
+                .downcast_ref::<sedona_spatial_join_gpu::GpuSpatialJoinExec>()
+                .is_some()
+            {
+                count += 1;
             }
             Ok(TreeNodeRecursion::Continue)
         })?;
-        Ok(spatial_join_execs)
+        Ok(count)
+    }
+
+    #[cfg(feature = "gpu")]
+    #[tokio::test]
+    #[ignore] // Requires GPU hardware

Review Comment:
   We need to figure out a way to not ignore tests in this repo (in this case I
think these tests shouldn't exist if the gpu feature isn't enabled, so we
shouldn't need to ignore them?)
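   
   Something along these lines (a sketch, not code from this PR) would keep the tests out of the build entirely unless the feature is on, so no `#[ignore]` is needed:
   
   ```rust
   // Only compiled (and therefore only run) when built with `--features gpu`.
   #[cfg(feature = "gpu")]
   mod gpu_spatial_join_tests {
       #[tokio::test]
       async fn gpu_spatial_join_is_planned() {
           // ...test body requiring GPU hardware goes here
       }
   }
   ```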



##########
rust/sedona-spatial-join-gpu/tests/gpu_functional_test.rs:
##########
@@ -0,0 +1,588 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! GPU Functional Tests
+//!
+//! These tests require actual GPU hardware and CUDA toolkit.
+//! They verify the correctness and performance of actual GPU computation.
+//!
+//! **Prerequisites:**
+//! - CUDA-capable GPU (compute capability 6.0+)
+//! - CUDA Toolkit 11.0+ installed
+//! - Linux or Windows OS
+//! - Build with --features gpu
+//!
+//! **Running:**
+//! ```bash
+//! # Run all GPU functional tests
+//! cargo test --package sedona-spatial-join-gpu --features gpu gpu_functional_tests
+//!
+//! # Run ignored tests (requires GPU)
+//! cargo test --package sedona-spatial-join-gpu --features gpu -- --ignored
+//! ```
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::ipc::reader::StreamReader;
+use arrow_array::{Int32Array, RecordBatch};
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+use futures::StreamExt;
+use sedona_spatial_join_gpu::{
+    GeometryColumnInfo, GpuSpatialJoinConfig, GpuSpatialJoinExec, GpuSpatialPredicate,
+    SpatialPredicate,
+};
+use std::fs::File;
+use std::sync::Arc;
+
+/// Helper to create test geometry data
+#[allow(dead_code)]
+fn create_point_wkb(x: f64, y: f64) -> Vec<u8> {
+    let mut wkb = vec![0x01, 0x01, 0x00, 0x00, 0x00]; // Little endian point type
+    wkb.extend_from_slice(&x.to_le_bytes());
+    wkb.extend_from_slice(&y.to_le_bytes());
+    wkb
+}
+
+/// Check if GPU is actually available
+fn is_gpu_available() -> bool {
+    use sedona_libgpuspatial::GpuSpatialContext;
+
+    match GpuSpatialContext::new() {
+        Ok(mut ctx) => ctx.init().is_ok(),
+        Err(_) => false,
+    }
+}
+
+/// Mock execution plan that produces geometry data
+#[allow(dead_code)]
+struct GeometryDataExec {
+    schema: Arc<Schema>,
+    batch: RecordBatch,
+}
+
+#[allow(dead_code)]
+impl GeometryDataExec {
+    fn new(ids: Vec<i32>, geometries: Vec<Vec<u8>>) -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("geometry", DataType::Binary, false),
+        ]));
+
+        let id_array = Int32Array::from(ids);

Review Comment:
   It feels like this is duplicated from the single batch exec also in this PR 
(and either way I think this could be removed in favour of 
`SessionContext::register_batch()`).



##########
rust/sedona-spatial-join-gpu/tests/gpu_functional_test.rs:
##########
@@ -0,0 +1,588 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! GPU Functional Tests
+//!
+//! These tests require actual GPU hardware and CUDA toolkit.
+//! They verify the correctness and performance of actual GPU computation.
+//!
+//! **Prerequisites:**
+//! - CUDA-capable GPU (compute capability 6.0+)
+//! - CUDA Toolkit 11.0+ installed
+//! - Linux or Windows OS
+//! - Build with --features gpu
+//!
+//! **Running:**
+//! ```bash
+//! # Run all GPU functional tests
+//! cargo test --package sedona-spatial-join-gpu --features gpu gpu_functional_tests
+//!
+//! # Run ignored tests (requires GPU)
+//! cargo test --package sedona-spatial-join-gpu --features gpu -- --ignored
+//! ```
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::ipc::reader::StreamReader;
+use arrow_array::{Int32Array, RecordBatch};
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+use futures::StreamExt;
+use sedona_spatial_join_gpu::{
+    GeometryColumnInfo, GpuSpatialJoinConfig, GpuSpatialJoinExec, GpuSpatialPredicate,
+    SpatialPredicate,
+};
+use std::fs::File;
+use std::sync::Arc;
+
+/// Helper to create test geometry data
+#[allow(dead_code)]
+fn create_point_wkb(x: f64, y: f64) -> Vec<u8> {
+    let mut wkb = vec![0x01, 0x01, 0x00, 0x00, 0x00]; // Little endian point type
+    wkb.extend_from_slice(&x.to_le_bytes());
+    wkb.extend_from_slice(&y.to_le_bytes());
+    wkb
+}
+
+/// Check if GPU is actually available
+fn is_gpu_available() -> bool {
+    use sedona_libgpuspatial::GpuSpatialContext;
+
+    match GpuSpatialContext::new() {
+        Ok(mut ctx) => ctx.init().is_ok(),
+        Err(_) => false,
+    }
+}
+
+/// Mock execution plan that produces geometry data
+#[allow(dead_code)]
+struct GeometryDataExec {
+    schema: Arc<Schema>,
+    batch: RecordBatch,
+}
+
+#[allow(dead_code)]
+impl GeometryDataExec {
+    fn new(ids: Vec<i32>, geometries: Vec<Vec<u8>>) -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("geometry", DataType::Binary, false),
+        ]));
+
+        let id_array = Int32Array::from(ids);
+
+        // Build BinaryArray using builder to avoid lifetime issues
+        let mut builder = arrow_array::builder::BinaryBuilder::new();
+        for geom in geometries {
+            builder.append_value(&geom);
+        }
+        let geom_array = builder.finish();
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(id_array), Arc::new(geom_array)],
+        )
+        .unwrap();
+
+        Self { schema, batch }
+    }
+}
+
+impl std::fmt::Debug for GeometryDataExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "GeometryDataExec")
+    }
+}
+
+impl datafusion::physical_plan::DisplayAs for GeometryDataExec {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "GeometryDataExec")
+    }
+}
+
+impl ExecutionPlan for GeometryDataExec {
+    fn name(&self) -> &str {
+        "GeometryDataExec"
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+        unimplemented!("properties not needed for test")
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> {
+        use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+        use futures::Stream;
+        use std::pin::Pin;
+        use std::task::{Context, Poll};
+
+        struct SingleBatchStream {
+            schema: Arc<Schema>,
+            batch: Option<RecordBatch>,
+        }
+
+        impl Stream for SingleBatchStream {
+            type Item = datafusion_common::Result<RecordBatch>;
+
+            fn poll_next(
+                mut self: Pin<&mut Self>,
+                _cx: &mut Context<'_>,
+            ) -> Poll<Option<Self::Item>> {
+                Poll::Ready(self.batch.take().map(Ok))
+            }
+        }
+
+        impl RecordBatchStream for SingleBatchStream {
+            fn schema(&self) -> Arc<Schema> {
+                self.schema.clone()
+            }
+        }
+
+        Ok(Box::pin(SingleBatchStream {
+            schema: self.schema.clone(),
+            batch: Some(self.batch.clone()),
+        }) as SendableRecordBatchStream)
+    }
+}
+
+#[tokio::test]
+#[ignore] // Requires GPU hardware
+async fn test_gpu_spatial_join_basic_correctness() {
+    let _ = env_logger::builder().is_test(true).try_init();
+
+    if !is_gpu_available() {
+        log::warn!("GPU not available, skipping test");
+        return;
+    }
+
+    let test_data_dir = concat!(
+        env!("CARGO_MANIFEST_DIR"),
+        "/../../c/sedona-libgpuspatial/libgpuspatial/test_data"
+    );
+    let points_path = format!("{}/test_points.arrows", test_data_dir);
+    let polygons_path = format!("{}/test_polygons.arrows", test_data_dir);
+
+    let points_file =
+        File::open(&points_path).unwrap_or_else(|_| panic!("Failed to open {}", points_path));
+    let polygons_file =
+        File::open(&polygons_path).unwrap_or_else(|_| panic!("Failed to open {}", polygons_path));
+
+    let mut points_reader = StreamReader::try_new(points_file, None).unwrap();
+    let mut polygons_reader = StreamReader::try_new(polygons_file, None).unwrap();
+
+    // Process all batches like the CUDA test does
+    let mut total_rows = 0;
+    let mut iteration = 0;
+
+    loop {
+        // Read next batch from each stream
+        let polygons_batch = match polygons_reader.next() {
+            Some(Ok(batch)) => batch,
+            Some(Err(e)) => panic!("Error reading polygons batch: {}", e),
+            None => break, // End of stream
+        };
+
+        let points_batch = match points_reader.next() {
+            Some(Ok(batch)) => batch,
+            Some(Err(e)) => panic!("Error reading points batch: {}", e),
+            None => break, // End of stream
+        };
+
+        if iteration == 0 {
+            log::info!(
+                "Batch {}: {} polygons, {} points",
+                iteration,
+                polygons_batch.num_rows(),
+                points_batch.num_rows()
+            );
+        }
+
+        // Find geometry column index
+        let points_geom_idx = points_batch
+            .schema()
+            .index_of("geometry")
+            .expect("geometry column not found");
+        let polygons_geom_idx = polygons_batch
+            .schema()
+            .index_of("geometry")
+            .expect("geometry column not found");
+
+        // Create execution plans from the batches
+        let left_plan =
+            Arc::new(SingleBatchExec::new(polygons_batch.clone())) as Arc<dyn ExecutionPlan>;
+        let right_plan =
+            Arc::new(SingleBatchExec::new(points_batch.clone())) as Arc<dyn ExecutionPlan>;
+
+        let config = GpuSpatialJoinConfig {
+            join_type: datafusion::logical_expr::JoinType::Inner,
+            left_geom_column: GeometryColumnInfo {
+                name: "geometry".to_string(),
+                index: polygons_geom_idx,
+            },
+            right_geom_column: GeometryColumnInfo {
+                name: "geometry".to_string(),
+                index: points_geom_idx,
+            },
+            predicate: GpuSpatialPredicate::Relation(SpatialPredicate::Intersects),
+            device_id: 0,
+            batch_size: 8192,
+            additional_filters: None,
+            max_memory: None,
+            fallback_to_cpu: false,
+        };
+
+        let gpu_join = Arc::new(GpuSpatialJoinExec::new(left_plan, right_plan, config).unwrap());
+        let task_context = Arc::new(TaskContext::default());
+        let mut stream = gpu_join.execute(0, task_context).unwrap();
+
+        while let Some(result) = stream.next().await {
+            match result {
+                Ok(batch) => {
+                    let batch_rows = batch.num_rows();
+                    total_rows += batch_rows;
+                    if batch_rows > 0 && iteration < 5 {
+                        log::debug!(
+                            "Iteration {}: Got {} rows from GPU join",
+                            iteration,
+                            batch_rows
+                        );
+                    }
+                }
+                Err(e) => {
+                    panic!("GPU join failed at iteration {}: {}", iteration, 
e);
+                }
+            }
+        }
+
+        iteration += 1;
+    }
+
+    log::info!(
+        "Total rows from GPU join across {} iterations: {}",
+        iteration,
+        total_rows
+    );
+    // Test passes if GPU join completes without crashing and finds results
+    // The CUDA reference test loops through all batches to accumulate results
+    assert!(
+        total_rows > 0,
+        "Expected at least some results across {} iterations, got {}",
+        iteration,
+        total_rows
+    );
+    log::info!(
+        "GPU spatial join completed successfully with {} result rows",
+        total_rows
+    );
+}
+/// Helper execution plan that returns a single pre-loaded batch
+struct SingleBatchExec {
+    schema: Arc<Schema>,
+    batch: RecordBatch,
+    props: datafusion::physical_plan::PlanProperties,
+}

Review Comment:
   Duplicate of the other single batch exec?



##########
rust/sedona-spatial-join-gpu/tests/gpu_functional_test.rs:
##########
@@ -0,0 +1,588 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! GPU Functional Tests
+//!
+//! These tests require actual GPU hardware and CUDA toolkit.
+//! They verify the correctness and performance of actual GPU computation.
+//!
+//! **Prerequisites:**
+//! - CUDA-capable GPU (compute capability 6.0+)
+//! - CUDA Toolkit 11.0+ installed
+//! - Linux or Windows OS
+//! - Build with --features gpu
+//!
+//! **Running:**
+//! ```bash
+//! # Run all GPU functional tests
+//! cargo test --package sedona-spatial-join-gpu --features gpu gpu_functional_tests
+//!
+//! # Run ignored tests (requires GPU)
+//! cargo test --package sedona-spatial-join-gpu --features gpu -- --ignored
+//! ```
+
+use arrow::datatypes::{DataType, Field, Schema};
+use arrow::ipc::reader::StreamReader;
+use arrow_array::{Int32Array, RecordBatch};
+use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::ExecutionPlan;
+use futures::StreamExt;
+use sedona_spatial_join_gpu::{
+    GeometryColumnInfo, GpuSpatialJoinConfig, GpuSpatialJoinExec, GpuSpatialPredicate,
+    SpatialPredicate,
+};
+use std::fs::File;
+use std::sync::Arc;
+
+/// Helper to create test geometry data
+#[allow(dead_code)]
+fn create_point_wkb(x: f64, y: f64) -> Vec<u8> {
+    let mut wkb = vec![0x01, 0x01, 0x00, 0x00, 0x00]; // Little endian point type
+    wkb.extend_from_slice(&x.to_le_bytes());
+    wkb.extend_from_slice(&y.to_le_bytes());
+    wkb
+}
+
+/// Check if GPU is actually available
+fn is_gpu_available() -> bool {
+    use sedona_libgpuspatial::GpuSpatialContext;
+
+    match GpuSpatialContext::new() {
+        Ok(mut ctx) => ctx.init().is_ok(),
+        Err(_) => false,
+    }
+}
+
+/// Mock execution plan that produces geometry data
+#[allow(dead_code)]
+struct GeometryDataExec {
+    schema: Arc<Schema>,
+    batch: RecordBatch,
+}
+
+#[allow(dead_code)]
+impl GeometryDataExec {
+    fn new(ids: Vec<i32>, geometries: Vec<Vec<u8>>) -> Self {
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("geometry", DataType::Binary, false),
+        ]));
+
+        let id_array = Int32Array::from(ids);
+
+        // Build BinaryArray using builder to avoid lifetime issues
+        let mut builder = arrow_array::builder::BinaryBuilder::new();
+        for geom in geometries {
+            builder.append_value(&geom);
+        }
+        let geom_array = builder.finish();
+
+        let batch = RecordBatch::try_new(
+            schema.clone(),
+            vec![Arc::new(id_array), Arc::new(geom_array)],
+        )
+        .unwrap();
+
+        Self { schema, batch }
+    }
+}
+
+impl std::fmt::Debug for GeometryDataExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "GeometryDataExec")
+    }
+}
+
+impl datafusion::physical_plan::DisplayAs for GeometryDataExec {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "GeometryDataExec")
+    }
+}
+
+impl ExecutionPlan for GeometryDataExec {
+    fn name(&self) -> &str {
+        "GeometryDataExec"
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+        unimplemented!("properties not needed for test")
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn ExecutionPlan>>,
+    ) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<TaskContext>,
+    ) -> datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> {
+        use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+        use futures::Stream;
+        use std::pin::Pin;
+        use std::task::{Context, Poll};
+
+        struct SingleBatchStream {
+            schema: Arc<Schema>,
+            batch: Option<RecordBatch>,
+        }
+
+        impl Stream for SingleBatchStream {
+            type Item = datafusion_common::Result<RecordBatch>;
+
+            fn poll_next(
+                mut self: Pin<&mut Self>,
+                _cx: &mut Context<'_>,
+            ) -> Poll<Option<Self::Item>> {
+                Poll::Ready(self.batch.take().map(Ok))
+            }
+        }
+
+        impl RecordBatchStream for SingleBatchStream {
+            fn schema(&self) -> Arc<Schema> {
+                self.schema.clone()
+            }
+        }
+
+        Ok(Box::pin(SingleBatchStream {
+            schema: self.schema.clone(),
+            batch: Some(self.batch.clone()),
+        }) as SendableRecordBatchStream)
+    }
+}
+
+#[tokio::test]
+#[ignore] // Requires GPU hardware
+async fn test_gpu_spatial_join_basic_correctness() {
+    let _ = env_logger::builder().is_test(true).try_init();
+
+    if !is_gpu_available() {
+        log::warn!("GPU not available, skipping test");
+        return;
+    }
+
+    let test_data_dir = concat!(
+        env!("CARGO_MANIFEST_DIR"),
+        "/../../c/sedona-libgpuspatial/libgpuspatial/test_data"
+    );
+    let points_path = format!("{}/test_points.arrows", test_data_dir);
+    let polygons_path = format!("{}/test_polygons.arrows", test_data_dir);
+
+    let points_file =
+        File::open(&points_path).unwrap_or_else(|_| panic!("Failed to open {}", points_path));
+    let polygons_file =
+        File::open(&polygons_path).unwrap_or_else(|_| panic!("Failed to open {}", polygons_path));
+
+    let mut points_reader = StreamReader::try_new(points_file, None).unwrap();
+    let mut polygons_reader = StreamReader::try_new(polygons_file, None).unwrap();
+
+    // Process all batches like the CUDA test does
+    let mut total_rows = 0;
+    let mut iteration = 0;
+
+    loop {
+        // Read next batch from each stream
+        let polygons_batch = match polygons_reader.next() {
+            Some(Ok(batch)) => batch,
+            Some(Err(e)) => panic!("Error reading polygons batch: {}", e),
+            None => break, // End of stream
+        };
+
+        let points_batch = match points_reader.next() {
+            Some(Ok(batch)) => batch,
+            Some(Err(e)) => panic!("Error reading points batch: {}", e),
+            None => break, // End of stream
+        };
+
+        if iteration == 0 {
+            log::info!(
+                "Batch {}: {} polygons, {} points",
+                iteration,
+                polygons_batch.num_rows(),
+                points_batch.num_rows()
+            );
+        }
+
+        // Find geometry column index
+        let points_geom_idx = points_batch
+            .schema()
+            .index_of("geometry")
+            .expect("geometry column not found");
+        let polygons_geom_idx = polygons_batch
+            .schema()
+            .index_of("geometry")
+            .expect("geometry column not found");
+
+        // Create execution plans from the batches
+        let left_plan =
+            Arc::new(SingleBatchExec::new(polygons_batch.clone())) as Arc<dyn ExecutionPlan>;
+        let right_plan =
+            Arc::new(SingleBatchExec::new(points_batch.clone())) as Arc<dyn ExecutionPlan>;
+
+        let config = GpuSpatialJoinConfig {
+            join_type: datafusion::logical_expr::JoinType::Inner,
+            left_geom_column: GeometryColumnInfo {
+                name: "geometry".to_string(),
+                index: polygons_geom_idx,
+            },
+            right_geom_column: GeometryColumnInfo {
+                name: "geometry".to_string(),
+                index: points_geom_idx,
+            },
+            predicate: GpuSpatialPredicate::Relation(SpatialPredicate::Intersects),
+            device_id: 0,
+            batch_size: 8192,
+            additional_filters: None,
+            max_memory: None,
+            fallback_to_cpu: false,
+        };
+
+        let gpu_join = Arc::new(GpuSpatialJoinExec::new(left_plan, right_plan, config).unwrap());
+        let task_context = Arc::new(TaskContext::default());
+        let mut stream = gpu_join.execute(0, task_context).unwrap();
+
+        while let Some(result) = stream.next().await {
+            match result {
+                Ok(batch) => {
+                    let batch_rows = batch.num_rows();
+                    total_rows += batch_rows;
+                    if batch_rows > 0 && iteration < 5 {
+                        log::debug!(
+                            "Iteration {}: Got {} rows from GPU join",
+                            iteration,
+                            batch_rows
+                        );
+                    }
+                }
+                Err(e) => {
+                    panic!("GPU join failed at iteration {}: {}", iteration, 
e);
+                }
+            }
+        }
+
+        iteration += 1;
+    }
+
+    log::info!(
+        "Total rows from GPU join across {} iterations: {}",
+        iteration,
+        total_rows
+    );
+    // Test passes if GPU join completes without crashing and finds results
+    // The CUDA reference test loops through all batches to accumulate results
+    assert!(
+        total_rows > 0,
+        "Expected at least some results across {} iterations, got {}",
+        iteration,
+        total_rows
+    );
+    log::info!(
+        "GPU spatial join completed successfully with {} result rows",
+        total_rows
+    );
+}
+/// Helper execution plan that returns a single pre-loaded batch
+struct SingleBatchExec {
+    schema: Arc<Schema>,
+    batch: RecordBatch,
+    props: datafusion::physical_plan::PlanProperties,
+}
+
+impl SingleBatchExec {
+    fn new(batch: RecordBatch) -> Self {
+        let schema = batch.schema();
+        let eq_props = datafusion::physical_expr::EquivalenceProperties::new(schema.clone());
+        let partitioning = datafusion::physical_plan::Partitioning::UnknownPartitioning(1);
+        let props = datafusion::physical_plan::PlanProperties::new(
+            eq_props,
+            partitioning,
+            datafusion::physical_plan::execution_plan::EmissionType::Final,
+            datafusion::physical_plan::execution_plan::Boundedness::Bounded,
+        );
+        Self {
+            schema,
+            batch,
+            props,
+        }
+    }
+}
+
+impl std::fmt::Debug for SingleBatchExec {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(f, "SingleBatchExec")
+    }
+}
+
+impl datafusion::physical_plan::DisplayAs for SingleBatchExec {
+    fn fmt_as(
+        &self,
+        _t: datafusion::physical_plan::DisplayFormatType,
+        f: &mut std::fmt::Formatter,
+    ) -> std::fmt::Result {
+        write!(f, "SingleBatchExec")
+    }
+}
+
+impl datafusion::physical_plan::ExecutionPlan for SingleBatchExec {
+    fn name(&self) -> &str {
+        "SingleBatchExec"
+    }
+
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    fn schema(&self) -> Arc<Schema> {
+        self.schema.clone()
+    }
+
+    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
+        &self.props
+    }
+
+    fn children(&self) -> Vec<&Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
+        vec![]
+    }
+
+    fn with_new_children(
+        self: Arc<Self>,
+        _children: Vec<Arc<dyn datafusion::physical_plan::ExecutionPlan>>,
+    ) -> datafusion_common::Result<Arc<dyn datafusion::physical_plan::ExecutionPlan>> {
+        Ok(self)
+    }
+
+    fn execute(
+        &self,
+        _partition: usize,
+        _context: Arc<datafusion::execution::context::TaskContext>,
+    ) -> datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> {
+        use datafusion::physical_plan::{RecordBatchStream, SendableRecordBatchStream};
+        use futures::Stream;
+        use std::pin::Pin;
+        use std::task::{Context, Poll};
+
+        struct OnceBatchStream {
+            schema: Arc<Schema>,
+            batch: Option<RecordBatch>,
+        }
+
+        impl Stream for OnceBatchStream {
+            type Item = datafusion_common::Result<RecordBatch>;
+
+            fn poll_next(
+                mut self: Pin<&mut Self>,
+                _cx: &mut Context<'_>,
+            ) -> Poll<Option<Self::Item>> {
+                Poll::Ready(self.batch.take().map(Ok))
+            }
+        }
+
+        impl RecordBatchStream for OnceBatchStream {
+            fn schema(&self) -> Arc<Schema> {
+                self.schema.clone()
+            }
+        }
+
+        Ok(Box::pin(OnceBatchStream {
+            schema: self.schema.clone(),
+            batch: Some(self.batch.clone()),
+        }) as SendableRecordBatchStream)
+    }
+}
+#[tokio::test]
+#[ignore] // Requires GPU hardware
+async fn test_gpu_spatial_join_correctness() {
+    use sedona_expr::scalar_udf::SedonaScalarUDF;
+    use sedona_geos::register::scalar_kernels;
+    use sedona_schema::crs::lnglat;
+    use sedona_schema::datatypes::{Edges, SedonaType, WKB_GEOMETRY};
+    use sedona_testing::create::create_array_storage;
+    use sedona_testing::testers::ScalarUdfTester;
+
+    let _ = env_logger::builder().is_test(true).try_init();
+
+    if !is_gpu_available() {
+        log::warn!("GPU not available, skipping test");
+        return;
+    }
+
+    // Use the same test data as the libgpuspatial reference test
+    let polygon_values = &[
+        Some("POLYGON ((30 10, 40 40, 20 40, 10 20, 30 10))"),
+        Some("POLYGON ((35 10, 45 45, 15 40, 10 20, 35 10), (20 30, 35 35, 30 
20, 20 30))"),
+        Some("POLYGON ((0 0, 10 0, 10 10, 0 10, 0 0), (2 2, 3 2, 3 3, 2 3, 2 
2), (6 6, 8 6, 8 8, 6 8, 6 6))"),
+        Some("POLYGON ((30 0, 60 20, 50 50, 10 50, 0 20, 30 0), (20 30, 25 40, 
15 40, 20 30), (30 30, 35 40, 25 40, 30 30), (40 30, 45 40, 35 40, 40 30))"),
+        Some("POLYGON ((40 0, 50 30, 80 20, 90 70, 60 90, 30 80, 20 40, 40 0), 
(50 20, 65 30, 60 50, 45 40, 50 20), (30 60, 50 70, 45 80, 30 60))"),
+    ];
+
+    let point_values = &[
+        Some("POINT (30 20)"), // poly0
+        Some("POINT (20 20)"), // poly1
+        Some("POINT (1 1)"),   // poly2
+        Some("POINT (70 70)"), // no match
+        Some("POINT (55 35)"), // poly4
+    ];
+
+    // Create Arrow arrays from WKT (shared for all predicates)
+    let polygons = create_array_storage(polygon_values, &WKB_GEOMETRY);
+    let points = create_array_storage(point_values, &WKB_GEOMETRY);
+
+    // Create RecordBatches (shared for all predicates)
+    let polygon_schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("geometry", DataType::Binary, false),
+    ]));
+
+    let point_schema = Arc::new(Schema::new(vec![
+        Field::new("id", DataType::Int32, false),
+        Field::new("geometry", DataType::Binary, false),
+    ]));
+
+    let polygon_ids = Int32Array::from(vec![0, 1, 2, 3, 4]);
+    let point_ids = Int32Array::from(vec![0, 1, 2, 3, 4]);
+
+    let polygon_batch = RecordBatch::try_new(
+        polygon_schema.clone(),
+        vec![Arc::new(polygon_ids), polygons],
+    )
+    .unwrap();
+
+    let point_batch =
+        RecordBatch::try_new(point_schema.clone(), vec![Arc::new(point_ids), points]).unwrap();
+
+    // Pre-create CPU testers for all predicates (shared across all tests)
+    let kernels = scalar_kernels();
+    let sedona_type = SedonaType::Wkb(Edges::Planar, lnglat());
+
+    let _cpu_testers: std::collections::HashMap<&str, ScalarUdfTester> = [

Review Comment:
   Is there a reason this variable is not used / can we do this using a `for` loop to avoid this indirection?
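
   For instance (a sketch only; the predicate list and the CPU comparison step are assumptions, since the rest of the map literal is outside this hunk), the per-predicate work could be done directly in the loop body:

   ```rust
   // Hypothetical restructuring, using only types already imported in this test
   // (GpuSpatialJoinConfig, GpuSpatialPredicate, SpatialPredicate): build the
   // config per predicate inside the loop instead of pre-populating an unused map.
   let base_config = GpuSpatialJoinConfig::default();

   for predicate in [SpatialPredicate::Intersects, SpatialPredicate::Contains] {
       let config = GpuSpatialJoinConfig {
           predicate: GpuSpatialPredicate::Relation(predicate),
           ..base_config.clone()
       };
       // Run the GPU join with `config` here and compare its output against the
       // CPU kernel for the same predicate, rather than looking a tester up in a map.
   }
   ```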


