mbutrovich commented on code in PR #21629:
URL: https://github.com/apache/datafusion/pull/21629#discussion_r3088742930


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -95,116 +93,76 @@ impl ExternalSorterMetrics {
 ///
 /// # Algorithm
 ///
-/// 1. get a non-empty new batch from input
+/// Incoming batches are coalesced via [`BatchCoalescer`] to
+/// `sort_coalesce_target_rows` (default 32768) before sorting. This
+/// reduces merge fan-in by producing fewer, larger sorted runs.
 ///
-/// 2. check with the memory manager there is sufficient space to
-///    buffer the batch in memory.
+/// Each coalesced batch is sorted immediately and stored as a
+/// pre-sorted run.
 ///
-/// 2.1 if memory is sufficient, buffer batch in memory, go to 1.
+/// 1. For each incoming batch:
+///    - Reserve memory (2x batch size). If reservation fails, flush
+///      the coalescer, spill all sorted runs to disk, then retry.
+///    - Push batch into the coalescer.
+///    - If the coalescer reached its target: sort the coalesced batch
+///      and store as a new sorted run.
 ///
-/// 2.2 if no more memory is available, sort all buffered batches and
-///     spill to file.  buffer the next batch in memory, go to 1.
-///
-/// 3. when input is exhausted, merge all in memory batches and spills
-///    to get a total order.
+/// 2. When input is exhausted, merge all sorted runs (and any spill
+///    files) to produce a total order.
 ///
 /// # When data fits in available memory
 ///
-/// If there is sufficient memory, data is sorted in memory to produce the output
+/// Sorted runs are merged in memory using a loser-tree k-way merge
+/// (via [`StreamingMergeBuilder`]).
 ///
 /// ```text
-///    ┌─────┐
-///    │  2  │
-///    │  3  │
-///    │  1  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┐
-///    │  4  │
-///    │  2  │                  │
-///    └─────┘                  ▼
-///    ┌─────┐
-///    │  1  │              In memory
-///    │  4  │─ ─ ─ ─ ─ ─▶ sort/merge  ─ ─ ─ ─ ─▶  total sorted output
-///    │  1  │
-///    └─────┘                  ▲
-///      ...                    │
-///
-///    ┌─────┐                  │
-///    │  4  │
-///    │  3  │─ ─ ─ ─ ─ ─ ─ ─ ─ ┘
-///    └─────┘
-///
-/// in_mem_batches
+///   ┌──────────┐     ┌────────────┐     ┌──────┐     ┌────────────┐
+///   │ Incoming │────▶│  Batch     │────▶│ Sort │────▶│ Sorted Run │
+///   │ Batches  │     │ Coalescer  │     │      │     │ (in memory)│
+///   └──────────┘     └────────────┘     └──────┘     └─────┬──────┘
+///                                                          │
+///                                           ┌──────────────┘
+///                                           ▼
+///                                    k-way merge (loser tree)
+///                                           │
+///                                           ▼
+///                                    total sorted output
 /// ```
 ///
 /// # When data does not fit in available memory
 ///
-///  When memory is exhausted, data is first sorted and written to one
-///  or more spill files on disk:
-///
-/// ```text
-///    ┌─────┐                               .─────────────────.
-///    │  2  │                              (                   )
-///    │  3  │                              │`─────────────────'│
-///    │  1  │─ ─ ─ ─ ─ ─ ─                 │  ┌────┐           │
-///    │  4  │             │                │  │ 1  │░          │
-///    │  2  │                              │  │... │░          │
-///    └─────┘             ▼                │  │ 4  │░  ┌ ─ ─   │
-///    ┌─────┐                              │  └────┘░    1  │░ │
-///    │  1  │         In memory            │   ░░░░░░  │    ░░ │
-///    │  4  │─ ─ ▶   sort/merge    ─ ─ ─ ─ ┼ ─ ─ ─ ─ ─▶ ... │░ │
-///    │  1  │     and write to file        │           │    ░░ │
-///    └─────┘                              │             4  │░ │
-///      ...               ▲                │           └░─░─░░ │
-///                        │                │            ░░░░░░ │
-///    ┌─────┐                              │.─────────────────.│
-///    │  4  │             │                (                   )
-///    │  3  │─ ─ ─ ─ ─ ─ ─                  `─────────────────'
-///    └─────┘
-///
-/// in_mem_batches                                  spills
-///                                         (file on disk in Arrow
-///                                               IPC format)
-/// ```
-///
-/// Once the input is completely read, the spill files are read and
-/// merged with any in memory batches to produce a single total sorted
-/// output:
+/// When memory is exhausted, sorted runs are spilled directly to disk
+/// (one spill file per run — no merge needed since runs are already
+/// sorted). `MultiLevelMergeBuilder` handles the final merge from disk
+/// with dynamic fan-in.
 ///
 /// ```text
-///   .─────────────────.
-///  (                   )
-///  │`─────────────────'│
-///  │  ┌────┐           │
-///  │  │ 1  │░          │
-///  │  │... │─ ─ ─ ─ ─ ─│─ ─ ─ ─ ─ ─
-///  │  │ 4  │░ ┌────┐   │           │
-///  │  └────┘░ │ 1  │░  │           ▼
-///  │   ░░░░░░ │    │░  │
-///  │          │... │─ ─│─ ─ ─ ▶ merge  ─ ─ ─▶  total sorted output
-///  │          │    │░  │
-///  │          │ 4  │░  │           ▲
-///  │          └────┘░  │           │
-///  │           ░░░░░░  │
-///  │.─────────────────.│           │
-///  (                   )
-///   `─────────────────'            │
-///         spills
+///   ┌──────────┐     ┌────────────┐     ┌──────┐     ┌────────────┐
+///   │ Incoming │────▶│  Batch     │────▶│ Sort │────▶│ Sorted Run │

Review Comment:
   @andygrove and I have been discussing this in the context of Comet, where 
for some Spark stages that get translated to native plans we'd possibly want to 
set different target batch sizes. What you're proposing sounds even more 
granular, where individual operators could potentially advertise an ideal input 
batch size, though this could get complicated fast based on schema. Maybe some 
sort of trait on operators to advertise when they want larger batch sizes, and 
the optimizer could figure out what that target should be?
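
   A minimal sketch of what such a trait might look like, purely for illustration. Every name here (`BatchSizeHint`, `PreferredInputBatchSize`, `resolve_target_rows`, `SortOp`, `FilterOp`) is hypothetical and not an existing DataFusion API, and the hint is expressed in rows only, which sidesteps the schema question:

```rust
/// Hypothetical hint an operator could expose about its preferred input batch size.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum BatchSizeHint {
    /// No preference: use whatever the session default is.
    Default,
    /// The operator benefits from input batches of at least this many rows.
    AtLeastRows(usize),
}

/// Hypothetical trait operators could implement to advertise a preference.
trait PreferredInputBatchSize {
    /// Operators that do not care keep the default implementation.
    fn preferred_input_batch_size(&self) -> BatchSizeHint {
        BatchSizeHint::Default
    }
}

/// Stand-in for a sort operator that wants larger, coalesced inputs.
struct SortOp {
    coalesce_target_rows: usize,
}

/// Stand-in for an operator with no batch size preference.
struct FilterOp;

impl PreferredInputBatchSize for SortOp {
    fn preferred_input_batch_size(&self) -> BatchSizeHint {
        BatchSizeHint::AtLeastRows(self.coalesce_target_rows)
    }
}

impl PreferredInputBatchSize for FilterOp {}

/// Hypothetical optimizer step: turn an operator's hint into a concrete
/// target, never going below the session default and never above a cap.
fn resolve_target_rows(
    op: &dyn PreferredInputBatchSize,
    default_rows: usize,
    max_rows: usize,
) -> usize {
    match op.preferred_input_batch_size() {
        BatchSizeHint::Default => default_rows,
        BatchSizeHint::AtLeastRows(n) => n.max(default_rows).min(max_rows),
    }
}
```

   A real version would presumably need to account for row width (a byte budget rather than a row count), which is where the schema dependence comes in.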
   
   Regardless, a general solution seems out of scope for this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

