2010YOUY01 commented on code in PR #21448:
URL: https://github.com/apache/datafusion/pull/21448#discussion_r3076787606


##########
datafusion/physical-plan/src/joins/nested_loop_join.rs:
##########
@@ -1153,31 +1231,318 @@ impl NestedLoopJoinStream {
             left_buffered_in_one_pass: true,
             handled_empty_output: false,
             should_track_unmatched_right: need_produce_right_in_final(join_type),
+            // Memory-limited fields (inactive until OOM fallback)
+            left_stream: None,
+            left_reservation: None,
+            left_stashed_batch: None,
+            left_pending_batches: Vec::new(),
+            left_schema: None,
+            spill_manager: None,
+            right_spill_in_progress: None,
+            right_spill_file: None,
+            right_max_batch_memory: 0,
+            is_first_right_pass: true,
+            // Fallback context
+            left_plan,
+            task_context,
         }
     }
 
+    /// Returns true if this stream is operating in memory-limited mode
+    fn is_memory_limited(&self) -> bool {
+        self.left_stream.is_some() || self.left_reservation.is_some()
+    }
+
+    /// Check if we can fall back to memory-limited mode on this error.
+    fn can_fallback_to_spill(&self, error: &datafusion_common::DataFusionError) -> bool {
+        self.left_plan.is_some()
+            && self.task_context.is_some()
+            && !self.is_memory_limited() // avoid infinite loop
+            && matches!(
+                error.find_root(),
+                datafusion_common::DataFusionError::ResourcesExhausted(_)
+            )
+    }
+
+    /// Switch from the standard OnceFut path to memory-limited mode.
+    ///
+    /// Re-executes the left child to get a fresh stream, creates a
+    /// SpillManager for right-side spilling, and sets up all the
+    /// memory-limited fields. The next call to `handle_buffering_left`
+    /// will dispatch to `handle_buffering_left_memory_limited`.
+    fn initiate_fallback(&mut self) -> Result<()> {
+        let left_plan = self
+            .left_plan
+            .as_ref()
+            .expect("left_plan must be set for fallback");
+        let context = self
+            .task_context
+            .as_ref()
+            .expect("task_context must be set for fallback");
+
+        // Re-execute left child to get a fresh stream
+        let left_stream = left_plan.execute(0, Arc::clone(context))?;
+        let left_schema = left_stream.schema();
+
+        // Create reservation with can_spill for fair memory allocation
+        let reservation = MemoryConsumer::new("NestedLoopJoinLoad[fallback]".to_string())
+            .with_can_spill(true)
+            .register(context.memory_pool());
+
+        // Create SpillManager for right-side spilling
+        let right_schema = self.right_data.schema();
+        let spill_manager = SpillManager::new(
+            context.runtime_env(),
+            self.metrics.spill_metrics.clone(),
+            right_schema,
+        )
+        .with_compression_type(context.session_config().spill_compression());
+
+        // Populate memory-limited fields
+        self.left_stream = Some(left_stream);
+        self.left_schema = Some(left_schema);
+        self.left_reservation = Some(reservation);
+        self.left_stashed_batch = None;
+        self.left_pending_batches = Vec::new();
+        self.spill_manager = Some(spill_manager);
+        self.right_spill_in_progress = None;
+        self.right_spill_file = None;
+        self.right_max_batch_memory = 0;
+        self.is_first_right_pass = true;
+
+        // State stays BufferingLeft — next poll will enter
+        // handle_buffering_left_memory_limited via is_memory_limited() check
+        self.state = NLJState::BufferingLeft;
+
+        Ok(())
+    }
+
     // ==== State handler functions ====
 
-    /// Handle BufferingLeft state - prepare left side batches
+    /// Handle BufferingLeft state - prepare left side batches.
+    ///
+    /// In standard mode, uses OnceFut to load all left data at once.
+    /// In memory-limited mode, incrementally buffers left batches until the
+    /// memory budget is reached or the left stream is exhausted.
     fn handle_buffering_left(
         &mut self,
         cx: &mut std::task::Context<'_>,
     ) -> ControlFlow<Poll<Option<Result<RecordBatch>>>> {
-        match self.left_data.get_shared(cx) {
-            Poll::Ready(Ok(left_data)) => {
-                self.buffered_left_data = Some(left_data);
-                // TODO: implement memory-limited case
-                self.left_exhausted = true;
-                self.state = NLJState::FetchingRight;
-                // Continue to next state immediately
-                ControlFlow::Continue(())
+        if self.is_memory_limited() {
+            self.handle_buffering_left_memory_limited(cx)
+        } else {
+            // Standard path: use OnceFut
+            match self.left_data.get_shared(cx) {
+                Poll::Ready(Ok(left_data)) => {
+                    self.buffered_left_data = Some(left_data);
+                    self.left_exhausted = true;
+                    self.state = NLJState::FetchingRight;
+                    ControlFlow::Continue(())
+                }
+                Poll::Ready(Err(e)) => {
+                    if self.can_fallback_to_spill(&e) {
+                        debug!(
+                            "NestedLoopJoin: OnceFut failed with OOM, \
+                             falling back to memory-limited mode"
+                        );
+                        match self.initiate_fallback() {
+                            Ok(()) => ControlFlow::Continue(()),
+                            Err(fallback_err) => {
+                                ControlFlow::Break(Poll::Ready(Some(Err(fallback_err))))
+                            }
+                        }
+                    } else {
+                        ControlFlow::Break(Poll::Ready(Some(Err(e))))
+                    }
+                }
+                Poll::Pending => ControlFlow::Break(Poll::Pending),
             }
-            Poll::Ready(Err(e)) => ControlFlow::Break(Poll::Ready(Some(Err(e)))),
-            Poll::Pending => ControlFlow::Break(Poll::Pending),
         }
     }
 
-    /// Handle FetchingRight state - fetch next right batch and prepare for processing
+    /// Memory-limited path for handle_buffering_left.
+    ///
+    /// Incrementally polls the left stream and accumulates batches until:
+    /// - Memory reservation fails (chunk is full, more data remains)
+    /// - Left stream is exhausted (this is the last/only chunk)
+    fn handle_buffering_left_memory_limited(

Review Comment:
   Sounds good -- we could do this as the first follow-up PR to keep this PR 
simpler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to