tustvold opened a new issue #1473: URL: https://github.com/apache/arrow-rs/issues/1473
**Background** In #1154 I added an `async` parquet API in the form of `ParquetRecordBatchStream`. This was maximally async, that is it made use of tokio's async IO traits to be as generic as possible. However, having experimented with this I'm not sure that this design is quite right. In particular https://github.com/apache/arrow-datafusion/pull/1617 showed non-trivial performance regressions operating on local files. This is caused by three major factors: * Additional copying of buffers in tokio and the ParquetRecordBatchStream necessary to convert an async Read to a sync Read * Less parallelism due to parquet decode taking place in a separate blocking thread on master * Overheads due to `tokio::fs::File` calling `spawn_blocking` for every IO operation This last point is pretty important and touches on something I was not aware of, tokio does not use an IO reactor for file IO like say boost::asio, instead it just calls `tokio::task::spawn_blocking` for every IO call. This somewhat undermines the concept of async file IO, as all you're doing is moving where the `tokio::task::spawn_blocking` is called, and in fact you're moving it lower in the call chain where its overheads are less amortized. As part of further exploring this design space I created #1472 which instead of using the tokio IO traits, uses the non-async `ChunkReader` trait and `tokio::task::spawn_blocking`. Effectively this just upstreams logic from DataFusion's ParquetExec operator, and so perhaps unsurprisingly does not represent a performance regression. This is still technically an `async` API, however, I am aware that a number of people expressed interest in an `async` version of `ChunkReader` which suggests they want lower-level async-ness. It is also unclear that `ChunkReader` is quite right either - see #1163 and https://github.com/apache/arrow-datafusion/pull/1905. To further complicate matters, differing storage media have different trade-offs, in particular when fetching from local disk or memory it may make sense to perform the most granular reads possible, potentially filtering out individual pages, columns, etc... However, when fetching data from object storage this is less clear cut. As each request costs and comes with non-trivial latency, there is likely a desire to coalesce proximate byte ranges into a single request, even if this results in reading more data then needed. As a result there is likely no general-purpose strategy for fetching data, and we therefore need the flexibility to allow this to be customized downstream. Finally, there is ongoing effort to introduce more parallelism into the parquet scan - https://github.com/apache/arrow-datafusion/pull/1990, and whilst async is a concurrency primitive and not a parallelism primitive, the two concepts are closely related in practice. **Requirements** I think the requirements are therefore as follows 1. Provide an `async` API that yields a stream of `Result<RecordBatch>` 2. Use predicate and projection pushdown to filter the data to scan 3. Separate identifying the byte ranges of column data to scan, from actually performing the scan 4. Delegate fetching the corresponding byte ranges to an `async` trait, allowing downstream customisation of the fetch strategy 5. Avoid copying the page data between buffers 7. Avoid calling spawn_blocking where the read implementation will not block (e.g. already in-memory) 8. Be extensible to support parallel column decoding (#TBD) 9. Be extensible to support more advanced predicate pushdown (#1191) **Proposal** An intentionally vague proposal would be to extend https://github.com/apache/arrow-datafusion/pull/1617 replacing the use of `ChunkReader` with a `Storage` trait that might look something like ``` #[async_trait] pub trait Storage { async fn prefetch(&mut self, ranges: Vec<std::ops::Range<usize>>) -> Result<()>, async fn read(&mut self, range: std::ops::Range<usize>) -> Result<ByteBufferPtr> } ``` `ParquetRecordBatchStreamBuilder` would use this trait to first read the footer, and then as part of `build()` invoke `prefetch()` with the determined byte ranges to scan. Finally `ParquetRecordBatchStream` would drive `Storage::read` with the individual column chunk ranges as needed by the stream. This will likely require some minor alterations to `SerializedPageReader` in order to avoid copying the data returned from `Storage::read` but I think this is worthwhile and will also benefit reading data from in-memory. FYI @rdettai @yjshen @alamb @sunchao -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
