alamb opened a new issue #437:
URL: https://github.com/apache/arrow-datafusion/issues/437


   **Describe the bug**
   Given a `RepartitionExec` whose input stream does not have a record batch ready for immediate consumption, the repartition stream terminates early and produces no output.
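
   The reproducer below suggests the output stream gives up the first time its input is not immediately ready. As a purely hypothetical illustration (not DataFusion's actual code), a consumer that treats `Poll::Pending` as end-of-stream truncates output in exactly this way:

   ```rust
   use std::task::{Context, Poll};

   use futures::{task::noop_waker, Stream, StreamExt};

   /// BUGGY sketch: drains a stream with a single synchronous poll loop.
   /// `Poll::Pending` means "not ready *yet*", not "finished"; correct code
   /// must propagate Pending (or `.await`) and retry when the waker fires.
   fn drain_eagerly<S: Stream + Unpin>(stream: &mut S) -> Vec<S::Item> {
       let waker = noop_waker();
       let mut cx = Context::from_waker(&waker);
       let mut items = Vec::new();
       // A channel-backed stream whose sender runs on another task is almost
       // always Pending on its first poll, so this loop exits immediately.
       while let Poll::Ready(Some(item)) = stream.poll_next_unpin(&mut cx) {
           items.push(item);
       }
       items
   }
   ```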
   
   **To Reproduce**
   Set up a plan like this:
   
   ```
   ┌───────────────────┐        ┌───────────────────────┐
   │                   │        │                       │
   │    InputStream    │───────▶│   RepartitionStream   │
   │                   │        │                       │
   └───────────────────┘        └───────────────────────┘
   ```
   
   Where the input stream does not produce its record batch immediately (full reproducer below).
   
   I expect the repartition stream to produce the same record batch that the input stream eventually provides. However, I get nothing:
   ```
   expected:
   
   [
       "+------------------+",
       "| my_awesome_field |",
       "+------------------+",
       "| foo              |",
       "| bar              |",
       "+------------------+",
   ]
   actual:
   
   [
       "++",
       "++",
   ]
   ```
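
   For contrast, a consumer that awaits each item only ever sees `None` at true end-of-stream, so a slow input delays output rather than truncating it. A minimal sketch (`collect_all` is a hypothetical helper, not a DataFusion API):

   ```rust
   use futures::{Stream, StreamExt};

   /// Hypothetical helper: `next().await` yields to the runtime while the
   /// stream is Pending and resumes when woken, so `None` here really means
   /// the stream is exhausted.
   async fn collect_all<S: Stream + Unpin>(mut stream: S) -> Vec<S::Item> {
       let mut items = Vec::new();
       while let Some(item) = stream.next().await {
           items.push(item);
       }
       items
   }
   ```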
   
   
   **Full Reproducer (run in repartition.rs)**
   
   ```rust
       // Imports likely needed beyond what the repartition.rs test module
       // already has in scope (adjust against the existing `use` list):
       use std::task::{Context, Poll};

       use arrow::array::{ArrayRef, StringArray};
       use arrow::error::Result as ArrowResult;
       use futures::{Stream, StreamExt};
       use tokio_stream::wrappers::ReceiverStream;

       #[tokio::test]
       async fn repartition_with_delayed_stream() {
           let input = DelayedExec::new();
           let partitioning = input.output_partitioning();
           let expected_batches = vec![input.batch.clone()];
           let exec = RepartitionExec::try_new(Arc::new(input), partitioning).unwrap();
   
           let expected = vec![
               "+------------------+",
               "| my_awesome_field |",
               "+------------------+",
               "| foo              |",
               "| bar              |",
               "+------------------+",
           ];
   
           // sanity check: the input's own batch matches the expectation
           assert_batches_eq!(&expected, &expected_batches);
   
   
           let output_stream = exec.execute(0).await.unwrap();
           let batches = crate::physical_plan::common::collect(output_stream).await.unwrap();
   
           assert_batches_eq!(&expected, &batches);
   
       }
   
       /// `ExecutionPlan` whose output stream yields a single batch, but only
       /// after the consuming task has yielded back to the runtime.
       #[derive(Debug)]
       struct DelayedExec {
           batch: RecordBatch,
       }
   
       impl DelayedExec {
           fn new() -> Self {
               let batch = RecordBatch::try_from_iter(vec![(
                   "my_awesome_field",
                   Arc::new(StringArray::from(vec!["foo", "bar"])) as ArrayRef,
               )])
               .unwrap();
   
               Self {
                   batch
               }
           }
       }
   
   
       #[async_trait]
       impl ExecutionPlan for DelayedExec {
           fn as_any(&self) -> &dyn Any {
               self
           }
   
           fn schema(&self) -> SchemaRef {
               self.batch.schema()
           }
   
           fn output_partitioning(&self) -> Partitioning {
               Partitioning::UnknownPartitioning(1)
           }
   
           fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
               unimplemented!()
           }
   
           fn with_new_children(
               &self,
               _children: Vec<Arc<dyn ExecutionPlan>>,
           ) -> Result<Arc<dyn ExecutionPlan>> {
               unimplemented!()
           }
   
           /// Returns a stream which does not have data immediately, but
           /// needs to yield (to allow another task to run) to get its
           /// input.
           async fn execute(&self, partition: usize) -> Result<SendableRecordBatchStream> {
               assert_eq!(partition, 0);
   
               let batch = self.batch.clone();
               let schema = batch.schema();
   
               let (tx, rx) = tokio::sync::mpsc::channel(2);
   
               // task simply sends the batch
               tokio::task::spawn(async move {
                   println!("Sending batch via delayed stream");
                   if let Err(e) = tx.send(Ok(batch)).await {
                       println!("ERROR batch via delayed stream: {}", e);
                   }
               });
   
               // returned stream simply reads off the rx stream
               let stream = ParquetStream {
                   schema,
                   inner: ReceiverStream::new(rx),
               };
               Ok(Box::pin(stream))
           }
       }
   
   
       #[derive(Debug)]
       pub struct ParquetStream {
           schema: SchemaRef,
           inner: ReceiverStream<ArrowResult<RecordBatch>>,
       }
   
       impl Stream for ParquetStream {
           type Item = ArrowResult<RecordBatch>;
   
           fn poll_next(
               mut self: std::pin::Pin<&mut Self>,
               cx: &mut Context<'_>,
           ) -> Poll<Option<Self::Item>> {
               println!("ParquetStream::poll_next");
               let res = self.inner.poll_next_unpin(cx);
               println!("ParquetStream::poll_next() done");
               res
           }
       }
   
       impl RecordBatchStream for ParquetStream {
           fn schema(&self) -> SchemaRef {
               Arc::clone(&self.schema)
           }
       }
   
       impl Drop for ParquetStream {
           fn drop(&mut self) {
               println!("ParquetStream::drop()");
           }
       }
   ```
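
   The essential shape of `DelayedExec` can be reproduced without DataFusion at all: a channel-backed stream whose sender runs on a separate task is `Poll::Pending` on its first poll and only delivers once the consumer yields to the runtime. A standalone sketch of that shape:

   ```rust
   use tokio_stream::{wrappers::ReceiverStream, StreamExt};

   #[tokio::main]
   async fn main() {
       let (tx, rx) = tokio::sync::mpsc::channel(2);

       // The sender runs on another task, so nothing is in the channel when
       // the stream is first polled.
       tokio::task::spawn(async move {
           tx.send("delayed item").await.unwrap();
       });

       let mut stream = ReceiverStream::new(rx);

       // `next().await` parks until the sender task runs; a consumer that
       // bailed on the first `Poll::Pending` would instead see nothing.
       assert_eq!(stream.next().await, Some("delayed item"));
   }
   ```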
   

