gruuya commented on code in PR #7180:
URL: https://github.com/apache/arrow-datafusion/pull/7180#discussion_r1286970765
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -465,16 +484,23 @@ impl ExternalSorter {
// This is a very rough heuristic and likely could be refined further
if self.reservation.size() < 1048576 {
// Concatenate memory batches together and sort
- let batch = concat_batches(&self.schema, &self.in_mem_batches)?;
+ let (_, batches): (Vec<bool>, Vec<RecordBatch>) =
+ std::mem::take(&mut self.in_mem_batches).into_iter().unzip();
+ let batch = concat_batches(&self.schema, &batches)?;
self.in_mem_batches.clear();
- return self.sort_batch_stream(batch, metrics);
+ // Even if all individual batches were themselves sorted the resulting concatenated one
+ // isn't guaranteed to be sorted, so we must perform sorting on the stream.
+ return self.sort_batch_stream(batch, false, metrics);
Review Comment:
I did try to test this approach as well, and saw improvements that seemed
too good to be true. I re-ran the benchmarks and the improvements held at
first, but on later runs they disappeared 🤷🏻‍♂️ (fwiw I'm running the
benchmarks on a cloud VM, not dedicated hardware, so the results are noisy).
In hindsight, the sorting benchmarks do not set a memory limit, so there
were no spills and this code path wasn't exercised at all. I did try running
the benchmarks with a memory limit, but then I hit the arrow error
`Dictionary replacement detected when writing IPC file format.` during
spilling. This looks like a general problem, since it happens on the main
branch too, though I haven't investigated further.
Either way, I'll add this check now even without benchmarking it, because
it seems it can only help.
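
For reference, here is a minimal sketch of why the concatenated batch still needs sorting even when every input batch is already sorted. It uses plain `Vec<i32>` as a stand-in for `RecordBatch`, and the helpers `is_sorted_asc`, `concat` and `demo` are hypothetical names for illustration only, not DataFusion or arrow APIs:

```rust
/// Returns true if `values` is in non-decreasing order.
fn is_sorted_asc(values: &[i32]) -> bool {
    values.windows(2).all(|w| w[0] <= w[1])
}

/// Hypothetical stand-in for `concat_batches`: appends the batches end to end
/// without looking at their contents.
fn concat(batches: &[Vec<i32>]) -> Vec<i32> {
    batches.iter().flatten().copied().collect()
}

/// Returns (each input sorted, concatenation sorted, sorted after re-sorting).
fn demo() -> (bool, bool, bool) {
    // Two batches, each sorted on its own, with overlapping value ranges.
    let batches = vec![vec![1, 5, 9], vec![2, 3, 4]];
    let each_sorted = batches.iter().all(|b| is_sorted_asc(b));

    // Concatenation yields [1, 5, 9, 2, 3, 4], which is not sorted.
    let mut combined = concat(&batches);
    let sorted_before = is_sorted_asc(&combined);

    // Sorting the combined data restores the ordering.
    combined.sort();
    (each_sorted, sorted_before, is_sorted_asc(&combined))
}
```

The concatenation stays sorted only if the batches' value ranges don't overlap and they arrive in order, which the sorter can't assume here.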