Dandandan commented on code in PR #2132:
URL: https://github.com/apache/arrow-datafusion/pull/2132#discussion_r842129926


##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -268,36 +282,220 @@ impl MemoryConsumer for ExternalSorter {
 
 /// consume the non-empty `sorted_bathes` and do in_mem_sort
 fn in_mem_partial_sort(
-    buffered_batches: &mut Vec<RecordBatch>,
+    buffered_batches: &mut Vec<BatchWithSortArray>,
     schema: SchemaRef,
     expressions: &[PhysicalSortExpr],
+    batch_size: usize,
     tracking_metrics: MemTrackingMetrics,
 ) -> Result<SendableRecordBatchStream> {
     assert_ne!(buffered_batches.len(), 0);
+    if buffered_batches.len() == 1 {
+        let result = buffered_batches.pop();
+        Ok(Box::pin(SizedRecordBatchStream::new(
+            schema,
+            vec![Arc::new(result.unwrap().sorted_batch)],
+            tracking_metrics,
+        )))
+    } else {
+        let (sorted_arrays, batches): (Vec<Vec<ArrayRef>>, Vec<RecordBatch>) =
+            buffered_batches
+                .drain(..)
+                .into_iter()
+                .map(|b| {
+                    let BatchWithSortArray {
+                        sort_arrays,
+                        sorted_batch: batch,
+                    } = b;
+                    (sort_arrays, batch)
+                })
+                .unzip();
+
+        let sorted_iter = {
+            // NB timer records time taken on drop, so there are no
+            // calls to `timer.done()` below.
+            let _timer = tracking_metrics.elapsed_compute().timer();
+            get_sorted_iter(&sorted_arrays, expressions, batch_size)?
+        };
+        Ok(Box::pin(SortedSizedRecordBatchStream::new(
+            schema,
+            batches,
+            sorted_iter,
+            tracking_metrics,
+        )))
+    }
+}
 
-    let result = {
-        // NB timer records time taken on drop, so there are no
-        // calls to `timer.done()` below.
-        let _timer = tracking_metrics.elapsed_compute().timer();
+#[derive(Debug, Copy, Clone)]
+struct CompositeIndex {
+    batch_idx: u32,
+    row_idx: u32,
+}
 
-        let pre_sort = if buffered_batches.len() == 1 {
-            buffered_batches.pop()
-        } else {
-            let batches = buffered_batches.drain(..).collect::<Vec<_>>();
-            // combine all record batches into one for each column
-            common::combine_batches(&batches, schema.clone())?
-        };
+/// Get sorted iterator by sort concatenated `SortColumn`s
+fn get_sorted_iter(
+    sort_arrays: &[Vec<ArrayRef>],
+    expr: &[PhysicalSortExpr],
+    batch_size: usize,
+) -> Result<SortedIterator> {
+    let row_indices = sort_arrays
+        .iter()
+        .enumerate()
+        .flat_map(|(i, arrays)| {
+            (0..arrays[0].len())
+                .map(|r| CompositeIndex {
+                    // since we original use UInt32Array to index the combined 
mono batch,
+                    // component record batches won't overflow as well,
+                    // use u32 here for space efficiency.
+                    batch_idx: i as u32,
+                    row_idx: r as u32,
+                })
+                .collect::<Vec<CompositeIndex>>()

Review Comment:
   Doing a collect here while flattening it later seems doing some extra 
allocations. I think we should be able to generate this in one go?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to