alamb opened a new issue, #7362: URL: https://github.com/apache/arrow-rs/issues/7362
**Is your feature request related to a problem or challenge? Please describe what you are trying to do.**

We are trying to speed up [`RowFilter`](https://docs.rs/parquet/latest/parquet/arrow/arrow_reader/struct.RowFilter.html) evaluation ("selection pushdown") in parquet decoding. This is mostly copied from https://github.com/apache/arrow-rs/pull/6921 from @XiangpengHao, as I don't want to lose the context. A high level overview can be found in this blog post: https://datafusion.apache.org/blog/2025/03/21/parquet-pushdown/

## Why selection pushdown?

Selection pushdown (also known as late materialization, row filtering, or filter pushdown) is great in concept, but can be tricky to implement efficiently. For example, the current straightforward implementation actually slows down many queries, which prevents query engines like DataFusion from enabling filter pushdown by default.

The goal of a super fast row-filter pushdown parquet reader is described by @alamb in https://github.com/apache/arrow-rs/issues/5523#issuecomment-2429364040:

> is that evaluating predicates in ArrowFilter (aka pushed down predicates) is never worse than decoding the columns first and then filtering them with the filter kernel

Previous discussions have listed many potential optimizations to the current selection pushdown, like the ones in https://github.com/apache/arrow-rs/issues/5523#issuecomment-2429470872. However, it's not clear how we can incorporate those optimizations into the current implementation. After thinking more carefully about the design space and its implications, I believe the only way to reach that goal is to restructure the parquet reading pipeline, while reusing as much of the existing implementation as possible.

## Current implementation and the problems

We currently implement a [two phase decoding](https://github.com/apache/arrow-rs/blob/2c84f243b882eff69806cd7294d38bf422fdb24a/parquet/src/arrow/async_reader/mod.rs#L497):

Phase 1: Build a selector from each predicate

```
Empty selector -> decode column 1 -> selection 1
Selection 1    -> decode column 2 -> selection 2
…
Selection K
```

Phase 2: Decode the parquet data using the final selector

```
Selection K -> decode column 1
Selection K -> decode column 2
…
Selection K -> decode column N
```

The problem is that **we have to decode each predicate column twice**: for example, if column 1 is being filtered, we first decode column 1 while evaluating the predicate, then decode it again to build the output array.

## Caching is the solution, but not that simple

The high level intuition is that, if the problem is decoding twice, we simply cache the results of the first decoding and reuse them later. Here are the nuances (the sketch after this list illustrates point 2):

1. In the first phase of decoding, we build the filter over the entire column. To cache the decoded array, we would need to cache the entire decoded column in memory, which can be prohibitively expensive.
2. Each predicate inherits the selection from the previous evaluation, meaning each predicate sees a different selection, all of which differ from the final selection. It is very non-trivial to convert the cached intermediate record batches into the final record batch, as the selections are completely different.
3. The columns being filtered may not be columns in the final output. This is especially challenging with nested columns, which require us to do sub-tree matching.
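To make point 2 concrete, here is a minimal sketch using the existing `RowSelection` API from `parquet::arrow::arrow_reader` (the row counts are made up for illustration):

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

fn main() {
    // A row group with 10 rows. Predicate 1 scans all of them and
    // keeps rows 2..8, i.e. 6 rows.
    let selection1 = RowSelection::from(vec![
        RowSelector::skip(2),
        RowSelector::select(6),
        RowSelector::skip(2),
    ]);

    // Predicate 2 only ever sees those 6 rows, so its selection is
    // expressed *relative to predicate 1's output*: keep the first 3.
    let selection2 = RowSelection::from(vec![
        RowSelector::select(3),
        RowSelector::skip(3),
    ]);

    // The final selection, relative to the whole row group, is the
    // composition of the two: rows 2..5.
    let final_selection = selection1.and_then(&selection2);
    println!("{final_selection:?}");
}
```

A batch cached while evaluating predicate 2 holds the 6 rows of `selection1`, laid out in `selection1`'s frame of reference; reusing it for the final output means re-filtering it with a selection that differs from `final_selection`, and doing this bookkeeping for every predicate and column is what makes the naive cache so awkward.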
## Proposed solutions

The solution consists of two parts:

1. **A new decoding pipeline that pushes predicate evaluation down into record batch decoding.**
2. **A carefully designed cache that consumes a constant amount of memory (up to two pages per column) regardless of column size. The extra memory overhead is therefore negligible.**

The pipeline looks like this:

```
Load predicate columns for batch 1 -> evaluate predicates -> filter 1 -> load & emit batch 1
Load predicate columns for batch 2 -> evaluate predicates -> filter 2 -> load & emit batch 2
...
Load predicate columns for batch N -> evaluate predicates -> filter N -> load & emit batch N
```

Once we have this pipeline, we can cache the `predicate columns for batch N` and reuse them in `load & emit batch N`, which avoids the double decoding.

Due to the difficulties mentioned above, https://github.com/apache/arrow-rs/pull/6921 caches the decompressed pages rather than the decoded arrow arrays. As [some research](https://youtu.be/woqXLE25gMc?t=1232) suggests, decompressing pages costs up to twice as much as decoding arrow, so if we cache the decompressed pages, we only need to decode arrow twice, which might be good enough. Caching decompressed pages is also much simpler to implement, as we can reuse the current array_readers and just implement a new [PageReader](https://github.com/apache/arrow-rs/blob/5249f99e0e6671fc06996365dd4cf22a8427348c/parquet/src/column/page.rs#L340); a sketch of this idea is at the end of this issue.

**Describe alternatives you've considered**

**Additional context**

- https://github.com/apache/arrow-rs/pull/6921
- Related to #6454
- Related to #5523
- Related to #6624
- Related to https://github.com/apache/datafusion/issues/13298
- Related to https://github.com/apache/datafusion/issues/3463
- Related to https://github.com/apache/datafusion/pull/12524
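To make the `PageReader`-level cache concrete, here is a minimal, hypothetical sketch of a caching wrapper. It is not the implementation in https://github.com/apache/arrow-rs/pull/6921: `CachingPageReader` and `PageCache` are made-up names, an unbounded map stands in for the bounded (two pages per column) cache described above, and it assumes `Page` is cheap to clone because its buffers are ref-counted.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use parquet::column::page::{Page, PageMetadata, PageReader};
use parquet::errors::Result;

/// Decompressed pages keyed by their index within a column chunk. The map is
/// shared: predicate evaluation fills it, and building the output arrays
/// replays it.
type PageCache = Arc<Mutex<HashMap<usize, Page>>>;

/// Wraps an existing `PageReader` over the same column chunk: cache hits are
/// served without decompressing, misses fall through and populate the cache.
struct CachingPageReader<R: PageReader> {
    inner: R,
    cache: PageCache,
    /// Index of the next page this reader will produce.
    next: usize,
}

impl<R: PageReader> Iterator for CachingPageReader<R> {
    type Item = Result<Page>;

    fn next(&mut self) -> Option<Self::Item> {
        self.get_next_page().transpose()
    }
}

impl<R: PageReader> PageReader for CachingPageReader<R> {
    fn get_next_page(&mut self) -> Result<Option<Page>> {
        let index = self.next;
        self.next += 1;
        let cached = self.cache.lock().unwrap().get(&index).cloned();
        if let Some(page) = cached {
            // Hit: keep the inner reader in sync without decompressing.
            self.inner.skip_next_page()?;
            return Ok(Some(page));
        }
        // Miss: decompress via the inner reader and remember the result.
        match self.inner.get_next_page()? {
            Some(page) => {
                self.cache.lock().unwrap().insert(index, page.clone());
                Ok(Some(page))
            }
            None => Ok(None),
        }
    }

    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
        self.inner.peek_next_page()
    }

    fn skip_next_page(&mut self) -> Result<()> {
        self.next += 1;
        self.inner.skip_next_page()
    }
}
```

Because the current array readers only interact with pages through the `PageReader` trait, a wrapper along these lines could be dropped in without touching the decoding logic itself; the real work is in the eviction policy that keeps the cache at a constant size.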