This is an automated email from the ASF dual-hosted git repository.
timsaucer pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/branch-52 by this push:
new 1bd7082b79 [branch-52] Fix repartition from dropping data when
spilling (#20672) (#20777)
1bd7082b79 is described below
commit 1bd7082b798d0d55c1e90c7be1d7e3dba057c288
Author: Andrew Lamb <[email protected]>
AuthorDate: Sun Mar 8 11:14:32 2026 -0400
[branch-52] Fix repartition from dropping data when spilling (#20672)
(#20777)
- Part of https://github.com/apache/datafusion/issues/20681
- Closes https://github.com/apache/datafusion/issues/20683 on branch-52
This PR:
- Backports https://github.com/apache/datafusion/pull/20672 from
@xanderbailey to the `branch-52` line
Co-authored-by: Xander <[email protected]>
---
datafusion/physical-plan/src/spill/spill_pool.rs | 102 ++++++++++++++++++++++-
1 file changed, 101 insertions(+), 1 deletion(-)
diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs
b/datafusion/physical-plan/src/spill/spill_pool.rs
index e8eea360da..36920c7fd0 100644
--- a/datafusion/physical-plan/src/spill/spill_pool.rs
+++ b/datafusion/physical-plan/src/spill/spill_pool.rs
@@ -61,6 +61,10 @@ struct SpillPoolShared {
/// Writer's reference to the current file (shared by all cloned writers).
/// Has its own lock to allow I/O without blocking queue access.
current_write_file: Option<Arc<Mutex<ActiveSpillFileShared>>>,
+ /// Number of active writer clones. Only when this reaches zero should
+ /// `writer_dropped` be set to true. This prevents premature EOF signaling
+ /// when one writer clone is dropped while others are still active.
+ active_writer_count: usize,
}
impl SpillPoolShared {
@@ -72,6 +76,7 @@ impl SpillPoolShared {
waker: None,
writer_dropped: false,
current_write_file: None,
+ active_writer_count: 1,
}
}
@@ -97,7 +102,6 @@ impl SpillPoolShared {
/// The writer automatically manages file rotation based on the `max_file_size_bytes`
/// configured in [`channel`]. When the last writer clone is dropped, it finalizes the
/// current file so readers can access all written data.
-#[derive(Clone)]
pub struct SpillPoolWriter {
/// Maximum size in bytes before rotating to a new file.
/// Typically set from configuration `datafusion.execution.max_spill_file_size_bytes`.
@@ -106,6 +110,18 @@ pub struct SpillPoolWriter {
shared: Arc<Mutex<SpillPoolShared>>,
}
+impl Clone for SpillPoolWriter {
+ fn clone(&self) -> Self {
+ // Increment the active writer count so that `writer_dropped` is only
+ // set to true when the *last* clone is dropped.
+ self.shared.lock().active_writer_count += 1;
+ Self {
+ max_file_size_bytes: self.max_file_size_bytes,
+ shared: Arc::clone(&self.shared),
+ }
+ }
+}
+
impl SpillPoolWriter {
/// Spills a batch to the pool, rotating files when necessary.
///
@@ -233,6 +249,15 @@ impl Drop for SpillPoolWriter {
fn drop(&mut self) {
let mut shared = self.shared.lock();
+ shared.active_writer_count -= 1;
+ let is_last_writer = shared.active_writer_count == 0;
+
+ if !is_last_writer {
+ // Other writer clones are still active; do not finalize or
+ // signal EOF to readers.
+ return;
+ }
+
// Finalize the current file when the last writer is dropped
if let Some(current_file) = shared.current_write_file.take() {
// Release shared lock before locking file
@@ -1343,6 +1368,81 @@ mod tests {
Ok(())
}
+ /// Verifies that the reader stays alive as long as any writer clone exists.
+ ///
+ /// `SpillPoolWriter` is `Clone`, and in non-preserve-order repartitioning
+ /// mode multiple input partition tasks share clones of the same writer.
+ /// The reader must not see EOF until **all** clones have been dropped,
+ /// even if the queue is temporarily empty between writes from different
+ /// clones.
+ ///
+ /// The test sequence is:
+ ///
+ /// 1. writer1 writes a batch, then is dropped.
+ /// 2. The reader consumes that batch (queue is now empty).
+ /// 3. writer2 (still alive) writes a batch.
+ /// 4. The reader must see that batch.
+ /// 5. EOF is only signalled after writer2 is also dropped.
+ #[tokio::test]
+ async fn test_clone_drop_does_not_signal_eof_prematurely() -> Result<()> {
+ let (writer1, mut reader) = create_spill_channel(1024 * 1024);
+ let writer2 = writer1.clone();
+
+ // Synchronization: tell writer2 when it may proceed.
+ let (proceed_tx, proceed_rx) = tokio::sync::oneshot::channel::<()>();
+
+ // Spawn writer2 — it waits for the signal before writing.
+ let writer2_handle = SpawnedTask::spawn(async move {
+ proceed_rx.await.unwrap();
+ writer2.push_batch(&create_test_batch(10, 10)).unwrap();
+ // writer2 is dropped here (last clone → true EOF)
+ });
+
+ // Writer1 writes one batch, then drops.
+ writer1.push_batch(&create_test_batch(0, 10))?;
+ drop(writer1);
+
+ // Read writer1's batch.
+ let batch1 = reader.next().await.unwrap()?;
+ assert_eq!(batch1.num_rows(), 10);
+ let col = batch1
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(col.value(0), 0);
+
+ // Signal writer2 to write its batch. It will execute when the
+ // current task yields (i.e. when reader.next() returns Pending).
+ proceed_tx.send(()).unwrap();
+
+ // The reader should wait (Pending) for writer2's data, not EOF.
+ let batch2 =
+ tokio::time::timeout(std::time::Duration::from_secs(5), reader.next())
+ .await
+ .expect("Reader timed out — should not hang");
+
+ assert!(
+ batch2.is_some(),
+ "Reader must not return EOF while a writer clone is still alive"
+ );
+ let batch2 = batch2.unwrap()?;
+ assert_eq!(batch2.num_rows(), 10);
+ let col = batch2
+ .column(0)
+ .as_any()
+ .downcast_ref::<Int32Array>()
+ .unwrap();
+ assert_eq!(col.value(0), 10);
+
+ writer2_handle.await.unwrap();
+
+ // All writers dropped — reader should see real EOF now.
+ assert!(reader.next().await.is_none());
+
+ Ok(())
+ }
+
#[tokio::test]
async fn test_disk_usage_decreases_as_files_consumed() -> Result<()> {
use datafusion_execution::runtime_env::RuntimeEnvBuilder;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]