This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b4e5598a5 Minor: Avoid emitting empty batches in partial sort (#13895)
7b4e5598a5 is described below

commit 7b4e5598a5d3e95a6c0dfcb9375f50778a2b2f64
Author: Berkay Şahin <[email protected]>
AuthorDate: Wed Dec 25 10:20:35 2024 +0300

    Minor: Avoid emitting empty batches in partial sort (#13895)
    
    * Update partial_sort.rs
    
    * Update partial_sort.rs
    
    * Update partial_sort.rs
---
 datafusion/physical-plan/src/sorts/partial_sort.rs | 25 +++++++++++++++-------
 1 file changed, 17 insertions(+), 8 deletions(-)

diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs
index f14ba6606e..c838376a48 100644
--- a/datafusion/physical-plan/src/sorts/partial_sort.rs
+++ b/datafusion/physical-plan/src/sorts/partial_sort.rs
@@ -366,7 +366,7 @@ impl PartialSortStream {
             return Poll::Ready(None);
         }
         loop {
-            return Poll::Ready(Some(match 
ready!(self.input.poll_next_unpin(cx)) {
+            return Poll::Ready(match ready!(self.input.poll_next_unpin(cx)) {
                 Some(Ok(batch)) => {
                     if let Some(slice_point) =
                         self.get_slice_point(self.common_prefix_length, 
&batch)?
@@ -374,21 +374,33 @@ impl PartialSortStream {
                         self.in_mem_batches.push(batch.slice(0, slice_point));
                         let remaining_batch =
                             batch.slice(slice_point, batch.num_rows() - 
slice_point);
+                        // Extract the sorted batch
                         let sorted_batch = self.sort_in_mem_batches();
+                        // Refill with the remaining batch
                         self.in_mem_batches.push(remaining_batch);
-                        sorted_batch
+
+                        debug_assert!(sorted_batch
+                            .as_ref()
+                            .map(|batch| batch.num_rows() > 0)
+                            .unwrap_or(true));
+                        Some(sorted_batch)
                     } else {
                         self.in_mem_batches.push(batch);
                         continue;
                     }
                 }
-                Some(Err(e)) => Err(e),
+                Some(Err(e)) => Some(Err(e)),
                 None => {
                     self.is_closed = true;
                     // once input is consumed, sort the rest of the inserted 
batches
-                    self.sort_in_mem_batches()
+                    let remaining_batch = self.sort_in_mem_batches()?;
+                    if remaining_batch.num_rows() > 0 {
+                        Some(Ok(remaining_batch))
+                    } else {
+                        None
+                    }
                 }
-            }));
+            });
         }
     }
 
@@ -409,9 +421,6 @@ impl PartialSortStream {
                 self.is_closed = true;
             }
         }
-        // Empty record batches should not be emitted.
-        // They need to be treated as [`Option<RecordBatch>`]es and handle 
separately
-        debug_assert!(result.num_rows() > 0);
         Ok(result)
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to