This is an automated email from the ASF dual-hosted git repository.
fengzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 188ea8e5 feat(rust/sedona-spatial-join): Support sequential spatial index building option (#362)
188ea8e5 is described below
commit 188ea8e50aac68ecc6bb06631a32e80f04af6c04
Author: Feng Zhang <[email protected]>
AuthorDate: Thu Dec 11 07:02:01 2025 -0800
feat(rust/sedona-spatial-join): Support sequential spatial index building option (#362)
---
rust/sedona-common/src/option.rs | 4 +++
rust/sedona-spatial-join/src/build_index.rs | 30 +++++++++++++++++++---
rust/sedona-spatial-join/src/exec.rs | 13 ++++------
rust/sedona-spatial-join/src/index.rs | 4 +--
.../sedona-spatial-join/src/index/spatial_index.rs | 2 +-
.../src/index/spatial_index_builder.rs | 4 +--
rust/sedona-spatial-join/src/lib.rs | 5 ++++
7 files changed, 45 insertions(+), 17 deletions(-)
diff --git a/rust/sedona-common/src/option.rs b/rust/sedona-common/src/option.rs
index a788ba5e..fcd692fb 100644
--- a/rust/sedona-common/src/option.rs
+++ b/rust/sedona-common/src/option.rs
@@ -64,6 +64,10 @@ config_namespace! {
/// The execution mode determining how prepared geometries are used
pub execution_mode: ExecutionMode, default =
ExecutionMode::Speculative(DEFAULT_SPECULATIVE_THRESHOLD)
+ /// Collect build side partitions concurrently (using spawned tasks).
+ /// Set to false for contexts where spawning new tasks is not
supported.
+ pub concurrent_build_side_collection: bool, default = true
+
/// Include tie-breakers in KNN join results when there are tied
distances
pub knn_include_tie_breakers: bool, default = false
}
diff --git a/rust/sedona-spatial-join/src/build_index.rs
b/rust/sedona-spatial-join/src/build_index.rs
index 15b7ea71..3600d294 100644
--- a/rust/sedona-spatial-join/src/build_index.rs
+++ b/rust/sedona-spatial-join/src/build_index.rs
@@ -33,7 +33,13 @@ use crate::{
spatial_predicate::SpatialPredicate,
};
-pub(crate) async fn build_index(
+/// Build a spatial index from the build side streams.
+///
+/// This function reads the `concurrent_build_side_collection` configuration
from the context
+/// to determine whether to collect build side partitions concurrently (using
spawned tasks)
+/// or sequentially (for JNI/embedded contexts without async runtime support).
+#[allow(clippy::too_many_arguments)]
+pub async fn build_index(
context: Arc<TaskContext>,
build_schema: SchemaRef,
build_streams: Vec<SendableRecordBatchStream>,
@@ -49,6 +55,7 @@ pub(crate) async fn build_index(
.get::<SedonaOptions>()
.cloned()
.unwrap_or_default();
+ let concurrent =
sedona_options.spatial_join.concurrent_build_side_collection;
let memory_pool = context.memory_pool();
let evaluator =
create_operand_evaluator(&spatial_predicate,
sedona_options.spatial_join.clone());
@@ -64,9 +71,24 @@ pub(crate) async fn build_index(
collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics));
}
- let build_partitions = collector
- .collect_all(build_streams, reservations, collect_metrics_vec)
- .await?;
+ let build_partitions = if concurrent {
+ // Collect partitions concurrently using collect_all which spawns tasks
+ collector
+ .collect_all(build_streams, reservations, collect_metrics_vec)
+ .await?
+ } else {
+ // Collect partitions sequentially (for JNI/embedded contexts)
+ let mut partitions = Vec::with_capacity(num_partitions);
+ for ((stream, reservation), metrics) in build_streams
+ .into_iter()
+ .zip(reservations)
+ .zip(&collect_metrics_vec)
+ {
+ let partition = collector.collect(stream, reservation,
metrics).await?;
+ partitions.push(partition);
+ }
+ partitions
+ };
let contains_external_stream = build_partitions
.iter()
diff --git a/rust/sedona-spatial-join/src/exec.rs
b/rust/sedona-spatial-join/src/exec.rs
index 7bf28cdb..1644f069 100644
--- a/rust/sedona-spatial-join/src/exec.rs
+++ b/rust/sedona-spatial-join/src/exec.rs
@@ -123,17 +123,16 @@ pub struct SpatialJoinExec {
/// The schema after join. Please be careful when using this schema,
/// if there is a projection, the schema isn't the same as the output
schema.
join_schema: SchemaRef,
- metrics: ExecutionPlanMetricsSet,
+ /// Metrics for tracking execution statistics (public for wrapper
implementations)
+ pub metrics: ExecutionPlanMetricsSet,
/// The projection indices of the columns in the output schema of join
projection: Option<Vec<usize>>,
/// Information of index and left / right placement of columns
column_indices: Vec<ColumnIndex>,
/// Cache holding plan properties like equivalences, output partitioning
etc.
cache: PlanProperties,
- /// Once future for building the spatial index.
- /// This futures run only once before the spatial index probing phase. It
can also be disposed
- /// by the last finished stream so that the spatial index does not have to
live as long as
- /// `SpatialJoinExec`.
+ /// Spatial index built asynchronously on first execute() call and shared
across all partitions.
+ /// Uses OnceAsync for lazy initialization coordinated via async runtime.
once_async_spatial_index: Arc<Mutex<Option<OnceAsync<SpatialIndex>>>>,
/// Indicates if this SpatialJoin was converted from a HashJoin
/// When true, we preserve HashJoin's equivalence properties and
partitioning
@@ -439,7 +438,7 @@ impl ExecutionPlan for SpatialJoinExec {
// Regular join semantics: left is build, right is probe
let (build_plan, probe_plan) = (&self.left, &self.right);
- // Build the spatial index
+ // Build the spatial index using shared OnceAsync
let once_fut_spatial_index = {
let mut once_async = self.once_async_spatial_index.lock();
once_async
@@ -456,7 +455,6 @@ impl ExecutionPlan for SpatialJoinExec {
let probe_thread_count =
self.right.output_partitioning().partition_count();
-
Ok(build_index(
Arc::clone(&context),
build_side.schema(),
@@ -549,7 +547,6 @@ impl SpatialJoinExec {
}
let probe_thread_count =
probe_plan.output_partitioning().partition_count();
-
Ok(build_index(
Arc::clone(&context),
build_side.schema(),
diff --git a/rust/sedona-spatial-join/src/index.rs
b/rust/sedona-spatial-join/src/index.rs
index c6fa542c..55df23d5 100644
--- a/rust/sedona-spatial-join/src/index.rs
+++ b/rust/sedona-spatial-join/src/index.rs
@@ -23,8 +23,8 @@ pub(crate) mod spatial_index_builder;
pub(crate) use build_side_collector::{
BuildPartition, BuildSideBatchesCollector, CollectBuildSideMetrics,
};
-pub(crate) use spatial_index::SpatialIndex;
-pub(crate) use spatial_index_builder::{SpatialIndexBuilder,
SpatialJoinBuildMetrics};
+pub use spatial_index::SpatialIndex;
+pub use spatial_index_builder::{SpatialIndexBuilder, SpatialJoinBuildMetrics};
use wkb::reader::Wkb;
/// The result of a spatial index query
diff --git a/rust/sedona-spatial-join/src/index/spatial_index.rs
b/rust/sedona-spatial-join/src/index/spatial_index.rs
index 09158d49..d70f2a63 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index.rs
@@ -48,7 +48,7 @@ use crate::{
use arrow::array::BooleanBufferBuilder;
use sedona_common::{option::SpatialJoinOptions, ExecutionMode};
-pub(crate) struct SpatialIndex {
+pub struct SpatialIndex {
pub(crate) schema: SchemaRef,
/// The spatial predicate evaluator for the spatial predicate.
diff --git a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
index 2b07a829..0ab90322 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
@@ -59,7 +59,7 @@ const REFINER_RESERVATION_PREALLOC_SIZE: usize = 10 * 1024 *
1024; // 10MB
/// 2. Building the spatial R-tree index
/// 3. Setting up memory tracking and visited bitmaps
/// 4. Configuring prepared geometries based on execution mode
-pub(crate) struct SpatialIndexBuilder {
+pub struct SpatialIndexBuilder {
schema: SchemaRef,
spatial_predicate: SpatialPredicate,
options: SpatialJoinOptions,
@@ -81,7 +81,7 @@ pub(crate) struct SpatialIndexBuilder {
/// Metrics for the build phase of the spatial join.
#[derive(Clone, Debug, Default)]
-pub(crate) struct SpatialJoinBuildMetrics {
+pub struct SpatialJoinBuildMetrics {
/// Total time for collecting build-side of join
pub(crate) build_time: metrics::Time,
/// Memory used by the spatial-index in bytes
diff --git a/rust/sedona-spatial-join/src/lib.rs
b/rust/sedona-spatial-join/src/lib.rs
index 592ac2e7..6ad60008 100644
--- a/rust/sedona-spatial-join/src/lib.rs
+++ b/rust/sedona-spatial-join/src/lib.rs
@@ -29,5 +29,10 @@ pub mod utils;
pub use exec::SpatialJoinExec;
pub use optimizer::register_spatial_join_optimizer;
+// Re-export types needed for external usage (e.g., in Comet)
+pub use build_index::build_index;
+pub use index::{SpatialIndex, SpatialJoinBuildMetrics};
+pub use spatial_predicate::SpatialPredicate;
+
// Re-export option types from sedona-common for convenience
pub use sedona_common::option::*;