This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 2f10d453f3 Expose Spilling Progress Interface in DataFusion (#19708)
2f10d453f3 is described below

commit 2f10d453f3283292aacb5c10fa35c72b09b77d05
Author: xudong.w <[email protected]>
AuthorDate: Wed Jan 14 09:21:55 2026 +0800

    Expose Spilling Progress Interface in DataFusion (#19708)
    
    ## Which issue does this PR close?
    
    <!--
    We generally require a GitHub issue to be filed for all bug fixes and
    enhancements and this helps us generate change logs for our releases.
    You can link an issue to this PR using the GitHub syntax. For example
    `Closes #123` indicates that this PR will close issue #123.
    -->
    
    - Closes #19697
    
    ## Rationale for this change
    
    <!--
    Why are you proposing this change? If this is already explained clearly
    in the issue then this section is not needed.
    Explaining clearly why changes are proposed helps reviewers understand
    your changes and offer better suggestions for fixes.
    -->
    
    ## What changes are included in this PR?
    
    <!--
    There is no need to duplicate the description in the issue here but it
    is sometimes worth providing a summary of the individual changes in this
    PR.
    -->
    
    ## Are these changes tested?
    
    <!--
    We typically require tests for all PRs in order to:
    1. Prevent the code from being accidentally broken by subsequent changes
    2. Serve as another way to document the expected behavior of the code
    
    If tests are not included in your PR, please explain why (for example,
    are they covered by existing tests)?
    -->
    
    ## Are there any user-facing changes?
    
    <!--
    If there are user-facing changes then we may require documentation to be
    updated before approving the PR.
    -->
    
    <!--
    If there are any breaking changes to public APIs, please add the `api
    change` label.
    -->
---
 datafusion/execution/src/disk_manager.rs           | 31 ++++++++-
 datafusion/execution/src/runtime_env.rs            |  7 +-
 .../src/spill/in_progress_spill_file.rs            | 29 +++++++--
 datafusion/physical-plan/src/spill/mod.rs          | 74 ++++++++++++++++++++--
 datafusion/physical-plan/src/spill/spill_pool.rs   | 10 +--
 5 files changed, 134 insertions(+), 17 deletions(-)

diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs
index cb87053d8d..d878fdcf66 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -25,7 +25,7 @@ use parking_lot::Mutex;
 use rand::{Rng, rng};
 use std::path::{Path, PathBuf};
 use std::sync::Arc;
-use std::sync::atomic::{AtomicU64, Ordering};
+use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
 use tempfile::{Builder, NamedTempFile, TempDir};
 
 use datafusion_common::human_readable_size;
@@ -77,6 +77,7 @@ impl DiskManagerBuilder {
                 local_dirs: Mutex::new(Some(vec![])),
                 max_temp_directory_size: self.max_temp_directory_size,
                 used_disk_space: Arc::new(AtomicU64::new(0)),
+                active_files_count: Arc::new(AtomicUsize::new(0)),
             }),
             DiskManagerMode::Directories(conf_dirs) => {
                 let local_dirs = create_local_dirs(&conf_dirs)?;
@@ -87,12 +88,14 @@ impl DiskManagerBuilder {
                     local_dirs: Mutex::new(Some(local_dirs)),
                     max_temp_directory_size: self.max_temp_directory_size,
                     used_disk_space: Arc::new(AtomicU64::new(0)),
+                    active_files_count: Arc::new(AtomicUsize::new(0)),
                 })
             }
             DiskManagerMode::Disabled => Ok(DiskManager {
                 local_dirs: Mutex::new(None),
                 max_temp_directory_size: self.max_temp_directory_size,
                 used_disk_space: Arc::new(AtomicU64::new(0)),
+                active_files_count: Arc::new(AtomicUsize::new(0)),
             }),
         }
     }
@@ -169,6 +172,17 @@ pub struct DiskManager {
     /// Used disk space in the temporary directories. Now only spilled data for
     /// external executors are counted.
     used_disk_space: Arc<AtomicU64>,
+    /// Number of active temporary files created by this disk manager
+    active_files_count: Arc<AtomicUsize>,
+}
+
+/// Information about the current disk usage for spilling
+#[derive(Debug, Clone, Copy)]
+pub struct SpillingProgress {
+    /// Total bytes currently used on disk for spilling
+    pub current_bytes: u64,
+    /// Total number of active spill files
+    pub active_files_count: usize,
 }
 
 impl DiskManager {
@@ -187,6 +201,7 @@ impl DiskManager {
                 local_dirs: Mutex::new(Some(vec![])),
                 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
                 used_disk_space: Arc::new(AtomicU64::new(0)),
+                active_files_count: Arc::new(AtomicUsize::new(0)),
             })),
             DiskManagerConfig::NewSpecified(conf_dirs) => {
                 let local_dirs = create_local_dirs(&conf_dirs)?;
@@ -197,12 +212,14 @@ impl DiskManager {
                     local_dirs: Mutex::new(Some(local_dirs)),
                     max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
                     used_disk_space: Arc::new(AtomicU64::new(0)),
+                    active_files_count: Arc::new(AtomicUsize::new(0)),
                 }))
             }
             DiskManagerConfig::Disabled => Ok(Arc::new(Self {
                 local_dirs: Mutex::new(None),
                 max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE,
                 used_disk_space: Arc::new(AtomicU64::new(0)),
+                active_files_count: Arc::new(AtomicUsize::new(0)),
             })),
         }
     }
