Copilot commented on code in PR #525:
URL: https://github.com/apache/sedona-db/pull/525#discussion_r2704357691


##########
rust/sedona-spatial-join/src/stream.rs:
##########
@@ -486,24 +486,118 @@ pub(crate) struct SpatialJoinBatchIterator {
     spatial_index: Arc<SpatialIndex>,
     /// The probe side batch being processed
     probe_evaluated_batch: Arc<EvaluatedBatch>,
-    /// Current probe row index being processed
-    current_probe_idx: usize,
     /// Join metrics for tracking performance
     join_metrics: SpatialJoinProbeMetrics,
     /// Maximum batch size before yielding a result
     max_batch_size: usize,
     /// Maintains the order of the probe side
     probe_side_ordered: bool,
-    /// Current accumulated build batch positions
-    build_batch_positions: Vec<(i32, i32)>,
-    /// Current accumulated probe indices
-    probe_indices: Vec<u32>,
-    /// Whether iteration is complete
-    is_complete: bool,
     /// The spatial predicate being evaluated
     spatial_predicate: SpatialPredicate,
     /// The spatial join options
     options: SpatialJoinOptions,
+    /// Progress of probing
+    progress: Option<ProbeProgress>,
+}
+
+struct ProbeProgress {
+    /// Index of the probe row to be probed by 
[SpatialJoinBatchIterator::probe_range] or
+    /// [SpatialJoinBatchIterator::probe_knn].
+    current_probe_idx: usize,
+    /// Index of the lastly produced probe row. There are three cases:
+    /// - -1 means nothing was produced yet.
+    /// - >=num_rows means we have produced all probe rows. The iterator is 
complete.
+    /// - within [0, num_rows) means we have produced up to this probe index 
(inclusive)].
+    ///   The value is largest probe row index that has matching build rows so 
far.

Review Comment:
   The use of -1 to represent 'nothing produced yet' should be more explicitly 
documented as a sentinel value. Consider adding why i64 is used instead of 
Option<usize> to make the design choice clearer.
   ```suggestion
       /// Index of the last produced probe row. This field uses `-1` as a 
sentinel value
       /// to represent "nothing produced yet" and is stored as `i64` instead of
       /// `Option<usize>` to keep the layout compact and avoid extra branching 
and
       /// wrapping/unwrapping in the hot probe loop. There are three cases:
       /// - `-1` means nothing was produced yet.
       /// - `>= num_rows` means we have produced all probe rows. The iterator 
is complete.
       /// - within `[0, num_rows)` means we have produced up to this probe 
index (inclusive).
       ///   The value is the largest probe row index that has matching build 
rows so far.
   ```



##########
rust/sedona-spatial-join/src/stream.rs:
##########
@@ -1202,4 +1381,156 @@ mod tests {
                 "Data mismatch when mapping back from assembled batch row {i} 
to original batch {original_batch_idx} row {original_row_idx}");
         }
     }
+
+    #[test]
+    fn test_produce_joined_indices() {
+        for max_batch_size in 1..20 {
+            verify_produce_probe_indices(&[], 0, max_batch_size);
+            verify_produce_probe_indices(&[0, 0, 0, 0], 1, max_batch_size);
+            verify_produce_probe_indices(&[0, 0, 0, 0], 10, max_batch_size);
+            verify_produce_probe_indices(&[3, 3, 3], 10, max_batch_size);
+            verify_produce_probe_indices(&[0, 0, 3, 3, 3, 6, 7], 10, 
max_batch_size);
+            verify_produce_probe_indices(&[0, 3, 3, 3, 4, 5, 5, 9], 10, 
max_batch_size);
+            verify_produce_probe_indices(&[0, 3, 3, 4, 5, 5, 9, 9], 10, 
max_batch_size);
+        }
+    }
+
+    #[test]
+    fn test_fuzz_produce_probe_indices() {
+        let num_rows_range = 0..100;
+        let max_batch_size_range = 1..100;
+        let match_probability = 0.5;
+        let num_matches_range = 1..100;
+        for _ in 0..1000 {
+            fuzz_produce_probe_indices(
+                num_rows_range.clone(),
+                max_batch_size_range.clone(),
+                match_probability,
+                num_matches_range.clone(),
+            );
+        }
+    }
+
+    fn fuzz_produce_probe_indices(
+        num_rows_range: Range<usize>,
+        max_batch_size_range: Range<usize>,
+        match_probability: f64,
+        num_matches_range: Range<usize>,
+    ) {
+        let mut rng = rand::rng();

Review Comment:
   Using the default thread-local RNG without seeding makes the fuzz test 
non-deterministic and makes its failures difficult to reproduce. Consider using 
a seeded RNG (e.g., `StdRng::seed_from_u64`) or at least logging the seed used 
for each test run to enable reproduction of failures.



##########
rust/sedona-spatial-join/src/stream.rs:
##########
@@ -532,68 +626,117 @@ impl SpatialJoinBatchIterator {
             build_side: params.build_side,
             spatial_index: params.spatial_index,
             probe_evaluated_batch: params.probe_evaluated_batch,
-            current_probe_idx: 0,
             join_metrics: params.join_metrics,
             max_batch_size: params.max_batch_size,
             probe_side_ordered: params.probe_side_ordered,
-            build_batch_positions: Vec::new(),
-            probe_indices: Vec::new(),
-            is_complete: false,
             spatial_predicate: params.spatial_predicate,
             options: params.options,
+            progress: Some(ProbeProgress {
+                current_probe_idx: 0,
+                last_produced_probe_idx: -1,
+                build_batch_positions: Vec::new(),
+                probe_indices: Vec::new(),
+                pos: 0,
+            }),
         })
     }
 
     pub async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
-        if self.is_complete {
-            return Ok(None);
-        }
+        let progress_opt = std::mem::take(&mut self.progress);
+        let mut progress = progress_opt.expect("Progress should be available");
+        let res = self.next_batch_inner(&mut progress).await;
+        self.progress = Some(progress);
+        res

Review Comment:
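   Using `std::mem::take` followed by `expect` creates unnecessary indirection. 
Since `progress` is always `Some` during normal operation, consider using a 
different pattern such as storing `ProbeProgress` directly and using `&mut` 
references, or document why the Option wrapper is necessary. Note that the 
suggestion below keeps a mutable borrow of `self.progress` alive across the 
call to `self.next_batch_inner(..)`, which itself borrows `self`; as written 
it is likely to be rejected by the borrow checker, which may be the reason for 
the take-and-restore pattern. Verify that it compiles before applying it.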
   ```suggestion
           let progress = self
               .progress
               .as_mut()
               .expect("Progress should be available");
           self.next_batch_inner(progress).await
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to