This is an automated email from the ASF dual-hosted git repository.

fengzhang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git


The following commit(s) were added to refs/heads/main by this push:
     new 188ea8e5 feat(rust/sedona-spatial-join): Support sequential spatial index building option (#362)
188ea8e5 is described below

commit 188ea8e50aac68ecc6bb06631a32e80f04af6c04
Author: Feng Zhang <[email protected]>
AuthorDate: Thu Dec 11 07:02:01 2025 -0800

    feat(rust/sedona-spatial-join): Support sequential spatial index building option (#362)
---
 rust/sedona-common/src/option.rs                   |  4 +++
 rust/sedona-spatial-join/src/build_index.rs        | 30 +++++++++++++++++++---
 rust/sedona-spatial-join/src/exec.rs               | 13 ++++------
 rust/sedona-spatial-join/src/index.rs              |  4 +--
 .../sedona-spatial-join/src/index/spatial_index.rs |  2 +-
 .../src/index/spatial_index_builder.rs             |  4 +--
 rust/sedona-spatial-join/src/lib.rs                |  5 ++++
 7 files changed, 45 insertions(+), 17 deletions(-)

diff --git a/rust/sedona-common/src/option.rs b/rust/sedona-common/src/option.rs
index a788ba5e..fcd692fb 100644
--- a/rust/sedona-common/src/option.rs
+++ b/rust/sedona-common/src/option.rs
@@ -64,6 +64,10 @@ config_namespace! {
         /// The execution mode determining how prepared geometries are used
         pub execution_mode: ExecutionMode, default = ExecutionMode::Speculative(DEFAULT_SPECULATIVE_THRESHOLD)
 
+        /// Collect build side partitions concurrently (using spawned tasks).
+        /// Set to false for contexts where spawning new tasks is not supported.
+        pub concurrent_build_side_collection: bool, default = true
+
         /// Include tie-breakers in KNN join results when there are tied distances
         pub knn_include_tie_breakers: bool, default = false
     }
diff --git a/rust/sedona-spatial-join/src/build_index.rs b/rust/sedona-spatial-join/src/build_index.rs
index 15b7ea71..3600d294 100644
--- a/rust/sedona-spatial-join/src/build_index.rs
+++ b/rust/sedona-spatial-join/src/build_index.rs
@@ -33,7 +33,13 @@ use crate::{
     spatial_predicate::SpatialPredicate,
 };
 
-pub(crate) async fn build_index(
+/// Build a spatial index from the build side streams.
+///
+/// This function reads the `concurrent_build_side_collection` configuration from the context
+/// to determine whether to collect build side partitions concurrently (using spawned tasks)
+/// or sequentially (for JNI/embedded contexts without async runtime support).
+#[allow(clippy::too_many_arguments)]
+pub async fn build_index(
     context: Arc<TaskContext>,
     build_schema: SchemaRef,
     build_streams: Vec<SendableRecordBatchStream>,
@@ -49,6 +55,7 @@ pub(crate) async fn build_index(
         .get::<SedonaOptions>()
         .cloned()
         .unwrap_or_default();
+    let concurrent = sedona_options.spatial_join.concurrent_build_side_collection;
     let memory_pool = context.memory_pool();
     let evaluator =
         create_operand_evaluator(&spatial_predicate, sedona_options.spatial_join.clone());
@@ -64,9 +71,24 @@ pub(crate) async fn build_index(
         collect_metrics_vec.push(CollectBuildSideMetrics::new(k, &metrics));
     }
 
-    let build_partitions = collector
-        .collect_all(build_streams, reservations, collect_metrics_vec)
-        .await?;
+    let build_partitions = if concurrent {
+        // Collect partitions concurrently using collect_all which spawns tasks
+        collector
+            .collect_all(build_streams, reservations, collect_metrics_vec)
+            .await?
+    } else {
+        // Collect partitions sequentially (for JNI/embedded contexts)
+        let mut partitions = Vec::with_capacity(num_partitions);
+        for ((stream, reservation), metrics) in build_streams
+            .into_iter()
+            .zip(reservations)
+            .zip(&collect_metrics_vec)
+        {
+            let partition = collector.collect(stream, reservation, metrics).await?;
+            partitions.push(partition);
+        }
+        partitions
+    };
 
     let contains_external_stream = build_partitions
         .iter()
diff --git a/rust/sedona-spatial-join/src/exec.rs b/rust/sedona-spatial-join/src/exec.rs
index 7bf28cdb..1644f069 100644
--- a/rust/sedona-spatial-join/src/exec.rs
+++ b/rust/sedona-spatial-join/src/exec.rs
@@ -123,17 +123,16 @@ pub struct SpatialJoinExec {
     /// The schema after join. Please be careful when using this schema,
     /// if there is a projection, the schema isn't the same as the output schema.
     join_schema: SchemaRef,
-    metrics: ExecutionPlanMetricsSet,
+    /// Metrics for tracking execution statistics (public for wrapper implementations)
+    pub metrics: ExecutionPlanMetricsSet,
     /// The projection indices of the columns in the output schema of join
     projection: Option<Vec<usize>>,
     /// Information of index and left / right placement of columns
     column_indices: Vec<ColumnIndex>,
     /// Cache holding plan properties like equivalences, output partitioning etc.
     cache: PlanProperties,
-    /// Once future for building the spatial index.
-    /// This futures run only once before the spatial index probing phase. It can also be disposed
-    /// by the last finished stream so that the spatial index does not have to live as long as
-    /// `SpatialJoinExec`.
+    /// Spatial index built asynchronously on first execute() call and shared across all partitions.
+    /// Uses OnceAsync for lazy initialization coordinated via async runtime.
     once_async_spatial_index: Arc<Mutex<Option<OnceAsync<SpatialIndex>>>>,
     /// Indicates if this SpatialJoin was converted from a HashJoin
     /// When true, we preserve HashJoin's equivalence properties and partitioning
@@ -439,7 +438,7 @@ impl ExecutionPlan for SpatialJoinExec {
                 // Regular join semantics: left is build, right is probe
                 let (build_plan, probe_plan) = (&self.left, &self.right);
 
-                // Build the spatial index
+                // Build the spatial index using shared OnceAsync
                 let once_fut_spatial_index = {
                     let mut once_async = self.once_async_spatial_index.lock();
                     once_async
@@ -456,7 +455,6 @@ impl ExecutionPlan for SpatialJoinExec {
 
                             let probe_thread_count =
                                 self.right.output_partitioning().partition_count();
-
                             Ok(build_index(
                                 Arc::clone(&context),
                                 build_side.schema(),
@@ -549,7 +547,6 @@ impl SpatialJoinExec {
                     }
 
                     let probe_thread_count = probe_plan.output_partitioning().partition_count();
-
                     Ok(build_index(
                         Arc::clone(&context),
                         build_side.schema(),
diff --git a/rust/sedona-spatial-join/src/index.rs b/rust/sedona-spatial-join/src/index.rs
index c6fa542c..55df23d5 100644
--- a/rust/sedona-spatial-join/src/index.rs
+++ b/rust/sedona-spatial-join/src/index.rs
@@ -23,8 +23,8 @@ pub(crate) mod spatial_index_builder;
 pub(crate) use build_side_collector::{
     BuildPartition, BuildSideBatchesCollector, CollectBuildSideMetrics,
 };
-pub(crate) use spatial_index::SpatialIndex;
-pub(crate) use spatial_index_builder::{SpatialIndexBuilder, SpatialJoinBuildMetrics};
+pub use spatial_index::SpatialIndex;
+pub use spatial_index_builder::{SpatialIndexBuilder, SpatialJoinBuildMetrics};
 use wkb::reader::Wkb;
 
 /// The result of a spatial index query
diff --git a/rust/sedona-spatial-join/src/index/spatial_index.rs b/rust/sedona-spatial-join/src/index/spatial_index.rs
index 09158d49..d70f2a63 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index.rs
@@ -48,7 +48,7 @@ use crate::{
 use arrow::array::BooleanBufferBuilder;
 use sedona_common::{option::SpatialJoinOptions, ExecutionMode};
 
-pub(crate) struct SpatialIndex {
+pub struct SpatialIndex {
     pub(crate) schema: SchemaRef,
 
     /// The spatial predicate evaluator for the spatial predicate.
diff --git a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
index 2b07a829..0ab90322 100644
--- a/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
+++ b/rust/sedona-spatial-join/src/index/spatial_index_builder.rs
@@ -59,7 +59,7 @@ const REFINER_RESERVATION_PREALLOC_SIZE: usize = 10 * 1024 * 1024; // 10MB
 /// 2. Building the spatial R-tree index
 /// 3. Setting up memory tracking and visited bitmaps
 /// 4. Configuring prepared geometries based on execution mode
-pub(crate) struct SpatialIndexBuilder {
+pub struct SpatialIndexBuilder {
     schema: SchemaRef,
     spatial_predicate: SpatialPredicate,
     options: SpatialJoinOptions,
@@ -81,7 +81,7 @@ pub(crate) struct SpatialIndexBuilder {
 
 /// Metrics for the build phase of the spatial join.
 #[derive(Clone, Debug, Default)]
-pub(crate) struct SpatialJoinBuildMetrics {
+pub struct SpatialJoinBuildMetrics {
     /// Total time for collecting build-side of join
     pub(crate) build_time: metrics::Time,
     /// Memory used by the spatial-index in bytes
diff --git a/rust/sedona-spatial-join/src/lib.rs b/rust/sedona-spatial-join/src/lib.rs
index 592ac2e7..6ad60008 100644
--- a/rust/sedona-spatial-join/src/lib.rs
+++ b/rust/sedona-spatial-join/src/lib.rs
@@ -29,5 +29,10 @@ pub mod utils;
 pub use exec::SpatialJoinExec;
 pub use optimizer::register_spatial_join_optimizer;
 
+// Re-export types needed for external usage (e.g., in Comet)
+pub use build_index::build_index;
+pub use index::{SpatialIndex, SpatialJoinBuildMetrics};
+pub use spatial_predicate::SpatialPredicate;
+
 // Re-export option types from sedona-common for convenience
 pub use sedona_common::option::*;

Reply via email to