This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch branch-52
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/branch-52 by this push:
     new 9a67de58c0 [branch-52] Fix Arrow Spill Underrun (#20159) (#20684)
9a67de58c0 is described below

commit 9a67de58c027e6057aa37327ae4d0192d5c45fc5
Author: Haresh Khanna <[email protected]>
AuthorDate: Wed Mar 4 20:13:36 2026 +0000

    [branch-52] Fix Arrow Spill Underrun (#20159) (#20684)
    
    ## Which issue does this PR close?
    
    - Related to #20681
    - Backport of https://github.com/apache/datafusion/pull/20159
    
    ## Rationale for this change
    
    This adjusts the way that the spill channel works. Currently we have a
spill writer & reader pairing which uses a mutex to coordinate when a
    file is ready to be read.
    
What happens is that, because we were using a `spawn_buffered` call, the
read task would race ahead, trying to read a file which had not yet been
written out completely.
    
    Alongside this, we need to flush each write to the file, as there is a
    chance that another thread may see stale data.
    
    ## What changes are included in this PR?
    
    Adds a flush on write, and converts the read task to not buffer reads.
    
    ## Are these changes tested?
    
    I haven't written a test, but I have been running the example in the
    attached issue. While it now fails with allocation errors, the original
    error goes away.
    
    ## Are there any user-facing changes?
    
    Nope
    
    Co-authored-by: Peter L <[email protected]>
---
 .../physical-plan/src/spill/in_progress_spill_file.rs       |  7 +++++++
 datafusion/physical-plan/src/spill/mod.rs                   |  5 +++++
 datafusion/physical-plan/src/spill/spill_manager.rs         | 13 +++++++++++++
 datafusion/physical-plan/src/spill/spill_pool.rs            |  8 +++++++-
 4 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs 
b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
index d2acf4993b..b9ff6b2f3b 100644
--- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
+++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
@@ -88,6 +88,13 @@ impl InProgressSpillFile {
         Ok(())
     }
 
+    pub fn flush(&mut self) -> Result<()> {
+        if let Some(writer) = &mut self.writer {
+            writer.flush()?;
+        }
+        Ok(())
+    }
+
     /// Returns a reference to the in-progress file, if it exists.
     /// This can be used to get the file path for creating readers before the 
file is finished.
     pub fn file(&self) -> Option<&RefCountedTempFile> {
diff --git a/datafusion/physical-plan/src/spill/mod.rs 
b/datafusion/physical-plan/src/spill/mod.rs
index 78dea99ac8..3c4ee065c3 100644
--- a/datafusion/physical-plan/src/spill/mod.rs
+++ b/datafusion/physical-plan/src/spill/mod.rs
@@ -310,6 +310,11 @@ impl IPCStreamWriter {
         Ok((delta_num_rows, delta_num_bytes))
     }
 
+    pub fn flush(&mut self) -> Result<()> {
+        self.writer.flush()?;
+        Ok(())
+    }
+
     /// Finish the writer
     pub fn finish(&mut self) -> Result<()> {
         self.writer.finish().map_err(Into::into)
diff --git a/datafusion/physical-plan/src/spill/spill_manager.rs 
b/datafusion/physical-plan/src/spill/spill_manager.rs
index 89b0276206..6d931112ad 100644
--- a/datafusion/physical-plan/src/spill/spill_manager.rs
+++ b/datafusion/physical-plan/src/spill/spill_manager.rs
@@ -188,6 +188,19 @@ impl SpillManager {
 
         Ok(spawn_buffered(stream, self.batch_read_buffer_capacity))
     }
+
+    /// Same as `read_spill_as_stream`, but without buffering.
+    pub fn read_spill_as_stream_unbuffered(
+        &self,
+        spill_file_path: RefCountedTempFile,
+        max_record_batch_memory: Option<usize>,
+    ) -> Result<SendableRecordBatchStream> {
+        Ok(Box::pin(cooperative(SpillReaderStream::new(
+            Arc::clone(&self.schema),
+            spill_file_path,
+            max_record_batch_memory,
+        ))))
+    }
 }
 
 pub(crate) trait GetSlicedSize {
diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs 
b/datafusion/physical-plan/src/spill/spill_pool.rs
index e3b547b573..e8eea360da 100644
--- a/datafusion/physical-plan/src/spill/spill_pool.rs
+++ b/datafusion/physical-plan/src/spill/spill_pool.rs
@@ -194,6 +194,8 @@ impl SpillPoolWriter {
             // Append the batch
             if let Some(ref mut writer) = file_shared.writer {
                 writer.append_batch(batch)?;
+                // make sure we flush the writer for readers
+                writer.flush()?;
                 file_shared.batches_written += 1;
                 file_shared.estimated_size += batch_size;
             }
@@ -535,7 +537,11 @@ impl Stream for SpillFile {
         // Step 2: Lazy-create reader stream if needed
         if self.reader.is_none() && should_read {
             if let Some(file) = file {
-                match self.spill_manager.read_spill_as_stream(file, None) {
+                // we want this unbuffered because files are actively being 
written to
+                match self
+                    .spill_manager
+                    .read_spill_as_stream_unbuffered(file, None)
+                {
                     Ok(stream) => {
                         self.reader = Some(SpillFileReader {
                             stream,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to