zhuqi-lucas commented on code in PR #15380: URL: https://github.com/apache/datafusion/pull/15380#discussion_r2048456865
########## datafusion/physical-plan/src/sorts/sort.rs: ########## @@ -667,35 +669,57 @@ impl ExternalSorter { // space for batches produced by the resulting stream will be reserved by the // consumer of the stream. - if self.in_mem_batches.len() == 1 { - let batch = self.in_mem_batches.swap_remove(0); - let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, metrics, reservation); + let mut merged_batches = Vec::new(); + let mut current_batches = Vec::new(); + let mut current_size = 0; + + + let in_mem_batches = std::mem::take(&mut self.in_mem_batches); + // Note: + // Now we use `sort_in_place_threshold_bytes` to determine, in future we can make it more dynamic. + for batch in &in_mem_batches { + let batch_size = get_reserved_byte_for_record_batch(batch); + + // If adding this batch would exceed the memory threshold, merge current_batches. + if current_size + batch_size > self.sort_in_place_threshold_bytes + && !current_batches.is_empty() + { + self.merge_and_push_sorted_batch( + &mut current_batches, + &mut current_size, + &mut merged_batches, + )?; + current_size = 0; + } + + current_batches.push(batch.clone()); + current_size += batch_size; } - // If less than sort_in_place_threshold_bytes, concatenate and sort in place - if self.reservation.size() < self.sort_in_place_threshold_bytes { - // Concatenate memory batches together and sort - let batch = concat_batches(&self.schema, &self.in_mem_batches)?; - self.in_mem_batches.clear(); - self.reservation - .try_resize(get_reserved_byte_for_record_batch(&batch)) - .map_err(Self::err_with_oom_context)?; - let reservation = self.reservation.take(); - return self.sort_batch_stream(batch, metrics, reservation); + // Merge any remaining batches after the loop. + if !current_batches.is_empty() { + self.merge_and_push_sorted_batch( + &mut current_batches, + &mut current_size, + &mut merged_batches, + )?; } - let streams = std::mem::take(&mut self.in_mem_batches) + let streams = merged_batches Review Comment: Good catch @Dandandan , for only one batch we don't need to send to streaming merge. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org