Copilot commented on code in PR #18207:
URL: https://github.com/apache/datafusion/pull/18207#discussion_r2453422764


##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1331,58 +1383,77 @@ impl Stream for PerPartitionStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
+        use futures::StreamExt;
+
         loop {
-            match &mut self.state {
-                RepartitionStreamState::ReceivingFromChannel => {
-                    let value = 
futures::ready!(self.receiver.recv().poll_unpin(cx));
-                    match value {
-                        Some(Some(v)) => match v {
-                            Ok(RepartitionBatch::Memory(batch)) => {
-                                // Release memory and return
-                                self.reservation
-                                    .lock()
-                                    .shrink(batch.get_array_memory_size());
-                                return Poll::Ready(Some(Ok(batch)));
-                            }
-                            Ok(RepartitionBatch::Spilled { spill_file, size }) 
=> {
-                                // Read from disk - SpillReaderStream uses 
tokio::fs internally
-                                // Pass the original size for validation
-                                let stream = self
-                                    .spill_manager
-                                    .read_spill_as_stream(spill_file, 
Some(size))?;
-                                self.state =
-                                    
RepartitionStreamState::ReadingSpilledBatch(stream);
-                                // Continue loop to poll the stream immediately
-                            }
-                            Err(e) => {
-                                return Poll::Ready(Some(Err(e)));
-                            }
-                        },
-                        Some(None) => {
-                            // Input partition has finished sending batches
-                            return Poll::Ready(None);
-                        }
-                        None => return Poll::Ready(None),
+            // First, check if there's a spilled batch available
+            match self.spill_stream.poll_next_unpin(cx) {
+                Poll::Ready(Some(Ok(batch))) => {
+                    // Got a spilled batch
+                    return Poll::Ready(Some(Ok(batch)));
+                }
+                Poll::Ready(Some(Err(e))) => {
+                    return Poll::Ready(Some(Err(e)));
+                }
+                Poll::Ready(None) => {
+                    // Spill stream ended - all spilled data has been read
+                    return Poll::Ready(None);
+                }
+                Poll::Pending => {
+                    // No spilled data available
+                    if self.input_finished {
+                        // Input finished and no more spilled data - we're done
+                        return Poll::Ready(None);
                     }
+                    // Otherwise check the channel
                 }
+            }
 
-                RepartitionStreamState::ReadingSpilledBatch(stream) => {
-                    match futures::ready!(stream.poll_next_unpin(cx)) {
-                        Some(Ok(batch)) => {
-                            // Return batch and stay in ReadingSpilledBatch 
state to read more batches
-                            return Poll::Ready(Some(Ok(batch)));
-                        }
-                        Some(Err(e)) => {
-                            self.state = 
RepartitionStreamState::ReceivingFromChannel;
-                            return Poll::Ready(Some(Err(e)));
-                        }
-                        None => {
-                            // Spill stream ended - go back to receiving from 
channel
-                            self.state = 
RepartitionStreamState::ReceivingFromChannel;
-                            continue;
-                        }
+            // If input is finished, don't poll channel anymore
+            if self.input_finished {
+                continue;

Review Comment:
   This `continue` statement would busy-loop if it were ever reached. When 
`input_finished` is true and `spill_stream` has just returned `Poll::Pending`, the 
channel is not polled, so the loop immediately re-polls `spill_stream` instead of 
yielding to the executor. As written, the `Poll::Pending` arm at lines 1404-1406 
already returns `Poll::Ready(None)` in exactly that case, so this branch is dead 
code. Returning `Poll::Ready(None)` here as well makes the intent explicit and 
removes the busy-loop hazard if that earlier check is ever changed.
   ```suggestion
                   return Poll::Ready(None);
   ```



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1228,67 +1259,85 @@ impl Stream for RepartitionStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
+        use futures::StreamExt;
+
         loop {
-            match &mut self.state {
-                RepartitionStreamState::ReceivingFromChannel => {
-                    let value = 
futures::ready!(self.input.recv().poll_unpin(cx));
-                    match value {
-                        Some(Some(v)) => match v {
-                            Ok(RepartitionBatch::Memory(batch)) => {
-                                // Release memory and return
-                                self.reservation
-                                    .lock()
-                                    .shrink(batch.get_array_memory_size());
-                                return Poll::Ready(Some(Ok(batch)));
-                            }
-                            Ok(RepartitionBatch::Spilled { spill_file, size }) 
=> {
-                                // Read from disk - SpillReaderStream uses 
tokio::fs internally
-                                // Pass the original size for validation
-                                let stream = self
-                                    .spill_manager
-                                    .read_spill_as_stream(spill_file, 
Some(size))?;
-                                self.state =
-                                    
RepartitionStreamState::ReadingSpilledBatch(stream);
-                                // Continue loop to poll the stream immediately
-                            }
-                            Err(e) => {
-                                return Poll::Ready(Some(Err(e)));
-                            }
-                        },
-                        Some(None) => {
-                            self.num_input_partitions_processed += 1;
-
-                            if self.num_input_partitions
-                                == self.num_input_partitions_processed
-                            {
-                                // all input partitions have finished sending 
batches
-                                return Poll::Ready(None);
-                            } else {
-                                // other partitions still have data to send
-                                continue;
-                            }
-                        }
-                        None => {
-                            return Poll::Ready(None);
-                        }
+            // First, check if there's a spilled batch available
+            match self.spill_stream.poll_next_unpin(cx) {
+                Poll::Ready(Some(Ok(batch))) => {
+                    // Got a spilled batch
+                    return Poll::Ready(Some(Ok(batch)));
+                }
+                Poll::Ready(Some(Err(e))) => {
+                    return Poll::Ready(Some(Err(e)));
+                }
+                Poll::Ready(None) => {
+                    // Spill stream ended - all spilled data has been read
+                    return Poll::Ready(None);
+                }
+                Poll::Pending => {
+                    // No spilled data available right now
+                    if self.all_inputs_finished {
+                        // All inputs finished, wait for spill stream to have 
more data or finish
+                        return Poll::Pending;
                     }
+                    // Otherwise check the channel
                 }
-                RepartitionStreamState::ReadingSpilledBatch(stream) => {
-                    match futures::ready!(stream.poll_next_unpin(cx)) {
-                        Some(Ok(batch)) => {
-                            // Return batch and stay in ReadingSpilledBatch 
state to read more batches
-                            return Poll::Ready(Some(Ok(batch)));
-                        }
-                        Some(Err(e)) => {
-                            self.state = 
RepartitionStreamState::ReceivingFromChannel;
-                            return Poll::Ready(Some(Err(e)));
-                        }
-                        None => {
-                            // Spill stream ended - go back to receiving from 
channel
-                            self.state = 
RepartitionStreamState::ReceivingFromChannel;
-                            continue;
-                        }
+            }
+
+            // If all inputs are finished, don't poll channel anymore, just 
wait for spill_stream
+            if self.all_inputs_finished {
+                return Poll::Pending;
+            }
+
+            // Try to get next item from channel
+            let value = match self.input.recv().poll_unpin(cx) {
+                Poll::Ready(v) => v,
+                Poll::Pending => {
+                    // Nothing from channel either, wait
+                    return Poll::Pending;
+                }
+            };
+
+            match value {
+                Some(Some(v)) => match v {
+                    Ok(RepartitionBatch::Memory(batch)) => {
+                        // Release memory and return
+                        self.reservation
+                            .lock()
+                            .shrink(batch.get_array_memory_size());
+                        return Poll::Ready(Some(Ok(batch)));
+                    }
+                    Ok(RepartitionBatch::Spilled) => {
+                        // Batch was spilled, it's available in spill_stream
+                        // Loop back to poll spill_stream again
+                        continue;
+                    }
+                    Err(e) => {
+                        return Poll::Ready(Some(Err(e)));
                     }
+                },
+                Some(None) => {
+                    self.num_input_partitions_processed += 1;
+
+                    if self.num_input_partitions == 
self.num_input_partitions_processed {
+                        // All input partitions have finished sending batches
+                        // Flush and finalize the SpillPool
+                        {
+                            let mut pool = self.spill_pool.lock();
+                            pool.flush().ok();

Review Comment:
   Silently ignoring flush errors with `.ok()` could lead to data loss if the 
flush fails. Consider logging the error or returning it to the caller for 
proper error handling.
   ```suggestion
                               if let Err(e) = pool.flush() {
                                   return 
Poll::Ready(Some(Err(DataFusionError::External(Box::new(e)))));
                               }
   ```



##########
datafusion/physical-plan/src/spill/spill_pool.rs:
##########
@@ -0,0 +1,1105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! SpillPool: A reusable abstraction for managing spill files with FIFO 
semantics.
+//!
+//! # Overview
+//!
+//! The `SpillPool` provides a centralized mechanism for spilling record 
batches to disk
+//! when memory is constrained. It manages a collection of spill files, each 
containing
+//! multiple batches, with configurable maximum file sizes.
+//!
+//! # Design
+//!
+//! - **FIFO (Queue) semantics**: Batches are read in the order they were 
spilled
+//! - **File handle reuse**: Multiple batches are written to the same file to 
minimize syscalls
+//! - **Automatic file rotation**: When a file exceeds `max_file_size_bytes`, 
rotate to a new file
+//! - **Sequential reading**: Uses IPC Stream format's natural sequential 
access pattern
+//! - **Automatic cleanup**: Files are deleted once fully consumed
+//!
+//! # Usage Example
+//!
+//! ```ignore
+//! let pool = SpillPool::new(
+//!     100 * 1024 * 1024,  // 100MB max per file
+//!     spill_manager,
+//!     schema,
+//! );
+//!
+//! // Spill batches - automatically rotates files when size limit reached
+//! pool.push_batch(batch1)?;
+//! pool.push_batch(batch2)?;
+//! pool.flush()?;  // Finalize current file
+//!
+//! // Read back in FIFO order
+//! let batch = pool.pop_batch()?.unwrap();  // Returns batch1
+//! let batch = pool.pop_batch()?.unwrap();  // Returns batch2
+//! ```
+
+use std::collections::VecDeque;
+use std::sync::Arc;
+use std::task::Waker;
+
+use parking_lot::Mutex;
+
+use arrow::datatypes::SchemaRef;
+use arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+use datafusion_execution::disk_manager::RefCountedTempFile;
+use datafusion_execution::SendableRecordBatchStream;
+
+use super::in_progress_spill_file::InProgressSpillFile;
+use super::spill_manager::SpillManager;
+
+/// A single spill file containing one or more record batches.
+struct SpillFile {
+    /// The temp file handle (auto-deletes when dropped)
+    file: RefCountedTempFile,
+}
+
+impl SpillFile {
+    fn new(file: RefCountedTempFile, _total_batches: usize, _total_size: 
usize) -> Self {

Review Comment:
   The parameters `_total_batches` and `_total_size` are unused. If these are 
intended for future use or debugging, consider adding a comment explaining 
their purpose. Otherwise, remove them to simplify the API.
   ```suggestion
       fn new(file: RefCountedTempFile) -> Self {
   ```



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1228,67 +1259,85 @@ impl Stream for RepartitionStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
+        use futures::StreamExt;
+
         loop {
-            match &mut self.state {
-                RepartitionStreamState::ReceivingFromChannel => {
-                    let value = 
futures::ready!(self.input.recv().poll_unpin(cx));
-                    match value {
-                        Some(Some(v)) => match v {
-                            Ok(RepartitionBatch::Memory(batch)) => {
-                                // Release memory and return
-                                self.reservation
-                                    .lock()
-                                    .shrink(batch.get_array_memory_size());
-                                return Poll::Ready(Some(Ok(batch)));
-                            }
-                            Ok(RepartitionBatch::Spilled { spill_file, size }) 
=> {
-                                // Read from disk - SpillReaderStream uses 
tokio::fs internally
-                                // Pass the original size for validation
-                                let stream = self
-                                    .spill_manager
-                                    .read_spill_as_stream(spill_file, 
Some(size))?;
-                                self.state =
-                                    
RepartitionStreamState::ReadingSpilledBatch(stream);
-                                // Continue loop to poll the stream immediately
-                            }
-                            Err(e) => {
-                                return Poll::Ready(Some(Err(e)));
-                            }
-                        },
-                        Some(None) => {
-                            self.num_input_partitions_processed += 1;
-
-                            if self.num_input_partitions
-                                == self.num_input_partitions_processed
-                            {
-                                // all input partitions have finished sending 
batches
-                                return Poll::Ready(None);
-                            } else {
-                                // other partitions still have data to send
-                                continue;
-                            }
-                        }
-                        None => {
-                            return Poll::Ready(None);
-                        }
+            // First, check if there's a spilled batch available
+            match self.spill_stream.poll_next_unpin(cx) {
+                Poll::Ready(Some(Ok(batch))) => {
+                    // Got a spilled batch
+                    return Poll::Ready(Some(Ok(batch)));
+                }
+                Poll::Ready(Some(Err(e))) => {
+                    return Poll::Ready(Some(Err(e)));
+                }
+                Poll::Ready(None) => {
+                    // Spill stream ended - all spilled data has been read
+                    return Poll::Ready(None);
+                }
+                Poll::Pending => {
+                    // No spilled data available right now
+                    if self.all_inputs_finished {
+                        // All inputs finished, wait for spill stream to have 
more data or finish
+                        return Poll::Pending;

Review Comment:
   When `all_inputs_finished` is true and spill_stream returns Pending, this 
returns Pending without checking if the spill stream will ever produce more 
data. According to the SpillPool implementation, once finalized, the stream 
should return None when there's no more data. However, if finalize() fails or 
wasn't called, this could hang indefinitely. Consider adding a check to ensure 
the spill pool was properly finalized before returning Pending.
   ```suggestion
                           // All inputs finished, check if the spill pool is 
finalized.
                           let is_finalized = {
                               let pool = self.spill_pool.lock();
                               pool.is_finalized()
                           };
                           if is_finalized {
                               // If finalized and still pending, treat as end 
of stream to avoid hanging.
                               return Poll::Ready(None);
                           } else {
                               // Not finalized yet, wait for more data or 
finalization.
                               return Poll::Pending;
                           }
   ```



##########
datafusion/physical-plan/src/repartition/mod.rs:
##########
@@ -1331,58 +1383,77 @@ impl Stream for PerPartitionStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
+        use futures::StreamExt;
+
         loop {
-            match &mut self.state {
-                RepartitionStreamState::ReceivingFromChannel => {
-                    let value = 
futures::ready!(self.receiver.recv().poll_unpin(cx));
-                    match value {
-                        Some(Some(v)) => match v {
-                            Ok(RepartitionBatch::Memory(batch)) => {
-                                // Release memory and return
-                                self.reservation
-                                    .lock()
-                                    .shrink(batch.get_array_memory_size());
-                                return Poll::Ready(Some(Ok(batch)));
-                            }
-                            Ok(RepartitionBatch::Spilled { spill_file, size }) 
=> {
-                                // Read from disk - SpillReaderStream uses 
tokio::fs internally
-                                // Pass the original size for validation
-                                let stream = self
-                                    .spill_manager
-                                    .read_spill_as_stream(spill_file, 
Some(size))?;
-                                self.state =
-                                    
RepartitionStreamState::ReadingSpilledBatch(stream);
-                                // Continue loop to poll the stream immediately
-                            }
-                            Err(e) => {
-                                return Poll::Ready(Some(Err(e)));
-                            }
-                        },
-                        Some(None) => {
-                            // Input partition has finished sending batches
-                            return Poll::Ready(None);
-                        }
-                        None => return Poll::Ready(None),
+            // First, check if there's a spilled batch available
+            match self.spill_stream.poll_next_unpin(cx) {
+                Poll::Ready(Some(Ok(batch))) => {
+                    // Got a spilled batch
+                    return Poll::Ready(Some(Ok(batch)));
+                }
+                Poll::Ready(Some(Err(e))) => {
+                    return Poll::Ready(Some(Err(e)));
+                }
+                Poll::Ready(None) => {
+                    // Spill stream ended - all spilled data has been read
+                    return Poll::Ready(None);
+                }
+                Poll::Pending => {
+                    // No spilled data available
+                    if self.input_finished {
+                        // Input finished and no more spilled data - we're done
+                        return Poll::Ready(None);
                     }
+                    // Otherwise check the channel
                 }
+            }
 
-                RepartitionStreamState::ReadingSpilledBatch(stream) => {
-                    match futures::ready!(stream.poll_next_unpin(cx)) {
-                        Some(Ok(batch)) => {
-                            // Return batch and stay in ReadingSpilledBatch 
state to read more batches
-                            return Poll::Ready(Some(Ok(batch)));
-                        }
-                        Some(Err(e)) => {
-                            self.state = 
RepartitionStreamState::ReceivingFromChannel;
-                            return Poll::Ready(Some(Err(e)));
-                        }
-                        None => {
-                            // Spill stream ended - go back to receiving from 
channel
-                            self.state = 
RepartitionStreamState::ReceivingFromChannel;
-                            continue;
-                        }
+            // If input is finished, don't poll channel anymore
+            if self.input_finished {
+                continue;
+            }
+
+            // Try to get next item from channel
+            let value = match self.receiver.recv().poll_unpin(cx) {
+                Poll::Ready(v) => v,
+                Poll::Pending => {
+                    // Nothing from channel either, wait
+                    return Poll::Pending;
+                }
+            };
+
+            match value {
+                Some(Some(v)) => match v {
+                    Ok(RepartitionBatch::Memory(batch)) => {
+                        // Release memory and return
+                        self.reservation
+                            .lock()
+                            .shrink(batch.get_array_memory_size());
+                        return Poll::Ready(Some(Ok(batch)));
+                    }
+                    Ok(RepartitionBatch::Spilled) => {
+                        // Batch was spilled, it's available in spill_stream
+                        // Loop back to poll spill_stream again
+                        continue;
                     }
+                    Err(e) => {
+                        return Poll::Ready(Some(Err(e)));
+                    }
+                },
+                Some(None) => {
+                    // Input partition has finished sending batches
+                    // Flush and finalize the SpillPool
+                    {
+                        let mut pool = self.spill_pool.lock();
+                        pool.flush().ok();

Review Comment:
   Silently ignoring flush errors with `.ok()` could lead to data loss if the 
flush fails. Consider logging the error or returning it to the caller for 
proper error handling.
   ```suggestion
                           if let Err(e) = pool.flush() {
                               return Poll::Ready(Some(Err(e)));
                           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to