This is an automated email from the ASF dual-hosted git repository.

yjshen pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new fcddabc633 Minor: Add documentation + diagrams for ExternalSorter 
(#7179)
fcddabc633 is described below

commit fcddabc6337f096a25fa22b7fbd2b11b29c9bc08
Author: Andrew Lamb <[email protected]>
AuthorDate: Thu Aug 3 10:50:40 2023 -0500

    Minor: Add documentation + diagrams for ExternalSorter (#7179)
    
    * Minor: Add documentation for ExternalSorter
    
    * Apply suggestions from code review
    
    Co-authored-by: Yijie Shen <[email protected]>
    
    * Improve documentation for `sort()`
    
    ---------
    
    Co-authored-by: Yijie Shen <[email protected]>
---
 datafusion/core/src/physical_plan/sorts/sort.rs | 217 ++++++++++++++++++++++--
 1 file changed, 205 insertions(+), 12 deletions(-)

diff --git a/datafusion/core/src/physical_plan/sorts/sort.rs 
b/datafusion/core/src/physical_plan/sorts/sort.rs
index fc45acbc43..c7ae09bb2e 100644
--- a/datafusion/core/src/physical_plan/sorts/sort.rs
+++ b/datafusion/core/src/physical_plan/sorts/sort.rs
@@ -76,27 +76,147 @@ impl ExternalSorterMetrics {
     }
 }
 
-/// Sort arbitrary size of data to get a total order (may spill several times 
during sorting based on free memory available).
+/// Sorts an arbitrarily sized, unsorted stream of [`RecordBatch`]es into
+/// a total order. Depending on the input size and memory manager
+/// configuration, writes intermediate results to disk ("spills")
+/// using Arrow IPC format.
+///
+/// # Algorithm
 ///
-/// The basic architecture of the algorithm:
 /// 1. get a non-empty new batch from input
