yjshen commented on code in PR #7180:
URL: https://github.com/apache/arrow-datafusion/pull/7180#discussion_r1286412463
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -253,11 +256,19 @@ impl ExternalSorter {
/// Appends an unsorted [`RecordBatch`] to `in_mem_batches`
///
/// Updates memory usage metrics, and possibly triggers spilling to disk
- async fn insert_batch(&mut self, input: RecordBatch) -> Result<()> {
+ async fn insert_batch(&mut self, mut input: RecordBatch) -> Result<()> {
if input.num_rows() == 0 {
return Ok(());
}
+ let mut batch_sorted = false;
+ if self.fetch.map_or(false, |f| f < input.num_rows()) {
Review Comment:
I'm thinking can we make the heuristic `f < input.num_rows() / 10` or
something magic numbers to only do eager sort for small `K's?
##########
datafusion/core/src/physical_plan/sorts/sort.rs:
##########
@@ -639,15 +614,47 @@ fn write_sorted(
Ok(())
}
-fn read_spill(sender: Sender<Result<RecordBatch>>, path: &Path) -> Result<()> {
+/// Stream batches from spill files.
+///
+/// Each spill file has one or more batches. Intra-batch order is guaranteed
(each one is sorted),
+/// but the inter-batch ordering is not guaranteed, hence why we need to
convert each batch from the
+/// spill to a separate input stream for the merge-sort procedure.
Review Comment:
Will this produce unnecessary comparison when the inter-batch ordering is
guaranteed for the normal case without `K`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]