This is an automated email from the ASF dual-hosted git repository.
ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 7b4e5598a5 Minor: Avoid emitting empty batches in partial sort (#13895)
7b4e5598a5 is described below
commit 7b4e5598a5d3e95a6c0dfcb9375f50778a2b2f64
Author: Berkay Şahin <[email protected]>
AuthorDate: Wed Dec 25 10:20:35 2024 +0300
Minor: Avoid emitting empty batches in partial sort (#13895)
* Update partial_sort.rs
* Update partial_sort.rs
* Update partial_sort.rs
---
datafusion/physical-plan/src/sorts/partial_sort.rs | 25 +++++++++++++++-------
1 file changed, 17 insertions(+), 8 deletions(-)
diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs
b/datafusion/physical-plan/src/sorts/partial_sort.rs
index f14ba6606e..c838376a48 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -366,7 +366,7 @@ impl PartialSortStream {
return Poll::Ready(None);
}
loop {
- return Poll::Ready(Some(match
ready!(self.input.poll_next_unpin(cx)) {
+ return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
Some(Ok(batch)) => {
if let Some(slice_point) =
self.get_slice_point(self.common_prefix_length,
&batch)?
@@ -374,21 +374,33 @@ impl PartialSortStream {
self.in_mem_batches.push(batch.slice(0, slice_point));
let remaining_batch =
batch.slice(slice_point, batch.num_rows() -
slice_point);
+ // Extract the sorted batch
let sorted_batch = self.sort_in_mem_batches();
+ // Refill with the remaining batch
self.in_mem_batches.push(remaining_batch);
- sorted_batch
+
+ debug_assert!(sorted_batch
+ .as_ref()
+ .map(|batch| batch.num_rows() > 0)
+ .unwrap_or(true));
+ Some(sorted_batch)
} else {
self.in_mem_batches.push(batch);
continue;
}
}
- Some(Err(e)) => Err(e),
+ Some(Err(e)) => Some(Err(e)),
None => {
self.is_closed = true;
// once input is consumed, sort the rest of the inserted
batches
- self.sort_in_mem_batches()
+ let remaining_batch = self.sort_in_mem_batches()?;
+ if remaining_batch.num_rows() > 0 {
+ Some(Ok(remaining_batch))
+ } else {
+ None
+ }
}
- }));
+ });
}
}
@@ -409,9 +421,6 @@ impl PartialSortStream {
self.is_closed = true;
}
}
- // Empty record batches should not be emitted.
- // They need to be treated as [`Option<RecordBatch>`]es and handle
separately
- debug_assert!(result.num_rows() > 0);
Ok(result)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]