isidentical commented on code in PR #3593:
URL: https://github.com/apache/arrow-datafusion/pull/3593#discussion_r978935000
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -124,6 +124,21 @@ impl ExternalSorter {
             // calls to `timer.done()` below.
             let _timer = tracking_metrics.elapsed_compute().timer();
             let partial = sort_batch(input, self.schema.clone(), &self.expr, self.fetch)?;
+            // The resulting batch might be smaller than the input batch if there
+            // is a propagated limit.
+
+            if self.fetch.is_some() {
+                let new_size = batch_byte_size(&partial.sorted_batch);
+                let size_delta = size.checked_sub(new_size).ok_or_else(|| {
+                    DataFusionError::Internal(format!(
+                        "The size of the sorted batch is larger than the size of the input batch: {} > {}",
+                        size,
+                        new_size
+                    ))
+                })?;
+                self.shrink(size_delta);
+                self.metrics.mem_used().sub(size_delta);

Review Comment:
@Dandandan I tried a number of approaches to writing a reliable test, but without access to the internal implementation details (creating a stream, feeding it batches one by one, and comparing the internal memory metrics with the total size of the in-memory batches) it seems very tricky to do using only the public API, such as `SortExec`.

I could also write a test case that ensures a specific query does not spill (one that would normally spill without a limit), but that would exercise a combination of conditions (the initial TopK implementation, #3579 if implemented, this PR, etc., which all try to prevent spills) rather than this fix in isolation.

If the spilling/not-spilling test sounds good, I can submit it (or if you have any other suggestions, I would be happy to give them a try).
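For illustration only, below is a minimal, self-contained Rust sketch of the accounting pattern the diff implements and of the white-box style of test described above (feed batches one by one, then compare the tracked memory with the total size of the batches actually retained). It does not use DataFusion's real API; `MockSorter`, `insert_batch`, and the plain `mem_used` field are hypothetical stand-ins for the sorter's metrics machinery.

```rust
/// Hypothetical stand-in for the sorter's memory accounting; the real
/// ExternalSorter tracks this through its metrics, not a plain field.
struct MockSorter {
    mem_used: usize,
    fetch: Option<usize>,
}

impl MockSorter {
    /// "Sort" a batch of `rows` equally sized rows, truncating it to `fetch`
    /// rows if a limit was pushed down, and update the memory accounting.
    fn insert_batch(&mut self, rows: usize, row_size: usize) -> Result<(), String> {
        let size = rows * row_size;
        self.mem_used += size; // counterpart of growing the reservation

        let kept_rows = self.fetch.map_or(rows, |f| f.min(rows));
        let new_size = kept_rows * row_size;

        // Same pattern as the diff: the sorted batch must not be larger than
        // the input batch, otherwise the accounting is inconsistent.
        let size_delta = size.checked_sub(new_size).ok_or_else(|| {
            format!("sorted batch larger than input: {new_size} > {size}")
        })?;
        self.mem_used -= size_delta; // counterpart of shrink() / mem_used().sub()
        Ok(())
    }
}

#[test]
fn limited_sort_releases_truncated_memory() {
    let mut sorter = MockSorter { mem_used: 0, fetch: Some(10) };
    // Feed batches one by one and compare the tracked memory with the
    // total size of the batches that are actually kept in memory.
    sorter.insert_batch(100, 8).unwrap();
    sorter.insert_batch(50, 8).unwrap();
    // In this toy model only 10 rows per batch survive the limit.
    assert_eq!(sorter.mem_used, 2 * 10 * 8);
}
```

The point of the toy test is the final assertion: after a limited sort, the memory counter should reflect only the truncated batches, which is exactly the invariant that is hard to observe through `SortExec` alone.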