Dandandan commented on code in PR #15380: URL: https://github.com/apache/datafusion/pull/15380#discussion_r2047526844
########## datafusion/physical-plan/src/sorts/sort.rs: ########## @@ -673,29 +676,211 @@ impl ExternalSorter { return self.sort_batch_stream(batch, metrics, reservation); } - // If less than sort_in_place_threshold_bytes, concatenate and sort in place - if self.reservation.size() < self.sort_in_place_threshold_bytes { - // Concatenate memory batches together and sort - let batch = concat_batches(&self.schema, &self.in_mem_batches)?; - self.in_mem_batches.clear(); - self.reservation - .try_resize(get_reserved_byte_for_record_batch(&batch)) - .map_err(Self::err_with_oom_context)?; - let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, metrics, reservation); + let mut merged_batches = Vec::new(); + let mut current_batches = Vec::new(); + let mut current_size = 0; + + // Drain in_mem_batches using pop() to release memory earlier. + // This avoids holding onto the entire vector during iteration. + // Note: + // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic. + while let Some(batch) = self.in_mem_batches.pop() { + let batch_size = get_reserved_byte_for_record_batch(&batch); + + // If adding this batch would exceed the memory threshold, merge current_batches. 
+ if current_size + batch_size > self.sort_in_place_threshold_bytes + && !current_batches.is_empty() + { + // ===== Phase 1: Build global sort columns for each sort expression ===== + // For each sort expression, evaluate and collect the corresponding sort column from each in-memory batch + // Here, `self.expr` is a list of sort expressions, each providing `evaluate_to_sort_column()`, + // which returns an ArrayRef (in `.values`) and sort options (`options`) + + /// ```text + /// columns_by_expr for example: + /// ├── expr_0 ──┬── ArrayRef_0_0 (from batch_0) + /// │ ├── ArrayRef_0_1 (from batch_1) + /// │ └── ArrayRef_0_2 (from batch_2) + /// │ + /// └── expr_1 ──┬── ArrayRef_1_0 (from batch_0) + /// ├── ArrayRef_1_1 (from batch_1) + /// └── ArrayRef_1_2 (from batch_2) + /// ``` + let mut columns_by_expr: Vec<Vec<ArrayRef>> = self + .expr + .iter() + .map(|_| Vec::with_capacity(current_batches.len())) + .collect(); + + let mut total_rows = 0; Review Comment: `total_rows` doesn't seem to be used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org