Copilot commented on code in PR #592:
URL: https://github.com/apache/sedona-db/pull/592#discussion_r2788708431
##########
rust/sedona-spatial-join/src/partitioning/flat.rs:
##########
@@ -40,6 +40,7 @@ use sedona_geometry::interval::IntervalTrait;
use crate::partitioning::{SpatialPartition, SpatialPartitioner};
/// Spatial partitioner that linearly scans partition boundaries.
+#[derive(Clone)]
pub struct FlatPartitioner {
boundaries: Vec<BoundingBox>,
}
Review Comment:
`FlatPartitioner` is now cloned to create per-task partitioners, and its
`Clone` will duplicate the full `Vec<BoundingBox>`. With many partitions and/or
many tasks, this can lead to significant memory overhead. Consider storing the
boundaries behind an `Arc` (e.g., `Arc<Vec<BoundingBox>>` or
`Arc<[BoundingBox]>`) so `box_clone()` is cheap like the RTree/KDB partitioners.
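
A minimal sketch of the `Arc`-backed layout suggested above. The `BoundingBox` here is a hypothetical stand-in for the `sedona_geometry` type, and the `new`, `num_partitions`, and `shares_boundaries_with` helpers are assumptions for illustration only, not the crate's API:

```rust
use std::sync::Arc;

// Hypothetical stand-in for the real `BoundingBox`; fields are assumed.
#[derive(Clone, Debug, PartialEq)]
pub struct BoundingBox {
    pub min_x: f64,
    pub min_y: f64,
    pub max_x: f64,
    pub max_y: f64,
}

/// Flat partitioner whose boundaries live behind an `Arc`, so the derived
/// `Clone` only bumps a reference count instead of copying every box.
#[derive(Clone)]
pub struct FlatPartitioner {
    boundaries: Arc<[BoundingBox]>,
}

impl FlatPartitioner {
    /// Moves the boundaries into a shared, immutable slice.
    pub fn new(boundaries: Vec<BoundingBox>) -> Self {
        Self {
            boundaries: boundaries.into(),
        }
    }

    pub fn num_partitions(&self) -> usize {
        self.boundaries.len()
    }

    /// True when both partitioners point at the same allocation.
    pub fn shares_boundaries_with(&self, other: &Self) -> bool {
        Arc::ptr_eq(&self.boundaries, &other.boundaries)
    }
}
```

With this layout a per-task clone costs one atomic increment regardless of the number of partitions. It only works as long as the boundaries are never mutated after construction.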
##########
rust/sedona-spatial-join/src/probe/partitioned_stream_provider.rs:
##########
@@ -144,15 +181,16 @@ impl PartitionedProbeStreamProvider {
let mut state_guard = self.state.lock();
match std::mem::replace(&mut *state_guard, ProbeStreamState::FirstPass) {
ProbeStreamState::Pending { source } => {
- let partitioner = Arc::clone(
- self.options
- .partitioner
- .as_ref()
- .expect("Partitioned first pass requires a partitioner"),
- );
+ let partitioner = self
+ .options
+ .partitioner
+ .as_ref()
+ .expect("Partitioned first pass requires a partitioner")
+ .lock()
+ .box_clone();
let repartitioner = StreamRepartitioner::builder(
Arc::clone(&self.runtime_env),
- Arc::clone(&partitioner),
+ partitioner.box_clone(),
Review Comment:
The cloning flow here is a bit hard to follow (clone from the mutex
prototype, then immediately `box_clone()` again for the repartitioner).
Consider splitting into two clearly named variables (e.g.,
`partitioner_for_stream` / `partitioner_for_repartitioner`) to make the intent
explicit and reduce the chance of accidentally changing clone semantics later.
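
One way the naming could look, as a rough sketch. The trait below is a trimmed, hypothetical version of `SpatialPartitioner`, `DummyPartitioner` and `clone_partitioners` are made up for illustration, and `std::sync::Mutex` is used only to keep the example dependency-free (the diff appears to use a mutex whose `lock()` is infallible). Unlike the diff, this version takes both clones directly from the prototype:

```rust
use std::sync::Mutex;

// Hypothetical, trimmed-down version of the crate's trait.
pub trait SpatialPartitioner: Send {
    fn num_partitions(&self) -> usize;
    fn box_clone(&self) -> Box<dyn SpatialPartitioner>;
}

// Hypothetical partitioner used only to exercise the sketch.
#[derive(Clone)]
pub struct DummyPartitioner {
    pub partitions: usize,
}

impl SpatialPartitioner for DummyPartitioner {
    fn num_partitions(&self) -> usize {
        self.partitions
    }

    fn box_clone(&self) -> Box<dyn SpatialPartitioner> {
        Box::new(self.clone())
    }
}

/// Takes two independent clones of the prototype under a single lock and
/// returns them as (stream partitioner, repartitioner partitioner).
pub fn clone_partitioners(
    prototype: &Mutex<Box<dyn SpatialPartitioner>>,
) -> (Box<dyn SpatialPartitioner>, Box<dyn SpatialPartitioner>) {
    // std's `lock()` returns a `Result`, so the poisoned case is handled here.
    let guard = prototype.lock().expect("partitioner mutex poisoned");
    let partitioner_for_stream = guard.box_clone();
    let partitioner_for_repartitioner = guard.box_clone();
    (partitioner_for_stream, partitioner_for_repartitioner)
}
```

Naming each clone after its consumer makes it obvious that neither instance is shared, which is the property the review comment wants to protect.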
##########
rust/sedona-spatial-join/src/probe/partitioned_stream_provider.rs:
##########
@@ -50,7 +49,10 @@ pub(crate) struct ProbeStreamOptions {
/// - `None` means the probe side is treated as a single, non-partitioned stream and only
/// [`SpatialPartition::Regular(0)`] is supported.
/// - `Some(_)` enables partitioned streaming with a warm-up (first) pass.
- pub partitioner: Option<Arc<dyn SpatialPartitioner>>,
+ ///
+ /// We wrap the partitioner in a `Mutex` to make [`ProbeStreamOptions`] Send + Sync,
+ /// which makes it easier to integrate into `SpatialJoinExec`.
Review Comment:
This comment may be misleading without additional context: the `Mutex` is
used to make the *options* `Send + Sync`, but the partitioner itself is not
intended for shared concurrent use; it’s acting as a clonable prototype.
Consider clarifying that the mutex is primarily for `Sync`-compatibility /
cloning the prototype, not to enable sharing a single partitioner instance
across tasks.
```suggestion
/// The `Mutex` is used here to make [`ProbeStreamOptions`] (and its contained options)
/// `Send + Sync` so it can be shared/cloned into `SpatialJoinExec` and across tasks.
/// The partitioner itself is treated as a clonable prototype and is not intended to be
/// used by multiple tasks concurrently via this shared `Mutex`.
```
##########
rust/sedona-spatial-join/src/probe/partitioned_stream_provider.rs:
##########
@@ -62,6 +64,41 @@ pub(crate) struct ProbeStreamOptions {
pub spilled_batch_in_memory_size_threshold: Option<usize>,
}
+impl ProbeStreamOptions {
+ pub fn new(
+ partitioner: Option<Box<dyn SpatialPartitioner>>,
+ target_batch_rows: usize,
+ spill_compression: SpillCompression,
+ buffer_bytes_threshold: usize,
+ spilled_batch_in_memory_size_threshold: Option<usize>,
+ ) -> Self {
+ let partitioner = partitioner.map(Mutex::new);
+ Self {
+ partitioner,
+ target_batch_rows,
+ spill_compression,
+ buffer_bytes_threshold,
+ spilled_batch_in_memory_size_threshold,
+ }
+ }
+}
+
+impl Clone for ProbeStreamOptions {
+ fn clone(&self) -> Self {
+ let cloned_partitioner = self
+ .partitioner
+ .as_ref()
+ .map(|p| Mutex::new(p.lock().box_clone()));
Review Comment:
This assumes `Mutex::lock()` is infallible (returns a guard directly). If
`Mutex` is `std::sync::Mutex`, `lock()` returns a `Result` and this will not
compile. Consider explicitly using `parking_lot::Mutex` (or whichever
infallible Mutex type you intend) and importing it clearly, or handle the
`std::sync::Mutex::lock()` result.
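
If `std::sync::Mutex` were the intended type, the `Clone` impl would need to handle the `LockResult`. A minimal sketch under that assumption follows; `Prototype` and `Options` are hypothetical stand-ins for the boxed partitioner and `ProbeStreamOptions`, and recovering the guard from a poisoned lock is one possible policy, not necessarily the right one for this crate:

```rust
use std::sync::{Mutex, PoisonError};

// Hypothetical stand-in for the boxed partitioner prototype.
#[derive(Clone, Debug, PartialEq)]
pub struct Prototype {
    pub partitions: usize,
}

// Hypothetical stand-in for `ProbeStreamOptions`, reduced to one field.
pub struct Options {
    pub partitioner: Option<Mutex<Prototype>>,
}

impl Clone for Options {
    fn clone(&self) -> Self {
        let partitioner = self.partitioner.as_ref().map(|p| {
            // `std::sync::Mutex::lock` returns a `LockResult`. The prototype is
            // only read here, so the guard is recovered even if the lock was
            // poisoned by a panic in another thread.
            let guard = p.lock().unwrap_or_else(PoisonError::into_inner);
            Mutex::new((*guard).clone())
        });
        Self { partitioner }
    }
}
```

With `parking_lot::Mutex` the `unwrap_or_else` call disappears because `lock()` returns the guard directly, which matches the code in the diff.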
##########
rust/sedona-spatial-join/src/partitioning/round_robin.rs:
##########
@@ -51,11 +52,16 @@ impl SpatialPartitioner for RoundRobinPartitioner {
}
fn partition_no_multi(&self, _bbox: &BoundingBox) -> Result<SpatialPartition> {
- let idx = self.counter.fetch_add(1, Ordering::Relaxed);
+ let idx = self.counter.get();
+ self.counter.set(idx + 1);
Review Comment:
`idx + 1` can overflow and panic in debug builds (unlike
`AtomicUsize::fetch_add`, which wraps). Use wrapping arithmetic (e.g.,
`wrapping_add(1)`) to preserve prior behavior and avoid overflow panics.
```suggestion
self.counter
.set(idx.wrapping_add(1));
```
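
The wrap-around behavior can be checked in isolation with a small sketch. `RoundRobinCounter` and its modulo mapping are hypothetical and only mirror the `Cell`-based counter from the diff; the real partitioner may map the index to a partition differently:

```rust
use std::cell::Cell;

/// Hypothetical single-threaded round-robin counter backed by a `Cell`.
pub struct RoundRobinCounter {
    counter: Cell<usize>,
    num_partitions: usize,
}

impl RoundRobinCounter {
    pub fn new(start: usize, num_partitions: usize) -> Self {
        assert!(num_partitions > 0, "num_partitions must be non-zero");
        Self {
            counter: Cell::new(start),
            num_partitions,
        }
    }

    /// Returns the next partition index. The counter wraps to 0 at
    /// `usize::MAX` instead of panicking in debug builds.
    pub fn next_partition(&self) -> usize {
        let idx = self.counter.get();
        self.counter.set(idx.wrapping_add(1));
        idx % self.num_partitions
    }
}
```

Starting the counter at `usize::MAX` exercises the overflow path directly: the first call returns `usize::MAX % num_partitions`, and the second call sees a counter of 0 rather than a panic.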