XiangpengHao opened a new pull request, #6921:
URL: https://github.com/apache/arrow-rs/pull/6921
DO NOT MERGE.
I'll send breakdown PRs once I've nailed down the designs.
# Which issue does this PR close?
<!--
We generally require a GitHub issue to be filed for all bug fixes and
enhancements and this helps us generate change logs for our releases. You can
link an issue to this PR using the GitHub syntax. For example `Closes #123`
indicates that this PR will close issue #123.
-->
Many long-standing issues in DataFusion and parquet-rs (todo: find some).
Closes #.
# Rationale for this change
<!--
Why are you proposing this change? If this is already explained clearly in
the issue then this section is not needed.
Explaining clearly why changes are proposed helps reviewers understand your
changes and offer better suggestions for fixes.
-->
Filter pushdown is great in concept, but the current straightforward
implementation actually slows down queries. The goal of a super fast filter
pushdown parquet reader is described by @alamb in
https://github.com/apache/arrow-rs/issues/5523#issuecomment-2429364040:
> evaluating predicates in ArrowFilter (aka pushed down predicates)
is never worse than decoding the columns first and then filtering them with the
filter kernel
I initially thought we could find simple/smart tricks to work around some of
the issues with the current implementation
(https://github.com/apache/arrow-rs/issues/5523#issuecomment-2429470872). After
thinking more carefully about the design space and its implications, I believe
the only way to reach that goal is to restructure the parquet reading pipeline,
while reusing as much of the existing implementation as possible.
### Problems with the current implementation
We follow a [two-phase
decoding](https://github.com/apache/arrow-rs/blob/2c84f243b882eff69806cd7294d38bf422fdb24a/parquet/src/arrow/async_reader/mod.rs#L497) approach:
Phase 1: Build row selections by evaluating each predicate
```
Empty selector -> decode column 1 -> selection 1
Selection 1 -> decode column 2 -> selection 2
…
Selection K
```
Phase 2: Decode the parquet data using the final selection
```
Selection K -> decode column 1
Selection K -> decode column 2
…
Selection K -> decode column N
```
The first problem is that we have to decode each predicate column twice: for
example, if column 1 is being filtered, we first decode column 1 while
evaluating the predicate, then decode it again to build the output array.
The second problem is that we need to decode and evaluate all the
predicates before we can emit the first record batch. This not only causes high
first-record-batch latency, but also makes caching decoded values challenging
-- we would need to cache the entire column to avoid double decoding.
I have described other issues in
https://github.com/apache/arrow-rs/issues/5523#issuecomment-2429470872, but
they are relatively straightforward to fix.
### Proposed decoding pipeline
We can't pre-build the filter once and then use it to decode the data; instead,
we need to build the filter batch by batch, interleaved with decoding. The
pipeline looks like this:
```
Load predicate columns for batch 1 -> evaluate predicates -> filter 1 ->
load & emit batch 1
Load predicate columns for batch 2 -> evaluate predicates -> filter 2 ->
load & emit batch 2
...
Load predicate columns for batch N -> evaluate predicates -> filter N ->
load & emit batch N
```
Once we have this pipeline, we can cache the `predicate columns for batch N`
and reuse them when we `load & emit batch N`, which avoids double decoding.
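To make the per-batch flow concrete, here is a minimal, self-contained sketch using in-memory arrays; the two columns are hypothetical stand-ins for decoded parquet columns, which the real reader would obtain from the array readers one batch at a time:
```rust
use std::sync::Arc;

use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{ArrayRef, BooleanArray, Int64Array, RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema};
use arrow_select::filter::filter_record_batch;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Stand-ins for "load predicate columns for batch N": in the real reader
    // these come from the parquet array readers, one batch at a time.
    let predicate_col: ArrayRef = Arc::new(Int64Array::from(vec![1, 20, 3, 40]));
    let other_col: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));

    // Evaluate the predicate on the already-decoded predicate column.
    let filter = BooleanArray::from_unary(predicate_col.as_primitive::<Int64Type>(), |v| v > 10);

    // "Load & emit batch N": the cached predicate column is reused directly
    // instead of being decoded a second time; only the remaining projected
    // columns still need decoding before the filter is applied.
    let schema = Arc::new(Schema::new(vec![
        Field::new("a", DataType::Int64, false),
        Field::new("b", DataType::Utf8, false),
    ]));
    let batch = RecordBatch::try_new(schema, vec![predicate_col, other_col])?;
    let filtered = filter_record_batch(&batch, &filter)?;
    println!("{} of {} rows pass the filter", filtered.num_rows(), batch.num_rows());
    Ok(())
}
```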
However, the key challenge is handling nested types. The `predicate columns`
are not a flat list of arrays but a tree, and the same is true for the result
columns. So the problem is not just intersecting two column lists, but also
mapping the predicate column tree onto the result column tree.
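To illustrate why this is a tree problem rather than a list problem, consider a hypothetical nested schema where the predicate reads `s.a` and the query projects `s.b`; the two projection masks select different leaves under the same struct root, so the cached predicate columns and the result columns each hold a partial `s` that has to be stitched back together:
```rust
use std::sync::Arc;

use parquet::arrow::ProjectionMask;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical nested schema: predicate on `s.a`, query output is `s.b`.
    let message = "
        message schema {
            required group s {
                required int64 a;
                required binary b (UTF8);
            }
        }";
    let schema = SchemaDescriptor::new(Arc::new(parse_message_type(message)?));

    let predicate_mask = ProjectionMask::leaves(&schema, [0]); // leaf 0 = s.a
    let output_mask = ProjectionMask::leaves(&schema, [1]); // leaf 1 = s.b

    // Reusing the decoded predicate columns is not a flat list intersection:
    // each mask materializes a partial `s` struct, and those partial structs
    // have to be reassembled into the result column tree.
    assert!(predicate_mask.leaf_included(0) && !predicate_mask.leaf_included(1));
    assert!(output_mask.leaf_included(1) && !output_mask.leaf_included(0));
    Ok(())
}
```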
A workaround is to cache the decompressed pages rather than the decoded arrow
arrays. As some research suggests (todo: cite myself), decompressing pages
costs up to twice as much as decoding arrow, so if we cache the decompressed
pages we only pay the arrow decoding cost twice, which might be good enough.
Caching decompressed pages is also much simpler to implement: we can reuse the
current array_readers and just implement a new
[PageReader](https://github.com/apache/arrow-rs/blob/5249f99e0e6671fc06996365dd4cf22a8427348c/parquet/src/column/page.rs#L340).
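As a rough illustration of the caching idea (not the actual `PageReader` implementation in this PR), a first pass can populate a cache of decompressed pages keyed by page offset, and a second pass can replay them so that only arrow decoding is repeated; `DecompressedPage` and `PageCache` below are hypothetical stand-ins:
```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a decompressed (but not yet decoded) parquet page.
#[derive(Clone)]
struct DecompressedPage {
    data: Vec<u8>,
}

/// Hypothetical cache keyed by the page's offset in the column chunk.
#[derive(Default)]
struct PageCache {
    pages: HashMap<usize, DecompressedPage>,
}

impl PageCache {
    /// Return the cached page, decompressing it via `decompress` only on a miss.
    fn get_or_decompress<F>(&mut self, offset: usize, decompress: F) -> &DecompressedPage
    where
        F: FnOnce() -> DecompressedPage,
    {
        self.pages.entry(offset).or_insert_with(decompress)
    }
}

fn main() {
    let mut cache = PageCache::default();

    // First pass (predicate evaluation): the page is actually decompressed.
    let first = cache
        .get_or_decompress(0, || DecompressedPage { data: vec![1, 2, 3] })
        .clone();

    // Second pass (building the output arrays): the closure is never called;
    // decompression work is reused, only arrow decoding is repeated.
    let second = cache.get_or_decompress(0, || unreachable!("page already cached"));
    assert_eq!(first.data, second.data);
}
```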
This PR implements caching of decompressed pages; however, testing on my local
machine shows that it still slows down some queries, particularly those on
string columns.
# What changes are included in this PR?
This draft PR implements some of the ideas described above and already shows
many performance improvements. Remaining todos:
- [ ] Push the InMemoryRowGroup fetch down to decoding time as well; currently
we fetch columns too early.
- [ ] Try to cache arrow arrays again.
- [ ] Other straightforward todos...
<!--
There is no need to duplicate the description in the issue here but it is
sometimes worth providing a summary of the individual changes in this PR.
-->
# Are there any user-facing changes?
<!--
If there are user-facing changes then we may require documentation to be
updated before approving the PR.
-->
<!---
If there are any breaking changes to public APIs, please call them out.
-->