jonathanc-n commented on code in PR #16443:
URL: https://github.com/apache/datafusion/pull/16443#discussion_r2181125973


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,13 +828,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
                     handle_state!(self.process_probe_batch())
                 }
                 NestedLoopJoinStreamState::ExhaustedProbeSide => {
-                    handle_state!(self.process_unmatched_build_batch())
+                    handle_state!(self.prepare_unmatched_output_indices())
+                }
+                NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+                    handle_state!(self.build_unmatched_output())
                 }
                 NestedLoopJoinStreamState::Completed => Poll::Ready(None),
             };
         }
     }
 
+    fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+        let (left_indices, right_indices, start) =
+            self.join_result_status.as_mut().ok_or_else(|| {
+                datafusion_common::_internal_datafusion_err!(
+                    "get_next_join_result called without initializing 
join_result_status"
+                )
+            })?;
+
+        let left_batch = self
+            .left_data
+            .as_ref()
+            .ok_or_else(|| {
+                datafusion_common::_internal_datafusion_err!("should have 
left_batch")
+            })?
+            .batch();
+
+        let right_batch = match &self.state {
+            NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) => 
record_batch,
+            NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) 
=> {
+                record_batch
+            }
+            _ => {
+                return internal_err!(
+                    "state should be ProcessProbeBatch or OutputUnmatchBatch"

Review Comment:
   ```suggestion
                       "State should be ProcessProbeBatch or 
OutputUnmatchedBuildRows"
   ```



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,13 +828,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
                     handle_state!(self.process_probe_batch())
                 }
                 NestedLoopJoinStreamState::ExhaustedProbeSide => {
-                    handle_state!(self.process_unmatched_build_batch())
+                    handle_state!(self.prepare_unmatched_output_indices())
+                }
+                NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+                    handle_state!(self.build_unmatched_output())
                 }
                 NestedLoopJoinStreamState::Completed => Poll::Ready(None),
             };
         }
     }
 
+    fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+        let (left_indices, right_indices, start) =
+            self.join_result_status.as_mut().ok_or_else(|| {
+                datafusion_common::_internal_datafusion_err!(
+                    "get_next_join_result called without initializing 
join_result_status"
+                )
+            })?;
+
+        let left_batch = self
+            .left_data
+            .as_ref()
+            .ok_or_else(|| {
+                datafusion_common::_internal_datafusion_err!("should have 
left_batch")
+            })?
+            .batch();
+
+        let right_batch = match &self.state {
+            NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) => 
record_batch,
+            NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) 
=> {

Review Comment:
   ```suggestion
               NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) | 
NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) => {
   ```



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -883,44 +1000,63 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
         let visited_left_side = left_data.bitmap();
         let batch = self.state.try_as_process_probe_batch()?;
 
-        match self.batch_transformer.next() {
-            None => {
-                // Setting up timer & updating input metrics
-                self.join_metrics.input_batches.add(1);
-                self.join_metrics.input_rows.add(batch.num_rows());
-                let timer = self.join_metrics.join_time.timer();
-
-                let result = join_left_and_right_batch(
-                    left_data.batch(),
-                    batch,
-                    self.join_type,
-                    self.filter.as_ref(),
-                    &self.column_indices,
-                    &self.schema,
-                    visited_left_side,
-                    &mut self.indices_cache,
-                    self.right_side_ordered,
-                );
-                timer.done();
+        let binding = self.join_metrics.join_time.clone();
+        let _timer = binding.timer();
+
+        if self.join_result_status.is_none() {
+            let (left_side_indices, right_side_indices) = 
join_left_and_right_batch(
+                left_data.batch(),
+                batch,
+                self.join_type,
+                self.filter.as_ref(),
+                visited_left_side,
+                &mut self.indices_cache,
+                self.right_side_ordered,
+                self.intermediate_batch_size,
+            )?;
+            self.join_result_status = Some((left_side_indices, 
right_side_indices, 0))
+        }
+
+        let join_result = self.get_next_join_result()?;
 
-                self.batch_transformer.set_batch(result?);
+        match join_result {
+            Some(res) => {
+                self.join_metrics.output_batches.add(1);
+                self.join_metrics.output_rows.add(res.num_rows());
+
+                Ok(StatefulStreamResult::Ready(Some(res)))
+            }
+            None => {
+                self.state = NestedLoopJoinStreamState::FetchProbeBatch;
+                self.join_result_status = None;
                 Ok(StatefulStreamResult::Continue)
             }
-            Some((batch, last)) => {
-                if last {
-                    self.state = NestedLoopJoinStreamState::FetchProbeBatch;
-                }
+        }
+    }
 
-                self.join_metrics.output_batches.add(1);
-                self.join_metrics.output_rows.add(batch.num_rows());
-                Ok(StatefulStreamResult::Ready(Some(batch)))
+    fn build_unmatched_output(
+        &mut self,
+    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+        if matches!(
+            self.state,
+            NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_)
+        ) {
+            let start = Instant::now();
+            let res = self.get_next_join_result()?;
+            self.join_metrics.join_time.add_elapsed(start);
+            match res {
+                Some(res) => Ok(StatefulStreamResult::Ready(Some(res))),
+                None => {
+                    self.state = NestedLoopJoinStreamState::Completed;
+                    Ok(StatefulStreamResult::Ready(None))
+                }
             }
+        } else {
+            internal_err!("state should be OutputUnmatchBatch")

Review Comment:
   ```suggestion
               internal_err!("State should be OutputUnmatchedBuildRows")
   ```



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -729,10 +716,26 @@ struct NestedLoopJoinStream<T> {
     right_side_ordered: bool,
     /// Current state of the stream
     state: NestedLoopJoinStreamState,
+    #[allow(dead_code)]
+    // TODO: remove this field ??
     /// Transforms the output batch before returning.
     batch_transformer: T,
     /// Result of the left data future
     left_data: Option<Arc<JoinLeftData>>,
+
+    // Tracks progress when building join result batches incrementally
+    // Contains (build_indices, probe_indices, processed_count) where:
+    // - build_indices: row indices from build-side table (left table)
+    // - probe_indices: row indices from probe-side table (right table)
+    // - processed_count: number of index pairs already processed into output 
batches
+    // We have completed join result for indices [0..processed_count)
+    join_result_status: Option<(

Review Comment:
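   I still think we should make `join_result_status` a struct: named fields (build indices, probe indices, processed count) would be clearer than the current tuple.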



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to