zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2048731630


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -667,35 +668,81 @@ impl ExternalSorter {
         // space for batches produced by the resulting stream will be reserved 
by the
         // consumer of the stream.
 
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
-        }
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;
 
         // If less than sort_in_place_threshold_bytes, concatenate and sort in 
place
         if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+            let interleave_indices = self.build_sorted_indices(
+                self.in_mem_batches.as_slice(),
+                Arc::clone(&self.expr),
+            )?;
+
+            let batches: Vec<&RecordBatch> = 
self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, 
&interleave_indices)?;
+
             self.in_mem_batches.clear();
             self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
                 .map_err(Self::err_with_oom_context)?;
+
+            metrics.record_output(sorted_batch.num_columns());
             let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+            drop(reservation);
+
+            return Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream);
+        }
+
+        let in_mem_batches = std::mem::take(&mut self.in_mem_batches);
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to determine, in future 
we can make it more dynamic.
+        for batch in &in_mem_batches {
+            let batch_size = get_reserved_byte_for_record_batch(batch);
+
+            // If adding this batch would exceed the memory threshold, merge 
current_batches.
+            if current_size + batch_size > self.sort_in_place_threshold_bytes
+                && !current_batches.is_empty()
+            {
+                self.merge_and_push_sorted_batch(
+                    &mut current_batches,
+                    &mut current_size,
+                    &mut merged_batches,
+                )?;
+                current_size = 0;
+            }
+
+            current_batches.push(batch.clone());

Review Comment:
   We merge all of the batches, in groups of up to `sort_in_place_threshold_bytes`
   each, so the merge runs in a loop.
   
   If we only did this for the `< sort_in_place_threshold_bytes` case, I can't
   see much improvement.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to