alamb commented on code in PR #2132:
URL: https://github.com/apache/arrow-datafusion/pull/2132#discussion_r842645709
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -271,33 +291,212 @@ fn in_mem_partial_sort(
buffered_batches: &mut Vec<RecordBatch>,
schema: SchemaRef,
expressions: &[PhysicalSortExpr],
+ batch_size: usize,
tracking_metrics: MemTrackingMetrics,
) -> Result<SendableRecordBatchStream> {
assert_ne!(buffered_batches.len(), 0);
+ if buffered_batches.len() == 1 {
+ let result = buffered_batches.pop();
+ Ok(Box::pin(SizedRecordBatchStream::new(
+ schema,
+ vec![Arc::new(result.unwrap())],
+ tracking_metrics,
+ )))
+ } else {
+ let batches = buffered_batches.drain(..).collect::<Vec<_>>();
+ let sorted_iter = {
+ // NB timer records time taken on drop, so there are no
+ // calls to `timer.done()` below.
+ let _timer = tracking_metrics.elapsed_compute().timer();
+ get_sorted_iter(&batches, expressions, batch_size)?
+ };
+ Ok(Box::pin(SortedSizedRecordBatchStream::new(
+ schema,
+ batches,
+ sorted_iter,
+ tracking_metrics,
+ )))
+ }
+}
- let result = {
- // NB timer records time taken on drop, so there are no
- // calls to `timer.done()` below.
- let _timer = tracking_metrics.elapsed_compute().timer();
+fn get_sorted_iter(
Review Comment:
I would say that the peak memory usage is no different in the worst case
(all columns are part of the sort key), as the implementation on master also
copies the entire input into a new record batch containing all columns.
However, in the common case where only some of the columns are part of the
sort key, this implementation will use significantly less peak memory.
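To make the memory argument concrete, here is a minimal sketch in plain Rust. It does not use the Arrow or DataFusion API, and the body of `get_sorted_iter` is not shown in this hunk. The hypothetical `Batch` type is a `Vec` of `i64` columns. The hypothetical `sorted_indices` orders `(batch, row)` pairs by looking only at the key columns, and the hypothetical `sorted_output` gathers every column for at most `batch_size` rows at a time. As a simplification, the sketch compares key columns in place instead of concatenating them. The point it illustrates is that non-key columns are only read when each output batch is built.

```rust
use std::cmp::Ordering;

// Hypothetical, simplified model (not the Arrow API): a batch is a Vec of columns.
type Column = Vec<i64>;
type Batch = Vec<Column>;

/// Returns (batch_index, row_index) pairs in sorted order, comparing only the
/// columns listed in `key_cols`. Only these index pairs are materialized.
fn sorted_indices(batches: &[Batch], key_cols: &[usize]) -> Vec<(usize, usize)> {
    let mut idx: Vec<(usize, usize)> = batches
        .iter()
        .enumerate()
        .flat_map(|(b, batch)| {
            let rows = batch.first().map_or(0, |c| c.len());
            (0..rows).map(move |r| (b, r))
        })
        .collect();
    // `sort_by` is stable, so rows with equal keys keep their input order.
    idx.sort_by(|&(b1, r1), &(b2, r2)| {
        key_cols
            .iter()
            .map(|&c| batches[b1][c][r1].cmp(&batches[b2][c][r2]))
            .find(|o| o.is_ne())
            .unwrap_or(Ordering::Equal)
    });
    idx
}

/// Lazily gathers all columns for the sorted rows, yielding output batches of
/// at most `batch_size` rows, so only one output batch is built per `next()`.
fn sorted_output<'a>(
    batches: &'a [Batch],
    order: &'a [(usize, usize)],
    batch_size: usize,
) -> impl Iterator<Item = Batch> + 'a {
    assert!(batch_size > 0, "batch_size must be positive");
    let num_cols = batches.first().map_or(0, |b| b.len());
    order.chunks(batch_size).map(move |chunk| -> Batch {
        (0..num_cols)
            .map(|c| -> Column { chunk.iter().map(|&(b, r)| batches[b][c][r]).collect() })
            .collect()
    })
}

fn main() {
    // Two input batches with columns [key, payload]; only column 0 is a sort key.
    let batches: Vec<Batch> = vec![vec![vec![3, 1], vec![30, 10]], vec![vec![2], vec![20]]];
    let order = sorted_indices(&batches, &[0]);
    for out in sorted_output(&batches, &order, 2) {
        println!("{:?}", out); // [[1, 2], [10, 20]] then [[3], [30]]
    }
}
```

In the worst case, where every column is a sort key, all columns are read during sorting anyway, which matches the observation above that peak memory is no better than on master.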