zhuqi-lucas commented on code in PR #15380:
URL: https://github.com/apache/datafusion/pull/15380#discussion_r2049125883
##########
datafusion/physical-plan/src/sorts/sort.rs:
##########
@@ -662,53 +665,152 @@ impl ExternalSorter {
         let elapsed_compute = metrics.elapsed_compute().clone();
         let _timer = elapsed_compute.timer();
-        // Please pay attention that any operation inside of `in_mem_sort_stream` will
-        // not perform any memory reservation. This is for avoiding the need of handling
-        // reservation failure and spilling in the middle of the sort/merge. The memory
-        // space for batches produced by the resulting stream will be reserved by the
-        // consumer of the stream.
+        // Note: in theory the in-memory batches should have a limited size, but some
+        // memory-limit test cases depend on `sort_in_place_threshold_bytes` too, so we
+        // use a larger limit here to avoid test failures.
+        if self.expr.len() <= 2
+            && self.reservation.size() < 1000 * self.sort_in_place_threshold_bytes
+        {
+            let interleave_indices = self.build_sorted_indices(
+                self.in_mem_batches.as_slice(),
+                Arc::clone(&self.expr),
+            )?;
-        if self.in_mem_batches.len() == 1 {
-            let batch = self.in_mem_batches.swap_remove(0);
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
-        }
+            let batches: Vec<&RecordBatch> = self.in_mem_batches.iter().collect();
+            let sorted_batch = interleave_record_batch(&batches, &interleave_indices)?;
-        // If less than sort_in_place_threshold_bytes, concatenate and sort in place
-        if self.reservation.size() < self.sort_in_place_threshold_bytes {
-            // Concatenate memory batches together and sort
-            let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
             self.in_mem_batches.clear();
             self.reservation
-                .try_resize(get_reserved_byte_for_record_batch(&batch))
+                .try_resize(get_reserved_byte_for_record_batch(&sorted_batch))
                 .map_err(Self::err_with_oom_context)?;
-            let reservation = self.reservation.take();
-            return self.sort_batch_stream(batch, metrics, reservation);
+
+            metrics.record_output(sorted_batch.num_rows());
+
+            Ok(Box::pin(RecordBatchStreamAdapter::new(
+                Arc::clone(&self.schema),
+                futures::stream::once(async { Ok(sorted_batch) }),
+            )) as SendableRecordBatchStream)
+        } else {
+            // Please pay attention that any operation inside of `in_mem_sort_stream` will
+            // not perform any memory reservation. This is for avoiding the need of handling
+            // reservation failure and spilling in the middle of the sort/merge. The memory
+            // space for batches produced by the resulting stream will be reserved by the
+            // consumer of the stream.
+
+            if self.in_mem_batches.len() == 1 {
+                let batch = self.in_mem_batches.swap_remove(0);
+                let reservation = self.reservation.take();
+                return self.sort_batch_stream(batch, metrics, reservation);
+            }
+
+            // If less than sort_in_place_threshold_bytes, concatenate and sort in place
+            if self.reservation.size() < self.sort_in_place_threshold_bytes {
+                // Concatenate memory batches together and sort
Review Comment:
Thank you for the feedback @Dandandan.
The logic is now:
1. When `self.expr.len() <= 2`, we interleave all in-memory batches into a single sorted batch and do not send them through the merge stream (see the sketch below).
2. All other cases keep the original behavior.
The reason is that the benchmark results show no performance gain from interleave when there are more than 2 sort columns, so the original concatenate path is left unchanged for those cases.
The remaining challenges are the test failures and the memory model: because case 1 returns a single batch, many cases no longer perform a partial merge, only the final merge, so the memory accounting fails in some tests.
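
For illustration, here is a minimal, self-contained sketch of the interleave-based fast path in case 1. It is not the PR's actual `build_sorted_indices`; the function name `sort_batches_via_interleave`, the single string `sort_column` parameter, and the default `SortOptions` are assumptions made for this example. The idea is to concatenate only the sort-key column, sort it with `lexsort_to_indices`, map every global position back to a `(batch_index, row_index)` pair, and gather the rows once with `interleave_record_batch`:

```rust
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, RecordBatch};
use arrow::compute::{
    concat, interleave_record_batch, lexsort_to_indices, SortColumn, SortOptions,
};
use arrow::error::ArrowError;

/// Hypothetical helper: produce one globally sorted batch from several
/// individually sorted or unsorted in-memory batches, without concatenating
/// their payload columns first.
fn sort_batches_via_interleave(
    batches: &[RecordBatch],
    sort_column: &str,
) -> Result<RecordBatch, ArrowError> {
    // 1. Concatenate only the sort-key column; the payload columns are not
    //    touched until the final interleave.
    let key_refs: Vec<&dyn Array> = batches
        .iter()
        .map(|b| b.column_by_name(sort_column).expect("key column").as_ref())
        .collect();
    let keys: ArrayRef = concat(&key_refs)?;

    // 2. Sort the concatenated keys to obtain global row positions.
    let sorted = lexsort_to_indices(
        &[SortColumn {
            values: keys,
            options: Some(SortOptions::default()),
        }],
        None,
    )?;

    // 3. Map each global position back to a (batch_index, row_index) pair
    //    using the cumulative row offsets of the input batches.
    let mut offsets = vec![0usize];
    for b in batches {
        offsets.push(*offsets.last().unwrap() + b.num_rows());
    }
    let indices: Vec<(usize, usize)> = sorted
        .values()
        .iter()
        .map(|&pos| {
            let pos = pos as usize;
            // `partition_point` finds the batch whose row range contains `pos`.
            let batch = offsets.partition_point(|&o| o <= pos) - 1;
            (batch, pos - offsets[batch])
        })
        .collect();

    // 4. Gather the rows from all batches in sorted order in a single pass.
    let refs: Vec<&RecordBatch> = batches.iter().collect();
    interleave_record_batch(&refs, &indices)
}
```

Compared to `concat_batches` followed by a full sort, only the key column is materialized an extra time here; the payload columns are copied once, during the final interleave.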