This is an automated email from the ASF dual-hosted git repository. xudong963 pushed a commit to branch fix/sort-merge-reservation-starvation in repository https://gitbox.apache.org/repos/asf/datafusion.git
commit db69656f487320a1654859553332f9edc48548b5 Author: xudong.w <[email protected]> AuthorDate: Wed Jan 14 09:21:55 2026 +0800 Expose Spilling Progress Interface in DataFusion (#19708) <!-- We generally require a GitHub issue to be filed for all bug fixes and enhancements and this helps us generate change logs for our releases. You can link an issue to this PR using the GitHub syntax. For example `Closes #123` indicates that this PR will close issue #123. --> - Closes #19697 <!-- Why are you proposing this change? If this is already explained clearly in the issue then this section is not needed. Explaining clearly why changes are proposed helps reviewers understand your changes and offer better suggestions for fixes. --> <!-- There is no need to duplicate the description in the issue here but it is sometimes worth providing a summary of the individual changes in this PR. --> <!-- We typically require tests for all PRs in order to: 1. Prevent the code from being accidentally broken by subsequent changes 2. Serve as another way to document the expected behavior of the code If tests are not included in your PR, please explain why (for example, are they covered by existing tests)? --> <!-- If there are user-facing changes then we may require documentation to be updated before approving the PR. --> <!-- If there are any breaking changes to public APIs, please add the `api change` label. --> --- datafusion/execution/src/disk_manager.rs | 49 +++++++++++++- datafusion/execution/src/runtime_env.rs | 9 ++- .../src/spill/in_progress_spill_file.rs | 29 +++++++-- datafusion/physical-plan/src/spill/mod.rs | 74 ++++++++++++++++++++-- datafusion/physical-plan/src/spill/spill_pool.rs | 10 +-- 5 files changed, 153 insertions(+), 18 deletions(-) diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index c2923d6112..96f55dd05b 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -24,8 +24,8 @@ use log::debug; use parking_lot::Mutex; use rand::{rng, Rng}; use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::Arc; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use tempfile::{Builder, NamedTempFile, TempDir}; use crate::memory_pool::human_readable_size; @@ -77,6 +77,7 @@ impl DiskManagerBuilder { local_dirs: Mutex::new(Some(vec![])), max_temp_directory_size: self.max_temp_directory_size, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), }), DiskManagerMode::Directories(conf_dirs) => { let local_dirs = create_local_dirs(conf_dirs)?; @@ -87,12 +88,14 @@ impl DiskManagerBuilder { local_dirs: Mutex::new(Some(local_dirs)), max_temp_directory_size: self.max_temp_directory_size, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), }) } DiskManagerMode::Disabled => Ok(DiskManager { local_dirs: Mutex::new(None), max_temp_directory_size: self.max_temp_directory_size, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), }), } } @@ -168,6 +171,17 @@ pub struct DiskManager { /// Used disk space in the temporary directories. Now only spilled data for /// external executors are counted. used_disk_space: Arc<AtomicU64>, + /// Number of active temporary files created by this disk manager + active_files_count: Arc<AtomicUsize>, +} + +/// Information about the current disk usage for spilling +#[derive(Debug, Clone, Copy)] +pub struct SpillingProgress { + /// Total bytes currently used on disk for spilling + pub current_bytes: u64, + /// Total number of active spill files + pub active_files_count: usize, } impl DiskManager { @@ -186,6 +200,7 @@ impl DiskManager { local_dirs: Mutex::new(Some(vec![])), max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), })), DiskManagerConfig::NewSpecified(conf_dirs) => { let local_dirs = create_local_dirs(conf_dirs)?; @@ -196,12 +211,14 @@ impl DiskManager { local_dirs: Mutex::new(Some(local_dirs)), max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), })) } DiskManagerConfig::Disabled => Ok(Arc::new(Self { local_dirs: Mutex::new(None), max_temp_directory_size: DEFAULT_MAX_TEMP_DIRECTORY_SIZE, used_disk_space: Arc::new(AtomicU64::new(0)), + active_files_count: Arc::new(AtomicUsize::new(0)), })), } } @@ -246,6 +263,32 @@ impl DiskManager { self.used_disk_space.load(Ordering::Relaxed) } + /// Returns the maximum temporary directory size in bytes + pub fn max_temp_directory_size(&self) -> u64 { + self.max_temp_directory_size + } + + /// Returns the current spilling progress + pub fn spilling_progress(&self) -> SpillingProgress { + SpillingProgress { + current_bytes: self.used_disk_space.load(Ordering::Relaxed), + active_files_count: self.active_files_count.load(Ordering::Relaxed), + } + } + + /// Returns the temporary directory paths + pub fn temp_dir_paths(&self) -> Vec<PathBuf> { + self.local_dirs + .lock() + .as_ref() + .map(|dirs| { + dirs.iter() + .map(|temp_dir| temp_dir.path().to_path_buf()) + .collect() + }) + .unwrap_or_default() + } + /// Return true if this disk manager supports creating temporary /// files. If this returns false, any call to `create_tmp_file` /// will error. @@ -282,6 +325,7 @@ impl DiskManager { } let dir_index = rng().random_range(0..local_dirs.len()); + self.active_files_count.fetch_add(1, Ordering::Relaxed); Ok(RefCountedTempFile { parent_temp_dir: Arc::clone(&local_dirs[dir_index]), tempfile: Arc::new( @@ -403,6 +447,9 @@ impl Drop for RefCountedTempFile { self.disk_manager .used_disk_space .fetch_sub(current_usage, Ordering::Relaxed); + self.disk_manager + .active_files_count + .fetch_sub(1, Ordering::Relaxed); } } } diff --git a/datafusion/execution/src/runtime_env.rs b/datafusion/execution/src/runtime_env.rs index d699876008..e029cfcc1f 100644 --- a/datafusion/execution/src/runtime_env.rs +++ b/datafusion/execution/src/runtime_env.rs @@ -18,8 +18,8 @@ //! Execution [`RuntimeEnv`] environment that manages access to object //! store, memory manager, disk manager. -#[allow(deprecated)] -use crate::disk_manager::DiskManagerConfig; +#[expect(deprecated)] +use crate::disk_manager::{DiskManagerConfig, SpillingProgress}; use crate::{ disk_manager::{DiskManager, DiskManagerBuilder, DiskManagerMode}, memory_pool::{ @@ -152,6 +152,11 @@ impl RuntimeEnv { self.object_store_registry.get_store(url.as_ref()) } + /// Returns the current spilling progress + pub fn spilling_progress(&self) -> SpillingProgress { + self.disk_manager.spilling_progress() + } + /// Register an [`EncryptionFactory`] with an associated identifier that can be later /// used to configure encryption when reading or writing Parquet. /// If an encryption factory with the same identifier was already registered, it is replaced and returned. diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs index e7f354a73b..e3259e86fc 100644 --- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs +++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs @@ -63,7 +63,7 @@ impl InProgressSpillFile { } if self.writer.is_none() { let schema = batch.schema(); - if let Some(ref in_progress_file) = self.in_progress_file { + if let Some(in_progress_file) = &mut self.in_progress_file { self.writer = Some(IPCStreamWriter::new( in_progress_file.path(), schema.as_ref(), @@ -72,18 +72,31 @@ impl InProgressSpillFile { // Update metrics self.spill_writer.metrics.spill_file_count.add(1); + + // Update initial size (schema/header) + in_progress_file.update_disk_usage()?; + let initial_size = in_progress_file.current_disk_usage(); + self.spill_writer + .metrics + .spilled_bytes + .add(initial_size as usize); } } if let Some(writer) = &mut self.writer { let (spilled_rows, _) = writer.write(batch)?; if let Some(in_progress_file) = &mut self.in_progress_file { + let pre_size = in_progress_file.current_disk_usage(); in_progress_file.update_disk_usage()?; + let post_size = in_progress_file.current_disk_usage(); + + self.spill_writer.metrics.spilled_rows.add(spilled_rows); + self.spill_writer + .metrics + .spilled_bytes + .add((post_size - pre_size) as usize); } else { unreachable!() // Already checked inside current function } - - // Update metrics - self.spill_writer.metrics.spilled_rows.add(spilled_rows); } Ok(()) } @@ -106,9 +119,13 @@ impl InProgressSpillFile { // Since spill files are append-only, add the file size to spilled_bytes if let Some(in_progress_file) = &mut self.in_progress_file { // Since writer.finish() writes continuation marker and message length at the end + let pre_size = in_progress_file.current_disk_usage(); in_progress_file.update_disk_usage()?; - let size = in_progress_file.current_disk_usage(); - self.spill_writer.metrics.spilled_bytes.add(size as usize); + let post_size = in_progress_file.current_disk_usage(); + self.spill_writer + .metrics + .spilled_bytes + .add((post_size - pre_size) as usize); } Ok(self.in_progress_file.take()) diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index 58fd016a63..97b872de03 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -878,13 +878,13 @@ mod tests { Arc::new(StringArray::from(vec!["d", "e", "f"])), ], )?; - // After appending each batch, spilled_rows should increase, while spill_file_count and - // spilled_bytes remain the same (spilled_bytes is updated only after finish() is called) + // After appending each batch, spilled_rows and spilled_bytes should increase incrementally, + // while spill_file_count remains 1 (since we're writing to the same file) in_progress_file.append_batch(&batch1)?; - verify_metrics(&in_progress_file, 1, 0, 3)?; + verify_metrics(&in_progress_file, 1, 440, 3)?; in_progress_file.append_batch(&batch2)?; - verify_metrics(&in_progress_file, 1, 0, 6)?; + verify_metrics(&in_progress_file, 1, 704, 6)?; let completed_file = in_progress_file.finish()?; assert!(completed_file.is_some()); @@ -992,4 +992,70 @@ mod tests { assert_eq!(alignment, 8); Ok(()) } + #[tokio::test] + async fn test_real_time_spill_metrics() -> Result<()> { + let env = Arc::new(RuntimeEnv::default()); + let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0); + let schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Int32, false), + Field::new("b", DataType::Utf8, false), + ])); + + let spill_manager = Arc::new(SpillManager::new( + Arc::clone(&env), + metrics.clone(), + Arc::clone(&schema), + )); + let mut in_progress_file = spill_manager.create_in_progress_file("Test")?; + + let batch1 = RecordBatch::try_new( + Arc::clone(&schema), + vec![ + Arc::new(Int32Array::from(vec![1, 2, 3])), + Arc::new(StringArray::from(vec!["a", "b", "c"])), + ], + )?; + + // Before any batch, metrics should be 0 + assert_eq!(metrics.spilled_bytes.value(), 0); + assert_eq!(metrics.spill_file_count.value(), 0); + + // Append first batch + in_progress_file.append_batch(&batch1)?; + + // Metrics should be updated immediately (at least schema and first batch) + let bytes_after_batch1 = metrics.spilled_bytes.value(); + assert_eq!(bytes_after_batch1, 440); + assert_eq!(metrics.spill_file_count.value(), 1); + + // Check global progress + let progress = env.spilling_progress(); + assert_eq!(progress.current_bytes, bytes_after_batch1 as u64); + assert_eq!(progress.active_files_count, 1); + + // Append another batch + in_progress_file.append_batch(&batch1)?; + let bytes_after_batch2 = metrics.spilled_bytes.value(); + assert!(bytes_after_batch2 > bytes_after_batch1); + + // Check global progress again + let progress = env.spilling_progress(); + assert_eq!(progress.current_bytes, bytes_after_batch2 as u64); + + // Finish the file + let spilled_file = in_progress_file.finish()?; + let final_bytes = metrics.spilled_bytes.value(); + assert!(final_bytes > bytes_after_batch2); + + // Even after finish, file is still "active" until dropped + let progress = env.spilling_progress(); + assert!(progress.current_bytes > 0); + assert_eq!(progress.active_files_count, 1); + + drop(spilled_file); + assert_eq!(env.spilling_progress().active_files_count, 0); + assert_eq!(env.spilling_progress().current_bytes, 0); + + Ok(()) + } } diff --git a/datafusion/physical-plan/src/spill/spill_pool.rs b/datafusion/physical-plan/src/spill/spill_pool.rs index bbe54ca45c..d5d29b5686 100644 --- a/datafusion/physical-plan/src/spill/spill_pool.rs +++ b/datafusion/physical-plan/src/spill/spill_pool.rs @@ -874,8 +874,8 @@ mod tests { ); assert_eq!( metrics.spilled_bytes.value(), - 0, - "Spilled bytes should be 0 before file finalization" + 320, + "Spilled bytes should reflect data written (header + 1 batch)" ); assert_eq!( metrics.spilled_rows.value(), @@ -1287,11 +1287,11 @@ mod tests { writer.push_batch(&batch)?; } - // Check metrics before drop - spilled_bytes should be 0 since file isn't finalized yet + // Check metrics before drop - spilled_bytes already reflects written data let spilled_bytes_before = metrics.spilled_bytes.value(); assert_eq!( - spilled_bytes_before, 0, - "Spilled bytes should be 0 before writer is dropped" + spilled_bytes_before, 1088, + "Spilled bytes should reflect data written (header + 5 batches)" ); // Explicitly drop the writer - this should finalize the current file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
