This is an automated email from the ASF dual-hosted git repository.
paleolimbot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new f6e0fff2 feat(rust/sedona-spatial-join-gpu): Integrate libgpuspatial
into sedona-spatial-join (#722)
f6e0fff2 is described below
commit f6e0fff2ad9f21632d341bc6ea2349f06ece4807
Author: Liang Geng <[email protected]>
AuthorDate: Wed Apr 15 22:00:31 2026 +0800
feat(rust/sedona-spatial-join-gpu): Integrate libgpuspatial into
sedona-spatial-join (#722)
Co-authored-by: Dewey Dunnington <[email protected]>
---
.github/workflows/rust.yml | 2 +-
Cargo.lock | 43 +-
Cargo.toml | 3 +
.../libgpuspatial/src/gpuspatial_c.cc | 3 -
rust/sedona-spatial-join-gpu/Cargo.toml | 68 ++
.../src/index.rs} | 11 +-
.../src/index/gpu_spatial_index.rs | 743 +++++++++++++++++++++
.../src/index/gpu_spatial_index_builder.rs | 286 ++++++++
rust/sedona-spatial-join-gpu/src/join_provider.rs | 142 ++++
.../src/lib.rs} | 13 +-
rust/sedona-spatial-join-gpu/src/options.rs | 51 ++
.../src/physical_planner.rs | 202 ++++++
rust/sedona-spatial-join/src/exec.rs | 8 +
rust/sedona-spatial-join/src/index.rs | 4 +-
rust/sedona-spatial-join/src/utils.rs | 2 +-
rust/sedona-spatial-join/src/utils/join_utils.rs | 2 +-
rust/sedona/Cargo.toml | 2 +
rust/sedona/src/context.rs | 16 +
18 files changed, 1573 insertions(+), 28 deletions(-)
diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml
index d79d7a23..a7c51464 100644
--- a/.github/workflows/rust.yml
+++ b/.github/workflows/rust.yml
@@ -162,7 +162,7 @@ jobs:
- name: Test
if: matrix.name == 'test'
run: |
- cargo test --workspace --all-targets --all-features
+ cargo test --workspace --all-targets --all-features --exclude
sedona-spatial-join-gpu
# Clean up intermediate build artifacts to free disk space
aggressively
cargo clean -p sedona-s2geography
rm -rf target/debug/deps
diff --git a/Cargo.lock b/Cargo.lock
index fa803af4..bb0a7417 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2532,9 +2532,9 @@ checksum =
"dea2df4cf52843e0452895c455a1a2cfbb842a1e7329671acf418fdc53ed4c59"
[[package]]
name = "fastrand"
-version = "2.4.0"
+version = "2.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a043dc74da1e37d6afe657061213aa6f425f855399a11d3463c6ecccc4dfda1f"
+checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6"
[[package]]
name = "fd-lock"
@@ -5216,6 +5216,7 @@ dependencies = [
"sedona-s2geography",
"sedona-schema",
"sedona-spatial-join",
+ "sedona-spatial-join-gpu",
"sedona-testing",
"sedona-tg",
"serde",
@@ -5753,6 +5754,40 @@ dependencies = [
"wkt 0.14.0",
]
+[[package]]
+name = "sedona-spatial-join-gpu"
+version = "0.4.0"
+dependencies = [
+ "arrow",
+ "arrow-array",
+ "arrow-schema",
+ "async-trait",
+ "criterion",
+ "datafusion",
+ "datafusion-common",
+ "datafusion-physical-expr",
+ "env_logger 0.11.8",
+ "futures",
+ "geo-index",
+ "geo-types",
+ "log",
+ "parking_lot",
+ "rand 0.10.1",
+ "rstest",
+ "sedona-common",
+ "sedona-expr",
+ "sedona-functions",
+ "sedona-geo-generic-alg",
+ "sedona-libgpuspatial",
+ "sedona-query-planner",
+ "sedona-schema",
+ "sedona-spatial-join",
+ "sedona-testing",
+ "tokio",
+ "wkb",
+ "wkt 0.14.0",
+]
+
[[package]]
name = "sedona-testing"
version = "0.4.0"
@@ -6350,9 +6385,9 @@ checksum =
"1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
-version = "1.51.0"
+version = "1.51.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2bd1c4c0fc4a7ab90fc15ef6daaa3ec3b893f004f915f2392557ed23237820cd"
+checksum = "f66bf9585cda4b724d3e78ab34b73fb2bbaba9011b9bfdf69dc836382ea13b8c"
dependencies = [
"bytes",
"libc",
diff --git a/Cargo.toml b/Cargo.toml
index c1436139..44db7a01 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -40,6 +40,7 @@ members = [
"rust/sedona-raster-functions",
"rust/sedona-schema",
"rust/sedona-spatial-join",
+ "rust/sedona-spatial-join-gpu",
"rust/sedona-query-planner",
"rust/sedona-testing",
"rust/sedona",
@@ -147,6 +148,7 @@ sedona-raster = { version = "0.4.0", path =
"rust/sedona-raster" }
sedona-raster-functions = { version = "0.4.0", path =
"rust/sedona-raster-functions" }
sedona-schema = { version = "0.4.0", path = "rust/sedona-schema" }
sedona-spatial-join = { version = "0.4.0", path = "rust/sedona-spatial-join" }
+sedona-spatial-join-gpu = { version = "0.4.0", path =
"rust/sedona-spatial-join-gpu" }
sedona-query-planner = { version = "0.4.0", path = "rust/sedona-query-planner"
}
sedona-testing = { version = "0.4.0", path = "rust/sedona-testing" }
@@ -158,3 +160,4 @@ sedona-gdal = { version = "0.4.0", path = "c/sedona-gdal",
default-features = fa
sedona-proj = { version = "0.4.0", path = "c/sedona-proj", default-features =
false }
sedona-s2geography = { version = "0.4.0", path = "c/sedona-s2geography" }
sedona-tg = { version = "0.4.0", path = "c/sedona-tg" }
+sedona-libgpuspatial = { version = "0.4.0", path = "c/sedona-libgpuspatial" }
diff --git a/c/sedona-libgpuspatial/libgpuspatial/src/gpuspatial_c.cc
b/c/sedona-libgpuspatial/libgpuspatial/src/gpuspatial_c.cc
index e0fae7a4..4f697622 100644
--- a/c/sedona-libgpuspatial/libgpuspatial/src/gpuspatial_c.cc
+++ b/c/sedona-libgpuspatial/libgpuspatial/src/gpuspatial_c.cc
@@ -82,9 +82,6 @@ struct GpuSpatialRuntimeExporter {
std::string ptx_root(config->ptx_root);
auto rt_config = gpuspatial::get_default_rt_config(ptx_root);
- GPUSPATIAL_LOG_INFO("Initializing GpuSpatialRuntime on device %d, PTX
root %s",
- config->device_id, config->ptx_root);
-
CUDA_CHECK(cudaSetDevice(config->device_id));
gpuspatial::MemoryManager::instance().Init(config->use_cuda_memory_pool,
diff --git a/rust/sedona-spatial-join-gpu/Cargo.toml
b/rust/sedona-spatial-join-gpu/Cargo.toml
new file mode 100644
index 00000000..fb7d4b20
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/Cargo.toml
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+[package]
+name = "sedona-spatial-join-gpu"
+version.workspace = true
+license.workspace = true
+keywords.workspace = true
+categories.workspace = true
+homepage.workspace = true
+repository.workspace = true
+description.workspace = true
+readme.workspace = true
+edition.workspace = true
+rust-version.workspace = true
+
+[lints.clippy]
+result_large_err = "allow"
+
+[features]
+gpu = ["sedona-libgpuspatial/gpu"]
+
+[dependencies]
+arrow = { workspace = true }
+arrow-array = { workspace = true }
+arrow-schema = { workspace = true }
+async-trait = { workspace = true }
+datafusion = { workspace = true }
+datafusion-common = { workspace = true }
+datafusion-physical-expr = { workspace = true }
+geo-types = { workspace = true }
+parking_lot = { workspace = true }
+sedona-common = { workspace = true }
+sedona-expr = { workspace = true }
+geo-index = { workspace = true }
+tokio = { workspace = true }
+sedona-schema = { workspace = true }
+sedona-functions = { workspace = true }
+sedona-spatial-join = { workspace = true }
+sedona-libgpuspatial = { workspace = true }
+sedona-geo-generic-alg = { workspace = true }
+sedona-query-planner = { workspace = true }
+futures = { workspace = true }
+wkb = { workspace = true }
+log = { workspace = true }
+
+[dev-dependencies]
+criterion = { workspace = true }
+datafusion = { workspace = true, features = ["sql"] }
+rstest = { workspace = true }
+sedona-testing = { workspace = true }
+wkt = { workspace = true }
+tokio = { workspace = true, features = ["macros"] }
+rand = { workspace = true }
+env_logger = { workspace = true }
diff --git a/rust/sedona-spatial-join/src/utils.rs
b/rust/sedona-spatial-join-gpu/src/index.rs
similarity index 77%
copy from rust/sedona-spatial-join/src/utils.rs
copy to rust/sedona-spatial-join-gpu/src/index.rs
index ae784899..a3c80eb5 100644
--- a/rust/sedona-spatial-join/src/utils.rs
+++ b/rust/sedona-spatial-join-gpu/src/index.rs
@@ -15,11 +15,6 @@
// specific language governing permissions and limitations
// under the License.
-pub(crate) mod arrow_utils;
-pub(crate) mod bbox_sampler;
-pub(crate) mod disposable_async_cell;
-pub(crate) mod init_once_array;
-pub mod internal_benchmark_util;
-pub(crate) mod join_utils;
-pub(crate) mod once_fut;
-pub(crate) mod spill;
+pub(crate) mod gpu_spatial_index;
+pub(crate) mod gpu_spatial_index_builder;
+pub(crate) use gpu_spatial_index_builder::GPUSpatialIndexBuilder as
GpuSpatialIndexBuilder;
diff --git a/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index.rs
b/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index.rs
new file mode 100644
index 00000000..08c59caa
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index.rs
@@ -0,0 +1,743 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::options::GpuOptions;
+use arrow::array::BooleanBufferBuilder;
+use arrow_array::{ArrayRef, RecordBatch};
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::{DataFusionError, Result};
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::ExecutionMode;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{
+ GpuSpatialIndex, GpuSpatialOptions, GpuSpatialRefiner,
GpuSpatialRelationPredicate,
+};
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndex;
+use sedona_spatial_join::index::QueryResultMetrics;
+use sedona_spatial_join::spatial_predicate::SpatialRelationType;
+use sedona_spatial_join::SpatialPredicate;
+use std::ops::Range;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use wkb::reader::Wkb;
+
+pub struct GPUSpatialIndex {
+ pub(crate) schema: SchemaRef,
+ /// GPU spatial index for performing GPU-accelerated filtering
+ pub(crate) index: Arc<GpuSpatialIndex>,
+ /// GPU spatial refiner for performing GPU-accelerated refinement
+ pub(crate) refiner: Arc<GpuSpatialRefiner>,
+ pub(crate) spatial_predicate: SpatialPredicate,
+ /// Indexed batches containing evaluated geometry arrays. It contains the
original record
+ /// batches and geometry arrays obtained by evaluating the geometry
expression on the build side.
+ pub(crate) indexed_batches: Vec<EvaluatedBatch>,
+ /// An array for translating data index to geometry batch index and row
index
+ pub(crate) data_id_to_batch_pos: Vec<(i32, i32)>,
+ /// Shared bitmap builders for visited left indices, one per batch
+ pub(crate) visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
+ /// Counter of running probe-threads, potentially able to update `bitmap`.
+ /// Each time a probe thread finished probing the index, it will decrement
the counter.
+ /// The last finished probe thread will produce the extra output batches
for unmatched
+ /// build side when running left-outer joins. See also
[`report_probe_completed`].
+ pub(crate) probe_threads_counter: AtomicUsize,
+}
+impl GPUSpatialIndex {
+ pub fn empty(
+ spatial_predicate: SpatialPredicate,
+ schema: SchemaRef,
+ gpu_options: GpuOptions,
+ visited_build_side: Option<Mutex<Vec<BooleanBufferBuilder>>>,
+ probe_threads_counter: AtomicUsize,
+ ) -> Result<Self> {
+ let gpu_libspatial_options = GpuSpatialOptions {
+ cuda_use_memory_pool: gpu_options.use_memory_pool,
+ cuda_memory_pool_init_percent:
gpu_options.memory_pool_init_percentage as i32,
+ concurrency: 1,
+ device_id: gpu_options.device_id as i32,
+ compress_bvh: gpu_options.compress_bvh,
+ pipeline_batches: gpu_options.pipeline_batches as u32,
+ };
+
+ Ok(Self {
+ schema,
+ spatial_predicate,
+ index: Arc::new(
+ GpuSpatialIndex::try_new(&gpu_libspatial_options)
+ .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+ ),
+ refiner: Arc::new(
+ GpuSpatialRefiner::try_new(&gpu_libspatial_options)
+ .map_err(|e| DataFusionError::Execution(e.to_string()))?,
+ ),
+ indexed_batches: vec![],
+ data_id_to_batch_pos: vec![],
+ visited_build_side,
+ probe_threads_counter,
+ })
+ }
+
+ fn refine(
+ &self,
+ probe_geoms: &ArrayRef,
+ predicate: &SpatialPredicate,
+ build_indices: &mut Vec<u32>,
+ probe_indices: &mut Vec<u32>,
+ ) -> Result<()> {
+ match predicate {
+ SpatialPredicate::Relation(rel_p) => {
+ self.refiner
+ .refine(
+ probe_geoms,
+ Self::convert_relation_type(&rel_p.relation_type)?,
+ build_indices,
+ probe_indices,
+ )
+ .map_err(|e| {
+ DataFusionError::Execution(format!(
+ "GPU spatial refinement failed: {:?}",
+ e
+ ))
+ })?;
+ Ok(())
+ }
+ _ => Err(DataFusionError::NotImplemented(
+ "Only Relation predicate is supported for GPU spatial
query".to_string(),
+ )),
+ }
+ }
+ // Translate Sedona SpatialRelationType to GpuSpatialRelationPredicate
+ fn convert_relation_type(t: &SpatialRelationType) ->
Result<GpuSpatialRelationPredicate> {
+ match t {
+ SpatialRelationType::Equals =>
Ok(GpuSpatialRelationPredicate::Equals),
+ SpatialRelationType::Touches =>
Ok(GpuSpatialRelationPredicate::Touches),
+ SpatialRelationType::Contains =>
Ok(GpuSpatialRelationPredicate::Contains),
+ SpatialRelationType::Covers =>
Ok(GpuSpatialRelationPredicate::Covers),
+ SpatialRelationType::Intersects =>
Ok(GpuSpatialRelationPredicate::Intersects),
+ SpatialRelationType::Within =>
Ok(GpuSpatialRelationPredicate::Within),
+ SpatialRelationType::CoveredBy =>
Ok(GpuSpatialRelationPredicate::CoveredBy),
+ _ => {
+ // This should not happen as we check for supported predicates
earlier
+ Err(DataFusionError::Execution(format!(
+ "Unsupported spatial relation type for GPU: {:?}",
+ t
+ )))
+ }
+ }
+ }
+}
+
+#[async_trait]
+impl SpatialIndex for GPUSpatialIndex {
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ fn num_indexed_batches(&self) -> usize {
+ self.indexed_batches.len()
+ }
+ fn get_indexed_batch(&self, batch_idx: usize) -> &RecordBatch {
+ &self.indexed_batches[batch_idx].batch
+ }
+
+ /// This method implements [`SpatialIndex::query_batch`] with
GPU-accelerated spatial filtering
+ /// and refinement. It takes a batch of probe geometries and a range of
row indices to process,
+ /// performs a spatial query on the GPU to find candidate matches,
+ /// refines the candidates using the GPU refiner,
+ /// and returns the matching build batch positions and probe indices.
+ async fn query_batch(
+ &self,
+ evaluated_batch: &Arc<EvaluatedBatch>,
+ range: Range<usize>,
+ _max_result_size: usize,
+ build_batch_positions: &mut Vec<(i32, i32)>,
+ probe_indices: &mut Vec<u32>,
+ ) -> Result<(QueryResultMetrics, usize)> {
+ if range.is_empty() {
+ return Ok((
+ QueryResultMetrics {
+ count: 0,
+ candidate_count: 0,
+ },
+ range.start,
+ ));
+ }
+ let index = &self.index.as_ref();
+
+ let empty_rect = Rect::new(
+ coord!(x: f32::NAN, y: f32::NAN),
+ coord!(x: f32::NAN, y: f32::NAN),
+ );
+ let rects: Vec<_> = range
+ .clone()
+ .map(|row_idx|
evaluated_batch.geom_array.rects()[row_idx].unwrap_or(empty_rect))
+ .collect();
+
+ let (mut gpu_build_indices, mut gpu_probe_indices) =
+ index.probe(rects.as_ref()).map_err(|e| {
+ DataFusionError::Execution(format!("GPU spatial query failed:
{:?}", e))
+ })?;
+
+ assert_eq!(gpu_build_indices.len(), gpu_probe_indices.len());
+
+ let candidate_count = gpu_build_indices.len();
+
+ self.refine(
+ evaluated_batch.geom_array.geometry_array(),
+ &self.spatial_predicate,
+ &mut gpu_build_indices,
+ &mut gpu_probe_indices,
+ )?;
+
+ assert_eq!(gpu_build_indices.len(), gpu_probe_indices.len());
+
+ let total_count = gpu_build_indices.len();
+
+ for (build_idx, probe_idx) in
gpu_build_indices.iter().zip(gpu_probe_indices.iter()) {
+ let data_id = *build_idx as usize;
+ let (batch_idx, row_idx) = self.data_id_to_batch_pos[data_id];
+ build_batch_positions.push((batch_idx, row_idx));
+ probe_indices.push(range.start as u32 + probe_idx);
+ }
+ Ok((
+ QueryResultMetrics {
+ count: total_count,
+ candidate_count,
+ },
+ range.end,
+ ))
+ }
+ fn need_more_probe_stats(&self) -> bool {
+ false
+ }
+
+ fn merge_probe_stats(&self, _stats: GeoStatistics) {}
+
+ fn visited_build_side(&self) -> Option<&Mutex<Vec<BooleanBufferBuilder>>> {
+ self.visited_build_side.as_ref()
+ }
+
+ fn report_probe_completed(&self) -> bool {
+ self.probe_threads_counter.fetch_sub(1, Ordering::Relaxed) == 1
+ }
+
+ fn get_refiner_mem_usage(&self) -> usize {
+ 0
+ }
+
+ fn get_actual_execution_mode(&self) -> ExecutionMode {
+ ExecutionMode::PrepareBuild // GPU-based spatial index is always on
PrepareBuild mode
+ }
+
+ fn query_knn(
+ &self,
+ _probe_wkb: &Wkb,
+ _k: u32,
+ _use_spheroid: bool,
+ _include_tie_breakers: bool,
+ _build_batch_positions: &mut Vec<(i32, i32)>,
+ _distances: Option<&mut Vec<f64>>,
+ ) -> Result<QueryResultMetrics> {
+ Err(DataFusionError::NotImplemented(
+ "KNN query is not implemented for GPU spatial index".to_string(),
+ ))
+ }
+}
+
+#[cfg(test)]
+#[cfg(feature = "gpu")]
+mod tests {
+ use crate::index::gpu_spatial_index_builder::GPUSpatialIndexBuilder;
+ use crate::options::GpuOptions;
+ use arrow_array::RecordBatch;
+ use arrow_schema::{DataType, Field, SchemaRef};
+ use datafusion_common::JoinType;
+ use datafusion_physical_expr::expressions::Column;
+ use futures::Stream;
+ use sedona_expr::statistics::GeoStatistics;
+ use sedona_schema::datatypes::WKB_GEOMETRY;
+ use sedona_spatial_join::evaluated_batch::evaluated_batch_stream::{
+ EvaluatedBatchStream, SendableEvaluatedBatchStream,
+ };
+ use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+ use sedona_spatial_join::index::spatial_index::SpatialIndexRef;
+ use sedona_spatial_join::index::spatial_index_builder::{
+ SpatialIndexBuilder, SpatialJoinBuildMetrics,
+ };
+ use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+ use sedona_spatial_join::spatial_predicate::{RelationPredicate,
SpatialRelationType};
+ use sedona_spatial_join::SpatialPredicate;
+ use sedona_testing::create::create_array;
+ use std::pin::Pin;
+ use std::sync::Arc;
+ use std::task::{Context, Poll};
+ use std::vec::IntoIter;
+
+ pub struct SingleBatchStream {
+ // We use an Option so we can `take()` it on the first poll,
+ // leaving `None` for subsequent polls to signal the end of the stream.
+ batch: Option<EvaluatedBatch>,
+ schema: SchemaRef,
+ }
+
+ impl SingleBatchStream {
+ pub fn new(batch: EvaluatedBatch, schema: SchemaRef) -> Self {
+ Self {
+ batch: Some(batch),
+ schema,
+ }
+ }
+ }
+
+ impl Stream for SingleBatchStream {
+ type Item = datafusion_common::Result<EvaluatedBatch>; // Or
Result<EvaluatedBatch, DataFusionError>
+
+ fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ // `take()` removes the value from the Option, leaving `None` in
its place.
+ // If there is a batch, it maps it to `Some(Ok(batch))`.
+ // If it's already empty, it returns `None`.
+ Poll::Ready(self.batch.take().map(Ok))
+ }
+ }
+
+ impl EvaluatedBatchStream for SingleBatchStream {
+ fn is_external(&self) -> bool {
+ false
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+ }
+
+ async fn build_index(
+ builder: GPUSpatialIndexBuilder,
+ indexed_batch: EvaluatedBatch,
+ schema: SchemaRef,
+ ) -> SpatialIndexRef {
+ let single_batch_stream = SingleBatchStream::new(indexed_batch,
schema);
+ let sendable_stream: SendableEvaluatedBatchStream =
Box::pin(single_batch_stream);
+ let stats = GeoStatistics::empty();
+ let mut builder = Box::new(builder);
+ builder.add_stream(sendable_stream, stats).await.unwrap();
+ builder.finish().unwrap()
+ }
+
+ // 1. Create a new stream struct for multiple batches
+ pub struct VecBatchStream {
+ // IntoIter allows us to cleanly pop items off the vector one by one
+ batches: IntoIter<EvaluatedBatch>,
+ schema: SchemaRef,
+ }
+
+ impl VecBatchStream {
+ pub fn new(batches: Vec<EvaluatedBatch>, schema: SchemaRef) -> Self {
+ Self {
+ batches: batches.into_iter(),
+ schema,
+ }
+ }
+ }
+
+ impl Stream for VecBatchStream {
+ type Item = datafusion_common::Result<EvaluatedBatch>;
+
+ fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) ->
Poll<Option<Self::Item>> {
+ // `next()` on IntoIter returns Option<EvaluatedBatch>
+ // We map it to Option<Result<EvaluatedBatch>> to match the
stream's Item type
+ Poll::Ready(self.batches.next().map(Ok))
+ }
+ }
+
+ impl EvaluatedBatchStream for VecBatchStream {
+ fn is_external(&self) -> bool {
+ false
+ }
+
+ fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+ }
+
+ // 2. Write the new build_index function that accepts the Vec
+ async fn build_index_from_vec(
+ mut builder: Box<GPUSpatialIndexBuilder>,
+ indexed_batches: Vec<EvaluatedBatch>,
+ schema: SchemaRef,
+ ) -> SpatialIndexRef {
+ let vec_batch_stream = VecBatchStream::new(indexed_batches, schema);
+ let sendable_stream: SendableEvaluatedBatchStream =
Box::pin(vec_batch_stream);
+
+ let stats = GeoStatistics::empty();
+
+ // Add the stream of multiple batches to the builder
+ builder.add_stream(sendable_stream, stats).await.unwrap();
+ builder.finish().unwrap()
+ }
+ #[test]
+ fn test_spatial_index_builder_empty() {
+ let options = GpuOptions {
+ enable: true,
+ ..Default::default()
+ };
+ let metrics = SpatialJoinBuildMetrics::default();
+ let schema = Arc::new(arrow_schema::Schema::empty());
+ let spatial_predicate =
SpatialPredicate::Relation(RelationPredicate::new(
+ Arc::new(Column::new("geom", 0)),
+ Arc::new(Column::new("geom", 1)),
+ SpatialRelationType::Intersects,
+ ));
+
+ let mut builder = Box::new(GPUSpatialIndexBuilder::new(
+ schema.clone(),
+ spatial_predicate,
+ options,
+ JoinType::Inner,
+ 4,
+ metrics,
+ ));
+
+ // Test finishing with empty data
+ let index = builder.finish().unwrap();
+ assert_eq!(index.schema(), schema);
+ assert_eq!(index.num_indexed_batches(), 0);
+ }
+
+ #[tokio::test]
+ async fn test_spatial_index_builder_add_batch() {
+ let options = GpuOptions {
+ enable: true,
+ ..Default::default()
+ };
+ let metrics = SpatialJoinBuildMetrics::default();
+
+ let spatial_predicate =
SpatialPredicate::Relation(RelationPredicate::new(
+ Arc::new(Column::new("geom", 0)),
+ Arc::new(Column::new("geom", 1)),
+ SpatialRelationType::Intersects,
+ ));
+
+ // Create a simple test geometry batch
+ let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
+ "geom",
+ DataType::Binary,
+ true,
+ )]));
+
+ let builder = GPUSpatialIndexBuilder::new(
+ schema.clone(),
+ spatial_predicate,
+ options,
+ JoinType::Inner,
+ 4,
+ metrics,
+ );
+
+ let geom_batch = create_array(
+ &[
+ Some("POINT (0.25 0.25)"),
+ Some("POINT (10 10)"),
+ None,
+ Some("POINT (0.25 0.25)"),
+ ],
+ &WKB_GEOMETRY,
+ );
+ let batch =
+ RecordBatch::try_new(schema.clone(),
vec![Arc::new(geom_batch.clone())]).unwrap();
+ let indexed_batch = EvaluatedBatch {
+ batch,
+ geom_array: EvaluatedGeometryArray::try_new(geom_batch,
&WKB_GEOMETRY).unwrap(),
+ };
+ let index = build_index(builder, indexed_batch, schema.clone()).await;
+
+ assert_eq!(index.schema(), schema);
+ assert_eq!(index.num_indexed_batches(), 1);
+ }
+
+ #[tokio::test]
+ async fn test_spatial_index_builder_add_multiple_batches() {
+ let gpu_options = GpuOptions {
+ enable: true,
+ concat_build: false,
+ ..Default::default()
+ };
+ let metrics = SpatialJoinBuildMetrics::default();
+
+ let spatial_predicate =
SpatialPredicate::Relation(RelationPredicate::new(
+ Arc::new(Column::new("geom", 0)),
+ Arc::new(Column::new("geom", 1)),
+ SpatialRelationType::Intersects,
+ ));
+
+ // Create the schema
+ let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
+ "geom",
+ DataType::Binary,
+ true,
+ )]));
+
+ // Initialize the builder
+ let builder = GPUSpatialIndexBuilder::new(
+ schema.clone(),
+ spatial_predicate,
+ gpu_options,
+ JoinType::Inner,
+ 4,
+ metrics,
+ );
+
+ // --- Create Batch 1 ---
+ let geom_batch1 = create_array(
+ &[
+ Some("POINT (0.25 0.25)"),
+ Some("POINT (10 10)"),
+ None,
+ Some("POINT (0.25 0.25)"),
+ ],
+ &WKB_GEOMETRY,
+ );
+ let batch1 =
+ RecordBatch::try_new(schema.clone(),
vec![Arc::new(geom_batch1.clone())]).unwrap();
+ let evaluated_batch1 = EvaluatedBatch {
+ batch: batch1,
+ geom_array: EvaluatedGeometryArray::try_new(geom_batch1,
&WKB_GEOMETRY).unwrap(),
+ };
+
+ // --- Create Batch 2 ---
+ let geom_batch2 = create_array(
+ &[
+ Some("POINT (1 1)"),
+ Some("POINT (5 5)"),
+ Some("POINT (20 20)"),
+ ],
+ &WKB_GEOMETRY,
+ );
+ let batch2 =
+ RecordBatch::try_new(schema.clone(),
vec![Arc::new(geom_batch2.clone())]).unwrap();
+ let evaluated_batch2 = EvaluatedBatch {
+ batch: batch2,
+ geom_array: EvaluatedGeometryArray::try_new(geom_batch2,
&WKB_GEOMETRY).unwrap(),
+ };
+
+ // --- Build the Index ---
+ // Combine them into a Vec and use the multi-batch builder function
+ let indexed_batches = vec![evaluated_batch1, evaluated_batch2];
+
+ // Note: This relies on the `build_index_from_vec` function we created
earlier
+ let index = build_index_from_vec(Box::new(builder), indexed_batches,
schema.clone()).await;
+
+ // --- Assertions ---
+ assert_eq!(index.schema(), schema);
+
+ // Assert that exactly 2 batches were indexed
+ assert_eq!(index.num_indexed_batches(), 2);
+ }
+
+ async fn setup_index_for_batch_test(
+ build_geoms: &[Option<&str>],
+ options: GpuOptions,
+ ) -> SpatialIndexRef {
+ let metrics = SpatialJoinBuildMetrics::default();
+ let spatial_predicate =
SpatialPredicate::Relation(RelationPredicate::new(
+ Arc::new(Column::new("left", 0)),
+ Arc::new(Column::new("right", 0)),
+ SpatialRelationType::Intersects,
+ ));
+ let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
+ "geom",
+ DataType::Binary,
+ true,
+ )]));
+
+ let builder = GPUSpatialIndexBuilder::new(
+ schema.clone(),
+ spatial_predicate,
+ options,
+ JoinType::Inner,
+ 1,
+ metrics,
+ );
+
+ let geom_array = create_array(build_geoms, &WKB_GEOMETRY);
+ let batch = RecordBatch::try_new(
+ Arc::new(arrow_schema::Schema::new(vec![Field::new(
+ "geom",
+ DataType::Binary,
+ true,
+ )])),
+ vec![Arc::new(geom_array.clone())],
+ )
+ .unwrap();
+ let evaluated_batch = EvaluatedBatch {
+ batch,
+ geom_array: EvaluatedGeometryArray::try_new(geom_array,
&WKB_GEOMETRY).unwrap(),
+ };
+ build_index(builder, evaluated_batch, schema).await
+ }
+
+ fn create_probe_batch(probe_geoms: &[Option<&str>]) -> Arc<EvaluatedBatch>
{
+ let geom_array = create_array(probe_geoms, &WKB_GEOMETRY);
+ let batch = RecordBatch::try_new(
+ Arc::new(arrow_schema::Schema::new(vec![Field::new(
+ "geom",
+ DataType::Binary,
+ true,
+ )])),
+ vec![Arc::new(geom_array.clone())],
+ )
+ .unwrap();
+ Arc::new(EvaluatedBatch {
+ batch,
+ geom_array: EvaluatedGeometryArray::try_new(geom_array,
&WKB_GEOMETRY).unwrap(),
+ })
+ }
+ #[tokio::test]
+ async fn test_query_batch_empty_results() {
+ let build_geoms = &[Some("POINT (0 0)"), Some("POINT (1 1)")];
+ let options = GpuOptions {
+ enable: true,
+ ..Default::default()
+ };
+ let index = setup_index_for_batch_test(build_geoms, options).await;
+
+ // Probe with geometries that don't intersect
+ let probe_geoms = &[Some("POINT (10 10)"), Some("POINT (20 20)")];
+ let probe_batch = create_probe_batch(probe_geoms);
+
+ let mut build_batch_positions = Vec::new();
+ let mut probe_indices = Vec::new();
+ let (metrics, next_idx) = index
+ .query_batch(
+ &probe_batch,
+ 0..2,
+ usize::MAX,
+ &mut build_batch_positions,
+ &mut probe_indices,
+ )
+ .await
+ .unwrap();
+
+ assert_eq!(metrics.count, 0);
+ assert_eq!(build_batch_positions.len(), 0);
+ assert_eq!(probe_indices.len(), 0);
+ assert_eq!(next_idx, 2);
+ }
+
+ #[tokio::test]
+ async fn test_query_batch_non_empty_results_multiple_build_batches() {
+ let gpu_options = GpuOptions {
+ enable: true,
+ concat_build: false,
+ ..Default::default()
+ };
+ let metrics = SpatialJoinBuildMetrics::default();
+
+ let spatial_predicate =
SpatialPredicate::Relation(RelationPredicate::new(
+ Arc::new(Column::new("geom", 0)),
+ Arc::new(Column::new("geom", 1)),
+ SpatialRelationType::Intersects,
+ ));
+
+ let schema = Arc::new(arrow_schema::Schema::new(vec![Field::new(
+ "geom",
+ DataType::Binary,
+ true,
+ )]));
+
+ let builder = GPUSpatialIndexBuilder::new(
+ schema.clone(),
+ spatial_predicate,
+ gpu_options,
+ JoinType::Inner,
+ 4,
+ metrics,
+ );
+
+ // --- Build Side: Multiple Batches of Polygons ---
+ // Batch 0
+ let build_geom_batch0 = create_array(
+ &[
+ Some("POLYGON ((0 0, 0 10, 10 10, 10 0, 0 0))"),
+ Some("POLYGON ((20 20, 20 30, 30 30, 30 20, 20 20))"),
+ ],
+ &WKB_GEOMETRY,
+ );
+ let build_batch0 =
+ RecordBatch::try_new(schema.clone(),
vec![Arc::new(build_geom_batch0.clone())])
+ .unwrap();
+ let evaluated_build0 = EvaluatedBatch {
+ batch: build_batch0,
+ geom_array: EvaluatedGeometryArray::try_new(build_geom_batch0,
&WKB_GEOMETRY).unwrap(),
+ };
+
+ // Batch 1
+ let build_geom_batch1 = create_array(
+ &[Some("POLYGON ((40 40, 40 50, 50 50, 50 40, 40 40))")],
+ &WKB_GEOMETRY,
+ );
+ let build_batch1 =
+ RecordBatch::try_new(schema.clone(),
vec![Arc::new(build_geom_batch1.clone())])
+ .unwrap();
+ let evaluated_build1 = EvaluatedBatch {
+ batch: build_batch1,
+ geom_array: EvaluatedGeometryArray::try_new(build_geom_batch1,
&WKB_GEOMETRY).unwrap(),
+ };
+
+ // Build the multi-batch index using the helper function we created
previously
+ let indexed_batches = vec![evaluated_build0, evaluated_build1];
+ let index = build_index_from_vec(Box::new(builder), indexed_batches,
schema.clone()).await;
+
+ // --- Probe Side: One Batch of Points ---
+ let probe_geoms = &[
+ Some("POINT (5 5)"), // Matches Batch 0, Row 0
+ Some("POINT (100 100)"), // No match
+ Some("POINT (25 25)"), // Matches Batch 0, Row 1
+ Some("POINT (45 45)"), // Matches Batch 1, Row 0
+ ];
+ let probe_batch = create_probe_batch(probe_geoms);
+
+ // --- Execute Query ---
+ let mut build_batch_positions = Vec::new();
+ let mut probe_indices = Vec::new();
+
+ // Probing all 4 points
+ let (query_metrics, next_idx) = index
+ .query_batch(
+ &probe_batch,
+ 0..4,
+ usize::MAX,
+ &mut build_batch_positions,
+ &mut probe_indices,
+ )
+ .await
+ .unwrap();
+
+ // --- Assertions ---
+ // We expect exactly 3 matches out of the 4 probe points
+ assert_eq!(query_metrics.count, 3);
+ assert_eq!(build_batch_positions.len(), 3);
+ assert_eq!(probe_indices.len(), 3);
+
+ // The probe indices should match the rows in our probe batch that
successfully intersected
+ assert_eq!(probe_indices, vec![0, 2, 3]);
+
+ // The query processed all 4 probe points
+ assert_eq!(next_idx, 4);
+ }
+}
diff --git
a/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index_builder.rs
b/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index_builder.rs
new file mode 100644
index 00000000..224e7852
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/src/index/gpu_spatial_index_builder.rs
@@ -0,0 +1,286 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::gpu_spatial_index::GPUSpatialIndex;
+use crate::options::GpuOptions;
+use arrow::array::BooleanBufferBuilder;
+use arrow::compute::concat_batches;
+use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
+use async_trait::async_trait;
+use datafusion_common::Result;
+use datafusion_common::{DataFusionError, JoinType};
+use futures::StreamExt;
+use geo_types::{coord, Rect};
+use parking_lot::Mutex;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
+use sedona_expr::statistics::GeoStatistics;
+use sedona_libgpuspatial::{GpuSpatialIndex, GpuSpatialOptions,
GpuSpatialRefiner};
+use
sedona_spatial_join::evaluated_batch::evaluated_batch_stream::SendableEvaluatedBatchStream;
+use sedona_spatial_join::evaluated_batch::EvaluatedBatch;
+use sedona_spatial_join::index::spatial_index::SpatialIndexRef;
+use sedona_spatial_join::index::spatial_index_builder::{
+ SpatialIndexBuilder, SpatialJoinBuildMetrics,
+};
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_spatial_join::utils::join_utils::need_produce_result_in_final;
+use sedona_spatial_join::SpatialPredicate;
+use std::sync::atomic::AtomicUsize;
+use std::sync::Arc;
+
+pub(crate) struct GPUSpatialIndexBuilder {
+ schema: SchemaRef,
+ spatial_predicate: SpatialPredicate,
+ gpu_options: GpuOptions,
+ join_type: JoinType,
+ probe_threads_count: usize,
+ metrics: SpatialJoinBuildMetrics,
+ /// Batches to be indexed
+ indexed_batches: Vec<EvaluatedBatch>,
+ /// Statistics for indexed geometries
+ memory_used: usize,
+}
+
+impl GPUSpatialIndexBuilder {
+ pub fn new(
+ schema: SchemaRef,
+ spatial_predicate: SpatialPredicate,
+ gpu_options: GpuOptions,
+ join_type: JoinType,
+ probe_threads_count: usize,
+ metrics: SpatialJoinBuildMetrics,
+ ) -> Self {
+ Self {
+ schema,
+ spatial_predicate,
+ gpu_options,
+ join_type,
+ probe_threads_count,
+ metrics,
+ indexed_batches: vec![],
+ memory_used: 0,
+ }
+ }
+
+ fn build_visited_bitmaps(&mut self) ->
Result<Option<Mutex<Vec<BooleanBufferBuilder>>>> {
+ if !need_produce_result_in_final(self.join_type) {
+ return Ok(None);
+ }
+
+ let mut bitmaps = Vec::with_capacity(self.indexed_batches.len());
+ let mut total_buffer_size = 0;
+
+ for batch in &self.indexed_batches {
+ let batch_rows = batch.batch.num_rows();
+ let buffer_size = batch_rows.div_ceil(8);
+ total_buffer_size += buffer_size;
+
+ let mut bitmap = BooleanBufferBuilder::new(batch_rows);
+ bitmap.append_n(batch_rows, false);
+ bitmaps.push(bitmap);
+ }
+
+ self.record_memory_usage(total_buffer_size);
+
+ Ok(Some(Mutex::new(bitmaps)))
+ }
+
+ fn record_memory_usage(&mut self, bytes: usize) {
+ self.memory_used += bytes;
+ self.metrics.build_mem_used.set_max(self.memory_used);
+ }
+ /// Add a geometry batch to be indexed.
+ ///
+ /// This method accumulates geometry batches that will be used to build
the spatial index.
+ /// Each batch contains processed geometry data along with memory usage
information.
+ fn add_batch(&mut self, indexed_batch: EvaluatedBatch) -> Result<()> {
+ let in_mem_size = indexed_batch.in_mem_size()?;
+ self.indexed_batches.push(indexed_batch);
+ self.record_memory_usage(in_mem_size);
+ Ok(())
+ }
+
+ pub(crate) fn estimate_extra_memory_usage(
+ geo_stats: &GeoStatistics,
+ _spatial_predicate: &SpatialPredicate,
+ _options: &SpatialJoinOptions,
+ ) -> usize {
+ let num_geoms = geo_stats.total_geometries().unwrap_or(0) as usize;
+ // This line estimates the size of temporary memory space for an array
of bounding boxes,
+ // which will be fed to the GPU library to build an on-device index.
+ // The memory is allocated until the finishing of a spatial join.
+ // Each geometry requires 4 f32 values to store the bounding rectangle
(min_x, min_y, max_x, max_y)
+ num_geoms * (4 * 4)
+ }
+}
+fn concat_evaluated_batches(batches: &[EvaluatedBatch]) ->
Result<EvaluatedBatch> {
+ if batches.is_empty() {
+ return sedona_internal_err!("Cannot concatenate empty list of
EvaluatedBatches");
+ }
+
+ // 1. Concatenate the underlying Arrow RecordBatches
+ let schema = batches[0].schema();
+ let record_batches: Vec<&RecordBatch> = batches.iter().map(|b|
&b.batch).collect();
+ let concatenated_batch = concat_batches(&schema, record_batches)?;
+
+ // 2. Prepare for Geometry Interleaving
+ // We need to create a list of (batch_index, row_index) for every row in
order
+ let mut indices = Vec::with_capacity(concatenated_batch.num_rows());
+ for (batch_idx, batch) in batches.iter().enumerate() {
+ for row_idx in 0..batch.num_rows() {
+ indices.push((batch_idx, row_idx));
+ }
+ }
+
+ // 3. Concatenate Geometry Arrays using the interleave method
+ let geom_arrays: Vec<&EvaluatedGeometryArray> = batches.iter().map(|b|
&b.geom_array).collect();
+
+ let concatenated_geom_array =
EvaluatedGeometryArray::interleave(&geom_arrays, &indices)?;
+
+ Ok(EvaluatedBatch {
+ batch: concatenated_batch,
+ geom_array: concatenated_geom_array,
+ })
+}
+#[async_trait]
+impl SpatialIndexBuilder for GPUSpatialIndexBuilder {
+ /// Finish building and return the completed SpatialIndex.
+ fn finish(&mut self) -> Result<SpatialIndexRef> {
+ if self.indexed_batches.is_empty() {
+ let visited_build_side = self.build_visited_bitmaps()?;
+ return Ok(Arc::new(GPUSpatialIndex::empty(
+ self.spatial_predicate.clone(),
+ self.schema.clone(),
+ self.gpu_options.clone(),
+ visited_build_side,
+ AtomicUsize::new(self.probe_threads_count),
+ )?));
+ }
+ let build_timer = self.metrics.build_time.timer();
+ let gpu_options = GpuSpatialOptions {
+ cuda_use_memory_pool: self.gpu_options.use_memory_pool,
+ cuda_memory_pool_init_percent:
self.gpu_options.memory_pool_init_percentage as i32,
+ concurrency: self.probe_threads_count as u32,
+ device_id: self.gpu_options.device_id as i32,
+ compress_bvh: self.gpu_options.compress_bvh,
+ pipeline_batches: self.gpu_options.pipeline_batches as u32,
+ };
+
+ let mut index = GpuSpatialIndex::try_new(&gpu_options).map_err(|e| {
+ DataFusionError::Execution(format!("Failed to initialize GPU
Index: {e:?}"))
+ })?;
+ let mut refiner = GpuSpatialRefiner::try_new(&gpu_options).map_err(|e|
{
+ DataFusionError::Execution(format!("Failed to initialize GPU
Refiner: {e:?}"))
+ })?;
+
+ // Concat indexed batches into a single batch to reduce build time
+ let total_rows: usize = self
+ .indexed_batches
+ .iter()
+ .map(|batch| batch.batch.num_rows())
+ .sum();
+
+ let sedona_type =
self.indexed_batches[0].geom_array.sedona_type().clone();
+
+ if self.gpu_options.concat_build {
+ let concat_batch =
concat_evaluated_batches(&self.indexed_batches)?;
+ self.indexed_batches.clear();
+ self.indexed_batches.push(concat_batch);
+ }
+ let mut data_id_to_batch_pos: Vec<(i32, i32)> = Vec::with_capacity(
+ self.indexed_batches
+ .iter()
+ .map(|x| x.batch.num_rows())
+ .sum(),
+ );
+ let empty_rect = Rect::new(
+ coord!(x: f32::NAN, y: f32::NAN),
+ coord!(x: f32::NAN, y: f32::NAN),
+ );
+
+ refiner
+ .init_build_schema(sedona_type.storage_type())
+ .map_err(|e| {
+ DataFusionError::Execution(format!("Failed to init schema for
refiner {e:?}"))
+ })?;
+
+ let mut native_rects = Vec::with_capacity(total_rows);
+
+ for (batch_idx, batch) in self.indexed_batches.iter().enumerate() {
+ let rects = batch.geom_array.rects();
+
+ for (idx, rect_opt) in rects.iter().enumerate() {
+ if let Some(rect) = rect_opt {
+ native_rects.push(*rect);
+ } else {
+ native_rects.push(empty_rect);
+ }
+ data_id_to_batch_pos.push((batch_idx as i32, idx as i32));
+ }
+ refiner
+ .push_build(batch.geom_array.geometry_array())
+ .map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to add geometries to GPU refiner {e:?}"
+ ))
+ })?;
+ }
+
+ refiner.finish_building().map_err(|e| {
+ DataFusionError::Execution(format!("Failed to build spatial
refiner on GPU {e:?}"))
+ })?;
+
+ // Add rectangles from build side to the spatial index
+ index.push_build(&native_rects).map_err(|e| {
+ DataFusionError::Execution(format!(
+ "Failed to push rectangles to GPU spatial index {e:?}"
+ ))
+ })?;
+
+ index.finish_building().map_err(|e| {
+ DataFusionError::Execution(format!("Failed to build spatial index
on GPU {e:?}"))
+ })?;
+ build_timer.done();
+ let visited_build_side = self.build_visited_bitmaps()?;
+ // Build index for rectangle queries
+ Ok(Arc::new(GPUSpatialIndex {
+ schema: self.schema.clone(),
+ index: Arc::new(index),
+ refiner: Arc::new(refiner),
+ spatial_predicate: self.spatial_predicate.clone(),
+ indexed_batches: self
+ .indexed_batches
+ .drain(0..self.indexed_batches.len())
+ .collect(),
+ data_id_to_batch_pos,
+ visited_build_side,
+ probe_threads_counter: AtomicUsize::new(self.probe_threads_count),
+ }))
+ }
+
+ async fn add_stream(
+ &mut self,
+ mut stream: SendableEvaluatedBatchStream,
+ _geo_statistics: GeoStatistics,
+ ) -> Result<()> {
+ while let Some(batch) = stream.next().await {
+ let indexed_batch = batch?;
+ self.add_batch(indexed_batch)?;
+ }
+ Ok(())
+ }
+}
diff --git a/rust/sedona-spatial-join-gpu/src/join_provider.rs
b/rust/sedona-spatial-join-gpu/src/join_provider.rs
new file mode 100644
index 00000000..3a206180
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/src/join_provider.rs
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::index::GpuSpatialIndexBuilder;
+use crate::options::GpuOptions;
+use arrow_array::ArrayRef;
+use arrow_schema::SchemaRef;
+use datafusion_common::JoinType;
+use datafusion_common::Result;
+use geo_index::rtree::util::f64_box_to_f32;
+use geo_types::{coord, Rect};
+use sedona_common::SpatialJoinOptions;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_functions::executor::IterGeo;
+use sedona_geo_generic_alg::BoundingRect;
+use sedona_schema::datatypes::SedonaType;
+use sedona_spatial_join::index::spatial_index_builder::{
+ SpatialIndexBuilder, SpatialJoinBuildMetrics,
+};
+use sedona_spatial_join::join_provider::SpatialJoinProvider;
+use sedona_spatial_join::operand_evaluator::{
+ EvaluatedGeometryArray, EvaluatedGeometryArrayFactory,
+};
+use sedona_spatial_join::SpatialPredicate;
+use std::sync::Arc;
+use wkb::reader::GeometryType;
+
+#[derive(Debug)]
+pub(crate) struct GpuSpatialJoinProvider {
+ gpu_options: GpuOptions,
+}
+
+impl GpuSpatialJoinProvider {
+ pub(crate) fn new(gpu_options: GpuOptions) -> Self {
+ Self { gpu_options }
+ }
+}
+
+impl SpatialJoinProvider for GpuSpatialJoinProvider {
+ fn try_new_spatial_index_builder(
+ &self,
+ schema: SchemaRef,
+ spatial_predicate: SpatialPredicate,
+ _options: SpatialJoinOptions,
+ join_type: JoinType,
+ probe_threads_count: usize,
+ metrics: SpatialJoinBuildMetrics,
+ ) -> Result<Box<dyn SpatialIndexBuilder>> {
+ let builder = GpuSpatialIndexBuilder::new(
+ schema,
+ spatial_predicate,
+ self.gpu_options.clone(),
+ join_type,
+ probe_threads_count,
+ metrics,
+ );
+ Ok(Box::new(builder))
+ }
+
+ fn estimate_extra_memory_usage(
+ &self,
+ geo_stats: &GeoStatistics,
+ spatial_predicate: &SpatialPredicate,
+ options: &SpatialJoinOptions,
+ ) -> usize {
+ GpuSpatialIndexBuilder::estimate_extra_memory_usage(geo_stats,
spatial_predicate, options)
+ }
+
+ fn evaluated_array_factory(&self) -> Arc<dyn
EvaluatedGeometryArrayFactory> {
+ Arc::new(DefaultGeometryArrayFactory)
+ }
+}
+
+#[derive(Debug)]
+pub(crate) struct DefaultGeometryArrayFactory;
+
+impl EvaluatedGeometryArrayFactory for DefaultGeometryArrayFactory {
+ fn try_new_evaluated_array(
+ &self,
+ geometry_array: ArrayRef,
+ sedona_type: &SedonaType,
+ ) -> Result<EvaluatedGeometryArray> {
+ let num_rows = geometry_array.len();
+ let mut rect_vec = Vec::with_capacity(num_rows);
+ geometry_array.iter_as_wkb(sedona_type, num_rows, |wkb_opt| {
+ let rect_opt = if let Some(wkb) = &wkb_opt {
+ // This piece of code checks whether the underlying geometry
is a point
+ // By representing the point with an MBR with the same min
corner and max corner,
+ // libgpuspatial treats the MBR as a point, which triggers an
optimized point query
+ // instead of using rect-rect query for high performance
+ // Ref:
https://github.com/apache/sedona-db/blob/9187f8b8c4ca52b64837fab5fddd377703f7331b/c/sedona-libgpuspatial/libgpuspatial/src/rt_spatial_index.cu#L374
+ if let Some(rect) = wkb.bounding_rect() {
+ let min = rect.min();
+ let max = rect.max();
+ // Why conservative bounding boxes prevent false negatives:
+ // 1. P32 = round_nearest(P64), so P32 is the closest
possible float to P64.
+ // 2. Min32 = round_down(Min64), guaranteeing Min32 <=
Min64.
+ // 3. Max32 = round_up(Max64), guaranteeing Max32 >= Max64.
+ // If P64 is inside Box64 (Min64 <= P64 <= Max64), P32
cannot fall outside Box32.
+ // If P32 < Min32, it would mean Min32 is closer to P64
than P32 is, which
+ // contradicts P32 being the nearest float. Therefore,
false negatives are impossible.
+ if wkb.geometry_type() == GeometryType::Point {
+ Some(Rect::new(
+ coord!(x: min.x as f32, y: min.y as f32),
+ coord!(x: max.x as f32, y: max.y as f32),
+ ))
+ } else {
+ let (min_x, min_y, max_x, max_y) =
+ f64_box_to_f32(min.x, min.y, max.x, max.y);
+ Some(Rect::new(
+ coord!(x: min_x, y: min_y),
+ coord!(x: max_x, y: max_y),
+ ))
+ }
+ } else {
+ None
+ }
+ } else {
+ None
+ };
+
+ rect_vec.push(rect_opt);
+ Ok(())
+ })?;
+
+ EvaluatedGeometryArray::try_new_with_rects(geometry_array, rect_vec,
sedona_type)
+ }
+}
diff --git a/rust/sedona-spatial-join/src/utils.rs
b/rust/sedona-spatial-join-gpu/src/lib.rs
similarity index 77%
copy from rust/sedona-spatial-join/src/utils.rs
copy to rust/sedona-spatial-join-gpu/src/lib.rs
index ae784899..809e9acb 100644
--- a/rust/sedona-spatial-join/src/utils.rs
+++ b/rust/sedona-spatial-join-gpu/src/lib.rs
@@ -15,11 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-pub(crate) mod arrow_utils;
-pub(crate) mod bbox_sampler;
-pub(crate) mod disposable_async_cell;
-pub(crate) mod init_once_array;
-pub mod internal_benchmark_util;
-pub(crate) mod join_utils;
-pub(crate) mod once_fut;
-pub(crate) mod spill;
+mod index;
+mod join_provider;
+
+pub mod options;
+pub mod physical_planner;
diff --git a/rust/sedona-spatial-join-gpu/src/options.rs
b/rust/sedona-spatial-join-gpu/src/options.rs
new file mode 100644
index 00000000..452a7c87
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/src/options.rs
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use datafusion_common::{config::ConfigExtension, extensions_options};
+
+extensions_options! {
+ /// Configuration options for GPU-accelerated spatial joins.
+ pub struct GpuOptions {
+ /// Enable GPU-accelerated spatial joins (requires CUDA and GPU
feature flag)
+ pub enable: bool, default = false
+
+ /// Concatenate all build-side geometries into one buffer for GPU
processing
+ pub concat_build: bool, default = true
+
+ /// GPU device ID to use (0 = first GPU, 1 = second, etc.)
+ pub device_id: usize, default = 0
+
+ /// Fall back to CPU if GPU initialization or execution fails
+ pub fallback_to_cpu: bool, default = true
+
+ /// Use CUDA memory pool for GPU memory management
+ pub use_memory_pool: bool, default = true
+
+ /// Percentage of total GPU memory for initializing CUDA memory pool
(0-100)
+ pub memory_pool_init_percentage: usize, default = 50
+
+ /// Overlap parsing and refinement by pipelining multiple batches; 1
means no pipelining
+ pub pipeline_batches: usize, default = 1
+
+ /// Compress BVH to reduce memory usage (may reduce performance)
+ pub compress_bvh: bool, default = false
+ }
+}
+
+impl ConfigExtension for GpuOptions {
+ const PREFIX: &'static str = "gpu";
+}
diff --git a/rust/sedona-spatial-join-gpu/src/physical_planner.rs
b/rust/sedona-spatial-join-gpu/src/physical_planner.rs
new file mode 100644
index 00000000..7b2593aa
--- /dev/null
+++ b/rust/sedona-spatial-join-gpu/src/physical_planner.rs
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::Arc;
+
+use crate::join_provider::GpuSpatialJoinProvider;
+use crate::options::GpuOptions;
+use arrow_schema::Schema;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion_common::{DataFusionError, Result};
+use datafusion_physical_expr::PhysicalExpr;
+use sedona_common::SpatialJoinOptions;
+use sedona_query_planner::spatial_join_physical_planner::{
+ PlanSpatialJoinArgs, SpatialJoinPhysicalPlanner,
+};
+use sedona_query_planner::spatial_predicate::{
+ RelationPredicate, SpatialPredicate, SpatialRelationType,
+};
+use sedona_schema::datatypes::SedonaType;
+use sedona_schema::matchers::ArgMatcher;
+use sedona_spatial_join::SpatialJoinExec;
+
+/// [SpatialJoinFactory] implementation for the default spatial join
+///
+/// This struct is the entrypoint to ensuring the SedonaQueryPlanner is able
+/// to instantiate the [ExecutionPlan] implemented in this crate.
+#[derive(Debug)]
+pub struct GpuSpatialJoinPhysicalPlanner;
+
+impl GpuSpatialJoinPhysicalPlanner {
+ /// Create a new default join factory
+ pub fn new() -> Self {
+ Self {}
+ }
+}
+
+impl Default for GpuSpatialJoinPhysicalPlanner {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl SpatialJoinPhysicalPlanner for GpuSpatialJoinPhysicalPlanner {
+ fn plan_spatial_join(
+ &self,
+ args: &PlanSpatialJoinArgs<'_>,
+ ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ let supported = is_spatial_predicate_supported(
+ args.spatial_predicate,
+ &args.physical_left.schema(),
+ &args.physical_right.schema(),
+ )?;
+ let gpu_options = args
+ .options
+ .extensions
+ .get::<GpuOptions>()
+ .cloned()
+ .unwrap_or_default();
+
+ if !gpu_options.enable {
+ return Ok(None);
+ }
+
+ if !supported {
+ if gpu_options.fallback_to_cpu {
+ log::warn!("Falling back to CPU spatial join as the spatial
predicate is not supported on GPU");
+ return Ok(None);
+ } else {
+ return Err(DataFusionError::Plan("GPU spatial join is enabled,
but the spatial predicate is not supported on GPU".into()));
+ }
+ }
+
+ let should_swap = !matches!(
+ args.spatial_predicate,
+ SpatialPredicate::KNearestNeighbors(_)
+ ) && args.join_type.supports_swap()
+ && should_swap_join_order(
+ args.join_options,
+ args.physical_left.as_ref(),
+ args.physical_right.as_ref(),
+ )?;
+
+ let exec = SpatialJoinExec::try_new(
+ args.physical_left.clone(),
+ args.physical_right.clone(),
+ args.spatial_predicate.clone(),
+ args.remainder.cloned(),
+ args.join_type,
+ None,
+ args.join_options,
+ )?;
+ let exec =
+
exec.with_spatial_join_provider(Arc::new(GpuSpatialJoinProvider::new(gpu_options)));
+
+ if should_swap {
+ exec.swap_inputs().map(Some)
+ } else {
+ Ok(Some(Arc::new(exec) as Arc<dyn ExecutionPlan>))
+ }
+ }
+}
+
+/// Spatial join reordering heuristic:
+/// 1. Put the input with fewer rows on the build side, because fewer entries
+/// produce a smaller and more efficient spatial index (R-tree).
+/// 2. If row-count statistics are unavailable (for example, for CSV sources),
+/// fall back to total input size as an estimate.
+/// 3. Do not swap the join order if join reordering is disabled or no relevant
+/// statistics are available.
+fn should_swap_join_order(
+ spatial_join_options: &SpatialJoinOptions,
+ left: &dyn ExecutionPlan,
+ right: &dyn ExecutionPlan,
+) -> Result<bool> {
+ if !spatial_join_options.spatial_join_reordering {
+ log::info!(
+ "spatial join swap heuristic disabled via
sedona.spatial_join.spatial_join_reordering"
+ );
+ return Ok(false);
+ }
+
+ let left_stats = left.partition_statistics(None)?;
+ let right_stats = right.partition_statistics(None)?;
+
+ let left_num_rows = left_stats.num_rows;
+ let right_num_rows = right_stats.num_rows;
+ let left_total_byte_size = left_stats.total_byte_size;
+ let right_total_byte_size = right_stats.total_byte_size;
+
+ let should_swap = match (left_num_rows.get_value(),
right_num_rows.get_value()) {
+ (Some(l), Some(r)) => l > r,
+ _ => match (
+ left_total_byte_size.get_value(),
+ right_total_byte_size.get_value(),
+ ) {
+ (Some(l), Some(r)) => l > r,
+ _ => false,
+ },
+ };
+
+ log::info!(
+ "spatial join swap heuristic: left_num_rows={left_num_rows:?},
right_num_rows={right_num_rows:?},
left_total_byte_size={left_total_byte_size:?},
right_total_byte_size={right_total_byte_size:?}, should_swap={should_swap}"
+ );
+
+ Ok(should_swap)
+}
+
+pub fn is_spatial_predicate_supported(
+ spatial_predicate: &SpatialPredicate,
+ left_schema: &Schema,
+ right_schema: &Schema,
+) -> Result<bool> {
+ fn is_geometry_type_supported(expr: &Arc<dyn PhysicalExpr>, schema:
&Schema) -> Result<bool> {
+ let return_field = expr.return_field(schema)?;
+ let sedona_type = SedonaType::from_storage_field(&return_field)?;
+ Ok(ArgMatcher::is_geometry().match_type(&sedona_type))
+ }
+
+ let both_geometry =
+ |left: &Arc<dyn PhysicalExpr>, right: &Arc<dyn PhysicalExpr>| ->
Result<bool> {
+ Ok(is_geometry_type_supported(left, left_schema)?
+ && is_geometry_type_supported(right, right_schema)?)
+ };
+
+ match spatial_predicate {
+ SpatialPredicate::Relation(RelationPredicate {
+ left,
+ right,
+ relation_type,
+ }) => {
+ if !matches!(
+ relation_type,
+ SpatialRelationType::Intersects
+ | SpatialRelationType::Contains
+ | SpatialRelationType::Within
+ | SpatialRelationType::Covers
+ | SpatialRelationType::CoveredBy
+ | SpatialRelationType::Touches
+ | SpatialRelationType::Equals
+ ) {
+ return Ok(false);
+ }
+
+ both_geometry(left, right)
+ }
+ SpatialPredicate::Distance(_) | SpatialPredicate::KNearestNeighbors(_)
=> Ok(false),
+ }
+}
diff --git a/rust/sedona-spatial-join/src/exec.rs
b/rust/sedona-spatial-join/src/exec.rs
index 42ca6ebb..0b8c61a2 100644
--- a/rust/sedona-spatial-join/src/exec.rs
+++ b/rust/sedona-spatial-join/src/exec.rs
@@ -179,6 +179,14 @@ impl SpatialJoinExec {
})
}
+ /// Create a new [`SpatialJoinExec`] with customized
[`SpatialJoinProvider`]
+ pub fn with_spatial_join_provider(
+ mut self,
+ join_provider: Arc<dyn SpatialJoinProvider>,
+ ) -> Self {
+ self.join_provider = join_provider;
+ self
+ }
/// How the join is performed
pub fn join_type(&self) -> &JoinType {
&self.join_type
diff --git a/rust/sedona-spatial-join/src/index.rs
b/rust/sedona-spatial-join/src/index.rs
index 9c3c00c5..fb732954 100644
--- a/rust/sedona-spatial-join/src/index.rs
+++ b/rust/sedona-spatial-join/src/index.rs
@@ -21,8 +21,8 @@ pub(crate) mod default_spatial_index_builder;
mod knn_adapter;
pub(crate) mod memory_plan;
pub(crate) mod partitioned_index_provider;
-pub(crate) mod spatial_index;
-pub(crate) mod spatial_index_builder;
+pub mod spatial_index;
+pub mod spatial_index_builder;
pub(crate) use build_side_collector::{
BuildPartition, BuildSideBatchesCollector, CollectBuildSideMetrics,
diff --git a/rust/sedona-spatial-join/src/utils.rs
b/rust/sedona-spatial-join/src/utils.rs
index ae784899..a600616f 100644
--- a/rust/sedona-spatial-join/src/utils.rs
+++ b/rust/sedona-spatial-join/src/utils.rs
@@ -20,6 +20,6 @@ pub(crate) mod bbox_sampler;
pub(crate) mod disposable_async_cell;
pub(crate) mod init_once_array;
pub mod internal_benchmark_util;
-pub(crate) mod join_utils;
+pub mod join_utils;
pub(crate) mod once_fut;
pub(crate) mod spill;
diff --git a/rust/sedona-spatial-join/src/utils/join_utils.rs
b/rust/sedona-spatial-join/src/utils/join_utils.rs
index 9c6565e9..722a189f 100644
--- a/rust/sedona-spatial-join/src/utils/join_utils.rs
+++ b/rust/sedona-spatial-join/src/utils/join_utils.rs
@@ -50,7 +50,7 @@ use crate::SpatialPredicate;
///
/// For example of the `Left` join, in each iteration of right side, can get
the matched result, but need
/// to maintain the matched indices bit map to get the unmatched row for the
left side.
-pub(crate) fn need_produce_result_in_final(join_type: JoinType) -> bool {
+pub fn need_produce_result_in_final(join_type: JoinType) -> bool {
matches!(
join_type,
JoinType::Left
diff --git a/rust/sedona/Cargo.toml b/rust/sedona/Cargo.toml
index b0787420..b3e53ce6 100644
--- a/rust/sedona/Cargo.toml
+++ b/rust/sedona/Cargo.toml
@@ -43,6 +43,7 @@ pointcloud = ["dep:sedona-pointcloud"]
proj = ["sedona-proj/proj-sys"]
gdal = ["sedona-gdal/gdal-sys"]
spatial-join = ["dep:sedona-spatial-join"]
+gpu = ["dep:sedona-spatial-join-gpu", "sedona-spatial-join-gpu/gpu"]
s2geography = ["dep:sedona-s2geography"]
[dev-dependencies]
@@ -82,6 +83,7 @@ sedona-gdal = { workspace = true }
sedona-raster-functions = { workspace = true }
sedona-schema = { workspace = true }
sedona-spatial-join = { workspace = true, optional = true }
+sedona-spatial-join-gpu = { workspace = true, optional = true }
sedona-query-planner = { workspace = true }
sedona-s2geography = { workspace = true, optional = true }
sedona-testing = { workspace = true }
diff --git a/rust/sedona/src/context.rs b/rust/sedona/src/context.rs
index b85e3be8..77ff4e9b 100644
--- a/rust/sedona/src/context.rs
+++ b/rust/sedona/src/context.rs
@@ -64,6 +64,9 @@ use sedona_pointcloud::las::{
format::{Extension, LasFormatFactory},
options::{GeometryEncoding, LasExtraBytes, LasOptions},
};
+#[cfg(feature = "gpu")]
+use sedona_spatial_join_gpu::options::GpuOptions;
+
use sedona_query_planner::{
optimizer::register_spatial_join_logical_optimizer,
query_planner::SedonaQueryPlanner,
};
@@ -139,6 +142,9 @@ impl SedonaContext {
.with_las_extra_bytes(LasExtraBytes::Typed),
);
+ #[cfg(feature = "gpu")]
+ let session_config =
session_config.with_option_extension(GpuOptions::default());
+
#[allow(unused_mut)]
let mut state_builder = SessionStateBuilder::new()
.with_default_features()
@@ -154,6 +160,16 @@ impl SedonaContext {
planner = planner.with_spatial_join_physical_planner(Arc::new(
DefaultSpatialJoinPhysicalPlanner::new(),
));
+ // Register the GPU join after the default planer
+ // If a query is not supported, it falls back to the default
planner.
+ #[cfg(feature = "gpu")]
+ {
+ use
sedona_spatial_join_gpu::physical_planner::GpuSpatialJoinPhysicalPlanner;
+
+ planner = planner.with_spatial_join_physical_planner(Arc::new(
+ GpuSpatialJoinPhysicalPlanner::new(),
+ ));
+ }
}
state_builder =
register_spatial_join_logical_optimizer(state_builder)?;