jonathanc-n commented on code in PR #16443:
URL: https://github.com/apache/datafusion/pull/16443#discussion_r2181102938


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,13 +833,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
                     handle_state!(self.process_probe_batch())
                 }
                 NestedLoopJoinStreamState::ExhaustedProbeSide => {
-                    handle_state!(self.process_unmatched_build_batch())
+                    handle_state!(self.prepare_unmatched_output_indices())
+                }
+                NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+                    handle_state!(self.build_unmatched_output())
                 }
                 NestedLoopJoinStreamState::Completed => Poll::Ready(None),
             };
         }
     }
 
+    fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+        let (left_indices, right_indices, start) =
+            self.join_result_status.as_mut().ok_or_else(|| {
+                datafusion_common::_internal_datafusion_err!(
+                    "should have join_result_status"

Review Comment:
   Small nit: should we import `_internal_datafusion_err!` so we do not need the
path qualifier? It just looks cleaner that way.



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -828,13 +833,127 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
                     handle_state!(self.process_probe_batch())
                 }
                 NestedLoopJoinStreamState::ExhaustedProbeSide => {
-                    handle_state!(self.process_unmatched_build_batch())
+                    handle_state!(self.prepare_unmatched_output_indices())
+                }
+                NestedLoopJoinStreamState::OutputUnmatchedBuildRows(_) => {
+                    handle_state!(self.build_unmatched_output())
                 }
                 NestedLoopJoinStreamState::Completed => Poll::Ready(None),
             };
         }
     }
 
+    fn get_next_join_result(&mut self) -> Result<Option<RecordBatch>> {
+        let (left_indices, right_indices, start) =
+            self.join_result_status.as_mut().ok_or_else(|| {
+                datafusion_common::_internal_datafusion_err!(
+                    "should have join_result_status"
+                )
+            })?;
+
+        let left_batch = self
+            .left_data
+            .as_ref()
+            .ok_or_else(|| {
+                datafusion_common::_internal_datafusion_err!("should have 
left_batch")
+            })?
+            .batch();
+
+        let right_batch = match &self.state {
+            NestedLoopJoinStreamState::ProcessProbeBatch(record_batch) => 
record_batch,
+            NestedLoopJoinStreamState::OutputUnmatchedBuildRows(record_batch) 
=> {
+                record_batch
+            }
+            _ => {
+                return internal_err!(
+                    "state should be ProcessProbeBatch or OutputUnmatchBatch"
+                )
+            }
+        };
+
+        let current_start = *start;
+
+        if left_indices.is_empty() && right_indices.is_empty() && 
current_start == 0 {

Review Comment:
   Can we just build and return an empty batch here instead of calling
`build_batch_from_indices`?



##########
datafusion/physical-plan/src/joins/utils.rs:
##########
@@ -843,24 +844,56 @@ pub(crate) fn apply_join_filter_to_indices(
     probe_indices: UInt32Array,
     filter: &JoinFilter,
     build_side: JoinSide,
+    max_intermediate_size: Option<usize>,
 ) -> Result<(UInt64Array, UInt32Array)> {
     if build_indices.is_empty() && probe_indices.is_empty() {
         return Ok((build_indices, probe_indices));
     };
 
-    let intermediate_batch = build_batch_from_indices(
-        filter.schema(),
-        build_input_buffer,
-        probe_batch,
-        &build_indices,
-        &probe_indices,
-        filter.column_indices(),
-        build_side,
-    )?;
-    let filter_result = filter
-        .expression()
-        .evaluate(&intermediate_batch)?
-        .into_array(intermediate_batch.num_rows())?;
+    let filter_result = if let Some(max_size) = max_intermediate_size {

Review Comment:
   Shouldn't this be done in this pull request? I think it would make more
sense to just move this logic into `build_join_indices`.



##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -883,44 +1000,63 @@ impl<T: BatchTransformer> NestedLoopJoinStream<T> {
         let visited_left_side = left_data.bitmap();
         let batch = self.state.try_as_process_probe_batch()?;
 
-        match self.batch_transformer.next() {
-            None => {
-                // Setting up timer & updating input metrics
-                self.join_metrics.input_batches.add(1);
-                self.join_metrics.input_rows.add(batch.num_rows());
-                let timer = self.join_metrics.join_time.timer();
-
-                let result = join_left_and_right_batch(
-                    left_data.batch(),
-                    batch,
-                    self.join_type,
-                    self.filter.as_ref(),
-                    &self.column_indices,
-                    &self.schema,
-                    visited_left_side,
-                    &mut self.indices_cache,
-                    self.right_side_ordered,
-                );
-                timer.done();
+        let binding = self.join_metrics.join_time.clone();
+        let _timer = binding.timer();
+
+        if self.join_result_status.is_none() {
+            let (left_side_indices, right_side_indices) = 
join_left_and_right_batch(
+                left_data.batch(),
+                batch,
+                self.join_type,
+                self.filter.as_ref(),
+                visited_left_side,
+                &mut self.indices_cache,
+                self.right_side_ordered,
+                self.intermediate_batch_size,
+            )?;
+            self.join_result_status = Some((left_side_indices, 
right_side_indices, 0))
+        }
+
+        let join_result = self.get_next_join_result()?;
 
-                self.batch_transformer.set_batch(result?);
+        match join_result {
+            Some(res) => {
+                self.join_metrics.output_batches.add(1);
+                self.join_metrics.output_rows.add(res.num_rows());
+
+                Ok(StatefulStreamResult::Ready(Some(res)))
+            }
+            None => {
+                self.state = NestedLoopJoinStreamState::FetchProbeBatch;
+                self.join_result_status = None;
                 Ok(StatefulStreamResult::Continue)
             }
-            Some((batch, last)) => {
-                if last {
-                    self.state = NestedLoopJoinStreamState::FetchProbeBatch;
-                }
+        }
+    }
 
-                self.join_metrics.output_batches.add(1);
-                self.join_metrics.output_rows.add(batch.num_rows());
-                Ok(StatefulStreamResult::Ready(Some(batch)))
+    fn build_unmatched_output(
+        &mut self,
+    ) -> Result<StatefulStreamResult<Option<RecordBatch>>> {
+        if matches!(

Review Comment:
   I do not think we need this check; it is already guaranteed that this
function only runs when we are in the `OutputUnmatchedBuildRows` state.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to