This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new fcb193b941 Update spilled_bytes to report spill file size (#16535) fcb193b941 is described below commit fcb193b941c19eeda7a99e69ef6610776d905193 Author: ding-young <lsyh...@snu.ac.kr> AuthorDate: Thu Jun 26 05:07:37 2025 +0900 Update spilled_bytes to report spill file size (#16535) --- datafusion/execution/src/disk_manager.rs | 4 ++++ datafusion/physical-plan/src/metrics/baseline.rs | 2 +- datafusion/physical-plan/src/sorts/sort.rs | 2 +- datafusion/physical-plan/src/spill/in_progress_spill_file.rs | 11 +++++++++-- datafusion/physical-plan/src/spill/mod.rs | 7 ++++--- 5 files changed, 19 insertions(+), 7 deletions(-) diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs index 1810601fd3..82f2d75ac1 100644 --- a/datafusion/execution/src/disk_manager.rs +++ b/datafusion/execution/src/disk_manager.rs @@ -366,6 +366,10 @@ impl RefCountedTempFile { Ok(()) } + + pub fn current_disk_usage(&self) -> u64 { + self.current_file_disk_usage + } } /// When the temporary file is dropped, subtract its disk usage from the disk manager's total diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs index b57652d4b6..a52336108a 100644 --- a/datafusion/physical-plan/src/metrics/baseline.rs +++ b/datafusion/physical-plan/src/metrics/baseline.rs @@ -151,7 +151,7 @@ pub struct SpillMetrics { /// count of spills during the execution of the operator pub spill_file_count: Count, - /// total spilled bytes during the execution of the operator + /// total bytes actually written to disk during the execution of the operator pub spilled_bytes: Count, /// total spilled rows during the execution of the operator diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index f941827dd0..21f98fd012 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1514,7 +1514,7 @@ mod tests { // bytes. We leave a little wiggle room for the actual numbers. assert!((3..=10).contains(&spill_count)); assert!((9000..=10000).contains(&spilled_rows)); - assert!((38000..=42000).contains(&spilled_bytes)); + assert!((38000..=44000).contains(&spilled_bytes)); let columns = result[0].columns(); diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs index 61d54bb738..14917e23b7 100644 --- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs +++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs @@ -75,7 +75,7 @@ impl InProgressSpillFile { } } if let Some(writer) = &mut self.writer { - let (spilled_rows, spilled_bytes) = writer.write(batch)?; + let (spilled_rows, _) = writer.write(batch)?; if let Some(in_progress_file) = &mut self.in_progress_file { in_progress_file.update_disk_usage()?; } else { @@ -83,7 +83,6 @@ impl InProgressSpillFile { } // Update metrics - self.spill_writer.metrics.spilled_bytes.add(spilled_bytes); self.spill_writer.metrics.spilled_rows.add(spilled_rows); } Ok(()) @@ -98,6 +97,14 @@ impl InProgressSpillFile { return Ok(None); } + // Since spill files are append-only, add the file size to spilled_bytes + if let Some(in_progress_file) = &mut self.in_progress_file { + // Since writer.finish() writes continuation marker and message length at the end + in_progress_file.update_disk_usage()?; + let size = in_progress_file.current_disk_usage(); + self.spill_writer.metrics.spilled_bytes.add(size as usize); + } + Ok(self.in_progress_file.take()) } } diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs index a6dbce5370..a81221c8b6 100644 --- a/datafusion/physical-plan/src/spill/mod.rs +++ b/datafusion/physical-plan/src/spill/mod.rs @@ -809,12 +809,13 @@ mod tests { Arc::new(StringArray::from(vec!["d", "e", "f"])), ], )?; - + // After appending each batch, spilled_rows should increase, while spill_file_count and + // spilled_bytes remain the same (spilled_bytes is updated only after finish() is called) in_progress_file.append_batch(&batch1)?; - verify_metrics(&in_progress_file, 1, 356, 3)?; + verify_metrics(&in_progress_file, 1, 0, 3)?; in_progress_file.append_batch(&batch2)?; - verify_metrics(&in_progress_file, 1, 712, 6)?; + verify_metrics(&in_progress_file, 1, 0, 6)?; let completed_file = in_progress_file.finish()?; assert!(completed_file.is_some()); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org For additional commands, e-mail: commits-h...@datafusion.apache.org