alamb commented on code in PR #15355:
URL: https://github.com/apache/datafusion/pull/15355#discussion_r2009077074


##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -223,25 +229,182 @@ impl IPCStreamWriter {
     }
 }
 
+/// The `SpillManager` is responsible for the following tasks:
+/// - Reading and writing `RecordBatch`es to raw files based on the provided 
configurations.
+/// - Updating the associated metrics.
+///
+/// Note: The caller (external operators such as `SortExec`) is responsible 
for interpreting the spilled files.
+/// For example, all records within the same spill file are ordered according 
to a specific order.
+#[derive(Debug, Clone)]
+pub(crate) struct SpillManager {
+    env: Arc<RuntimeEnv>,
+    metrics: SpillMetrics,
+    schema: SchemaRef,
+    /// Number of batches to buffer in memory during disk reads
+    batch_read_buffer_capacity: usize,
+    // TODO: Add general-purpose compression options
+}
+
+impl SpillManager {
+    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) 
-> Self {
+        Self {
+            env,
+            metrics,
+            schema,
+            batch_read_buffer_capacity: 2,
+        }
+    }
+
+    /// Creates a temporary file for in-progress operations, returning an error
+    /// message if file creation fails. The file can be used to append batches
+    /// incrementally and then finish the file when done.
+    pub fn create_in_progress_file(
+        &self,
+        request_msg: &str,
+    ) -> Result<InProgressSpillFile> {
+        let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
+        Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
+    }
+
+    /// Spill input `batches` into a single file in a atomic operation. If it 
is
+    /// intended to incrementally write in-memory batches into the same spill 
file,
+    /// use [`Self::create_in_progress_file`] instead.
+    /// None is returned if no batches are spilled.
+    #[allow(dead_code)] // TODO: remove after change SPM to use SpillManager
+    pub fn spill_record_batch_and_finish(
+        &self,
+        batches: &[RecordBatch],
+        request_msg: &str,
+    ) -> Result<Option<RefCountedTempFile>> {
+        let mut in_progress_file = self.create_in_progress_file(request_msg)?;
+
+        for batch in batches {
+            in_progress_file.append_batch(batch)?;
+        }
+
+        in_progress_file.finish()
+    }
+
+    /// Refer to the documentation for 
[`Self::spill_record_batch_and_finish`]. This method
+    /// additionally spills the `RecordBatch` into smaller batches, divided by 
`row_limit`.
+    #[allow(dead_code)] // TODO: remove after change aggregate to use 
SpillManager
+    pub fn spill_record_batch_by_size(
+        &self,
+        batch: &RecordBatch,
+        request_description: &str,
+        row_limit: usize,
+    ) -> Result<Option<RefCountedTempFile>> {
+        let total_rows = batch.num_rows();
+        let mut batches = Vec::new();
+        let mut offset = 0;
+
+        // It's ok to calculate all slices first, because slicing is zero-copy.
+        while offset < total_rows {
+            let length = std::cmp::min(total_rows - offset, row_limit);
+            let sliced_batch = batch.slice(offset, length);
+            batches.push(sliced_batch);
+            offset += length;
+        }
+
+        // Spill the sliced batches to disk
+        self.spill_record_batch_and_finish(&batches, request_description)
+    }
+
+    /// Reads a spill file as a stream. The file must be created by the 
current `SpillManager`.
+    pub fn read_spill_as_stream(
+        &self,
+        spill_file_path: RefCountedTempFile,
+    ) -> Result<SendableRecordBatchStream> {
+        let mut builder = RecordBatchReceiverStream::builder(
+            Arc::clone(&self.schema),
+            self.batch_read_buffer_capacity,
+        );
+        let sender = builder.tx();
+
+        builder.spawn_blocking(move || read_spill(sender, 
spill_file_path.path()));
+
+        Ok(builder.build())
+    }
+}
+
+pub(crate) struct InProgressSpillFile {

Review Comment:
   I think it would help to add some high level comments here about what an 
InProgressSpill file is



##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -223,25 +229,182 @@ impl IPCStreamWriter {
     }
 }
 
