mhilton commented on code in PR #12634: URL: https://github.com/apache/datafusion/pull/12634#discussion_r1778300562
########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -560,91 +729,147 @@ impl NestedLoopJoinStream { // Get or initialize visited_left_side bitmap if required by join type let visited_left_side = left_data.bitmap(); - // Check is_exhausted before polling the outer_table, such that when the outer table - // does not support `FusedStream`, Self will not poll it again - if self.is_exhausted { - return Poll::Ready(None); - } + loop { + if let Some(batch) = self.output_buffer.next() { + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(batch.num_rows()); + return Poll::Ready(Some(Ok(batch))); + } - self.outer_table - .poll_next_unpin(cx) - .map(|maybe_batch| match maybe_batch { - Some(Ok(right_batch)) => { - // Setting up timer & updating input metrics - self.join_metrics.input_batches.add(1); - self.join_metrics.input_rows.add(right_batch.num_rows()); - let timer = self.join_metrics.join_time.timer(); - - let result = join_left_and_right_batch( - left_data.batch(), - &right_batch, - self.join_type, - self.filter.as_ref(), - &self.column_indices, - &self.schema, - visited_left_side, - &mut self.indices_cache, - self.right_side_ordered, - ); - - // Recording time & updating output metrics - if let Ok(batch) = &result { - timer.done(); - self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(batch.num_rows()); + // Check is_exhausted before polling the outer_table, such that when the outer table + // does not support `FusedStream`, Self will not poll it again + if self.is_exhausted { + let batch = self.output_buffer.finish()?; + if let Some(batch) = &batch { + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(batch.num_rows()); + } + return Poll::Ready(Ok(batch).transpose()); + } + + if self.outer_record_batch.is_none() { + // Get the next outer record batch + match self.outer_table.poll_next_unpin(cx) { + Poll::Ready(Some(Ok(batch))) => { + self.join_metrics.input_batches.add(1); + self.join_metrics.input_rows.add(batch.num_rows()); + self.memory_reservation + .try_grow(batch.get_array_memory_size())?; + self.outer_record_batch = Some(batch); + self.outer_record_batch_row = 0; } + Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))), + Poll::Ready(None) => { + if need_produce_result_in_final(self.join_type) { + // At this stage `visited_left_side` won't be updated, so it's + // safe to report about probe completion. + // + // Setting `is_exhausted` will prevent from multiple calls of + // `report_probe_completed()` + if !left_data.report_probe_completed() { + self.is_exhausted = true; + continue; + }; + + // Only setting up timer, input is exhausted + let timer = self.join_metrics.join_time.timer(); + // use the global left bitmap to produce the left indices and right indices + let (left_side, right_side) = + get_final_indices_from_shared_bitmap( + visited_left_side, + self.join_type, + ); + let empty_right_batch = + RecordBatch::new_empty(self.outer_table.schema()); + // use the left and right indices to produce the batch result + let result = build_batch_from_indices( + &self.schema, + left_data.batch(), + &empty_right_batch, + &left_side, + &right_side, + &self.column_indices, + JoinSide::Left, + ); + self.is_exhausted = true; - Some(result) - } - Some(err) => Some(err), - None => { - if need_produce_result_in_final(self.join_type) { - // At this stage `visited_left_side` won't be updated, so it's - // safe to report about probe completion. - // - // Setting `is_exhausted` / returning None will prevent from - // multiple calls of `report_probe_completed()` - if !left_data.report_probe_completed() { + // Recording time & updating output metrics + match result { + Ok(batch) => { + timer.done(); + self.output_buffer.push(batch)?; + continue; + } + Err(e) => return Poll::Ready(Some(Err(e))), + } + } else { self.is_exhausted = true; - return None; + continue; + } + } + Poll::Pending => { + return match self.output_buffer.flush() { + Ok(Some(batch)) => { + // If there was anything in the output buffer flush it + // so that it can be processed. + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(batch.num_rows()); + Poll::Ready(Some(Ok(batch))) + } + Ok(None) => Poll::Pending, + Err(err) => Poll::Ready(Some(Err(err))), }; + } + } + } - // Only setting up timer, input is exhausted - let timer = self.join_metrics.join_time.timer(); - // use the global left bitmap to produce the left indices and right indices - let (left_side, right_side) = - get_final_indices_from_shared_bitmap( - visited_left_side, - self.join_type, - ); - let empty_right_batch = - RecordBatch::new_empty(self.outer_table.schema()); - // use the left and right indices to produce the batch result - let result = build_batch_from_indices( - &self.schema, - left_data.batch(), - &empty_right_batch, - &left_side, - &right_side, - &self.column_indices, - JoinSide::Left, - ); - self.is_exhausted = true; - - // Recording time & updating output metrics - if let Ok(batch) = &result { - timer.done(); - self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(batch.num_rows()); - } + debug_assert!(self.outer_record_batch.is_some()); + let right_batch = self.outer_record_batch.as_ref().unwrap(); + let num_rows = match (self.join_type, left_data.batch().num_rows()) { + // An inner join will only produce 1 output row per input row. + (JoinType::Inner, _) | (_, 0) => self.output_buffer.needed_rows(), Review Comment: You are right of course, I'm not sure why I got it in my head that they would be 1-1. I'll fix that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org