Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2047526844


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -673,29 +676,211 @@ impl ExternalSorter {
             return self.sort_batch_stream(batch, metrics, reservation);
         }
 
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
-            self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
-                .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;
+
+        // Drain in_mem_batches using pop() to release memory earlier.
+        // This avoids holding onto the entire vector during iteration.
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic.
+        while let Some(batch) = self.in_mem_batches.pop() {
+            let batch_size = get_reserved_byte_for_record_batch(&batch);
+
+            // If adding this batch would exceed the memory threshold, merge current_batches.
+            if current_size + batch_size > self.sort_in_place_threshold_bytes
+                && !current_batches.is_empty()
+            {
+                // ===== Phase 1: Build global sort columns for each sort expression =====
+                // For each sort expression, evaluate and collect the corresponding sort column from each in-memory batch
+                // Here, `self.expr` is a list of sort expressions, each providing `evaluate_to_sort_column()`,
+                // which returns an ArrayRef (in `.values`) and sort options (`options`)
+
+                /// ```text
+                /// columns_by_expr for example:
+                /// ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0)
+                /// │            ├── ArrayRef_0_1 (from batch_1)
+                /// │            └── ArrayRef_0_2 (from batch_2)
+                /// │
+                /// └── expr_1 ──┬── ArrayRef_1_0 (from batch_0)
+                ///              ├── ArrayRef_1_1 (from batch_1)
+                ///              └── ArrayRef_1_2 (from batch_2)
+                /// ```
+                let mut columns_by_expr: Vec<Vec<ArrayRef>> = self
+                    .expr
+                    .iter()
+                    .map(|_| Vec::with_capacity(current_batches.len()))
+                    .collect();
+
+                let mut total_rows = 0;

Review Comment:
   `total_rows` doesn't seem to be used



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to