kevinjqliu commented on code in PR #61:
URL: https://github.com/apache/datafusion-site/pull/61#discussion_r2008823867


##########
content/blog/2025-03-21-parquet-pushdown.md:
##########
@@ -0,0 +1,312 @@
+---
+layout: post
+title: Efficient Filter Pushdown in Parquet
+date: 2025-03-21
+author: Xiangpeng Hao
+categories: [performance]
+---
+
+<style>
+figure {
+  margin: 20px 0;
+}
+
+figure img {
+  display: block;
+  max-width: 80%;
+}
+
+figcaption {
+  font-style: italic;
+  margin-top: 10px;
+  color: #555;
+  font-size: 0.9em;
+  max-width: 80%;
+}
+</style>
+
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+_Editor's Note: This blog was first published on [Xiangpeng Hao's blog]. 
Thanks to [InfluxData] for sponsoring this work as part of his PhD funding._
+
+[Xiangpeng Hao's blog]: https://blog.xiangpeng.systems/posts/parquet-pushdown/
+[InfluxData]: https://www.influxdata.com/
+<hr/>
+
+
+In the [previous post], we discussed how [Apache DataFusion] prunes [Apache Parquet] data to skip irrelevant **files/row_groups** (and sometimes [pages](https://parquet.apache.org/docs/file-format/pageindex/)).
+
+This post discusses how Parquet readers skip irrelevant **rows** while 
scanning data,
+leveraging Parquet's columnar layout by first reading only filter columns,
+and then selectively reading other columns only for matching rows.
+
+
+[previous post]: https://datafusion.apache.org/blog/2025/03/20/parquet-pruning
+[Apache DataFusion]: https://datafusion.apache.org/
+[Apache Parquet]: https://parquet.apache.org/
+
+## Why filter pushdown in Parquet? 
+
+Below is an example query that reads sensor data with filters on `date_time` and `location`.
+Without filter pushdown, all rows of the `location`, `val`, and `date_time` columns are decoded before `location='office'` is evaluated. Filter pushdown is especially useful when the filter is selective, i.e., when it removes many rows.
+
+
+```sql
+SELECT val, location 
+FROM sensor_data 
+WHERE date_time > '2025-03-11' AND location = 'office';
+```
+
+<figure>
+  <img src="/blog/images/parquet-pushdown/pushdown-vs-no-pushdown.jpg" 
alt="Parquet pruning skips irrelevant files/row_groups, while filter pushdown 
skips irrelevant rows. Without filter pushdown, all rows from location, val, 
and date_time columns are decoded before `location='office'` is evaluated. 
Filter pushdown is especially useful when the filter is selective, i.e., 
removes many rows." width="80%" class="img-responsive">
+  <figcaption>
+    Parquet pruning skips irrelevant files/row_groups, while filter pushdown 
skips irrelevant rows. Without filter pushdown, all rows from location, val, 
and date_time columns are decoded before `location='office'` is evaluated. 
Filter pushdown is especially useful when the filter is selective, i.e., 
removes many rows.
+  </figcaption>
+</figure>
+
+
+In our setup, sensor data is aggregated by date — each day has its own Parquet 
file.
+At planning time, DataFusion prunes the unneeded Parquet files, i.e., 
`2025-03-10.parquet` and `2025-03-11.parquet`.
+
+Once the files to read are located, [*DataFusion's current default implementation*](https://github.com/apache/datafusion/issues/3463) reads all the projected columns (`sensor_id`, `val`, and `location`) into Arrow RecordBatches, and then applies the filter on `location` to produce the final set of rows.
+
+A better approach, called **filter pushdown**, evaluates the filter conditions first and only decodes data that passes them.
+In practice, this works by first processing only the filter columns (`date_time` and `location`), building a boolean mask of rows that satisfy our conditions, and then using this mask to selectively decode only the relevant rows from the other columns (`sensor_id`, `val`).
+This eliminates the waste of decoding rows that are immediately filtered out.
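+
+For readers who want to see this in code, the Rust Parquet reader (arrow-rs) exposes pushdown through its `RowFilter` / `ArrowPredicate` API. Below is a minimal sketch of the query above; the file name, the leaf column ordering, and pushing down only the `location` predicate (for brevity) are assumptions of this example, not part of the post:
+
+```rust
+use std::fs::File;
+
+use arrow::array::{Array, BooleanArray, StringArray};
+use parquet::arrow::arrow_reader::{
+    ArrowPredicateFn, ParquetRecordBatchReaderBuilder, RowFilter,
+};
+use parquet::arrow::ProjectionMask;
+
+fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // Hypothetical daily file; assume the leaf columns are
+    // [0] sensor_id, [1] val, [2] location, [3] date_time.
+    let file = File::open("2025-03-12.parquet")?;
+    let builder = ParquetRecordBatchReaderBuilder::try_new(file)?;
+
+    // The predicate decodes only the `location` column...
+    let predicate_mask = ProjectionMask::leaves(builder.parquet_schema(), [2]);
+    // ...while the output decodes only `val` and `location`.
+    let output_mask = ProjectionMask::leaves(builder.parquet_schema(), [1, 2]);
+
+    let predicate = ArrowPredicateFn::new(predicate_mask, |batch| {
+        // `batch` holds only the predicate columns; return the boolean mask.
+        let location = batch
+            .column(0)
+            .as_any()
+            .downcast_ref::<StringArray>()
+            .expect("location should be a string column");
+        Ok(location
+            .iter()
+            .map(|v| Some(v == Some("office")))
+            .collect::<BooleanArray>())
+    });
+
+    let reader = builder
+        .with_row_filter(RowFilter::new(vec![Box::new(predicate)]))
+        .with_projection(output_mask)
+        .build()?;
+
+    for batch in reader {
+        println!("{} matching rows", batch?.num_rows());
+    }
+    Ok(())
+}
+```
+
+With this, `val` pages are decoded only for rows that pass the `location` mask.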
+
+While simple in theory, practical implementations often make performance worse.
+
+## How can filter pushdown be slower?
+
+At a high level, the Parquet reader first builds a filter mask -- essentially 
a boolean array indicating which rows meet the filter criteria -- and then uses 
this mask to selectively decode only the needed rows from the remaining columns 
in the projection.
+
+Let's dig into the details of [how filter pushdown is implemented](https://github.com/apache/arrow-rs/blob/d5339f31a60a4bd8a4256e7120fe32603249d88e/parquet/src/arrow/async_reader/mod.rs#L618-L712) in the current Rust Parquet reader, as illustrated in the following figure.
+
+<figure>
+  <img src="/blog/images/parquet-pushdown/baseline-impl.jpg" alt="Implementation of filter pushdown in Rust Parquet readers" class="img-responsive" width="70%">
+  <figcaption>
+    Implementation of filter pushdown in Rust Parquet readers -- the first 
phase builds the filter mask, the second phase applies the filter mask to the 
other columns
+  </figcaption>
+</figure>
+
+Filter pushdown has two phases:
+
+1. Build the filter mask (steps 1-3).
+
+2. Use the filter mask to selectively decode the other columns (steps 4-7); e.g., the output of step 3 is used as input for steps 5 and 7.
+
+Within each phase, going from Parquet to Arrow takes three kinds of steps:
+
+1. Decompress the Parquet pages using generic decompression algorithms like 
LZ4, Zstd, etc. (steps 1, 4, 6)
+
+2. Decode the page content into Arrow format (steps 2, 5, 7)
+
+3. Evaluate the filter over Arrow data (step 3)
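+
+To make the hand-off between the phases concrete: the output of step 3 is just a boolean mask, which is applied to the arrays decoded in steps 5 and 7, e.g. via arrow's `filter` kernel. Here is a self-contained sketch with made-up data (the actual reader represents the selection more compactly, but the effect is the same):
+
+```rust
+use arrow::array::{Array, BooleanArray, Float64Array, StringArray};
+use arrow::compute::filter;
+
+fn main() {
+    // Pretend these arrays were decoded from Parquet pages (steps 5 and 7).
+    let sensor_id = StringArray::from(vec!["s1", "s2", "s3"]);
+    let val = Float64Array::from(vec![20.1, 21.3, 19.8]);
+
+    // Filter mask built in phase 1 (step 3): location == 'office'.
+    let mask = BooleanArray::from(vec![true, false, true]);
+
+    // Phase 2 keeps only the rows selected by the mask.
+    let sensor_id = filter(&sensor_id, &mask).unwrap();
+    let val = filter(&val, &mask).unwrap();
+    assert_eq!(sensor_id.len(), 2);
+    assert_eq!(val.len(), 2);
+}
+```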
+
+In the figure above, we can see that `location` is **decompressed and decoded twice**: first when building the filter mask (steps 1, 2), and again when building the output (steps 4, 5).
+This happens for every column that appears in both the filter and the output.
+
+The table below shows the CPU time breakdown for [ClickBench query 22](https://github.com/apache/datafusion/blob/main/benchmarks/queries/clickbench/queries.sql#L23):
+
+```
++------------+--------+-------------+--------+
+| Decompress | Decode | Apply filter| Others |
++------------+--------+-------------+--------+
+| 206 ms     | 117 ms | 22 ms       | 48 ms  |
++------------+--------+-------------+--------+
+```
+
+Clearly, decompression and decoding dominate the time spent. With filter pushdown, columns used in both the filter and the output are decompressed and decoded twice; without filter pushdown, each column is decompressed and decoded only once.
+This explains why filter pushdown can be slower in some cases.
+
+
+> **Note:** Highly selective filters may skip entire pages; but as long as even one row is read from a page, the entire page must be decompressed and, often, decoded.
+
+
+## Attempt: cache filter columns
+
+Intuitively, caching the filter columns and reusing them later could help.
+
+But naively caching decoded pages consumes a prohibitive amount of memory:
+
+1. It needs to cache Arrow arrays, which are on average [4x larger than Parquet data](https://github.com/XiangpengHao/liquid-cache/blob/main/dev/doc/liquid-cache-vldb.pdf).
+
+2. It needs to cache the **entire column chunk in memory**, because Phase 1 builds filters over the whole column chunk while the cached data is only consumed in Phase 2.
+
+3. The memory usage is proportional to the number of filter columns, which is unbounded.
+
+Worse, caching filter columns means the reader must read partially from Parquet and partially from the cache, which is complex to implement and likely requires substantial changes to the current implementation.
+
+> **Feel the complexity:** consider building a cache that properly handles 
nested columns, multiple filters, and filters with multiple columns.
+
+## Real solution
+
+We need a solution that:
+
+1. Is simple to implement, i.e., doesn't require thousands of lines of code.
+
+2. Incurs minimal memory overhead.
+
+This section describes my [<700 LOC PR (with lots of comments and 
tests)](https://github.com/apache/arrow-rs/pull/6921#issuecomment-2718792433) 
that **reduces total ClickBench time by 15%, with up to 2x lower latency for 
some queries, no obvious regression on other queries, and caches at most 2 
pages (~2MB) per column in memory**.
+
+
+<figure>
+  <img src="/blog/images/parquet-pushdown/new-pipeline.jpg" alt="New decoding pipeline: building the filter mask and building the output columns are interleaved in a single pass, allowing us to cache a minimal number of pages for a minimal amount of time" width="80%" class="img-responsive">
+  <figcaption>
+    New decoding pipeline: building the filter mask and building the output columns are interleaved in a single pass, allowing us to cache a minimal number of pages for a minimal amount of time
+  </figcaption>
+</figure>
+
+The new pipeline interleaves the previous two phases into a single pass, so 
that:
+
+1. The page being decompressed is immediately used to build filter masks and 
output columns.
+
+2. Decompressed pages are cached only for a minimal amount of time; after one pass (steps 1-6), the cache memory is released for the next pass.
+
+This allows the cache to hold only one page per cached column at a time and to discard it immediately after use, significantly reducing the memory required for caching.
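+
+The cache itself can stay tiny. The sketch below is not the actual PR code, just a simplified illustration of the idea: each cached column holds at most one decompressed page, identified by its byte offset, and storing the next page evicts the previous one.
+
+```rust
+use std::collections::HashMap;
+
+/// A decompressed Parquet page (simplified placeholder type).
+struct DecompressedPage {
+    offset: usize, // byte offset of the page in the file; used as its identity
+    data: Vec<u8>,
+}
+
+/// At most one decompressed page per column: inserting a new page
+/// for a column discards the previous one.
+#[derive(Default)]
+struct PageCache {
+    pages: HashMap<usize, DecompressedPage>, // column index -> cached page
+}
+
+impl PageCache {
+    /// Returns the cached page for `column` only if it is the page at `offset`.
+    fn get(&self, column: usize, offset: usize) -> Option<&DecompressedPage> {
+        self.pages.get(&column).filter(|p| p.offset == offset)
+    }
+
+    /// Caches `page` for `column`, evicting whatever was cached before.
+    fn put(&mut self, column: usize, page: DecompressedPage) {
+        self.pages.insert(column, page);
+    }
+}
+
+fn main() {
+    let mut cache = PageCache::default();
+    cache.put(2, DecompressedPage { offset: 0, data: vec![0u8; 1024] });
+    assert!(cache.get(2, 0).is_some());
+    // Moving on to the next page replaces the previous one, so memory
+    // stays bounded at roughly one page per cached column.
+    cache.put(2, DecompressedPage { offset: 4096, data: vec![0u8; 1024] });
+    assert!(cache.get(2, 0).is_none());
+    assert!(cache.get(2, 4096).is_some());
+}
+```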
+
+### What pages are cached?
+You may have noticed that only `location` is cached, not `val`, because `val` 
is only used for output.
+More generally, only columns that appear in both the filter and the output are cached, and at most one page is cached per such column.
+
+More examples:
+```sql
+SELECT val 
+FROM sensor_data 
+WHERE date_time > '2025-03-12' AND location = 'office';

Review Comment:
   nit: match above
   ```suggestion
   WHERE date_time > '2025-03-11' AND location = 'office';
   ```



##########
content/blog/2025-03-21-parquet-pushdown.md:
##########
@@ -0,0 +1,312 @@
+More examples:
+```sql
+SELECT val 
+FROM sensor_data 
+WHERE date_time > '2025-03-12' AND location = 'office';
+```
+
+In this case, no columns are cached, because `val` is not used for filtering.
+
+```sql
+SELECT COUNT(*) 
+FROM sensor_data 
+WHERE date_time > '2025-03-12' AND location = 'office';

Review Comment:
   nit: match above
   ```suggestion
   WHERE date_time > '2025-03-11' AND location = 'office';
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org
