jonathanc-n commented on code in PR #16443: URL: https://github.com/apache/datafusion/pull/16443#discussion_r2181102938
########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -828,13 +833,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> { handle_state!(self.process_probe_batch()) } NestedLoopJoinStreamState::ExhaustedProbeSide => { - handle_state!(self.process_unmatched_build_batch()) + handle_state!(self.prepare_unmatched_output_indices()) + } + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => { + handle_state!(self.build_unmatched_output()) } NestedLoopJoinStreamState::Completed => Poll::Ready(None), }; } } + fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> { + let (left_indices, right_indices, start) = + self.join_result_status.as_mut().ok_or_else(|| { + datafusion_common::_internal_datafusion_err!( + "should have join_result_status" Review Comment: Should we import `_internal_datafusion_err!` so we do not need the path qualifier? Small nit; it just looks cleaner that way. ########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -828,13 +833,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> { handle_state!(self.process_probe_batch()) } NestedLoopJoinStreamState::ExhaustedProbeSide => { - handle_state!(self.process_unmatched_build_batch()) + handle_state!(self.prepare_unmatched_output_indices()) + } + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => { + handle_state!(self.build_unmatched_output()) } NestedLoopJoinStreamState::Completed => Poll::Ready(None), }; } } + fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> { + let (left_indices, right_indices, start) = + self.join_result_status.as_mut().ok_or_else(|| { + datafusion_common::_internal_datafusion_err!( + "should have join_result_status" + ) + })?; + + let left_batch = self + .left_data + .as_ref() + .ok_or_else(|| { + datafusion_common::_internal_datafusion_err!("should have left_batch") + })?
+ .batch(); + + let right_batch = match &self.state { + NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) => record_batch, + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => { + record_batch + } + _ => { + return internal_err!( + "state should be ProcessProbeBatch or OutputUnmatchBatch" + ) + } + }; + + let current_start = *start; + + if left_indices.is_empty() && right_indices.is_empty() && current_start == 0 { Review Comment: Can we just build and return an empty batch instead of calling build_batch_from_indices? ########## datafusion/physical-plan/src/joins/utils.rs: ########## @@ -843,24 +844,56 @@ pub(crate) fn apply_join_filter_to_indices( probe_indices: UInt32Array, filter: &JoinFilter, build_side: JoinSide, + max_intermediate_size: Option<usize>, ) -> Result<(UInt64Array, UInt32Array)> { if build_indices.is_empty() && probe_indices.is_empty() { return Ok((build_indices, probe_indices)); }; - let intermediate_batch = build_batch_from_indices( - filter.schema(), - build_input_buffer, - probe_batch, - &build_indices, - &probe_indices, - filter.column_indices(), - build_side, - )?; - let filter_result = filter - .expression() - .evaluate(&intermediate_batch)? - .into_array(intermediate_batch.num_rows())?; + let filter_result = if let Some(max_size) = max_intermediate_size { Review Comment: Shouldn't we have this done in this pull request? 
I think it would make more sense (just moving this logic to `build_join_indices`). ########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -883,44 +1000,63 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> { let visited_left_side = left_data.bitmap(); let batch = self.state.try_as_process_probe_batch()?; - match self.batch_transformer.next() { - None => { - // Setting up timer & updating input metrics - self.join_metrics.input_batches.add(1); - self.join_metrics.input_rows.add(batch.num_rows()); - let timer = self.join_metrics.join_time.timer(); - - let result = join_left_and_right_batch( - left_data.batch(), - batch, - self.join_type, - self.filter.as_ref(), - &self.column_indices, - &self.schema, - visited_left_side, - &mut self.indices_cache, - self.right_side_ordered, - ); - timer.done(); + let binding = self.join_metrics.join_time.clone(); + let _timer = binding.timer(); + + if self.join_result_status.is_none() { + let (left_side_indices, right_side_indices) = join_left_and_right_batch( + left_data.batch(), + batch, + self.join_type, + self.filter.as_ref(), + visited_left_side, + &mut self.indices_cache, + self.right_side_ordered, + self.intermediate_batch_size, + )?; + self.join_result_status = Some((left_side_indices, right_side_indices, 0)) + } + + let join_result = self.get_next_join_result()?; - self.batch_transformer.set_batch(result?); + match join_result { + Some(res) => { + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(res.num_rows()); + + Ok(StatefulStreamResult::Ready(Some(res))) + } + None => { + self.state = NestedLoopJoinStreamState::FetchProbeBatch; + self.join_result_status = None; Ok(StatefulStreamResult::Continue) } - Some((batch, last)) => { - if last { - self.state = NestedLoopJoinStreamState::FetchProbeBatch; - } + } + } - self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(batch.num_rows()); - Ok(StatefulStreamResult::Ready(Some(batch))) + fn 
build_unmatched_output( + &mut self, + ) -> Result<StatefulStreamResult<Option<RecordBatch>>> { + if matches!( Review Comment: I do not think we need this check; it is already guaranteed that this function only runs while the stream is in the `OutputUnmatchedBuildRows` state. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org