This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fcb193b941 Update spilled_bytes to report spill file size (#16535)
fcb193b941 is described below

commit fcb193b941c19eeda7a99e69ef6610776d905193
Author: ding-young <lsyh...@snu.ac.kr>
AuthorDate: Thu Jun 26 05:07:37 2025 +0900

    Update spilled_bytes to report spill file size (#16535)
---
 datafusion/execution/src/disk_manager.rs                     |  4 ++++
 datafusion/physical-plan/src/metrics/baseline.rs             |  2 +-
 datafusion/physical-plan/src/sorts/sort.rs                   |  2 +-
 datafusion/physical-plan/src/spill/in_progress_spill_file.rs | 11 +++++++++--
 datafusion/physical-plan/src/spill/mod.rs                    |  7 ++++---
 5 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/datafusion/execution/src/disk_manager.rs b/datafusion/execution/src/disk_manager.rs
index 1810601fd3..82f2d75ac1 100644
--- a/datafusion/execution/src/disk_manager.rs
+++ b/datafusion/execution/src/disk_manager.rs
@@ -366,6 +366,10 @@ impl RefCountedTempFile {
 
         Ok(())
     }
+
+    pub fn current_disk_usage(&self) -> u64 {
+        self.current_file_disk_usage
+    }
 }
 
 /// When the temporary file is dropped, subtract its disk usage from the disk manager's total
diff --git a/datafusion/physical-plan/src/metrics/baseline.rs b/datafusion/physical-plan/src/metrics/baseline.rs
index b57652d4b6..a52336108a 100644
--- a/datafusion/physical-plan/src/metrics/baseline.rs
+++ b/datafusion/physical-plan/src/metrics/baseline.rs
@@ -151,7 +151,7 @@ pub struct SpillMetrics {
     /// count of spills during the execution of the operator
     pub spill_file_count: Count,
 
-    /// total spilled bytes during the execution of the operator
+    /// total bytes actually written to disk during the execution of the operator
     pub spilled_bytes: Count,
 
     /// total spilled rows during the execution of the operator
diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs
index f941827dd0..21f98fd012 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -1514,7 +1514,7 @@ mod tests {
         // bytes. We leave a little wiggle room for the actual numbers.
         assert!((3..=10).contains(&spill_count));
         assert!((9000..=10000).contains(&spilled_rows));
-        assert!((38000..=42000).contains(&spilled_bytes));
+        assert!((38000..=44000).contains(&spilled_bytes));
 
         let columns = result[0].columns();
 
diff --git a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
index 61d54bb738..14917e23b7 100644
--- a/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
+++ b/datafusion/physical-plan/src/spill/in_progress_spill_file.rs
@@ -75,7 +75,7 @@ impl InProgressSpillFile {
             }
         }
         if let Some(writer) = &mut self.writer {
-            let (spilled_rows, spilled_bytes) = writer.write(batch)?;
+            let (spilled_rows, _) = writer.write(batch)?;
             if let Some(in_progress_file) = &mut self.in_progress_file {
                 in_progress_file.update_disk_usage()?;
             } else {
@@ -83,7 +83,6 @@ impl InProgressSpillFile {
             }
 
             // Update metrics
-            self.spill_writer.metrics.spilled_bytes.add(spilled_bytes);
             self.spill_writer.metrics.spilled_rows.add(spilled_rows);
         }
         Ok(())
@@ -98,6 +97,14 @@ impl InProgressSpillFile {
             return Ok(None);
         }
 
+        // Since spill files are append-only, add the file size to spilled_bytes
+        if let Some(in_progress_file) = &mut self.in_progress_file {
+            // Since writer.finish() writes continuation marker and message length at the end
+            in_progress_file.update_disk_usage()?;
+            let size = in_progress_file.current_disk_usage();
+            self.spill_writer.metrics.spilled_bytes.add(size as usize);
+        }
+
         Ok(self.in_progress_file.take())
     }
 }
diff --git a/datafusion/physical-plan/src/spill/mod.rs b/datafusion/physical-plan/src/spill/mod.rs
index a6dbce5370..a81221c8b6 100644
--- a/datafusion/physical-plan/src/spill/mod.rs
+++ b/datafusion/physical-plan/src/spill/mod.rs
@@ -809,12 +809,13 @@ mod tests {
                 Arc::new(StringArray::from(vec!["d", "e", "f"])),
             ],
         )?;
-
+        // After appending each batch, spilled_rows should increase, while spill_file_count and
+        // spilled_bytes remain the same (spilled_bytes is updated only after finish() is called)
         in_progress_file.append_batch(&batch1)?;
-        verify_metrics(&in_progress_file, 1, 356, 3)?;
+        verify_metrics(&in_progress_file, 1, 0, 3)?;
 
         in_progress_file.append_batch(&batch2)?;
-        verify_metrics(&in_progress_file, 1, 712, 6)?;
+        verify_metrics(&in_progress_file, 1, 0, 6)?;
 
         let completed_file = in_progress_file.finish()?;
         assert!(completed_file.is_some());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@datafusion.apache.org
For additional commands, e-mail: commits-help@datafusion.apache.org

Reply via email to