alamb commented on code in PR #15355:
URL: https://github.com/apache/datafusion/pull/15355#discussion_r2009077074
##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -223,25 +229,182 @@ impl IPCStreamWriter {
     }
 }
 
+/// The `SpillManager` is responsible for the following tasks:
+/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
+/// - Updating the associated metrics.
+///
+/// Note: The caller (external operators such as `SortExec`) is responsible for
+/// interpreting the spilled files. For example, all records within the same spill
+/// file are ordered according to a specific order.
+#[derive(Debug, Clone)]
+pub(crate) struct SpillManager {
+    env: Arc<RuntimeEnv>,
+    metrics: SpillMetrics,
+    schema: SchemaRef,
+    /// Number of batches to buffer in memory during disk reads
+    batch_read_buffer_capacity: usize,
+    // TODO: Add general-purpose compression options
+}
+
+impl SpillManager {
+    pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self {
+        Self {
+            env,
+            metrics,
+            schema,
+            batch_read_buffer_capacity: 2,
+        }
+    }
+
+    /// Creates a temporary file for in-progress operations, returning an error
+    /// message if file creation fails. The file can be used to append batches
+    /// incrementally and then finish the file when done.
+    pub fn create_in_progress_file(
+        &self,
+        request_msg: &str,
+    ) -> Result<InProgressSpillFile> {
+        let temp_file = self.env.disk_manager.create_tmp_file(request_msg)?;
+        Ok(InProgressSpillFile::new(Arc::new(self.clone()), temp_file))
+    }
+
+    /// Spills the input `batches` into a single file as an atomic operation. To
+    /// incrementally write in-memory batches into the same spill file, use
+    /// [`Self::create_in_progress_file`] instead.
+    /// Returns `None` if no batches are spilled.
+    #[allow(dead_code)] // TODO: remove after changing SPM to use SpillManager
+    pub fn spill_record_batch_and_finish(
+        &self,
+        batches: &[RecordBatch],
+        request_msg: &str,
+    ) -> Result<Option<RefCountedTempFile>> {
+        let mut in_progress_file = self.create_in_progress_file(request_msg)?;
+
+        for batch in batches {
+            in_progress_file.append_batch(batch)?;
+        }
+
+        in_progress_file.finish()
+    }
+
+    /// Refer to the documentation for [`Self::spill_record_batch_and_finish`]. This
+    /// method additionally splits the `RecordBatch` into smaller batches of at most
+    /// `row_limit` rows before spilling.
+    #[allow(dead_code)] // TODO: remove after changing aggregate to use SpillManager
+    pub fn spill_record_batch_by_size(
+        &self,
+        batch: &RecordBatch,
+        request_description: &str,
+        row_limit: usize,
+    ) -> Result<Option<RefCountedTempFile>> {
+        let total_rows = batch.num_rows();
+        let mut batches = Vec::new();
+        let mut offset = 0;
+
+        // It's ok to calculate all slices first, because slicing is zero-copy.
+        while offset < total_rows {
+            let length = std::cmp::min(total_rows - offset, row_limit);
+            let sliced_batch = batch.slice(offset, length);
+            batches.push(sliced_batch);
+            offset += length;
+        }
+
+        // Spill the sliced batches to disk
+        self.spill_record_batch_and_finish(&batches, request_description)
+    }
+
+    /// Reads a spill file as a stream. The file must have been created by the
+    /// current `SpillManager`.
+    pub fn read_spill_as_stream(
+        &self,
+        spill_file_path: RefCountedTempFile,
+    ) -> Result<SendableRecordBatchStream> {
+        let mut builder = RecordBatchReceiverStream::builder(
+            Arc::clone(&self.schema),
+            self.batch_read_buffer_capacity,
+        );
+        let sender = builder.tx();
+
+        builder.spawn_blocking(move || read_spill(sender, spill_file_path.path()));
+
+        Ok(builder.build())
+    }
+}
+
+pub(crate) struct InProgressSpillFile {

Review Comment:
   I think it would help to add some high-level comments here about what an `InProgressSpillFile` is.
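   For context while reviewing, the intended lifecycle of the API above appears to be create, then append, then finish, then optionally read back. A minimal sketch of hypothetical caller code, using only the methods shown in this diff (the imports and the surrounding function are assumptions for the sketch):
   ```rust
   use arrow::record_batch::RecordBatch;
   use datafusion_common::Result;

   // Hypothetical caller code illustrating the SpillManager lifecycle from the
   // diff above; `spill_manager` and `batches` are assumed to exist.
   fn spill_then_reread(
       spill_manager: &SpillManager,
       batches: &[RecordBatch],
   ) -> Result<()> {
       // Create a file that accepts incremental appends.
       let mut in_progress = spill_manager.create_in_progress_file("Sorting")?;
       for batch in batches {
           in_progress.append_batch(batch)?;
       }
       // `finish` seals the file and returns `None` if nothing was appended.
       if let Some(spill_file) = in_progress.finish()? {
           // The sealed file can later be replayed as a record batch stream.
           let _stream = spill_manager.read_spill_as_stream(spill_file)?;
       }
       Ok(())
   }
   ```
   A doc comment along these lines on `InProgressSpillFile` would likely answer the question.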
##########
datafusion/physical-plan/src/spill.rs:
##########
@@ -223,25 +229,182 @@ impl IPCStreamWriter {
     }
 }
 
+/// The `SpillManager` is responsible for the following tasks:
+/// - Reading and writing `RecordBatch`es to raw files based on the provided configurations.
+/// - Updating the associated metrics.
+///
+/// Note: The caller (external operators such as `SortExec`) is responsible for
+/// interpreting the spilled files. For example, all records within the same spill
+/// file are ordered according to a specific order.
+#[derive(Debug, Clone)]
+pub(crate) struct SpillManager {

Review Comment:
   As a follow-on PR, I suggest starting to break this code up into multiple modules (like `spill/mod.rs`, `spill/spill_manager.rs`, etc.).
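   One possible shape for that split, as a sketch only; apart from the two files the comment names, the module name and item placement here are hypothetical:
   ```rust
   // datafusion/physical-plan/src/spill/mod.rs (hypothetical layout)

   // Move the manager and the in-progress file type into their own modules;
   // `in_progress_spill_file` is an invented module name for this sketch.
   mod in_progress_spill_file;
   mod spill_manager;

   pub(crate) use in_progress_spill_file::InProgressSpillFile;
   pub(crate) use spill_manager::SpillManager;

   // Low-level IPC read/write helpers shared by both could stay in mod.rs.
   ```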
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -379,46 +382,64 @@ impl ExternalSorter {
 
     /// How many bytes have been spilled to disk?
     fn spilled_bytes(&self) -> usize {
-        self.metrics.spilled_bytes.value()
+        self.metrics.spill_metrics.spilled_bytes.value()
     }
 
     /// How many rows have been spilled to disk?
     fn spilled_rows(&self) -> usize {
-        self.metrics.spilled_rows.value()
+        self.metrics.spill_metrics.spilled_rows.value()
     }
 
     /// How many spill files have been created?
     fn spill_count(&self) -> usize {
-        self.metrics.spill_count.value()
+        self.metrics.spill_metrics.spill_file_count.value()
     }
 
-    /// Writes any `in_memory_batches` to a spill file and clears
-    /// the batches. The contents of the spill file are sorted.
-    ///
-    /// Returns the amount of memory freed.
-    async fn spill(&mut self) -> Result<usize> {
+    /// When called, all `in_mem_batches` must already be sorted; all of them
+    /// will be appended to the in-progress spill file.
+    async fn spill_append(&mut self) -> Result<()> {
         // we could always get a chance to free some memory as long as we are holding some
         if self.in_mem_batches.is_empty() {
-            return Ok(0);
+            return Ok(());
+        }
+
+        // Lazily initialize the in-progress spill file
+        if self.in_progress_spill_file.is_none() {
+            self.in_progress_spill_file =
+                Some(self.spill_manager.create_in_progress_file("Sorting")?);
         }
 
         self.organize_stringview_arrays()?;
 
         debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
 
-        let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
         let batches = std::mem::take(&mut self.in_mem_batches);
-        let (spilled_rows, spilled_bytes) = spill_record_batches(
-            &batches,
-            spill_file.path().into(),
-            Arc::clone(&self.schema),
-        )?;
-        let used = self.reservation.free();
-        self.metrics.spill_count.add(1);
-        self.metrics.spilled_bytes.add(spilled_bytes);
-        self.metrics.spilled_rows.add(spilled_rows);
-        self.spills.push(spill_file);
-        Ok(used)
+        self.reservation.free();
+
+        let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
+            internal_datafusion_err!("In-progress spill file should be initialized")
+        })?;
+
+        for batch in batches {
+            in_progress_file.append_batch(&batch)?;
+        }
+
+        Ok(())
+    }
+
+    /// Finishes the in-progress spill file and moves it to the finished spill files.
+    async fn spill_finish(&mut self) -> Result<()> {
+        let mut in_progress_file =

Review Comment:
   I am finding the various states of the `ExternalSorter` hard to track (specifically, what are the valid combinations of `in_mem_batches`, `in_progress_spill_file`, `spill`, and `sorted_in_mem`?).
   
   I wonder if we could move to some sort of state enum that would make this easier to understand, like:
   ```rust
   enum SortState {
       AllInMemory { /* ... */ },
       InProgressSpill { /* ... */ },
       AllOnDisk { /* ... */ },
       // ...
   }
   ```
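   Fleshing that idea out slightly, one hypothetical shape for such an enum, using the types introduced in this PR; every field name here is illustrative and not taken from the PR:
   ```rust
   use arrow::record_batch::RecordBatch;

   // Hypothetical elaboration of the suggested state machine; the variants
   // follow the reviewer's sketch, the fields are invented for illustration.
   // `InProgressSpillFile` and `RefCountedTempFile` are the spill types above.
   enum SortState {
       /// All data is still in memory; nothing has been spilled.
       AllInMemory { in_mem_batches: Vec<RecordBatch> },
       /// Spilling has started: some finished runs are on disk, and an
       /// in-progress file may still be accepting appends.
       InProgressSpill {
           in_mem_batches: Vec<RecordBatch>,
           in_progress_spill_file: InProgressSpillFile,
           finished_spills: Vec<RefCountedTempFile>,
       },
       /// All batches have been flushed; only finished spill files remain.
       AllOnDisk { finished_spills: Vec<RefCountedTempFile> },
   }
   ```
   Matching on such an enum would force each code path to handle only the combinations that can actually occur, rather than checking the `Option` and `Vec` fields independently.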