This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-site.git
The following commit(s) were added to refs/heads/main by this push:
new d47b534 Blog post on Parquet filter pushdown (#61)
d47b534 is described below
commit d47b534689220dfa032c2432932965fc5e9782ef
Author: Xiangpeng Hao <[email protected]>
AuthorDate: Tue Mar 25 10:46:59 2025 -0500
Blog post on Parquet filter pushdown (#61)
* new blog post
* add parquet pushdown blog post
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <[email protected]>
* improve writing
* update
* update
* Update date
* Capitalization consistency
* Add links and format
* Updat date
* remove old page
* Apply suggestions from code review
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Oleks V <[email protected]>
* Update content/blog/2025-03-21-parquet-pushdown.md
Co-authored-by: Kevin Liu <[email protected]>
* update
* update
* Update content/blog/2025-03-21-parquet-pushdown.md
Co-authored-by: Kevin Liu <[email protected]>
* Update content/blog/2025-03-21-parquet-pushdown.md
Co-authored-by: Kevin Liu <[email protected]>
* Update content/blog/2025-03-21-parquet-pushdown.md
Co-authored-by: Kevin Liu <[email protected]>
* Apply suggestions from code review
Co-authored-by: Yongting You <[email protected]>
---------
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Oleks V <[email protected]>
Co-authored-by: Kevin Liu <[email protected]>
Co-authored-by: Yongting You <[email protected]>
---
content/blog/2025-03-21-parquet-pushdown.md | 312 +++++++++++++++++++++
content/images/parquet-pushdown/baseline-impl.jpg | Bin 0 -> 166560 bytes
content/images/parquet-pushdown/cached-pages.jpg | Bin 0 -> 37963 bytes
content/images/parquet-pushdown/new-pipeline.jpg | Bin 0 -> 106800 bytes
content/images/parquet-pushdown/parquet-viewer.jpg | Bin 0 -> 344238 bytes
.../parquet-pushdown/pushdown-vs-no-pushdown.jpg | Bin 0 -> 194575 bytes
6 files changed, 312 insertions(+)
diff --git a/content/blog/2025-03-21-parquet-pushdown.md
b/content/blog/2025-03-21-parquet-pushdown.md
new file mode 100644
index 0000000..d2d8609
--- /dev/null
+++ b/content/blog/2025-03-21-parquet-pushdown.md
@@ -0,0 +1,312 @@
+---
+layout: post
+title: Efficient Filter Pushdown in Parquet
+date: 2025-03-21
+author: Xiangpeng Hao
+categories: [performance]
+---
+
+<style>
+figure {
+ margin: 20px 0;
+}
+
+figure img {
+ display: block;
+ max-width: 80%;
+}
+
+figcaption {
+ font-style: italic;
+ margin-top: 10px;
+ color: #555;
+ font-size: 0.9em;
+ max-width: 80%;
+}
+</style>
+
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+_Editor's Note: This blog was first published on [Xiangpeng Hao's blog].
Thanks to [InfluxData] for sponsoring this work as part of his PhD funding._
+
+[Xiangpeng Hao's blog]: https://blog.xiangpeng.systems/posts/parquet-pushdown/
+[InfluxData]: https://www.influxdata.com/
+<hr/>
+
+
+In the [previous post], we discussed how [Apache DataFusion] prunes [Apache
Parquet] files to skip irrelevant **files/row_groups** (sometimes also
[pages](https://parquet.apache.org/docs/file-format/pageindex/)).
+
+This post discusses how Parquet readers skip irrelevant **rows** while
scanning data,
+leveraging Parquet's columnar layout by first reading only filter columns,
+and then selectively reading other columns only for matching rows.
+
+
+[previous post]: https://datafusion.apache.org/blog/2025/03/20/parquet-pruning
+[Apache DataFusion]: https://datafusion.apache.org/
+[Apache Parquet]: https://parquet.apache.org/
+
+## Why filter pushdown in Parquet?
+
+Below is an example query that reads sensor data with filters on `date_time`
and `location`.
+Without filter pushdown, all rows from `location`, `val`, and `date_time`
columns are decoded before `location='office'` is evaluated. Filter pushdown is
especially useful when the filter is selective, i.e., removes many rows.
+
+
+```sql
+SELECT val, location
+FROM sensor_data
+WHERE date_time > '2025-03-11' AND location = 'office';
+```
+
+<figure>
+ <img src="/blog/images/parquet-pushdown/pushdown-vs-no-pushdown.jpg"
alt="Parquet pruning skips irrelevant files/row_groups, while filter pushdown
skips irrelevant rows. Without filter pushdown, all rows from location, val,
and date_time columns are decoded before `location='office'` is evaluated.
Filter pushdown is especially useful when the filter is selective, i.e.,
removes many rows." width="80%" class="img-responsive">
+ <figcaption>
+ Parquet pruning skips irrelevant files/row_groups, while filter pushdown
skips irrelevant rows. Without filter pushdown, all rows from location, val,
and date_time columns are decoded before `location='office'` is evaluated.
Filter pushdown is especially useful when the filter is selective, i.e.,
removes many rows.
+ </figcaption>
+</figure>
+
+
+In our setup, sensor data is aggregated by date — each day has its own Parquet
file.
+At planning time, DataFusion prunes the unneeded Parquet files, i.e.,
`2025-03-10.parquet` and `2025-03-11.parquet`.
+
+Once the files to read are located, [*DataFusion's current default implementation*](https://github.com/apache/datafusion/issues/3463) reads all projected columns (`sensor_id`, `val`, and `location`) into Arrow RecordBatches, then applies the filter over `location` to get the final set of rows.
+
+A better approach is called **filter pushdown** with **late materialization**,
which evaluates filter conditions first and only decodes data that passes these
conditions.
+In practice, this works by first processing only the filter columns
(`date_time` and `location`), building a boolean mask of rows that satisfy our
conditions, then using this mask to selectively decode only the relevant rows
from other columns (`sensor_id`, `val`).
+This eliminates the waste of decoding rows that will be immediately filtered
out.
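To make the two phases concrete, here is a minimal late-materialization sketch in plain Rust. This is an illustration only, not the actual arrow-rs API: columns are modeled as already-decoded vectors, and "decoding" a row is simulated by copying it out.

```rust
// A minimal model of filter pushdown with late materialization.
// Phase 1 reads only the filter columns; phase 2 materializes the
// remaining columns for just the rows that passed the filter.

/// Phase 1: evaluate the predicate over the filter columns only,
/// producing one boolean per row (the filter mask).
fn build_filter_mask(date_time: &[&str], location: &[&str]) -> Vec<bool> {
    date_time
        .iter()
        .zip(location)
        .map(|(dt, loc)| *dt > "2025-03-11" && *loc == "office")
        .collect()
}

/// Phase 2: materialize only the rows the mask selects.
fn select_rows<T: Copy>(column: &[T], mask: &[bool]) -> Vec<T> {
    column
        .iter()
        .zip(mask)
        .filter(|(_, keep)| **keep)
        .map(|(v, _)| *v)
        .collect()
}

fn main() {
    let date_time = ["2025-03-10", "2025-03-12", "2025-03-12"];
    let location = ["office", "home", "office"];
    let val = [1.0_f64, 2.0, 3.0];

    let mask = build_filter_mask(&date_time, &location);
    assert_eq!(mask, vec![false, false, true]);

    // Only the single matching row of `val` is materialized.
    assert_eq!(select_rows(&val, &mask), vec![3.0]);
}
```

The real reader applies the same idea to encoded Parquet pages rather than plain vectors, which is where the complications below come from.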
+
+While simple in theory, practical implementations often make performance worse.
+
+## How can filter pushdown be slower?
+
+At a high level, the Parquet reader first builds a filter mask -- essentially
a boolean array indicating which rows meet the filter criteria -- and then uses
this mask to selectively decode only the needed rows from the remaining columns
in the projection.
+
+Let's dig into the details of [how filter pushdown is implemented](https://github.com/apache/arrow-rs/blob/d5339f31a60a4bd8a4256e7120fe32603249d88e/parquet/src/arrow/async_reader/mod.rs#L618-L712) in the current Rust Parquet reader, illustrated in the following figure.
+
+<figure>
+  <img src="/blog/images/parquet-pushdown/baseline-impl.jpg" alt="Implementation of filter pushdown in Rust Parquet readers" width="70%" class="img-responsive">
+ <figcaption>
+ Implementation of filter pushdown in Rust Parquet readers -- the first
phase builds the filter mask, the second phase applies the filter mask to the
other columns
+ </figcaption>
+</figure>
+
+The filter pushdown has two phases:
+
+1. Build the filter mask (steps 1-3)
+
+2. Use the filter mask to selectively decode other columns (steps 4-7), e.g., the output of step 3 is used as input for steps 5 and 7.
+
+Within each phase, it takes three steps from Parquet to Arrow:
+
+1. Decompress the Parquet pages using generic decompression algorithms like
LZ4, Zstd, etc. (steps 1, 4, 6)
+
+2. Decode the page content into Arrow format (steps 2, 5, 7)
+
+3. Evaluate the filter over Arrow data (step 3)
+
+In the figure above, we can see that `location` is **decompressed and decoded
twice**, first when building the filter mask (steps 1, 2), and second when
building the output (steps 4, 5).
+This happens for all columns that appear both in the filter and output.
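A toy cost model (my own illustration, not code from the reader) makes the overlap explicit: a column is decompressed/decoded once per phase it participates in.

```rust
// Toy accounting of decode passes in the two-phase pushdown scheme:
// each column is decompressed/decoded once to build the filter mask
// (phase 1), and once more if it also appears in the output (phase 2).
fn decode_passes(filter_cols: &[&str], output_cols: &[&str], col: &str) -> u32 {
    filter_cols.contains(&col) as u32 + output_cols.contains(&col) as u32
}

fn main() {
    let filter = ["date_time", "location"];
    let output = ["val", "location"];

    // `location` is in both the filter and the output: decoded twice.
    assert_eq!(decode_passes(&filter, &output, "location"), 2);
    // `val` only appears in the output: decoded once.
    assert_eq!(decode_passes(&filter, &output, "val"), 1);
}
```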
+
+The table below shows the corresponding CPU time on the [ClickBench query
22](https://github.com/apache/datafusion/blob/main/benchmarks/queries/clickbench/queries.sql#L23):
+
+```
++------------+--------+-------------+--------+
+| Decompress | Decode | Apply filter| Others |
++------------+--------+-------------+--------+
+| 206 ms | 117 ms | 22 ms | 48 ms |
++------------+--------+-------------+--------+
+```
+
+Clearly, decompress/decode operations dominate the time spent (323 ms of the 393 ms total, about 82%). With filter pushdown, columns that appear in both the filter and the output are decompressed/decoded twice; without filter pushdown, every column is decompressed/decoded only once.
+This explains why filter pushdown is slower in some cases.
+
+
+> **Note:** Highly selective filters may skip an entire page; but as long as even one row is read from a page, the whole page must be decompressed and often decoded.
+
+
+## Attempt: cache filter columns
+
+Intuitively, caching the filter columns and reusing them later could help.
+
+But naively caching decoded pages consumes a prohibitive amount of memory:
+
+1. It needs to cache Arrow arrays, which are on average [4x larger than
Parquet
data](https://github.com/XiangpengHao/liquid-cache/blob/main/dev/doc/liquid-cache-vldb.pdf).
+
+2. It needs to cache the **entire column chunk in memory**, because Phase 1 builds filters over the whole column chunk, and the cache is only used in Phase 2.
+
+3. The memory usage is proportional to the number of filter columns, which can
be unboundedly high.
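A back-of-envelope estimate makes point 3 concrete (the numbers here are illustrative; the 4x expansion factor is the average cited in point 1):

```rust
/// Rough memory estimate for naively caching decoded filter columns:
/// each filter column's whole chunk is held in Arrow form (~4x the
/// Parquet size on average) until Phase 2 finishes.
fn naive_cache_bytes(parquet_chunk_bytes: u64, num_filter_columns: u64) -> u64 {
    4 * parquet_chunk_bytes * num_filter_columns
}

fn main() {
    // Hypothetical numbers: two filter columns, 32 MiB of Parquet data each.
    let bytes = naive_cache_bytes(32 * 1024 * 1024, 2);
    assert_eq!(bytes, 256 * 1024 * 1024); // ~256 MiB held across both phases
}
```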
+
+Worse, caching filter columns means the reader must read partly from Parquet and partly from the cache, which is complex to implement and would likely require substantial changes to the current implementation.
+
+> **Feel the complexity:** consider building a cache that properly handles
nested columns, multiple filters, and filters with multiple columns.
+
+## Real solution
+
+We need a solution that:
+
+1. Is simple to implement, i.e., doesn't require thousands of lines of code.
+
+2. Incurs minimal memory overhead.
+
+This section describes my [<700 LOC PR (with lots of comments and
tests)](https://github.com/apache/arrow-rs/pull/6921#issuecomment-2718792433)
that **reduces total ClickBench time by 15%, with up to 2x lower latency for
some queries, no obvious regression on other queries, and caches at most 2
pages (~2MB) per column in memory**.
+
+
+<figure>
+ <img src="/blog/images/parquet-pushdown/new-pipeline.jpg" alt="New decoding
pipeline, building filter mask and output columns are interleaved in a single
pass, allowing us to cache minimal pages for minimal amount of time"
width="80%" class="img-responsive">
+ <figcaption>
+ New decoding pipeline, building filter mask and output columns are
interleaved in a single pass, allowing us to cache minimal pages for minimal
amount of time
+ </figcaption>
+</figure>
+
+The new pipeline interleaves the previous two phases into a single pass, so
that:
+
+1. The page being decompressed is immediately used to build filter masks and
output columns.
+
+2. Decompressed pages are cached for minimal time; after one pass (steps 1-6),
the cache memory is released for the next pass.
+
+This allows the cache to hold only one page at a time and to discard each page immediately after use, significantly reducing the memory required for caching.
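The per-page lifetime can be sketched in plain Rust (a simplified model, not the actual arrow-rs code): each page is decompressed once, used to build both the mask and the output, then dropped before the next page is touched.

```rust
// Sketch of the single-pass pipeline. A "page" is modeled as a vector of
// values and the expensive decompression step is simulated by a clone.
// The key property is the page's lifetime: decompressed once, used for
// both the filter mask and the output, then dropped before the next page.
fn scan_with_pushdown(compressed_pages: &[Vec<i64>], threshold: i64) -> (Vec<bool>, Vec<i64>) {
    let mut mask = Vec::new();
    let mut output = Vec::new();
    for compressed in compressed_pages {
        // "Decompress" the page; in the real reader this is the LZ4/Zstd
        // work that the old two-phase scheme would repeat in Phase 2.
        let page: Vec<i64> = compressed.clone();
        for &v in &page {
            let keep = v > threshold; // build the filter mask...
            mask.push(keep);
            if keep {
                output.push(v); // ...and the output, in one pass
            }
        } // `page` is dropped here: at most one decompressed page is alive
    }
    (mask, output)
}

fn main() {
    let pages = vec![vec![1, 5], vec![9, 2]];
    let (mask, output) = scan_with_pushdown(&pages, 4);
    assert_eq!(mask, vec![false, true, true, false]);
    assert_eq!(output, vec![5, 9]);
}
```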
+
+### What pages are cached?
+You may have noticed that only `location` is cached, not `val`, because `val`
is only used for output.
+More generally, only columns that appear both in the filter and output are
cached, and at most 1 page is cached for each such column.
+
+More examples:
+```sql
+SELECT val
+FROM sensor_data
+WHERE date_time > '2025-03-11' AND location = 'office';
+```
+
+In this case, no columns are cached, because `val` is not used for filtering.
+
+```sql
+SELECT COUNT(*)
+FROM sensor_data
+WHERE date_time > '2025-03-11' AND location = 'office';
+```
+
+In this case, again, no columns are cached, because the output projection is
empty after query plan optimization.
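The rule can be written down directly (my own formulation, not code from the PR): only the intersection of the filter columns and the output projection is cached.

```rust
/// A column's pages are cached only when the column is read twice:
/// once for the filter, once again for the output projection.
fn cached_columns<'a>(filter_cols: &[&'a str], output_cols: &[&'a str]) -> Vec<&'a str> {
    filter_cols
        .iter()
        .filter(|c| output_cols.contains(c))
        .copied()
        .collect()
}

fn main() {
    // SELECT val, location ... WHERE date_time > ... AND location = 'office'
    assert_eq!(
        cached_columns(&["date_time", "location"], &["val", "location"]),
        vec!["location"]
    );
    // SELECT val ... : no overlap, nothing is cached.
    assert!(cached_columns(&["date_time", "location"], &["val"]).is_empty());
    // SELECT COUNT(*) ... : the projection is empty, nothing is cached.
    assert!(cached_columns(&["date_time", "location"], &[]).is_empty());
}
```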
+
+### Then why cache 2 pages per column instead of 1?
+This is another real-world nuance of how Parquet lays out pages.
+
+Parquet by default encodes data using [dictionary
encoding](https://parquet.apache.org/docs/file-format/data-pages/encodings/),
which writes a dictionary page as the first page of a column chunk, followed by
the keys referencing the dictionary.
+
+You can see this in action using
[parquet-viewer](https://parquet-viewer.xiangpeng.systems):
+
+<figure>
+ <img src="/blog/images/parquet-pushdown/parquet-viewer.jpg" alt="Parquet
viewer shows the page layout of a column chunk" width="80%"
class="img-responsive">
+ <figcaption>
+ Parquet viewer shows the page layout of a column chunk
+ </figcaption>
+</figure>
+
+This means that decoding a page of data actually requires two pages: the dictionary page and the data page.
+
+This is why the reader caches 2 pages per column: one dictionary page and one data page.
+The data page slot moves forward as data is read, while the dictionary page slot always references the first page.
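A tiny model shows why both slots are needed (simplified; real pages hold encoded bytes, not string slices): decoding any dictionary-encoded data page touches the dictionary page as well.

```rust
/// Simplified dictionary decoding: the dictionary page stores the distinct
/// values once; every data page stores keys (indices) into that dictionary.
/// Decoding a data page therefore always touches both cached slots.
fn decode_data_page(dictionary: &[&str], keys: &[usize]) -> Vec<String> {
    keys.iter().map(|&k| dictionary[k].to_string()).collect()
}

fn main() {
    let dictionary = ["home", "office"]; // pinned first page of the chunk
    // Two data pages share the same dictionary; the data-page slot moves on.
    assert_eq!(
        decode_data_page(&dictionary, &[1, 0, 1]),
        vec!["office", "home", "office"]
    );
    assert_eq!(decode_data_page(&dictionary, &[0, 0]), vec!["home", "home"]);
}
```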
+
+<figure>
+ <img src="/blog/images/parquet-pushdown/cached-pages.jpg" alt="Cached two
pages, one for dictionary (pinned), one for data (moves as it reads the data)"
width="80%" class="img-responsive">
+ <figcaption>
+ Cached two pages, one for dictionary (pinned), one for data (moves as it
reads the data)
+ </figcaption>
+</figure>
+
+
+## How does it perform?
+
+Here are my results on [ClickBench](https://github.com/apache/datafusion/tree/main/benchmarks#clickbench) on my AMD 9900X machine. Total time is reduced by 15%, with Q23 being 2.24x faster,
+and the queries that get slower are likely within measurement noise.
+
+```
+┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
+┃ Query ┃ no-pushdown ┃ new-pushdown ┃ Change ┃
+┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
+│ QQuery 0 │ 0.47ms │ 0.43ms │ +1.10x faster │
+│ QQuery 1 │ 51.10ms │ 50.10ms │ no change │
+│ QQuery 2 │ 68.23ms │ 64.49ms │ +1.06x faster │
+│ QQuery 3 │ 90.68ms │ 86.73ms │ no change │
+│ QQuery 4 │ 458.93ms │ 458.59ms │ no change │
+│ QQuery 5 │ 522.06ms │ 478.50ms │ +1.09x faster │
+│ QQuery 6 │ 49.84ms │ 49.94ms │ no change │
+│ QQuery 7 │ 55.09ms │ 55.77ms │ no change │
+│ QQuery 8 │ 565.26ms │ 556.95ms │ no change │
+│ QQuery 9 │ 575.83ms │ 575.05ms │ no change │
+│ QQuery 10 │ 164.56ms │ 178.23ms │ 1.08x slower │
+│ QQuery 11 │ 177.20ms │ 191.32ms │ 1.08x slower │
+│ QQuery 12 │ 591.05ms │ 569.92ms │ no change │
+│ QQuery 13 │ 861.06ms │ 848.59ms │ no change │
+│ QQuery 14 │ 596.20ms │ 580.73ms │ no change │
+│ QQuery 15 │ 554.96ms │ 548.77ms │ no change │
+│ QQuery 16 │ 1175.08ms │ 1146.07ms │ no change │
+│ QQuery 17 │ 1150.45ms │ 1121.49ms │ no change │
+│ QQuery 18 │ 2634.75ms │ 2494.07ms │ +1.06x faster │
+│ QQuery 19 │ 90.15ms │ 89.24ms │ no change │
+│ QQuery 20 │ 620.15ms │ 591.67ms │ no change │
+│ QQuery 21 │ 782.38ms │ 703.15ms │ +1.11x faster │
+│ QQuery 22 │ 1927.94ms │ 1404.35ms │ +1.37x faster │
+│ QQuery 23 │ 8104.11ms │ 3610.76ms │ +2.24x faster │
+│ QQuery 24 │ 360.79ms │ 330.55ms │ +1.09x faster │
+│ QQuery 25 │ 290.61ms │ 252.54ms │ +1.15x faster │
+│ QQuery 26 │ 395.18ms │ 362.72ms │ +1.09x faster │
+│ QQuery 27 │ 891.76ms │ 959.39ms │ 1.08x slower │
+│ QQuery 28 │ 4059.54ms │ 4137.37ms │ no change │
+│ QQuery 29 │ 235.88ms │ 228.99ms │ no change │
+│ QQuery 30 │ 564.22ms │ 584.65ms │ no change │
+│ QQuery 31 │ 741.20ms │ 757.87ms │ no change │
+│ QQuery 32 │ 2652.48ms │ 2574.19ms │ no change │
+│ QQuery 33 │ 2373.71ms │ 2327.10ms │ no change │
+│ QQuery 34 │ 2391.00ms │ 2342.15ms │ no change │
+│ QQuery 35 │ 700.79ms │ 694.51ms │ no change │
+│ QQuery 36 │ 151.51ms │ 152.93ms │ no change │
+│ QQuery 37 │ 108.18ms │ 86.03ms │ +1.26x faster │
+│ QQuery 38 │ 114.64ms │ 106.22ms │ +1.08x faster │
+│ QQuery 39 │ 260.80ms │ 239.13ms │ +1.09x faster │
+│ QQuery 40 │ 60.74ms │ 73.29ms │ 1.21x slower │
+│ QQuery 41 │ 58.75ms │ 67.85ms │ 1.15x slower │
+│ QQuery 42 │ 65.49ms │ 68.11ms │ no change │
+└──────────────┴─────────────┴──────────────┴───────────────┘
+┏━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┓
+┃ Benchmark Summary ┃ ┃
+┡━━━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━┩
+│ Total Time (no-pushdown) │ 38344.79ms │
+│ Total Time (new-pushdown) │ 32800.50ms │
+│ Average Time (no-pushdown) │ 891.74ms │
+│ Average Time (new-pushdown) │ 762.80ms │
+│ Queries Faster │ 13 │
+│ Queries Slower │ 5 │
+│ Queries with No Change │ 25 │
+└─────────────────────────────┴────────────┘
+```
+
+## Conclusion
+
+Despite being simple in theory, filter pushdown in Parquet is non-trivial to
implement.
+It requires understanding both the Parquet format and reader implementation
details.
+The challenge lies in efficiently coordinating decoding, filter evaluation, and memory management.
+
+If you are interested in this level of optimization and want to help test,
document and implement this type of optimization, come find us in the
[DataFusion Community]. We would love to have you.
+
+[DataFusion Community]:
https://datafusion.apache.org/contributor-guide/communication.html
+
+
+
+
+
diff --git a/content/images/parquet-pushdown/baseline-impl.jpg
b/content/images/parquet-pushdown/baseline-impl.jpg
new file mode 100644
index 0000000..28f7e59
Binary files /dev/null and b/content/images/parquet-pushdown/baseline-impl.jpg
differ
diff --git a/content/images/parquet-pushdown/cached-pages.jpg
b/content/images/parquet-pushdown/cached-pages.jpg
new file mode 100644
index 0000000..0949544
Binary files /dev/null and b/content/images/parquet-pushdown/cached-pages.jpg
differ
diff --git a/content/images/parquet-pushdown/new-pipeline.jpg
b/content/images/parquet-pushdown/new-pipeline.jpg
new file mode 100644
index 0000000..93db49b
Binary files /dev/null and b/content/images/parquet-pushdown/new-pipeline.jpg
differ
diff --git a/content/images/parquet-pushdown/parquet-viewer.jpg
b/content/images/parquet-pushdown/parquet-viewer.jpg
new file mode 100644
index 0000000..71de38d
Binary files /dev/null and b/content/images/parquet-pushdown/parquet-viewer.jpg
differ
diff --git a/content/images/parquet-pushdown/pushdown-vs-no-pushdown.jpg
b/content/images/parquet-pushdown/pushdown-vs-no-pushdown.jpg
new file mode 100644
index 0000000..bedcf10
Binary files /dev/null and
b/content/images/parquet-pushdown/pushdown-vs-no-pushdown.jpg differ
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]