pepijnve commented on issue #16482:
URL: https://github.com/apache/datafusion/issues/16482#issuecomment-3005212491

   @ding-young I think the gist of it is correct indeed. I tinkered a bit
locally with `spill_reader_yield` and condensed it down to this, which does
the same thing but is a bit easier to grasp since it fits on one page.
   
   ```rust
   async fn spill_reader_yield() -> Result<(), Box<dyn Error>> {
       use std::task::Poll;

       use arrow::compute::concat_batches;
       use datafusion_execution::RecordBatchStream;
       use datafusion_physical_plan::common::spawn_buffered;
       use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
       use futures::Stream;

       // A mock stream that always returns `Poll::Ready(Some(...))` immediately
       let always_ready =
           make_lazy_exec("value", false).execute(0, SessionContext::new().task_ctx())?;

       // Make a consumer stream that resembles how the read_stream of a spill
       // file is constructed
       let stream = make_cooperative(always_ready);

       // Use a large buffer so that the buffer always has free space for the
       // producer/sender
       let buffer_capacity = 100_000;
       let mut mock_stream = spawn_buffered(stream, buffer_capacity);
       let schema = mock_stream.schema();

       let consumer_stream = futures::stream::poll_fn(move |cx| {
           let mut collected = vec![];
           // To make sure the inner stream is polled multiple times, loop
           // forever while the inner (producer) stream returns Ready
           loop {
               match mock_stream.as_mut().poll_next(cx) {
                   Poll::Ready(Some(Ok(batch))) => {
                       println!("received batch from inner");
                       collected.push(batch);
                   }
                   Poll::Ready(Some(Err(e))) => {
                       println!("error from inner");
                       return Poll::Ready(Some(Err(e)));
                   }
                   Poll::Ready(None) => {
                       println!("inner stream ended");
                       break;
                   }
                   Poll::Pending => {
                       // Polling the inner stream may return Pending only when
                       // it exhausts its budget, since the producer stream is
                       // intentionally made to always return Ready
                       return Poll::Pending;
                   }
               }
           }

           // This should be unreachable since the stream is canceled
           let combined = concat_batches(&mock_stream.schema(), &collected)
               .expect("Failed to concat batches");

           Poll::Ready(Some(Ok(combined)))
       });

       let consumer_record_batch_stream = Box::pin(RecordBatchStreamAdapter::new(
           schema,
           consumer_stream,
       ));

       stream_yields(consumer_record_batch_stream).await
   }
   ```
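
   For reference, here is a minimal, self-contained sketch of what a `stream_yields`-style
assertion could look like: poll the stream by hand with a no-op waker and succeed as soon
as it returns `Poll::Pending`, i.e. as soon as it yields control. The name
`stream_yields_sketch`, the `1_000` poll budget, and the use of `noop_waker_ref` are
illustrative assumptions; the actual `stream_yields` helper in the test module may be
implemented differently.

   ```rust
   // Hypothetical sketch of a `stream_yields`-style check (not the real helper):
   // poll the stream a bounded number of times and succeed as soon as it returns
   // `Poll::Pending`.
   use std::error::Error;
   use std::task::{Context, Poll};

   use datafusion_execution::SendableRecordBatchStream;
   use futures::task::noop_waker_ref;
   use futures::Stream;

   async fn stream_yields_sketch(
       mut stream: SendableRecordBatchStream,
   ) -> Result<(), Box<dyn Error>> {
       // A no-op waker is enough here: we only care whether poll_next ever
       // returns Pending, not about being woken up afterwards.
       let mut cx = Context::from_waker(noop_waker_ref());
       for _ in 0..1_000 {
           match stream.as_mut().poll_next(&mut cx) {
               // The cooperative wrapper exhausted its budget and yielded: success.
               Poll::Pending => return Ok(()),
               // Keep consuming batches until the stream yields or ends.
               Poll::Ready(Some(Ok(_))) => continue,
               Poll::Ready(Some(Err(e))) => return Err(e.into()),
               Poll::Ready(None) => return Err("stream ended without yielding".into()),
           }
       }
       Err("stream never returned Poll::Pending".into())
   }
   ```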



