2010YOUY01 commented on code in PR #15355:
URL: https://github.com/apache/datafusion/pull/15355#discussion_r2009004733
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -230,9 +219,14 @@ struct ExternalSorter {
/// if `Self::in_mem_batches` are sorted
in_mem_batches_sorted: bool,
- /// If data has previously been spilled, the locations of the
- /// spill files (in Arrow IPC format)
- spills: Vec<RefCountedTempFile>,
+ /// During external sorting, in-memory intermediate data will be appended to
+ /// this file incrementally. Once finished, this file will be moved to [`Self::finished_spill_files`].
+ in_progress_spill_file: Option<InProgressSpillFile>,
+ /// If data has previously been spilled, the locations of the spill files (in
+ /// Arrow IPC format)
+ /// Within the same spill file, the data might be chunked into multiple batches,
+ /// and ordered by sort keys.
+ finished_spill_files: Vec<RefCountedTempFile>,
Review Comment:
I think it will be hard to define the semantics of those temp files if we
put them inside `SpillManager`, because different operators interpret
those files differently:
- For `SortExec`, `Vec<RefCountedTempFile>` represents multiple runs, each
sorted on the sort keys.
- For `ShuffleWriterExec` in `datafusion-comet`, since Spark's shuffle
operator is blocking (due to Spark's staged execution design), it might want to
keep a `Vec<InProgressSpillFile>` instead.
- Similarly, if we want to spill `Row`s to accelerate `SortExec`, or
implement a spilling hash join, the temp files will have very different
logical meanings.
Overall, `SpillManager` is designed only to do the `RecordBatch <-> raw
file` conversion, with different configurations and stat accounting. Operators
then have the flexibility to implement their own utilities for managing the raw
files, whose semantics vary from operator to operator.
Do you see any potential issues or improvements?
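To make the split concrete, here is a minimal sketch of the idea, not the actual DataFusion code. All names (`Batch`, `RawSpillFile`, `ToySpillManager`, `ToySorter`) are hypothetical stand-ins, and the "files" are kept in memory. The manager only knows how to append batches to a raw file and count stats; the operator is the one that decides each finished file means "one sorted run".

```rust
/// Hypothetical stand-in for an Arrow `RecordBatch`: a single column of i64.
type Batch = Vec<i64>;

/// Hypothetical stand-in for a spill file; kept in memory for this sketch.
#[derive(Debug, Default)]
struct RawSpillFile {
    batches: Vec<Batch>,
}

/// Hypothetical manager: only does batch <-> raw file, plus stat accounting.
/// It attaches no meaning (sorted, partitioned, ...) to the files it writes.
#[derive(Debug, Default)]
struct ToySpillManager {
    spilled_rows: usize,
    spill_file_count: usize,
}

impl ToySpillManager {
    /// Create a new raw file and count it.
    fn create_file(&mut self) -> RawSpillFile {
        self.spill_file_count += 1;
        RawSpillFile::default()
    }

    /// Append one batch to a raw file and count its rows.
    fn append(&mut self, file: &mut RawSpillFile, batch: Batch) {
        self.spilled_rows += batch.len();
        file.batches.push(batch);
    }

    /// Read all batches back from a raw file.
    fn read(&self, file: &RawSpillFile) -> Vec<Batch> {
        file.batches.clone()
    }
}

/// Operator-side semantics live here: each finished file is one sorted run.
#[derive(Debug, Default)]
struct ToySorter {
    manager: ToySpillManager,
    finished_runs: Vec<RawSpillFile>,
}

impl ToySorter {
    fn new() -> Self {
        Self::default()
    }

    /// Sort the rows, then spill them as one run chunked into small batches.
    fn spill_sorted_run(&mut self, mut rows: Vec<i64>, batch_size: usize) {
        rows.sort();
        let mut file = self.manager.create_file();
        for chunk in rows.chunks(batch_size.max(1)) {
            self.manager.append(&mut file, chunk.to_vec());
        }
        self.finished_runs.push(file);
    }

    /// Combine all runs into one sorted output. A real operator would do a
    /// streaming merge; this sketch simply re-sorts the concatenated rows.
    fn merge_runs(&self) -> Vec<i64> {
        let mut all: Vec<i64> = self
            .finished_runs
            .iter()
            .flat_map(|file| self.manager.read(file))
            .flatten()
            .collect();
        all.sort();
        all
    }
}
```

A different operator (say, a shuffle writer) could reuse the same manager but keep a list of still-open files per partition, without the manager needing to know about it.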
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -379,46 +382,64 @@ impl ExternalSorter {
/// How many bytes have been spilled to disk?
fn spilled_bytes(&self) -> usize {
- self.metrics.spilled_bytes.value()
+ self.metrics.spill_metrics.spilled_bytes.value()
}
/// How many rows have been spilled to disk?
fn spilled_rows(&self) -> usize {
- self.metrics.spilled_rows.value()
+ self.metrics.spill_metrics.spilled_rows.value()
}
/// How many spill files have been created?
fn spill_count(&self) -> usize {
- self.metrics.spill_count.value()
+ self.metrics.spill_metrics.spill_file_count.value()
}
- /// Writes any `in_memory_batches` to a spill file and clears
- /// the batches. The contents of the spill file are sorted.
- ///
- /// Returns the amount of memory freed.
- async fn spill(&mut self) -> Result<usize> {
+ /// When calling, all `in_mem_batches` must be sorted, and then all of them will
+ /// be appended to the in-progress spill file.
+ async fn spill_append(&mut self) -> Result<()> {
// we could always get a chance to free some memory as long as we are holding some
if self.in_mem_batches.is_empty() {
- return Ok(0);
+ return Ok(());
+ }
+
+ // Lazily initialize the in-progress spill file
+ if self.in_progress_spill_file.is_none() {
+ self.in_progress_spill_file =
+ Some(self.spill_manager.create_in_progress_file("Sorting")?);
}
self.organize_stringview_arrays()?;
debug!("Spilling sort data of ExternalSorter to disk whilst inserting");
- let spill_file = self.runtime.disk_manager.create_tmp_file("Sorting")?;
let batches = std::mem::take(&mut self.in_mem_batches);
- let (spilled_rows, spilled_bytes) = spill_record_batches(
- &batches,
- spill_file.path().into(),
- Arc::clone(&self.schema),
- )?;
- let used = self.reservation.free();
- self.metrics.spill_count.add(1);
- self.metrics.spilled_bytes.add(spilled_bytes);
- self.metrics.spilled_rows.add(spilled_rows);
- self.spills.push(spill_file);
- Ok(used)
+ self.reservation.free();
+
+ let in_progress_file = self.in_progress_spill_file.as_mut().ok_or_else(|| {
+ internal_datafusion_err!("In-progress spill file should be initialized")
+ })?;
+
+ for batch in batches {
Review Comment:
No, they are globally sorted. Depending on the stage, `in_mem_batches` can
represent either unordered input or a globally sorted run (chunked into
smaller batches).
I agree this approach is hard to understand and error-prone; I'll
try to improve it.
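One possible way to make the two states explicit, shown only as a sketch and not as what this PR does: encode the stage in the type, so that spilling unsorted batches is rejected instead of silently producing a wrong run. `Batch` and `InMemBatches` are hypothetical names for illustration.

```rust
/// Hypothetical stand-in for an Arrow `RecordBatch`: a single column of i64.
type Batch = Vec<i64>;

/// Hypothetical replacement for a `Vec` plus a separate "sorted" flag:
/// the variant records which stage the buffered batches are in.
#[derive(Debug, PartialEq)]
enum InMemBatches {
    /// Unordered input, as received from the child operator.
    Unsorted(Vec<Batch>),
    /// One globally sorted run, chunked into smaller batches.
    SortedRun(Vec<Batch>),
}

impl InMemBatches {
    /// Turn unordered input into a globally sorted, chunked run.
    /// Already-sorted data is returned unchanged.
    fn sort(self, batch_size: usize) -> InMemBatches {
        match self {
            sorted @ InMemBatches::SortedRun(_) => sorted,
            InMemBatches::Unsorted(batches) => {
                let mut rows: Vec<i64> = batches.into_iter().flatten().collect();
                rows.sort();
                let chunks = rows
                    .chunks(batch_size.max(1))
                    .map(|chunk| chunk.to_vec())
                    .collect();
                InMemBatches::SortedRun(chunks)
            }
        }
    }

    /// Hand the batches over for appending to a spill file.
    /// Only a sorted run may be spilled; unsorted input is an error.
    fn take_for_spill(self) -> Result<Vec<Batch>, String> {
        match self {
            InMemBatches::SortedRun(batches) => Ok(batches),
            InMemBatches::Unsorted(_) => {
                Err("in-memory batches must be sorted before spilling".to_string())
            }
        }
    }
}
```

With this shape, the precondition stated in the `spill_append` doc comment ("all `in_mem_batches` must be sorted") would be checked at the call site rather than assumed.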
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.