mbutrovich commented on code in PR #21600:
URL: https://github.com/apache/datafusion/pull/21600#discussion_r3080332211


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -323,56 +312,185 @@ impl ExternalSorter {
         self.reserve_memory_for_batch_and_maybe_spill(&input)
             .await?;
 
-        self.in_mem_batches.push(input);
+        let coalescer = self
+            .coalescer
+            .as_mut()
+            .expect("coalescer must exist during insert phase");
+        coalescer
+            .push_batch(input)
+            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
+
+        self.drain_completed_batches()?;
+
+        Ok(())
+    }
+
+    /// Drains completed (full) batches from the coalescer, sorts each,
+    /// and appends the sorted chunks to `sorted_runs`.
+    fn drain_completed_batches(&mut self) -> Result<()> {
+        // Collect completed batches first to avoid borrow conflict
+        let mut completed = vec![];
+        if let Some(coalescer) = self.coalescer.as_mut() {
+            while let Some(batch) = coalescer.next_completed_batch() {
+                completed.push(batch);
+            }
+        }
+        for batch in &completed {
+            self.sort_and_store_run(batch)?;
+        }
+        Ok(())
+    }
+
+    /// Sorts a single coalesced batch and stores the result as a new run.
+    ///
+    /// Uses radix sort when the batch is large enough to amortize encoding
+    /// overhead (more than `batch_size` rows). Otherwise falls back to lexsort.
+    fn sort_and_store_run(&mut self, batch: &RecordBatch) -> Result<()> {
+        let use_radix_for_this_batch =
+            self.use_radix && batch.num_rows() > self.batch_size;
+
+        let sorted_chunks = if use_radix_for_this_batch {
+            sort_batch_chunked(batch, &self.expr, self.batch_size, true)?
+        } else {
+            vec![sort_batch(batch, &self.expr, None)?]
+        };
+
+        // After take(), StringView arrays may reference shared buffers from
+        // multiple coalesced input batches, inflating reported memory size.
+        // GC compacts them so reservation tracking stays accurate.
+        let sorted_chunks = Self::gc_stringview_batches(sorted_chunks)?;
+
+        let run_size: usize =
+            sorted_chunks.iter().map(get_record_batch_memory_size).sum();
+
+        self.sorted_runs.push(sorted_chunks);
+        self.sorted_runs_memory += run_size;
+
+        // Align reservation to actual sorted run memory with a single pool
+        // interaction. Normally 2x reservation > 1x sorted output, so we
+        // shrink. For tiny batches (single-digit rows), per-column buffer
+        // overhead can make the sorted output slightly larger — grow
+        // unconditionally since the memory is already allocated. This uses
+        // grow() which bypasses the pool limit, so the pool's tracked total
+        // may briefly exceed its limit by a small amount (tens of KB).
+        let reservation_size = self.reservation.size();
+        if reservation_size > self.sorted_runs_memory {
+            self.reservation
+                .shrink(reservation_size - self.sorted_runs_memory);
+        } else if self.sorted_runs_memory > reservation_size {
+            self.reservation
+                .grow(self.sorted_runs_memory - reservation_size);
+        }

Review Comment:
   I updated the docs to explain the scenario a bit more, but let me know if
   you still think we should do something more strict.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to