This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 6170c51  ARROW-11784: [Rust][DataFusion] CoalesceBatchesStream doesn't honor Stream interface
6170c51 is described below

commit 6170c5190feab2f585e65e989957bfc67ad6bd52
Author: Ximo Guanter <[email protected]>
AuthorDate: Fri Feb 26 06:09:47 2021 -0500

    ARROW-11784: [Rust][DataFusion] CoalesceBatchesStream doesn't honor Stream interface
    
    Unit tests now cover the bug to avoid regressions.
    
    Closes #9574 from edrevo/fix-coalescebatchesstream
    
    Authored-by: Ximo Guanter <[email protected]>
    Signed-off-by: Andrew Lamb <[email protected]>
---
 rust/datafusion/src/physical_plan/coalesce_batches.rs | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs b/rust/datafusion/src/physical_plan/coalesce_batches.rs
index 9f36fd8..b91e0b6 100644
--- a/rust/datafusion/src/physical_plan/coalesce_batches.rs
+++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs
@@ -111,6 +111,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
             target_batch_size: self.target_batch_size,
             buffer: Vec::new(),
             buffered_rows: 0,
+            is_closed: false,
         }))
     }
 }
@@ -126,6 +127,8 @@ struct CoalesceBatchesStream {
     buffer: Vec<RecordBatch>,
     /// Buffered row count
     buffered_rows: usize,
+    /// Whether the stream has finished returning all of its data or not
+    is_closed: bool,
 }
 
 impl Stream for CoalesceBatchesStream {
@@ -135,6 +138,9 @@ impl Stream for CoalesceBatchesStream {
         mut self: Pin<&mut Self>,
         cx: &mut Context<'_>,
     ) -> Poll<Option<Self::Item>> {
+        if self.is_closed {
+            return Poll::Ready(None);
+        }
         loop {
             let input_batch = self.input.poll_next_unpin(cx);
             match input_batch {
@@ -167,6 +173,7 @@ impl Stream for CoalesceBatchesStream {
                         }
                     }
                     None => {
+                        self.is_closed = true;
                         // we have reached the end of the input stream but there could still
                         // be buffered batches
                         if self.buffer.is_empty() {
@@ -234,7 +241,7 @@ pub fn concat_batches(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::physical_plan::memory::MemoryExec;
+    use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
     use arrow::array::UInt32Array;
     use arrow::datatypes::{DataType, Field, Schema};
 
@@ -244,7 +251,7 @@ mod tests {
         let partition = create_vec_batches(&schema, 10);
         let partitions = vec![partition];
 
-        let output_partitions = coalesce_batches(&schema, partitions, 20).await?;
+        let output_partitions = coalesce_batches(&schema, partitions, 21).await?;
         assert_eq!(1, output_partitions.len());
 
         // input is 10 batches x 8 rows (80 rows)
@@ -287,6 +294,8 @@ mod tests {
     ) -> Result<Vec<Vec<RecordBatch>>> {
         // create physical plan
         let exec = MemoryExec::try_new(&input_partitions, schema.clone(), None)?;
+        let exec =
+            RepartitionExec::try_new(Arc::new(exec), Partitioning::RoundRobinBatch(1))?;
         let exec: Arc<dyn ExecutionPlan> =
             Arc::new(CoalesceBatchesExec::new(Arc::new(exec), target_batch_size));
 

Reply via email to