This is an automated email from the ASF dual-hosted git repository. alamb pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/datafusion-site.git
The following commit(s) were added to refs/heads/main by this push: new f69c5cc Dynamic filters blog post (rev 2) (#103) f69c5cc is described below commit f69c5cc9380ceff24e39728f5fc9306b11538ca9 Author: Andrew Lamb <and...@nerdnetworks.org> AuthorDate: Wed Sep 10 08:36:52 2025 -0700 Dynamic filters blog post (rev 2) (#103) * start dynamic filters * start blog post * update image * finish first draft * cleanup * fix typos Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com> * Update content/blog/2025-08-16-dynamic-filters.md Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com> * Update content/blog/2025-08-16-dynamic-filters.md Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com> * Update content/blog/2025-08-16-dynamic-filters.md Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com> * Update date + titl * Render markdown tables * update * Fix images * Add about author, about DataFusion sections, correct reference * Start filling out some more background * more * Update date * Take a pass at background / content * Take a pass at background / content * More content obsession * footnotes * update * AI proofreading * tweak * Work on join section * add acknowledgements * update future work * complete * proofread * proofread * proofread * proofread * cleanups, thanks claude * tweaks * Update results chart * Update join performance figure * tweak capitalization * clarify hash join section and add bullet points * Add djanderson to aknowledgemnts * clarify join selectivity comment --------- Co-authored-by: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com> Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com> --- content/blog/2025-09-10-dynamic-filters.md | 650 +++++++++++++++++++++ content/images/dynamic-filters/execution-time.svg | 1 + .../images/dynamic-filters/join-performance.svg | 1 + .../images/dynamic-filters/query-plan-naive.png | Bin 0 -> 111826 bytes 
.../query-plan-topk-dynamic-filters.png | Bin 0 -> 160390 bytes content/images/dynamic-filters/query-plan-topk.png | Bin 0 -> 112692 bytes pelicanconf.py | 6 +- 7 files changed, 654 insertions(+), 4 deletions(-) diff --git a/content/blog/2025-09-10-dynamic-filters.md b/content/blog/2025-09-10-dynamic-filters.md new file mode 100644 index 0000000..058614c --- /dev/null +++ b/content/blog/2025-09-10-dynamic-filters.md @@ -0,0 +1,650 @@ +--- +layout: post +title: Dynamic Filters: Passing Information Between Operators During Execution for 10x Faster Queries +date: 2025-09-10 +author: Adrian Garcia Badaracco (Pydantic), Andrew Lamb (InfluxData) +categories: [features] +--- +<!-- +{% comment %} +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to you under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +{% endcomment %} +--> + +<!-- +diagrams source: https://docs.google.com/presentation/d/1FFYy27ydZdeFZWWuMjZGnYKUx9QNJfzuVLAH8AE5wlc/edit?slide=id.g364a74cba3d_0_92#slide=id.g364a74cba3d_0_92 +Intended Audience: Query engine / data systems developers who want to learn about topk optimization +Goal: Introduce TopK and dynamic filters as general optimization techniques for query engines, and how they were used to improve performance in DataFusion. 
+--> + +This blog post introduces the query engine optimization techniques called TopK +and dynamic filters. We describe the motivating use case, how these +optimizations work, and how we implemented them with the [Apache DataFusion] +community to improve performance by an order of magnitude for some query +patterns. + +[Apache DataFusion]: https://datafusion.apache.org/ + +## Motivation and Results + +The main commercial product at [Pydantic], [Logfire], is an observability +platform built on DataFusion. One of the most common workflows / queries is +"show me the last K traces" which translates to a query similar to: + +[Pydantic]: https://pydantic.dev +[Logfire]: https://pydantic.dev/logfire + +```sql +SELECT * FROM records ORDER BY start_timestamp DESC LIMIT 1000; +``` + +We noticed this was *pretty slow*, even though DataFusion has long had the +classic `TopK` optimization (described below). After implementing the dynamic +filter techniques described in this blog, we saw performance improve *by over 10x* +for this query pattern, and are applying the optimization to other queries and +operators as well. + +Let's look at some preliminary numbers, using [ClickBench] [Q23], which has +the same pattern as our motivating example: + +```sql +SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY "EventTime" LIMIT 10; +``` + +<div class="text-center"> +<img + src="/blog/images/dynamic-filters/execution-time.svg" + width="80%" + class="img-responsive" + alt="Q23 Performance Improvement with Dynamic Filters and Late Materialization" +/> +</div> + +**Figure 1**: Execution times for ClickBench Q23 with and without dynamic +filters (DF)<sup id="fn1">[1](#footnote1)</sup>, and late materialization +(LM)<sup id="fn2">[2](#footnote2)</sup> for different partitions / core usage. +Dynamic filters alone (yellow) and late materialization alone (red) show a large +improvement over the baseline (blue). When both optimizations are enabled (green) +performance improves by up to 22x. 
See the appendix for more measurement details. + + +## Background: TopK and Dynamic Filters + +To explain how dynamic filters improve query performance, we first need to +explain the so-called "TopK" optimization. To do so, we will use a simplified +version of ClickBench Q23: + +```sql +SELECT * +FROM hits +ORDER BY "EventTime" +LIMIT 10 +``` + +[Q23]: https://github.com/apache/datafusion/blob/main/benchmarks/queries/clickbench/queries/q23.sql +[ClickBench]: https://benchmark.clickhouse.com/ + +A straightforward, though slow, plan to answer this query is shown in Figure 2. + +<div class="text-center"> +<img + src="/blog/images/dynamic-filters/query-plan-naive.png" + width="80%" + class="img-responsive" + alt="Naive Query Plan" +/> +</div> + +**Figure 2**: Simple Query Plan for ClickBench Q23. Data flows in plans from the +scan at the bottom to the limit at the top. This plan reads all 100M rows of the +`hits` table, sorts them by `EventTime`, and then discards everything except the top 10 rows. + +This naive plan requires substantial effort as all columns from all rows are +decoded and sorted, even though only 10 are returned. + +High-performance query engines typically avoid the expensive full sort with a +specialized operator that tracks the current top rows using a [heap], rather +than sorting all the data. For example, this operator +is called [TopK in DataFusion], [SortWithLimit in Snowflake], and [topn in +DuckDB]. The plan for Q23 using this specialized operator is shown in Figure 3. 
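
The heap-based approach described above can be sketched in a few lines of Rust. This is a minimal illustration using only the standard library, not DataFusion's actual `TopK` implementation; the function name `top_k_smallest` is hypothetical, and plain `i64` values stand in for `EventTime`.

```rust
use std::collections::BinaryHeap;

/// Hypothetical helper: returns the `k` smallest values in ascending order,
/// i.e. the result of `ORDER BY v ASC LIMIT k`, without sorting the input.
fn top_k_smallest(values: impl IntoIterator<Item = i64>, k: usize) -> Vec<i64> {
    // `BinaryHeap` is a max-heap, so its root is the largest (worst)
    // of the current best `k` values.
    let mut heap: BinaryHeap<i64> = BinaryHeap::with_capacity(k + 1);
    for v in values {
        if heap.len() < k {
            // Heap is not full yet: every value is a candidate.
            heap.push(v);
        } else if let Some(&worst) = heap.peek() {
            // Heap is full: only values better than the current worst get in.
            if v < worst {
                heap.pop();
                heap.push(v);
            }
        }
    }
    heap.into_sorted_vec()
}
```

Memory use is bounded by `k` rather than by the input size, and each input value costs at most one heap update.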
+ +[heap]: https://en.wikipedia.org/wiki/Heap_(data_structure) +[TopK in DataFusion]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html +[SortWithLimit in Snowflake]: https://docs.snowflake.com/en/user-guide/ui-snowsight-activity +[topn in DuckDB]: https://duckdb.org/2024/10/25/topn.html#introduction-to-top-n + +<div class="text-center"> +<img + src="/blog/images/dynamic-filters/query-plan-topk.png" + width="80%" + class="img-responsive" + alt="TopK Query Plan" +/> +</div> + +**Figure 3**: Query plan for Q23 in DataFusion using the TopK operator. This +plan still reads all 100M rows of the `hits` table, but instead of first sorting +them all by `EventTime`, the TopK operator keeps track of the current top 10 +rows using a min/max heap. Credit to [Visualgo](https://visualgo.net/en) for the +heap icon. + +Figure 3 is better, but it still reads and decodes all 100M rows of the `hits` table, +which is often unnecessary once we have found the top 10 rows. For example, +because Q23 sorts in ascending order, if the current top 10 rows all have `EventTime` in +2024 or earlier, then any subsequent rows with `EventTime` in 2025 or later can be +skipped entirely without reading or decoding them. This technique is especially +effective at skipping entire files or row groups if the top 10 values are in the +first few files read, which is very common when the +data insert order is approximately the same as the timestamp order. + +Leveraging this insight is the key idea behind dynamic filters, which introduce +a runtime mechanism for the TopK operator to provide the current top values to +the scan operator, allowing it to skip unnecessary rows, entire files, or portions +of files. The plan for Q23 with dynamic filters is shown in Figure 4. 
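
To make the feedback loop between the TopK operator and the scan concrete, here is a single-threaded sketch for an ascending `ORDER BY ... LIMIT k`. It is an illustration under simplifying assumptions, not DataFusion code: `FileChunk` is a hypothetical stand-in for a Parquet file with a `min` statistic, and the `threshold` variable plays the role of the dynamic filter `value < threshold`.

```rust
use std::collections::BinaryHeap;

/// Hypothetical stand-in for a file: its rows plus a file-level `min` statistic.
struct FileChunk {
    min: i64,
    rows: Vec<i64>,
}

/// Returns the `k` smallest values in ascending order and the number of
/// files skipped entirely because of the dynamic filter.
fn top_k_with_dynamic_filter(files: &[FileChunk], k: usize) -> (Vec<i64>, usize) {
    let mut heap = BinaryHeap::new(); // max-heap over the current best `k`
    let mut threshold: Option<i64> = None; // `None` behaves like the filter `true`
    let mut files_skipped = 0;

    for file in files {
        // File-level pruning: if even the smallest value in the file cannot
        // pass `value < threshold`, skip the file without reading its rows.
        if let Some(t) = threshold {
            if file.min >= t {
                files_skipped += 1;
                continue;
            }
        }
        for &v in &file.rows {
            // Row-level filtering with the current threshold.
            if let Some(t) = threshold {
                if v >= t {
                    continue;
                }
            }
            heap.push(v);
            if heap.len() > k {
                heap.pop(); // drop the worst value
            }
            // Once the heap is full, its root becomes the new, tighter threshold.
            if heap.len() == k {
                threshold = heap.peek().copied();
            }
        }
    }
    (heap.into_sorted_vec(), files_skipped)
}
```

The threshold only ever tightens, so a file or row rejected at one point could never have entered the final result later.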
+ +<div class="text-center"> +<img + src="/blog/images/dynamic-filters/query-plan-topk-dynamic-filters.png" + width="100%" + class="img-responsive" + alt="TopK Query Plan with Dynamic Filters" +/> +</div> + +**Figure 4**: Query plan for Q23 in DataFusion with specialized TopK operator +and dynamic filters. The TopK operator provides the largest `EventTime` of the +current top 10 rows to the scan operator, allowing it to skip rows with +`EventTime` later than that value. The scan operator uses this dynamic filter +to skip unnecessary files and rows, reducing the amount of data that needs to +be read and processed. + +## Worked Example + +To make dynamic filters more concrete, here is a fully worked example. Imagine +we have a table `records` with a column `start_timestamp` and we are running the +motivating query: + +```sql +SELECT * +FROM records +ORDER BY start_timestamp +DESC LIMIT 3; +``` + +In this example, at some point during execution, the heap in the `TopK` operator +will contain the actual 3 most recent values, which might be: + +| start_timestamp | +|--------------------------| +| 2025-08-16T20:35:15.00Z | +| 2025-08-16T20:35:14.00Z | +| 2025-08-16T20:35:13.00Z | + +Since `2025-08-16T20:35:13.00Z` is the smallest of these values, we know that +any subsequent rows with `start_timestamp` less than or equal to this value +cannot possibly be in the top 3, and can be skipped entirely. +This knowledge is encoded in a filter of the form `start_timestamp > +'2025-08-16T20:35:13.00Z'`. If we knew the correct timestamp value before +starting the plan, we could simply write: + +```sql +SELECT * +FROM records +WHERE start_timestamp > '2025-08-16T20:35:13.00Z' -- Filter to skip rows +ORDER BY start_timestamp DESC +LIMIT 3; +``` + +And DataFusion's existing hierarchical pruning (described in [this blog]) would +skip reading unnecessary files and row groups, and only decode +the necessary rows. 
+ +[this blog]: https://datafusion.apache.org/blog/2025/08/15/external-parquet-indexes/ + +However, when the query starts running the value +`'2025-08-16T20:35:13.00Z'` is not yet known, so DataFusion instead puts a dynamic filter +into the plan, which you can think of as a function call like +`dynamic_filter()`, something like this: + +```sql +SELECT * +FROM records +WHERE dynamic_filter() -- Updated during execution as we know more +ORDER BY start_timestamp DESC +LIMIT 3; +``` + +In this case, `dynamic_filter()` initially has the value `true` (passes all +rows) but is progressively updated by the TopK operator as the query +runs, so that it filters out more and more rows. Note that while we are using SQL for +illustrative purposes in this example, these optimizations are done at the +physical plan ([ExecutionPlan]) level, and they apply equally to SQL, DataFrame +APIs, and custom query languages built with DataFusion. + +[ExecutionPlan]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html + +## TopK + Dynamic Filters + +As mentioned above, DataFusion has a specialized sort operator named [TopK] that +only keeps `K` rows in memory. For a `DESC` sort order, each new input batch is +compared against the current `K` largest values, and any new input rows that +are larger replace rows in the current `K`. The [code is +here]. + +[TopK]: https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html +[code is here]: https://github.com/apache/datafusion/blob/b4a8b5ae54d939353b7cbd5ab8aee7d3bedecb66/datafusion/physical-plan/src/topk/mod.rs + +Prior to dynamic filters, DataFusion had no early termination: it would read the +*entire* `records` table even if it already had the top `K` rows because it +still had to check that there were no rows that had larger `start_timestamp`. 
+You can see how this is a problem if you have 2 years' worth of time-series data +and the largest `1000` values of `start_timestamp` are likely within the first +few files read. Even once the `TopK` operator has seen 1000 timestamps (e.g. on +August 16th, 2025), DataFusion would still read all remaining files (e.g. even +those that contain data only from 2024) just to make sure. + +InfluxData [optimized a similar query pattern in InfluxDB IOx] using another +operator called `ProgressiveEvalExec`. However, `ProgressiveEvalExec` requires that the data +is already sorted and a careful analysis of ordering to prove that it can be +used and still produce correct results. That is not the case for Logfire data (and many other datasets): +data tends to be *roughly* sorted (e.g. if you append to files as you receive +it) but that does not guarantee that it is fully sorted, either within or between +files. + +We [discussed possible solutions] with the community, and ultimately decided to +implement generic "dynamic filters", which are general enough to be used in +joins as well (see next section). Our implementation appears very similar to +recently announced optimizations in closed-source, commercial systems such as +[Accelerating TopK Queries in Snowflake], or [self-sharpening runtime filters in +Alibaba Cloud's PolarDB], and we are excited that we can offer similar features +in an open source query engine like DataFusion. 
+ +[optimized a similar query pattern in InfluxDB IOx]: https://www.influxdata.com/blog/making-recent-value-queries-hundreds-times-faster/ +[discussed possible solutions]: https://github.com/apache/datafusion/issues/15037 +[Accelerating TopK Queries in Snowflake]: https://program.berlinbuzzwords.de/bbuzz24/talk/3DTQJB/ +[self-sharpening runtime filters in Alibaba Cloud's PolarDB]: https://www.alibabacloud.com/blog/about-database-kernel-%7C-learn-about-polardb-imci-optimization-techniques_600274 + +At the query plan level, Q23 looks like this before it is executed: + +```text +┌───────────────────────────┐ +│ SortExec(TopK) │ +│ -------------------- │ +│ EventTime@4 ASC NULLS LAST│ +│ │ +│ limit: 10 │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ DataSourceExec │ +│ -------------------- │ +│ files: 100 │ +│ format: parquet │ +│ │ +│ predicate: │ +│ CAST(URL AS Utf8View) LIKE│ +│ %google% AND true │ +└───────────────────────────┘ +``` + +**Figure 5**: Physical plan for ClickBench Q23 prior to execution. The dynamic +filter is shown as `true` in the `predicate` field of the `DataSourceExec` +operator. + +The dynamic filter is updated by the `SortExec(TopK)` operator during execution +as shown in Figure 6. + +```text +┌───────────────────────────┐ +│ SortExec(TopK) │ +│ -------------------- │ +│ EventTime@4 ASC NULLS LAST│ +│ │ +│ limit: 10 │ +└─────────────┬─────────────┘ +┌─────────────┴─────────────┐ +│ DataSourceExec │ +│ -------------------- │ +│ files: 100 │ +│ format: parquet │ +│ │ +│ predicate: │ +│ CAST(URL AS Utf8View) LIKE│ +│ %google% AND │ +│ EventTime < 1372713773.0 │ +└───────────────────────────┘ +``` +**Figure 6**: Physical plan for ClickBench Q23 after execution. The dynamic filter has been +updated to `EventTime < 1372713773.0`, which allows the `DataSourceExec` operator to skip +files and rows that do not match the filter. 
+ +## Hash Join + Dynamic Filters + +We spent significant effort to make dynamic filters a general-purpose +optimization (see the Extensibility section below for more details). Instead of +a one-off optimization for TopK queries, we created a general mechanism for +passing information between operators during execution that can be used in multiple contexts. +We have already used the dynamic filter infrastructure to +improve hash joins by implementing a technique called [sideways information +passing], which is similar to [Bloom filter joins] in Apache Spark. See +[issue #7955] for more details. + +[sideways information passing]: https://15721.courses.cs.cmu.edu/spring2020/papers/13-execution/shrinivas-icde2013.pdf +[Bloom filter joins]: https://issues.apache.org/jira/browse/SPARK-32268 +[issue #7955]: https://github.com/apache/datafusion/issues/7955 + +In a Hash Join, the query engine picks one input of the join to be the "build" +input and the other input to be the "probe" side. + +* First, the **build side** is loaded into memory, and turned into a hash table. + +* Then, the **probe side** is scanned, and matching rows are found by looking + in the hash table. Non-matching rows are discarded and thus joins often act as + filters. + +Many hash joins act as selective filters for rows from the probe side (when only +a small number of rows are matched), so it is natural to use the same dynamic +filter technique. DataFusion 50.0.0 pushes down knowledge of what keys exist on +the build side into the scan of the probe side with a dynamic filter based on +min/max join key values. For example, if the build side only has keys in the +range `[100, 200]`, then DataFusion will filter out all probe rows with keys +outside that range during the scan. + +This simple approach is fast to evaluate and the filter improves performance +significantly when combined with statistics pruning, late materialization, and +other optimizations as shown in Figure 7. 
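
The min/max idea can be sketched as follows. This is a simplified, std-only illustration rather than DataFusion's `HashJoinExec`: `KeyBounds`, `build_side`, and `probe_side` are hypothetical names, keys and values are plain `i64`, and the bounds check stands in for the dynamic filter that would be pushed into the probe-side scan.

```rust
use std::collections::HashMap;

/// Hypothetical min/max bounds over the join keys seen on the build side.
#[derive(Debug, Clone, Copy, PartialEq)]
struct KeyBounds {
    min: i64,
    max: i64,
}

/// Builds the hash table and, as a by-product, the key bounds.
fn build_side(rows: &[(i64, i64)]) -> (HashMap<i64, Vec<i64>>, Option<KeyBounds>) {
    let mut table: HashMap<i64, Vec<i64>> = HashMap::new();
    let mut bounds: Option<KeyBounds> = None;
    for &(key, value) in rows {
        table.entry(key).or_default().push(value);
        bounds = Some(match bounds {
            None => KeyBounds { min: key, max: key },
            Some(b) => KeyBounds { min: b.min.min(key), max: b.max.max(key) },
        });
    }
    (table, bounds)
}

/// Probes the hash table, first discarding keys outside the build-side bounds.
/// Returns the joined `(key, build_value)` pairs and the number of probe rows
/// rejected by the bounds check alone.
fn probe_side(
    table: &HashMap<i64, Vec<i64>>,
    bounds: Option<KeyBounds>,
    probe_keys: &[i64],
) -> (Vec<(i64, i64)>, usize) {
    let mut joined = Vec::new();
    let mut pruned = 0;
    for &key in probe_keys {
        // Cheap range check; an empty build side (no bounds) rejects every row.
        let in_range = bounds.map_or(false, |b| b.min <= key && key <= b.max);
        if !in_range {
            pruned += 1;
            continue;
        }
        // The range check is only a pre-filter: the hash lookup still decides.
        if let Some(values) = table.get(&key) {
            for &v in values {
                joined.push((key, v));
            }
        }
    }
    (joined, pruned)
}
```

A key inside the range can still miss in the hash table, which is why the bounds act as a pre-filter and not as a replacement for the join itself.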
+ +<div class="text-center"> +<img + src="/blog/images/dynamic-filters/join-performance.svg" + width="80%" + class="img-responsive" + alt="Join Performance Improvements with Dynamic Filters" +/> +</div> + +**Figure 7**: Join performance with and without dynamic filters. In DataFusion +49.0.2 the join takes 2.5s, even with late materialization (LM) enabled. In +DataFusion 50.0.0 with dynamic filters enabled (the default), the join takes +only 0.7s, a 5x improvement. With both dynamic filters and late materialization, +DataFusion 50.0.0 takes 0.1s, a 25x improvement. See this [discussion] for more +details. + +[discussion]: https://github.com/apache/datafusion-site/pull/103#issuecomment-3262612288 + +You can see dynamic join filters in action with the following example. + +```sql +-- create two tables: small_table with 1K rows and large_table with 100K rows +COPY (SELECT i as k, i as v FROM generate_series(1, 1000) t(i)) TO 'small_table.parquet'; +CREATE EXTERNAL TABLE small_table STORED AS PARQUET LOCATION 'small_table.parquet'; +COPY (SELECT i as k FROM generate_series(1, 100000) t(i)) TO 'large_table.parquet'; +CREATE EXTERNAL TABLE large_table STORED AS PARQUET LOCATION 'large_table.parquet'; + +-- Join the two tables, with a filter on small_table +EXPLAIN +SELECT * +FROM small_table JOIN large_table ON small_table.k = large_table.k +WHERE small_table.v >= 50; +``` + +Note there are no filters on the `large_table` in the initial query, but a +dynamic filter is introduced by DataFusion on the `large_table` scan. As the +`small_table` is read and the hash table is built, the dynamic filter is updated +to become more and more effective. 
Before execution, the plan +looks like this: + +```text ++---------------+------------------------------------------------------------+ +| plan_type | plan | ++---------------+------------------------------------------------------------+ +| physical_plan | ┌───────────────────────────┐ | +| | │ CoalesceBatchesExec │ | +| | │ -------------------- │ | +| | │ target_batch_size: │ | +| | │ 8192 │ | +| | └─────────────┬─────────────┘ | +| | ┌─────────────┴─────────────┐ | +| | │ HashJoinExec │ | +| | │ -------------------- ├──────────────┐ | +| | │ on: (k = k) │ │ | +| | └─────────────┬─────────────┘ │ | +| | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | +| | │ CoalescePartitionsExec ││ RepartitionExec │ | +| | │ ││ -------------------- │ | +| | │ ││ partition_count(in->out): │ | +| | │ ││ 1 -> 16 │ | +| | │ ││ │ | +| | │ ││ partitioning_scheme: │ | +| | │ ││ RoundRobinBatch(16) │ | +| | └─────────────┬─────────────┘└─────────────┬─────────────┘ | +| | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ | +| | │ CoalesceBatchesExec ││ DataSourceExec │ | +| | │ -------------------- ││ -------------------- │ | +| | │ target_batch_size: ││ files: 1 │ | +| | │ 8192 ││ format: parquet │ | +| | │ ││ predicate: true │ | +| | └─────────────┬─────────────┘└───────────────────────────┘ | +| | ┌─────────────┴─────────────┐ | +| | │ FilterExec │ | +| | │ -------------------- │ | +| | │ predicate: v >= 50 │ | +| | └─────────────┬─────────────┘ | +| | ┌─────────────┴─────────────┐ | +| | │ RepartitionExec │ | +| | │ -------------------- │ | +| | │ partition_count(in->out): │ | +| | │ 1 -> 16 │ | +| | │ │ | +| | │ partitioning_scheme: │ | +| | │ RoundRobinBatch(16) │ | +| | └─────────────┬─────────────┘ | +| | ┌─────────────┴─────────────┐ | +| | │ DataSourceExec │ | +| | │ -------------------- │ | +| | │ files: 1 │ | +| | │ format: parquet │ | +| | │ predicate: v >= 50 │ | +| | └───────────────────────────┘ | +| | | 
++---------------+------------------------------------------------------------+ +``` + +**Figure 8**: Physical plan for the join query before execution. The left input +to the join is the build side, which scans `small_table` and applies the filter +`v >= 50`. The right input to the join is the probe side, which scans `large_table` +and has the dynamic filter (shown here as the placeholder `true`). + +## Dynamic Filter Extensibility: Custom `ExecutionPlan` Operators + +We went to great efforts to ensure that dynamic filters are not a hardcoded +black box that only works for internal operators. This is important not only for +software maintainability, but also because DataFusion is used in many different +contexts including advanced custom operators specialized for specific use cases. + +Dynamic filter creation and pushdown are implemented as methods on the +[ExecutionPlan trait]. Thus, it is possible for user-defined, custom +`ExecutionPlan`s to work with dynamic filters with little to no modification. We +also provide an extensive library of helper structs and functions, so it often +takes only 1-2 lines of code to implement filter pushdown support or a source of +dynamic filters for custom operators. + +[ExecutionPlan trait]: https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html + +This approach has already paid off, and we know of community members who have +implemented support for dynamic filter pushdown using preview releases of +DataFusion 50.0.0. + +<!-- AAL Who else has done this? --> + +### Design of Scan Operator Integration + +A core design decision is to represent dynamic filters as `Arc<dyn +PhysicalExpr>`, the same interface as all other expressions in DataFusion. This +means that `DataSourceExec` and other scan operators do not require special +logic to handle dynamic filters, and existing filter pushdown logic works +without modification. 
We did add some new functionality to `PhysicalExpr` to +make working with dynamic filters more performant for specific use cases: + +* `PhysicalExpr::generation() -> u64`: to track if a tree of filters has + changed (e.g. it has a dynamic filter that has been updated). For + example, if a predicate changes from `c1 = 'a' AND DynamicFilter [ c2 > 1]` to `c1 = 'a' AND + DynamicFilter [ c2 > 2]` the generation value will also change so operators know if they + should re-evaluate the filter against static data like file or row group + level statistics. This is used in the ListingTable provider to do early termination of reading a file if the + filter is updated mid scan to skip the entire file, without + needlessly re-evaluating file level statistics on each batch. + +* `PhysicalExpr::snapshot() -> Arc<dyn PhysicalExpr>`: to create a snapshot + of the filter at a given point in time. Dynamic filters use this to return the + current value of their inner static filter. This can be used to serialize the + filter across the network for distributed engines or pass to systems that + support specific static filter patterns (e.g. stats pruning rewrites). + +This is all implemented in the `DynamicFilterPhysicalExpr` struct. + +Another important design point was handling concurrency and information +flow. In early designs, the scan polled the source operators on every row / +batch, which had significant overhead. The final design is a "push" model where +the scan path has minimal locking and the write path (e.g. the TopK +operator) is responsible for updating the filter. You can think of +`DynamicFilterPhysicalExpr` as an `Arc<RwLock<Arc<dyn PhysicalExpr>>>`, which +allows the TopK operator to update the filter without blocking the scan +operator. 
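
The "push" model and the generation counter can be illustrated with a small std-only sketch. It is not the real `DynamicFilterPhysicalExpr`: the `Predicate` enum is a hypothetical stand-in for `Arc<dyn PhysicalExpr>`, and the `DynamicFilter` struct with its `update`, `snapshot`, and `generation` methods is an assumption that only mirrors the behaviour described above.

```rust
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for a physical expression.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Predicate {
    True,          // initial placeholder: passes every row
    LessThan(i64), // `value < bound`
}

impl Predicate {
    fn evaluate(&self, value: i64) -> bool {
        match self {
            Predicate::True => true,
            Predicate::LessThan(bound) => value < *bound,
        }
    }
}

/// Hypothetical dynamic filter: writers swap the inner predicate,
/// readers take cheap snapshots.
struct DynamicFilter {
    inner: RwLock<Arc<Predicate>>,
    generation: AtomicU64,
}

impl DynamicFilter {
    fn new() -> Self {
        DynamicFilter {
            inner: RwLock::new(Arc::new(Predicate::True)),
            generation: AtomicU64::new(0),
        }
    }

    /// Write path (e.g. the TopK operator): replace the predicate and bump
    /// the generation so readers can tell that something changed.
    fn update(&self, new_predicate: Predicate) {
        let mut guard = self.inner.write().unwrap();
        *guard = Arc::new(new_predicate);
        self.generation.fetch_add(1, Ordering::Release);
    }

    /// Read path (e.g. the scan): clone the `Arc`, then evaluate without
    /// holding the lock.
    fn snapshot(&self) -> Arc<Predicate> {
        let guard = self.inner.read().unwrap();
        Arc::clone(&*guard)
    }

    /// Lets readers skip re-checking static data such as file statistics
    /// when the filter has not changed since they last looked.
    fn generation(&self) -> u64 {
        self.generation.load(Ordering::Acquire)
    }
}
```

In this sketch the scan would hold the filter behind an `Arc<DynamicFilter>`, remember the last generation it saw, and re-evaluate file or row group statistics only when that number changes.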
+ +## Future Work + +Although we've made great progress and DataFusion now has one of the most +advanced open-source dynamic filter / sideways information passing +implementations that we know of, we see many areas of future improvement such as: + +* [Support for more types of joins]: This optimization is only implemented for + `INNER` hash joins so far, but it could be implemented for other join algorithms + (e.g. nested loop joins) and join types (e.g. `LEFT OUTER JOIN`). + +* [Push down entire hash tables to the scan operator]: Improve the representation + of the dynamic filter beyond min/max values to improve performance for joins with many + distinct matching keys that are not naturally ordered or have significant skew. + +* [Use file level statistics to order files] to match the `ORDER BY` clause as + much as possible. This can help TopK dynamic filters be more effective at + pruning by skipping more work earlier in the scan. + +[Support for more types of joins]: https://github.com/apache/datafusion/issues/16973 +[Push down entire hash tables to the scan operator]: https://github.com/apache/datafusion/issues/17171 +[Use file level statistics to order files]: https://github.com/apache/datafusion/issues/17348 + + +## Acknowledgements + +Thank you to [Pydantic] and [InfluxData] for supporting our work on DataFusion +and open source in general. Thank you to [zhuqi-lucas], [xudong963], +[Dandandan], and [LiaCastaneda], for helping with the dynamic join filter +implementation and testing. Thank you to [nuno-faria] for providing join performance +results and [djanderson] for their helpful review comments. 
+ +[Pydantic]: https://pydantic.dev +[InfluxData]: https://www.influxdata.com/ +[zhuqi-lucas]: https://github.com/zhuqi-lucas +[xudong963]: https://github.com/xudong963 +[Dandandan]: https://github.com/Dandandan +[LiaCastaneda]: https://github.com/LiaCastaneda +[nuno-faria]: https://github.com/nuno-faria +[djanderson]: https://github.com/djanderson + + +## About the Authors + +[Adrian Garcia Badaracco](https://www.linkedin.com/in/adrian-garcia-badaracco/) is a Founding Engineer at +[Pydantic](https://pydantic.dev/), and an [Apache +DataFusion](https://datafusion.apache.org/) committer. + +[Andrew Lamb](https://www.linkedin.com/in/andrewalamb/) is a Staff Engineer at +[InfluxData](https://www.influxdata.com/), and a member of the [Apache +DataFusion](https://datafusion.apache.org/) and [Apache Arrow](https://arrow.apache.org/) PMCs. He has been working on +databases and related systems for more than 20 years. + +## About DataFusion + +[Apache DataFusion] is an extensible query engine toolkit, written +in Rust, that uses [Apache Arrow] as its in-memory format. DataFusion and +similar technology are part of the next generation “Deconstructed Database” +architectures, where new systems are built on a foundation of fast, modular +components, rather than as a single tightly integrated system. + +The [DataFusion community] is always looking for new contributors to help +improve the project. If you are interested in learning more about how query +execution works, help document or improve the DataFusion codebase, or just try +it out, we would love for you to join us. + +[Apache Arrow]: https://arrow.apache.org/ +[Apache DataFusion]: https://datafusion.apache.org/ +[DataFusion community]: https://datafusion.apache.org/contributor-guide/communication.html + +## Footnotes + +<a id="footnote1"></a><sup>[1](#fn1)</sup> *Dynamic Filters (DF)* refers to the +optimization described in this blog post. 
The TopK operator will generate a +filter that is applied to the scan operators, which will first be used to skip +rows and then as we open new files (if there are more to open) it will be used +to skip entire files that do not match the filter. + +<a id="footnote2"></a><sup>[2](#fn2)</sup> *Late Materialization (LM)* refers to +the optimization described in [this blog post]. Late Materialization is +particularly effective when combined with dynamic filters as it can apply +filters during a scan. Without late materialization, dynamic filters can only be +used to prune row groups or entire files, which will be less effective if the +files themselves are large or the top values are not in the first few files read. + +[this blog post]: https://datafusion.apache.org/blog/2025/03/21/parquet-pushdown/ + + +## Appendix + +### Queries and Data + +#### Figure 1: ClickBench Q23 + +```sql +-- Data was downloaded using apache/datafusion -> benchmarks/bench.sh -> ./benchmarks/bench.sh data clickbench_partitioned +create external table hits stored as parquet location 'benchmarks/data/hits_partitioned'; + +-- Must set for ClickBench hits_partitioned dataset. 
See https://github.com/apache/datafusion/issues/16591 +set datafusion.execution.parquet.binary_as_string = true; +-- Only matters if pushdown_filters is enabled but they don't get enabled together sadly +set datafusion.execution.parquet.reorder_filters = true; + +set datafusion.execution.target_partitions = 1; -- or set to 12 to use multiple cores +set datafusion.optimizer.enable_dynamic_filter_pushdown = false; +set datafusion.execution.parquet.pushdown_filters = false; + +explain analyze +SELECT * +FROM hits +WHERE "URL" LIKE '%google%' +ORDER BY "EventTime" +LIMIT 10; +``` + +| dynamic filters | late materialization | cores | time (s) | +|:------------------|:-----------------------|--------:|-----------:| +| False | False | 1 | 32.039 | +| False | True | 1 | 16.903 | +| True | False | 1 | 18.195 | +| True | True | 1 | 1.42 | +| False | False | 12 | 5.04 | +| False | True | 12 | 2.37 | +| True | False | 12 | 5.055 | +| True | True | 12 | 0.602 | diff --git a/content/images/dynamic-filters/execution-time.svg b/content/images/dynamic-filters/execution-time.svg new file mode 100644 index 0000000..bf45f3c --- /dev/null +++ b/content/images/dynamic-filters/execution-time.svg @@ -0,0 +1 @@ +<svg version="1.1" viewBox="0.0 0.0 960.0 540.0" fill="none" stroke="none" stroke-linecap="square" stroke-miterlimit="10" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg"><clipPath id="g37cfe6f370d_1_0.0"><path d="m0 0l960.0 0l0 540.0l-960.0 0l0 -540.0z" clip-rule="nonzero"/></clipPath><g clip-path="url(#g37cfe6f370d_1_0.0)"><path fill="#ffffff" d="m0 0l960.0 0l0 540.0l-960.0 0z" fill-rule="evenodd"/><path fill="#000000" fill-opacity="0.0" d="m50.07349 0l808. [...] 
\ No newline at end of file diff --git a/content/images/dynamic-filters/join-performance.svg b/content/images/dynamic-filters/join-performance.svg new file mode 100644 index 0000000..e7fb986 --- /dev/null +++ b/content/images/dynamic-filters/join-performance.svg @@ -0,0 +1 @@ +<svg version="1.1" viewBox="0.0 0.0 960.0 540.0" fill="none" stroke="none" stroke-linecap="square" stroke-miterlimit="10" xmlns:xlink="http://www.w3.org/1999/xlink" xmlns="http://www.w3.org/2000/svg"><clipPath id="g37cfe6f370d_1_10.0"><path d="m0 0l960.0 0l0 540.0l-960.0 0l0 -540.0z" clip-rule="nonzero"/></clipPath><g clip-path="url(#g37cfe6f370d_1_10.0)"><path fill="#ffffff" d="m0 0l960.0 0l0 540.0l-960.0 0z" fill-rule="evenodd"/><path fill="#000000" fill-opacity="0.0" d="m0 0l873.3165 [...] \ No newline at end of file diff --git a/content/images/dynamic-filters/query-plan-naive.png b/content/images/dynamic-filters/query-plan-naive.png new file mode 100644 index 0000000..5dfa95f Binary files /dev/null and b/content/images/dynamic-filters/query-plan-naive.png differ diff --git a/content/images/dynamic-filters/query-plan-topk-dynamic-filters.png b/content/images/dynamic-filters/query-plan-topk-dynamic-filters.png new file mode 100644 index 0000000..4a6f5f0 Binary files /dev/null and b/content/images/dynamic-filters/query-plan-topk-dynamic-filters.png differ diff --git a/content/images/dynamic-filters/query-plan-topk.png b/content/images/dynamic-filters/query-plan-topk.png new file mode 100644 index 0000000..fec8365 Binary files /dev/null and b/content/images/dynamic-filters/query-plan-topk.png differ diff --git a/pelicanconf.py b/pelicanconf.py index de91d11..a409eee 100644 --- a/pelicanconf.py +++ b/pelicanconf.py @@ -50,12 +50,9 @@ ASF_GENID = { 'metadata': False, 'elements': False, 'permalinks': False, - 'tables': False, + 'tables': True, 'headings': False, - - 'toc': False, - 'debug': False, } @@ -68,6 +65,7 @@ FEED_RSS = "feed.xml" MARKDOWN = { 'extension_configs': { 
'markdown.extensions.fenced_code': {}, + 'markdown.extensions.tables': {}, }, 'output_format': 'html5', }