This is an automated email from the ASF dual-hosted git repository.
kontinuation pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/sedona-db.git
The following commit(s) were added to refs/heads/main by this push:
new 71123806  feat(rust/sedona-spatial-join): Add a repartitioner to write spatially repartitioned data to spill files (#527)
71123806 is described below
commit 71123806312a23ca0550dc341360c90590f1aee8
Author: Kristin Cowalcijk <[email protected]>
AuthorDate: Wed Jan 21 14:43:04 2026 +0800
feat(rust/sedona-spatial-join): Add a repartitioner to write spatially repartitioned data to spill files (#527)
This implements part of https://github.com/apache/sedona-db/issues/436. When the
build side does not fit in memory, both the build-side and the probe-side data
will be spatially repartitioned using the `StreamRepartitioner` introduced by
this patch. The code that integrates `StreamRepartitioner` with
`SpatialJoinExec` and `SpatialJoinStream` will be submitted later.
Co-authored-by: Copilot <[email protected]>
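For reference, a minimal usage sketch of the API introduced by this patch,
mirroring the benchmark added below (the `runtime_env`, `partitioner`,
`spill_metrics`, and `stream` values are assumed to be set up as in that
benchmark):

    // Hedged sketch: repartition an evaluated-batch stream into
    // per-partition spill files via the builder API added here.
    let spilled = StreamRepartitioner::builder(
        runtime_env,                // Arc<RuntimeEnv>
        partitioner,                // Arc<dyn SpatialPartitioner>
        PartitionedSide::ProbeSide,
        spill_metrics,              // SpillMetrics
    )
    .buffer_bytes_threshold(8 * 1024 * 1024) // flush once ~8 MiB is buffered
    .target_batch_size(8192)                 // chunk flushed output to ~8192 rows
    .build()
    .repartition_stream(stream)              // SendableEvaluatedBatchStream
    .await?;
    let p0 = spilled.spilled_partition(SpatialPartition::Regular(0))?;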
---
rust/sedona-functions/src/st_analyze_agg.rs | 30 +-
rust/sedona-geometry/src/analyze.rs | 9 +-
rust/sedona-spatial-join/Cargo.toml | 5 +
.../bench/partitioning/stream_repartitioner.rs | 243 ++++
rust/sedona-spatial-join/src/evaluated_batch.rs | 5 +
.../src/evaluated_batch/evaluated_batch_stream.rs | 6 +-
.../evaluated_batch_stream/in_mem.rs | 2 +-
.../src/index/build_side_collector.rs | 2 +-
rust/sedona-spatial-join/src/operand_evaluator.rs | 3 +
rust/sedona-spatial-join/src/partitioning.rs | 2 +
.../src/partitioning/partition_slots.rs | 85 ++
.../src/partitioning/stream_repartitioner.rs | 1359 ++++++++++++++++++++
rust/sedona-spatial-join/src/partitioning/util.rs | 7 +
rust/sedona-spatial-join/src/stream.rs | 2 +-
rust/sedona-spatial-join/src/utils/spill.rs | 8 +-
15 files changed, 1741 insertions(+), 27 deletions(-)
diff --git a/rust/sedona-functions/src/st_analyze_agg.rs b/rust/sedona-functions/src/st_analyze_agg.rs
index 9dd49b26..82647a6d 100644
--- a/rust/sedona-functions/src/st_analyze_agg.rs
+++ b/rust/sedona-functions/src/st_analyze_agg.rs
@@ -34,7 +34,7 @@ use sedona_expr::aggregate_udf::SedonaAccumulatorRef;
use sedona_expr::aggregate_udf::SedonaAggregateUDF;
use sedona_expr::item_crs::ItemCrsSedonaAccumulator;
use sedona_expr::{aggregate_udf::SedonaAccumulator, statistics::GeoStatistics};
-use sedona_geometry::analyze::GeometryAnalysis;
+use sedona_geometry::analyze::GeometrySummary;
use sedona_geometry::interval::IntervalTrait;
use sedona_geometry::types::{GeometryTypeAndDimensions, GeometryTypeAndDimensionsSet};
use sedona_schema::{datatypes::SedonaType, matchers::ArgMatcher};
@@ -246,25 +246,29 @@ impl AnalyzeAccumulator {
}
}
-    pub fn update_statistics(&mut self, geom: &Wkb, size_bytes: usize) -> Result<()> {
+ pub fn update_statistics(&mut self, geom: &Wkb) -> Result<()> {
// Get geometry analysis information
- let analysis = sedona_geometry::analyze::analyze_geometry(geom)
+ let summary = sedona_geometry::analyze::analyze_geometry(geom)
.map_err(|e| DataFusionError::External(Box::new(e)))?;
+ self.ingest_geometry_summary(&summary);
+
+ Ok(())
+ }
+
+ pub fn ingest_geometry_summary(&mut self, summary: &GeometrySummary) {
// Start with a clone of the current stats
let mut stats = self.stats.clone();
// Update each component of the statistics
- stats = self.update_basic_counts(stats, size_bytes);
- stats = self.update_geometry_type_counts(stats, &analysis);
- stats = self.update_point_count(stats, analysis.point_count);
- stats = self.update_envelope_info(stats, &analysis);
- stats = self.update_geometry_types(stats, analysis.geometry_type);
+ stats = self.update_basic_counts(stats, summary.size_bytes);
+ stats = self.update_geometry_type_counts(stats, summary);
+ stats = self.update_point_count(stats, summary.point_count);
+ stats = self.update_envelope_info(stats, summary);
+ stats = self.update_geometry_types(stats, summary.geometry_type);
// Assign the updated stats back to self.stats
self.stats = stats;
-
- Ok(())
}
pub fn finish(self) -> GeoStatistics {
@@ -284,7 +288,7 @@ impl AnalyzeAccumulator {
fn update_geometry_type_counts(
&self,
stats: GeoStatistics,
- analysis: &GeometryAnalysis,
+ analysis: &GeometrySummary,
) -> GeoStatistics {
// Add the counts from analysis to existing stats
let puntal = stats.puntal_count().unwrap_or(0) + analysis.puntal_count;
@@ -309,7 +313,7 @@ impl AnalyzeAccumulator {
fn update_envelope_info(
&self,
stats: GeoStatistics,
- analysis: &GeometryAnalysis,
+ analysis: &GeometrySummary,
) -> GeoStatistics {
// The bbox is directly available on analysis, not wrapped in an Option
let bbox = &analysis.bbox;
@@ -368,7 +372,7 @@ impl AnalyzeAccumulator {
fn execute_update(&mut self, executor: WkbExecutor) -> Result<()> {
executor.execute_wkb_void(|maybe_item| {
if let Some(item) = maybe_item {
- self.update_statistics(&item, item.buf().len())?;
+ self.update_statistics(&item)?;
}
Ok(())
})?;
diff --git a/rust/sedona-geometry/src/analyze.rs b/rust/sedona-geometry/src/analyze.rs
index e34e191b..aa01f11a 100644
--- a/rust/sedona-geometry/src/analyze.rs
+++ b/rust/sedona-geometry/src/analyze.rs
@@ -23,9 +23,10 @@ use crate::{
};
use wkb::reader::Wkb;
-/// Contains analysis results for a geometry
+/// Captures the size, bounds, and type-derived counts for a single geometry.
+/// Used as the per-geometry input that eventually feeds aggregated `GeoStatistics`.
#[derive(Debug, Clone)]
-pub struct GeometryAnalysis {
+pub struct GeometrySummary {
pub size_bytes: usize,
pub point_count: i64,
pub geometry_type: GeometryTypeAndDimensions,
@@ -37,7 +38,7 @@ pub struct GeometryAnalysis {
}
/// Analyzes a WKB geometry and returns its size, point count, dimensions, and type
-pub fn analyze_geometry(geom: &Wkb) -> Result<GeometryAnalysis, SedonaGeometryError> {
+pub fn analyze_geometry(geom: &Wkb) -> Result<GeometrySummary, SedonaGeometryError> {
// Get size in bytes directly from WKB buffer
let size_bytes = geom.buf().len();
@@ -74,7 +75,7 @@ pub fn analyze_geometry(geom: &Wkb) -> Result<GeometryAnalysis, SedonaGeometryEr
GeometryTypeId::GeometryCollection
) as i64;
- Ok(GeometryAnalysis {
+ Ok(GeometrySummary {
size_bytes,
point_count,
geometry_type,
diff --git a/rust/sedona-spatial-join/Cargo.toml b/rust/sedona-spatial-join/Cargo.toml
index 9831c59b..010c2ead 100644
--- a/rust/sedona-spatial-join/Cargo.toml
+++ b/rust/sedona-spatial-join/Cargo.toml
@@ -100,3 +100,8 @@ harness = false
name = "external_evaluated_batch_stream"
path = "bench/evaluated_batch/external_evaluated_batch_stream.rs"
harness = false
+
+[[bench]]
+name = "stream_repartitioner"
+path = "bench/partitioning/stream_repartitioner.rs"
+harness = false
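With this `[[bench]]` target registered, the benchmark should be runnable on
its own, e.g. (assuming the crate name matches its directory):

    cargo bench -p sedona-spatial-join --bench stream_repartitioner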
diff --git a/rust/sedona-spatial-join/bench/partitioning/stream_repartitioner.rs b/rust/sedona-spatial-join/bench/partitioning/stream_repartitioner.rs
new file mode 100644
index 00000000..62641a43
--- /dev/null
+++ b/rust/sedona-spatial-join/bench/partitioning/stream_repartitioner.rs
@@ -0,0 +1,243 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::sync::{
+ atomic::{AtomicU64, Ordering},
+ Arc,
+};
+use std::time::Duration;
+
+use arrow_array::{
+ ArrayRef, BinaryArray, Date32Array, Int64Array, RecordBatch, StringArray,
+ TimestampMicrosecondArray,
+};
+use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use criterion::{criterion_group, criterion_main, BatchSize, Criterion, Throughput};
+use datafusion::config::SpillCompression;
+use datafusion_execution::runtime_env::RuntimeEnv;
+use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, SpillMetrics};
+use futures::executor::block_on;
+use rand::{rngs::StdRng, Rng, SeedableRng};
+use sedona_geometry::{bounding_box::BoundingBox, interval::IntervalTrait};
+use sedona_schema::datatypes::WKB_GEOMETRY;
+use sedona_spatial_join::evaluated_batch::{
+    evaluated_batch_stream::{in_mem::InMemoryEvaluatedBatchStream, SendableEvaluatedBatchStream},
+ EvaluatedBatch,
+};
+use sedona_spatial_join::operand_evaluator::EvaluatedGeometryArray;
+use sedona_spatial_join::partitioning::PartitionedSide;
+use sedona_spatial_join::partitioning::{
+    kdb::KDBPartitioner, stream_repartitioner::StreamRepartitioner, SpatialPartitioner,
+};
+
+const RNG_SEED: u64 = 0x05ED_04A5;
+const NUM_BATCHES: usize = 50;
+const ROWS_PER_BATCH: usize = 8192;
+const SAMPLE_FOR_PARTITIONER: usize = 1_000;
+const MAX_ITEMS_PER_NODE: usize = 128;
+const MAX_LEVELS: usize = 4;
+const REPARTITIONER_BUFFER_BYTES: usize = 8 * 1024 * 1024;
+
+fn bench_stream_partitioner(c: &mut Criterion) {
+ let extent = Arc::new(default_extent());
+ let partitioner = build_partitioner(extent.as_ref());
+ let schema = Arc::new(build_schema());
+ let runtime_env = Arc::new(RuntimeEnv::default());
+ let metrics_set = ExecutionPlanMetricsSet::new();
+ let spill_metrics = SpillMetrics::new(&metrics_set, 0);
+ let seed_counter = Arc::new(AtomicU64::new(RNG_SEED));
+
+ let mut group = c.benchmark_group("stream_partitioner_repartition");
+    group.throughput(Throughput::Elements((NUM_BATCHES * ROWS_PER_BATCH) as u64));
+
+ group.bench_function("kdb_repartition", |b| {
+ let seed_counter = Arc::clone(&seed_counter);
+ let schema = Arc::clone(&schema);
+ let runtime_env = Arc::clone(&runtime_env);
+ let partitioner = Arc::clone(&partitioner);
+ let spill_metrics = spill_metrics.clone();
+ let extent = Arc::clone(&extent);
+
+ b.iter_batched(
+ move || {
+ let seed = seed_counter.fetch_add(1, Ordering::Relaxed);
+ generate_stream(seed, schema.clone(), extent.as_ref())
+ },
+ move |stream| {
+ block_on(async {
+ StreamRepartitioner::builder(
+ runtime_env.clone(),
+ partitioner.clone(),
+ PartitionedSide::BuildSide,
+ spill_metrics.clone(),
+ )
+ .spill_compression(SpillCompression::Uncompressed)
+ .buffer_bytes_threshold(REPARTITIONER_BUFFER_BYTES)
+ .target_batch_size(ROWS_PER_BATCH)
+ .spilled_batch_in_memory_size_threshold(None)
+ .build()
+ .repartition_stream(stream)
+ .await
+ .expect("repartition should succeed in benchmark");
+ });
+ },
+ BatchSize::SmallInput,
+ );
+ });
+
+ group.finish();
+}
+
+fn generate_stream(
+ seed: u64,
+ schema: Arc<Schema>,
+ extent: &BoundingBox,
+) -> SendableEvaluatedBatchStream {
+ let mut rng = StdRng::seed_from_u64(seed);
+ let mut batches = Vec::with_capacity(NUM_BATCHES);
+ for _ in 0..NUM_BATCHES {
+ batches.push(random_evaluated_batch(
+ schema.clone(),
+ ROWS_PER_BATCH,
+ extent,
+ &mut rng,
+ ));
+ }
+ in_memory_stream(schema, batches)
+}
+
+fn random_evaluated_batch(
+ schema: Arc<Schema>,
+ rows: usize,
+ extent: &BoundingBox,
+ rng: &mut StdRng,
+) -> EvaluatedBatch {
+ let batch = random_record_batch(schema, rows, rng);
+ let geom_array = random_geometry_array(rows, extent, rng);
+ let geom_array = EvaluatedGeometryArray::try_new(geom_array, &WKB_GEOMETRY)
+ .expect("geometry array allocation should succeed");
+ EvaluatedBatch { batch, geom_array }
+}
+
+fn random_record_batch(schema: Arc<Schema>, rows: usize, rng: &mut StdRng) -> RecordBatch {
+    let ids = Int64Array::from_iter_values((0..rows).map(|_| rng.random_range(0..1_000_000_i64)));
+    let words = StringArray::from_iter_values((0..rows).map(|_| random_string(rng)));
+    let dates = Date32Array::from_iter_values((0..rows).map(|_| rng.random_range(18_000..20_000)));
+ let timestamps = TimestampMicrosecondArray::from_iter_values(
+        (0..rows).map(|_| rng.random_range(1_600_000_000_000_000i64..1_700_000_000_000_000)),
+ );
+
+ let columns: Vec<ArrayRef> = vec![
+ Arc::new(ids),
+ Arc::new(words),
+ Arc::new(dates),
+ Arc::new(timestamps),
+ ];
+
+    RecordBatch::try_new(schema, columns).expect("record batch assembly should succeed")
+}
+
+fn random_geometry_array(rows: usize, extent: &BoundingBox, rng: &mut StdRng) -> ArrayRef {
+ let wkbs: Vec<Vec<u8>> = (0..rows)
+ .map(|_| {
+ let x = rng.random_range(extent.x().lo()..=extent.x().hi());
+ let y = rng.random_range(extent.y().lo()..=extent.y().hi());
+ point_wkb(x, y)
+ })
+ .collect();
+
+    let binary = BinaryArray::from_iter_values(wkbs.iter().map(|wkb| wkb.as_slice()));
+ Arc::new(binary)
+}
+
+fn random_string(rng: &mut StdRng) -> String {
+ const CHARSET: &[u8] = b"abcdefghijklmnopqrstuvwxyz";
+ let mut buf = [0u8; 8];
+ for slot in &mut buf {
+ let idx = rng.random_range(0..CHARSET.len());
+ *slot = CHARSET[idx];
+ }
+ String::from_utf8_lossy(&buf).to_string()
+}
+
+fn build_schema() -> Schema {
+ Schema::new(vec![
+ Field::new("id", DataType::Int64, false),
+ Field::new("category", DataType::Utf8, true),
+ Field::new("event_date", DataType::Date32, true),
+ Field::new(
+ "event_ts",
+ DataType::Timestamp(TimeUnit::Microsecond, None),
+ true,
+ ),
+ ])
+}
+
+fn build_partitioner(extent: &BoundingBox) -> Arc<dyn SpatialPartitioner + Send + Sync> {
+ let mut rng = StdRng::seed_from_u64(RNG_SEED ^ 0x00FF_FFFF);
+ let samples = (0..SAMPLE_FOR_PARTITIONER)
+ .map(|_| random_bbox(extent, &mut rng))
+ .collect::<Vec<_>>();
+
+ let partitioner = KDBPartitioner::build(
+ samples.into_iter(),
+ MAX_ITEMS_PER_NODE,
+ MAX_LEVELS,
+ extent.clone(),
+ )
+ .expect("kdb builder should succeed");
+
+ Arc::new(partitioner)
+}
+
+fn random_bbox(extent: &BoundingBox, rng: &mut StdRng) -> BoundingBox {
+ let span_x = (extent.x().hi() - extent.x().lo()) / 20.0;
+ let span_y = (extent.y().hi() - extent.y().lo()) / 20.0;
+ let width = rng.random_range(10.0..=span_x).max(1.0);
+ let height = rng.random_range(10.0..=span_y).max(1.0);
+ let min_x = rng.random_range(extent.x().lo()..=extent.x().hi() - width);
+ let min_y = rng.random_range(extent.y().lo()..=extent.y().hi() - height);
+ BoundingBox::xy((min_x, min_x + width), (min_y, min_y + height))
+}
+
+fn default_extent() -> BoundingBox {
+ BoundingBox::xy((0.0, 10_000.0), (0.0, 10_000.0))
+}
+
+fn point_wkb(x: f64, y: f64) -> Vec<u8> {
+ let mut buf = vec![1u8, 1, 0, 0, 0];
+ buf.extend_from_slice(&x.to_le_bytes());
+ buf.extend_from_slice(&y.to_le_bytes());
+ buf
+}
+
+criterion_group! {
+ name = stream_partitioner;
+ config = Criterion::default()
+ .sample_size(10)
+ .measurement_time(Duration::from_secs(4))
+ .warm_up_time(Duration::from_secs(2));
+ targets = bench_stream_partitioner
+}
+criterion_main!(stream_partitioner);
+
+fn in_memory_stream(
+ schema: Arc<Schema>,
+ batches: Vec<EvaluatedBatch>,
+) -> SendableEvaluatedBatchStream {
+ Box::pin(InMemoryEvaluatedBatchStream::new(schema, batches))
+}
diff --git a/rust/sedona-spatial-join/src/evaluated_batch.rs b/rust/sedona-spatial-join/src/evaluated_batch.rs
index aad3f11b..7fa0cd79 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch.rs
@@ -16,6 +16,7 @@
// under the License.
use arrow_array::RecordBatch;
+use arrow_schema::SchemaRef;
use datafusion_common::Result;
use datafusion_expr::ColumnarValue;
use geo::Rect;
@@ -47,6 +48,10 @@ impl EvaluatedBatch {
Ok(record_batch_size + geom_array_size)
}
+ pub fn schema(&self) -> SchemaRef {
+ self.batch.schema()
+ }
+
pub fn num_rows(&self) -> usize {
self.batch.num_rows()
}
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
index c18761d2..49b4a6f4 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream.rs
@@ -25,7 +25,7 @@ use datafusion_common::Result;
/// A stream that produces [`EvaluatedBatch`] items. This stream may have purely in-memory or
/// out-of-core implementations. The type of the stream can be queried by calling `is_external()`.
-pub(crate) trait EvaluatedBatchStream: Stream<Item = Result<EvaluatedBatch>> {
+pub trait EvaluatedBatchStream: Stream<Item = Result<EvaluatedBatch>> {
/// Returns true if this stream is an external stream, where batch data were spilled to disk.
fn is_external(&self) -> bool;
@@ -36,8 +36,8 @@ pub(crate) trait EvaluatedBatchStream: Stream<Item = Result<EvaluatedBatch>> {
fn schema(&self) -> SchemaRef;
}
-pub(crate) type SendableEvaluatedBatchStream = Pin<Box<dyn EvaluatedBatchStream + Send>>;
+pub type SendableEvaluatedBatchStream = Pin<Box<dyn EvaluatedBatchStream + Send>>;
pub(crate) mod evaluate;
pub mod external;
-pub(crate) mod in_mem;
+pub mod in_mem;
diff --git a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs
index 550e308a..4ec4ba9c 100644
--- a/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs
+++ b/rust/sedona-spatial-join/src/evaluated_batch/evaluated_batch_stream/in_mem.rs
@@ -27,7 +27,7 @@ use datafusion_common::Result;
use crate::evaluated_batch::{evaluated_batch_stream::EvaluatedBatchStream, EvaluatedBatch};
-pub(crate) struct InMemoryEvaluatedBatchStream {
+pub struct InMemoryEvaluatedBatchStream {
schema: SchemaRef,
iter: IntoIter<EvaluatedBatch>,
}
diff --git a/rust/sedona-spatial-join/src/index/build_side_collector.rs b/rust/sedona-spatial-join/src/index/build_side_collector.rs
index 9230aaf4..90ebb05f 100644
--- a/rust/sedona-spatial-join/src/index/build_side_collector.rs
+++ b/rust/sedona-spatial-join/src/index/build_side_collector.rs
@@ -103,7 +103,7 @@ impl BuildSideBatchesCollector {
// Process the record batch and create a BuildSideBatch
let geom_array = &build_side_batch.geom_array;
for wkb in geom_array.wkbs().iter().flatten() {
- analyzer.update_statistics(wkb, wkb.buf().len())?;
+ analyzer.update_statistics(wkb)?;
}
let in_mem_size = build_side_batch.in_mem_size()?;
diff --git a/rust/sedona-spatial-join/src/operand_evaluator.rs b/rust/sedona-spatial-join/src/operand_evaluator.rs
index 8b431396..a8b83264 100644
--- a/rust/sedona-spatial-join/src/operand_evaluator.rs
+++ b/rust/sedona-spatial-join/src/operand_evaluator.rs
@@ -91,6 +91,8 @@ pub(crate) fn create_operand_evaluator(
/// Result of evaluating a geometry batch.
pub struct EvaluatedGeometryArray {
+ /// Type of geometry_array
+ pub sedona_type: SedonaType,
/// The array of geometries produced by evaluating the geometry expression.
pub geometry_array: ArrayRef,
/// The rects of the geometries in the geometry array. The length of this array is equal to the number of geometries.
@@ -139,6 +141,7 @@ impl EvaluatedGeometryArray {
.map(|wkb| wkb.map(|wkb| unsafe { transmute(wkb) }))
.collect();
Ok(Self {
+ sedona_type: sedona_type.clone(),
geometry_array,
rects: rect_vec,
distance: None,
diff --git a/rust/sedona-spatial-join/src/partitioning.rs b/rust/sedona-spatial-join/src/partitioning.rs
index d20eb523..fe495f6b 100644
--- a/rust/sedona-spatial-join/src/partitioning.rs
+++ b/rust/sedona-spatial-join/src/partitioning.rs
@@ -20,7 +20,9 @@ use sedona_geometry::bounding_box::BoundingBox;
pub mod flat;
pub mod kdb;
+pub(crate) mod partition_slots;
pub mod rtree;
+pub mod stream_repartitioner;
pub(crate) mod util;
/// Spatial partitioning is different from traditional data partitioning such as hash partitioning.
diff --git a/rust/sedona-spatial-join/src/partitioning/partition_slots.rs b/rust/sedona-spatial-join/src/partitioning/partition_slots.rs
new file mode 100644
index 00000000..3916cea0
--- /dev/null
+++ b/rust/sedona-spatial-join/src/partitioning/partition_slots.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::partitioning::SpatialPartition;
+
+#[derive(Clone, Copy, Debug)]
+/// Maintains the slot mapping for all `SpatialPartition` variants, reserving
+/// contiguous indices for regular partitions plus dedicated None/Multi slots.
+pub struct PartitionSlots {
+ num_regular: usize,
+}
+
+impl PartitionSlots {
+    /// Create a slot manager for `num_regular` `SpatialPartition::Regular` entries.
+    /// Two additional slots are implicitly reserved: one for `None` and one for `Multi`.
+ pub fn new(num_regular: usize) -> Self {
+ Self { num_regular }
+ }
+
+ /// Return the total slot count (`Regular + None + Multi`).
+ pub fn total_slots(&self) -> usize {
+ self.num_regular + 2
+ }
+
+ /// Convert a `SpatialPartition` into its backing slot index.
+ pub fn slot(&self, partition: SpatialPartition) -> Option<usize> {
+ match partition {
+ SpatialPartition::Regular(id) => {
+ let id = id as usize;
+ if id < self.num_regular {
+ Some(id)
+ } else {
+ None
+ }
+ }
+ SpatialPartition::None => Some(self.none_slot()),
+ SpatialPartition::Multi => Some(self.multi_slot()),
+ }
+ }
+
+    /// Convert a slot index back into the corresponding `SpatialPartition` variant.
+ pub fn partition(&self, slot: usize) -> SpatialPartition {
+ if slot < self.num_regular {
+ SpatialPartition::Regular(slot as u32)
+ } else if slot == self.none_slot() {
+ SpatialPartition::None
+ } else if slot == self.multi_slot() {
+ SpatialPartition::Multi
+ } else {
+ panic!(
+ "invalid partition slot {slot} for {} regular partitions",
+ self.num_regular
+ );
+ }
+ }
+
+ /// Number of regular partitions
+ pub fn num_regular_partitions(&self) -> usize {
+ self.num_regular
+ }
+
+ /// Slot dedicated to `SpatialPartition::None`.
+ pub fn none_slot(&self) -> usize {
+ self.num_regular
+ }
+
+ /// Slot dedicated to `SpatialPartition::Multi`.
+ pub fn multi_slot(&self) -> usize {
+ self.num_regular + 1
+ }
+}
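A quick sketch of the slot mapping implied by the code above (the values
follow directly from the implementation; `SpatialPartition` comes from the
parent `partitioning` module):

    let slots = PartitionSlots::new(4);        // 4 regular partitions
    assert_eq!(slots.total_slots(), 6);        // plus None and Multi
    assert_eq!(slots.slot(SpatialPartition::Regular(2)), Some(2));
    assert_eq!(slots.slot(SpatialPartition::None), Some(4));     // none_slot()
    assert_eq!(slots.slot(SpatialPartition::Multi), Some(5));    // multi_slot()
    assert_eq!(slots.slot(SpatialPartition::Regular(9)), None);  // out of range
    assert!(matches!(slots.partition(5), SpatialPartition::Multi));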
diff --git a/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs b/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
new file mode 100644
index 00000000..44591107
--- /dev/null
+++ b/rust/sedona-spatial-join/src/partitioning/stream_repartitioner.rs
@@ -0,0 +1,1359 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Streaming spatial partitioning utilities.
+//!
+//! This module provides helpers that repartition an [`EvaluatedBatch`] stream into
+//! spatial spill files using a [`SpatialPartitioner`]. Each regular partition, along
+//! with the None and Multi partitions, gets at most one spill file which can
+//! later be replayed via [`SendableEvaluatedBatchStream`].
+
+use std::sync::Arc;
+
+use crate::{
+ evaluated_batch::{
+        evaluated_batch_stream::SendableEvaluatedBatchStream, spill::EvaluatedBatchSpillWriter,
+ EvaluatedBatch,
+ },
+ operand_evaluator::EvaluatedGeometryArray,
+ partitioning::{
+        partition_slots::PartitionSlots, util::geo_rect_to_bbox, PartitionedSide, SpatialPartition,
+ SpatialPartitioner,
+ },
+};
+use arrow::compute::interleave as arrow_interleave;
+use arrow::compute::interleave_record_batch;
+use arrow_array::{Array, ArrayRef, RecordBatch};
+use datafusion::config::SpillCompression;
+use datafusion_common::{Result, ScalarValue};
+use datafusion_execution::{disk_manager::RefCountedTempFile, runtime_env::RuntimeEnv};
+use datafusion_expr::ColumnarValue;
+use datafusion_physical_plan::metrics::SpillMetrics;
+use futures::StreamExt;
+use sedona_common::sedona_internal_err;
+use sedona_expr::statistics::GeoStatistics;
+use sedona_functions::st_analyze_agg::AnalyzeAccumulator;
+use sedona_geometry::bounding_box::BoundingBox;
+use sedona_geometry::interval::IntervalTrait;
+use sedona_schema::datatypes::WKB_GEOMETRY;
+
+/// Result emitted after a stream is spatially repartitioned.
+#[derive(Debug)]
+pub struct SpilledPartitions {
+ slots: PartitionSlots,
+ partitions: Vec<Option<SpilledPartition>>,
+}
+
+/// Metadata and spill files produced for a single spatial partition.
+///
+/// A `SpilledPartition` corresponds to one logical spatial partition (including
+/// special partitions such as `None` or `Multi`) after the stream repartitioner
+/// has flushed in-memory data to disk. It tracks the set of temporary spill
+/// files that hold the partition's rows, along with aggregated geospatial
+/// statistics and the total number of rows written.
+#[derive(Debug, Clone)]
+pub struct SpilledPartition {
+ /// Temporary spill files containing the rows assigned to this partition.
+ spill_files: Vec<Arc<RefCountedTempFile>>,
+    /// Aggregated geospatial statistics computed over all rows in this partition.
+ geo_statistics: GeoStatistics,
+ /// Total number of rows that were written into `spill_files`.
+ num_rows: usize,
+}
+
+impl SpilledPartition {
+    /// Construct a spilled partition from finalized spill files and aggregated statistics.
+ pub fn new(
+ spill_files: Vec<Arc<RefCountedTempFile>>,
+ geo_statistics: GeoStatistics,
+ num_rows: usize,
+ ) -> Self {
+ Self {
+ spill_files,
+ geo_statistics,
+ num_rows,
+ }
+ }
+
+ /// Create an empty spilled partition (no files, empty stats, zero rows).
+ pub fn empty() -> Self {
+ Self::new(Vec::new(), GeoStatistics::empty(), 0)
+ }
+
+ /// Spill files produced for this partition.
+ pub fn spill_files(&self) -> &[Arc<RefCountedTempFile>] {
+ &self.spill_files
+ }
+
+ /// Aggregated geospatial statistics for this partition.
+ pub fn geo_statistics(&self) -> &GeoStatistics {
+ &self.geo_statistics
+ }
+
+ /// Total number of rows assigned to this partition.
+ pub fn num_rows(&self) -> usize {
+ self.num_rows
+ }
+
+ /// Bounding box, if available from accumulated statistics.
+ pub fn bounding_box(&self) -> Option<&BoundingBox> {
+ self.geo_statistics.bbox()
+ }
+
+ /// Consume this value and return only the spill files.
+ pub fn into_spill_files(self) -> Vec<Arc<RefCountedTempFile>> {
+ self.spill_files
+ }
+
+    /// Consume this value and return `(spill_files, geo_statistics, num_rows)`.
+    pub fn into_inner(self) -> (Vec<Arc<RefCountedTempFile>>, GeoStatistics, usize) {
+ (self.spill_files, self.geo_statistics, self.num_rows)
+ }
+}
+
+impl SpilledPartitions {
+ /// Construct a new spilled-partitions container for the provided slots.
+ ///
+ /// `partitions` must contain one entry for every slot in `slots`.
+    pub fn new(slots: PartitionSlots, partitions: Vec<SpilledPartition>) -> Self {
+ assert_eq!(partitions.len(), slots.total_slots());
+ let partitions = partitions.into_iter().map(Some).collect();
+ Self { slots, partitions }
+ }
+
+ /// Number of regular partitions
+ pub fn num_regular_partitions(&self) -> usize {
+ self.slots.num_regular_partitions()
+ }
+
+    /// Get the slots for mapping spatial partitions to sequential 0-based indexes
+ pub fn slots(&self) -> PartitionSlots {
+ self.slots
+ }
+
+ /// Count of spill files that were actually materialized.
+ pub fn spill_file_count(&self) -> usize {
+ self.partitions
+ .iter()
+            .map(|partition| partition.as_ref().map_or(0, |p| p.spill_files().len()))
+ .sum()
+ }
+
+ /// Retrieve the spilled partition for a given spatial partition.
+    pub fn spilled_partition(&self, partition: SpatialPartition) -> Result<&SpilledPartition> {
+ let Some(slot) = self.slots.slot(partition) else {
+ return sedona_internal_err!(
+ "Invalid partition {:?} for {} regular partitions",
+ partition,
+ self.slots.num_regular_partitions()
+ );
+ };
+ match &self.partitions[slot] {
+ Some(spilled_partition) => Ok(spilled_partition),
+ None => sedona_internal_err!(
+ "Spilled partition {:?} has already been taken away",
+ partition
+ ),
+ }
+ }
+
+ /// Consume this structure into concrete spilled partitions.
+ pub fn into_spilled_partitions(self) -> Result<Vec<SpilledPartition>> {
+ let mut partitions = Vec::with_capacity(self.partitions.len());
+ for partition_opt in self.partitions {
+ match partition_opt {
+ Some(partition) => partitions.push(partition),
+ None => {
+ return sedona_internal_err!(
+                    "Some of the spilled partitions have already been taken away"
+ )
+ }
+ }
+ }
+ Ok(partitions)
+ }
+
+    /// Get a clone of the spill files in the specified partition without consuming it. This is
+    /// mainly for retrieving the Multi partition, which may be scanned multiple times.
+    pub fn get_spilled_partition(&self, partition: SpatialPartition) -> Result<SpilledPartition> {
+ let Some(slot) = self.slots.slot(partition) else {
+ return sedona_internal_err!(
+ "Invalid partition {:?} for {} regular partitions",
+ partition,
+ self.slots.num_regular_partitions()
+ );
+ };
+ match &self.partitions[slot] {
+ Some(spilled_partition) => Ok(spilled_partition.clone()),
+ None => sedona_internal_err!(
+ "Spilled partition {:?} has already been taken away",
+ partition
+ ),
+ }
+ }
+
+    /// Take the spill files of the specified partition without consuming this value. This is
+    /// mainly for retrieving the regular partitions, which will only be scanned once.
+ pub fn take_spilled_partition(
+ &mut self,
+ partition: SpatialPartition,
+ ) -> Result<SpilledPartition> {
+ let Some(slot) = self.slots.slot(partition) else {
+ return sedona_internal_err!(
+ "Invalid partition {:?} for {} regular partitions",
+ partition,
+ self.slots.num_regular_partitions()
+ );
+ };
+ match std::mem::take(&mut self.partitions[slot]) {
+ Some(spilled_partition) => Ok(spilled_partition),
+ None => sedona_internal_err!(
+ "Spilled partition {:?} has already been taken away",
+ partition
+ ),
+ }
+ }
+
+ /// Are the spill files still present and can they be taken away?
+    pub fn can_take_spilled_partition(&self, partition: SpatialPartition) -> bool {
+ let Some(slot) = self.slots.slot(partition) else {
+ return false;
+ };
+ self.partitions[slot].is_some()
+ }
+
+    /// Write debug info for these spilled partitions.
+    pub fn debug_print(&self, f: &mut impl std::fmt::Write) -> std::fmt::Result {
+ for k in 0..self.slots.total_slots() {
+ if let Some(spilled_partition) = &self.partitions[k] {
+                let bbox_str = if let Some(bbox) = spilled_partition.bounding_box() {
+ format!(
+ "x: [{:.6}, {:.6}], y: [{:.6}, {:.6}]",
+ bbox.x().lo(),
+ bbox.x().hi(),
+ bbox.y().lo(),
+ bbox.y().hi()
+ )
+ } else {
+ "None".to_string()
+ };
+ let spill_files = spilled_partition.spill_files();
+ let spill_file_sizes = spill_files
+ .iter()
+ .map(|sp| {
+ sp.inner()
+ .as_file()
+ .metadata()
+ .map(|m| m.len())
+ .unwrap_or(0)
+ })
+ .collect::<Vec<_>>();
+ writeln!(
+ f,
+                    "Partition {:?}: {} spill file(s), num non-empty geoms: {:?}, bbox: {}, spill file sizes: {:?}",
+ self.slots.partition(k),
+ spilled_partition.spill_files().len(),
+ spilled_partition
+ .geo_statistics()
+ .total_geometries()
+ .unwrap_or_default(),
+ bbox_str,
+ spill_file_sizes,
+ )?;
+ } else {
+ writeln!(f, "Partition {}: already taken away", k)?;
+ }
+ }
+ Ok(())
+ }
+}
+
+/// Incremental (stateful) repartitioner for an [`EvaluatedBatch`] stream.
+///
+/// This type assigns each incoming row to a [`SpatialPartition`] (based on a
+/// [`SpatialPartitioner`]) and writes the partitioned output into spill files.
+///
+/// It buffers incoming data and keeps per-partition spill writers open across
+/// batches to amortize setup cost. Flushing is controlled via
+/// `buffer_bytes_threshold`, and output is optionally chunked to approximately
+/// `target_batch_size` rows per partition batch.
+pub struct StreamRepartitioner {
+ runtime_env: Arc<RuntimeEnv>,
+ partitioner: Arc<dyn SpatialPartitioner>,
+ partitioned_side: PartitionedSide,
+ slots: PartitionSlots,
+ /// Spill files for each spatial partition.
+    /// The None and Multi partitions should be None when repartitioning the build side.
+ spill_registry: Vec<Option<EvaluatedBatchSpillWriter>>,
+ /// Geospatial statistics for each spatial partition.
+ geo_stats_accumulators: Vec<AnalyzeAccumulator>,
+ /// Number of rows in each spatial partition.
+ num_rows: Vec<usize>,
+ slot_assignments: Vec<Vec<(usize, usize)>>,
+ row_assignments_buffer: Vec<SpatialPartition>,
+ spill_compression: SpillCompression,
+ spill_metrics: SpillMetrics,
+ buffer_bytes_threshold: usize,
+ target_batch_size: usize,
+ spilled_batch_in_memory_size_threshold: Option<usize>,
+ pending_batches: Vec<EvaluatedBatch>,
+ pending_bytes: usize,
+}
+
+/// Builder for configuring and constructing a [`StreamRepartitioner`].
+///
+/// Defaults are chosen to be safe and explicit:
+/// - `spill_compression`: [`SpillCompression::Uncompressed`]
+/// - `buffer_bytes_threshold`: `0` (flush on every inserted batch)
+/// - `target_batch_size`: `0` (do not chunk; emit one batch per partition flush)
+/// - `spilled_batch_in_memory_size_threshold`: `None`
+pub struct StreamRepartitionerBuilder {
+ runtime_env: Arc<RuntimeEnv>,
+ partitioner: Arc<dyn SpatialPartitioner>,
+ partitioned_side: PartitionedSide,
+ spill_compression: SpillCompression,
+ spill_metrics: SpillMetrics,
+ buffer_bytes_threshold: usize,
+ target_batch_size: usize,
+ spilled_batch_in_memory_size_threshold: Option<usize>,
+}
+
+impl StreamRepartitionerBuilder {
+    /// Set spill compression applied to newly created per-partition spill files.
+    pub fn spill_compression(mut self, spill_compression: SpillCompression) -> Self {
+ self.spill_compression = spill_compression;
+ self
+ }
+
+ /// Set the in-memory buffering threshold (in bytes).
+ ///
+ /// When the buffered in-memory size meets/exceeds this threshold, pending
+ /// rows are flushed to partition writers.
+    pub fn buffer_bytes_threshold(mut self, buffer_bytes_threshold: usize) -> Self {
+ self.buffer_bytes_threshold = buffer_bytes_threshold;
+ self
+ }
+
+    /// Set the target maximum number of rows per flushed batch (per partition).
+ ///
+ /// A value of `0` disables chunking.
+ pub fn target_batch_size(mut self, target_batch_size: usize) -> Self {
+ self.target_batch_size = target_batch_size;
+ self
+ }
+
+    /// Set an optional threshold used by spill writers to decide whether to keep a
+    /// batch in memory vs. spilling.
+ pub fn spilled_batch_in_memory_size_threshold(
+ mut self,
+ spilled_batch_in_memory_size_threshold: Option<usize>,
+ ) -> Self {
+        self.spilled_batch_in_memory_size_threshold = spilled_batch_in_memory_size_threshold;
+ self
+ }
+
+ /// Build a [`StreamRepartitioner`] with the configured parameters.
+ pub fn build(self) -> StreamRepartitioner {
+        let slots = PartitionSlots::new(self.partitioner.num_regular_partitions());
+ let slot_count = slots.total_slots();
+ StreamRepartitioner {
+ runtime_env: self.runtime_env,
+ partitioner: self.partitioner,
+ partitioned_side: self.partitioned_side,
+ slots,
+ spill_registry: (0..slot_count).map(|_| None).collect(),
+ geo_stats_accumulators: (0..slot_count)
+ .map(|_| AnalyzeAccumulator::new(WKB_GEOMETRY, WKB_GEOMETRY))
+ .collect(),
+ num_rows: vec![0; slot_count],
+ slot_assignments: (0..slot_count).map(|_| Vec::new()).collect(),
+ row_assignments_buffer: Vec::new(),
+ spill_compression: self.spill_compression,
+ spill_metrics: self.spill_metrics,
+ buffer_bytes_threshold: self.buffer_bytes_threshold,
+ target_batch_size: self.target_batch_size,
+            spilled_batch_in_memory_size_threshold: self.spilled_batch_in_memory_size_threshold,
+ pending_batches: Vec::new(),
+ pending_bytes: 0,
+ }
+ }
+}
+
+impl StreamRepartitioner {
+ /// Start building a new [`StreamRepartitioner`].
+ ///
+    /// This captures the required configuration (runtime, partitioner, side, and
+    /// spill metrics). Optional parameters can then be set on the returned builder.
+ pub fn builder(
+ runtime_env: Arc<RuntimeEnv>,
+ partitioner: Arc<dyn SpatialPartitioner>,
+ partitioned_side: PartitionedSide,
+ spill_metrics: SpillMetrics,
+ ) -> StreamRepartitionerBuilder {
+ StreamRepartitionerBuilder {
+ runtime_env,
+ partitioner,
+ partitioned_side,
+ spill_compression: SpillCompression::Uncompressed,
+ spill_metrics,
+ buffer_bytes_threshold: 0,
+ target_batch_size: 0,
+ spilled_batch_in_memory_size_threshold: None,
+ }
+ }
+
+    /// Repartition a stream of evaluated batches into per-partition spill files.
+ ///
+    /// This consumes the repartitioner and returns [`SpilledPartitions`] once the
+    /// input stream is exhausted.
+ pub async fn repartition_stream(
+ mut self,
+ mut stream: SendableEvaluatedBatchStream,
+ ) -> Result<SpilledPartitions> {
+ while let Some(batch_result) = stream.next().await {
+ let batch = batch_result?;
+ self.repartition_batch(batch)?;
+ }
+ self.finish()
+ }
+
+ /// Route a single evaluated batch into its corresponding spill writers.
+ ///
+    /// This runs the spatial partitioner to compute row assignments, buffers the
+    /// batch, and may flush pending buffered data depending on configuration.
+ pub fn repartition_batch(&mut self, batch: EvaluatedBatch) -> Result<()> {
+        let mut row_assignments = std::mem::take(&mut self.row_assignments_buffer);
+ assign_rows(
+ &batch,
+ self.partitioner.as_ref(),
+ self.partitioned_side,
+ &mut row_assignments,
+ )?;
+ self.insert_repartitioned_batch(batch, &row_assignments)?;
+ self.row_assignments_buffer = row_assignments;
+ Ok(())
+ }
+
+    /// Insert a batch with row assignments into the repartitioner. The spatial partitioner
+    /// does not need to be invoked in this method. This is useful when the batch has
+    /// already been partitioned by calling `assign_rows`.
+    ///
+    /// `row_assignments` must have the same length as the batch row count and contain
+    /// only partitions valid for the configured [`SpatialPartitioner`].
+ pub fn insert_repartitioned_batch(
+ &mut self,
+ batch: EvaluatedBatch,
+ row_assignments: &[SpatialPartition],
+ ) -> Result<()> {
+ let batch_idx = self.pending_batches.len();
+ self.pending_bytes += batch.in_mem_size()?;
+ self.pending_batches.push(batch);
+ let batch_ref = &self.pending_batches[batch_idx];
+ assert_eq!(row_assignments.len(), batch_ref.num_rows());
+ for (row_idx, partition) in row_assignments.iter().enumerate() {
+ let Some(slot_idx) = self.slots.slot(*partition) else {
+ return sedona_internal_err!(
+ "Invalid partition {:?} for {} regular partitions",
+ partition,
+ self.slots.num_regular_partitions()
+ );
+ };
+ if let Some(wkb) = batch_ref.wkb(row_idx) {
+ self.geo_stats_accumulators[slot_idx].update_statistics(wkb)?;
+ }
+ self.slot_assignments[slot_idx].push((batch_idx, row_idx));
+ self.num_rows[slot_idx] += 1;
+ }
+ let threshold = self.buffer_bytes_threshold;
+ if threshold == 0 || self.pending_bytes >= threshold {
+ self.flush_pending_batches()?;
+ }
+ Ok(())
+ }
+
+ fn flush_pending_batches(&mut self) -> Result<()> {
+ if self.pending_batches.is_empty() {
+ debug_assert!(self
+ .slot_assignments
+ .iter()
+ .all(|assignments| assignments.is_empty()));
+ return Ok(());
+ }
+
+ let pending_batches = std::mem::take(&mut self.pending_batches);
+ self.pending_bytes = 0;
+
+ let record_batches: Vec<&RecordBatch> =
+ pending_batches.iter().map(|batch| &batch.batch).collect();
+ let geom_arrays: Vec<&EvaluatedGeometryArray> = pending_batches
+ .iter()
+ .map(|batch| &batch.geom_array)
+ .collect();
+
+ let mut slot_assignments = std::mem::take(&mut self.slot_assignments);
+
+        for (slot_idx, assignments) in slot_assignments.iter_mut().enumerate() {
+ if assignments.is_empty() {
+ continue;
+ }
+ let chunk_cap = if self.target_batch_size == 0 {
+ assignments.len()
+ } else {
+ self.target_batch_size
+ }
+ .max(1);
+ for chunk in assignments.chunks(chunk_cap) {
+                let sliced_batch =
+                    interleave_evaluated_batch(&record_batches, &geom_arrays, chunk)?;
+ let writer = self.ensure_writer(slot_idx, &sliced_batch)?;
+ writer.append(&sliced_batch)?;
+ }
+
+ assignments.clear();
+ }
+
+ self.slot_assignments = slot_assignments;
+ Ok(())
+ }
+
+    /// Seal every partition and return their associated spill files and bounds.
+ ///
+    /// This flushes any buffered rows, closes all partition writers, and returns a
+    /// [`SpilledPartitions`] summary.
+ pub fn finish(mut self) -> Result<SpilledPartitions> {
+ self.flush_pending_batches()?;
+ let slot_count = self.slots.total_slots();
+ let mut spilled_partition_vec = Vec::with_capacity(slot_count);
+ for ((writer_opt, accumulator), num_rows) in self
+ .spill_registry
+ .into_iter()
+ .zip(self.geo_stats_accumulators.into_iter())
+ .zip(self.num_rows.into_iter())
+ {
+ let spilled_partition = if let Some(writer) = writer_opt {
+ let spill_files = vec![Arc::new(writer.finish()?)];
+ let geo_statistics = accumulator.finish();
+ SpilledPartition::new(spill_files, geo_statistics, num_rows)
+ } else {
+ SpilledPartition::empty()
+ };
+ spilled_partition_vec.push(spilled_partition);
+ }
+
+ Ok(SpilledPartitions::new(self.slots, spilled_partition_vec))
+ }
+
+ fn ensure_writer(
+ &mut self,
+ slot_idx: usize,
+ batch: &EvaluatedBatch,
+ ) -> Result<&mut EvaluatedBatchSpillWriter> {
+ if self.spill_registry[slot_idx].is_none() {
+            self.spill_registry[slot_idx] = Some(EvaluatedBatchSpillWriter::try_new(
+ Arc::clone(&self.runtime_env),
+ batch.schema(),
+ &batch.geom_array.sedona_type,
+ "streaming repartitioner",
+ self.spill_compression,
+ self.spill_metrics.clone(),
+ self.spilled_batch_in_memory_size_threshold,
+ )?);
+ }
+ Ok(self.spill_registry[slot_idx]
+ .as_mut()
+ .expect("writer inserted above"))
+ }
+}
+
+/// Populate `assignments` with the spatial partition for every row in `batch`,
+/// reusing the provided buffer to avoid repeated allocations. The vector length
+/// after this call matches `batch.rects().len()` and each entry records which
+/// [`SpatialPartition`] the corresponding row belongs to.
+pub(crate) fn assign_rows(
+ batch: &EvaluatedBatch,
+ partitioner: &dyn SpatialPartitioner,
+ partitioned_side: PartitionedSide,
+ assignments: &mut Vec<SpatialPartition>,
+) -> Result<()> {
+ assignments.clear();
+ assignments.reserve(batch.rects().len());
+
+ match partitioned_side {
+ PartitionedSide::BuildSide => {
+ let mut cnt = 0;
+            let num_regular_partitions = partitioner.num_regular_partitions() as u32;
+ for rect_opt in batch.rects() {
+ let partition = match rect_opt {
+                    Some(rect) => partitioner.partition_no_multi(&geo_rect_to_bbox(rect))?,
+ None => {
+                        // Round-robin empty geometries through regular partitions to avoid
+                        // overloading a single slot when the build side is mostly empty.
+ let p = SpatialPartition::Regular(cnt);
+ cnt = (cnt + 1) % num_regular_partitions;
+ p
+ }
+ };
+ assignments.push(partition);
+ }
+ }
+ PartitionedSide::ProbeSide => {
+ for rect_opt in batch.rects() {
+ let partition = match rect_opt {
+                    Some(rect) => partitioner.partition(&geo_rect_to_bbox(rect))?,
+ None => SpatialPartition::None,
+ };
+ assignments.push(partition);
+ }
+ }
+ }
+
+ Ok(())
+}
+
+/// Build a new [`EvaluatedBatch`] by interleaving rows from the provided
+/// `record_batches`/`geom_arrays` inputs according to `indices`. Each pair
+/// in `indices` identifies the source batch index and row index that should
+/// appear in the output in order, ensuring the geometry metadata stays aligned
+/// with the Arrow row data.
+pub(crate) fn interleave_evaluated_batch(
+ record_batches: &[&RecordBatch],
+ geom_arrays: &[&EvaluatedGeometryArray],
+ indices: &[(usize, usize)],
+) -> Result<EvaluatedBatch> {
+ if record_batches.is_empty() || geom_arrays.is_empty() {
+        return sedona_internal_err!("interleave_evaluated_batch requires at least one batch");
+ }
+ let batch = interleave_record_batch(record_batches, indices)?;
+ let geom_array = interleave_geometry_array(geom_arrays, indices)?;
+ Ok(EvaluatedBatch { batch, geom_array })
+}
+
+fn interleave_geometry_array(
+ geom_arrays: &[&EvaluatedGeometryArray],
+ indices: &[(usize, usize)],
+) -> Result<EvaluatedGeometryArray> {
+ if geom_arrays.is_empty() {
+        return sedona_internal_err!("interleave_geometry_array requires at least one batch");
+ }
+ let sedona_type = &geom_arrays[0].sedona_type;
+ let value_refs: Vec<&dyn Array> = geom_arrays
+ .iter()
+ .map(|geom| geom.geometry_array.as_ref())
+ .collect();
+ let geometry_array = arrow_interleave(&value_refs, indices)?;
+
+ let distance = interleave_distance_columns(geom_arrays, indices)?;
+
+    let mut result = EvaluatedGeometryArray::try_new(geometry_array, sedona_type)?;
+ result.distance = distance;
+ Ok(result)
+}
+
+fn interleave_distance_columns(
+ geom_arrays: &[&EvaluatedGeometryArray],
+ assignments: &[(usize, usize)],
+) -> Result<Option<ColumnarValue>> {
+ // Check consistency and determine if we need array conversion
+ let mut first_value: Option<&ColumnarValue> = None;
+ let mut needs_array = false;
+ let mut all_null = true;
+ let mut first_scalar: Option<&ScalarValue> = None;
+
+ for geom in geom_arrays {
+ match &geom.distance {
+ Some(value) => {
+ if first_value.is_none() {
+ first_value = Some(value);
+ }
+
+ match value {
+ ColumnarValue::Array(array) => {
+ needs_array = true;
+                        if all_null && array.logical_null_count() != array.len() {
+ all_null = false;
+ }
+ }
+ ColumnarValue::Scalar(scalar) => {
+ if let Some(first) = first_scalar {
+ if first != scalar {
+ needs_array = true;
+ }
+ } else {
+ first_scalar = Some(scalar);
+ }
+ if !scalar.is_null() {
+ all_null = false;
+ }
+ }
+ }
+ }
+ None => {
+ if first_value.is_some() && !all_null {
+                    return sedona_internal_err!("Inconsistent distance metadata across batches");
+ }
+ }
+ }
+ }
+
+ if all_null {
+ return Ok(None);
+ }
+
+ let Some(distance_value) = first_value else {
+ return Ok(None);
+ };
+
+ // If all scalars match, return scalar
+ if !needs_array {
+ if let ColumnarValue::Scalar(value) = distance_value {
+ return Ok(Some(ColumnarValue::Scalar(value.clone())));
+ }
+ }
+
+ // Convert to arrays and interleave
+ let mut arrays: Vec<ArrayRef> = Vec::with_capacity(geom_arrays.len());
+ for geom in geom_arrays {
+ match &geom.distance {
+ Some(ColumnarValue::Array(array)) => arrays.push(array.clone()),
+ Some(ColumnarValue::Scalar(value)) => {
+                arrays.push(value.to_array_of_size(geom.geometry_array.len())?);
+ }
+ None => {
+                return sedona_internal_err!("Inconsistent distance metadata across batches");
+ }
+ }
+ }
+
+    let array_refs: Vec<&dyn Array> = arrays.iter().map(|array| array.as_ref()).collect();
+ let array = arrow_interleave(&array_refs, assignments)?;
+ Ok(Some(ColumnarValue::Array(array)))
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use arrow_array::{ArrayRef, BinaryArray, Int32Array};
+ use arrow_schema::{DataType, Field, Schema};
+ use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
+ use sedona_geometry::bounding_box::BoundingBox;
+ use sedona_geometry::interval::IntervalTrait;
+ use sedona_schema::datatypes::WKB_GEOMETRY;
+
+ use crate::{
+ evaluated_batch::{
+ evaluated_batch_stream::in_mem::InMemoryEvaluatedBatchStream,
+ spill::EvaluatedBatchSpillReader,
+ },
+ partitioning::flat::FlatPartitioner,
+ };
+
+ const BUFFER_BYTES: usize = 8 * 1024 * 1024;
+ const TARGET_BATCH_SIZE: usize = 4096;
+
+ fn sample_schema() -> Arc<Schema> {
+ Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]))
+ }
+
+    fn sample_batch(ids: &[i32], wkbs: Vec<Option<Vec<u8>>>) -> Result<EvaluatedBatch> {
+ assert_eq!(ids.len(), wkbs.len());
+ let id_array = Arc::new(Int32Array::from(ids.to_vec())) as ArrayRef;
+ let batch = RecordBatch::try_new(sample_schema(), vec![id_array])?;
+ let geom_values: Vec<Option<&[u8]>> = wkbs
+ .iter()
+ .map(|wkb_opt| wkb_opt.as_ref().map(|wkb| wkb.as_slice()))
+ .collect();
+ let geom_array: ArrayRef = Arc::new(BinaryArray::from(geom_values));
+ let geom = EvaluatedGeometryArray::try_new(geom_array, &WKB_GEOMETRY)?;
+ Ok(EvaluatedBatch {
+ batch,
+ geom_array: geom,
+ })
+ }
+
+ fn point_wkb(x: f64, y: f64) -> Vec<u8> {
+ let mut buf = vec![1u8, 1, 0, 0, 0];
+ buf.extend_from_slice(&x.to_le_bytes());
+ buf.extend_from_slice(&y.to_le_bytes());
+ buf
+ }
+
+ fn rect_wkb(min_x: f64, min_y: f64, max_x: f64, max_y: f64) -> Vec<u8> {
+ assert!(min_x <= max_x, "min_x must be <= max_x");
+ assert!(min_y <= max_y, "min_y must be <= max_y");
+ let mut buf = Vec::with_capacity(1 + 4 + 4 + 4 + 5 * 16);
+ buf.push(1u8); // little endian
+ buf.extend_from_slice(&3u32.to_le_bytes()); // polygon type
+ buf.extend_from_slice(&1u32.to_le_bytes()); // single ring
+        buf.extend_from_slice(&5u32.to_le_bytes()); // five coordinates (closed ring)
+ let coords = [
+ (min_x, min_y),
+ (max_x, min_y),
+ (max_x, max_y),
+ (min_x, max_y),
+ (min_x, min_y),
+ ];
+ for (x, y) in coords {
+ buf.extend_from_slice(&x.to_le_bytes());
+ buf.extend_from_slice(&y.to_le_bytes());
+ }
+ buf
+ }
+
+ fn read_ids(file: &RefCountedTempFile) -> Result<Vec<i32>> {
+ let mut reader = EvaluatedBatchSpillReader::try_new(file)?;
+ let mut ids = Vec::new();
+ while let Some(batch) = reader.next_batch() {
+ let batch = batch?;
+ let array = batch
+ .batch
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ for i in 0..array.len() {
+ ids.push(array.value(i));
+ }
+ }
+ Ok(ids)
+ }
+
+ fn read_batch_row_counts(file: &RefCountedTempFile) -> Result<Vec<usize>> {
+ let mut reader = EvaluatedBatchSpillReader::try_new(file)?;
+ let mut counts = Vec::new();
+ while let Some(batch) = reader.next_batch() {
+ let batch = batch?;
+ counts.push(batch.batch.num_rows());
+ }
+ Ok(counts)
+ }
+
+ fn bbox_limits(bbox: &BoundingBox) -> (f64, f64, f64, f64) {
+ (bbox.x().lo(), bbox.x().hi(), bbox.y().lo(), bbox.y().hi())
+ }
+
+ #[tokio::test]
+ async fn repartition_basic() -> Result<()> {
+ let wkbs = vec![
+ Some(point_wkb(10.0, 10.0)),
+ Some(point_wkb(60.0, 10.0)),
+ Some(point_wkb(150.0, 10.0)),
+ ];
+ let batch = sample_batch(&[0, 1, 2], wkbs)?;
+ let schema = batch.schema();
+ let stream: SendableEvaluatedBatchStream =
+ Box::pin(InMemoryEvaluatedBatchStream::new(schema, vec![batch]));
+
+ let partitions = vec![
+ BoundingBox::xy((0.0, 50.0), (0.0, 50.0)),
+ BoundingBox::xy((50.0, 100.0), (0.0, 50.0)),
+ ];
+ let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
+ let runtime_env = Arc::new(RuntimeEnv::default());
+ let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+
+ let result = StreamRepartitioner::builder(
+ runtime_env,
+ partitioner,
+ PartitionedSide::ProbeSide,
+ metrics,
+ )
+ .spill_compression(SpillCompression::Uncompressed)
+ .buffer_bytes_threshold(BUFFER_BYTES)
+ .target_batch_size(TARGET_BATCH_SIZE)
+ .spilled_batch_in_memory_size_threshold(None)
+ .build()
+ .repartition_stream(stream)
+ .await?;
+
+ assert_eq!(result.spill_file_count(), 3);
+ assert_eq!(
+ read_ids(
+ &result
+ .spilled_partition(SpatialPartition::Regular(0))?
+ .spill_files()[0]
+ )?,
+ vec![0]
+ );
+ assert_eq!(
+ read_ids(
+ &result
+ .spilled_partition(SpatialPartition::Regular(1))?
+ .spill_files()[0]
+ )?,
+ vec![1]
+ );
+ assert_eq!(
+ read_ids(
+ &result
+ .spilled_partition(SpatialPartition::None)?
+ .spill_files()[0]
+ )?,
+ vec![2]
+ );
+
+ assert_eq!(
+ bbox_limits(
+ result
+ .spilled_partition(SpatialPartition::Regular(0))?
+ .bounding_box()
+ .unwrap()
+ ),
+ (10.0, 10.0, 10.0, 10.0)
+ );
+ assert_eq!(
+ bbox_limits(
+ result
+ .spilled_partition(SpatialPartition::Regular(1))?
+ .bounding_box()
+ .unwrap()
+ ),
+ (60.0, 60.0, 10.0, 10.0)
+ );
+ assert_eq!(
+ bbox_limits(
+ result
+ .spilled_partition(SpatialPartition::None)?
+ .bounding_box()
+ .unwrap()
+ ),
+ (150.0, 150.0, 10.0, 10.0)
+ );
+ Ok(())
+ }
+
+ #[tokio::test]
+ async fn repartition_multi_and_none() -> Result<()> {
+ let wkbs = vec![Some(rect_wkb(25.0, 0.0, 75.0, 20.0)), None];
+ let batch = sample_batch(&[0, 1], wkbs)?;
+ let schema = batch.schema();
+ let stream: SendableEvaluatedBatchStream =
+ Box::pin(InMemoryEvaluatedBatchStream::new(schema, vec![batch]));
+
+ let partitions = vec![
+ BoundingBox::xy((0.0, 50.0), (0.0, 50.0)),
+ BoundingBox::xy((50.0, 100.0), (0.0, 50.0)),
+ ];
+ let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
+ let runtime_env = Arc::new(RuntimeEnv::default());
+ let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+
+ let result = StreamRepartitioner::builder(
+ runtime_env,
+ partitioner,
+ PartitionedSide::ProbeSide,
+ metrics,
+ )
+ .spill_compression(SpillCompression::Uncompressed)
+ .buffer_bytes_threshold(BUFFER_BYTES)
+ .target_batch_size(TARGET_BATCH_SIZE)
+ .spilled_batch_in_memory_size_threshold(None)
+ .build()
+ .repartition_stream(stream)
+ .await?;
+
+ assert_eq!(result.spill_file_count(), 2);
+ assert_eq!(
+ read_ids(
+ &result
+ .spilled_partition(SpatialPartition::Multi)?
+ .spill_files()[0]
+ )?,
+ vec![0]
+ );
+ assert_eq!(
+ read_ids(
+ &result
+ .spilled_partition(SpatialPartition::None)?
+ .spill_files()[0]
+ )?,
+ vec![1]
+ );
+ assert_eq!(
+ bbox_limits(
+ result
+ .spilled_partition(SpatialPartition::Multi)?
+ .bounding_box()
+ .unwrap()
+ ),
+ (25.0, 75.0, 0.0, 20.0)
+ );
+ let none_bound = result
+ .spilled_partition(SpatialPartition::None)?
+ .bounding_box()
+ .expect("Geo stats should exist for None partition");
+ assert!(none_bound.x().is_empty());
+ assert!(none_bound.y().is_empty());
+ Ok(())
+ }
+
+ #[test]
+ fn streaming_repartitioner_finishes_partitions() -> Result<()> {
+        let wkbs = vec![Some(point_wkb(10.0, 10.0)), Some(point_wkb(60.0, 10.0))];
+ let batch = sample_batch(&[0, 1], wkbs)?;
+ let partitions = vec![
+ BoundingBox::xy((0.0, 50.0), (0.0, 50.0)),
+ BoundingBox::xy((50.0, 100.0), (0.0, 50.0)),
+ ];
+ let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
+ let runtime_env = Arc::new(RuntimeEnv::default());
+        let spill_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+ let mut repartitioner = StreamRepartitioner::builder(
+ runtime_env,
+ partitioner,
+ PartitionedSide::ProbeSide,
+ spill_metrics,
+ )
+ .spill_compression(SpillCompression::Uncompressed)
+ .buffer_bytes_threshold(0)
+ .target_batch_size(TARGET_BATCH_SIZE)
+ .spilled_batch_in_memory_size_threshold(None)
+ .build();
+
+ repartitioner.repartition_batch(batch)?;
+ let result = repartitioner.finish()?;
+ assert!(result
+ .spilled_partition(SpatialPartition::None)?
+ .spill_files()
+ .is_empty());
+ assert_eq!(
+ read_ids(
+ &result
+ .spilled_partition(SpatialPartition::Regular(0))?
+ .spill_files()[0]
+ )?,
+ vec![0]
+ );
+ assert_eq!(
+ read_ids(
+ &result
+ .spilled_partition(SpatialPartition::Regular(1))?
+ .spill_files()[0]
+ )?,
+ vec![1]
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn streaming_repartitioner_buffers_until_threshold() -> Result<()> {
+ let batch_a = sample_batch(&[0], vec![Some(point_wkb(10.0, 10.0))])?;
+ let batch_b = sample_batch(&[1], vec![Some(point_wkb(20.0, 10.0))])?;
+ let partitions = vec![BoundingBox::xy((0.0, 50.0), (0.0, 50.0))];
+ let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
+ let runtime_env = Arc::new(RuntimeEnv::default());
+        let spill_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+ let mut repartitioner = StreamRepartitioner::builder(
+ runtime_env,
+ partitioner,
+ PartitionedSide::ProbeSide,
+ spill_metrics,
+ )
+ .spill_compression(SpillCompression::Uncompressed)
+ .buffer_bytes_threshold(usize::MAX)
+ .target_batch_size(TARGET_BATCH_SIZE)
+ .spilled_batch_in_memory_size_threshold(None)
+ .build();
+
+ repartitioner.repartition_batch(batch_a)?;
+ repartitioner.repartition_batch(batch_b)?;
+ let result = repartitioner.finish()?;
+ assert_eq!(
+ read_ids(
+ &result
+ .spilled_partition(SpatialPartition::Regular(0))?
+ .spill_files()[0]
+ )?,
+ vec![0, 1]
+ );
+ Ok(())
+ }
+
+ #[test]
+ fn streaming_repartitioner_respects_target_batch_size() -> Result<()> {
+ let batch_a = sample_batch(&[0], vec![Some(point_wkb(10.0, 10.0))])?;
+ let batch_b = sample_batch(&[1], vec![Some(point_wkb(20.0, 10.0))])?;
+ let partitions = vec![BoundingBox::xy((0.0, 50.0), (0.0, 50.0))];
+ let partitioner = Arc::new(FlatPartitioner::try_new(partitions)?);
+ let runtime_env = Arc::new(RuntimeEnv::default());
+ let spill_metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+ let mut repartitioner = StreamRepartitioner::builder(
+ runtime_env,
+ partitioner,
+ PartitionedSide::ProbeSide,
+ spill_metrics,
+ )
+ .spill_compression(SpillCompression::Uncompressed)
+ .buffer_bytes_threshold(usize::MAX)
+ .target_batch_size(1)
+ .spilled_batch_in_memory_size_threshold(None)
+ .build();
+
+ repartitioner.repartition_batch(batch_a)?;
+ repartitioner.repartition_batch(batch_b)?;
+ let result = repartitioner.finish()?;
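+ // target_batch_size(1) caps each spilled batch at one row, so the two
+ // buffered rows must be written out as two single-row batches.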
+ let counts = read_batch_row_counts(
+ &result
+ .spilled_partition(SpatialPartition::Regular(0))?
+ .spill_files()[0],
+ )?;
+ assert_eq!(counts, vec![1, 1]);
+ Ok(())
+ }
+
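+ /// Test helper: wraps raw WKB buffers in an [`EvaluatedGeometryArray`] and
+ /// attaches the optional distance column used by the interleave tests below.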
+ fn make_geom_array_with_distance(
+ wkbs: Vec<Vec<u8>>,
+ distance: Option<ColumnarValue>,
+ ) -> Result<EvaluatedGeometryArray> {
+ let geom_array: ArrayRef = Arc::new(BinaryArray::from(
+ wkbs.iter()
+ .map(|wkb| Some(wkb.as_slice()))
+ .collect::<Vec<_>>(),
+ ));
+ let mut geom = EvaluatedGeometryArray::try_new(geom_array, &WKB_GEOMETRY)?;
+ geom.distance = distance;
+ Ok(geom)
+ }
+
+ #[test]
+ fn interleave_distance_none() -> Result<()> {
+ let wkbs1 = vec![point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+ let wkbs2 = vec![point_wkb(30.0, 30.0)];
+
+ let geom1 = make_geom_array_with_distance(wkbs1, None)?;
+ let geom2 = make_geom_array_with_distance(wkbs2, None)?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+ let result = interleave_distance_columns(&geom_arrays, &assignments)?;
+ assert!(result.is_none());
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_uniform_scalar() -> Result<()> {
+ let wkbs1 = vec![point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+ let wkbs2 = vec![point_wkb(30.0, 30.0)];
+
+ let scalar = ScalarValue::Float64(Some(5.0));
+ let geom1 =
+ make_geom_array_with_distance(wkbs1, Some(ColumnarValue::Scalar(scalar.clone())))?;
+ let geom2 =
+ make_geom_array_with_distance(wkbs2, Some(ColumnarValue::Scalar(scalar.clone())))?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+ let result = interleave_distance_columns(&geom_arrays, &assignments)?;
+ assert!(matches!(result, Some(ColumnarValue::Scalar(_))));
+ if let Some(ColumnarValue::Scalar(value)) = result {
+ assert_eq!(value, ScalarValue::Float64(Some(5.0)));
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_different_scalars() -> Result<()> {
+ use arrow_array::Float64Array;
+
+ let wkbs1 = vec![point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+ let wkbs2 = vec![point_wkb(30.0, 30.0)];
+
+ let scalar1 = ScalarValue::Float64(Some(5.0));
+ let scalar2 = ScalarValue::Float64(Some(10.0));
+ let geom1 = make_geom_array_with_distance(wkbs1, Some(ColumnarValue::Scalar(scalar1)))?;
+ let geom2 = make_geom_array_with_distance(wkbs2, Some(ColumnarValue::Scalar(scalar2)))?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+ let result = interleave_distance_columns(&geom_arrays, &assignments)?;
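+ // Two different scalar distances cannot be collapsed into a single scalar,
+ // so the result is expected to be an array laid out in assignment order.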
+ assert!(matches!(result, Some(ColumnarValue::Array(_))));
+ if let Some(ColumnarValue::Array(array)) = result {
+ let float_array = array.as_any().downcast_ref::<Float64Array>().unwrap();
+ assert_eq!(float_array.len(), 3);
+ assert_eq!(float_array.value(0), 5.0);
+ assert_eq!(float_array.value(1), 10.0);
+ assert_eq!(float_array.value(2), 5.0);
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_arrays() -> Result<()> {
+ use arrow_array::Float64Array;
+
+ let wkbs1 = vec![point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+ let wkbs2 = vec![point_wkb(30.0, 30.0)];
+
+ let array1: ArrayRef = Arc::new(Float64Array::from(vec![1.0, 2.0]));
+ let array2: ArrayRef = Arc::new(Float64Array::from(vec![3.0]));
+ let geom1 = make_geom_array_with_distance(wkbs1, Some(ColumnarValue::Array(array1)))?;
+ let geom2 = make_geom_array_with_distance(wkbs2, Some(ColumnarValue::Array(array2)))?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+ let result = interleave_distance_columns(&geom_arrays, &assignments)?;
+ assert!(matches!(result, Some(ColumnarValue::Array(_))));
+ if let Some(ColumnarValue::Array(array)) = result {
+ let float_array = array.as_any().downcast_ref::<Float64Array>().unwrap();
+ assert_eq!(float_array.len(), 3);
+ assert_eq!(float_array.value(0), 1.0);
+ assert_eq!(float_array.value(1), 3.0);
+ assert_eq!(float_array.value(2), 2.0);
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_mixed_scalar_and_array() -> Result<()> {
+ use arrow_array::Float64Array;
+
+ let wkbs1 = vec![point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+ let wkbs2 = vec![point_wkb(30.0, 30.0)];
+
+ let scalar = ScalarValue::Float64(Some(5.0));
+ let array: ArrayRef = Arc::new(Float64Array::from(vec![10.0]));
+ let geom1 = make_geom_array_with_distance(wkbs1, Some(ColumnarValue::Scalar(scalar)))?;
+ let geom2 = make_geom_array_with_distance(wkbs2, Some(ColumnarValue::Array(array)))?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+ let result = interleave_distance_columns(&geom_arrays, &assignments)?;
+ assert!(matches!(result, Some(ColumnarValue::Array(_))));
+ if let Some(ColumnarValue::Array(array)) = result {
+ let float_array = array.as_any().downcast_ref::<Float64Array>().unwrap();
+ assert_eq!(float_array.len(), 3);
+ assert_eq!(float_array.value(0), 5.0);
+ assert_eq!(float_array.value(1), 10.0);
+ assert_eq!(float_array.value(2), 5.0);
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_evaluated_batch_empty_assignments() -> Result<()> {
+ let batch_a = sample_batch(&[0], vec![Some(point_wkb(10.0, 10.0))])?;
+ let batch_b = sample_batch(&[1], vec![Some(point_wkb(20.0, 20.0))])?;
+ let record_batches = vec![&batch_a.batch, &batch_b.batch];
+ let geom_arrays = vec![&batch_a.geom_array, &batch_b.geom_array];
+
+ let result = interleave_evaluated_batch(&record_batches, &geom_arrays, &[])?;
+ assert_eq!(result.batch.num_rows(), 0);
+ assert_eq!(result.geom_array.geometry_array.len(), 0);
+ assert!(result.geom_array.rects.is_empty());
+ assert!(result.geom_array.distance.is_none());
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_inconsistent_metadata() -> Result<()> {
+ let wkbs1 = vec![point_wkb(10.0, 10.0)];
+ let wkbs2 = vec![point_wkb(20.0, 20.0)];
+
+ let scalar = ScalarValue::Float64(Some(5.0));
+ let geom1 = make_geom_array_with_distance(wkbs1, Some(ColumnarValue::Scalar(scalar)))?;
+ let geom2 = make_geom_array_with_distance(wkbs2, None)?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0)];
+
+ let result = interleave_distance_columns(&geom_arrays, &assignments);
+ assert!(result.is_err());
+ if let Err(e) = result {
+ assert!(e.to_string().contains("Inconsistent distance metadata"));
+ }
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_binary_view_array() -> Result<()> {
+ use arrow_array::BinaryViewArray;
+ use sedona_schema::crs::Crs;
+ use sedona_schema::datatypes::{Edges, SedonaType};
+ let wkb_view_geometry = SedonaType::WkbView(Edges::Planar, Crs::None);
+
+ let wkbs1 = [point_wkb(10.0, 10.0), point_wkb(20.0, 20.0)];
+ let wkbs2 = [point_wkb(30.0, 30.0)];
+
+ // Create BinaryViewArray
+ let array1 = BinaryViewArray::from_iter(wkbs1.iter().map(|w| Some(w.as_slice())));
+ let array2 = BinaryViewArray::from_iter(wkbs2.iter().map(|w| Some(w.as_slice())));
+
+ let geom1 = EvaluatedGeometryArray::try_new(Arc::new(array1), &wkb_view_geometry)?;
+ let geom2 = EvaluatedGeometryArray::try_new(Arc::new(array2), &wkb_view_geometry)?;
+
+ let geom_arrays = vec![&geom1, &geom2];
+ let assignments = vec![(0, 0), (1, 0), (0, 1)];
+
+ // Create dummy record batches
+ let batch1 = RecordBatch::try_new(
+ sample_schema(),
+ vec![Arc::new(Int32Array::from(vec![1, 2]))],
+ )?;
+ let batch2 =
+ RecordBatch::try_new(sample_schema(), vec![Arc::new(Int32Array::from(vec![3]))])?;
+ let record_batches = vec![&batch1, &batch2];
+
+ let result = interleave_evaluated_batch(&record_batches, &geom_arrays, &assignments)?;
+
+ // Check that the result geometry array is a BinaryViewArray
+ let geom_array = result.geom_array.geometry_array;
+ assert!(geom_array
+ .as_any()
+ .downcast_ref::<BinaryViewArray>()
+ .is_some());
+
+ // Check values
+ let view_array = geom_array
+ .as_any()
+ .downcast_ref::<BinaryViewArray>()
+ .unwrap();
+ assert_eq!(view_array.len(), 3);
+ assert_eq!(view_array.value(0), wkbs1[0].as_slice());
+ assert_eq!(view_array.value(1), wkbs2[0].as_slice());
+ assert_eq!(view_array.value(2), wkbs1[1].as_slice());
+
+ Ok(())
+ }
+
+ #[test]
+ fn interleave_distance_mixed_none_and_null() -> Result<()> {
+ use arrow_array::Float64Array;
+
+ let wkbs1 = vec![point_wkb(10.0, 10.0)];
+ let wkbs2 = vec![point_wkb(20.0, 20.0)];
+ let wkbs3 = vec![point_wkb(30.0, 30.0)];
+
+ let null_array = Arc::new(Float64Array::new_null(1));
+ let ega1 = make_geom_array_with_distance(wkbs1, Some(ColumnarValue::Array(null_array)))?;
+
+ let null_scalar = ScalarValue::Float64(None);
+ let ega2 = make_geom_array_with_distance(wkbs2, Some(ColumnarValue::Scalar(null_scalar)))?;
+
+ let ega3 = make_geom_array_with_distance(wkbs3, None)?;
+
+ let vec_ega = vec![&ega1, &ega2, &ega3];
+ let assignments = vec![(0, 0), (1, 0), (2, 0)];
+
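+ // Every distance input here is null or absent, so the interleaved result
+ // should carry no distance column at all.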
+ let result = interleave_distance_columns(&vec_ega, &assignments)?;
+ assert!(result.is_none());
+ Ok(())
+ }
+}
diff --git a/rust/sedona-spatial-join/src/partitioning/util.rs b/rust/sedona-spatial-join/src/partitioning/util.rs
index a5f57304..20a93801 100644
--- a/rust/sedona-spatial-join/src/partitioning/util.rs
+++ b/rust/sedona-spatial-join/src/partitioning/util.rs
@@ -63,6 +63,13 @@ pub(crate) fn bbox_to_geo_rect(bbox: &BoundingBox) -> Result<Option<Rect<f32>>>
}
}
+/// Convert a [`Rect<f32>`] into a [`BoundingBox`].
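+///
+/// Note: widening the `f32` bounds back to `f64` is exact, unlike the lossy
+/// `f64`-to-`f32` narrowing done by `bbox_to_geo_rect`.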
+pub(crate) fn geo_rect_to_bbox(rect: &Rect<f32>) -> BoundingBox {
+ let min = rect.min();
+ let max = rect.max();
+ BoundingBox::xy((min.x as f64, max.x as f64), (min.y as f64, max.y as f64))
+}
+
/// Creates a `Rect` from four coordinate values representing the bounding box.
///
/// This is a convenience function that constructs a `geo::Rect` from individual
diff --git a/rust/sedona-spatial-join/src/stream.rs b/rust/sedona-spatial-join/src/stream.rs
index 6cf175c2..8451ff2d 100644
--- a/rust/sedona-spatial-join/src/stream.rs
+++ b/rust/sedona-spatial-join/src/stream.rs
@@ -413,7 +413,7 @@ impl SpatialJoinStream {
let mut analyzer = AnalyzeAccumulator::new(WKB_GEOMETRY, WKB_GEOMETRY);
let geom_array = &probe_evaluated_batch.geom_array;
for wkb in geom_array.wkbs().iter().flatten() {
- analyzer.update_statistics(wkb, wkb.buf().len())?;
+ analyzer.update_statistics(wkb)?;
}
let stats = analyzer.finish();
spatial_index.merge_probe_stats(stats);
diff --git a/rust/sedona-spatial-join/src/utils/spill.rs b/rust/sedona-spatial-join/src/utils/spill.rs
index fef585a5..116d5043 100644
--- a/rust/sedona-spatial-join/src/utils/spill.rs
+++ b/rust/sedona-spatial-join/src/utils/spill.rs
@@ -39,7 +39,7 @@ pub(crate) struct RecordBatchSpillWriter {
in_progress_file: RefCountedTempFile,
writer: StreamWriter<File>,
metrics: SpillMetrics,
- batch_size_threshold: Option<usize>,
+ batch_in_memory_size_threshold: Option<usize>,
gc_view_arrays: bool,
}
@@ -50,7 +50,7 @@ impl RecordBatchSpillWriter {
request_description: &str,
compression: SpillCompression,
metrics: SpillMetrics,
- batch_size_threshold: Option<usize>,
+ batch_in_memory_size_threshold: Option<usize>,
) -> Result<Self> {
let in_progress_file = env.disk_manager.create_tmp_file(request_description)?;
let file = File::create(in_progress_file.path())?;
@@ -67,7 +67,7 @@ impl RecordBatchSpillWriter {
in_progress_file,
writer,
metrics,
- batch_size_threshold,
+ batch_in_memory_size_threshold,
gc_view_arrays,
})
}
@@ -101,7 +101,7 @@ impl RecordBatchSpillWriter {
}
fn calculate_rows_per_split(&self, batch: &RecordBatch, num_rows: usize) -> Result<usize> {
- let Some(threshold) = self.batch_size_threshold else {
+ let Some(threshold) = self.batch_in_memory_size_threshold else {
return Ok(num_rows);
};
if threshold == 0 {