-/// 2. check with the memory manager if we could buffer the batch in memory
-/// 2.1 if memory sufficient, then buffer batch in memory, go to 1.
-/// 2.2 if the memory threshold is reached, sort all buffered batches and 
spill to file.
-///     buffer the batch in memory, go to 1.
-/// 3. when input is exhausted, merge all in memory batches and spills to get 
a total order.
+///
+/// 2. check with the memory manager if there is sufficient space to
+///    buffer the batch in memory:
+///
+///    * 2.1 if memory is sufficient, buffer the batch in memory, go to 1.
+///    * 2.2 if no more memory is available, sort all buffered batches and
+///      spill to file, buffer the next batch in memory, go to 1.
+///
+/// 3. when input is exhausted, merge all in memory batches and spills
+///    to get a total order.
+///
+/// # When data fits in available memory
+///
+/// If there is sufficient memory, data is sorted in memory to produce the output.
+///
+/// ```text
+///    ┌─────┐
+///    │  2  │
+///    │  3  │
+///    │  1  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
+///    │  4  │
+///    │  2  │                  │
+///    └─────┘                  ▼
+///    ┌─────┐
+///    │  1  │              In memory
+///    │  4  │─ ─ ─ ─ ─ ─▶ sort/merge  ─ ─ ─ ─ ─▶  total sorted output
+///    │  1  │
+///    └─────┘                  ▲
+///      ...                    │
+///
+///    ┌─────┐                  │
+///    │  4  │
+///    │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+///    └─────┘
+///
+/// in_mem_batches
+///
+/// ```
+///
+/// # When data does not fit in available memory
+///
+///  When memory is exhausted, data is first sorted and written to one
+///  or more spill files on disk:
+///
+/// ```text
+///    ┌─────┐                               .─────────────────.
+///    │  2  │                              (                   )
+///    │  3  │                              │`─────────────────'│
+///    │  1  │─ ─ ─ ─ ─ ─ ─                 │  ┌────┐           │
+///    │  4  │             │                │  │ 1  │░          │
+///    │  2  │                              │  │... │░          │
+///    └─────┘             ▼                │  │ 4  │░  ┌ ─ ─   │
+///    ┌─────┐                              │  └────┘░    1  │░ │
+///    │  1  │         In memory            │   ░░░░░░  │    ░░ │
+///    │  4  │─ ─ ▶   sort/merge    ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
+///    │  1  │     and write to file        │           │    ░░ │
+///    └─────┘                              │             4  │░ │
+///      ...               ▲                │           └░─░─░░ │
+///                        │                │            ░░░░░░ │
+///    ┌─────┐                              │.─────────────────.│
+///    │  4  │             │                (                   )
+///    │  3  │─ ─ ─ ─ ─ ─ ─                  `─────────────────'
+///    └─────┘
+///
+/// in_mem_batches                                  spills
+///                                         (file on disk in Arrow
+///                                               IPC format)
+/// ```
+///
+/// Once the input is completely read, the spill files are read and
+/// merged with any in memory batches to produce a single total sorted
+/// output:
+///
+/// ```text
+///   .─────────────────.
+///  (                   )
+///  │`─────────────────'│
+///  │  ┌────┐           │
+///  │  │ 1  │░          │
+///  │  │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
+///  │  │ 4  │░ ┌────┐   │           │
+///  │  └────┘░ │ 1  │░  │           ▼
+///  │   ░░░░░░ │    │░  │
+///  │          │... │─ ─│─ ─ ─ ▶ merge  ─ ─ ─▶  total sorted output
+///  │          │    │░  │
+///  │          │ 4  │░  │           ▲
+///  │          └────┘░  │           │
+///  │           ░░░░░░  │
+///  │.─────────────────.│           │
+///  (                   )
+///   `─────────────────'            │
+///         spills
+///                                  │
+///
+///                                  │
+///
+///     ┌─────┐                      │
+///     │  1  │
+///     │  4  │─ ─ ─ ─               │
+///     └─────┘       │
+///       ...                   In memory
+///                   └ ─ ─ ─▶  sort/merge
+///     ┌─────┐
+///     │  4  │                      ▲
+///     │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
+///     └─────┘
+///
+///  in_mem_batches
+/// ```
 struct ExternalSorter {
+    /// schema of the output (and the input)
     schema: SchemaRef,
+    /// Potentially unsorted in memory buffer
     in_mem_batches: Vec<RecordBatch>,
+    /// true if the batches in `Self::in_mem_batches` are sorted
     in_mem_batches_sorted: bool,
+    /// If data has previously been spilled, the locations of the
+    /// spill files (in Arrow IPC format)
     spills: Vec<NamedTempFile>,
     /// Sort expressions
     expr: Arc<[PhysicalSortExpr]>,
+    /// Runtime metrics
     metrics: ExternalSorterMetrics,
+    /// If Some, the maximum number of output rows that will be
+    /// produced.
     fetch: Option<usize>,
+    /// Memory usage tracking
     reservation: MemoryReservation,
+    /// The partition id that this Sort is handling (for identification)
     partition_id: usize,
+    /// A handle to the runtime, used to get spill files on disk
     runtime: Arc<RuntimeEnv>,
+    /// The target number of rows for output batches
     batch_size: usize,
 }
 
