xanderbailey commented on code in PR #20672:
URL: https://github.com/apache/datafusion/pull/20672#discussion_r2878707746
##########
datafusion/physical-plan/src/spill/spill_pool.rs:
##########
@@ -1343,6 +1368,90 @@ mod tests {
Ok(())
}
+ /// Regression test for data loss when multiple writer clones are used.
+ ///
+ /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
+ /// mode all input partition tasks share clones of the same writer. Before
+ /// the fix, `Drop` unconditionally set `writer_dropped = true` even when
+ /// other clones were still alive. This caused the `SpillPoolReader` to
+ /// return EOF prematurely, silently losing every batch written by the
+ /// remaining writers.
+ ///
+ /// The test sequence is:
+ ///
+ /// 1. writer1 writes a batch, then is dropped.
+ /// 2. The reader consumes that batch.
+ /// 3. The reader polls again — the queue is now empty.
+ /// - **Bug**: `writer_dropped` is already true → `Ready(None)` (EOF).
+ /// - **Fix**: `active_writer_count > 0` → `Pending` (wait for data).
+ /// 4. writer2 (still alive) writes a batch.
+ /// 5. The reader must see that batch — not silently lose it.
+ #[tokio::test]
+ async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> {
+ let (writer1, mut reader) = create_spill_channel(1024 * 1024);
+ let writer2 = writer1.clone();
+
+ // Synchronization: tell writer2 when it may proceed.
+ let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>();
+
+ // Spawn writer2 — it waits for the signal before writing.
+ let writer2_handle = SpawnedTask::spawn(async move {
+ proceed_rx.await.unwrap();
+ writer2.push_batch(&create_test_batch(10, 10)).unwrap();
+ // writer2 is dropped here (last clone → true EOF)
+ });
+
+ // Writer1 writes one batch, then drops.
+ writer1.push_batch(&create_test_batch(0, 10))?;
+ drop(writer1);
+
+ // Read writer1's batch.
+ let batch1 = reader.next().await.unwrap()?;
+ assert_eq!(batch1.num_rows(), 10);
+ let col = batch1
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(col.value(0), 0);
+
+ // Signal writer2 to write its batch. It will execute when the
+ // current task yields (i.e. when reader.next() returns Pending).
+ proceed_tx.send(()).unwrap();
+
+ // With the bug the reader returns None here because it already
+ // saw writer_dropped=true on an empty queue. With the fix it
+ // returns Pending, the runtime schedules writer2, and the batch
+ // becomes available.
+ let batch2 = tokio::time::timeout(
+ std::time::Duration::from_secs(5),
+ reader.next(),
+ )
+ .await
+ .expect("Reader timed out — should not hang");
+
+ assert!(
Review Comment:
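Without this fix, the test fails at this assertion: the reader returns `None` (premature EOF) instead of the batch written by writer2.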
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]