@@ -252,6 +269,14 @@ impl DiskManager {
         self.max_temp_directory_size
     }
 
+    /// Returns the current spilling progress
+    pub fn spilling_progress(&self) -> SpillingProgress {
+        SpillingProgress {
+            current_bytes: self.used_disk_space.load(Ordering::Relaxed),
+            active_files_count: self.active_files_count.load(Ordering::Relaxed),
+        }
+    }
+
     /// Returns the temporary directory paths
     pub fn temp_dir_paths(&self) -> Vec<PathBuf> {
         self.local_dirs
@@ -301,6 +326,7 @@ impl DiskManager {
         }
 
         let dir_index = rng().random_range(0..local_dirs.len());
+        self.active_files_count.fetch_add(1, Ordering::Relaxed);
         Ok(RefCountedTempFile {
             parent_temp_dir: Arc::clone(&local_dirs[dir_index]),
             tempfile: Arc::new(
@@ -422,6 +448,9 @@ impl Drop for RefCountedTempFile {
             self.disk_manager
                 .used_disk_space
                 .fetch_sub(current_usage, Ordering::Relaxed);
+            self.disk_manager
+                .active_files_count
+                .fetch_sub(1, Ordering::Relaxed);
         }
     }
 }
diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs
index 67398d59f1..67604c424c 100644
--- a/datafusion/execution/src/runtime_env.rs
+++ b/datafusion/execution/src/runtime_env.rs
@@ -19,7 +19,7 @@
 //! store, memory manager, disk manager.
 
 #[expect(deprecated)]
-use crate::disk_manager::DiskManagerConfig;
+use crate::disk_manager::{DiskManagerConfig, SpillingProgress};
 use crate::{
     disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode},
     memory_pool::{
@@ -199,6 +199,11 @@ impl RuntimeEnv {
         self.object_store_registry.get_store(url.as_ref())
     }
 
+    /// Returns the current spilling progress
+    pub fn spilling_progress(&self) -> SpillingProgress {
+        self.disk_manager.spilling_progress()
+    }
+
     /// Register an [`EncryptionFactory`] with an associated identifier that 
can be later
     /// used to configure encryption when reading or writing Parquet.
     /// If an encryption factory with the same identifier was already 
registered, it is replaced and returned.
diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
index d2acf4993b..0ad7aabf64 100644
--- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
+++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
@@ -63,7 +63,7 @@ impl InProgressSpillFile {
         }
         if self.writer.is_none() {
             let schema = batch.schema();
-            if let Some(ref in_progress_file) = self.in_progress_file {
+            if let Some(in_progress_file) = &mut self.in_progress_file {
                 self.writer = Some(IPCStreamWriter::new(
                     in_progress_file.path(),
                     schema.as_ref(),
@@ -72,18 +72,31 @@ impl InProgressSpillFile {
 
                 // Update metrics
                 self.spill_writer.metrics.spill_file_count.add(1);
+
+                // Update initial size (schema/header)
+                in_progress_file.update_disk_usage()?;
+                let initial_size = in_progress_file.current_disk_usage();
+                self.spill_writer
+                    .metrics
+                    .spilled_bytes
+                    .add(initial_size as usize);
             }
         }
         if let Some(writer) = &mut self.writer {
             let (spilled_rows, _) = writer.write(batch)?;
             if let Some(in_progress_file) = &mut self.in_progress_file {
+                let pre_size = in_progress_file.current_disk_usage();
                 in_progress_file.update_disk_usage()?;
+                let post_size = in_progress_file.current_disk_usage();
+
+                self.spill_writer.metrics.spilled_rows.add(spilled_rows);
+                self.spill_writer
+                    .metrics
+                    .spilled_bytes
+                    .add((post_size - pre_size) as usize);
             } else {
                 unreachable!() // Already checked inside current function
             }
-
-            // Update metrics
-            self.spill_writer.metrics.spilled_rows.add(spilled_rows);
         }
         Ok(())
     }
@@ -106,9 +119,13 @@ impl InProgressSpillFile {
         // Since spill files are append-only, add the file size to spilled_bytes
         if let Some(in_progress_file) = &mut self.in_progress_file {
             // Since writer.finish() writes continuation marker and message length at the end
+            let pre_size = in_progress_file.current_disk_usage();
             in_progress_file.update_disk_usage()?;
-            let size = in_progress_file.current_disk_usage();
-            self.spill_writer.metrics.spilled_bytes.add(size as usize);
+            let post_size = in_progress_file.current_disk_usage();
+            self.spill_writer
+                .metrics
+                .spilled_bytes
+                .add((post_size - pre_size) as usize);
         }
 
         Ok(self.in_progress_file.take())
diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs
index 78dea99ac8..166805a337 100644
--- a/datafusion/physical-plan/src/spill/mod.rs
+++ b/datafusion/physical-plan/src/spill/mod.rs
@@ -685,13 +685,13 @@ mod tests {
                 Arc::new(StringArray::from(vec!["d", "e", "f"])),
             ],
         )?;
-        // After appending each batch, spilled_rows should increase, while spill_file_count and
-        // spilled_bytes remain the same (spilled_bytes is updated only after finish() is called)
+        // After appending each batch, spilled_rows and spilled_bytes should increase incrementally,
+        // while spill_file_count remains 1 (since we're writing to the same file)
         in_progress_file.append_batch(&batch1)?;
-        verify_metrics(&in_progress_file, 1, 0, 3)?;
+        verify_metrics(&in_progress_file, 1, 440, 3)?;
 
         in_progress_file.append_batch(&batch2)?;
-        verify_metrics(&in_progress_file, 1, 0, 6)?;
+        verify_metrics(&in_progress_file, 1, 704, 6)?;
 
         let completed_file = in_progress_file.finish()?;
         assert!(completed_file.is_some());
@@ -799,4 +799,70 @@ mod tests {
         assert_eq!(alignment, 8);
         Ok(())
     }
+    #[tokio::test]
+    async fn test_real_time_spill_metrics() -> Result<()> {
+        let env = Arc::new(RuntimeEnv::default());
+        let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("a", DataType::Int32, false),
+            Field::new("b", DataType::Utf8, false),
+        ]));
+
+        let spill_manager = Arc::new(SpillManager::new(
+            Arc::clone(&env),
+            metrics.clone(),
+            Arc::clone(&schema),
+        ));
+        let mut in_progress_file = spill_manager.create_in_progress_file("Test")?;
+
+        let batch1 = RecordBatch::try_new(
+            Arc::clone(&schema),
+            vec![
+                Arc::new(Int32Array::from(vec![1, 2, 3])),
+                Arc::new(StringArray::from(vec!["a", "b", "c"])),
+            ],
+        )?;
+
+        // Before any batch, metrics should be 0
+        assert_eq!(metrics.spilled_bytes.value(), 0);
+        assert_eq!(metrics.spill_file_count.value(), 0);
+
+        // Append first batch
+        in_progress_file.append_batch(&batch1)?;
+
+        // Metrics should be updated immediately (at least schema and first batch)
+        let bytes_after_batch1 = metrics.spilled_bytes.value();
+        assert_eq!(bytes_after_batch1, 440);
+        assert_eq!(metrics.spill_file_count.value(), 1);
+
+        // Check global progress
+        let progress = env.spilling_progress();
+        assert_eq!(progress.current_bytes, bytes_after_batch1 as u64);
+        assert_eq!(progress.active_files_count, 1);
+
+        // Append another batch
+        in_progress_file.append_batch(&batch1)?;
+        let bytes_after_batch2 = metrics.spilled_bytes.value();
+        assert!(bytes_after_batch2 > bytes_after_batch1);
+
+        // Check global progress again
+        let progress = env.spilling_progress();
+        assert_eq!(progress.current_bytes, bytes_after_batch2 as u64);
+
+        // Finish the file
+        let spilled_file = in_progress_file.finish()?;
+        let final_bytes = metrics.spilled_bytes.value();
+        assert!(final_bytes > bytes_after_batch2);
+
+        // Even after finish, file is still "active" until dropped
+        let progress = env.spilling_progress();
+        assert!(progress.current_bytes > 0);
+        assert_eq!(progress.active_files_count, 1);
+
+        drop(spilled_file);
+        assert_eq!(env.spilling_progress().active_files_count, 0);
+        assert_eq!(env.spilling_progress().current_bytes, 0);
+
+        Ok(())
+    }
 }
diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs
index e3b547b573..8f7f5212f6 100644
--- a/datafusion/physical-plan/src/spill/spill_pool.rs
+++ b/datafusion/physical-plan/src/spill/spill_pool.rs
@@ -879,8 +879,8 @@ mod tests {
         );
         assert_eq!(
             metrics.spilled_bytes.value(),
-            0,
-            "Spilled bytes should be 0 before file finalization"
+            320,
+            "Spilled bytes should reflect data written (header + 1 batch)"
         );
         assert_eq!(
             metrics.spilled_rows.value(),
@@ -1300,11 +1300,11 @@ mod tests {
             writer.push_batch(&batch)?;
         }
 
-        // Check metrics before drop - spilled_bytes should be 0 since file isn't finalized yet
+        // Check metrics before drop - spilled_bytes already reflects written data
         let spilled_bytes_before = metrics.spilled_bytes.value();
         assert_eq!(
-            spilled_bytes_before, 0,
-            "Spilled bytes should be 0 before writer is dropped"
+            spilled_bytes_before, 1088,
+            "Spilled bytes should reflect data written (header + 5 batches)"
         );
 
         // Explicitly drop the writer - this should finalize the current file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to