berkaysynnada commented on code in PR #9830:
URL: https://github.com/apache/arrow-datafusion/pull/9830#discussion_r1546066877


##########
datafusion/physical-plan/src/joins/cross_join.rs:
##########
@@ -374,64 +376,147 @@ impl Stream for CrossJoinStream {
 }
 
 impl CrossJoinStream {
-    /// Separate implementation function that unpins the [`CrossJoinStream`] so
-    /// that partial borrows work correctly
+    /// Separate implementation function that unpins the [`CrossJoinStream`]
+    /// so that partial borrows work correctly
     fn poll_next_impl(
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Result<RecordBatch>>> {
+        loop {
+            return match self.state {
+                CrossJoinStreamState::WaitBuildSide => {
+                    handle_state!(ready!(self.collect_build_side(cx)))
+                }
+                CrossJoinStreamState::FetchProbeBatch => {
+                    handle_state!(ready!(self.fetch_probe_batch(cx)))
+                }
+                CrossJoinStreamState::GenerateResult => {
+                    handle_state!(self.generate_result())
+                }
+                CrossJoinStreamState::Completed => Poll::Ready(None),
+            };
+        }
+    }
+
+    /// Waits until the left data computation completes. After it is ready,
+    /// copies it into the state and continues with fetching probe side. If we
+    /// cannot receive any row from left, the operation ends without polling right.
+    fn collect_build_side(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
         let build_timer = self.join_metrics.build_time.timer();
         let (left_data, _) = match ready!(self.left_fut.get(cx)) {
             Ok(left_data) => left_data,
-            Err(e) => return Poll::Ready(Some(Err(e))),
+            Err(e) => return Poll::Ready(Err(e)),
         };
         build_timer.done();
 
-        if left_data.num_rows() == 0 {
-            return Poll::Ready(None);
+        // If the left batch is empty, we can return `Poll::Ready(None)` immediately.
+        if left_data.iter().all(|batch| batch.num_rows() == 0) {
+            self.state = CrossJoinStreamState::Completed;
+            Poll::Ready(Ok(StatefulStreamResult::Continue))
+        } else {
+            self.left_data = left_data

Review Comment:
   Isn't `CrossJoinStream` created once, with execution continuing on the same
object? If so, there will be only one copy, since the
`CrossJoinStreamState::WaitBuildSide` state is visited only once, at the start
of execution. I also believe the clone is cheap here; it is just a vector of
references.
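To illustrate why the clone is cheap: in arrow-rs a `RecordBatch` stores its columns as `ArrayRef` (`Arc<dyn Array>`), so cloning a `Vec` of batches copies pointers and increments reference counts rather than copying column data. The sketch below assumes a hypothetical `Batch` type built only on `std::sync::Arc` to show the same effect; it is not DataFusion's actual `left_data` type.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an Arrow record batch: each column is a shared,
/// reference-counted buffer, much like arrow-rs `ArrayRef` (`Arc<dyn Array>`).
#[derive(Clone, Debug)]
struct Batch {
    columns: Vec<Arc<Vec<i64>>>,
}

impl Batch {
    /// Row count taken from the first column (0 if there are no columns).
    fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, |c| c.len())
    }
}

fn main() {
    let column = Arc::new(vec![1_i64, 2, 3]);
    let left_data = vec![Batch { columns: vec![Arc::clone(&column)] }];

    // Cloning the Vec clones each Batch, and cloning a Batch only clones
    // its Arcs: the column buffer is shared, never deep-copied.
    let copy = left_data.clone();

    assert!(Arc::ptr_eq(&left_data[0].columns[0], &copy[0].columns[0]));
    // Three handles now point at one buffer: `column`, `left_data`, `copy`.
    println!("strong count = {}", Arc::strong_count(&column)); // prints "strong count = 3"
    println!("rows = {}", copy[0].num_rows());
}
```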

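The `loop { return match self.state { ... } }` shape in `poll_next_impl` above is a small state machine. Each handler either yields an item or moves `self.state` forward and returns `Continue`, and the loop then dispatches on the new state. The sketch below is a synchronous approximation under stated assumptions: `JoinStream`, its fields, and the `StatefulStreamResult` enum here are hypothetical stand-ins (not DataFusion's definitions), futures and `Poll::Pending` are left out, and batches are reduced to row counts and integers. It also shows that the new `iter().all(...)` check treats a build side with no batches the same as one containing only empty batches, because `all` returns `true` on an empty iterator.

```rust
use std::task::Poll;

/// Hypothetical mirror of the PR's `StatefulStreamResult`: either hand an
/// item to the caller or ask the driver loop to run the next state.
enum StatefulStreamResult<T> {
    Ready(T),
    Continue,
}

#[derive(Debug, PartialEq)]
enum State {
    WaitBuildSide,
    FetchProbeBatch,
    Completed,
}

/// Hypothetical, synchronous stand-in for `CrossJoinStream`.
struct JoinStream {
    state: State,
    build_batch_rows: Vec<usize>, // row count of each build-side batch
    probe: Vec<u32>,              // probe-side items, popped from the end
}

impl JoinStream {
    fn collect_build_side(&mut self) -> StatefulStreamResult<Option<u32>> {
        // `all` is true for an empty Vec, so "no batches" and "only empty
        // batches" both finish without ever touching the probe side.
        self.state = if self.build_batch_rows.iter().all(|&rows| rows == 0) {
            State::Completed
        } else {
            State::FetchProbeBatch
        };
        StatefulStreamResult::Continue
    }

    fn fetch_probe_batch(&mut self) -> StatefulStreamResult<Option<u32>> {
        match self.probe.pop() {
            Some(item) => StatefulStreamResult::Ready(Some(item)),
            None => {
                self.state = State::Completed;
                StatefulStreamResult::Continue
            }
        }
    }

    fn poll_next(&mut self) -> Poll<Option<u32>> {
        loop {
            let step = match self.state {
                State::WaitBuildSide => self.collect_build_side(),
                State::FetchProbeBatch => self.fetch_probe_batch(),
                State::Completed => return Poll::Ready(None),
            };
            // This match plays the role of the PR's `handle_state!` macro:
            // `Continue` re-runs the loop with the updated state.
            match step {
                StatefulStreamResult::Continue => continue,
                StatefulStreamResult::Ready(item) => return Poll::Ready(item),
            }
        }
    }
}

fn main() {
    let mut stream = JoinStream {
        state: State::WaitBuildSide,
        build_batch_rows: vec![0, 4],
        probe: vec![1, 2],
    };
    while let Poll::Ready(Some(item)) = stream.poll_next() {
        println!("probe item {item}");
    }
}
```

Once the state reaches `Completed`, every later call returns `Poll::Ready(None)` from the top of the loop, so polling after the end is harmless.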

