tustvold opened a new issue #1473:
URL: https://github.com/apache/arrow-rs/issues/1473


   **Background**
   
   In #1154 I added an `async` parquet API in the form of 
`ParquetRecordBatchStream`. This was maximally async: it made use of tokio's 
async IO traits to be as generic as possible. However, having experimented 
with it, I'm not sure that this design is quite right. 
   
   In particular https://github.com/apache/arrow-datafusion/pull/1617 showed 
non-trivial performance regressions when operating on local files. This is 
caused by three major factors:
   
   * Additional copying of buffers in tokio and the ParquetRecordBatchStream 
necessary to convert an async Read to a sync Read
   * Less parallelism due to parquet decode taking place in a separate blocking 
thread on master
   * Overheads due to `tokio::fs::File` calling `spawn_blocking` for every IO 
operation
   
   This last point is pretty important and touches on something I was not aware 
of: tokio does not use an IO reactor for file IO like, say, boost::asio. Instead 
it just calls `tokio::task::spawn_blocking` for every IO call. This somewhat 
undermines the concept of async file IO, as all you're doing is moving where 
`tokio::task::spawn_blocking` is called, and in fact you're moving it lower in 
the call chain, where its overheads are less amortized.
   
   As part of further exploring this design space I created #1472, which, 
instead of using the tokio IO traits, uses the non-async `ChunkReader` trait 
and `tokio::task::spawn_blocking`. Effectively this just upstreams logic from 
DataFusion's ParquetExec operator, and so, perhaps unsurprisingly, does not 
represent a performance regression.
   
   This is still technically an `async` API. However, I am aware that a number 
of people expressed interest in an `async` version of `ChunkReader`, which 
suggests they want lower-level async-ness. It is also unclear whether 
`ChunkReader` is quite right either - see #1163 and 
https://github.com/apache/arrow-datafusion/pull/1905.
   
   To further complicate matters, different storage media have different 
trade-offs. When fetching from local disk or memory it may make sense to 
perform the most granular reads possible, potentially filtering out individual 
pages, columns, etc. However, when fetching data from object storage this is 
less clear cut. As each request has a cost and comes with non-trivial latency, 
there is likely a desire to coalesce proximate byte ranges into a single 
request, even if this results in reading more data than needed. As a result 
there is likely no general-purpose strategy for fetching data, and we therefore 
need the flexibility to allow this to be customized downstream.
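
To make the coalescing trade-off concrete, here is a minimal sketch of one possible strategy. The function name `coalesce_ranges` and the `max_gap` parameter are hypothetical and are not part of any existing API. Ranges whose gap is at most `max_gap` bytes are merged into one request, at the cost of fetching the bytes in between.

```rust
use std::ops::Range;

/// Hypothetical helper: merge byte ranges that overlap or are separated by
/// at most `max_gap` bytes, so that each merged range becomes one request.
fn coalesce_ranges(mut ranges: Vec<Range<usize>>, max_gap: usize) -> Vec<Range<usize>> {
    // Sort by start offset so that only the most recent merged range
    // needs to be considered for each incoming range.
    ranges.sort_by_key(|r| r.start);

    let mut merged: Vec<Range<usize>> = Vec::with_capacity(ranges.len());
    for range in ranges {
        if let Some(last) = merged.last_mut() {
            if range.start <= last.end.saturating_add(max_gap) {
                // Close enough: extend the previous range, reading the gap too.
                last.end = last.end.max(range.end);
                continue;
            }
        }
        merged.push(range);
    }
    merged
}
```

A local-disk implementation might call this with `max_gap = 0`, merging only touching or overlapping ranges, whereas an object store implementation might pick a much larger gap. The right value depends on the latency and per-request cost of the medium, which is why it seems better left to the downstream implementation.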
   
   Finally, there is an ongoing effort to introduce more parallelism into the 
parquet scan - https://github.com/apache/arrow-datafusion/pull/1990. Whilst 
async is a concurrency primitive and not a parallelism primitive, the two 
concepts are closely related in practice. 
   
   **Requirements**
   
   I think the requirements are therefore as follows:
   
   1. Provide an `async` API that yields a stream of `Result<RecordBatch>`
   2. Use predicate and projection pushdown to filter the data to scan
   3. Separate identifying the byte ranges of column data to scan, from 
actually performing the scan
   4. Delegate fetching the corresponding byte ranges to an `async` trait, 
allowing downstream customisation of the fetch strategy
   5. Avoid copying the page data between buffers
   6. Avoid calling `spawn_blocking` where the read implementation will not 
block (e.g. already in-memory)
   7. Be extensible to support parallel column decoding (#TBD)
   8. Be extensible to support more advanced predicate pushdown (#1191)
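
Requirements 2 and 3 can be illustrated with a small sketch that computes the byte ranges to scan without performing any IO. The `ColumnChunk` and `RowGroup` types below are hypothetical stand-ins for the metadata parsed from the footer, not the parquet crate's own types, and the `keep` closure stands in for whatever predicate pushdown decides.

```rust
use std::ops::Range;

/// Hypothetical stand-in for the location of one column chunk in the file.
#[derive(Debug, Clone)]
struct ColumnChunk {
    offset: usize,
    length: usize,
}

/// Hypothetical stand-in for the row group metadata read from the footer.
#[derive(Debug, Clone)]
struct RowGroup {
    columns: Vec<ColumnChunk>,
}

/// Identify the byte ranges to scan: only the projected columns of the
/// row groups accepted by `keep`. No data is fetched here.
fn ranges_to_scan(
    row_groups: &[RowGroup],
    projection: &[usize],
    keep: impl Fn(usize, &RowGroup) -> bool,
) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    for (idx, row_group) in row_groups.iter().enumerate() {
        // Predicate pushdown: skip row groups that cannot match.
        if !keep(idx, row_group) {
            continue;
        }
        // Projection pushdown: only the requested column indices.
        for &col in projection {
            if let Some(chunk) = row_group.columns.get(col) {
                ranges.push(chunk.offset..chunk.offset + chunk.length);
            }
        }
    }
    ranges
}
```

The resulting list is plain data, so it can be handed to whichever fetch strategy the storage implementation prefers.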
   
   **Proposal**
   
   An intentionally vague proposal would be to extend 
https://github.com/apache/arrow-datafusion/pull/1617 replacing the use of 
`ChunkReader` with a `Storage` trait that might look something like
   
   ```
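   #[async_trait]
   pub trait Storage {
     /// Hint at the byte ranges that will later be requested via `read`
     async fn prefetch(&mut self, ranges: Vec<std::ops::Range<usize>>) -> 
Result<()>;
   
     /// Fetch the bytes for a single range
     async fn read(&mut self, range: std::ops::Range<usize>) -> 
Result<ByteBufferPtr>;
   }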
   ```
   
   `ParquetRecordBatchStreamBuilder` would use this trait to first read the 
footer, and then as part of `build()` invoke `prefetch()` with the determined 
byte ranges to scan. Finally `ParquetRecordBatchStream` would drive 
`Storage::read` with the individual column chunk ranges as needed by the stream.
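
As a rough illustration of how an implementation for already in-memory data might look, here is a sketch under several assumptions. It uses native `async fn` in traits (Rust 1.75+) instead of `#[async_trait]` so that it needs no external crates, `SharedBytes` is a hypothetical stand-in for `ByteBufferPtr`, `std::io::Result` stands in for the crate's `Result`, and `block_on` is a toy executor that exists only so the sketch can be driven without a runtime.

```rust
use std::future::Future;
use std::io::{Error, ErrorKind, Result};
use std::ops::Range;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Hypothetical stand-in for `ByteBufferPtr`: a view into a shared buffer
/// that can be handed out without copying the underlying bytes.
#[derive(Debug, Clone)]
struct SharedBytes {
    buf: Arc<[u8]>,
    range: Range<usize>,
}

impl SharedBytes {
    fn as_slice(&self) -> &[u8] {
        &self.buf[self.range.clone()]
    }
}

/// The proposed trait, written with native `async fn` (an assumption made
/// to keep the sketch dependency-free).
trait Storage {
    async fn prefetch(&mut self, ranges: Vec<Range<usize>>) -> Result<()>;
    async fn read(&mut self, range: Range<usize>) -> Result<SharedBytes>;
}

/// Data that is already in memory: nothing to prefetch, `read` never blocks,
/// so no `spawn_blocking` is needed.
struct InMemory {
    buf: Arc<[u8]>,
}

impl Storage for InMemory {
    async fn prefetch(&mut self, _ranges: Vec<Range<usize>>) -> Result<()> {
        Ok(())
    }

    async fn read(&mut self, range: Range<usize>) -> Result<SharedBytes> {
        if range.start > range.end || range.end > self.buf.len() {
            return Err(Error::new(ErrorKind::UnexpectedEof, "range out of bounds"));
        }
        // Only the reference count is bumped; the bytes are not copied.
        Ok(SharedBytes { buf: Arc::clone(&self.buf), range })
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Toy executor for this sketch only: it polls in a loop, which is fine for
/// futures that complete immediately; real code would use a runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    let mut fut = pin!(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}
```

An object store implementation could instead use `prefetch` to issue coalesced requests and serve `read` from the fetched buffers. This sketch does not attempt to show that.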
   
   This will likely require some minor alterations to `SerializedPageReader` in 
order to avoid copying the data returned from `Storage::read`, but I think this 
is worthwhile and will also benefit reading data that is already in memory.
   
   FYI @rdettai @yjshen @alamb @sunchao 

