This is an automated email from the ASF dual-hosted git repository.
kontinuation pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new e31d5540 chore(rust/sedona-spatial-join): Integrate bounding box
sampler and spilling support into build side collector (#542)
e31d5540 is described below
commit e31d55402c1bd8a820469c381d954f2e7c1c9afe
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Tue Jan 27 10:29:59 2026 +0800
chore(rust/sedona-spatial-join): Integrate bounding box sampler and
spilling support into build side collector (#542)
This is an intermediate step that integrates multiple pieces together. The
overall behavior is unchanged when the memory limit of DataFusion is not set
(which is the default case). The collected bounding box samples are unused for
now, and the performance overhead of sampling boxes is negligible according to
our tests. Committing this won't drive the project into an unstable or
unreleasable state.
The next step will be adding a partitioned spatial index provider and
integrating the spatial partitioner into the main spatial join workflow,
although it will effectively only work on a single partition for now. This
will also be a non-breaking change.
---
Cargo.lock | 1 +
Cargo.toml | 1 +
rust/sedona-common/src/option.rs | 97 +++++
rust/sedona-geometry/src/bounding_box.rs | 15 +
rust/sedona-spatial-join/Cargo.toml | 1 +
rust/sedona-spatial-join/src/build_index.rs | 28 +-
rust/sedona-spatial-join/src/exec.rs | 22 +-
.../src/index/build_side_collector.rs | 431 +++++++++++++++++++--
rust/sedona-spatial-join/src/optimizer.rs | 20 +-
9 files changed, 571 insertions(+), 45 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 22f5e45a..bc6799a9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5404,6 +5404,7 @@ dependencies = [
"geo-traits",
"geo-types",
"geos",
+ "log",
"once_cell",
"parking_lot",
"pin-project-lite",
diff --git a/Cargo.toml b/Cargo.toml
index 04dede88..85ff5f43 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -90,6 +90,7 @@ datafusion-physical-expr = { version = "51.0.0" }
datafusion-physical-plan = { version = "51.0.0" }
dirs = "6.0.0"
env_logger = "0.11"
+log = "^0.4"
fastrand = "2.0"
futures = "0.3"
pin-project-lite = "0.2"
diff --git a/rust/sedona-common/src/option.rs b/rust/sedona-common/src/option.rs
index bc74acf7..e5290688 100644
--- a/rust/sedona-common/src/option.rs
+++ b/rust/sedona-common/src/option.rs
@@ -71,12 +71,94 @@ config_namespace! {
/// Include tie-breakers in KNN join results when there are tied
distances
pub knn_include_tie_breakers: bool, default = false
+ /// Maximum number of sample bounding boxes collected from the index
side for partitioning the
+ /// data when running out-of-core spatial join
+ pub max_index_side_bbox_samples: usize, default = 10000
+
+ /// Minimum number of sample bounding boxes collected from the index
side for partitioning the
+ /// data when running out-of-core spatial join
+ pub min_index_side_bbox_samples: usize, default = 1000
+
+ /// Target sampling rate for sampling bounding boxes from the index
side for partitioning the
+ /// data when running out-of-core spatial join
+ pub target_index_side_bbox_sampling_rate: f64, default = 0.01
+
+ /// The in memory size threshold of batches written to spill files. If
the spilled batch is
+ /// too large, it will be broken into several smaller parts before
written to spill files.
+ /// This is for avoiding overshooting the memory limit when reading
spilled batches from
+ /// spill files. Specify 0 for unlimited size.
+ pub spilled_batch_in_memory_size_threshold: usize, default = 0
+
/// The minimum number of geometry pairs per chunk required to enable
parallel
/// refinement during the spatial join operation. When the refinement
phase has
/// fewer geometry pairs than this threshold, it will run sequentially
instead
/// of spawning parallel tasks. Higher values reduce parallelization
overhead
/// for small datasets, while lower values enable more fine-grained
parallelism.
pub parallel_refinement_chunk_size: usize, default = 8192
+
+ /// Options for debugging or testing spatial join
+ pub debug : SpatialJoinDebugOptions, default =
SpatialJoinDebugOptions::default()
+ }
+}
+
+config_namespace! {
+ /// Configurations for debugging or testing spatial join
+ pub struct SpatialJoinDebugOptions {
+ /// Number of spatial partitions to use for spatial join
+ pub num_spatial_partitions: NumSpatialPartitionsConfig, default =
NumSpatialPartitionsConfig::Auto
+
+ /// The amount of memory for intermittent usage such as spatially
repartitioning the data
+ pub memory_for_intermittent_usage: Option<usize>, default = None
+
+ /// Force spilling while collecting the build side or not
+ pub force_spill: bool, default = false
+
+ /// Seed for random processes in the spatial join for testing purpose
+ pub random_seed: Option<u64>, default = None
+ }
+}
+
+#[derive(Debug, PartialEq, Clone, Copy)]
+pub enum NumSpatialPartitionsConfig {
+ /// Automatically determine the number of spatial partitions
+ Auto,
+
+ /// Use a fixed number of spatial partitions
+ Fixed(usize),
+}
+
+impl ConfigField for NumSpatialPartitionsConfig {
+ fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str)
{
+ let value = match self {
+ NumSpatialPartitionsConfig::Auto => "auto".into(),
+ NumSpatialPartitionsConfig::Fixed(n) => format!("{n}"),
+ };
+ v.some(key, value, description);
+ }
+
+ fn set(&mut self, _key: &str, value: &str) -> Result<()> {
+ let value = value.to_lowercase();
+ let config = match value.as_str() {
+ "auto" => NumSpatialPartitionsConfig::Auto,
+ _ => match value.parse::<usize>() {
+ Ok(n) => {
+ if n > 0 {
+ NumSpatialPartitionsConfig::Fixed(n)
+ } else {
+ return
Err(datafusion_common::DataFusionError::Configuration(
+ "num_spatial_partitions must be greater than
0".to_string(),
+ ));
+ }
+ }
+ Err(_) => {
+ return
Err(datafusion_common::DataFusionError::Configuration(format!(
+ "Unknown num_spatial_partitions config: {value}.
Expected formats: auto, <number>"
+ )));
+ }
+ },
+ };
+ *self = config;
+ Ok(())
}
}
@@ -421,4 +503,19 @@ mod tests {
assert!(index_type.set("", "invalid").is_err());
assert!(index_type.set("", "").is_err());
}
+
+ #[test]
+ fn test_num_spatial_partitions_config_parsing() {
+ let mut config = NumSpatialPartitionsConfig::Auto;
+
+ assert!(config.set("", "auto").is_ok());
+ assert_eq!(config, NumSpatialPartitionsConfig::Auto);
+
+ assert!(config.set("", "10").is_ok());
+ assert_eq!(config, NumSpatialPartitionsConfig::Fixed(10));
+
+ assert!(config.set("", "0").is_err());
+ assert!(config.set("", "invalid").is_err());
+ assert!(config.set("", "fixed[10]").is_err());
+ }
}
diff --git a/rust/sedona-geometry/src/bounding_box.rs
b/rust/sedona-geometry/src/bounding_box.rs
index 7a018fb5..2da191fd 100644
--- a/rust/sedona-geometry/src/bounding_box.rs
+++ b/rust/sedona-geometry/src/bounding_box.rs
@@ -69,6 +69,16 @@ impl BoundingBox {
}
}
+ /// Create an empty BoundingBox
+ pub fn empty() -> Self {
+ Self {
+ x: WraparoundInterval::empty(),
+ y: Interval::empty(),
+ z: None,
+ m: None,
+ }
+ }
+
/// The x interval
pub fn x(&self) -> &WraparoundInterval {
&self.x
@@ -91,6 +101,11 @@ impl BoundingBox {
&self.m
}
+ /// Check whether this BoundingBox is empty
+ pub fn is_empty(&self) -> bool {
+ self.x.is_empty() || self.y.is_empty()
+ }
+
/// Calculate intersection with another BoundingBox
///
/// Returns true if this bounding box may intersect other or false
otherwise. This
diff --git a/rust/sedona-spatial-join/Cargo.toml
b/rust/sedona-spatial-join/Cargo.toml
index 010c2ead..322ec572 100644
--- a/rust/sedona-spatial-join/Cargo.toml
+++ b/rust/sedona-spatial-join/Cargo.toml
@@ -66,6 +66,7 @@ geo-index = { workspace = true }
geos = { workspace = true }
float_next_after = { workspace = true }
fastrand = { workspace = true }
+log = { workspace = true }
[dev-dependencies]
criterion = { workspace = true }
diff --git a/rust/sedona-spatial-join/src/build_index.rs
b/rust/sedona-spatial-join/src/build_index.rs
index 5a171007..f3cbb34b 100644
--- a/rust/sedona-spatial-join/src/build_index.rs
+++ b/rust/sedona-spatial-join/src/build_index.rs
@@ -46,7 +46,12 @@ pub async fn build_index(
join_type: JoinType,
probe_threads_count: usize,
metrics: ExecutionPlanMetricsSet,
+ seed: u64,
) -> Result<SpatialIndex> {
+ log::debug!(
+ "Building spatial index for running spatial join, seed = {}",
+ seed
+ );
let session_config = context.session_config();
let sedona_options = session_config
.options()
@@ -55,10 +60,14 @@ pub async fn build_index(
.cloned()
.unwrap_or_default();
let concurrent =
sedona_options.spatial_join.concurrent_build_side_collection;
+ let runtime_env = context.runtime_env();
+ let spill_compression = session_config.spill_compression();
let memory_pool = context.memory_pool();
let collector = BuildSideBatchesCollector::new(
spatial_predicate.clone(),
sedona_options.spatial_join.clone(),
+ Arc::clone(&runtime_env),
+ spill_compression,
);
let num_partitions = build_streams.len();
let mut collect_metrics_vec = Vec::with_capacity(num_partitions);
@@ -72,12 +81,23 @@ pub async fn build_index(
}
let build_partitions = collector
- .collect_all(build_streams, reservations, collect_metrics_vec,
concurrent)
+ .collect_all(
+ build_streams,
+ reservations,
+ collect_metrics_vec,
+ concurrent,
+ seed,
+ )
.await?;
- let contains_external_stream = build_partitions
- .iter()
- .any(|partition| partition.build_side_batch_stream.is_external());
+ let contains_external_stream = build_partitions.iter().any(|partition| {
+ // Access fields to avoid unused variable warnings. Will be removed
when out-of-core
+ // spatial join (https://github.com/apache/sedona-db/issues/436) is
fully implemented.
+ let _ = partition.num_rows;
+ let _ = partition.bbox_samples;
+ let _ = partition.estimated_spatial_index_memory_usage;
+ partition.build_side_batch_stream.is_external()
+ });
if !contains_external_stream {
let mut index_builder = SpatialIndexBuilder::new(
build_schema,
diff --git a/rust/sedona-spatial-join/src/exec.rs
b/rust/sedona-spatial-join/src/exec.rs
index 43b73290..50cbd171 100644
--- a/rust/sedona-spatial-join/src/exec.rs
+++ b/rust/sedona-spatial-join/src/exec.rs
@@ -33,6 +33,7 @@ use datafusion_physical_plan::{
PlanProperties,
};
use parking_lot::Mutex;
+use sedona_common::SpatialJoinOptions;
use crate::{
build_index::build_index,
@@ -137,6 +138,8 @@ pub struct SpatialJoinExec {
/// Indicates if this SpatialJoin was converted from a HashJoin
/// When true, we preserve HashJoin's equivalence properties and
partitioning
converted_from_hash_join: bool,
+ /// A random seed for making random procedures in spatial join
deterministic
+ seed: u64,
}
impl SpatialJoinExec {
@@ -148,11 +151,15 @@ impl SpatialJoinExec {
filter: Option<JoinFilter>,
join_type: &JoinType,
projection: Option<Vec<usize>>,
+ options: &SpatialJoinOptions,
) -> Result<Self> {
- Self::try_new_with_options(left, right, on, filter, join_type,
projection, false)
+ Self::try_new_with_options(
+ left, right, on, filter, join_type, projection, options, false,
+ )
}
/// Create a new SpatialJoinExec with additional options
+ #[allow(clippy::too_many_arguments)]
pub fn try_new_with_options(
left: Arc<dyn ExecutionPlan>,
right: Arc<dyn ExecutionPlan>,
@@ -160,6 +167,7 @@ impl SpatialJoinExec {
filter: Option<JoinFilter>,
join_type: &JoinType,
projection: Option<Vec<usize>>,
+ options: &SpatialJoinOptions,
converted_from_hash_join: bool,
) -> Result<Self> {
let left_schema = left.schema();
@@ -179,6 +187,11 @@ impl SpatialJoinExec {
converted_from_hash_join,
)?;
+ let seed = options
+ .debug
+ .random_seed
+ .unwrap_or(fastrand::u64(0..0xFFFF));
+
Ok(SpatialJoinExec {
left,
right,
@@ -192,6 +205,7 @@ impl SpatialJoinExec {
cache,
once_async_spatial_index: Arc::new(Mutex::new(None)),
converted_from_hash_join,
+ seed,
})
}
@@ -419,6 +433,7 @@ impl ExecutionPlan for SpatialJoinExec {
cache: self.cache.clone(),
once_async_spatial_index: Arc::new(Mutex::new(None)),
converted_from_hash_join: self.converted_from_hash_join,
+ seed: self.seed,
}))
}
@@ -472,6 +487,7 @@ impl ExecutionPlan for SpatialJoinExec {
self.join_type,
probe_thread_count,
self.metrics.clone(),
+ self.seed,
))
})?
};
@@ -563,6 +579,7 @@ impl SpatialJoinExec {
self.join_type,
probe_thread_count,
self.metrics.clone(),
+ self.seed,
))
})?
};
@@ -1330,7 +1347,7 @@ mod tests {
let sql = "SELECT * FROM L LEFT JOIN R ON ST_Intersects(L.geometry,
R.geometry)";
// Create SpatialJoinExec plan
- let ctx = setup_context(Some(options), batch_size)?;
+ let ctx = setup_context(Some(options.clone()), batch_size)?;
ctx.register_table("L", mem_table_left.clone())?;
ctx.register_table("R", mem_table_right.clone())?;
let df = ctx.sql(sql).await?;
@@ -1345,6 +1362,7 @@ mod tests {
original_exec.filter.clone(),
&join_type,
None,
+ &options,
)?;
// Create NestedLoopJoinExec plan for comparison
diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs
b/rust/sedona-spatial-join/src/index/build_side_collector.rs
index 89537592..646c6be2 100644
--- a/rust/sedona-spatial-join/src/index/build_side_collector.rs
+++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs
@@ -17,12 +17,17 @@
use std::sync::Arc;
-use datafusion_common::Result;
+use datafusion::config::SpillCompression;
+use datafusion_common::{DataFusionError, Result};
use datafusion_common_runtime::JoinSet;
-use datafusion_execution::{memory_pool::MemoryReservation,
SendableRecordBatchStream};
-use datafusion_physical_plan::metrics::{self, ExecutionPlanMetricsSet,
MetricBuilder};
+use datafusion_execution::{
+ memory_pool::MemoryReservation, runtime_env::RuntimeEnv,
SendableRecordBatchStream,
+};
+use datafusion_physical_plan::metrics::{
+ self, ExecutionPlanMetricsSet, MetricBuilder, SpillMetrics,
+};
use futures::StreamExt;
-use sedona_common::SpatialJoinOptions;
+use sedona_common::{sedona_internal_err, SpatialJoinOptions};
use sedona_expr::statistics::GeoStatistics;
use sedona_functions::st_analyze_agg::AnalyzeAccumulator;
use sedona_schema::datatypes::WKB_GEOMETRY;
@@ -30,27 +35,38 @@ use sedona_schema::datatypes::WKB_GEOMETRY;
use crate::{
evaluated_batch::{
evaluated_batch_stream::{
- evaluate::create_evaluated_build_stream,
in_mem::InMemoryEvaluatedBatchStream,
- SendableEvaluatedBatchStream,
+ evaluate::create_evaluated_build_stream,
external::ExternalEvaluatedBatchStream,
+ in_mem::InMemoryEvaluatedBatchStream, SendableEvaluatedBatchStream,
},
+ spill::EvaluatedBatchSpillWriter,
EvaluatedBatch,
},
index::SpatialIndexBuilder,
operand_evaluator::{create_operand_evaluator, OperandEvaluator},
- SpatialPredicate,
+ spatial_predicate::SpatialPredicate,
+ utils::bbox_sampler::{BoundingBoxSampler, BoundingBoxSamples},
};
-/// Safety buffer applied when pre-growing build-side reservations to leave
headroom for
-/// auxiliary structures beyond the build batches themselves.
-/// 20% was chosen as a conservative margin.
-const BUILD_SIDE_RESERVATION_BUFFER_RATIO: f64 = 0.20;
-
pub(crate) struct BuildPartition {
+ pub num_rows: usize,
pub build_side_batch_stream: SendableEvaluatedBatchStream,
pub geo_statistics: GeoStatistics,
- /// Memory reservation for tracking the memory usage of the build partition
- /// Cleared on `BuildPartition` drop
+ /// Subset of build-side bounding boxes kept for building partitioners
(e.g. KDB partitioner)
+ /// when the indexed data cannot be fully loaded into memory.
+ pub bbox_samples: BoundingBoxSamples,
+
+ /// The estimated memory usage of building spatial index from all the data
+ /// collected in this partition. The estimated memory used by the global
+ /// spatial index will be the sum of these per-partition estimation.
+ pub estimated_spatial_index_memory_usage: usize,
+
+ /// Memory reservation for tracking the maximum memory usage when
collecting
+ /// the build side. This reservation won't be freed even when spilling is
+ /// triggered. We deliberately only grow the memory reservation to probe
+ /// the amount of memory available for loading spatial index into memory.
+ /// The size of this reservation will be used to determine the maximum
size of
+ /// each spatial partition, as well as how many spatial partitions to
create.
pub reservation: MemoryReservation,
}
@@ -63,8 +79,11 @@ pub(crate) struct BuildSideBatchesCollector {
spatial_predicate: SpatialPredicate,
spatial_join_options: SpatialJoinOptions,
evaluator: Arc<dyn OperandEvaluator>,
+ runtime_env: Arc<RuntimeEnv>,
+ spill_compression: SpillCompression,
}
+#[derive(Clone)]
pub(crate) struct CollectBuildSideMetrics {
/// Number of batches collected
num_batches: metrics::Count,
@@ -77,6 +96,8 @@ pub(crate) struct CollectBuildSideMetrics {
/// Total time taken to collect and process the build side batches. This
does not include the time awaiting
/// for batches from the input stream.
time_taken: metrics::Time,
+ /// Spill metrics of build partitions collecting phase
+ spill_metrics: SpillMetrics,
}
impl CollectBuildSideMetrics {
@@ -88,6 +109,7 @@ impl CollectBuildSideMetrics {
.gauge("build_input_total_size_bytes", partition),
time_taken: MetricBuilder::new(metrics)
.subset_time("build_input_collection_time", partition),
+ spill_metrics: SpillMetrics::new(metrics, partition),
}
}
}
@@ -96,41 +118,95 @@ impl BuildSideBatchesCollector {
pub fn new(
spatial_predicate: SpatialPredicate,
spatial_join_options: SpatialJoinOptions,
+ runtime_env: Arc<RuntimeEnv>,
+ spill_compression: SpillCompression,
) -> Self {
let evaluator = create_operand_evaluator(&spatial_predicate,
spatial_join_options.clone());
BuildSideBatchesCollector {
spatial_predicate,
spatial_join_options,
evaluator,
+ runtime_env,
+ spill_compression,
}
}
+ /// Collect build-side batches from the stream into a `BuildPartition`.
+ ///
+ /// This method grows the given memory reservation as if an in-memory
spatial
+ /// index will be built for all collected batches. If the reservation
cannot
+ /// be grown, batches are spilled to disk and the reservation is left at
its
+ /// peak value.
+ ///
+ /// The reservation represents memory available for loading the spatial
index.
+ /// Across all partitions, the sum of their reservations forms a soft
memory
+ /// cap for subsequent spatial join operations. Reservations grown here are
+ /// not released until the spatial join operator completes.
pub async fn collect(
&self,
mut stream: SendableEvaluatedBatchStream,
mut reservation: MemoryReservation,
+ mut bbox_sampler: BoundingBoxSampler,
metrics: &CollectBuildSideMetrics,
) -> Result<BuildPartition> {
+ let mut spill_writer_opt = None;
let mut in_mem_batches: Vec<EvaluatedBatch> = Vec::new();
+ let mut total_num_rows = 0;
+ let mut total_size_bytes = 0;
let mut analyzer = AnalyzeAccumulator::new(WKB_GEOMETRY, WKB_GEOMETRY);
+ // Reserve memory for holding bbox samples. This should be a small
reservation.
+ // We simply return error if the reservation cannot be fulfilled,
since there's
+ // too little memory for the collector and proceeding will risk
overshooting the
+ // memory limit.
+ reservation.try_grow(bbox_sampler.estimate_maximum_memory_usage())?;
+
while let Some(evaluated_batch) = stream.next().await {
let build_side_batch = evaluated_batch?;
let _timer = metrics.time_taken.timer();
- // Process the record batch and create a BuildSideBatch
let geom_array = &build_side_batch.geom_array;
for wkb in geom_array.wkbs().iter().flatten() {
- analyzer.update_statistics(wkb)?;
+ let summary = sedona_geometry::analyze::analyze_geometry(wkb)
+ .map_err(|e| DataFusionError::External(Box::new(e)))?;
+ if !summary.bbox.is_empty() {
+ bbox_sampler.add_bbox(&summary.bbox);
+ }
+ analyzer.ingest_geometry_summary(&summary);
}
+ let num_rows = build_side_batch.num_rows();
let in_mem_size = build_side_batch.in_mem_size()?;
+ total_num_rows += num_rows;
+ total_size_bytes += in_mem_size;
+
metrics.num_batches.add(1);
- metrics.num_rows.add(build_side_batch.num_rows());
+ metrics.num_rows.add(num_rows);
metrics.total_size_bytes.add(in_mem_size);
- reservation.try_grow(in_mem_size)?;
- in_mem_batches.push(build_side_batch);
+ match &mut spill_writer_opt {
+ None => {
+ // Collected batches are in memory, no spilling happened
for this partition before. We'll try
+ // storing this batch in memory first, and switch to
writing everything to disk if we fail
+ // to grow the reservation.
+ in_mem_batches.push(build_side_batch);
+ if let Err(e) = reservation.try_grow(in_mem_size) {
+ log::debug!(
+ "Failed to grow reservation by {} bytes. Current
reservation: {} bytes. \
+ num rows: {}, reason: {:?}, Spilling...",
+ in_mem_size,
+ reservation.size(),
+ num_rows,
+ e,
+ );
+ spill_writer_opt =
+ self.spill_in_mem_batches(&mut in_mem_batches,
metrics)?;
+ }
+ }
+ Some(spill_writer) => {
+ spill_writer.append(&build_side_batch)?;
+ }
+ }
}
let geo_statistics = analyzer.finish();
@@ -140,19 +216,55 @@ impl BuildSideBatchesCollector {
&self.spatial_join_options,
);
- // Try to grow the reservation with a safety buffer to leave room for
additional data structures
- let buffer_bytes = ((extra_mem + reservation.size()) as f64
- * BUILD_SIDE_RESERVATION_BUFFER_RATIO)
- .ceil() as usize;
- let additional_reservation = extra_mem + buffer_bytes;
- reservation.try_grow(additional_reservation)?;
+ // Try to grow the reservation a bit more to account for any
underestimation of
+ // memory usage. We proceed even when the growth fails.
+ let additional_reservation = extra_mem + (extra_mem +
reservation.size()) / 5;
+ if let Err(e) = reservation.try_grow(additional_reservation) {
+ log::debug!(
+ "Failed to grow reservation by {} bytes to account for spatial
index building memory usage. \
+ Current reservation: {} bytes. reason: {:?}",
+ additional_reservation,
+ reservation.size(),
+ e,
+ );
+ }
+
+ // If force spill is enabled, flush everything to disk regardless of
whether the memory
+ // is enough or not.
+ if self.spatial_join_options.debug.force_spill &&
spill_writer_opt.is_none() {
+ log::debug!(
+ "Force spilling enabled. Spilling {} in-memory batches to
disk.",
+ in_mem_batches.len()
+ );
+ spill_writer_opt = self.spill_in_mem_batches(&mut in_mem_batches,
metrics)?;
+ }
+
+ let build_side_batch_stream: SendableEvaluatedBatchStream = match
spill_writer_opt {
+ Some(spill_writer) => {
+ let spill_file = spill_writer.finish()?;
+ if !in_mem_batches.is_empty() {
+ return sedona_internal_err!(
+ "In-memory batches should have been spilled when spill
file exists"
+ );
+ }
+ Box::pin(ExternalEvaluatedBatchStream::try_from_spill_file(
+ Arc::new(spill_file),
+ )?)
+ }
+ None => {
+ let schema = stream.schema();
+ Box::pin(InMemoryEvaluatedBatchStream::new(schema,
in_mem_batches))
+ }
+ };
+
+ let estimated_spatial_index_memory_usage = total_size_bytes +
extra_mem;
Ok(BuildPartition {
- build_side_batch_stream:
Box::pin(InMemoryEvaluatedBatchStream::new(
- stream.schema(),
- in_mem_batches,
- )),
+ num_rows: total_num_rows,
+ build_side_batch_stream,
geo_statistics,
+ bbox_samples: bbox_sampler.into_samples(),
+ estimated_spatial_index_memory_usage,
reservation,
})
}
@@ -163,16 +275,28 @@ impl BuildSideBatchesCollector {
reservations: Vec<MemoryReservation>,
metrics_vec: Vec<CollectBuildSideMetrics>,
concurrent: bool,
+ seed: u64,
) -> Result<Vec<BuildPartition>> {
if streams.is_empty() {
return Ok(vec![]);
}
+ assert_eq!(
+ streams.len(),
+ reservations.len(),
+ "each build stream must have a reservation"
+ );
+ assert_eq!(
+ streams.len(),
+ metrics_vec.len(),
+ "each build stream must have a metrics collector"
+ );
+
if concurrent {
- self.collect_all_concurrently(streams, reservations, metrics_vec)
+ self.collect_all_concurrently(streams, reservations, metrics_vec,
seed)
.await
} else {
- self.collect_all_sequentially(streams, reservations, metrics_vec)
+ self.collect_all_sequentially(streams, reservations, metrics_vec,
seed)
.await
}
}
@@ -182,8 +306,9 @@ impl BuildSideBatchesCollector {
streams: Vec<SendableRecordBatchStream>,
reservations: Vec<MemoryReservation>,
metrics_vec: Vec<CollectBuildSideMetrics>,
+ seed: u64,
) -> Result<Vec<BuildPartition>> {
- // Spawn a task for each stream to scan all streams concurrently
+ // Spawn task for each stream to scan all streams concurrently
let mut join_set = JoinSet::new();
for (partition_id, ((stream, metrics), reservation)) in streams
.into_iter()
@@ -193,11 +318,18 @@ impl BuildSideBatchesCollector {
{
let collector = self.clone();
let evaluator = Arc::clone(&self.evaluator);
+ let bbox_sampler = BoundingBoxSampler::try_new(
+ self.spatial_join_options.min_index_side_bbox_samples,
+ self.spatial_join_options.max_index_side_bbox_samples,
+ self.spatial_join_options
+ .target_index_side_bbox_sampling_rate,
+ seed.wrapping_add(partition_id as u64),
+ )?;
join_set.spawn(async move {
let evaluated_stream =
create_evaluated_build_stream(stream, evaluator,
metrics.time_taken.clone());
let result = collector
- .collect(evaluated_stream, reservation, &metrics)
+ .collect(evaluated_stream, reservation, bbox_sampler,
&metrics)
.await;
(partition_id, result)
});
@@ -224,20 +356,247 @@ impl BuildSideBatchesCollector {
streams: Vec<SendableRecordBatchStream>,
reservations: Vec<MemoryReservation>,
metrics_vec: Vec<CollectBuildSideMetrics>,
+ seed: u64,
) -> Result<Vec<BuildPartition>> {
// Collect partitions sequentially (for JNI/embedded contexts)
let mut results = Vec::with_capacity(streams.len());
- for ((stream, metrics), reservation) in
- streams.into_iter().zip(metrics_vec).zip(reservations)
+ for (partition_id, ((stream, metrics), reservation)) in streams
+ .into_iter()
+ .zip(metrics_vec)
+ .zip(reservations)
+ .enumerate()
{
let evaluator = Arc::clone(&self.evaluator);
+ let bbox_sampler = BoundingBoxSampler::try_new(
+ self.spatial_join_options.min_index_side_bbox_samples,
+ self.spatial_join_options.max_index_side_bbox_samples,
+ self.spatial_join_options
+ .target_index_side_bbox_sampling_rate,
+ seed.wrapping_add(partition_id as u64),
+ )?;
+
let evaluated_stream =
create_evaluated_build_stream(stream, evaluator,
metrics.time_taken.clone());
let result = self
- .collect(evaluated_stream, reservation, &metrics)
+ .collect(evaluated_stream, reservation, bbox_sampler, &metrics)
.await?;
results.push(result);
}
Ok(results)
}
+
+ fn spill_in_mem_batches(
+ &self,
+ in_mem_batches: &mut Vec<EvaluatedBatch>,
+ metrics: &CollectBuildSideMetrics,
+ ) -> Result<Option<EvaluatedBatchSpillWriter>> {
+ if in_mem_batches.is_empty() {
+ return Ok(None);
+ }
+
+ let build_side_batch = &in_mem_batches[0];
+
+ let schema = build_side_batch.schema();
+ let sedona_type = &build_side_batch.geom_array.sedona_type;
+ let mut spill_writer = EvaluatedBatchSpillWriter::try_new(
+ Arc::clone(&self.runtime_env),
+ schema,
+ sedona_type,
+ "spilling build side batches",
+ self.spill_compression,
+ metrics.spill_metrics.clone(),
+ if self
+ .spatial_join_options
+ .spilled_batch_in_memory_size_threshold
+ == 0
+ {
+ None
+ } else {
+ Some(
+ self.spatial_join_options
+ .spilled_batch_in_memory_size_threshold,
+ )
+ },
+ )?;
+
+ for in_mem_batch in in_mem_batches.iter() {
+ spill_writer.append(in_mem_batch)?;
+ }
+
+ in_mem_batches.clear();
+ Ok(Some(spill_writer))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::{
+ operand_evaluator::EvaluatedGeometryArray,
+ spatial_predicate::{RelationPredicate, SpatialRelationType},
+ };
+ use arrow_array::{ArrayRef, BinaryArray, Int32Array, RecordBatch};
+ use arrow_schema::{DataType, Field, Schema};
+ use datafusion_common::ScalarValue;
+ use datafusion_execution::memory_pool::{GreedyMemoryPool, MemoryConsumer,
MemoryPool};
+ use datafusion_physical_expr::expressions::Literal;
+ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+ use futures::TryStreamExt;
+ use sedona_common::SpatialJoinOptions;
+ use sedona_schema::datatypes::WKB_GEOMETRY;
+
+ fn test_schema() -> Arc<Schema> {
+ Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]))
+ }
+
+ fn sample_batch(ids: &[i32], wkbs: Vec<Option<Vec<u8>>>) ->
Result<EvaluatedBatch> {
+ assert_eq!(ids.len(), wkbs.len());
+ let id_array = Arc::new(Int32Array::from(ids.to_vec())) as ArrayRef;
+ let batch = RecordBatch::try_new(test_schema(), vec![id_array])?;
+ let geom_values: Vec<Option<&[u8]>> = wkbs
+ .iter()
+ .map(|wkb_opt| wkb_opt.as_ref().map(|wkb| wkb.as_slice()))
+ .collect();
+ let geom_array: ArrayRef = Arc::new(BinaryArray::from(geom_values));
+ let geom = EvaluatedGeometryArray::try_new(geom_array, &WKB_GEOMETRY)?;
+ Ok(EvaluatedBatch {
+ batch,
+ geom_array: geom,
+ })
+ }
+
+ fn point_wkb(x: f64, y: f64) -> Vec<u8> {
+ let mut buf = vec![1u8, 1, 0, 0, 0];
+ buf.extend_from_slice(&x.to_le_bytes());
+ buf.extend_from_slice(&y.to_le_bytes());
+ buf
+ }
+
+ fn build_collector() -> BuildSideBatchesCollector {
+ let expr: Arc<dyn datafusion_physical_expr::PhysicalExpr> =
+ Arc::new(Literal::new(ScalarValue::Null));
+ let predicate = SpatialPredicate::Relation(RelationPredicate::new(
+ Arc::clone(&expr),
+ expr,
+ SpatialRelationType::Intersects,
+ ));
+ BuildSideBatchesCollector::new(
+ predicate,
+ SpatialJoinOptions::default(),
+ Arc::new(RuntimeEnv::default()),
+ SpillCompression::Uncompressed,
+ )
+ }
+
+ fn memory_reservation(limit: usize) -> (MemoryReservation, Arc<dyn
MemoryPool>) {
+ let pool: Arc<dyn MemoryPool> = Arc::new(GreedyMemoryPool::new(limit));
+ let consumer =
MemoryConsumer::new("build-side-test").with_can_spill(true);
+ let reservation = consumer.register(&pool);
+ (reservation, pool)
+ }
+
+ fn build_stream(batches: Vec<EvaluatedBatch>) ->
SendableEvaluatedBatchStream {
+ let schema = batches
+ .first()
+ .map(|batch| batch.schema())
+ .unwrap_or_else(test_schema);
+ Box::pin(InMemoryEvaluatedBatchStream::new(schema, batches))
+ }
+
+ fn collect_ids(batches: &[EvaluatedBatch]) -> Vec<i32> {
+ let mut ids = Vec::new();
+ for batch in batches {
+ let array = batch
+ .batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ for i in 0..array.len() {
+ ids.push(array.value(i));
+ }
+ }
+ ids
+ }
+
+ #[tokio::test]
+ async fn collect_keeps_batches_in_memory_when_capacity_suffices() ->
Result<()> {
+ let collector = build_collector();
+ let (reservation, _pool) = memory_reservation(10 * 1024 * 1024);
+ let sampler = BoundingBoxSampler::try_new(1, 4, 1.0, 7)?;
+ let batch_a = sample_batch(
+ &[0, 1],
+ vec![Some(point_wkb(0.0, 0.0)), Some(point_wkb(1.0, 1.0))],
+ )?;
+ let batch_b = sample_batch(&[2], vec![Some(point_wkb(2.0, 2.0))])?;
+ let stream = build_stream(vec![batch_a, batch_b]);
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = CollectBuildSideMetrics::new(0, &metrics_set);
+
+ let partition = collector
+ .collect(stream, reservation, sampler, &metrics)
+ .await?;
+ let stream = partition.build_side_batch_stream;
+ let is_external = stream.is_external();
+ let batches: Vec<EvaluatedBatch> = stream.try_collect().await?;
+ assert!(!is_external, "Expected in-memory batches");
+ assert_eq!(collect_ids(&batches), vec![0, 1, 2]);
+ assert_eq!(partition.num_rows, 3);
+ assert_eq!(metrics.num_batches.value(), 2);
+ assert_eq!(metrics.num_rows.value(), 3);
+ assert!(metrics.total_size_bytes.value() > 0);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn collect_spills_when_reservation_cannot_grow() -> Result<()> {
+ let collector = build_collector();
+ let sampler = BoundingBoxSampler::try_new(1, 2, 1.0, 13)?;
+ let bbox_mem = sampler.estimate_maximum_memory_usage();
+ let (reservation, _pool) = memory_reservation(bbox_mem + 1);
+ let batch_a = sample_batch(
+ &[10, 11],
+ vec![Some(point_wkb(5.0, 5.0)), Some(point_wkb(6.0, 6.0))],
+ )?;
+ let batch_b = sample_batch(&[12], vec![Some(point_wkb(7.0, 7.0))])?;
+ let stream = build_stream(vec![batch_a, batch_b]);
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = CollectBuildSideMetrics::new(0, &metrics_set);
+
+ let partition = collector
+ .collect(stream, reservation, sampler, &metrics)
+ .await?;
+ let stream = partition.build_side_batch_stream;
+ let is_external = stream.is_external();
+ let batches: Vec<EvaluatedBatch> = stream.try_collect().await?;
+ assert!(is_external, "Expected batches to spill to disk");
+ assert_eq!(collect_ids(&batches), vec![10, 11, 12]);
+ let spill_metrics = metrics.spill_metrics;
+ assert!(spill_metrics.spill_file_count.value() >= 1);
+ assert!(spill_metrics.spilled_rows.value() >= 1);
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn collect_handles_empty_stream() -> Result<()> {
+ let collector = build_collector();
+ let (reservation, _pool) = memory_reservation(1024);
+ let sampler = BoundingBoxSampler::try_new(1, 2, 1.0, 19)?;
+ let stream = build_stream(Vec::new());
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let metrics = CollectBuildSideMetrics::new(0, &metrics_set);
+
+ let partition = collector
+ .collect(stream, reservation, sampler, &metrics)
+ .await?;
+ assert_eq!(partition.num_rows, 0);
+ let stream = partition.build_side_batch_stream;
+ let is_external = stream.is_external();
+ let batches: Vec<EvaluatedBatch> = stream.try_collect().await?;
+ assert!(!is_external);
+ assert!(batches.is_empty());
+ assert_eq!(metrics.num_batches.value(), 0);
+ assert_eq!(metrics.num_rows.value(), 0);
+ Ok(())
+ }
}
diff --git a/rust/sedona-spatial-join/src/optimizer.rs
b/rust/sedona-spatial-join/src/optimizer.rs
index bd01821b..a8c28167 100644
--- a/rust/sedona-spatial-join/src/optimizer.rs
+++ b/rust/sedona-spatial-join/src/optimizer.rs
@@ -235,18 +235,20 @@ impl SpatialJoinOptimizer {
fn try_optimize_join(
&self,
plan: Arc<dyn ExecutionPlan>,
- _config: &ConfigOptions,
+ config: &ConfigOptions,
) -> Result<Transformed<Arc<dyn ExecutionPlan>>> {
// Check if this is a NestedLoopJoinExec that we can convert to
spatial join
if let Some(nested_loop_join) =
plan.as_any().downcast_ref::<NestedLoopJoinExec>() {
- if let Some(spatial_join) =
self.try_convert_to_spatial_join(nested_loop_join)? {
+ if let Some(spatial_join) =
+ self.try_convert_to_spatial_join(nested_loop_join, config)?
+ {
return Ok(Transformed::yes(spatial_join));
}
}
// Check if this is a HashJoinExec with spatial filter that we can
convert to spatial join
if let Some(hash_join) = plan.as_any().downcast_ref::<HashJoinExec>() {
- if let Some(spatial_join) =
self.try_convert_hash_join_to_spatial(hash_join)? {
+ if let Some(spatial_join) =
self.try_convert_hash_join_to_spatial(hash_join, config)? {
return Ok(Transformed::yes(spatial_join));
}
}
@@ -261,7 +263,12 @@ impl SpatialJoinOptimizer {
fn try_convert_to_spatial_join(
&self,
nested_loop_join: &NestedLoopJoinExec,
+ config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ let Some(options) = config.extensions.get::<SedonaOptions>() else {
+ return Ok(None);
+ };
+
if let Some(join_filter) = nested_loop_join.filter() {
if let Some((spatial_predicate, remainder)) =
transform_join_filter(join_filter) {
// The left side of the nested loop join is required to have
only one partition, while SpatialJoinExec
@@ -300,6 +307,7 @@ impl SpatialJoinOptimizer {
remainder,
join_type,
nested_loop_join.projection().cloned(),
+ &options.spatial_join,
)?;
return Ok(Some(Arc::new(spatial_join)));
@@ -316,7 +324,12 @@ impl SpatialJoinOptimizer {
fn try_convert_hash_join_to_spatial(
&self,
hash_join: &HashJoinExec,
+ config: &ConfigOptions,
) -> Result<Option<Arc<dyn ExecutionPlan>>> {
+ let Some(options) = config.extensions.get::<SedonaOptions>() else {
+ return Ok(None);
+ };
+
// Check if the filter contains spatial predicates
if let Some(join_filter) = hash_join.filter() {
if let Some((spatial_predicate, mut remainder)) =
transform_join_filter(join_filter) {
@@ -354,6 +367,7 @@ impl SpatialJoinOptimizer {
remainder,
hash_join.join_type(),
None, // No projection in SpatialJoinExec
+ &options.spatial_join,
true, // converted_from_hash_join = true
)?);