This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-site.git


The following commit(s) were added to refs/heads/main by this push:
     new f69c5cc  Dynamic filters blog post (rev 2) (#103)
f69c5cc is described below

commit f69c5cc9380ceff24e39728f5fc9306b11538ca9
Author: Andrew Lamb <and...@nerdnetworks.org>
AuthorDate: Wed Sep 10 08:36:52 2025 -0700

    Dynamic filters blog post (rev 2) (#103)
    
    * start dynamic filters
    
    * start blog post
    
    * update image
    
    * finish first draft
    
    * cleanup
    
    * fix typos
    
    Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com>
    
    * Update content/blog/2025-08-16-dynamic-filters.md
    
    Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com>
    
    * Update content/blog/2025-08-16-dynamic-filters.md
    
    Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com>
    
    * Update content/blog/2025-08-16-dynamic-filters.md
    
    Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com>
    
    * Update date + title
    
    * Render markdown tables
    
    * update
    
    * Fix images
    
    * Add about author, about DataFusion sections, correct reference
    
    * Start filling out some more background
    
    * more
    
    * Update date
    
    * Take a pass at background / content
    
    * Take a pass at background / content
    
    * More content obsession
    
    * footnotes
    
    * update
    
    * AI proofreading
    
    * tweak
    
    * Work on join section
    
    * add acknowledgements
    
    * update future work
    
    * complete
    
    * proofread
    
    * proofread
    
    * proofread
    
    * proofread
    
    * cleanups, thanks claude
    
    * tweaks
    
    * Update results chart
    
    * Update join performance figure
    
    * tweak capitalization
    
    * clarify hash join section and add bullet points
    
    * Add djanderson to acknowledgements
    
    * clarify join selectivity comment
    
    ---------
    
    Co-authored-by: Adrian Garcia Badaracco 
<1755071+adria...@users.noreply.github.com>
    Co-authored-by: Copilot <175728472+copi...@users.noreply.github.com>
---
 content/blog/2025-09-10-dynamic-filters.md         | 650 +++++++++++++++++++++
 content/images/dynamic-filters/execution-time.svg  |   1 +
 .../images/dynamic-filters/join-performance.svg    |   1 +
 .../images/dynamic-filters/query-plan-naive.png    | Bin 0 -> 111826 bytes
 .../query-plan-topk-dynamic-filters.png            | Bin 0 -> 160390 bytes
 content/images/dynamic-filters/query-plan-topk.png | Bin 0 -> 112692 bytes
 pelicanconf.py                                     |   6 +-
 7 files changed, 654 insertions(+), 4 deletions(-)

diff --git a/content/blog/2025-09-10-dynamic-filters.md 
b/content/blog/2025-09-10-dynamic-filters.md
new file mode 100644
index 0000000..058614c
--- /dev/null
+++ b/content/blog/2025-09-10-dynamic-filters.md
@@ -0,0 +1,650 @@
+---
+layout: post
+title: Dynamic Filters: Passing Information Between Operators During Execution 
for 10x Faster Queries
+date: 2025-09-10
+author: Adrian Garcia Badaracco (Pydantic), Andrew Lamb (InfluxData)
+categories: [features]
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+<!-- 
+diagrams source: 
https://docs.google.com/presentation/d/1FFYy27ydZdeFZWWuMjZGnYKUx9QNJfzuVLAH8AE5wlc/edit?slide=id.g364a74cba3d_0_92#slide=id.g364a74cba3d_0_92
+Intended Audience: Query engine / data systems developers who want to learn 
about topk optimization
+Goal: Introduce TopK and dynamic filters as general optimization techniques 
for query engines, and how they were used to improve performance in DataFusion.
+-->
+
+This blog post introduces the query engine optimization techniques called TopK
+and dynamic filters. We describe the motivating use case, how these
+optimizations work, and how we implemented them with the [Apache DataFusion]
+community to improve performance by an order of magnitude for some query
+patterns.
+
+[Apache DataFusion]: https://datafusion.apache.org/
+
+## Motivation and Results
+
+The main commercial product at [Pydantic], [Logfire], is an observability
+platform built on DataFusion. One of the most common workflows / queries is
+"show me the last K traces" which translates to a query similar to:
+
+[Pydantic]: https://pydantic.dev
+[Logfire]: https://pydantic.dev/logfire
+
+```sql
+SELECT * FROM records ORDER BY start_timestamp DESC LIMIT 1000;
+```
+
+We noticed this was *pretty slow*, even though DataFusion has long had the
+classic `TopK` optimization (described below). After implementing the dynamic
+filter techniques described in this blog, we saw performance improve *by over 
10x*
+for this query pattern, and are applying the optimization to other queries and
+operators as well.
+
+Let's look at some preliminary numbers, using [ClickBench] [Q23], which has 
+the same pattern as our motivating example:
+
+```sql
+SELECT * FROM hits WHERE "URL" LIKE '%google%' ORDER BY "EventTime" LIMIT 10;
+```
+
+<div class="text-center">
+<img 
+  src="/blog/images/dynamic-filters/execution-time.svg" 
+  width="80%" 
+  class="img-responsive" 
+  alt="Q23 Performance Improvement with Dynamic Filters and Late 
Materialization"
+/>
+</div>
+
+**Figure 1**: Execution times for ClickBench Q23 with and without dynamic
+filters (DF)<sup id="fn1">[1](#footnote1)</sup> and late materialization
+(LM)<sup id="fn2">[2](#footnote2)</sup> for different partition / core counts.
+Dynamic filters alone (yellow) and late materialization alone (red) show a
+large improvement over the baseline (blue). When both optimizations are
+enabled (green), performance improves by up to 22x. See the appendix for more
+measurement details.
+
+
+## Background: TopK and Dynamic Filters
+
+To explain how dynamic filters improve query performance, we first need to
+explain the so-called "TopK" optimization. To do so, we will use a simplified
+version of ClickBench Q23:
+
+```sql
+SELECT * 
+FROM hits 
+ORDER BY "EventTime"
+LIMIT 10
+```
+
+[Q23]: 
https://github.com/apache/datafusion/blob/main/benchmarks/queries/clickbench/queries/q23.sql
+[ClickBench]: https://benchmark.clickhouse.com/
+
+A straightforward, though slow, plan to answer this query is shown in Figure 2.
+
+<div class="text-center">
+<img 
+  src="/blog/images/dynamic-filters/query-plan-naive.png" 
+  width="80%" 
+  class="img-responsive" 
+  alt="Naive Query Plan"
+/>
+</div>
+
+**Figure 2**: Simple Query Plan for ClickBench Q23. Data flows in plans from 
the
+scan at the bottom to the limit at the top. This plan reads all 100M rows of 
the
+`hits` table, sorts them by `EventTime`, and then discards everything except 
the top 10 rows.
+
+This naive plan requires substantial effort as all columns from all rows are
+decoded and sorted, even though only 10 are returned. 
+
+High-performance query engines typically avoid the expensive full sort with a
+specialized operator that tracks the current top rows using a [heap], rather
+than sorting all the data. For example, this operator
+is called [TopK in DataFusion], [SortWithLimit in Snowflake], and [topn in
+DuckDB]. The plan for Q23 using this specialized operator is shown in Figure 3.
+
+[heap]: https://en.wikipedia.org/wiki/Heap_(data_structure)
+[TopK in DataFusion]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html
+[SortWithLimit in Snowflake]: 
https://docs.snowflake.com/en/user-guide/ui-snowsight-activity
+[topn in DuckDB]: https://duckdb.org/2024/10/25/topn.html#introduction-to-top-n
+
+<div class="text-center">
+<img 
+  src="/blog/images/dynamic-filters/query-plan-topk.png" 
+  width="80%" 
+  class="img-responsive" 
+  alt="TopK Query Plan"
+/>
+</div>
+
+**Figure 3**: Query plan for Q23 in DataFusion using the TopK operator. This
+plan still reads all 100M rows of the `hits` table, but instead of first 
sorting
+them all by `EventTime`, the TopK operator keeps track of the current top 10
+rows using a min/max heap. Credit to [Visualgo](https://visualgo.net/en) for
+the heap icon.
+
+Figure 3 is better, but it still reads and decodes all 100M rows of the `hits`
+table, which is often unnecessary once we have found the top 10 rows. For
+example, while running the query, if the current top 10 rows all have
+`EventTime` in 2024, then any subsequent rows with `EventTime` in 2025 or
+later cannot be in the top 10 and can be skipped entirely without reading or
+decoding them. This technique is especially effective at skipping entire files
+or row groups if the top 10 values are in the first few files read, which is
+very common when the data insert order is approximately the same as the
+timestamp order.
+
+Leveraging this insight is the key idea behind dynamic filters, which introduce
+a runtime mechanism for the TopK operator to provide the current top values to
+the scan operator, allowing it to skip unnecessary rows, entire files, or 
portions
+of files. The plan for Q23 with dynamic filters is shown in Figure 4.
+
+<div class="text-center">
+<img 
+  src="/blog/images/dynamic-filters/query-plan-topk-dynamic-filters.png" 
+  width="100%" 
+  class="img-responsive" 
+  alt="TopK Query Plan with Dynamic Filters"
+/>
+</div>
+
+**Figure 4**: Query plan for Q23 in DataFusion with specialized TopK operator
+and dynamic filters. The TopK operator provides the maximum `EventTime` of the
+current top 10 rows to the scan operator, allowing it to skip rows with
+`EventTime` later than that value. The scan operator uses this dynamic filter
+to skip unnecessary files and rows, reducing the amount of data that needs to
+be read and processed.
+
+## Worked Example
+
+To make dynamic filters more concrete, here is a fully worked example. Imagine
+we have a table `records` with a column `start_timestamp` and we are running 
the
+motivating query:
+
+```sql
+SELECT * 
+FROM records 
+ORDER BY start_timestamp DESC
+LIMIT 3;
+```
+
+In this example, at some point during execution, the heap in the `TopK` 
operator
+will contain the actual 3 most recent values, which might be:
+
+| start_timestamp          |
+|--------------------------|
+| 2025-08-16T20:35:15.00Z  |
+| 2025-08-16T20:35:14.00Z  |
+| 2025-08-16T20:35:13.00Z  |
+
+Since `2025-08-16T20:35:13.00Z` is the smallest of these values, we know that
+any subsequent rows with `start_timestamp` less than or equal to this value
+cannot possibly be in the top 3, and can be skipped entirely.
+This knowledge is encoded in a filter of the form `start_timestamp >
+'2025-08-16T20:35:13.00Z'`. If we knew the correct timestamp value before
+starting the plan, we could simply write:
+
+```sql
+SELECT *
+FROM records
+WHERE start_timestamp > '2025-08-16T20:35:13.00Z'  -- Filter to skip rows
+ORDER BY start_timestamp DESC
+LIMIT 3;
+```
+
+And DataFusion's existing hierarchical pruning (described in [this blog]) would
+skip reading unnecessary files and row groups, and only decode
+the necessary rows.
+
+[this blog]: 
https://datafusion.apache.org/blog/2025/08/15/external-parquet-indexes/
+
+However, when we start running the query we obviously do not yet know the
+value `'2025-08-16T20:35:13.00Z'`, so DataFusion instead puts a dynamic filter
+into the plan, which you can think of as a function call like
+`dynamic_filter()`, something like this:
+
+```sql
+SELECT *
+FROM records
+WHERE dynamic_filter() -- Updated during execution as we know more
+ORDER BY start_timestamp DESC
+LIMIT 3;
+```
+
+In this case, `dynamic_filter()` initially has the value `true` (passes all
+rows) but will be progressively updated by the TopK operator as the query
+progresses to filter more and more rows. Note that while we are using SQL for
+illustrative purposes in this example, these optimizations are done at the
+physical plan ([ExecutionPlan]) level — and they apply equally to SQL, 
DataFrame
+APIs, and custom query languages built with DataFusion.
+
+[ExecutionPlan]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
+
+## TopK + Dynamic Filters
+
+As mentioned above, DataFusion has a specialized sort operator named [TopK]
+that only keeps `K` rows in memory. For a `DESC` sort order, each new input
+batch is compared against the current `K` largest values, and rows in the heap
+are replaced by any incoming rows that are larger. The [code is here].
+
+[TopK]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/struct.TopK.html
+[code is here]: 
https://github.com/apache/datafusion/blob/b4a8b5ae54d939353b7cbd5ab8aee7d3bedecb66/datafusion/physical-plan/src/topk/mod.rs
+
+Prior to dynamic filters, DataFusion had no early termination: it would read 
the
+*entire* `records` table even if it already had the top `K` rows, because it
+still had to check that no remaining rows had a larger `start_timestamp`.
+You can see how this is a problem if you have 2 years' worth of time-series 
data
+and the largest `1000` values of `start_timestamp` are likely within the first
+few files read. Even once the `TopK` operator has seen 1000 timestamps (e.g. on
+August 16th, 2025), DataFusion would still read all remaining files (e.g. even
+those that contain data only from 2024) just to make sure.
+
+InfluxData [optimized a similar query pattern in InfluxDB IOx] using another
+operator called `ProgressiveEvalExec`. However, `ProgressiveEvalExec` requires
+that the data is already sorted, along with a careful analysis of the ordering
+to prove that it can be used and still produce correct results. That is not
+the case for Logfire data (and many other datasets): data tends to be
+*roughly* sorted (e.g. if you append to files as data is received), but that
+does not guarantee that it is fully sorted, either within or between files.
+
+We [discussed possible solutions] with the community, and ultimately decided to
+implement generic "dynamic filters", which are general enough to be used in
+joins as well (see next section). Our implementation appears very similar to
+recently announced optimizations in closed-source, commercial systems such as
+[Accelerating TopK Queries in Snowflake], or [self-sharpening runtime filters 
in
+Alibaba Cloud's PolarDB], and we are excited that we can offer similar features
+in an open source query engine like DataFusion.
+
+[optimized a similar query pattern in InfluxDB IOx]:  
https://www.influxdata.com/blog/making-recent-value-queries-hundreds-times-faster/
+[discussed possible solutions]: 
https://github.com/apache/datafusion/issues/15037
+[Accelerating TopK Queries in Snowflake]: 
https://program.berlinbuzzwords.de/bbuzz24/talk/3DTQJB/
+[self-sharpening runtime filters in Alibaba Cloud's PolarDB]: 
https://www.alibabacloud.com/blog/about-database-kernel-%7C-learn-about-polardb-imci-optimization-techniques_600274
+
+At the query plan level, Q23 looks like this before it is executed:
+
+```text
+┌───────────────────────────┐
+│       SortExec(TopK)      │
+│    --------------------   │
+│ EventTime@4 ASC NULLS LAST│
+│                           │
+│         limit: 10         │
+└─────────────┬─────────────┘
+┌─────────────┴─────────────┐
+│       DataSourceExec      │
+│    --------------------   │
+│         files: 100        │
+│      format: parquet      │
+│                           │
+│         predicate:        │
+│ CAST(URL AS Utf8View) LIKE│
+│      %google% AND true    │
+└───────────────────────────┘
+```
+
+**Figure 5**: Physical plan for ClickBench Q23 prior to execution. The dynamic
+filter is shown as `true` in the `predicate` field of the `DataSourceExec`
+operator.
+
+The dynamic filter is updated by the `SortExec(TopK)` operator during execution
+as shown in Figure 6.
+
+```text
+┌───────────────────────────┐
+│       SortExec(TopK)      │
+│    --------------------   │
+│ EventTime@4 ASC NULLS LAST│
+│                           │
+│         limit: 10         │
+└─────────────┬─────────────┘
+┌─────────────┴─────────────┐
+│       DataSourceExec      │
+│    --------------------   │
+│         files: 100        │
+│      format: parquet      │
+│                           │
+│         predicate:        │
+│ CAST(URL AS Utf8View) LIKE│
+│      %google% AND         │
+│ EventTime < 1372713773.0  │
+└───────────────────────────┘
+```
+**Figure 6**: Physical plan for ClickBench Q23 after execution. The dynamic 
filter has been
+updated to `EventTime < 1372713773.0`, which allows the `DataSourceExec` 
operator to skip
+files and rows that do not match the filter.
+
+## Hash Join + Dynamic Filters
+
+We spent significant effort to make dynamic filters a general-purpose
+optimization (see the Extensibility section below for more details). Instead of
+a one-off optimization for TopK queries, we created a general mechanism for
+passing information between operators during execution that can be used in 
multiple contexts. 
+We have already used the dynamic filter infrastructure to
+improve hash joins by implementing a technique called [sideways information
+passing], which is similar to [Bloom filter joins] in Apache Spark. See 
+[issue #7955] for more details.
+
+[sideways information passing]: 
https://15721.courses.cs.cmu.edu/spring2020/papers/13-execution/shrinivas-icde2013.pdf
+[Bloom filter joins]: https://issues.apache.org/jira/browse/SPARK-32268
+[issue #7955]: https://github.com/apache/datafusion/issues/7955
+
+In a Hash Join, the query engine picks one input of the join to be the "build"
+side and the other input to be the "probe" side.
+
+* First, the **build side** is loaded into memory, and turned into a hash 
table.
+
+* Then, the **probe side** is scanned, and matching rows are found by looking 
+  in the hash table. Non-matching rows are discarded and thus joins often act 
as
+  filters.
+
+Many hash joins act as selective filters for rows from the probe side (when 
only
+a small number of rows are matched), so it is natural to use the same dynamic
+filter technique. DataFusion 50.0.0 pushes down knowledge of what keys exist on
+the build side into the scan of the probe side with a dynamic filter based on
+min/max join key values. For example, if the build side only has keys in the
+range `[100, 200]`, then DataFusion will filter out all probe rows with keys
+outside that range during the scan.
+
+This simple approach is fast to evaluate and the filter improves performance
+significantly when combined with statistics pruning, late materialization, and
+other optimizations as shown in Figure 7.
+
+<div class="text-center">
+<img 
+  src="/blog/images/dynamic-filters/join-performance.svg" 
+  width="80%" 
+  class="img-responsive" 
+  alt="Join Performance Improvements with Dynamic Filters"
+/>
+</div>
+
+**Figure 7**: Join performance with and without dynamic filters. In DataFusion
+49.0.2 the join takes 2.5s, even with late materialization (LM) enabled. In
+DataFusion 50.0.0 with dynamic filters enabled (the default), the join takes
+only 0.7s, a 5x improvement. With both dynamic filters and late 
materialization,
+DataFusion 50.0.0 takes 0.1s, a 25x improvement. See this [discussion] for more
+details.
+
+[discussion]: 
https://github.com/apache/datafusion-site/pull/103#issuecomment-3262612288
+
+You can see dynamic join filters in action with the following example. 
+
+```sql
+-- create two tables: small_table with 1K rows and large_table with 100K rows
+COPY (SELECT i as k, i as v FROM generate_series(1, 1000) t(i)) TO 
'small_table.parquet';
+CREATE EXTERNAL TABLE small_table STORED AS PARQUET LOCATION 
'small_table.parquet';
+COPY (SELECT i as k FROM generate_series(1, 100000) t(i)) TO 
'large_table.parquet';
+CREATE EXTERNAL TABLE large_table STORED AS PARQUET LOCATION 
'large_table.parquet';
+
+-- Join the two tables, with a filter on small_table
+EXPLAIN 
+SELECT * 
+FROM small_table JOIN large_table ON small_table.k = large_table.k 
+WHERE small_table.v >= 50;
+```
+
+Note there are no filters on the `large_table` in the initial query, but a
+dynamic filter is introduced by DataFusion on the `large_table` scan. As the
+`small_table` is read and the hash table is built, the dynamic filter is 
updated 
+to become more and more effective. Before execution, the plan
+looks like this:
+
+```text
++---------------+------------------------------------------------------------+
+| plan_type     | plan                                                       |
++---------------+------------------------------------------------------------+
+| physical_plan | ┌───────────────────────────┐                              |
+|               | │    CoalesceBatchesExec    │                              |
+|               | │    --------------------   │                              |
+|               | │     target_batch_size:    │                              |
+|               | │            8192           │                              |
+|               | └─────────────┬─────────────┘                              |
+|               | ┌─────────────┴─────────────┐                              |
+|               | │        HashJoinExec       │                              |
+|               | │    --------------------   ├──────────────┐               |
+|               | │        on: (k = k)        │              │               |
+|               | └─────────────┬─────────────┘              │               |
+|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
+|               | │   CoalescePartitionsExec  ││      RepartitionExec      │ |
+|               | │                           ││    --------------------   │ |
+|               | │                           ││ partition_count(in->out): │ |
+|               | │                           ││          1 -> 16          │ |
+|               | │                           ││                           │ |
+|               | │                           ││    partitioning_scheme:   │ |
+|               | │                           ││    RoundRobinBatch(16)    │ |
+|               | └─────────────┬─────────────┘└─────────────┬─────────────┘ |
+|               | ┌─────────────┴─────────────┐┌─────────────┴─────────────┐ |
+|               | │    CoalesceBatchesExec    ││       DataSourceExec      │ |
+|               | │    --------------------   ││    --------------------   │ |
+|               | │     target_batch_size:    ││          files: 1         │ |
+|               | │            8192           ││      format: parquet      │ |
+|               | │                           ││      predicate: true      │ |
+|               | └─────────────┬─────────────┘└───────────────────────────┘ |
+|               | ┌─────────────┴─────────────┐                              |
+|               | │         FilterExec        │                              |
+|               | │    --------------------   │                              |
+|               | │     predicate: v >= 50    │                              |
+|               | └─────────────┬─────────────┘                              |
+|               | ┌─────────────┴─────────────┐                              |
+|               | │      RepartitionExec      │                              |
+|               | │    --------------------   │                              |
+|               | │ partition_count(in->out): │                              |
+|               | │          1 -> 16          │                              |
+|               | │                           │                              |
+|               | │    partitioning_scheme:   │                              |
+|               | │    RoundRobinBatch(16)    │                              |
+|               | └─────────────┬─────────────┘                              |
+|               | ┌─────────────┴─────────────┐                              |
+|               | │       DataSourceExec      │                              |
+|               | │    --------------------   │                              |
+|               | │          files: 1         │                              |
+|               | │      format: parquet      │                              |
+|               | │     predicate: v >= 50    │                              |
+|               | └───────────────────────────┘                              |
+|               |                                                            |
++---------------+------------------------------------------------------------+
+```
+
+**Figure 8**: Physical plan for the join query before execution. The left input
+to the join is the build side, which scans `small_table` and applies the filter
+`v >= 50`. The right input to the join is the probe side, which scans 
`large_table`
+and has the dynamic filter (shown here as the placeholder `true`).
+
+## Dynamic Filter Extensibility: Custom `ExecutionPlan` Operators
+
+We went to great efforts to ensure that dynamic filters are not a hardcoded
+black box that only works for internal operators. This is important not only 
for
+software maintainability, but also because DataFusion is used in many different
+contexts including advanced custom operators specialized for specific use 
cases.
+
+Dynamic filter creation and pushdown are implemented as methods on the
+[ExecutionPlan trait]. Thus, it is possible for user-defined, custom
+`ExecutionPlan`s to work with dynamic filters with little to no modification. 
We
+also provide an extensive library of helper structs and functions, so it often
+takes only 1-2 lines of code to implement filter pushdown support or a source 
of
+dynamic filters for custom operators.
+
+[ExecutionPlan trait]: 
https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html
+
+This approach has already paid off, and we know of community members who have
+implemented support for dynamic filter pushdown using preview releases of
+DataFusion 50.0.0.
+
+<!-- AAL Who else has done this? -->
+
+### Design of Scan Operator Integration
+
+A core design decision is to represent dynamic filters as `Arc<dyn
+PhysicalExpr>`, the same interface as all other expressions in DataFusion. This
+means that `DataSourceExec` and other scan operators do not require special
+logic to handle dynamic filters, and existing filter pushdown logic works
+without modification. We did add some new functionality to `PhysicalExpr` to
+make working with dynamic filters more performant for specific use cases:
+
+* `PhysicalExpr::generation() -> u64`: to track whether a tree of filters has
+  changed (e.g. it contains a dynamic filter that has been updated). For
+  example, if a predicate changes from `c1 = 'a' AND DynamicFilter [ c2 > 1]`
+  to `c1 = 'a' AND DynamicFilter [ c2 > 2]`, the generation value also changes,
+  so operators know whether they should re-evaluate the filter against static
+  data such as file or row group level statistics. The `ListingTable` provider
+  uses this to stop reading a file early when the filter is updated mid-scan
+  and the rest of the file can be skipped, without needlessly re-evaluating
+  file level statistics on each batch.
+
+* `PhysicalExpr::snapshot() -> Arc<dyn PhysicalExpr>`: to create a snapshot
+  of the filter at a given point in time. Dynamic filters use this to return 
the
+  current value of their inner static filter. This can be used to serialize the
+  filter across the network for distributed engines, or to pass it to systems
+  that support specific static filter patterns (e.g. stats pruning rewrites).
+
+This is all implemented in the `DynamicFilterPhysicalExpr` struct.
+
+Another important design point was handling concurrency and information
+flow. In early designs, the scan polled the source operators on every row /
+batch, which had significant overhead. The final design is a "push" model where
+the scan path has minimal locking and the write path (e.g. the TopK
+operator) is responsible for updating the filter. You can think of
+`DynamicFilterPhysicalExpr` as an `Arc<RwLock<Arc<dyn PhysicalExpr>>>`, which
+allows the TopK operator to update the filter without blocking the scan
+operator.
+
+## Future Work
+
+Although we've made great progress and DataFusion now has one of the most
+advanced open-source dynamic filter / sideways information passing
+implementations that we know of, we see many areas of future improvement such 
as:
+
+* [Support for more types of joins]: This optimization is only implemented for
+  `INNER` hash joins so far, but it could be implemented for other join 
algorithms
+  (e.g. nested loop joins) and join types (e.g. `LEFT OUTER JOIN`).
+
+* [Push down entire hash tables to the scan operator]: Improve the 
representation
+  of the dynamic filter beyond min/max values to improve performance for joins 
with many
+  distinct matching keys that are not naturally ordered or have significant 
skew.
+
+* [Use file level statistics to order files] to match the `ORDER BY` clause as
+  much as possible. This can help TopK dynamic filters be more effective at
+  pruning by skipping more work earlier in the scan.
+
+[Support for more types of joins]: 
https://github.com/apache/datafusion/issues/16973
+[Push down entire hash tables to the scan operator]: 
https://github.com/apache/datafusion/issues/17171
+[Use file level statistics to order files]: 
https://github.com/apache/datafusion/issues/17348
+
+
+## Acknowledgements
+
+Thank you to [Pydantic] and [InfluxData] for supporting our work on DataFusion
+and open source in general. Thank you to [zhuqi-lucas], [xudong963],
+[Dandandan], and [LiaCastaneda] for helping with the dynamic join filter
+implementation and testing. Thank you to [nuno-faria] for providing join 
performance
+results and [djanderson] for their helpful review comments. 
+
+[Pydantic]: https://pydantic.dev
+[InfluxData]: https://www.influxdata.com/
+[zhuqi-lucas]: https://github.com/zhuqi-lucas
+[xudong963]: https://github.com/xudong963
+[Dandandan]: https://github.com/Dandandan
+[LiaCastaneda]: https://github.com/LiaCastaneda
+[nuno-faria]: https://github.com/nuno-faria
+[djanderson]: https://github.com/djanderson
+
+
+## About the Authors
+
+[Adrian Garcia 
Badaracco](https://www.linkedin.com/in/adrian-garcia-badaracco/) is a Founding 
Engineer at
+[Pydantic](https://pydantic.dev/), and an [Apache
+DataFusion](https://datafusion.apache.org/) committer.
+
+[Andrew Lamb](https://www.linkedin.com/in/andrewalamb/) is a Staff Engineer at
+[InfluxData](https://www.influxdata.com/), and a member of the [Apache
+DataFusion](https://datafusion.apache.org/) and [Apache 
Arrow](https://arrow.apache.org/) PMCs. He has been working on
+databases and related systems for more than 20 years.
+
+## About DataFusion
+
+[Apache DataFusion] is an extensible query engine toolkit, written
+in Rust, that uses [Apache Arrow] as its in-memory format. DataFusion and
+similar technology are part of the next generation “Deconstructed Database”
+architectures, where new systems are built on a foundation of fast, modular
+components, rather than as a single tightly integrated system.
+
+The [DataFusion community] is always looking for new contributors to help
+improve the project. If you are interested in learning more about how query
+execution works, helping document or improve the DataFusion codebase, or just
+trying it out, we would love for you to join us.
+
+[Apache Arrow]: https://arrow.apache.org/
+[Apache DataFusion]: https://datafusion.apache.org/
+[DataFusion community]: 
https://datafusion.apache.org/contributor-guide/communication.html
+
+## Footnotes
+
+<a id="footnote1"></a><sup>[1](#fn1)</sup> *Dynamic Filters (DF)* refers to the
+optimization described in this blog post. The TopK operator will generate a
+filter that is applied to the scan operators, which will first be used to skip
+rows and then as we open new files (if there are more to open) it will be used
+to skip entire files that do not match the filter.
+
+<a id="footnote2"></a><sup>[2](#fn2)</sup> *Late Materialization (LM)* refers 
to
+the optimization described in [this blog post]. Late Materialization is
+particularly effective when combined with dynamic filters as it can apply
+filters during a scan. Without late materialization, dynamic filters can only 
be
+used to prune row groups or entire files, which will be less effective if the
+files themselves are large or the top values are not in the first few files 
read.
+
+[this blog post]: 
https://datafusion.apache.org/blog/2025/03/21/parquet-pushdown/
+
+
+## Appendix
+
+### Queries and Data
+
+#### Figure 1: ClickBench Q23
+
+```sql
+-- Data was downloaded using apache/datafusion -> benchmarks/bench.sh -> 
./benchmarks/bench.sh data clickbench_partitioned
+create external table hits stored as parquet location 
'benchmarks/data/hits_partitioned';
+
+-- Must set for ClickBench hits_partitioned dataset. See 
https://github.com/apache/datafusion/issues/16591
+set datafusion.execution.parquet.binary_as_string = true;
+-- Only matters if pushdown_filters is enabled but they don't get enabled 
together sadly
+set datafusion.execution.parquet.reorder_filters = true;
+
+set datafusion.execution.target_partitions = 1;  -- or set to 12 to use 
multiple cores
+set datafusion.optimizer.enable_dynamic_filter_pushdown = false;
+set datafusion.execution.parquet.pushdown_filters = false;
+
+explain analyze
+SELECT *
+FROM hits
+WHERE "URL" LIKE '%google%'
+ORDER BY "EventTime"
+LIMIT 10;
+```
+
+| dynamic filters   | late materialization   |   cores |   time (s) |
+|:------------------|:-----------------------|--------:|-----------:|
+| False             | False                  |       1 |     32.039 |
+| False             | True                   |       1 |     16.903 |
+| True              | False                  |       1 |     18.195 |
+| True              | True                   |       1 |      1.42  |
+| False             | False                  |      12 |      5.04  |
+| False             | True                   |      12 |      2.37  |
+| True              | False                  |      12 |      5.055 |
+| True              | True                   |      12 |      0.602 |
diff --git a/content/images/dynamic-filters/execution-time.svg 
b/content/images/dynamic-filters/execution-time.svg
new file mode 100644
index 0000000..bf45f3c
--- /dev/null
+++ b/content/images/dynamic-filters/execution-time.svg
@@ -0,0 +1 @@
+<svg version="1.1" viewBox="0.0 0.0 960.0 540.0" fill="none" stroke="none" 
stroke-linecap="square" stroke-miterlimit="10" 
xmlns:xlink="http://www.w3.org/1999/xlink"; 
xmlns="http://www.w3.org/2000/svg";><clipPath id="g37cfe6f370d_1_0.0"><path 
d="m0 0l960.0 0l0 540.0l-960.0 0l0 -540.0z" clip-rule="nonzero"/></clipPath><g 
clip-path="url(#g37cfe6f370d_1_0.0)"><path fill="#ffffff" d="m0 0l960.0 0l0 
540.0l-960.0 0z" fill-rule="evenodd"/><path fill="#000000" fill-opacity="0.0" 
d="m50.07349 0l808. [...]
\ No newline at end of file
diff --git a/content/images/dynamic-filters/join-performance.svg 
b/content/images/dynamic-filters/join-performance.svg
new file mode 100644
index 0000000..e7fb986
--- /dev/null
+++ b/content/images/dynamic-filters/join-performance.svg
@@ -0,0 +1 @@
+<svg version="1.1" viewBox="0.0 0.0 960.0 540.0" fill="none" stroke="none" 
stroke-linecap="square" stroke-miterlimit="10" 
xmlns:xlink="http://www.w3.org/1999/xlink"; 
xmlns="http://www.w3.org/2000/svg";><clipPath id="g37cfe6f370d_1_10.0"><path 
d="m0 0l960.0 0l0 540.0l-960.0 0l0 -540.0z" clip-rule="nonzero"/></clipPath><g 
clip-path="url(#g37cfe6f370d_1_10.0)"><path fill="#ffffff" d="m0 0l960.0 0l0 
540.0l-960.0 0z" fill-rule="evenodd"/><path fill="#000000" fill-opacity="0.0" 
d="m0 0l873.3165  [...]
\ No newline at end of file
diff --git a/content/images/dynamic-filters/query-plan-naive.png 
b/content/images/dynamic-filters/query-plan-naive.png
new file mode 100644
index 0000000..5dfa95f
Binary files /dev/null and 
b/content/images/dynamic-filters/query-plan-naive.png differ
diff --git a/content/images/dynamic-filters/query-plan-topk-dynamic-filters.png 
b/content/images/dynamic-filters/query-plan-topk-dynamic-filters.png
new file mode 100644
index 0000000..4a6f5f0
Binary files /dev/null and 
b/content/images/dynamic-filters/query-plan-topk-dynamic-filters.png differ
diff --git a/content/images/dynamic-filters/query-plan-topk.png 
b/content/images/dynamic-filters/query-plan-topk.png
new file mode 100644
index 0000000..fec8365
Binary files /dev/null and b/content/images/dynamic-filters/query-plan-topk.png 
differ
diff --git a/pelicanconf.py b/pelicanconf.py
index de91d11..a409eee 100644
--- a/pelicanconf.py
+++ b/pelicanconf.py
@@ -50,12 +50,9 @@ ASF_GENID = {
  'metadata': False,
  'elements': False,
  'permalinks': False,
- 'tables': False,
+ 'tables': True,
  'headings': False,
-
-
  'toc': False,
-
  'debug': False,
 }
 
@@ -68,6 +65,7 @@ FEED_RSS = "feed.xml"
 MARKDOWN = {
     'extension_configs': {
         'markdown.extensions.fenced_code': {},
+        'markdown.extensions.tables': {},
     },
     'output_format': 'html5',
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

