This is an automated email from the ASF dual-hosted git repository.

paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new f6e0fff2 feat(rust/sedona-spatial-join-gpu): Integrate libgpuspatial 
into sedona-spatial-join (#722)
f6e0fff2 is described below

commit f6e0fff2ad9f21632d341bc6ea2349f06ece4807
Author: Liang Geng <[email protected]>
AuthorDate: Wed Apr 15 22:00:31 2026 +0800

    feat(rust/sedona-spatial-join-gpu): Integrate libgpuspatial into 
sedona-spatial-join (#722)
    
    Co-authored-by: Dewey Dunnington <[email protected]>
---
 .github/workflows/rust.yml                         |   2 +-
 Cargo.lock                                         |  43 +-
 Cargo.toml                                         |   3 +
 .../libgpuspatial/src/gpuspatial_c.cc              |   3 -
 rust/sedona-spatial-join-gpu/Cargo.toml            |  68 ++
 .../src/index.rs}                                  |  11 +-
 .../src/index/gpu_spatial_index.rs                 | 743 +++++++++++++++++++++
 .../src/index/gpu_spatial_index_builder.rs         | 286 ++++++++
 rust/sedona-spatial-join-gpu/src/join_provider.rs  | 142 ++++
 .../src/lib.rs}                                    |  13 +-
 rust/sedona-spatial-join-gpu/src/options.rs        |  51 ++
 .../src/physical_planner.rs                        | 202 ++++++
 rust/sedona-spatial-join/src/exec.rs               |   8 +
 rust/sedona-spatial-join/src/index.rs              |   4 +-
 rust/sedona-spatial-join/src/utils.rs              |   2 +-
 rust/sedona-spatial-join/src/utils/join_utils.rs   |   2 +-
 rust/sedona/Cargo.toml                             |   2 +
 rust/sedona/src/context.rs                         |  16 +
 18 files changed, 1573 insertions(+), 28 deletions(-)

diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index d79d7a23..a7c51464 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -162,7 +162,7 @@ jobs:
       - name: Test
         if: matrix.name == 'test'
         run: |
-          cargo test --workspace --all-targets --all-features
+          cargo test --workspace --all-targets --all-features --exclude 
sedona-spatial-join-gpu
           # Clean up intermediate build artifacts to free disk space 
aggressively
           cargo clean -p sedona-s2geography
           rm -rf target/debug/deps
diff --git a/Cargo.lock b/Cargo.lock
index fa803af4..bb0a7417 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2532,9 +2532,9 @@ checksum = 
"dea2df4cf52843e0452895c455a1a2cfbb842a1e7329671acf418fdc53ed4c59"
 
 [[package]]
 name = "fastrand"
-version = "2.4.0"
+version = "2.4.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "a043dc74da1e37d6afe657061213aa6f425f855399a11d3463c6ecccc4dfda1f"
+checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
 
 [[package]]
 name = "fd-lock"
