alamb opened a new issue, #7362:
URL: https://github.com/apache/arrow-rs/issues/7362

   **Is your feature request related to a problem or challenge? Please describe 
what you are trying to do.**
   
   We are trying to speed up [`RowFilter`](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.RowFilter.html) evaluation ("selection pushdown") in parquet decoding.
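
   For reference, a `RowFilter` is attached to the reader roughly like this (a minimal sketch against the public `arrow_reader` API; the file path and the `is_not_null` predicate are placeholders):

   ```rust
   use std::fs::File;

   use arrow::compute::is_not_null;
   use parquet::arrow::arrow_reader::{
       ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
   };
   use parquet::arrow::ProjectionMask;

   fn main() -> Result<(), Box<dyn std::error::Error>> {
       let file = File::open("data.parquet")?; // placeholder path
       let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;

       // The predicate only needs leaf column 0, so the reader decodes just
       // that column while evaluating the filter.
       let mask = ProjectionMask::leaves(builder.parquet_schema(), [0]);
       let predicate = ArrowPredicateFn::new(mask, |batch| is_not_null(batch.column(0)));

       let reader = builder
           .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
           .build()?;

       for batch in reader {
           println!("{} rows after pushdown", batch?.num_rows());
       }
       Ok(())
   }
   ```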
   
   This is mostly copied from https://github.com/apache/arrow-rs/pull/6921 from @XiangpengHao, as I don't want to lose the context.
   
   A high-level overview can be found in this blog post: https://datafusion.apache.org/blog/2025/03/21/parquet-pushdown/
   
   ## Why selection pushdown?
    
   Selection pushdown (also called late materialization, row filtering, or filter pushdown) is great in concept, but can be tricky to implement efficiently. For example, the current straightforward implementation actually slows down many queries, which prevents query engines like DataFusion from enabling filter pushdown by default. The goal of a super fast row-filter pushdown parquet reader is described by @alamb in https://github.com/apache/arrow-rs/issues/5523#issuecomment-2429364040:
   > is that evaluating predicates in ArrowFilter (aka pushed down predicates) 
is never worse than decoding the columns first and then filtering them with the 
filter kernel
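
   As a baseline for that goal, "decode the columns first and then filter" looks roughly like this (a minimal sketch; same placeholder file and predicate as above):

   ```rust
   use std::fs::File;

   use arrow::compute::{filter_record_batch, is_not_null};
   use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;

   fn main() -> Result<(), Box<dyn std::error::Error>> {
       // Baseline: decode *all* projected columns, then apply the filter kernel.
       let file = File::open("data.parquet")?; // placeholder path
       let reader = ParquetRecordBatchReaderBuilder::try_new(file)?.build()?;

       for batch in reader {
           let batch = batch?;
           let mask = is_not_null(batch.column(0))?;
           let filtered = filter_record_batch(&batch, &mask)?;
           println!("{} rows kept", filtered.num_rows());
       }
       Ok(())
   }
   ```

   Pushed-down predicate evaluation should never be slower than this decode-then-filter loop.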
   
   Previous discussions have listed many potential optimizations to the current selection pushdown, like the ones in https://github.com/apache/arrow-rs/issues/5523#issuecomment-2429470872.
   However, it's not clear how we can incorporate those optimizations into the current implementation. After thinking more carefully about the design space and its implications, I believe the only way to reach that goal is to restructure the parquet reading pipeline, while reusing as much of the existing implementation as possible.
   
   ## Current implementation and the problems
   We currently implement a [two-phase decoding](https://github.com/apache/arrow-rs/blob/2c84f243b882eff69806cd7294d38bf422fdb24a/parquet/src/arrow/async_reader/mod.rs#L497):
 
   Phase 1: Build selectors by evaluating each predicate
   ```
   Empty selector -> decode column 1 -> selection 1
   Selection 1 -> decode column 2 -> selection 2
   …
   Selection K
   ```
   Phase 2: Decode parquet data using the selector
   ```
   Selection K -> decode column 1
   Selection K -> decode column 2
   …
   Selection K -> decode column N
   ```
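
   To make the selection chaining in Phase 1 concrete, here is a small sketch (with made-up row counts) of how each selection refines the previous one via `RowSelection::and_then`:

   ```rust
   use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

   fn main() {
       // Selection 1: the predicate on column 1 kept rows 0..4 of an 8-row group.
       let selection1 = RowSelection::from(vec![
           RowSelector::select(4),
           RowSelector::skip(4),
       ]);

       // Selection 2 is evaluated only over the 4 rows that selection 1 kept,
       // e.g. the predicate on column 2 keeps the last 2 of those rows.
       let selection2 = RowSelection::from(vec![
           RowSelector::skip(2),
           RowSelector::select(2),
       ]);

       // Selection K, expressed back in row-group coordinates.
       let selection_k = selection1.and_then(&selection2);
       assert_eq!(
           selection_k,
           RowSelection::from(vec![
               RowSelector::skip(2),
               RowSelector::select(2),
               RowSelector::skip(4),
           ])
       );
   }
   ```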
   
   The problem is that **we have to decode each predicate column twice**: for example, if column 1 is being filtered, we first decode column 1 while evaluating the predicate, then decode it again to build the output array.
   
   ## Caching is the solution but not that simple
   
   The high-level intuition is: if the problem is decoding twice, we can simply cache the results of the first decoding and reuse them later.
   
   Here are the nuances:
   1. In the first phase of decoding, we build the filter over the entire column. To cache the decoded array, we would need to hold the entire decoded column in memory, which can be prohibitively expensive.
   2. Each predicate inherits the selection from the previous evaluation, meaning that each predicate sees a different selection, all of which differ from the final selection. It is very non-trivial to convert a cached intermediate record batch to the final record batch, as the selections are completely different.
   3. The columns being filtered may not be the columns in the final output. This is especially challenging with nested columns, which require us to do sub-tree matching.
   
   
   ## Proposed solutions
   
   The solution consists of two parts:
   1. **A new decoding pipeline that pushes predicate evaluation down into record batch decoding.**
   2. **A carefully designed cache that consumes a constant amount of memory (up to 2 pages per column) regardless of the column size. The extra memory overhead is therefore negligible.**
   
   The pipeline looks like this:
   ```
   Load predicate columns for batch 1 -> evaluate predicates -> filter 1 -> 
load & emit batch 1
   Load predicate columns for batch 2 -> evaluate predicates -> filter 2 -> 
load & emit batch 2
   ...
   Load predicate columns for batch N -> evaluate predicates -> filter N -> 
load & emit batch N
   ```
   
   Once we have this pipeline, we can cache the `predicate columns for batch N` and reuse them in `load & emit batch N`, which avoids double decoding.
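
   A rough sketch of what such a per-batch cache could look like (all names here are invented for illustration; this is not an existing API):

   ```rust
   use arrow::array::BooleanArray;
   use arrow::compute::filter_record_batch;
   use arrow::error::ArrowError;
   use arrow::record_batch::RecordBatch;

   /// Hypothetical cache holding the decoded predicate columns of the batch
   /// currently in flight. Its footprint is bounded by one batch, not by a
   /// whole column as in the naive "cache everything" approach.
   struct BatchCache {
       predicate_columns: Option<RecordBatch>,
   }

   impl BatchCache {
       /// "Load predicate columns for batch N -> evaluate predicates -> filter N":
       /// evaluate the predicate and keep the decoded columns for reuse.
       fn evaluate(
           &mut self,
           predicate_columns: RecordBatch,
           predicate: impl Fn(&RecordBatch) -> Result<BooleanArray, ArrowError>,
       ) -> Result<BooleanArray, ArrowError> {
           let mask = predicate(&predicate_columns)?;
           self.predicate_columns = Some(predicate_columns);
           Ok(mask)
       }

       /// "load & emit batch N": the predicate columns are *not* decoded again;
       /// they are taken from the cache and filtered. (A real implementation
       /// would also decode the remaining output columns under `mask` and merge
       /// them into the emitted batch.)
       fn emit(&mut self, mask: &BooleanArray) -> Result<Option<RecordBatch>, ArrowError> {
           self.predicate_columns
               .take()
               .map(|batch| filter_record_batch(&batch, mask))
               .transpose()
       }
   }
   ```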
   
   Due to the difficulties mentioned above, https://github.com/apache/arrow-rs/pull/6921 caches the decompressed pages rather than decoded arrow arrays. As [some research](https://youtu.be/woqXLE25gMc?t=1232) suggests, decompressing pages costs up to twice as much as decoding them to arrow, so if we cache the decompressed pages, then only the (cheaper) arrow decoding needs to happen twice, which might be good enough. Caching decompressed pages is also much simpler to implement, as we can reuse the current array_readers and just implement a new [PageReader](https://github.com/apache/arrow-rs/blob/5249f99e0e6671fc06996365dd4cf22a8427348c/parquet/src/column/page.rs#L340).
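
   For example, a caching `PageReader` could be a thin wrapper along these lines (a hypothetical sketch, not the actual implementation in the PR; a real version would bound the cache at ~2 pages per column and expose a way to replay the cached pages on the second pass):

   ```rust
   use std::collections::VecDeque;

   use parquet::column::page::{Page, PageMetadata, PageReader};
   use parquet::errors::Result;

   /// Hypothetical wrapper: records every decompressed page it hands out, so
   /// a second decoding pass over the same column chunk can reuse the pages
   /// without decompressing them again.
   struct CachingPageReader<R: PageReader> {
       inner: R,
       cache: VecDeque<Page>,
   }

   impl<R: PageReader> Iterator for CachingPageReader<R> {
       type Item = Result<Page>;
       fn next(&mut self) -> Option<Self::Item> {
           self.get_next_page().transpose()
       }
   }

   impl<R: PageReader> PageReader for CachingPageReader<R> {
       fn get_next_page(&mut self) -> Result<Option<Page>> {
           Ok(match self.inner.get_next_page()? {
               Some(page) => {
                   // `Page` buffers are cheaply cloneable (ref-counted bytes).
                   self.cache.push_back(page.clone());
                   Some(page)
               }
               None => None,
           })
       }

       fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
           self.inner.peek_next_page()
       }

       fn skip_next_page(&mut self) -> Result<()> {
           self.inner.skip_next_page()
       }
   }
   ```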
   
   
   **Describe alternatives you've considered**
   
   **Additional context**
   - https://github.com/apache/arrow-rs/pull/6921
   - Related to #6454 
   - Related to #5523 
   - Related to #6624 
   - Related to https://github.com/apache/datafusion/issues/13298
   - Related to https://github.com/apache/datafusion/issues/3463
   - Related to https://github.com/apache/datafusion/pull/12524

