korowa commented on code in PR #9830:
URL: https://github.com/apache/arrow-datafusion/pull/9830#discussion_r1546448558
##########
datafusion/physical-plan/src/joins/cross_join.rs:
##########
@@ -374,64 +376,147 @@ impl Stream for CrossJoinStream {
}
impl CrossJoinStream {
- /// Separate implementation function that unpins the [`CrossJoinStream`] so
- /// that partial borrows work correctly
+ /// Separate implementation function that unpins the [`CrossJoinStream`]
+ /// so that partial borrows work correctly
fn poll_next_impl(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<RecordBatch>>> {
+ loop {
+ return match self.state {
+ CrossJoinStreamState::WaitBuildSide => {
+ handle_state!(ready!(self.collect_build_side(cx)))
+ }
+ CrossJoinStreamState::FetchProbeBatch => {
+ handle_state!(ready!(self.fetch_probe_batch(cx)))
+ }
+ CrossJoinStreamState::GenerateResult => {
+ handle_state!(self.generate_result())
+ }
+ CrossJoinStreamState::Completed => Poll::Ready(None),
+ };
+ }
+ }
+
+ /// Waits until the left data computation completes. After it is ready,
+ /// copies it into the state and continues with fetching probe side. If we
+ /// cannot receive any row from left, the operation ends without polling right.
+ fn collect_build_side(
+ &mut self,
+ cx: &mut std::task::Context<'_>,
+ ) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
let build_timer = self.join_metrics.build_time.timer();
let (left_data, _) = match ready!(self.left_fut.get(cx)) {
Ok(left_data) => left_data,
- Err(e) => return Poll::Ready(Some(Err(e))),
+ Err(e) => return Poll::Ready(Err(e)),
};
build_timer.done();
- if left_data.num_rows() == 0 {
- return Poll::Ready(None);
+ // If the left batch is empty, we can return `Poll::Ready(None)` immediately.
+ if left_data.iter().all(|batch| batch.num_rows() == 0) {
+ self.state = CrossJoinStreamState::Completed;
+ Poll::Ready(Ok(StatefulStreamResult::Continue))
+ } else {
+ self.left_data = left_data
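The control flow in the diff above can be sketched as a synchronous toy: `poll_next_impl` loops over `CrossJoinStreamState` and dispatches through `handle_state!`, and `collect_build_side` short-circuits when the build side is empty. Everything here is hypothetical. `ToyCrossJoin`, `StepResult`, and the `i32`-row batches are stand-ins, not DataFusion or arrow types. The async polling of `left_fut` and of the probe stream is replaced by plain iteration. `generate_result` emits one whole cross product per probe batch, which may differ from how the PR sizes its output batches.

```rust
use std::task::Poll;

/// Simplified stand-in for DataFusion's `StatefulStreamResult<T>`:
/// `Continue` means "the state changed, run the loop again".
#[derive(Debug)]
enum StepResult<T> {
    Ready(T),
    Continue,
}

/// Mirrors the four states named in the diff.
#[derive(Debug, Clone, Copy, PartialEq)]
enum State {
    WaitBuildSide,
    FetchProbeBatch,
    GenerateResult,
    Completed,
}

/// Toy cross join over batches of `i32` rows (hypothetical types).
struct ToyCrossJoin {
    state: State,
    build_input: Option<Vec<Vec<i32>>>,  // build side, not yet collected
    left_data: Vec<Vec<i32>>,            // collected build-side batches
    probe: std::vec::IntoIter<Vec<i32>>, // remaining probe-side batches
    current_probe: Vec<i32>,             // probe batch being joined
}

type Step = Result<StepResult<Option<Vec<(i32, i32)>>>, String>;

impl ToyCrossJoin {
    fn new(left: Vec<Vec<i32>>, right: Vec<Vec<i32>>) -> Self {
        Self {
            state: State::WaitBuildSide,
            build_input: Some(left),
            left_data: Vec::new(),
            probe: right.into_iter(),
            current_probe: Vec::new(),
        }
    }

    /// Drives the state machine until a batch or end-of-stream is produced.
    fn poll_next(&mut self) -> Poll<Option<Result<Vec<(i32, i32)>, String>>> {
        loop {
            let step = match self.state {
                State::WaitBuildSide => self.collect_build_side(),
                State::FetchProbeBatch => self.fetch_probe_batch(),
                State::GenerateResult => self.generate_result(),
                State::Completed => return Poll::Ready(None),
            };
            // The dispatch a `handle_state!`-style macro would perform.
            match step {
                Ok(StepResult::Continue) => continue,
                Ok(StepResult::Ready(batch)) => return Poll::Ready(batch.map(Ok)),
                Err(e) => return Poll::Ready(Some(Err(e))),
            }
        }
    }

    fn collect_build_side(&mut self) -> Step {
        let left = self
            .build_input
            .take()
            .ok_or_else(|| "build side already consumed".to_string())?;
        if left.iter().all(|batch| batch.is_empty()) {
            // No build rows: finish without ever touching the probe side.
            self.state = State::Completed;
        } else {
            self.left_data = left;
            self.state = State::FetchProbeBatch;
        }
        Ok(StepResult::Continue)
    }

    fn fetch_probe_batch(&mut self) -> Step {
        match self.probe.next() {
            Some(batch) => {
                self.current_probe = batch;
                self.state = State::GenerateResult;
            }
            None => self.state = State::Completed,
        }
        Ok(StepResult::Continue)
    }

    fn generate_result(&mut self) -> Step {
        // Pair every build-side row with every row of the current probe batch.
        let mut out = Vec::new();
        for &l in self.left_data.iter().flatten() {
            for &r in &self.current_probe {
                out.push((l, r));
            }
        }
        self.state = State::FetchProbeBatch;
        Ok(StepResult::Ready(Some(out)))
    }
}
```

The empty-build-side branch returns `Continue` rather than end-of-stream directly. The loop then reaches the `Completed` arm, which yields `Poll::Ready(None)` without the probe iterator ever being advanced.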
Review Comment:
`CrossJoinExec::execute(partition)` creates a separate stream for each partition,
so there will be multiple stream objects. And you're right: cloning the vector of
`RecordBatch`es is cheap memory-wise, since each batch's columns are already behind
`Arc`s (a clone copies pointers, not column data), so it doesn't look like an issue :ok_hand:
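The point about cheap clones can be illustrated with a hypothetical `ToyBatch` that, like arrow's `RecordBatch`, stores its columns behind `Arc`. In this sketch, `open_streams` stands in for calling `execute(partition)` once per partition, with each stream taking its own copy of the build-side `Vec`. The names `ToyBatch`, `PartitionStream`, and `open_streams` are illustrative only.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for arrow's `RecordBatch`: its columns live
/// behind `Arc`, so `clone` copies pointers, not column values.
#[derive(Clone, Debug)]
struct ToyBatch {
    columns: Vec<Arc<Vec<i64>>>,
}

/// Hypothetical per-partition stream that owns its own `Vec` of
/// build-side batches (as `self.left_data = left_data` does).
struct PartitionStream {
    left_data: Vec<ToyBatch>,
}

/// Simulates `execute(partition)` being called once per partition:
/// every stream receives its own clone of the shared build-side batches.
fn open_streams(shared_left: &Arc<Vec<ToyBatch>>, partitions: usize) -> Vec<PartitionStream> {
    (0..partitions)
        .map(|_| PartitionStream {
            left_data: shared_left.to_vec(),
        })
        .collect()
}
```

Each clone allocates a new `Vec` of pointers and bumps reference counts; the column values themselves are allocated once and shared by every stream.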
--
This is an automated message from the Apache Git Service.