@@ -5216,6 +5216,7 @@ dependencies = [
  "sedona-s2geography",
  "sedona-schema",
  "sedona-spatial-join",
+ "sedona-spatial-join-gpu",
  "sedona-testing",
  "sedona-tg",
  "serde",
@@ -5753,6 +5754,40 @@ dependencies = [
  "wkt 0.14.0",
 ]
 
+[[package]]
+name = "sedona-spatial-join-gpu"
+version = "0.4.0"
+dependencies = [
+ "arrow",
+ "arrow-array",
+ "arrow-schema",
+ "async-trait",
+ "criterion",
+ "datafusion",
+ "datafusion-common",
+ "datafusion-physical-expr",
+ "env_logger 0.11.8",
+ "futures",
+ "geo-index",
+ "geo-types",
+ "log",
+ "parking_lot",
+ "rand 0.10.1",
+ "rstest",
+ "sedona-common",
+ "sedona-expr",
+ "sedona-functions",
+ "sedona-geo-generic-alg",
+ "sedona-libgpuspatial",
+ "sedona-query-planner",
+ "sedona-schema",
+ "sedona-spatial-join",
+ "sedona-testing",
+ "tokio",
+ "wkb",
+ "wkt 0.14.0",
+]
+
 [[package]]
 name = "sedona-testing"
 version = "0.4.0"
@@ -6350,9 +6385,9 @@ checksum = 
"1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
 
 [[package]]
 name = "tokio"
-version = "1.51.0"
+version = "1.51.1"
 source = "registry+https://github.com/rust-lang/crates.io-index";
-checksum = "2bd1c4c0fc4a7ab90fc15ef6daaa3ec3b893f004f915f2392557ed23237820cd"
+checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c"
 dependencies = [
  "bytes",
  "libc",
diff --git a/Cargo.toml b/Cargo.toml
index c1436139..44db7a01 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ members = [
     "rust/sedona-raster-functions",
     "rust/sedona-schema",
     "rust/sedona-spatial-join",
+    "rust/sedona-spatial-join-gpu",
     "rust/sedona-query-planner",
     "rust/sedona-testing",
     "rust/sedona",
@@ -147,6 +148,7 @@ sedona-raster = { version = "0.4.0", path = 
"rust/sedona-raster" }
 sedona-raster-functions = { version = "0.4.0", path = 
"rust/sedona-raster-functions" }
 sedona-schema = { version = "0.4.0", path = "rust/sedona-schema" }
 sedona-spatial-join = { version = "0.4.0", path = "rust/sedona-spatial-join" }
+sedona-spatial-join-gpu = { version = "0.4.0", path = 
"rust/sedona-spatial-join-gpu" }
 sedona-query-planner = { version = "0.4.0", path = "rust/sedona-query-planner" 
}
 sedona-testing = { version = "0.4.0", path = "rust/sedona-testing" }
 
@@ -158,3 +160,4 @@ sedona-gdal = { version = "0.4.0", path = "c/sedona-gdal", 
default-features = fa
 sedona-proj = { version = "0.4.0", path = "c/sedona-proj", default-features = 
false }
 sedona-s2geography = { version = "0.4.0", path = "c/sedona-s2geography" }
 sedona-tg = { version = "0.4.0", path = "c/sedona-tg" }
+sedona-libgpuspatial  = { version = "0.4.0", path = "c/sedona-libgpuspatial" }
diff --git a/c/sedona-libgpuspatial/libgpuspatial/src/gpuspatial_c.cc 
b/c/sedona-libgpuspatial/libgpuspatial/src/gpuspatial_c.cc
index e0fae7a4..4f697622 100644
--- a/c/sedona-libgpuspatial/libgpuspatial/src/gpuspatial_c.cc
+++ b/c/sedona-libgpuspatial/libgpuspatial/src/gpuspatial_c.cc
@@ -82,9 +82,6 @@ struct GpuSpatialRuntimeExporter {
       std::string ptx_root(config->ptx_root);
       auto rt_config = gpuspatial::get_default_rt_config(ptx_root);
 
-      GPUSPATIAL_LOG_INFO("Initializing GpuSpatialRuntime on device %d, PTX 
root %s",
-                          config->device_id, config->ptx_root);
-
       CUDA_CHECK(cudaSetDevice(config->device_id));
 
       gpuspatial::MemoryManager::instance().Init(config->use_cuda_memory_pool,
diff --git a/rust/sedona-spatial-join-gpu/Cargo.toml 
b/rust/sedona-spatial-join-gpu/Cargo.toml
new file mode 100644
index 00000000..fb7d4b20
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/Cargo.toml
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+[package]
+name = "sedona-spatial-join-gpu"
+version.workspace = true
+license.workspace = true
+keywords.workspace = true
+categories.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description.workspace = true
+readme.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[lints.clippy]
+result_large_err = "allow"
+
+[features]
+gpu = ["sedona-libgpuspatial/gpu"]
+
+[dependencies]
+arrow = { workspace = true }
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+async-trait = { workspace = true }
+datafusion = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+geo-types = { workspace = true }
+parking_lot = { workspace = true }
+sedona-common = { workspace = true }
+sedona-expr = { workspace = true }
+geo-index = { workspace = true }
+tokio = { workspace = true }
+sedona-schema = { workspace = true }
+sedona-functions = { workspace = true }
+sedona-spatial-join = { workspace = true }
+sedona-libgpuspatial = { workspace = true }
+sedona-geo-generic-alg = { workspace = true }
+sedona-query-planner = { workspace = true }
+futures = { workspace = true }
+wkb = { workspace = true }
+log = { workspace = true }
+
+[dev-dependencies]
+criterion = { workspace = true }
+datafusion = { workspace = true, features = ["sql"] }
+rstest = { workspace = true }
+sedona-testing = { workspace = true }
+wkt = { workspace = true }
+tokio = { workspace = true, features = ["macros"] }
+rand = { workspace = true }
+env_logger = { workspace = true }
diff --git a/rust/sedona-spatial-join/src/utils.rs 
b/rust/sedona-spatial-join-gpu/src/index.rs
similarity index 77%
copy from rust/sedona-spatial-join/src/utils.rs
copy to rust/sedona-spatial-join-gpu/src/index.rs
index ae784899..a3c80eb5 100644
--- a/rust/sedona-spatial-join/src/utils.rs
+++ b/rust/sedona-spatial-join-gpu/src/index.rs
@@ -15,11 +15,6 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub(crate) mod arrow_utils;
-pub(crate) mod bbox_sampler;
-pub(crate) mod disposable_async_cell;
-pub(crate) mod init_once_array;
-pub mod internal_benchmark_util;
-pub(crate) mod join_utils;
-pub(crate) mod once_fut;
-pub(crate) mod spill;
+pub(crate) mod gpu_spatial_index;
+pub(crate) mod gpu_spatial_index_builder;
+pub(crate) use gpu_spatial_index_builder::GPUSpatialIndexBuilder as 
GpuSpatialIndexBuilder;
diff --git a/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index.rs 
b/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index.rs
new file mode 100644
index 00000000..08c59caa
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index.rs
@@ -0,0 +1,743 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::options::GpuOptions;
+use arrow::array::BooleanBufferBuilder;
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::{DataFusionError, Result};
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::ExecutionMode;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{
+    GpuSpatialIndex, GpuSpatialOptions, GpuSpatialRefiner, 
GpuSpatialRelationPredicate,
+};
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndex;
+use sedona_spatial_join::index::QueryResultMetrics;
+use sedona_spatial_join::spatial_predicate::SpatialRelationType;
+use sedona_spatial_join::SpatialPredicate;
+use std::ops::Range;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use wkb::reader::Wkb;
+
+pub struct GPUSpatialIndex {
+    pub(crate) schema: SchemaRef,
+    /// GPU spatial index for performing GPU-accelerated filtering
+    pub(crate) index: Arc<GpuSpatialIndex>,
+    /// GPU spatial refiner for performing GPU-accelerated refinement
+    pub(crate) refiner: Arc<GpuSpatialRefiner>,
+    pub(crate) spatial_predicate: SpatialPredicate,
+    /// Indexed batches containing evaluated geometry arrays. It contains the 
original record
+    /// batches and geometry arrays obtained by evaluating the geometry 
expression on the build side.
+    pub(crate) indexed_batches: Vec<EvaluatedBatch>,
+    /// An array for translating data index to geometry batch index and row 
index
+    pub(crate) data_id_to_batch_pos: Vec<(i32, i32)>,
+    /// Shared bitmap builders for visited left indices, one per batch
+    pub(crate) visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
+    /// Counter of running probe-threads, potentially able to update `bitmap`.
+    /// Each time a probe thread finished probing the index, it will decrement 
the counter.
+    /// The last finished probe thread will produce the extra output batches 
for unmatched
+    /// build side when running left-outer joins. See also 
[`report_probe_completed`].
+    pub(crate) probe_threads_counter: AtomicUsize,
+}
+impl GPUSpatialIndex {
+    pub fn empty(
+        spatial_predicate: SpatialPredicate,
+        schema: SchemaRef,
+        gpu_options: GpuOptions,
+        visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
+        probe_threads_counter: AtomicUsize,
+    ) -> Result<Self> {
+        let gpu_libspatial_options = GpuSpatialOptions {
+            cuda_use_memory_pool: gpu_options.use_memory_pool,
+            cuda_memory_pool_init_percent: 
gpu_options.memory_pool_init_percentage as i32,
+            concurrency: 1,
+            device_id: gpu_options.device_id as i32,
+            compress_bvh: gpu_options.compress_bvh,
+            pipeline_batches: gpu_options.pipeline_batches as u32,
+        };
+
+        Ok(Self {
+            schema,
+            spatial_predicate,
+            index: Arc::new(
+                GpuSpatialIndex::try_new(&gpu_libspatial_options)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+            ),
+            refiner: Arc::new(
+                GpuSpatialRefiner::try_new(&gpu_libspatial_options)
+                    .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+            ),
+            indexed_batches: vec![],
+            data_id_to_batch_pos: vec![],
+            visited_build_side,
+            probe_threads_counter,
+        })
+    }
+
+    fn refine(
+        &self,
+        probe_geoms: &ArrayRef,
+        predicate: &SpatialPredicate,
+        build_indices: &mut Vec<u32>,
+        probe_indices: &mut Vec<u32>,
+    ) -> Result<()> {
+        match predicate {
+            SpatialPredicate::Relation(rel_p) => {
+                self.refiner
+                    .refine(
+                        probe_geoms,
+                        Self::convert_relation_type(&rel_p.relation_type)?,
+                        build_indices,
+                        probe_indices,
+                    )
+                    .map_err(|e| {
+                        DataFusionError::Execution(format!(
+                            "GPU spatial refinement failed: {:?}",
+                            e
+                        ))
+                    })?;
+                Ok(())
+            }
+            _ => Err(DataFusionError::NotImplemented(
+                "Only Relation predicate is supported for GPU spatial 
query".to_string(),
+            )),
+        }
+    }
+    // Translate Sedona SpatialRelationType to GpuSpatialRelationPredicate
+    fn convert_relation_type(t: &SpatialRelationType) -> 
Result<GpuSpatialRelationPredicate> {
+        match t {
+            SpatialRelationType::Equals => 
Ok(GpuSpatialRelationPredicate::Equals),
+            SpatialRelationType::Touches => 
Ok(GpuSpatialRelationPredicate::Touches),
+            SpatialRelationType::Contains => 
Ok(GpuSpatialRelationPredicate::Contains),
+            SpatialRelationType::Covers => 
Ok(GpuSpatialRelationPredicate::Covers),
+            SpatialRelationType::Intersects => 
Ok(GpuSpatialRelationPredicate::Intersects),
+            SpatialRelationType::Within => 
Ok(GpuSpatialRelationPredicate::Within),
+            SpatialRelationType::CoveredBy => 
Ok(GpuSpatialRelationPredicate::CoveredBy),
+            _ => {
+                // This should not happen as we check for supported predicates 
earlier
+                Err(DataFusionError::Execution(format!(
+                    "Unsupported spatial relation type for GPU: {:?}",
+                    t
+                )))
+            }
+        }
+    }
+}
+
+#[async_trait]
+impl SpatialIndex for GPUSpatialIndex {
+    fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    fn num_indexed_batches(&self) -> usize {
+        self.indexed_batches.len()
+    }
+    fn get_indexed_batch(&self, batch_idx: usize) -> &RecordBatch {
+        &self.indexed_batches[batch_idx].batch
+    }
+
+    /// This method implements [`SpatialIndex::query_batch`] with 
GPU-accelerated spatial filtering
+    /// and refinement. It takes a batch of probe geometries and a range of 
row indices to process,
+    /// performs a spatial query on the GPU to find candidate matches,
+    /// refines the candidates using the GPU refiner,
+    /// and returns the matching build batch positions and probe indices.
+    async fn query_batch(
+        &self,
+        evaluated_batch: &Arc<EvaluatedBatch>,
+        range: Range<usize>,
+        _max_result_size: usize,
+        build_batch_positions: &mut Vec<(i32, i32)>,
+        probe_indices: &mut Vec<u32>,
+    ) -> Result<(QueryResultMetrics, usize)> {
+        if range.is_empty() {
+            return Ok((
+                QueryResultMetrics {
+                    count: 0,
+                    candidate_count: 0,
+                },
+                range.start,
+            ));
+        }
+        let index = &self.index.as_ref();
+
+        let empty_rect = Rect::new(
+            coord!(x: f32::NAN, y: f32::NAN),
+            coord!(x: f32::NAN, y: f32::NAN),
+        );
+        let rects: Vec<_> = range
+            .clone()
+            .map(|row_idx| 
evaluated_batch.geom_array.rects()[row_idx].unwrap_or(empty_rect))
+            .collect();
+
+        let (mut gpu_build_indices, mut gpu_probe_indices) =
+            index.probe(rects.as_ref()).map_err(|e| {
+                DataFusionError::Execution(format!("GPU spatial query failed: 
{:?}", e))
+            })?;
+
+        assert_eq!(gpu_build_indices.len(), gpu_probe_indices.len());
+
+        let candidate_count = gpu_build_indices.len();
+
+        self.refine(
+            evaluated_batch.geom_array.geometry_array(),
+            &self.spatial_predicate,
+            &mut gpu_build_indices,
+            &mut gpu_probe_indices,
+        )?;
+
+        assert_eq!(gpu_build_indices.len(), gpu_probe_indices.len());
+
+        let total_count = gpu_build_indices.len();
+
+        for (build_idx, probe_idx) in 
gpu_build_indices.iter().zip(gpu_probe_indices.iter()) {
+            let data_id = *build_idx as usize;
+            let (batch_idx, row_idx) = self.data_id_to_batch_pos[data_id];
+            build_batch_positions.push((batch_idx, row_idx));
+            probe_indices.push(range.start as u32 + probe_idx);
+        }
+        Ok((
+            QueryResultMetrics {
+                count: total_count,
+                candidate_count,
+            },
+            range.end,
+        ))
+    }
+    fn need_more_probe_stats(&self) -> bool {
+        false
+    }
+
+    fn merge_probe_stats(&self, _stats: GeoStatistics) {}
+
+    fn visited_build_side(&self) -> Option<&Mutex<Vec<BooleanBufferBuilder>>> {
+        self.visited_build_side.as_ref()
+    }
+
+    fn report_probe_completed(&self) -> bool {
+        self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
+    }
+
+    fn get_refiner_mem_usage(&self) -> usize {
+        0
+    }
+
+    fn get_actual_execution_mode(&self) -> ExecutionMode {
+        ExecutionMode::PrepareBuild // GPU-based spatial index is always on 
PrepareBuild mode
+    }
+
+    fn query_knn(
+        &self,
+        _probe_wkb: &Wkb,
+        _k: u32,
+        _use_spheroid: bool,
+        _include_tie_breakers: bool,
+        _build_batch_positions: &mut Vec<(i32, i32)>,
+        _distances: Option<&mut Vec<f64>>,
+    ) -> Result<QueryResultMetrics> {
+        Err(DataFusionError::NotImplemented(
+            "KNN query is not implemented for GPU spatial index".to_string(),
+        ))
+    }
+}
+
+#[cfg(test)]
+#[cfg(feature = "gpu")]
+mod tests {
+    use crate::index::gpu_spatial_index_builder::GPUSpatialIndexBuilder;
+    use crate::options::GpuOptions;
+    use arrow_array::RecordBatch;
+    use arrow_schema::{DataType, Field, SchemaRef};
+    use datafusion_common::JoinType;
+    use datafusion_physical_expr::expressions::Column;
+    use futures::Stream;
+    use sedona_expr::statistics::GeoStatistics;
+    use sedona_schema::datatypes::WKB_GEOMETRY;
+    use sedona_spatial_join::evaluated_batch::evaluated_batch_stream::{
+        EvaluatedBatchStream, SendableEvaluatedBatchStream,
+    };
+    use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+    use sedona_spatial_join::index::spatial_index::SpatialIndexRef;
+    use sedona_spatial_join::index::spatial_index_builder::{
+        SpatialIndexBuilder, SpatialJoinBuildMetrics,
+    };
+    use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+    use sedona_spatial_join::spatial_predicate::{RelationPredicate, 
SpatialRelationType};
+    use sedona_spatial_join::SpatialPredicate;
+    use sedona_testing::create::create_array;
+    use std::pin::Pin;
+    use std::sync::Arc;
+    use std::task::{Context, Poll};
+    use std::vec::IntoIter;
+
+    pub struct SingleBatchStream {
+        // We use an Option so we can `take()` it on the first poll,
+        // leaving `None` for subsequent polls to signal the end of the stream.
+        batch: Option<EvaluatedBatch>,
+        schema: SchemaRef,
+    }
+
+    impl SingleBatchStream {
+        pub fn new(batch: EvaluatedBatch, schema: SchemaRef) -> Self {
+            Self {
+                batch: Some(batch),
+                schema,
+            }
+        }
+    }
+
+    impl Stream for SingleBatchStream {
+        type Item = datafusion_common::Result<EvaluatedBatch>; // Or 
Result<EvaluatedBatch, DataFusionError>
+
+        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+            // `take()` removes the value from the Option, leaving `None` in 
its place.
+            // If there is a batch, it maps it to `Some(Ok(batch))`.
+            // If it's already empty, it returns `None`.
+            Poll::Ready(self.batch.take().map(Ok))
+        }
+    }
+
+    impl EvaluatedBatchStream for SingleBatchStream {
+        fn is_external(&self) -> bool {
+            false
+        }
+
+        fn schema(&self) -> SchemaRef {
+            self.schema.clone()
+        }
+    }
+
+    async fn build_index(
+        builder: GPUSpatialIndexBuilder,
+        indexed_batch: EvaluatedBatch,
+        schema: SchemaRef,
+    ) -> SpatialIndexRef {
+        let single_batch_stream = SingleBatchStream::new(indexed_batch, 
schema);
+        let sendable_stream: SendableEvaluatedBatchStream = 
Box::pin(single_batch_stream);
+        let stats = GeoStatistics::empty();
+        let mut builder = Box::new(builder);
+        builder.add_stream(sendable_stream, stats).await.unwrap();
+        builder.finish().unwrap()
+    }
+
+    // 1. Create a new stream struct for multiple batches
+    pub struct VecBatchStream {
+        // IntoIter allows us to cleanly pop items off the vector one by one
+        batches: IntoIter<EvaluatedBatch>,
+        schema: SchemaRef,
+    }
+
+    impl VecBatchStream {
+        pub fn new(batches: Vec<EvaluatedBatch>, schema: SchemaRef) -> Self {
+            Self {
+                batches: batches.into_iter(),
+                schema,
+            }
+        }
+    }
+
+    impl Stream for VecBatchStream {
+        type Item = datafusion_common::Result<EvaluatedBatch>;
+
+        fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> 
Poll<Option<Self::Item>> {
+            // `next()` on IntoIter returns Option<EvaluatedBatch>
+            // We map it to Option<Result<EvaluatedBatch>> to match the 
stream's Item type
+            Poll::Ready(self.batches.next().map(Ok))
+        }
+    }
+
+    impl EvaluatedBatchStream for VecBatchStream {
+        fn is_external(&self) -> bool {
+            false
+        }
+
+        fn schema(&self) -> SchemaRef {
+            self.schema.clone()
+        }
+    }
+
+    // 2. Write the new build_index function that accepts the Vec
+    async fn build_index_from_vec(
+        mut builder: Box<GPUSpatialIndexBuilder>,
+        indexed_batches: Vec<EvaluatedBatch>,
+        schema: SchemaRef,
+    ) -> SpatialIndexRef {
+        let vec_batch_stream = VecBatchStream::new(indexed_batches, schema);
+        let sendable_stream: SendableEvaluatedBatchStream = 
Box::pin(vec_batch_stream);
+
+        let stats = GeoStatistics::empty();
+
+        // Add the stream of multiple batches to the builder
+        builder.add_stream(sendable_stream, stats).await.unwrap();
+        builder.finish().unwrap()
+    }
+    #[test]
+    fn test_spatial_index_builder_empty() {
+        let options = GpuOptions {
+            enable: true,
+            ..Default::default()
+        };
+        let metrics = SpatialJoinBuildMetrics::default();
+        let schema = Arc::new(arrow_schema::Schema::empty());
+        let spatial_predicate = 
SpatialPredicate::Relation(RelationPredicate::new(
+            Arc::new(Column::new("geom", 0)),
+            Arc::new(Column::new("geom", 1)),
+            SpatialRelationType::Intersects,
+        ));
+
+        let mut builder = Box::new(GPUSpatialIndexBuilder::new(
+            schema.clone(),
+            spatial_predicate,
+            options,
+            JoinType::Inner,
+            4,
+            metrics,
+        ));
+
+        // Test finishing with empty data
+        let index = builder.finish().unwrap();
+        assert_eq!(index.schema(), schema);
+        assert_eq!(index.num_indexed_batches(), 0);
+    }
+
+    #[tokio::test]
+    async fn test_spatial_index_builder_add_batch() {
+        let options = GpuOptions {
+            enable: true,
+            ..Default::default()
+        };
+        let metrics = SpatialJoinBuildMetrics::default();
+
+        let spatial_predicate = 
SpatialPredicate::Relation(RelationPredicate::new(
+            Arc::new(Column::new("geom", 0)),
+            Arc::new(Column::new("geom", 1)),
+            SpatialRelationType::Intersects,
+        ));
+
+        // Create a simple test geometry batch
+        let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
+            "geom",
+            DataType::Binary,
+            true,
+        )]));
+
+        let builder = GPUSpatialIndexBuilder::new(
+            schema.clone(),
+            spatial_predicate,
+            options,
+            JoinType::Inner,
+            4,
+            metrics,
+        );
+
+        let geom_batch = create_array(
+            &[
+                Some("POINT (0.25 0.25)"),
+                Some("POINT (10 10)"),
+                None,
+                Some("POINT (0.25 0.25)"),
+            ],
+            &WKB_GEOMETRY,
+        );
+        let batch =
+            RecordBatch::try_new(schema.clone(), 
vec![Arc::new(geom_batch.clone())]).unwrap();
+        let indexed_batch = EvaluatedBatch {
+            batch,
+            geom_array: EvaluatedGeometryArray::try_new(geom_batch, 
&WKB_GEOMETRY).unwrap(),
+        };
+        let index = build_index(builder, indexed_batch, schema.clone()).await;
+
+        assert_eq!(index.schema(), schema);
+        assert_eq!(index.num_indexed_batches(), 1);
+    }
+
+    #[tokio::test]
+    async fn test_spatial_index_builder_add_multiple_batches() {
+        let gpu_options = GpuOptions {
+            enable: true,
+            concat_build: false,
+            ..Default::default()
+        };
+        let metrics = SpatialJoinBuildMetrics::default();
+
+        let spatial_predicate = 
SpatialPredicate::Relation(RelationPredicate::new(
+            Arc::new(Column::new("geom", 0)),
+            Arc::new(Column::new("geom", 1)),
+            SpatialRelationType::Intersects,
+        ));
+
+        // Create the schema
+        let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
+            "geom",
+            DataType::Binary,
+            true,
+        )]));
+
+        // Initialize the builder
+        let builder = GPUSpatialIndexBuilder::new(
+            schema.clone(),
+            spatial_predicate,
+            gpu_options,
+            JoinType::Inner,
+            4,
+            metrics,
+        );
+
+        // --- Create Batch 1 ---
+        let geom_batch1 = create_array(
+            &[
+                Some("POINT (0.25 0.25)"),
+                Some("POINT (10 10)"),
+                None,
+                Some("POINT (0.25 0.25)"),
+            ],
+            &WKB_GEOMETRY,
+        );
+        let batch1 =
+            RecordBatch::try_new(schema.clone(), 
vec![Arc::new(geom_batch1.clone())]).unwrap();
+        let evaluated_batch1 = EvaluatedBatch {
+            batch: batch1,
+            geom_array: EvaluatedGeometryArray::try_new(geom_batch1, 
&WKB_GEOMETRY).unwrap(),
+        };
+
+        // --- Create Batch 2 ---
+        let geom_batch2 = create_array(
+            &[
+                Some("POINT (1 1)"),
+                Some("POINT (5 5)"),
+                Some("POINT (20 20)"),
+            ],
+            &WKB_GEOMETRY,
+        );
+        let batch2 =
+            RecordBatch::try_new(schema.clone(), 
vec![Arc::new(geom_batch2.clone())]).unwrap();
+        let evaluated_batch2 = EvaluatedBatch {
+            batch: batch2,
+            geom_array: EvaluatedGeometryArray::try_new(geom_batch2, 
&WKB_GEOMETRY).unwrap(),
+        };
+
+        // --- Build the Index ---
+        // Combine them into a Vec and use the multi-batch builder function
+        let indexed_batches = vec![evaluated_batch1, evaluated_batch2];
+
+        // Note: This relies on the `build_index_from_vec` function we created 
earlier
+        let index = build_index_from_vec(Box::new(builder), indexed_batches, 
schema.clone()).await;
+
+        // --- Assertions ---
+        assert_eq!(index.schema(), schema);
+
+        // Assert that exactly 2 batches were indexed
+        assert_eq!(index.num_indexed_batches(), 2);
+    }
+
+    async fn setup_index_for_batch_test(
+        build_geoms: &[Option<&str>],
+        options: GpuOptions,
+    ) -> SpatialIndexRef {
+        let metrics = SpatialJoinBuildMetrics::default();
+        let spatial_predicate = 
SpatialPredicate::Relation(RelationPredicate::new(
+            Arc::new(Column::new("left", 0)),
+            Arc::new(Column::new("right", 0)),
+            SpatialRelationType::Intersects,
+        ));
+        let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
+            "geom",
+            DataType::Binary,
+            true,
+        )]));
+
+        let builder = GPUSpatialIndexBuilder::new(
+            schema.clone(),
+            spatial_predicate,
+            options,
+            JoinType::Inner,
+            1,
+            metrics,
+        );
+
+        let geom_array = create_array(build_geoms, &WKB_GEOMETRY);
+        let batch = RecordBatch::try_new(
+            Arc::new(arrow_schema::Schema::new(vec![Field::new(
+                "geom",
+                DataType::Binary,
+                true,
+            )])),
+            vec![Arc::new(geom_array.clone())],
+        )
+        .unwrap();
+        let evaluated_batch = EvaluatedBatch {
+            batch,
+            geom_array: EvaluatedGeometryArray::try_new(geom_array, 
&WKB_GEOMETRY).unwrap(),
+        };
+        build_index(builder, evaluated_batch, schema).await
+    }
+
+    fn create_probe_batch(probe_geoms: &[Option<&str>]) -> Arc<EvaluatedBatch> 
{
+        let geom_array = create_array(probe_geoms, &WKB_GEOMETRY);
+        let batch = RecordBatch::try_new(
+            Arc::new(arrow_schema::Schema::new(vec![Field::new(
+                "geom",
+                DataType::Binary,
+                true,
+            )])),
+            vec![Arc::new(geom_array.clone())],
+        )
+        .unwrap();
+        Arc::new(EvaluatedBatch {
+            batch,
+            geom_array: EvaluatedGeometryArray::try_new(geom_array, 
&WKB_GEOMETRY).unwrap(),
+        })
+    }
+    #[tokio::test]
+    async fn test_query_batch_empty_results() {
+        let build_geoms = &[Some("POINT (0 0)"), Some("POINT (1 1)")];
+        let options = GpuOptions {
+            enable: true,
+            ..Default::default()
+        };
+        let index = setup_index_for_batch_test(build_geoms, options).await;
+
+        // Probe with geometries that don't intersect
+        let probe_geoms = &[Some("POINT (10 10)"), Some("POINT (20 20)")];
+        let probe_batch = create_probe_batch(probe_geoms);
+
+        let mut build_batch_positions = Vec::new();
+        let mut probe_indices = Vec::new();
+        let (metrics, next_idx) = index
+            .query_batch(
+                &probe_batch,
+                0..2,
+                usize::MAX,
+                &mut build_batch_positions,
+                &mut probe_indices,
+            )
+            .await
+            .unwrap();
+
+        assert_eq!(metrics.count, 0);
+        assert_eq!(build_batch_positions.len(), 0);
+        assert_eq!(probe_indices.len(), 0);
+        assert_eq!(next_idx, 2);
+    }
+
+    #[tokio::test]
+    async fn test_query_batch_non_empty_results_multiple_build_batches() {
+        let gpu_options = GpuOptions {
+            enable: true,
+            concat_build: false,
+            ..Default::default()
+        };
+        let metrics = SpatialJoinBuildMetrics::default();
+
+        let spatial_predicate = 
SpatialPredicate::Relation(RelationPredicate::new(
+            Arc::new(Column::new("geom", 0)),
+            Arc::new(Column::new("geom", 1)),
+            SpatialRelationType::Intersects,
+        ));
+
+        let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
+            "geom",
+            DataType::Binary,
+            true,
+        )]));
+
+        let builder = GPUSpatialIndexBuilder::new(
+            schema.clone(),
+            spatial_predicate,
+            gpu_options,
+            JoinType::Inner,
+            4,
+            metrics,
+        );
+
+        // --- Build Side: Multiple Batches of Polygons ---
+        // Batch 0
+        let build_geom_batch0 = create_array(
+            &[
+                Some("POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))"),
+                Some("POLYGON ((20 20, 20 30, 30 30, 30 20, 20 20))"),
+            ],
+            &WKB_GEOMETRY,
+        );
+        let build_batch0 =
+            RecordBatch::try_new(schema.clone(), 
vec![Arc::new(build_geom_batch0.clone())])
+                .unwrap();
+        let evaluated_build0 = EvaluatedBatch {
+            batch: build_batch0,
+            geom_array: EvaluatedGeometryArray::try_new(build_geom_batch0, 
&WKB_GEOMETRY).unwrap(),
+        };
+
+        // Batch 1
+        let build_geom_batch1 = create_array(
+            &[Some("POLYGON ((40 40, 40 50, 50 50, 50 40, 40 40))")],
+            &WKB_GEOMETRY,
+        );
+        let build_batch1 =
+            RecordBatch::try_new(schema.clone(), 
vec![Arc::new(build_geom_batch1.clone())])
+                .unwrap();
+        let evaluated_build1 = EvaluatedBatch {
+            batch: build_batch1,
+            geom_array: EvaluatedGeometryArray::try_new(build_geom_batch1, 
&WKB_GEOMETRY).unwrap(),
+        };
+
+        // Build the multi-batch index using the helper function we created 
previously
+        let indexed_batches = vec![evaluated_build0, evaluated_build1];
+        let index = build_index_from_vec(Box::new(builder), indexed_batches, 
schema.clone()).await;
+
+        // --- Probe Side: One Batch of Points ---
+        let probe_geoms = &[
+            Some("POINT (5 5)"),     // Matches Batch 0, Row 0
+            Some("POINT (100 100)"), // No match
+            Some("POINT (25 25)"),   // Matches Batch 0, Row 1
+            Some("POINT (45 45)"),   // Matches Batch 1, Row 0
+        ];
+        let probe_batch = create_probe_batch(probe_geoms);
+
+        // --- Execute Query ---
+        let mut build_batch_positions = Vec::new();
+        let mut probe_indices = Vec::new();
+
+        // Probing all 4 points
+        let (query_metrics, next_idx) = index
+            .query_batch(
+                &probe_batch,
+                0..4,
+                usize::MAX,
+                &mut build_batch_positions,
+                &mut probe_indices,
+            )
+            .await
+            .unwrap();
+
+        // --- Assertions ---
+        // We expect exactly 3 matches out of the 4 probe points
+        assert_eq!(query_metrics.count, 3);
+        assert_eq!(build_batch_positions.len(), 3);
+        assert_eq!(probe_indices.len(), 3);
+
+        // The probe indices should match the rows in our probe batch that 
successfully intersected
+        assert_eq!(probe_indices, vec![0, 2, 3]);
+
+        // The query processed all 4 probe points
+        assert_eq!(next_idx, 4);
+    }
+}
diff --git 
a/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index_builder.rs 
b/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index_builder.rs
new file mode 100644
index 00000000..224e7852
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index_builder.rs
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::gpu_spatial_index::GPUSpatialIndex;
+use crate::options::GpuOptions;
+use arrow::array::BooleanBufferBuilder;
+use arrow::compute::concat_batches;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::Result;
+use datafusion_common::{DataFusionError, JoinType};
+use futures::StreamExt;
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{GpuSpatialIndex, GpuSpatialOptions, 
GpuSpatialRefiner};
+use 
sedona_spatial_join::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream;
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndexRef;
+use sedona_spatial_join::index::spatial_index_builder::{
+    SpatialIndexBuilder, SpatialJoinBuildMetrics,
+};
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_spatial_join::utils::join_utils::need_produce_result_in_final;
+use sedona_spatial_join::SpatialPredicate;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+pub(crate) struct GPUSpatialIndexBuilder {
+    schema: SchemaRef,
+    spatial_predicate: SpatialPredicate,
+    gpu_options: GpuOptions,
+    join_type: JoinType,
+    probe_threads_count: usize,
+    metrics: SpatialJoinBuildMetrics,
+    /// Batches to be indexed
+    indexed_batches: Vec<EvaluatedBatch>,
+    /// Statistics for indexed geometries
+    memory_used: usize,
+}
+
+impl GPUSpatialIndexBuilder {
+    pub fn new(
+        schema: SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        gpu_options: GpuOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Self {
+        Self {
+            schema,
+            spatial_predicate,
+            gpu_options,
+            join_type,
+            probe_threads_count,
+            metrics,
+            indexed_batches: vec![],
+            memory_used: 0,
+        }
+    }
+
+    fn build_visited_bitmaps(&mut self) -> 
Result<Option<Mutex<Vec<BooleanBufferBuilder>>>> {
+        if !need_produce_result_in_final(self.join_type) {
+            return Ok(None);
+        }
+
+        let mut bitmaps = Vec::with_capacity(self.indexed_batches.len());
+        let mut total_buffer_size = 0;
+
+        for batch in &self.indexed_batches {
+            let batch_rows = batch.batch.num_rows();
+            let buffer_size = batch_rows.div_ceil(8);
+            total_buffer_size += buffer_size;
+
+            let mut bitmap = BooleanBufferBuilder::new(batch_rows);
+            bitmap.append_n(batch_rows, false);
+            bitmaps.push(bitmap);
+        }
+
+        self.record_memory_usage(total_buffer_size);
+
+        Ok(Some(Mutex::new(bitmaps)))
+    }
+
+    fn record_memory_usage(&mut self, bytes: usize) {
+        self.memory_used += bytes;
+        self.metrics.build_mem_used.set_max(self.memory_used);
+    }
+    /// Add a geometry batch to be indexed.
+    ///
+    /// This method accumulates geometry batches that will be used to build 
the spatial index.
+    /// Each batch contains processed geometry data along with memory usage 
information.
+    fn add_batch(&mut self, indexed_batch: EvaluatedBatch) -> Result<()> {
+        let in_mem_size = indexed_batch.in_mem_size()?;
+        self.indexed_batches.push(indexed_batch);
+        self.record_memory_usage(in_mem_size);
+        Ok(())
+    }
+
+    pub(crate) fn estimate_extra_memory_usage(
+        geo_stats: &GeoStatistics,
+        _spatial_predicate: &SpatialPredicate,
+        _options: &SpatialJoinOptions,
+    ) -> usize {
+        let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
+        // This line estimates the size of temporary memory space for an array 
of bounding boxes,
+        // which will be fed to the GPU library to build an on-device index.
+        // The memory is allocated until the finishing of a spatial join.
+        // Each geometry requires 4 f32 values to store the bounding rectangle 
(min_x, min_y, max_x, max_y)
+        num_geoms * (4 * 4)
+    }
+}
+fn concat_evaluated_batches(batches: &[EvaluatedBatch]) -> 
Result<EvaluatedBatch> {
+    if batches.is_empty() {
+        return sedona_internal_err!("Cannot concatenate empty list of 
EvaluatedBatches");
+    }
+
+    // 1. Concatenate the underlying Arrow RecordBatches
+    let schema = batches[0].schema();
+    let record_batches: Vec<&RecordBatch> = batches.iter().map(|b| 
&b.batch).collect();
+    let concatenated_batch = concat_batches(&schema, record_batches)?;
+
+    // 2. Prepare for Geometry Interleaving
+    // We need to create a list of (batch_index, row_index) for every row in 
order
+    let mut indices = Vec::with_capacity(concatenated_batch.num_rows());
+    for (batch_idx, batch) in batches.iter().enumerate() {
+        for row_idx in 0..batch.num_rows() {
+            indices.push((batch_idx, row_idx));
+        }
+    }
+
+    // 3. Concatenate Geometry Arrays using the interleave method
+    let geom_arrays: Vec<&EvaluatedGeometryArray> = batches.iter().map(|b| 
&b.geom_array).collect();
+
+    let concatenated_geom_array = 
EvaluatedGeometryArray::interleave(&geom_arrays, &indices)?;
+
+    Ok(EvaluatedBatch {
+        batch: concatenated_batch,
+        geom_array: concatenated_geom_array,
+    })
+}
+#[async_trait]
+impl SpatialIndexBuilder for GPUSpatialIndexBuilder {
+    /// Finish building and return the completed SpatialIndex.
+    fn finish(&mut self) -> Result<SpatialIndexRef> {
+        if self.indexed_batches.is_empty() {
+            let visited_build_side = self.build_visited_bitmaps()?;
+            return Ok(Arc::new(GPUSpatialIndex::empty(
+                self.spatial_predicate.clone(),
+                self.schema.clone(),
+                self.gpu_options.clone(),
+                visited_build_side,
+                AtomicUsize::new(self.probe_threads_count),
+            )?));
+        }
+        let build_timer = self.metrics.build_time.timer();
+        let gpu_options = GpuSpatialOptions {
+            cuda_use_memory_pool: self.gpu_options.use_memory_pool,
+            cuda_memory_pool_init_percent: 
self.gpu_options.memory_pool_init_percentage as i32,
+            concurrency: self.probe_threads_count as u32,
+            device_id: self.gpu_options.device_id as i32,
+            compress_bvh: self.gpu_options.compress_bvh,
+            pipeline_batches: self.gpu_options.pipeline_batches as u32,
+        };
+
+        let mut index = GpuSpatialIndex::try_new(&gpu_options).map_err(|e| {
+            DataFusionError::Execution(format!("Failed to initialize GPU 
Index: {e:?}"))
+        })?;
+        let mut refiner = GpuSpatialRefiner::try_new(&gpu_options).map_err(|e| 
{
+            DataFusionError::Execution(format!("Failed to initialize GPU 
Refiner: {e:?}"))
+        })?;
+
+        // Concat indexed batches into a single batch to reduce build time
+        let total_rows: usize = self
+            .indexed_batches
+            .iter()
+            .map(|batch| batch.batch.num_rows())
+            .sum();
+
+        let sedona_type = 
self.indexed_batches[0].geom_array.sedona_type().clone();
+
+        if self.gpu_options.concat_build {
+            let concat_batch = 
concat_evaluated_batches(&self.indexed_batches)?;
+            self.indexed_batches.clear();
+            self.indexed_batches.push(concat_batch);
+        }
+        let mut data_id_to_batch_pos: Vec<(i32, i32)> = Vec::with_capacity(
+            self.indexed_batches
+                .iter()
+                .map(|x| x.batch.num_rows())
+                .sum(),
+        );
+        let empty_rect = Rect::new(
+            coord!(x: f32::NAN, y: f32::NAN),
+            coord!(x: f32::NAN, y: f32::NAN),
+        );
+
+        refiner
+            .init_build_schema(sedona_type.storage_type())
+            .map_err(|e| {
+                DataFusionError::Execution(format!("Failed to init schema for 
refiner {e:?}"))
+            })?;
+
+        let mut native_rects = Vec::with_capacity(total_rows);
+
+        for (batch_idx, batch) in self.indexed_batches.iter().enumerate() {
+            let rects = batch.geom_array.rects();
+
+            for (idx, rect_opt) in rects.iter().enumerate() {
+                if let Some(rect) = rect_opt {
+                    native_rects.push(*rect);
+                } else {
+                    native_rects.push(empty_rect);
+                }
+                data_id_to_batch_pos.push((batch_idx as i32, idx as i32));
+            }
+            refiner
+                .push_build(batch.geom_array.geometry_array())
+                .map_err(|e| {
+                    DataFusionError::Execution(format!(
+                        "Failed to add geometries to GPU refiner {e:?}"
+                    ))
+                })?;
+        }
+
+        refiner.finish_building().map_err(|e| {
+            DataFusionError::Execution(format!("Failed to build spatial 
refiner on GPU {e:?}"))
+        })?;
+
+        // Add rectangles from build side to the spatial index
+        index.push_build(&native_rects).map_err(|e| {
+            DataFusionError::Execution(format!(
+                "Failed to push rectangles to GPU spatial index {e:?}"
+            ))
+        })?;
+
+        index.finish_building().map_err(|e| {
+            DataFusionError::Execution(format!("Failed to build spatial index 
on GPU {e:?}"))
+        })?;
+        build_timer.done();
+        let visited_build_side = self.build_visited_bitmaps()?;
+        // Build index for rectangle queries
+        Ok(Arc::new(GPUSpatialIndex {
+            schema: self.schema.clone(),
+            index: Arc::new(index),
+            refiner: Arc::new(refiner),
+            spatial_predicate: self.spatial_predicate.clone(),
+            indexed_batches: self
+                .indexed_batches
+                .drain(0..self.indexed_batches.len())
+                .collect(),
+            data_id_to_batch_pos,
+            visited_build_side,
+            probe_threads_counter: AtomicUsize::new(self.probe_threads_count),
+        }))
+    }
+
+    async fn add_stream(
+        &mut self,
+        mut stream: SendableEvaluatedBatchStream,
+        _geo_statistics: GeoStatistics,
+    ) -> Result<()> {
+        while let Some(batch) = stream.next().await {
+            let indexed_batch = batch?;
+            self.add_batch(indexed_batch)?;
+        }
+        Ok(())
+    }
+}
diff --git a/rust/sedona-spatial-join-gpu/src/join_provider.rs 
b/rust/sedona-spatial-join-gpu/src/join_provider.rs
new file mode 100644
index 00000000..3a206180
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/src/join_provider.rs
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::GpuSpatialIndexBuilder;
+use crate::options::GpuOptions;
+use arrow_array::ArrayRef;
+use arrow_schema::SchemaRef;
+use datafusion_common::JoinType;
+use datafusion_common::Result;
+use geo_index::rtree::util::f64_box_to_f32;
+use geo_types::{coord, Rect};
+use sedona_common::SpatialJoinOptions;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_functions::executor::IterGeo;
+use sedona_geo_generic_alg::BoundingRect;
+use sedona_schema::datatypes::SedonaType;
+use sedona_spatial_join::index::spatial_index_builder::{
+    SpatialIndexBuilder, SpatialJoinBuildMetrics,
+};
+use sedona_spatial_join::join_provider::SpatialJoinProvider;
+use sedona_spatial_join::operand_evaluator::{
+    EvaluatedGeometryArray, EvaluatedGeometryArrayFactory,
+};
+use sedona_spatial_join::SpatialPredicate;
+use std::sync::Arc;
+use wkb::reader::GeometryType;
+
+#[derive(Debug)]
+pub(crate) struct GpuSpatialJoinProvider {
+    gpu_options: GpuOptions,
+}
+
+impl GpuSpatialJoinProvider {
+    pub(crate) fn new(gpu_options: GpuOptions) -> Self {
+        Self { gpu_options }
+    }
+}
+
+impl SpatialJoinProvider for GpuSpatialJoinProvider {
+    fn try_new_spatial_index_builder(
+        &self,
+        schema: SchemaRef,
+        spatial_predicate: SpatialPredicate,
+        _options: SpatialJoinOptions,
+        join_type: JoinType,
+        probe_threads_count: usize,
+        metrics: SpatialJoinBuildMetrics,
+    ) -> Result<Box<dyn SpatialIndexBuilder>> {
+        let builder = GpuSpatialIndexBuilder::new(
+            schema,
+            spatial_predicate,
+            self.gpu_options.clone(),
+            join_type,
+            probe_threads_count,
+            metrics,
+        );
+        Ok(Box::new(builder))
+    }
+
+    fn estimate_extra_memory_usage(
+        &self,
+        geo_stats: &GeoStatistics,
+        spatial_predicate: &SpatialPredicate,
+        options: &SpatialJoinOptions,
+    ) -> usize {
+        GpuSpatialIndexBuilder::estimate_extra_memory_usage(geo_stats, 
spatial_predicate, options)
+    }
+
+    fn evaluated_array_factory(&self) -> Arc<dyn 
EvaluatedGeometryArrayFactory> {
+        Arc::new(DefaultGeometryArrayFactory)
+    }
+}
+
+#[derive(Debug)]
+pub(crate) struct DefaultGeometryArrayFactory;
+
+impl EvaluatedGeometryArrayFactory for DefaultGeometryArrayFactory {
+    fn try_new_evaluated_array(
+        &self,
+        geometry_array: ArrayRef,
+        sedona_type: &SedonaType,
+    ) -> Result<EvaluatedGeometryArray> {
+        let num_rows = geometry_array.len();
+        let mut rect_vec = Vec::with_capacity(num_rows);
+        geometry_array.iter_as_wkb(sedona_type, num_rows, |wkb_opt| {
+            let rect_opt = if let Some(wkb) = &wkb_opt {
+                // This piece of code checks whether the underlying geometry 
is a point
+                // By representing the point with an MBR with the same min 
corner and max corner,
+                // libgpuspatial treats the MBR as a point, which triggers an 
optimized point query
+                // instead of using rect-rect query for high performance
+                // Ref: 
https://github.com/apache/sedona-db/blob/9187f8b8c4ca52b64837fab5fddd377703f7331b/c/sedona-libgpuspatial/libgpuspatial/src/rt_spatial_index.cu#L374
+                if let Some(rect) = wkb.bounding_rect() {
+                    let min = rect.min();
+                    let max = rect.max();
+                    // Why conservative bounding boxes prevent false negatives:
+                    // 1. P32 = round_nearest(P64), so P32 is the closest 
possible float to P64.
+                    // 2. Min32 = round_down(Min64), guaranteeing Min32 <= 
Min64.
+                    // 3. Max32 = round_up(Max64), guaranteeing Max32 >= Max64.
+                    // If P64 is inside Box64 (Min64 <= P64 <= Max64), P32 
cannot fall outside Box32.
+                    // If P32 < Min32, it would mean Min32 is closer to P64 
than P32 is, which
+                    // contradicts P32 being the nearest float. Therefore, 
false negatives are impossible.
+                    if wkb.geometry_type() == GeometryType::Point {
+                        Some(Rect::new(
+                            coord!(x: min.x as f32, y: min.y as f32),
+                            coord!(x: max.x as f32, y: max.y as f32),
+                        ))
+                    } else {
+                        let (min_x, min_y, max_x, max_y) =
+                            f64_box_to_f32(min.x, min.y, max.x, max.y);
+                        Some(Rect::new(
+                            coord!(x: min_x, y: min_y),
+                            coord!(x: max_x, y: max_y),
+                        ))
+                    }
+                } else {
+                    None
+                }
+            } else {
+                None
+            };
+
+            rect_vec.push(rect_opt);
+            Ok(())
+        })?;
+
+        EvaluatedGeometryArray::try_new_with_rects(geometry_array, rect_vec, 
sedona_type)
+    }
+}
diff --git a/rust/sedona-spatial-join/src/utils.rs 
b/rust/sedona-spatial-join-gpu/src/lib.rs
similarity index 77%
copy from rust/sedona-spatial-join/src/utils.rs
copy to rust/sedona-spatial-join-gpu/src/lib.rs
index ae784899..809e9acb 100644
--- a/rust/sedona-spatial-join/src/utils.rs
+++ b/rust/sedona-spatial-join-gpu/src/lib.rs
@@ -15,11 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
-pub(crate) mod arrow_utils;
-pub(crate) mod bbox_sampler;
-pub(crate) mod disposable_async_cell;
-pub(crate) mod init_once_array;
-pub mod internal_benchmark_util;
-pub(crate) mod join_utils;
-pub(crate) mod once_fut;
-pub(crate) mod spill;
+mod index;
+mod join_provider;
+
+pub mod options;
+pub mod physical_planner;
diff --git a/rust/sedona-spatial-join-gpu/src/options.rs 
b/rust/sedona-spatial-join-gpu/src/options.rs
new file mode 100644
index 00000000..452a7c87
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/src/options.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::{config::ConfigExtension, extensions_options};
+
+extensions_options! {
+    /// Configuration options for GPU-accelerated spatial joins.
+    pub struct GpuOptions {
+        /// Enable GPU-accelerated spatial joins (requires CUDA and GPU 
feature flag)
+        pub enable: bool, default = false
+
+        /// Concatenate all build-side geometries into one buffer for GPU 
processing
+        pub concat_build: bool, default = true
+
+        /// GPU device ID to use (0 = first GPU, 1 = second, etc.)
+        pub device_id: usize, default = 0
+
+        /// Fall back to CPU if GPU initialization or execution fails
+        pub fallback_to_cpu: bool, default = true
+
+        /// Use CUDA memory pool for GPU memory management
+        pub use_memory_pool: bool, default = true
+
+        /// Percentage of total GPU memory for initializing CUDA memory pool 
(0-100)
+        pub memory_pool_init_percentage: usize, default = 50
+
+        /// Overlap parsing and refinement by pipelining multiple batches; 1 
means no pipelining
+        pub pipeline_batches: usize, default = 1
+
+        /// Compress BVH to reduce memory usage (may reduce performance)
+        pub compress_bvh: bool, default = false
+    }
+}
+
+impl ConfigExtension for GpuOptions {
+    const PREFIX: &'static str = "gpu";
+}
diff --git a/rust/sedona-spatial-join-gpu/src/physical_planner.rs 
b/rust/sedona-spatial-join-gpu/src/physical_planner.rs
new file mode 100644
index 00000000..7b2593aa
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/src/physical_planner.rs
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::join_provider::GpuSpatialJoinProvider;
+use crate::options::GpuOptions;
+use arrow_schema::Schema;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_expr::PhysicalExpr;
+use sedona_common::SpatialJoinOptions;
+use sedona_query_planner::spatial_join_physical_planner::{
+    PlanSpatialJoinArgs, SpatialJoinPhysicalPlanner,
+};
+use sedona_query_planner::spatial_predicate::{
+    RelationPredicate, SpatialPredicate, SpatialRelationType,
+};
+use sedona_schema::datatypes::SedonaType;
+use sedona_schema::matchers::ArgMatcher;
+use sedona_spatial_join::SpatialJoinExec;
+
+/// [SpatialJoinFactory] implementation for the default spatial join
+///
+/// This struct is the entrypoint to ensuring the SedonaQueryPlanner is able
+/// to instantiate the [ExecutionPlan] implemented in this crate.
+#[derive(Debug)]
+pub struct GpuSpatialJoinPhysicalPlanner;
+
+impl GpuSpatialJoinPhysicalPlanner {
+    /// Create a new default join factory
+    pub fn new() -> Self {
+        Self {}
+    }
+}
+
+impl Default for GpuSpatialJoinPhysicalPlanner {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl SpatialJoinPhysicalPlanner for GpuSpatialJoinPhysicalPlanner {
+    fn plan_spatial_join(
+        &self,
+        args: &PlanSpatialJoinArgs<'_>,
+    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+        let supported = is_spatial_predicate_supported(
+            args.spatial_predicate,
+            &args.physical_left.schema(),
+            &args.physical_right.schema(),
+        )?;
+        let gpu_options = args
+            .options
+            .extensions
+            .get::<GpuOptions>()
+            .cloned()
+            .unwrap_or_default();
+
+        if !gpu_options.enable {
+            return Ok(None);
+        }
+
+        if !supported {
+            if gpu_options.fallback_to_cpu {
+                log::warn!("Falling back to CPU spatial join as the spatial 
predicate is not supported on GPU");
+                return Ok(None);
+            } else {
+                return Err(DataFusionError::Plan("GPU spatial join is enabled, 
but the spatial predicate is not supported on GPU".into()));
+            }
+        }
+
+        let should_swap = !matches!(
+            args.spatial_predicate,
+            SpatialPredicate::KNearestNeighbors(_)
+        ) && args.join_type.supports_swap()
+            && should_swap_join_order(
+                args.join_options,
+                args.physical_left.as_ref(),
+                args.physical_right.as_ref(),
+            )?;
+
+        let exec = SpatialJoinExec::try_new(
+            args.physical_left.clone(),
+            args.physical_right.clone(),
+            args.spatial_predicate.clone(),
+            args.remainder.cloned(),
+            args.join_type,
+            None,
+            args.join_options,
+        )?;
+        let exec =
+            
exec.with_spatial_join_provider(Arc::new(GpuSpatialJoinProvider::new(gpu_options)));
+
+        if should_swap {
+            exec.swap_inputs().map(Some)
+        } else {
+            Ok(Some(Arc::new(exec) as Arc<dyn ExecutionPlan>))
+        }
+    }
+}
+
+/// Spatial join reordering heuristic:
+/// 1. Put the input with fewer rows on the build side, because fewer entries
+///    produce a smaller and more efficient spatial index (R-tree).
+/// 2. If row-count statistics are unavailable (for example, for CSV sources),
+///    fall back to total input size as an estimate.
+/// 3. Do not swap the join order if join reordering is disabled or no relevant
+///    statistics are available.
+fn should_swap_join_order(
+    spatial_join_options: &SpatialJoinOptions,
+    left: &dyn ExecutionPlan,
+    right: &dyn ExecutionPlan,
+) -> Result<bool> {
+    if !spatial_join_options.spatial_join_reordering {
+        log::info!(
+            "spatial join swap heuristic disabled via 
sedona.spatial_join.spatial_join_reordering"
+        );
+        return Ok(false);
+    }
+
+    let left_stats = left.partition_statistics(None)?;
+    let right_stats = right.partition_statistics(None)?;
+
+    let left_num_rows = left_stats.num_rows;
+    let right_num_rows = right_stats.num_rows;
+    let left_total_byte_size = left_stats.total_byte_size;
+    let right_total_byte_size = right_stats.total_byte_size;
+
+    let should_swap = match (left_num_rows.get_value(), 
right_num_rows.get_value()) {
+        (Some(l), Some(r)) => l > r,
+        _ => match (
+            left_total_byte_size.get_value(),
+            right_total_byte_size.get_value(),
+        ) {
+            (Some(l), Some(r)) => l > r,
+            _ => false,
+        },
+    };
+
+    log::info!(
+        "spatial join swap heuristic: left_num_rows={left_num_rows:?}, 
right_num_rows={right_num_rows:?}, 
left_total_byte_size={left_total_byte_size:?}, 
right_total_byte_size={right_total_byte_size:?}, should_swap={should_swap}"
+    );
+
+    Ok(should_swap)
+}
+
+pub fn is_spatial_predicate_supported(
+    spatial_predicate: &SpatialPredicate,
+    left_schema: &Schema,
+    right_schema: &Schema,
+) -> Result<bool> {
+    fn is_geometry_type_supported(expr: &Arc<dyn PhysicalExpr>, schema: 
&Schema) -> Result<bool> {
+        let return_field = expr.return_field(schema)?;
+        let sedona_type = SedonaType::from_storage_field(&return_field)?;
+        Ok(ArgMatcher::is_geometry().match_type(&sedona_type))
+    }
+
+    let both_geometry =
+        |left: &Arc<dyn PhysicalExpr>, right: &Arc<dyn PhysicalExpr>| -> 
Result<bool> {
+            Ok(is_geometry_type_supported(left, left_schema)?
+                && is_geometry_type_supported(right, right_schema)?)
+        };
+
+    match spatial_predicate {
+        SpatialPredicate::Relation(RelationPredicate {
+            left,
+            right,
+            relation_type,
+        }) => {
+            if !matches!(
+                relation_type,
+                SpatialRelationType::Intersects
+                    | SpatialRelationType::Contains
+                    | SpatialRelationType::Within
+                    | SpatialRelationType::Covers
+                    | SpatialRelationType::CoveredBy
+                    | SpatialRelationType::Touches
+                    | SpatialRelationType::Equals
+            ) {
+                return Ok(false);
+            }
+
+            both_geometry(left, right)
+        }
+        SpatialPredicate::Distance(_) | SpatialPredicate::KNearestNeighbors(_) 
=> Ok(false),
+    }
+}
diff --git a/rust/sedona-spatial-join/src/exec.rs 
b/rust/sedona-spatial-join/src/exec.rs
index 42ca6ebb..0b8c61a2 100644
--- a/rust/sedona-spatial-join/src/exec.rs
+++ b/rust/sedona-spatial-join/src/exec.rs
@@ -179,6 +179,14 @@ impl SpatialJoinExec {
         })
     }
 
+    /// Create a new [`SpatialJoinExec`] with customized 
[`SpatialJoinProvider`]
+    pub fn with_spatial_join_provider(
+        mut self,
+        join_provider: Arc<dyn SpatialJoinProvider>,
+    ) -> Self {
+        self.join_provider = join_provider;
+        self
+    }
     /// How the join is performed
     pub fn join_type(&self) -> &JoinType {
         &self.join_type
diff --git a/rust/sedona-spatial-join/src/index.rs 
b/rust/sedona-spatial-join/src/index.rs
index 9c3c00c5..fb732954 100644
--- a/rust/sedona-spatial-join/src/index.rs
+++ b/rust/sedona-spatial-join/src/index.rs
@@ -21,8 +21,8 @@ pub(crate) mod default_spatial_index_builder;
 mod knn_adapter;
 pub(crate) mod memory_plan;
 pub(crate) mod partitioned_index_provider;
-pub(crate) mod spatial_index;
-pub(crate) mod spatial_index_builder;
+pub mod spatial_index;
+pub mod spatial_index_builder;
 
 pub(crate) use build_side_collector::{
     BuildPartition, BuildSideBatchesCollector, CollectBuildSideMetrics,
diff --git a/rust/sedona-spatial-join/src/utils.rs 
b/rust/sedona-spatial-join/src/utils.rs
index ae784899..a600616f 100644
--- a/rust/sedona-spatial-join/src/utils.rs
+++ b/rust/sedona-spatial-join/src/utils.rs
@@ -20,6 +20,6 @@ pub(crate) mod bbox_sampler;
 pub(crate) mod disposable_async_cell;
 pub(crate) mod init_once_array;
 pub mod internal_benchmark_util;
-pub(crate) mod join_utils;
+pub mod join_utils;
 pub(crate) mod once_fut;
 pub(crate) mod spill;
diff --git a/rust/sedona-spatial-join/src/utils/join_utils.rs 
b/rust/sedona-spatial-join/src/utils/join_utils.rs
index 9c6565e9..722a189f 100644
--- a/rust/sedona-spatial-join/src/utils/join_utils.rs
+++ b/rust/sedona-spatial-join/src/utils/join_utils.rs
@@ -50,7 +50,7 @@ use crate::SpatialPredicate;
 ///
 /// For example of the `Left` join, in each iteration of right side, can get 
the matched result, but need
 /// to maintain the matched indices bit map to get the unmatched row for the 
left side.
-pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool {
+pub fn need_produce_result_in_final(join_type: JoinType) -> bool {
     matches!(
         join_type,
         JoinType::Left
diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml
index b0787420..b3e53ce6 100644
--- a/rust/sedona/Cargo.toml
+++ b/rust/sedona/Cargo.toml
@@ -43,6 +43,7 @@ pointcloud = ["dep:sedona-pointcloud"]
 proj = ["sedona-proj/proj-sys"]
 gdal = ["sedona-gdal/gdal-sys"]
 spatial-join = ["dep:sedona-spatial-join"]
+gpu = ["dep:sedona-spatial-join-gpu", "sedona-spatial-join-gpu/gpu"]
 s2geography = ["dep:sedona-s2geography"]
 
 [dev-dependencies]
@@ -82,6 +83,7 @@ sedona-gdal = { workspace = true }
 sedona-raster-functions = { workspace = true }
 sedona-schema = { workspace = true }
 sedona-spatial-join = { workspace = true, optional = true }
+sedona-spatial-join-gpu = { workspace = true, optional = true }
 sedona-query-planner = { workspace = true }
 sedona-s2geography = { workspace = true, optional = true }
 sedona-testing = { workspace = true }
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index b85e3be8..77ff4e9b 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -64,6 +64,9 @@ use sedona_pointcloud::las::{
     format::{Extension, LasFormatFactory},
     options::{GeometryEncoding, LasExtraBytes, LasOptions},
 };
+#[cfg(feature = "gpu")]
+use sedona_spatial_join_gpu::options::GpuOptions;
+
 use sedona_query_planner::{
     optimizer::register_spatial_join_logical_optimizer, 
query_planner::SedonaQueryPlanner,
 };
@@ -139,6 +142,9 @@ impl SedonaContext {
                 .with_las_extra_bytes(LasExtraBytes::Typed),
         );
 
+        #[cfg(feature = "gpu")]
+        let session_config = 
session_config.with_option_extension(GpuOptions::default());
+
         #[allow(unused_mut)]
         let mut state_builder = SessionStateBuilder::new()
             .with_default_features()
@@ -154,6 +160,16 @@ impl SedonaContext {
             planner = planner.with_spatial_join_physical_planner(Arc::new(
                 DefaultSpatialJoinPhysicalPlanner::new(),
             ));
+            // Register the GPU join after the default planer
+            // If a query is not supported, it falls back to the default 
planner.
+            #[cfg(feature = "gpu")]
+            {
+                use 
sedona_spatial_join_gpu::physical_planner::GpuSpatialJoinPhysicalPlanner;
+
+                planner = planner.with_spatial_join_physical_planner(Arc::new(
+                    GpuSpatialJoinPhysicalPlanner::new(),
+                ));
+            }
         }
 
         state_builder = 
register_spatial_join_logical_optimizer(state_builder)?;

Reply via email to