mhilton commented on code in PR #12634:
URL: https://github.com/apache/datafusion/pull/12634#discussion_r1778300562


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -560,91 +729,147 @@ impl NestedLoopJoinStream {
         // Get or initialize visited_left_side bitmap if required by join type
         let visited_left_side = left_data.bitmap();
 
-        // Check is_exhausted before polling the outer_table, such that when 
the outer table
-        // does not support `FusedStream`, Self will not poll it again
-        if self.is_exhausted {
-            return Poll::Ready(None);
-        }
+        loop {
+            if let Some(batch) = self.output_buffer.next() {
+                self.join_metrics.output_batches.add(1);
+                self.join_metrics.output_rows.add(batch.num_rows());
+                return Poll::Ready(Some(Ok(batch)));
+            }
 
-        self.outer_table
-            .poll_next_unpin(cx)
-            .map(|maybe_batch| match maybe_batch {
-                Some(Ok(right_batch)) => {
-                    // Setting up timer & updating input metrics
-                    self.join_metrics.input_batches.add(1);
-                    self.join_metrics.input_rows.add(right_batch.num_rows());
-                    let timer = self.join_metrics.join_time.timer();
-
-                    let result = join_left_and_right_batch(
-                        left_data.batch(),
-                        &right_batch,
-                        self.join_type,
-                        self.filter.as_ref(),
-                        &self.column_indices,
-                        &self.schema,
-                        visited_left_side,
-                        &mut self.indices_cache,
-                        self.right_side_ordered,
-                    );
-
-                    // Recording time & updating output metrics
-                    if let Ok(batch) = &result {
-                        timer.done();
-                        self.join_metrics.output_batches.add(1);
-                        self.join_metrics.output_rows.add(batch.num_rows());
+            // Check is_exhausted before polling the outer_table, such that 
when the outer table
+            // does not support `FusedStream`, Self will not poll it again
+            if self.is_exhausted {
+                let batch = self.output_buffer.finish()?;
+                if let Some(batch) = &batch {
+                    self.join_metrics.output_batches.add(1);
+                    self.join_metrics.output_rows.add(batch.num_rows());
+                }
+                return Poll::Ready(Ok(batch).transpose());
+            }
+
+            if self.outer_record_batch.is_none() {
+                // Get the next outer record batch
+                match self.outer_table.poll_next_unpin(cx) {
+                    Poll::Ready(Some(Ok(batch))) => {
+                        self.join_metrics.input_batches.add(1);
+                        self.join_metrics.input_rows.add(batch.num_rows());
+                        self.memory_reservation
+                            .try_grow(batch.get_array_memory_size())?;
+                        self.outer_record_batch = Some(batch);
+                        self.outer_record_batch_row = 0;
                     }
+                    Poll::Ready(Some(Err(e))) => return 
Poll::Ready(Some(Err(e))),
+                    Poll::Ready(None) => {
+                        if need_produce_result_in_final(self.join_type) {
+                            // At this stage `visited_left_side` won't be 
updated, so it's
+                            // safe to report about probe completion.
+                            //
+                            // Setting `is_exhausted` will prevent from 
multiple calls of
+                            // `report_probe_completed()`
+                            if !left_data.report_probe_completed() {
+                                self.is_exhausted = true;
+                                continue;
+                            };
+
+                            // Only setting up timer, input is exhausted
+                            let timer = self.join_metrics.join_time.timer();
+                            // use the global left bitmap to produce the left 
indices and right indices
+                            let (left_side, right_side) =
+                                get_final_indices_from_shared_bitmap(
+                                    visited_left_side,
+                                    self.join_type,
+                                );
+                            let empty_right_batch =
+                                
RecordBatch::new_empty(self.outer_table.schema());
+                            // use the left and right indices to produce the 
batch result
+                            let result = build_batch_from_indices(
+                                &self.schema,
+                                left_data.batch(),
+                                &empty_right_batch,
+                                &left_side,
+                                &right_side,
+                                &self.column_indices,
+                                JoinSide::Left,
+                            );
+                            self.is_exhausted = true;
 
-                    Some(result)
-                }
-                Some(err) => Some(err),
-                None => {
-                    if need_produce_result_in_final(self.join_type) {
-                        // At this stage `visited_left_side` won't be updated, 
so it's
-                        // safe to report about probe completion.
-                        //
-                        // Setting `is_exhausted` / returning None will 
prevent from
-                        // multiple calls of `report_probe_completed()`
-                        if !left_data.report_probe_completed() {
+                            // Recording time & updating output metrics
+                            match result {
+                                Ok(batch) => {
+                                    timer.done();
+                                    self.output_buffer.push(batch)?;
+                                    continue;
+                                }
+                                Err(e) => return Poll::Ready(Some(Err(e))),
+                            }
+                        } else {
                             self.is_exhausted = true;
-                            return None;
+                            continue;
+                        }
+                    }
+                    Poll::Pending => {
+                        return match self.output_buffer.flush() {
+                            Ok(Some(batch)) => {
+                                // If there was anything in the output buffer 
flush it
+                                // so that it can be processed.
+                                self.join_metrics.output_batches.add(1);
+                                
self.join_metrics.output_rows.add(batch.num_rows());
+                                Poll::Ready(Some(Ok(batch)))
+                            }
+                            Ok(None) => Poll::Pending,
+                            Err(err) => Poll::Ready(Some(Err(err))),
                         };
+                    }
+                }
+            }
 
-                        // Only setting up timer, input is exhausted
-                        let timer = self.join_metrics.join_time.timer();
-                        // use the global left bitmap to produce the left 
indices and right indices
-                        let (left_side, right_side) =
-                            get_final_indices_from_shared_bitmap(
-                                visited_left_side,
-                                self.join_type,
-                            );
-                        let empty_right_batch =
-                            RecordBatch::new_empty(self.outer_table.schema());
-                        // use the left and right indices to produce the batch 
result
-                        let result = build_batch_from_indices(
-                            &self.schema,
-                            left_data.batch(),
-                            &empty_right_batch,
-                            &left_side,
-                            &right_side,
-                            &self.column_indices,
-                            JoinSide::Left,
-                        );
-                        self.is_exhausted = true;
-
-                        // Recording time & updating output metrics
-                        if let Ok(batch) = &result {
-                            timer.done();
-                            self.join_metrics.output_batches.add(1);
-                            
self.join_metrics.output_rows.add(batch.num_rows());
-                        }
+            debug_assert!(self.outer_record_batch.is_some());
+            let right_batch = self.outer_record_batch.as_ref().unwrap();
+            let num_rows = match (self.join_type, 
left_data.batch().num_rows()) {
+                // An inner join will only produce 1 output row per input row.
+                (JoinType::Inner, _) | (_, 0) => 
self.output_buffer.needed_rows(),

Review Comment:
   You are right, of course. I'm not sure why I got it into my head that an
inner join would produce exactly one output row per input row (1-1). I'll fix that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to