This is an automated email from the ASF dual-hosted git repository.
yjshen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new fcddabc633 Minor: Add documentation + diagrams for ExternalSorter
(#7179)
fcddabc633 is described below
commit fcddabc6337f096a25fa22b7fbd2b11b29c9bc08
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Aug 3 10:50:40 2023 -0500
Minor: Add documentation + diagrams for ExternalSorter (#7179)
* Minor: Add documentation for ExternalSorter
* Apply suggestions from code review
Co-authored-by: Yijie Shen <[email protected]>
* Improve documentation for `sort()`
---------
Co-authored-by: Yijie Shen <[email protected]>
---
datafusion/core/src/physical_plan/sorts/sort.rs | 217 ++++++++++++++++++++++--
1 file changed, 205 insertions(+), 12 deletions(-)
diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs
b/datafusion/core/src/physical_plan/sorts/sort.rs
index fc45acbc43..c7ae09bb2e 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -76,27 +76,147 @@ impl ExternalSorterMetrics {
}
}
-/// Sort arbitrary size of data to get a total order (may spill several times
during sorting based on free memory available).
+/// Sorts an arbitrary sized, unsorted, stream of [`RecordBatch`]es to
+/// a total order. Depending on the input size and memory manager
+/// configuration, writes intermediate results to disk ("spills")
+/// using Arrow IPC format.
+///
+/// # Algorithm
///
-/// The basic architecture of the algorithm:
/// 1. get a non-empty new batch from input
-/// 2. check with the memory manager if we could buffer the batch in memory
-/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
-/// 2.2 if the memory threshold is reached, sort all buffered batches and
spill to file.
-/// buffer the batch in memory, go to 1.
-/// 3. when input is exhausted, merge all in memory batches and spills to get
a total order.
+///
+/// 2. check with the memory manager there is sufficient space to
+///    buffer the batch in memory
+///
+///    2.1 if memory sufficient, buffer batch in memory, go to 1.
+///
+/// 2.2 if no more memory is available, sort all buffered batches and
+/// spill to file. buffer the next batch in memory, go to 1.
+///
+/// 3. when input is exhausted, merge all in memory batches and spills
+/// to get a total order.
+///
+/// # When data fits in available memory
+///
+/// If there is sufficient memory, data is sorted in memory to produce the
output
+///
+/// ```text
+/// ┌─────┐
+/// │ 2 │
+/// │ 3 │
+/// │ 1 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+/// │ 4 │
+/// │ 2 │ │
+/// └─────┘ ▼
+/// ┌─────┐
+/// │ 1 │ In memory
+/// │ 4 │─ ─ ─ ─ ─ ─▶ sort/merge ─ ─ ─ ─ ─▶ total sorted output
+/// │ 1 │
+/// └─────┘ ▲
+/// ... │
+///
+/// ┌─────┐ │
+/// │ 4 │
+/// │ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+/// └─────┘
+///
+/// in_mem_batches
+///
+/// ```
+///
+/// # When data does not fit in available memory
+///
+/// When memory is exhausted, data is first sorted and written to one
+/// or more spill files on disk:
+///
+/// ```text
+/// ┌─────┐ .─────────────────.
+/// │ 2 │ ( )
+/// │ 3 │ │`─────────────────'│
+/// │ 1 │─ ─ ─ ─ ─ ─ ─ │ ┌────┐ │
+/// │ 4 │ │ │ │ 1 │░ │
+/// │ 2 │ │ │... │░ │
+/// └─────┘ ▼ │ │ 4 │░ ┌ ─ ─ │
+/// ┌─────┐ │ └────┘░ 1 │░ │
+/// │ 1 │ In memory │ ░░░░░░ │ ░░ │
+/// │ 4 │─ ─ ▶ sort/merge ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
+/// │ 1 │ and write to file │ │ ░░ │
+/// └─────┘ │ 4 │░ │
+/// ... ▲ │ └░─░─░░ │
+/// │ │ ░░░░░░ │
+/// ┌─────┐ │.─────────────────.│
+/// │ 4 │ │ ( )
+/// │ 3 │─ ─ ─ ─ ─ ─ ─ `─────────────────'
+/// └─────┘
+///
+/// in_mem_batches spills
+/// (file on disk in Arrow
+/// IPC format)
+/// ```
+///
+/// Once the input is completely read, the spill files are read and
+/// merged with any in memory batches to produce a single total sorted
+/// output:
+///
+/// ```text
+/// .─────────────────.
+/// ( )
+/// │`─────────────────'│
+/// │ ┌────┐ │
+/// │ │ 1 │░ │
+/// │ │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
+/// │ │ 4 │░ ┌────┐ │ │
+/// │ └────┘░ │ 1 │░ │ ▼
+/// │ ░░░░░░ │ │░ │
+/// │ │... │─ ─│─ ─ ─ ▶ merge ─ ─ ─▶ total sorted output
+/// │ │ │░ │
+/// │ │ 4 │░ │ ▲
+/// │ └────┘░ │ │
+/// │ ░░░░░░ │
+/// │.─────────────────.│ │
+/// ( )
+/// `─────────────────' │
+/// spills
+/// │
+///
+/// │
+///
+/// ┌─────┐ │
+/// │ 1 │
+/// │ 4 │─ ─ ─ ─ │
+/// └─────┘ │
+/// ... In memory
+/// └ ─ ─ ─▶ sort/merge
+/// ┌─────┐
+/// │ 4 │ ▲
+/// │ 3 │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+/// └─────┘
+///
+/// in_mem_batches
+/// ```
struct ExternalSorter {
+ /// schema of the output (and the input)
schema: SchemaRef,
+ /// Potentially unsorted in memory buffer
in_mem_batches: Vec<RecordBatch>,
+ /// if `Self::in_mem_batches` are sorted
in_mem_batches_sorted: bool,
+ /// If data has previously been spilled, the locations of the
+ /// spill files (in Arrow IPC format)
spills: Vec<NamedTempFile>,
/// Sort expressions
expr: Arc<[PhysicalSortExpr]>,
+ /// Runtime metrics
metrics: ExternalSorterMetrics,
+ /// If Some, the maximum number of output rows that will be
+ /// produced.
fetch: Option<usize>,
+ /// Memory usage tracking
reservation: MemoryReservation,
+ /// The partition id that this Sort is handling (for identification)
partition_id: usize,
+ /// A handle to the runtime to get Disk spill files
runtime: Arc<RuntimeEnv>,
+ /// The target number of rows for output batches
batch_size: usize,
}
@@ -142,7 +262,7 @@ impl ExternalSorter {
if self.reservation.try_grow(size).is_err() {
let before = self.reservation.size();
self.in_mem_sort().await?;
- // Sorting may have freed memory, especially if fetch is not `None`
+ // Sorting may have freed memory, especially if fetch is `Some`
//
// As such we check again, and if the memory usage has dropped by
// a factor of 2, and we can allocate the necessary capacity,
@@ -168,7 +288,15 @@ impl ExternalSorter {
!self.spills.is_empty()
}
- /// MergeSort in mem batches as well as spills into total order with
`SortPreservingMergeStream`.
+ /// Returns the final sorted output of all batches inserted via
+ /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
+ ///
+ /// This process could either be:
+ ///
+ /// 1. An in-memory sort/merge (if the input fit in memory)
+ ///
+ /// 2. A combined streaming merge incorporating both in-memory
+ /// batches and data from spill files on disk.
fn sort(&mut self) -> Result<SendableRecordBatchStream> {
if self.spilled_before() {
let mut streams = vec![];
@@ -201,18 +329,25 @@ impl ExternalSorter {
}
}
+ /// How much memory is buffered in this `ExternalSorter`?
fn used(&self) -> usize {
self.reservation.size()
}
+ /// How many bytes have been spilled to disk?
fn spilled_bytes(&self) -> usize {
self.metrics.spilled_bytes.value()
}
+ /// How many spill files have been created?
fn spill_count(&self) -> usize {
self.metrics.spill_count.value()
}
+ /// Writes any `in_mem_batches` to a spill file and clears
+ /// the batches. The contents of the spill file are sorted.
+ ///
+ /// Returns the amount of memory freed.
async fn spill(&mut self) -> Result<usize> {
// we could always get a chance to free some memory as long as we are
holding some
if self.in_mem_batches.is_empty() {
@@ -255,7 +390,64 @@ impl ExternalSorter {
Ok(())
}
- /// Consumes in_mem_batches returning a sorted stream
+ /// Consumes in_mem_batches returning a sorted stream of
+ /// batches. This proceeds in one of two ways:
+ ///
+ /// # Small Datasets
+ ///
+ /// For "smaller" datasets, the data is first concatenated into a
+ /// single batch and then sorted. This is often faster than
+ /// sorting and then merging.
+ ///
+ /// ```text
+ /// ┌─────┐
+ /// │ 2 │
+ /// │ 3 │
+ /// │ 1 │─ ─ ─ ─ ┐ ┌─────┐
+ /// │ 4 │ │ 2 │
+ /// │ 2 │ │ │ 3 │
+ /// └─────┘ │ 1 │ sorted output
+ /// ┌─────┐ ▼ │ 4 │ stream
+ /// │ 1 │ │ 2 │
+ /// │ 4 │─ ─▶ concat ─ ─ ─ ─ ▶│ 1 │─ ─ ▶ sort ─ ─ ─ ─ ─▶
+ /// │ 1 │ │ 4 │
+ /// └─────┘ ▲ │ 1 │
+ /// ... │ │ ... │
+ /// │ 4 │
+ /// ┌─────┐ │ │ 3 │
+ /// │ 4 │ └─────┘
+ /// │ 3 │─ ─ ─ ─ ┘
+ /// └─────┘
+ /// in_mem_batches
+ /// ```
+ ///
+ /// # Larger datasets
+ ///
+ /// For larger datasets, the batches are first sorted individually
+ /// and then merged together.
+ ///
+ /// ```text
+ /// ┌─────┐ ┌─────┐
+ /// │ 2 │ │ 1 │
+ /// │ 3 │ │ 2 │
+ /// │ 1 │─ ─▶ sort ─ ─▶│ 2 │─ ─ ─ ─ ─ ┐
+ /// │ 4 │ │ 3 │
+ /// │ 2 │ │ 4 │ │
+ /// └─────┘ └─────┘ sorted output
+ /// ┌─────┐ ┌─────┐ ▼ stream
+ /// │ 1 │ │ 1 │
+ /// │ 4 │─ ▶ sort ─ ─ ▶│ 1 ├ ─ ─ ▶ merge ─ ─ ─ ─▶
+ /// │ 1 │ │ 4 │
+ /// └─────┘ └─────┘ ▲
+ /// ... ... ... │
+ ///
+ /// ┌─────┐ ┌─────┐ │
+ /// │ 4 │ │ 3 │
+ /// │ 3 │─ ▶ sort ─ ─ ▶│ 4 │─ ─ ─ ─ ─ ┘
+ /// └─────┘ └─────┘
+ ///
+ /// in_mem_batches
+ /// ```
fn in_mem_sort_stream(
&mut self,
metrics: BaselineMetrics,
@@ -296,6 +488,7 @@ impl ExternalSorter {
)
}
+ /// Sorts a single `RecordBatch` into a single stream
fn sort_batch_stream(
&self,
batch: RecordBatch,
@@ -417,8 +610,8 @@ fn read_spill(sender: Sender<Result<RecordBatch>>, path:
&Path) -> Result<()> {
/// Sort execution plan.
///
-/// This operator supports sorting datasets that are larger than the
-/// memory allotted by the memory manager, by spilling to disk.
+/// Supports sorting datasets that are larger than the memory allotted
+/// by the memory manager, by spilling to disk.
#[derive(Debug)]
pub struct SortExec {
/// Input schema