zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2048456865


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -667,35 +669,57 @@ impl ExternalSorter {
         // space for batches produced by the resulting stream will be reserved by the
         // consumer of the stream.
 
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;
+
+
+        let in_mem_batches = std::mem::take(&mut self.in_mem_batches);
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to determine the group size; in the future we can make it more dynamic.
+        for batch in &in_mem_batches {
+            let batch_size = get_reserved_byte_for_record_batch(batch);
+
+            // If adding this batch would exceed the memory threshold, merge current_batches.
+            if current_size + batch_size > self.sort_in_place_threshold_bytes
+                && !current_batches.is_empty()
+            {
+                self.merge_and_push_sorted_batch(
+                    &mut current_batches,
+                    &mut current_size,
+                    &mut merged_batches,
+                )?;
+                current_size = 0;
+            }
+
+            current_batches.push(batch.clone());
+            current_size += batch_size;
         }
 
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
-            self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
-                .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        // Merge any remaining batches after the loop.
+        if !current_batches.is_empty() {
+            self.merge_and_push_sorted_batch(
+                &mut current_batches,
+                &mut current_size,
+                &mut merged_batches,
+            )?;
         }
 
-        let streams = std::mem::take(&mut self.in_mem_batches)
+        let streams = merged_batches

Review Comment:
   Good catch @Dandandan, for a single batch we don't need to send it through the streaming merge.
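   
   To sketch what that could look like (a minimal, hypothetical variant that reuses the fast path removed in this hunk; whether the reservation handling still fits the new grouping logic is an assumption, not part of this PR):
   
   ```rust
   // Hypothetical short-circuit before the grouping loop: a lone in-memory
   // batch can be sorted and streamed directly, skipping the merge machinery.
   if self.in_mem_batches.len() == 1 {
       let batch = self.in_mem_batches.swap_remove(0);
       // Hand the existing reservation to the stream, as the old fast path did.
       let reservation = self.reservation.take();
       return self.sort_batch_stream(batch, metrics, reservation);
   }
   ```
   
   For reference, the `sort_in_place_threshold_bytes` knob the grouping loop reads is exposed as the `datafusion.execution.sort_in_place_threshold_bytes` execution option, so the group size can be tuned per session; for example (assuming the standard `SessionConfig` API):
   
   ```rust
   use datafusion::prelude::{SessionConfig, SessionContext};
   
   // Lower the threshold so in-memory batches are merged in smaller groups.
   let config = SessionConfig::new()
       .set_u64("datafusion.execution.sort_in_place_threshold_bytes", 64 * 1024);
   let ctx = SessionContext::new_with_config(config);
   ```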



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

