Kontinuation commented on code in PR #563:
URL: https://github.com/apache/sedona-db/pull/563#discussion_r2748717578


##########
rust/sedona-spatial-join/src/stream.rs:
##########
@@ -185,49 +213,155 @@ impl SpatialJoinProbeMetrics {
 pub(crate) enum SpatialJoinStreamState {
     /// The initial mode: waiting for the spatial join components to become 
available
     WaitPrepareSpatialJoinComponents,
-    /// The initial mode: waiting for the spatial index to be built
-    WaitBuildIndex,
+    /// Wait for a specific partition's index. The boolean denotes whether 
this stream should kick
+    /// off building the index (`true`) or simply wait for someone else to 
build it (`false`).
+    WaitBuildIndex(u32, bool),
     /// Indicates that build-side has been collected, and stream is ready for
-    /// fetching probe-side
-    FetchProbeBatch,
+    /// fetching probe-side batches
+    FetchProbeBatch(PartitionDescriptor),
     /// Indicates that we're processing a probe batch using the batch iterator
     ProcessProbeBatch(
+        PartitionDescriptor,
         BoxFuture<'static, (Box<SpatialJoinBatchIterator>, 
Result<Option<RecordBatch>>)>,
     ),
-    /// Indicates that probe-side has been fully processed
-    ExhaustedProbeSide,
+    /// Indicates that we have exhausted the current probe stream, move to the 
Multi partition
+    /// or prepare for emitting unmatched build batch
+    ExhaustedProbeStream(PartitionDescriptor),
+    /// Indicates that probe-side has been fully processed, prepare iterator 
for producing
+    /// unmatched build side batches for outer join
+    PrepareUnmatchedBuildBatch(PartitionDescriptor),
     /// Indicates that we're processing unmatched build-side batches using an 
iterator
-    ProcessUnmatchedBuildBatch(UnmatchedBuildBatchIterator),
+    ProcessUnmatchedBuildBatch(PartitionDescriptor, 
UnmatchedBuildBatchIterator),
+    /// Prepare for processing the next partition.
+    /// If the last partition has been processed, simply transfer to 
[`SpatialJoinStreamState::Completed`];
+    /// If the there's still more partitions to process, then transfer to 
[`SpatialJoinStreamState::WaitBuildIndex`] state.
+    /// If we are the last one finishing processing the current partition, we 
can safely
+    /// drop the current index and kick off the building of the index for the 
next partition.
+    PrepareForNextPartition(u32, bool),
     /// Indicates that SpatialJoinStream execution is completed
     Completed,
 }
 
+impl std::fmt::Debug for SpatialJoinStreamState {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            Self::WaitPrepareSpatialJoinComponents => write!(f, 
"WaitPrepareSpatialJoinComponents"),
+            Self::WaitBuildIndex(id, build) => f
+                .debug_tuple("WaitBuildIndex")
+                .field(id)
+                .field(build)
+                .finish(),
+            Self::FetchProbeBatch(desc) => 
f.debug_tuple("FetchProbeBatch").field(desc).finish(),
+            Self::ProcessProbeBatch(desc, _) => {
+                f.debug_tuple("ProcessProbeBatch").field(desc).finish()
+            }
+            Self::ExhaustedProbeStream(desc) => {
+                f.debug_tuple("ExhaustedProbeStream").field(desc).finish()
+            }
+            Self::PrepareUnmatchedBuildBatch(desc) => f
+                .debug_tuple("PrepareUnmatchedBuildBatch")
+                .field(desc)
+                .finish(),
+            Self::ProcessUnmatchedBuildBatch(desc, iter) => f
+                .debug_tuple("ProcessUnmatchedBuildBatch")
+                .field(desc)
+                .field(iter)
+                .finish(),
+            Self::PrepareForNextPartition(id, last) => f
+                .debug_tuple("PrepareForNextPartition")
+                .field(id)
+                .field(last)
+                .finish(),
+            Self::Completed => write!(f, "Completed"),
+        }
+    }
+}
+
+#[derive(Debug, Clone, Copy)]
+pub(crate) struct PartitionDescriptor {
+    partition_id: u32,
+    partition: SpatialPartition,
+}
+
+impl PartitionDescriptor {
+    fn regular(partition_id: u32) -> Self {
+        Self {
+            partition_id,
+            partition: SpatialPartition::Regular(partition_id),
+        }
+    }
+
+    fn multi(partition_id: u32) -> Self {
+        Self {
+            partition_id,
+            partition: SpatialPartition::Multi,
+        }

Review Comment:
   There is no problem here. We need to process the MULTI partition after processing
each regular partition, so the partition_id is needed to track where we are when
processing the MULTI partition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to