alamb opened a new issue, #7983:
URL: https://github.com/apache/arrow-rs/issues/7983

   (This is based on discussions with @crepererum  and @XiangpengHao over the 
last few days)
   
   ## Is your feature request related to a problem or challenge? 
   
   After working with the Parquet Reader, I have concluded it is not realistic 
to implement smart prefetching of row groups or pages because the knowledge of 
what data is needed next is not available from the reader.
   
   This means in practice, users either have to 
   1. buffer the entire file in memory or local SSD (which is what we do in 
InfluxDB)
   2. read the data serially with multiple smaller requests while decoding. 
   
   Buffering the entire file consumes significant resource and avoids some of 
the benefits inherent in columnar IO formats. 
   
   Reading the data with multiple smaller requests is ok on a low latency local 
file system or when you have a bunch of outstanding requests (where the latency 
can be hidden by other work), but it is pretty bad for remote object stores or 
remote filesystems (e.g. S3, HDFS, etc) where the latency of each request is 
high.
   
   There are also several other things which I find annoying about the current 
design which i will rant about:
   1. Row filters are evaluated eagerly (before the reader is created)  see 
[docs](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderBuilder.html#method.build)
 ("Note: this will eagerly evaluate any RowFilter before returning") which can 
potentially "front load" significant IO and CPU operations before client code 
might expect
   1. There are different sync and async readers, which makes it hard 
(impossible?) to write tests that cover both without duplicating code (see 
https://github.com/XiangpengHao/arrow-rs/pull/7)
   2. The internal implementation (e.g. for evaluating filters) is duplicated 
between the sync and async readers, which makes it hard to maintain and. 
Example: The initial predicate cache in 
https://github.com/apache/arrow-rs/pull/7850 does not work for the sync reader 
😬 
   3. The 
[`ArrowReaderBuilder`](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.ArrowReaderBuilder.html)
 is a templated typedef . So the innocent 
sounding[`ParquetRecordBatchReaderBuilder`](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/type.ParquetRecordBatchReaderBuilder.html)
 is actually  `ArrowReaderBuilder<SyncReader<T>>;` and to actually use that you 
need to figure out `T: ChunkReader`. The multiple template levels makes it 
challenging to use (why is the the underlying type of the IO reader needed to 
to configure things like a `RowSelection`?)
   4. I found the async API confusing: you have to implement 
[`AsyncFileReader`](https://docs.rs/parquet/latest/parquet/arrow/async_reader/trait.AsyncFileReader.html)
  which combines IO **and** optional Parquet metadata caching. It is quite 
tricky to implement if you want to read from some different remote source.
   5. It is also a pain to test the async reader (as you can't easily do so 
with in memory `Bytes`). You can see this pain as there are at [least two 
implementations of AsyncFileReader for Bytes in the 
codebase](https://github.com/search?q=repo%3Aapache%2Farrow-rs%20%22impl%20AsyncFileReader%22&type=code).
 (I also had to add a third in https://github.com/XiangpengHao/arrow-rs/pull/7)
   
   
   ### Root Cause: Pull Based Decoder ?
   I think the core reason for many of the above challenges is the current 
design of the Parquet Reader as a "pull" based decoder where the CPU operations 
(decoding) drives the IO operations (e.g reading from the file).
   
   
   The way the reader works is:
   1. the caller asks for the next batch,
   2. the reader internally (may) make an I/O call to the underyling reader 
(either async or non async) if it needs needs more data 
   3. the reader then decodes from buffered data
   
   ```rust
   let reader = ParquetRecordBatchBuilder::new(io_reader)
     // more configuration here
     .build();
   
   // The call to `reader.next()` MAY do IO, depending on the readers state, but
   // the outer loop has no way to know what IO will be needed next
   while let Some(batch) = reader.next() {
     // process
   }
   ```
   
   This design makes it challenging to add more sophisticated strategies. For 
example, @masonh22 proposed to add pre-fetching of row groups to the reader 
https://github.com/apache/arrow-rs/pull/6676. Among other challenges with that 
PR is that  automatically pre-fetching row group data helps for some cases 
(e.g. reading remote object store), but can be pure cost on others (e.g. 
increases memory usage when the data is already in memory)
   
    Given the current pull based design I do not think it is feasible to 
implement different strategies in a reasonable way with the current API.
   
   
   ## Describe the solution you'd like
   I would like a way for advanced users of the parquet crate to more carefully 
optimize the I/O patterns 
   
   ## Describe alternatives you've considered
   
   ### Push Based Decoder
   One potential solution (again, thanks to  @crepererum  and @XiangpengHao), 
is to make a  "push based" decoder, similarly to the JSON decoder and the CSV 
decoder.
   
   A (simplified) example of how the JSON 
[`Decoder`](https://docs.rs/arrow-json/55.2.0/arrow_json/reader/struct.Decoder.html)
 works shows illustrates the difference: 
   
   ```rust
      let mut decoder = ReaderBuilder::new(schema).build_decoder()?;
      let mut next = move || {
           loop {
               // load some more bytes from somewhere
               let buf = read_input();
               if buf.is_empty() {
                   break; // Input exhausted
               }
               let read = buf.len();
               let decoded = decoder.decode(buf)?;
   
               // If the decoder couldn't read the whole buffer, it made an 
output batch
               if decoded != read {
                   break;
               }
           }
            decoder.flush() // retrieve completed batch
       };
    ```
   
   So the idea is that we could make a `ParquetDecoder` that is "push" based, 
and did not have an underlying IO reader, but instead would be given a stream 
of data to decode. We could then implement the current sync and async APIs 
using that underlying `Decoder`
   
   The challenge is likely with the API design:  unlike streams of JSON / CSV, 
the data that the parquet reader needs next will need is not easy to predict as 
it depends on the filters, the row groups, which columns are requested, etc.
   
   Here is a high level [straw man 
desig](https://en.wikipedia.org/wiki/Straw_man_proposal)n:
   
   ```rust
   // Create a decoder for decoding parquet data (note it does NOT have any IO 
/ readers)
   let mut decoder = ParquetDecoderBuilder::new();
       // provide metadata, if known (avoid having to read it each time)
       .with_file_metadata(metadata)
       .with_row_groups(row_groups)
       .with_projection(projection)
       .build();
   
   // In a loop, ask the decoder what it needs next, and provide it with the 
required data
   while let Some(next_request) = decoder.next_request() {
       // next_request will tell us what data ranges it needs (details TBD)
       let data: Bytes = fetch_data(next_request);
   
       // provide the data to the decoder
       decoder.push_data(next_request, data);
   
       // Now the decoder may be able to decode a batch
       // maybe it will return one or more batches, or it will ask for more data
       while let Some(batch) = decoder.next_batch() {
           // process the batch
       }
   }
   ```
   
   Note that the above example is not async, but could be easily be used by an 
async API
   
   With such a `ParquetDecoder` I think we could then add things like 
prefetching of row groups, etc, with new APIs on the decoder
   
   ```rust
   // Create a decoder for decoding parquet data as above
   let mut decoder: ParquetDecoderBuilder = ...;
   
   // As the decoder up from what data it will need, start prefetching data if 
desired
   while let Some(pre_request) = decoder.peek_next_requests() {
       // note that this is a peek and if we call peek again in the
       // future, we may get a different set of pre_requests (for example
       // if the decoder has applied a row filter and ruled out
       // some row groups or data pages)
       start_prefetch(pre_request);
   }
   
   // push data to the decoder as before, but hopefully the reader
   // will have already prefetched some of the data
   ```
   
   
   **Additional context**
   I have now hit this while trying to write tests end to end for the parquet 
reader (also see https://github.com/apache/arrow-rs/pull/7971), and it was very 
annoying to have to duplicate the code for the sync and async readers.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to