jonathanc-n commented on code in PR #16443: URL: https://github.com/apache/datafusion/pull/16443#discussion_r2181125973
########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -828,13 +828,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> { handle_state!(self.process_probe_batch()) } NestedLoopJoinStreamState::ExhaustedProbeSide => { - handle_state!(self.process_unmatched_build_batch()) + handle_state!(self.prepare_unmatched_output_indices()) + } + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => { + handle_state!(self.build_unmatched_output()) } NestedLoopJoinStreamState::Completed => Poll::Ready(None), }; } } + fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> { + let (left_indices, right_indices, start) = + self.join_result_status.as_mut().ok_or_else(|| { + datafusion_common::_internal_datafusion_err!( + "get_next_join_result called without initializing join_result_status" + ) + })?; + + let left_batch = self + .left_data + .as_ref() + .ok_or_else(|| { + datafusion_common::_internal_datafusion_err!("should have left_batch") + })? + .batch(); + + let right_batch = match &self.state { + NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) => record_batch, + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => { + record_batch + } + _ => { + return internal_err!( + "state should be ProcessProbeBatch or OutputUnmatchBatch" Review Comment: ```suggestion "State should be ProcessProbeBatch or OutputUnmatchedBuildRows" ``` ########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -828,13 +828,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> { handle_state!(self.process_probe_batch()) } NestedLoopJoinStreamState::ExhaustedProbeSide => { - handle_state!(self.process_unmatched_build_batch()) + handle_state!(self.prepare_unmatched_output_indices()) + } + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => { + handle_state!(self.build_unmatched_output()) } NestedLoopJoinStreamState::Completed => Poll::Ready(None), }; } } + fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> { + let (left_indices, right_indices, start) = + self.join_result_status.as_mut().ok_or_else(|| { + datafusion_common::_internal_datafusion_err!( + "get_next_join_result called without initializing join_result_status" + ) + })?; + + let left_batch = self + .left_data + .as_ref() + .ok_or_else(|| { + datafusion_common::_internal_datafusion_err!("should have left_batch") + })? + .batch(); + + let right_batch = match &self.state { + NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) => record_batch, + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => { Review Comment: ```suggestion NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) | NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => { ``` ########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -883,44 +1000,63 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> { let visited_left_side = left_data.bitmap(); let batch = self.state.try_as_process_probe_batch()?; - match self.batch_transformer.next() { - None => { - // Setting up timer & updating input metrics - self.join_metrics.input_batches.add(1); - self.join_metrics.input_rows.add(batch.num_rows()); - let timer = self.join_metrics.join_time.timer(); - - let result = join_left_and_right_batch( - left_data.batch(), - batch, - self.join_type, - self.filter.as_ref(), - &self.column_indices, - &self.schema, - visited_left_side, - &mut self.indices_cache, - self.right_side_ordered, - ); - timer.done(); + let binding = self.join_metrics.join_time.clone(); + let _timer = binding.timer(); + + if self.join_result_status.is_none() { + let (left_side_indices, right_side_indices) = join_left_and_right_batch( + left_data.batch(), + batch, + self.join_type, + self.filter.as_ref(), + visited_left_side, + &mut self.indices_cache, + self.right_side_ordered, + self.intermediate_batch_size, + )?; + self.join_result_status = Some((left_side_indices, right_side_indices, 0)) + } + + let join_result = self.get_next_join_result()?; - self.batch_transformer.set_batch(result?); + match join_result { + Some(res) => { + self.join_metrics.output_batches.add(1); + self.join_metrics.output_rows.add(res.num_rows()); + + Ok(StatefulStreamResult::Ready(Some(res))) + } + None => { + self.state = NestedLoopJoinStreamState::FetchProbeBatch; + self.join_result_status = None; Ok(StatefulStreamResult::Continue) } - Some((batch, last)) => { - if last { - self.state = NestedLoopJoinStreamState::FetchProbeBatch; - } + } + } - self.join_metrics.output_batches.add(1); - self.join_metrics.output_rows.add(batch.num_rows()); - Ok(StatefulStreamResult::Ready(Some(batch))) + fn build_unmatched_output( + &mut self, + ) -> Result<StatefulStreamResult<Option<RecordBatch>>> { + if matches!( + self.state, + NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) + ) { + let start = Instant::now(); + let res = self.get_next_join_result()?; + self.join_metrics.join_time.add_elapsed(start); + match res { + Some(res) => Ok(StatefulStreamResult::Ready(Some(res))), + None => { + self.state = NestedLoopJoinStreamState::Completed; + Ok(StatefulStreamResult::Ready(None)) + } } + } else { + internal_err!("state should be OutputUnmatchBatch") Review Comment: ```suggestion internal_err!("State should be OutputUnmatchedBuildRows") ``` ########## datafusion/physical-plan/src/joins/nested_loop_join.rs: ########## @@ -729,10 +716,26 @@ struct NestedLoopJoinStream<T> { right_side_ordered: bool, /// Current state of the stream state: NestedLoopJoinStreamState, + #[allow(dead_code)] + // TODO: remove this field ?? /// Transforms the output batch before returning. batch_transformer: T, /// Result of the left data future left_data: Option<Arc<JoinLeftData>>, + + // Tracks progress when building join result batches incrementally + // Contains (build_indices, probe_indices, processed_count) where: + // - build_indices: row indices from build-side table (left table) + // - probe_indices: row indices from probe-side table (right table) + // - processed_count: number of index pairs already processed into output batches + // We have completed join result for indices [0..processed_count) + join_result_status: Option<( Review Comment: I still think we should make this into a struct -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org