Dandandan commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2047540213


##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -673,29 +676,211 @@ impl ExternalSorter {
             return self.sort_batch_stream(batch, metrics, reservation);
         }
 
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in 
place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
-            self.in_mem_batches.clear();
-            self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
-                .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+        let mut merged_batches = Vec::new();
+        let mut current_batches = Vec::new();
+        let mut current_size = 0;
+
+        // Drain in_mem_batches using pop() to release memory earlier.
+        // This avoids holding onto the entire vector during iteration.
+        // Note:
+        // Now we use `sort_in_place_threshold_bytes` to determine, in future 
we can make it more dynamic.
+        while let Some(batch) = self.in_mem_batches.pop() {
+            let batch_size = get_reserved_byte_for_record_batch(&batch);
+
+            // If adding this batch would exceed the memory threshold, merge 
current_batches.
+            if current_size + batch_size > self.sort_in_place_threshold_bytes
+                && !current_batches.is_empty()
+            {
+                // ===== Phase 1: Build global sort columns for each sort 
expression =====
+                // For each sort expression, evaluate and collect the 
corresponding sort column from each in-memory batch
+                // Here, `self.expr` is a list of sort expressions, each 
providing `evaluate_to_sort_column()`,
+                // which returns an ArrayRef (in `.values`) and sort options 
(`options`)
+
+                /// ```text
+                /// columns_by_expr for example:
+                /// ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0)
+                /// │            ├── ArrayRef_0_1 (from batch_1)
+                /// │            └── ArrayRef_0_2 (from batch_2)
+                /// │
+                /// └── expr_1 ──┬── ArrayRef_1_0 (from batch_0)
+                ///              ├── ArrayRef_1_1 (from batch_1)
+                ///              └── ArrayRef_1_2 (from batch_2)
+                /// ```
+                let mut columns_by_expr: Vec<Vec<ArrayRef>> = self
+                    .expr
+                    .iter()
+                    .map(|_| Vec::with_capacity(current_batches.len()))
+                    .collect();
+
+                let mut total_rows = 0;
+                for batch in &current_batches {
+                    for (i, expr) in self.expr.iter().enumerate() {
+                        let col = expr.evaluate_to_sort_column(batch)?.values;
+                        columns_by_expr[i].push(col);
+                    }
+                    total_rows += batch.num_rows();
+                }
+
+                // For each sort expression, concatenate arrays from all 
batches into one global array
+                let mut sort_columns = Vec::with_capacity(self.expr.len());
+                for (arrays, expr) in 
columns_by_expr.into_iter().zip(self.expr.iter()) {
+                    let array = concat(
+                        &arrays
+                            .iter()
+                            .map(|a| a.as_ref())
+                            .collect::<Vec<&dyn Array>>(),
+                    )?;
+                    sort_columns.push(SortColumn {
+                        values: array,
+                        options: expr.options.into(),
+                    });
+                }
+
+                // ===== Phase 2: Compute global sorted indices =====
+                // Use `lexsort_to_indices` to get global row indices in 
sorted order (as if all batches were concatenated)
+
+                let indices = lexsort_to_indices(&sort_columns, None)?;
+
+                // ===== Phase 3: Reorder each column using the global sorted 
indices =====
+                let batch_indices: Vec<(usize, usize)> = current_batches
+                    .iter()
+                    .enumerate()
+                    .map(|(batch_id, batch)| {
+                        (0..batch.num_rows()).map(move |i| (batch_id, i))
+                    })
+                    .flatten()
+                    .collect();
+
+                // For each column:
+                // 1. Concatenate all batch arrays for this column (in the 
same order as assumed by `lexsort_to_indices`)
+                // 2. Use Arrow's `take` function to reorder the column by 
sorted indices
+                let interleave_indices: Vec<(usize, usize)> = indices
+                    .values()
+                    .iter()
+                    .map(|x| batch_indices[*x as usize])
+                    .collect();
+                // Build a RecordBatch from the sorted columns
+
+                let batches: Vec<&RecordBatch> = 
current_batches.iter().collect();
+
+                let sorted_batch =
+                    interleave_record_batch(batches.as_ref(), 
&interleave_indices)?;
+
+                current_batches.clear();
+
+                // Update memory reservation.
+                self.reservation.try_shrink(current_size)?;
+                let merged_size = 
get_reserved_byte_for_record_batch(&sorted_batch);
+                self.reservation.try_grow(merged_size)?;
+
+                merged_batches.push(sorted_batch);
+                current_size = 0;
+            }
+
+            current_batches.push(batch);
+            current_size += batch_size;
+        }
+
+        // Merge any remaining batches after the loop.
+        if !current_batches.is_empty() {
+            // ===== Phase 1: Build global sort columns for each sort 
expression =====
+            // For each sort expression, evaluate and collect the 
corresponding sort column from each in-memory batch
+            // Here, `self.expr` is a list of sort expressions, each providing 
`evaluate_to_sort_column()`,
+            // which returns an ArrayRef (in `.values`) and sort options 
(`options`)
+
+            /// ```text
+            /// columns_by_expr for example:
+            /// ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0)
+            /// │            ├── ArrayRef_0_1 (from batch_1)
+            /// │            └── ArrayRef_0_2 (from batch_2)
+            /// │
+            /// └── expr_1 ──┬── ArrayRef_1_0 (from batch_0)
+            ///              ├── ArrayRef_1_1 (from batch_1)
+            ///              └── ArrayRef_1_2 (from batch_2)
+            /// ```
+            let mut columns_by_expr: Vec<Vec<ArrayRef>> = self

Review Comment:
   I am sure this can be moved into a helper function that sorts a list of 
batches, since the code here is the same (or almost the same) as the merge 
logic inside the loop above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to