berkaysynnada commented on code in PR #9830:
URL: https://github.com/apache/arrow-datafusion/pull/9830#discussion_r1546066877
##########
datafusion/physical-plan/src/joins/cross_join.rs:
##########
@@ -374,64 +376,147 @@ impl Stream for CrossJoinStream {
}
impl CrossJoinStream {
- /// Separate implementation function that unpins the [`CrossJoinStream`] so
- /// that partial borrows work correctly
+ /// Separate implementation function that unpins the [`CrossJoinStream`]
+ /// so that partial borrows work correctly
fn poll_next_impl(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<RecordBatch>>> {
+ loop {
+ return match self.state {
+ CrossJoinStreamState::WaitBuildSide => {
+ handle_state!(ready!(self.collect_build_side(cx)))
+ }
+ CrossJoinStreamState::FetchProbeBatch => {
+ handle_state!(ready!(self.fetch_probe_batch(cx)))
+ }
+ CrossJoinStreamState::GenerateResult => {
+ handle_state!(self.generate_result())
+ }
+ CrossJoinStreamState::Completed => Poll::Ready(None),
+ };
+ }
+ }
+
+ /// Waits until the left data computation completes. After it is ready,
+ /// copies it into the state and continues with fetching probe side. If we
+ /// cannot receive any row from left, the operation ends without polling right.
+ fn collect_build_side(
+ &mut self,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
let build_timer = self.join_metrics.build_time.timer();
let (left_data, _) = match ready!(self.left_fut.get(cx)) {
Ok(left_data) => left_data,
- Err(e) => return Poll::Ready(Some(Err(e))),
+ Err(e) => return Poll::Ready(Err(e)),
};
build_timer.done();
- if left_data.num_rows() == 0 {
- return Poll::Ready(None);
+ // If the left batch is empty, we can return `Poll::Ready(None)` immediately.
+ if left_data.iter().all(|batch| batch.num_rows() == 0) {
+ self.state = CrossJoinStreamState::Completed;
+ Poll::Ready(Ok(StatefulStreamResult::Continue))
+ } else {
+ self.left_data = left_data
Review Comment:
Isn't `CrossJoinStream` created once, with execution continuing on the same
object? If so, there will be only one copy, since the
`CrossJoinStreamState::WaitBuildSide` state is visited only once, at the start
of execution. I also believe the clone is not expensive here; it is just a
vector of references.
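The argument above has two parts: the build side is cloned only once, because the stream object persists and `WaitBuildSide` is entered a single time, and the clone is cheap. Both can be checked with a small model. The sketch below is a synchronous, std-only approximation under stated assumptions: `Batch`, `CrossJoinSketch`, `next_output`, and `build_side_clones` are hypothetical names, not DataFusion APIs. `Batch` only mimics how an Arrow `RecordBatch` keeps its column data behind `Arc`s. No real futures, polling, or `handle_state!` macro are involved, and each "output" is just a row count.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for an Arrow `RecordBatch`: the column data sits
/// behind an `Arc`, so cloning a batch copies a pointer, not the values.
#[derive(Clone, Debug)]
struct Batch {
    column: Arc<Vec<i64>>,
}

impl Batch {
    fn num_rows(&self) -> usize {
        self.column.len()
    }
}

/// State names mirror the diff; they are reused here for illustration only.
enum State {
    WaitBuildSide,
    FetchProbeBatch,
    GenerateResult,
    Completed,
}

/// Hypothetical synchronous model: one object, driven repeatedly.
struct CrossJoinSketch {
    state: State,
    /// Shared build-side result (stands in for the value behind `left_fut`).
    build_side: Arc<Vec<Batch>>,
    /// Local copy taken while in `WaitBuildSide`.
    left_data: Vec<Batch>,
    probe: std::vec::IntoIter<Batch>,
    current_probe: Option<Batch>,
    /// Counts how many times the build side was cloned.
    build_side_clones: usize,
}

impl CrossJoinSketch {
    fn new(build_side: Arc<Vec<Batch>>, probe: Vec<Batch>) -> Self {
        Self {
            state: State::WaitBuildSide,
            build_side,
            left_data: Vec::new(),
            probe: probe.into_iter(),
            current_probe: None,
            build_side_clones: 0,
        }
    }

    /// Returns the number of joined rows for the next probe batch, or `None`.
    fn next_output(&mut self) -> Option<usize> {
        loop {
            match self.state {
                State::WaitBuildSide => {
                    if self.build_side.iter().all(|b| b.num_rows() == 0) {
                        // Empty build side: finish without touching the probe side.
                        self.state = State::Completed;
                    } else {
                        // The only clone of the build side; this arm runs once.
                        self.left_data = (*self.build_side).clone();
                        self.build_side_clones += 1;
                        self.state = State::FetchProbeBatch;
                    }
                }
                State::FetchProbeBatch => match self.probe.next() {
                    Some(batch) => {
                        self.current_probe = Some(batch);
                        self.state = State::GenerateResult;
                    }
                    None => self.state = State::Completed,
                },
                State::GenerateResult => {
                    let probe_rows = self.current_probe.take().map_or(0, |b| b.num_rows());
                    let left_rows: usize = self.left_data.iter().map(Batch::num_rows).sum();
                    self.state = State::FetchProbeBatch;
                    return Some(left_rows * probe_rows);
                }
                State::Completed => return None,
            }
        }
    }
}

fn main() {
    let build_side = Arc::new(vec![
        Batch { column: Arc::new(vec![1, 2]) },
        Batch { column: Arc::new(vec![3]) },
    ]);
    let probe = vec![
        Batch { column: Arc::new(vec![10, 20]) },
        Batch { column: Arc::new(vec![30, 40, 50]) },
    ];
    let mut stream = CrossJoinSketch::new(Arc::clone(&build_side), probe);
    let mut outputs = Vec::new();
    while let Some(rows) = stream.next_output() {
        outputs.push(rows);
    }
    // 3 build rows x 2 probe rows, then 3 x 3.
    assert_eq!(outputs, vec![6, 9]);
    assert_eq!(stream.build_side_clones, 1);
    // The cloned batch shares its column buffer with the original.
    assert!(Arc::ptr_eq(&stream.left_data[0].column, &build_side[0].column));
    println!("outputs = {:?}, clones = {}", outputs, stream.build_side_clones);
}
```

Because every arm either advances `state` or returns, `WaitBuildSide` cannot be re-entered once the stream has moved past it, and `Arc::ptr_eq` shows the copied batches share column buffers with the original rather than duplicating the data.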
--
This is an automated message from the Apache Git Service.