@@ -142,7 +262,7 @@ impl ExternalSorter {
         if self.reservation.try_grow(size).is_err() {
             let before = self.reservation.size();
             self.in_mem_sort().await?;
-            // Sorting may have freed memory, especially if fetch is not `None`
+            // Sorting may have freed memory, especially if fetch is `Some`
             //
             // As such we check again, and if the memory usage has dropped by
             // a factor of 2, and we can allocate the necessary capacity,
@@ -168,7 +288,15 @@ impl ExternalSorter {
         !self.spills.is_empty()
     }
 
-    /// MergeSort in mem batches as well as spills into total order with 
`SortPreservingMergeStream`.
+    /// Returns the final sorted output of all batches inserted via
+    /// [`Self::insert_batch`] as a stream of [`RecordBatch`]es.
+    ///
+    /// This process could either be:
+    ///
+    /// 1. An in-memory sort/merge (if the input fit in memory)
+    ///
+    /// 2. A combined streaming merge incorporating both in-memory
+    ///    batches and data from spill files on disk.
     fn sort(&mut self) -> Result<SendableRecordBatchStream> {
         if self.spilled_before() {
             let mut streams = vec![];
@@ -201,18 +329,25 @@ impl ExternalSorter {
         }
     }
 
+    /// How much memory is buffered in this `ExternalSorter`?
     fn used(&self) -> usize {
         self.reservation.size()
     }
 
+    /// How many bytes have been spilled to disk?
     fn spilled_bytes(&self) -> usize {
         self.metrics.spilled_bytes.value()
     }
 
+    /// How many spill files have been created?
     fn spill_count(&self) -> usize {
         self.metrics.spill_count.value()
     }
 
+    /// Writes any `in_mem_batches` to a spill file and clears
+    /// the batches. The contents of the spill file are sorted.
+    ///
+    /// Returns the amount of memory freed.
     async fn spill(&mut self) -> Result<usize> {
         // we could always get a chance to free some memory as long as we are 
holding some
         if self.in_mem_batches.is_empty() {
@@ -255,7 +390,64 @@ impl ExternalSorter {
         Ok(())
     }
 
-    /// Consumes in_mem_batches returning a sorted stream
+    /// Consumes in_mem_batches returning a sorted stream of
+    /// batches. This proceeds in one of two ways:
+    ///
+    /// # Small Datasets
+    ///
+    /// For "smaller" datasets, the data is first concatenated into a
+    /// single batch and then sorted. This is often faster than
+    /// sorting and then merging.
+    ///
+    /// ```text
+    ///        ┌─────┐
+    ///        │  2  │
+    ///        │  3  │
+    ///        │  1  │─ ─ ─ ─ ┐            ┌─────┐
+    ///        │  4  │                     │  2  │
+    ///        │  2  │        │            │  3  │
+    ///        └─────┘                     │  1  │             sorted output
+    ///        ┌─────┐        ▼            │  4  │                stream
+    ///        │  1  │                     │  2  │
+    ///        │  4  │─ ─▶ concat ─ ─ ─ ─ ▶│  1  │─ ─ ▶  sort  ─ ─ ─ ─ ─▶
+    ///        │  1  │                     │  4  │
+    ///        └─────┘        ▲            │  1  │
+    ///          ...          │            │ ... │
+    ///                                    │  4  │
+    ///        ┌─────┐        │            │  3  │
+    ///        │  4  │                     └─────┘
+    ///        │  3  │─ ─ ─ ─ ┘
+    ///        └─────┘
+    ///     in_mem_batches
+    /// ```
+    ///
+    /// # Larger datasets
+    ///
+    /// For larger datasets, the batches are first sorted individually
+    /// and then merged together.
+    ///
+    /// ```text
+    ///      ┌─────┐                ┌─────┐
+    ///      │  2  │                │  1  │
+    ///      │  3  │                │  2  │
+    ///      │  1  │─ ─▶  sort  ─ ─▶│  2  │─ ─ ─ ─ ─ ┐
+    ///      │  4  │                │  3  │
+    ///      │  2  │                │  4  │          │
+    ///      └─────┘                └─────┘               sorted output
+    ///      ┌─────┐                ┌─────┐          ▼       stream
+    ///      │  1  │                │  1  │
+    ///      │  4  │─ ▶  sort  ─ ─ ▶│  1  ├ ─ ─ ▶ merge  ─ ─ ─ ─▶
+    ///      │  1  │                │  4  │
+    ///      └─────┘                └─────┘          ▲
+    ///        ...       ...         ...             │
+    ///
+    ///      ┌─────┐                ┌─────┐          │
+    ///      │  4  │                │  3  │
+    ///      │  3  │─ ▶  sort  ─ ─ ▶│  4  │─ ─ ─ ─ ─ ┘
+    ///      └─────┘                └─────┘
+    ///
+    ///   in_mem_batches
+    /// ```
     fn in_mem_sort_stream(
         &mut self,
         metrics: BaselineMetrics,
@@ -296,6 +488,7 @@ impl ExternalSorter {
         )
     }
 
+    /// Sorts a single `RecordBatch` into a single stream
     fn sort_batch_stream(
         &self,
         batch: RecordBatch,
@@ -417,8 +610,8 @@ fn read_spill(sender: Sender<Result<RecordBatch>>, path: 
&Path) -> Result<()> {
 
 /// Sort execution plan.
 ///
-/// This operator supports sorting datasets that are larger than the
-/// memory allotted by the memory manager, by spilling to disk.
+/// Supports sorting datasets that are larger than the memory allotted
+/// by the memory manager, by spilling to disk.
 #[derive(Debug)]
 pub struct SortExec {
     /// Input schema

Reply via email to