lyang24 opened a new issue, #9320:
URL: https://github.com/apache/arrow-rs/issues/9320
The `next_row_group` async API is powerful: it allows us to overlap I/O and decoding. I feel we should add a pipelined-read example to the examples.
```rust
use arrow_array::RecordBatch;
use bytes::Bytes;
use parquet::arrow::ParquetRecordBatchStreamBuilder;
use parquet::errors::Result;
use tokio::sync::mpsc;

async fn read_pipelined(data: Bytes) -> Result<Vec<RecordBatch>> {
    // InMemoryReader: an AsyncFileReader over Bytes, as in the crate's
    // read_with_rowgroup example
    let reader = InMemoryReader::new(data);
    let builder = ParquetRecordBatchStreamBuilder::new(reader).await?;
    let mut stream = builder.with_batch_size(8192).build()?;

    // Channel buffers row group readers ahead of decode.
    // A buffer size of 2 means up to 2 row groups can be fetched and waiting.
    let (tx, mut rx) = mpsc::channel(2);

    // I/O task: continuously fetches row groups
    let io_handle = tokio::spawn(async move {
        while let Ok(Some(reader)) = stream.next_row_group().await {
            // Send the reader to the decode task; if the receiver is
            // dropped, stop fetching
            if tx.send(reader).await.is_err() {
                break;
            }
        }
    });

    // Decode task: processes readers as they arrive
    let mut batches = vec![];
    while let Some(reader) = rx.recv().await {
        // Decode in a blocking task to avoid stalling the async runtime;
        // this matters because decoding is CPU-bound
        let decoded = tokio::task::spawn_blocking(move || {
            reader.into_iter().collect::<std::result::Result<Vec<_>, _>>()
        })
        .await
        .expect("decode task panicked")?;
        batches.extend(decoded);
    }

    // Wait for the I/O task to complete
    io_handle.await.expect("I/O task panicked");
    Ok(batches)
}
```