alamb commented on code in PR #20672:
URL: https://github.com/apache/datafusion/pull/20672#discussion_r2885932273


##########
datafusion/physical-plan/src/spill/spill_pool.rs:
##########
@@ -233,6 +249,15 @@ impl Drop for SpillPoolWriter {
     fn drop(&mut self) {
         let mut shared = self.shared.lock();
 
+        shared.active_writer_count -= 1;
+        let is_last_writer = shared.active_writer_count == 0;
+
+        if !is_last_writer {
+            // Other writer clones are still active; do not finalize or
+            // signal EOF to readers.
+            return;
+        }
+
         // Finalize the current file when the last writer is dropped
         if let Some(current_file) = shared.current_write_file.take() {

Review Comment:
   this would also be great to move into a function of `shared` rather than 
have the logic in the drop of the sharedwriter
   
   That being said, it wasn't introduced in this PR so no need to change it



##########
datafusion/physical-plan/src/spill/spill_pool.rs:
##########
@@ -233,6 +249,15 @@ impl Drop for SpillPoolWriter {
     fn drop(&mut self) {
         let mut shared = self.shared.lock();
 
+        shared.active_writer_count -= 1;

Review Comment:
   I think in general this pattern of incrementing / decrementing / checking a 
counter from some other strcuture under a lock is error prone (e.g. if someone 
else changes this code in the future but forgets to update the counter it may 
be a hard to find bug)
   
   I would personally suggest encapsulating the counter into some structure  
that encpsulated the increment/decrement 
   
   Something maybe like putting an `Arc<AtomicUsize>` on the shared pool and 
then  passing out objects like
   
   ```rust
   struct WriterLease {
     inner: Arc<AtomicUsize>
   }
   
   impl Drop for WriterLease { 
   ...
   // decrement the counter
   }
   ```
   
   And then just get one of those for the SpillPollWriter
   
   ```rust
   pub struct SpillPoolWriter {
     lease: WriterLease
   ...
   }
   ```
   
   However it is a fair point that this PR is probably more efficient as it 
uses an already existing lock 🤷 
   
   I don't feel strongly about this



##########
datafusion/physical-plan/src/spill/spill_pool.rs:
##########
@@ -1343,6 +1368,90 @@ mod tests {
         Ok(())
     }
 
+    /// Regression test for data loss when multiple writer clones are used.
+    ///
+    /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
+    /// mode all input partition tasks share clones of the same writer. Before
+    /// the fix, `Drop` unconditionally set `writer_dropped = true` even when
+    /// other clones were still alive. This caused the `SpillPoolReader` to
+    /// return EOF prematurely, silently losing every batch written by the
+    /// remaining writers.
+    ///
+    /// The test sequence is:
+    ///
+    /// 1. writer1 writes a batch, then is dropped.
+    /// 2. The reader consumes that batch.
+    /// 3. The reader polls again — the queue is now empty.
+    ///    - **Bug**: `writer_dropped` is already true → `Ready(None)` (EOF).
+    ///    - **Fix**: `active_writer_count > 0` → `Pending` (wait for data).
+    /// 4. writer2 (still alive) writes a batch.
+    /// 5. The reader must see that batch — not silently lose it.
+    #[tokio::test]
+    async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> {
+        let (writer1, mut reader) = create_spill_channel(1024 * 1024);
+        let writer2 = writer1.clone();
+
+        // Synchronization: tell writer2 when it may proceed.
+        let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>();
+
+        // Spawn writer2 — it waits for the signal before writing.
+        let writer2_handle = SpawnedTask::spawn(async move {
+            proceed_rx.await.unwrap();
+            writer2.push_batch(&create_test_batch(10, 10)).unwrap();
+            // writer2 is dropped here (last clone → true EOF)
+        });
+
+        // Writer1 writes one batch, then drops.
+        writer1.push_batch(&create_test_batch(0, 10))?;
+        drop(writer1);
+
+        // Read writer1's batch.
+        let batch1 = reader.next().await.unwrap()?;
+        assert_eq!(batch1.num_rows(), 10);
+        let col = batch1
+            .column(0)
+            .as_any()
+            .downcast_ref::<Int32Array>()
+            .unwrap();
+        assert_eq!(col.value(0), 0);
+
+        // Signal writer2 to write its batch. It will execute when the
+        // current task yields (i.e. when reader.next() returns Pending).
+        proceed_tx.send(()).unwrap();
+
+        // With the bug the reader returns None here because it already
+        // saw writer_dropped=true on an empty queue. With the fix it
+        // returns Pending, the runtime schedules writer2, and the batch
+        // becomes available.
+        let batch2 = tokio::time::timeout(
+            std::time::Duration::from_secs(5),
+            reader.next(),
+        )
+        .await
+        .expect("Reader timed out — should not hang");
+
+        assert!(

Review Comment:
   I also verified that this test fails without the code fix
   
   ```shell
   andrewlamb@Andrews-MacBook-Pro-3:~/Software/datafusion2$ cargo test -p 
datafusion-physical-plan test_clone_drop_does_not_signal_eof_prematurely
       Finished `test` profile [unoptimized + debuginfo] target(s) in 0.13s
        Running unittests src/lib.rs 
(target/debug/deps/datafusion_physical_plan-33977765615826e4)
   
   running 1 test
   test 
spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely ... 
FAILED
   
   failures:
   
   ---- 
spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely 
stdout ----
   
   thread 
'spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely' 
(2314602) panicked at datafusion/physical-plan/src/spill/spill_pool.rs:1400:9:
   Reader must not return EOF while a writer clone is still alive
   note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
   
   
   failures:
       spill::spill_pool::tests::test_clone_drop_does_not_signal_eof_prematurely
   
   test result: FAILED. 0 passed; 1 failed; 0 ignored; 0 measured; 1266 
filtered out; finished in 0.00s
   
   error: test failed, to rerun pass `-p datafusion-physical-plan --lib`
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to