This is an automated email from the ASF dual-hosted git repository.

timsaucer pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-52 by this push:
     new 1bd7082b79 [branch-52] Fix repartition from dropping data when spilling (#20672) (#20777)
1bd7082b79 is described below

commit 1bd7082b798d0d55c1e90c7be1d7e3dba057c288
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Mar 8 11:14:32 2026 -0400

    [branch-52] Fix repartition from dropping data when spilling (#20672) (#20777)
    
    - Part of https://github.com/apache/datafusion/issues/20681
    - Closes https://github.com/apache/datafusion/issues/20683 on branch-52
    
    This PR:
    - Backports https://github.com/apache/datafusion/pull/20672 from
    @xanderbailey to the `branch-52` line
    
    Co-authored-by: Xander <[email protected]>
---
 datafusion/physical-plan/src/spill/spill_pool.rs | 102 ++++++++++++++++++++++-
 1 file changed, 101 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs
index e8eea360da..36920c7fd0 100644
--- a/datafusion/physical-plan/src/spill/spill_pool.rs
+++ b/datafusion/physical-plan/src/spill/spill_pool.rs
@@ -61,6 +61,10 @@ struct SpillPoolShared {
     /// Writer's reference to the current file (shared by all cloned writers).
     /// Has its own lock to allow I/O without blocking queue access.
     current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>,
+    /// Number of active writer clones. Only when this reaches zero should
+    /// `writer_dropped` be set to true. This prevents premature EOF signaling
+    /// when one writer clone is dropped while others are still active.
+    active_writer_count: usize,
 }
 
 impl SpillPoolShared {
@@ -72,6 +76,7 @@ impl SpillPoolShared {
             waker: None,
             writer_dropped: false,
             current_write_file: None,
+            active_writer_count: 1,
         }
     }
 
@@ -97,7 +102,6 @@ impl SpillPoolShared {
 /// The writer automatically manages file rotation based on the `max_file_size_bytes`
 /// configured in [`channel`]. When the last writer clone is dropped, it finalizes the
 /// current file so readers can access all written data.
-#[derive(Clone)]
 pub struct SpillPoolWriter {
     /// Maximum size in bytes before rotating to a new file.
     /// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`.
@@ -106,6 +110,18 @@ pub struct SpillPoolWriter {
     shared: Arc<Mutex<SpillPoolShared>>,
 }
 
+impl Clone for SpillPoolWriter {
+    fn clone(&self) -> Self {
+        // Increment the active writer count so that `writer_dropped` is only
+        // set to true when the *last* clone is dropped.
+        self.shared.lock().active_writer_count += 1;
+        Self {
+            max_file_size_bytes: self.max_file_size_bytes,
+            shared: Arc::clone(&self.shared),
+        }
+    }
+}
+
 impl SpillPoolWriter {
     /// Spills a batch to the pool, rotating files when necessary.
     ///
@@ -233,6 +249,15 @@ impl Drop for SpillPoolWriter {
     fn drop(&mut self) {
         let mut shared = self.shared.lock();
 
+        shared.active_writer_count -= 1;
+        let is_last_writer = shared.active_writer_count == 0;
+
+        if !is_last_writer {
+            // Other writer clones are still active; do not finalize or
+            // signal EOF to readers.
+            return;
+        }
+
         // Finalize the current file when the last writer is dropped
         if let Some(current_file) = shared.current_write_file.take() {
             // Release shared lock before locking file
@@ -1343,6 +1368,81 @@ mod tests {
         Ok(())
     }
 
+    /// Verifies that the reader stays alive as long as any writer clone exists.
+    ///
+    /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
+    /// mode multiple input partition tasks share clones of the same writer.
+    /// The reader must not see EOF until **all** clones have been dropped,
+    /// even if the queue is temporarily empty between writes from different
+    /// clones.
+    ///
+    /// The test sequence is:
+    ///
+    /// 1. writer1 writes a batch, then is dropped.
+    /// 2. The reader consumes that batch (queue is now empty).
+    /// 3. writer2 (still alive) writes a batch.
+    /// 4. The reader must see that batch.
+    /// 5. EOF is only signalled after writer2 is also dropped.
+    #[tokio::test]
+    async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> {
+        let (writer1, mut reader) = create_spill_channel(1024 * 1024);
+        let writer2 = writer1.clone();
+
+        // Synchronization: tell writer2 when it may proceed.
+        let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>();
+
+        // Spawn writer2 — it waits for the signal before writing.
+        let writer2_handle = SpawnedTask::spawn(async move {
+            proceed_rx.await.unwrap();
+            writer2.push_batch(&create_test_batch(10, 10)).unwrap();
+            // writer2 is dropped here (last clone → true EOF)
+        });
+
+        // Writer1 writes one batch, then drops.
+        writer1.push_batch(&create_test_batch(0, 10))?;
+        drop(writer1);
+
+        // Read writer1's batch.
+        let batch1 = reader.next().await.unwrap()?;
+        assert_eq!(batch1.num_rows(), 10);
+        let col = batch1
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(col.value(0), 0);
+
+        // Signal writer2 to write its batch. It will execute when the
+        // current task yields (i.e. when reader.next() returns Pending).
+        proceed_tx.send(()).unwrap();
+
+        // The reader should wait (Pending) for writer2's data, not EOF.
+        let batch2 =
+            tokio::time::timeout(std::time::Duration::from_secs(5), reader.next())
+                .await
+                .expect("Reader timed out — should not hang");
+
+        assert!(
+            batch2.is_some(),
+            "Reader must not return EOF while a writer clone is still alive"
+        );
+        let batch2 = batch2.unwrap()?;
+        assert_eq!(batch2.num_rows(), 10);
+        let col = batch2
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(col.value(0), 10);
+
+        writer2_handle.await.unwrap();
+
+        // All writers dropped — reader should see real EOF now.
+        assert!(reader.next().await.is_none());
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> {
         use datafusion_execution::runtime_env::RuntimeEnvBuilder;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to