korowa commented on code in PR #9830:
URL: https://github.com/apache/arrow-datafusion/pull/9830#discussion_r1546448558


##########
datafusion/physical-plan/src/joins/cross_join.rs:
##########
@@ -374,64 +376,147 @@ impl Stream for CrossJoinStream {
 }
 
 impl CrossJoinStream {
-    /// Separate implementation function that unpins the [`CrossJoinStream`] so
-    /// that partial borrows work correctly
+    /// Separate implementation function that unpins the [`CrossJoinStream`]
+    /// so that partial borrows work correctly
     fn poll_next_impl(
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> std::task::Poll<Option<Result<RecordBatch>>> {
+        loop {
+            return match self.state {
+                CrossJoinStreamState::WaitBuildSide => {
+                    handle_state!(ready!(self.collect_build_side(cx)))
+                }
+                CrossJoinStreamState::FetchProbeBatch => {
+                    handle_state!(ready!(self.fetch_probe_batch(cx)))
+                }
+                CrossJoinStreamState::GenerateResult => {
+                    handle_state!(self.generate_result())
+                }
+                CrossJoinStreamState::Completed => Poll::Ready(None),
+            };
+        }
+    }
+
+    /// Waits until the left data computation completes. After it is ready,
+    /// copies it into the state and continues with fetching probe side. If we
+    /// cannot receive any row from left, the operation ends without polling right.
+    fn collect_build_side(
+        &mut self,
+        cx: &mut std::task::Context<'_>,
+    ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
         let build_timer = self.join_metrics.build_time.timer();
         let (left_data, _) = match ready!(self.left_fut.get(cx)) {
             Ok(left_data) => left_data,
-            Err(e) => return Poll::Ready(Some(Err(e))),
+            Err(e) => return Poll::Ready(Err(e)),
         };
         build_timer.done();
 
-        if left_data.num_rows() == 0 {
-            return Poll::Ready(None);
+        // If the left batch is empty, we can return `Poll::Ready(None)` immediately.
+        if left_data.iter().all(|batch| batch.num_rows() == 0) {
+            self.state = CrossJoinStreamState::Completed;
+            Poll::Ready(Ok(StatefulStreamResult::Continue))
+        } else {
+            self.left_data = left_data

Review Comment:
   `CrossJoinExec::execute(partition)` creates one stream per partition, so
there will be multiple stream objects. And you're right, cloning the vector of
RecordBatches (and with it the RecordBatches themselves) is fine memory-wise,
since the columns are already behind Arcs, so it doesn't look like an issue
:ok_hand:
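
To illustrate the point about cloning, here is a minimal std-only sketch. `Batch`, `build_side_is_empty`, and `clone_for_partition` are hypothetical stand-ins and not DataFusion or arrow APIs; the only assumption carried over from the discussion is that each column sits behind an `Arc`, so a clone copies pointers rather than column data.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a record batch: every column is a shared,
/// immutable buffer behind an `Arc` (assumption taken from the comment above).
#[derive(Clone)]
struct Batch {
    columns: Vec<Arc<Vec<i64>>>,
}

impl Batch {
    /// Row count is the length of the first column, or 0 if there are no columns.
    fn num_rows(&self) -> usize {
        self.columns.first().map_or(0, |c| c.len())
    }
}

/// Same shape as the emptiness check in the diff: true when every batch has zero rows.
fn build_side_is_empty(batches: &[Batch]) -> bool {
    batches.iter().all(|b| b.num_rows() == 0)
}

/// Clones the build side for one partition stream. This allocates a new outer
/// `Vec` and bumps `Arc` reference counts; the column buffers are not copied.
fn clone_for_partition(left: &[Batch]) -> Vec<Batch> {
    left.to_vec()
}
```

With this model, calling `clone_for_partition` once per partition leaves a single copy of each column buffer in memory, and `Arc::ptr_eq` on a column of the original and of a clone returns `true`.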



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
