alamb commented on code in PR #15355:
URL: https://github.com/apache/datafusion/pull/15355#discussion_r2008788363


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -230,9 +219,14 @@ struct ExternalSorter {
     /// if `Self::in_mem_batches` are sorted
     in_mem_batches_sorted: bool,
 
-    /// If data has previously been spilled, the locations of the
-    /// spill files (in Arrow IPC format)
-    spills: Vec<RefCountedTempFile>,
+    /// During external sorting, in-memory intermediate data will be appended to
+    /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
+    in_progress_spill_file: Option<InProgressSpillFile>,
+    /// If data has previously been spilled, the locations of the spill files (in
+    /// Arrow IPC format)
+    /// Within the same spill file, the data might be chunked into multiple batches,
+    /// and ordered by sort keys.
+    finished_spill_files: Vec<RefCountedTempFile>,

Review Comment:
   It might make more sense to have the `SpillManager` own these files, so there can't be different sets of references to them.
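A minimal sketch of what single ownership could look like, assuming a hypothetical `SpillManager` that holds both the in-progress file and the finished files, with files modeled as in-memory lists of sort-key batches instead of real Arrow IPC temp files. `SpillFile`, `append`, `finish_current`, and `finished_files` are illustrative names, not DataFusion APIs. The sorter would only ever borrow the finished files, so no second list could drift out of sync.

```rust
/// Hypothetical stand-in for a spill file: an ordered list of batches,
/// each batch reduced to its sort keys.
#[derive(Debug, Default)]
struct SpillFile {
    batches: Vec<Vec<i64>>,
}

/// Hypothetical manager that is the only owner of spill files, so the
/// sorter never keeps a second list that could drift out of sync.
#[derive(Debug, Default)]
struct SpillManager {
    in_progress: Option<SpillFile>,
    finished: Vec<SpillFile>,
}

impl SpillManager {
    /// Appends a batch to the in-progress file, creating the file lazily.
    fn append(&mut self, batch: Vec<i64>) {
        self.in_progress
            .get_or_insert_with(SpillFile::default)
            .batches
            .push(batch);
    }

    /// Moves the in-progress file (if any) to the finished list.
    fn finish_current(&mut self) {
        if let Some(file) = self.in_progress.take() {
            self.finished.push(file);
        }
    }

    /// Borrowed view of the finished files; callers read but never own them.
    fn finished_files(&self) -> &[SpillFile] {
        &self.finished
    }
}

fn main() {
    let mut manager = SpillManager::default();
    manager.append(vec![1, 2, 2]);
    manager.append(vec![3, 4]);
    manager.finish_current();
    manager.append(vec![0, 5]);
    manager.finish_current();
    manager.finish_current(); // no-op: nothing is in progress

    for (i, file) in manager.finished_files().iter().enumerate() {
        println!("file {i}: {} batches", file.batches.len());
    }
}
```

The design choice here is that moving a file from "in progress" to "finished" is a method on the owner, so there is exactly one place where that transition happens.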



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -65,23 +63,14 @@ struct ExternalSorterMetrics {
     /// metrics
     baseline: BaselineMetrics,
 
-    /// count of spills during the execution of the operator

Review Comment:
   Nice
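For context, the accessors later in this diff read `self.metrics.spill_metrics.spilled_bytes`, `spilled_rows`, and `spill_file_count`, which suggests the per-operator spill counters were folded into one shared struct. A rough sketch of that grouping, assuming a hypothetical `Counter` type standing in for DataFusion's metric types (the real `ExternalSorterMetrics` also carries `baseline` and other fields omitted here):

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for a counter metric: clones share the same
/// underlying value, so every holder sees the same total.
#[derive(Debug, Clone, Default)]
struct Counter(Arc<AtomicUsize>);

impl Counter {
    fn add(&self, n: usize) {
        self.0.fetch_add(n, Ordering::Relaxed);
    }

    fn value(&self) -> usize {
        self.0.load(Ordering::Relaxed)
    }
}

/// Spill counters grouped in one struct (field names taken from the
/// accessors in the sort.rs diff), so every spilling operator reports the
/// same set instead of each declaring its own.
#[derive(Debug, Clone, Default)]
struct SpillMetrics {
    spill_file_count: Counter,
    spilled_bytes: Counter,
    spilled_rows: Counter,
}

/// Simplified: only the spill metrics are modeled here.
#[derive(Debug, Default)]
struct ExternalSorterMetrics {
    spill_metrics: SpillMetrics,
}

fn main() {
    let metrics = ExternalSorterMetrics::default();
    // A clone handed to another component still updates the same counters.
    let shared = metrics.spill_metrics.clone();

    shared.spill_file_count.add(1);
    shared.spilled_rows.add(3);
    shared.spilled_bytes.add(96);

    println!(
        "files={} rows={} bytes={}",
        metrics.spill_metrics.spill_file_count.value(),
        metrics.spill_metrics.spilled_rows.value(),
        metrics.spill_metrics.spilled_bytes.value(),
    );
}
```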



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -223,25 +229,182 @@ impl IPCStreamWriter {
     }
 }
 
+/// The `SpillManager` is responsible for the following tasks:

Review Comment:
   Love the spill manager 👍 



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -379,46 +382,64 @@ impl ExternalSorter {
 
     /// How many bytes have been spilled to disk?
     fn spilled_bytes(&self) -> usize {
-        self.metrics.spilled_bytes.value()
+        self.metrics.spill_metrics.spilled_bytes.value()
     }
 
     /// How many rows have been spilled to disk?
     fn spilled_rows(&self) -> usize {
-        self.metrics.spilled_rows.value()
+        self.metrics.spill_metrics.spilled_rows.value()
     }
 
     /// How many spill files have been created?
     fn spill_count(&self) -> usize {
-        self.metrics.spill_count.value()
+        self.metrics.spill_metrics.spill_file_count.value()
     }
 
-    /// Writes any `in_memory_batches` to a spill file and clears
-    /// the batches. The contents of the spill file are sorted.
-    ///
-    /// Returns the amount of memory freed.
-    async fn spill(&mut self) -> Result<usize> {
+    /// When calling, all `in_mem_batches` must be sorted, and then all of them will
+    /// be appended to the in-progress spill file.
+    async fn spill_append(&mut self) -> Result<()> {
         // we could always get a chance to free some memory as long as we are holding some
         if self.in_mem_batches.is_empty() {
-            return Ok(0);
+            return Ok(());
+        }
+
+        // Lazily initialize the in-progress spill file
+        if self.in_progress_spill_file.is_none() {
+            self.in_progress_spill_file =
+                Some(self.spill_manager.create_in_progress_file("Sorting")?);
         }
 
         self.organize_stringview_arrays()?;
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
-        let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
-        let (spilled_rows, spilled_bytes) = spill_record_batches(
-            &batches,
-            spill_file.path().into(),
-            Arc::clone(&self.schema),
-        )?;
-        let used = self.reservation.free();
-        self.metrics.spill_count.add(1);
-        self.metrics.spilled_bytes.add(spilled_bytes);
-        self.metrics.spilled_rows.add(spilled_rows);
-        self.spills.push(spill_file);
-        Ok(used)
+        self.reservation.free();
+
+        let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
+            internal_datafusion_err!("In-progress spill file should be initialized")
+        })?;
+
+        for batch in batches {

Review Comment:
   I don't understand this logic. I thought that each individual batch in `self.in_mem_batches` was sorted, but that they aren't sorted overall.
   
   Thus, if we write them back-to-back to the same spill file, the spill file itself won't be sorted.
   
   For example, if the two in-memory batches are
   
   | A | B |
   |--------|--------|
   | 1 | 10|
   | 2 | 10 |
   | 2 | 10 | 
   
   | A | B |
   |--------|--------|
   | 1 | 10|
   | 2 | 10 |
   | 2 | 10 | 
   
   I think this code would produce a single spill file like
   
   | A | B |
   |--------|--------|
   | 1 | 10|
   | 2 | 10 |
   | 2 | 10 | 
   | 1 | 10|
   | 2 | 10 |
   | 2 | 10 | 
   
   
   Which is not sorted 🤔 
   
   On the other hand, all the tests are passing, so maybe I misunderstand what this is doing (or we have a testing gap).
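To make the concern concrete, here is a minimal, self-contained sketch that uses plain `i64` sort keys (column `A` from the example) in place of `RecordBatch`es; the helper names are hypothetical. It shows that appending two individually sorted runs does not produce a sorted sequence, whereas a k-way merge of the same runs (here a simple `BinaryHeap`-based merge, not DataFusion's streaming merge) does. If the batches really are only sorted individually, something like the merge step would be needed before they can share one sorted spill file.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// True if `v` is in non-decreasing order.
fn is_sorted(v: &[i64]) -> bool {
    v.windows(2).all(|w| w[0] <= w[1])
}

/// Appends runs back-to-back, which is what writing each sorted batch
/// to the same spill file amounts to.
fn concat_runs(runs: &[Vec<i64>]) -> Vec<i64> {
    runs.iter().flatten().copied().collect()
}

/// K-way merge of individually sorted runs using a min-heap of
/// (value, run index, position within run).
fn merge_runs(runs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }
    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());
    while let Some(Reverse((value, run_idx, pos))) = heap.pop() {
        out.push(value);
        // Refill the heap from the run we just consumed, if it has more rows.
        if let Some(&next) = runs[run_idx].get(pos + 1) {
            heap.push(Reverse((next, run_idx, pos + 1)));
        }
    }
    out
}

fn main() {
    // Column `A` of the two in-memory batches from the example above.
    let runs = vec![vec![1, 2, 2], vec![1, 2, 2]];
    let appended = concat_runs(&runs);
    let merged = merge_runs(&runs);
    println!("appended: {:?} sorted = {}", appended, is_sorted(&appended));
    println!("merged:   {:?} sorted = {}", merged, is_sorted(&merged));
}
```

Because every existing test passes, a regression test that spills several individually sorted batches into one file and then checks the final output order might be a cheap way to confirm or rule out the gap.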



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -379,46 +382,64 @@ impl ExternalSorter {
 
-    /// Writes any `in_memory_batches` to a spill file and clears
-    /// the batches. The contents of the spill file are sorted.
-    ///
-    /// Returns the amount of memory freed.
-    async fn spill(&mut self) -> Result<usize> {
+    /// When calling, all `in_mem_batches` must be sorted, and then all of them will
+    /// be appended to the in-progress spill file.

Review Comment:
   If they must all be sorted, then maybe you can put an assert/check that `self.in_mem_batches_sorted` is true.
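A rough sketch of the suggested check, assuming a hypothetical, heavily simplified `Sorter` where a batch is just its sort keys and the in-progress spill file is an in-memory list. In the real code the error would presumably be built with `internal_datafusion_err!` (already used in this diff), or the check could be a `debug_assert!(self.in_mem_batches_sorted)` if it should only fire in debug builds; here a `String` error keeps the example dependency-free.

```rust
/// Hypothetical, heavily simplified sorter: a batch is just its sort keys
/// and the in-progress spill file is an in-memory list of batches.
#[derive(Debug, Default)]
struct Sorter {
    in_mem_batches: Vec<Vec<i64>>,
    in_mem_batches_sorted: bool,
    in_progress_spill_file: Option<Vec<Vec<i64>>>,
}

impl Sorter {
    /// Appends all in-memory batches to the in-progress spill file.
    /// Returns an error instead of silently writing unsorted data.
    fn spill_append(&mut self) -> Result<(), String> {
        if self.in_mem_batches.is_empty() {
            return Ok(());
        }
        // Enforce the documented precondition rather than trusting callers.
        if !self.in_mem_batches_sorted {
            return Err("spill_append called before in_mem_batches were sorted".to_string());
        }
        let batches = std::mem::take(&mut self.in_mem_batches);
        // Lazily create the in-progress file, then append every batch.
        self.in_progress_spill_file
            .get_or_insert_with(Vec::new)
            .extend(batches);
        Ok(())
    }
}

fn main() {
    let mut sorter = Sorter {
        in_mem_batches: vec![vec![1, 2, 2]],
        ..Default::default()
    };
    // Flag not set: the check rejects the call and keeps the batches.
    println!("{:?}", sorter.spill_append());

    sorter.in_mem_batches_sorted = true;
    println!("{:?}", sorter.spill_append());
    println!("{:?}", sorter.in_progress_spill_file);
}
```

Note that the flag only records that each batch was sorted; it would not by itself catch the cross-batch ordering issue raised in the earlier comment.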



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