+/// The `SpillManager` is responsible for the following tasks:
+/// - Reading and writing `RecordBatch`es to raw files based on the provided 
configurations.
+/// - Updating the associated metrics.
+///
+/// Note: The caller (external operators such as `SortExec`) is responsible 
for interpreting the spilled files.
+/// For example, all records within the same spill file are ordered according 
to a specific order.
+#[derive(Debug, Clone)]
+pub(crate) struct SpillManager {

Review Comment:
   As a follow on PR, I suggest starting to break up this code into multiple 
modules (like `spill/mod.rs`, `spill/spill_manager.rs`, etc



##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -379,46 +382,64 @@ impl ExternalSorter {
 
     /// How many bytes have been spilled to disk?
     fn spilled_bytes(&self) -> usize {
-        self.metrics.spilled_bytes.value()
+        self.metrics.spill_metrics.spilled_bytes.value()
     }
 
     /// How many rows have been spilled to disk?
     fn spilled_rows(&self) -> usize {
-        self.metrics.spilled_rows.value()
+        self.metrics.spill_metrics.spilled_rows.value()
     }
 
     /// How many spill files have been created?
     fn spill_count(&self) -> usize {
-        self.metrics.spill_count.value()
+        self.metrics.spill_metrics.spill_file_count.value()
     }
 
-    /// Writes any `in_memory_batches` to a spill file and clears
-    /// the batches. The contents of the spill file are sorted.
-    ///
-    /// Returns the amount of memory freed.
-    async fn spill(&mut self) -> Result<usize> {
+    /// When calling, all `in_mem_batches` must be sorted, and then all of 
them will
+    /// be appended to the in-progress spill file.
+    async fn spill_append(&mut self) -> Result<()> {
         // we could always get a chance to free some memory as long as we are 
holding some
         if self.in_mem_batches.is_empty() {
-            return Ok(0);
+            return Ok(());
+        }
+
+        // Lazily initialize the in-progress spill file
+        if self.in_progress_spill_file.is_none() {
+            self.in_progress_spill_file =
+                Some(self.spill_manager.create_in_progress_file("Sorting")?);
         }
 
         self.organize_stringview_arrays()?;
 
         debug!("Spilling sort data of ExternalSorter to disk whilst 
inserting");
 
-        let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
-        let (spilled_rows, spilled_bytes) = spill_record_batches(
-            &batches,
-            spill_file.path().into(),
-            Arc::clone(&self.schema),
-        )?;
-        let used = self.reservation.free();
-        self.metrics.spill_count.add(1);
-        self.metrics.spilled_bytes.add(spilled_bytes);
-        self.metrics.spilled_rows.add(spilled_rows);
-        self.spills.push(spill_file);
-        Ok(used)
+        self.reservation.free();
+
+        let in_progress_file = 
self.in_progress_spill_file.as_mut().ok_or_else(|| {
+            internal_datafusion_err!("In-progress spill file should be 
initialized")
+        })?;
+
+        for batch in batches {
+            in_progress_file.append_batch(&batch)?;
+        }
+
+        Ok(())
+    }
+
+    /// Finishes the in-progress spill file and moves it to the finished spill 
files.
+    async fn spill_finish(&mut self) -> Result<()> {
+        let mut in_progress_file =

Review Comment:
   I am finding the various states of the ExternalSorter hard to track 
(specifically what are the valid combinations of `in_mem_batches`, 
`in_progress_spill_file`, `spill`, and `sorted_in_mem`
   
   I wonder if we could move to some sort of state enum that would make this 
easier to understand
   
   Like 
   ```rust
   struct SortState
     AllInMemory {...}
     InProgressSpill { ... }
     AllOnDisk {...}
   ...
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to