tustvold commented on code in PR #5851:
URL: https://github.com/apache/arrow-datafusion/pull/5851#discussion_r1156209690


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -276,279 +276,42 @@ impl Debug for ExternalSorter {
 
 /// consume the non-empty `sorted_batches` and do in_mem_sort
 fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<BatchWithSortArray>,
+    buffered_batches: &mut Vec<RecordBatch>,
     schema: SchemaRef,
     expressions: &[PhysicalSortExpr],
     batch_size: usize,
     tracking_metrics: MemTrackingMetrics,
-    fetch: Option<usize>,
+    _fetch: Option<usize>,
 ) -> Result<SendableRecordBatchStream> {
-    assert_ne!(buffered_batches.len(), 0);
-    if buffered_batches.len() == 1 {
-        let result = buffered_batches.pop();
-        Ok(Box::pin(SizedRecordBatchStream::new(
-            schema,
-            vec![Arc::new(result.unwrap().sorted_batch)],
-            tracking_metrics,
-        )))
-    } else {
-        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
-            buffered_batches
-                .drain(..)
-                .map(|b| {
-                    let BatchWithSortArray {
-                        sort_arrays,
-                        sorted_batch: batch,
-                    } = b;
-                    (sort_arrays, batch)
-                })
-                .unzip();
-
-        let sorted_iter = {
-            // NB timer records time taken on drop, so there are no
-            // calls to `timer.done()` below.
-            let _timer = tracking_metrics.elapsed_compute().timer();
-            get_sorted_iter(&sorted_arrays, expressions, batch_size, fetch)?
-        };
-        Ok(Box::pin(SortedSizedRecordBatchStream::new(
-            schema,
-            batches,
-            sorted_iter,
-            tracking_metrics,
-        )))
+    if buffered_batches.len() < 2 {
+        let batches: Vec<_> = buffered_batches.drain(..).collect();

Review Comment:
   It is worth highlighting why this is important to the benchmarks, as there 
was much discussion of this on 
https://github.com/apache/arrow-datafusion/issues/5230
   
   The "preserve partitioning" benchmarks are set up to yield a single 
RecordBatch per partition, so they are effectively a special case where 
in_mem_partial_sort is a no-op.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to