This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 6170c51 ARROW-11784: [Rust][DataFusion] CoalesceBatchesStream doesn't honor Stream interface
6170c51 is described below
commit 6170c5190feab2f585e65e989957bfc67ad6bd52
Author: Ximo Guanter <[email protected]>
AuthorDate: Fri Feb 26 06:09:47 2021 -0500
ARROW-11784: [Rust][DataFusion] CoalesceBatchesStream doesn't honor Stream interface
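Once its input stream is exhausted, CoalesceBatchesStream now marks itself closed and returns `Poll::Ready(None)` on every later poll. It no longer polls the already-finished input again, which the `Stream` contract does not permit. The unit tests now route the input through a `RepartitionExec` and use a target batch size of 21 rows for the 80-row input. That leaves a partial batch buffered when the input ends, so the stream is polled again after the input has finished. The tests therefore cover this bug and guard against regressions.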
Closes #9574 from edrevo/fix-coalescebatchesstream
Authored-by: Ximo Guanter <[email protected]>
Signed-off-by: Andrew Lamb <[email protected]>
---
rust/datafusion/src/physical_plan/coalesce_batches.rs | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git a/rust/datafusion/src/physical_plan/coalesce_batches.rs
b/rust/datafusion/src/physical_plan/coalesce_batches.rs
index 9f36fd8..b91e0b6 100644
--- a/rust/datafusion/src/physical_plan/coalesce_batches.rs
+++ b/rust/datafusion/src/physical_plan/coalesce_batches.rs
@@ -111,6 +111,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
target_batch_size: self.target_batch_size,
buffer: Vec::new(),
buffered_rows: 0,
+ is_closed: false,
}))
}
}
@@ -126,6 +127,8 @@ struct CoalesceBatchesStream {
buffer: Vec<RecordBatch>,
/// Buffered row count
buffered_rows: usize,
+ /// Whether the stream has finished returning all of its data or not
+ is_closed: bool,
}
impl Stream for CoalesceBatchesStream {
@@ -135,6 +138,9 @@ impl Stream for CoalesceBatchesStream {
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
+ if self.is_closed {
+ return Poll::Ready(None);
+ }
loop {
let input_batch = self.input.poll_next_unpin(cx);
match input_batch {
@@ -167,6 +173,7 @@ impl Stream for CoalesceBatchesStream {
}
}
None => {
+ self.is_closed = true;
// we have reached the end of the input stream but
there could still
// be buffered batches
if self.buffer.is_empty() {
@@ -234,7 +241,7 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
- use crate::physical_plan::memory::MemoryExec;
+ use crate::physical_plan::{memory::MemoryExec,
repartition::RepartitionExec};
use arrow::array::UInt32Array;
use arrow::datatypes::{DataType, Field, Schema};
@@ -244,7 +251,7 @@ mod tests {
let partition = create_vec_batches(&schema, 10);
let partitions = vec![partition];
- let output_partitions = coalesce_batches(&schema, partitions,
20).await?;
+ let output_partitions = coalesce_batches(&schema, partitions,
21).await?;
assert_eq!(1, output_partitions.len());
// input is 10 batches x 8 rows (80 rows)
@@ -287,6 +294,8 @@ mod tests {
) -> Result<Vec<Vec<RecordBatch>>> {
// create physical plan
let exec = MemoryExec::try_new(&input_partitions, schema.clone(),
None)?;
+ let exec =
+ RepartitionExec::try_new(Arc::new(exec),
Partitioning::RoundRobinBatch(1))?;
let exec: Arc<dyn ExecutionPlan> =
Arc::new(CoalesceBatchesExec::new(Arc::new(exec),
target_batch_size));