This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 1429c92474 Dynamic filter pushdown for TopK sorts (#15770)
1429c92474 is described below

commit 1429c92474238a91a09f1cd4a68c19d03329b6a7
Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com>
AuthorDate: Tue Jun 17 13:37:44 2025 -0500

    Dynamic filter pushdown for TopK sorts (#15770)
    
    Co-authored-by: Daniël Heres <danielhe...@gmail.com>
---
 datafusion/common/src/config.rs                    |   7 +
 datafusion/core/tests/fuzz_cases/mod.rs            |   1 +
 .../core/tests/fuzz_cases/topk_filter_pushdown.rs  | 387 +++++++++++++++++++++
 .../physical_optimizer/filter_pushdown/mod.rs      | 173 ++++++++-
 .../physical_optimizer/filter_pushdown/util.rs     |  17 +
 datafusion/datasource/src/source.rs                |   3 +-
 datafusion/physical-expr/src/expressions/mod.rs    |   1 +
 .../physical-optimizer/src/filter_pushdown.rs      |  47 ++-
 datafusion/physical-optimizer/src/optimizer.rs     |  10 +-
 datafusion/physical-plan/src/coalesce_batches.rs   |   5 +-
 datafusion/physical-plan/src/execution_plan.rs     |  13 +-
 datafusion/physical-plan/src/filter.rs             |  14 +-
 datafusion/physical-plan/src/filter_pushdown.rs    |  39 +++
 datafusion/physical-plan/src/repartition/mod.rs    |   5 +-
 datafusion/physical-plan/src/sorts/sort.rs         |  61 +++-
 datafusion/physical-plan/src/topk/mod.rs           | 258 +++++++++++++-
 datafusion/sqllogictest/test_files/explain.slt     |   3 +
 .../sqllogictest/test_files/information_schema.slt |   2 +
 datafusion/sqllogictest/test_files/limit.slt       |   2 +-
 datafusion/sqllogictest/test_files/topk.slt        |  14 +-
 docs/source/user-guide/configs.md                  |   1 +
 21 files changed, 1003 insertions(+), 60 deletions(-)

diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs
index 8324ce130a..b7aca9e270 100644
--- a/datafusion/common/src/config.rs
+++ b/datafusion/common/src/config.rs
@@ -614,6 +614,13 @@ config_namespace! {
         /// during aggregations, if possible
         pub enable_topk_aggregation: bool, default = true
 
+        /// When set to true attempts to push down dynamic filters generated by operators into the file scan phase.
+        /// For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer
+        /// will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans.
+        /// This means that if we already have 10 timestamps in the year 2025
+        /// any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan.
+        pub enable_dynamic_filter_pushdown: bool, default = true
+
         /// When set to true, the optimizer will insert filters before a join between
         /// a nullable and non-nullable column to filter out nulls on the nullable side. This
         /// filter can add additional overhead when the file format does not fully support
diff --git a/datafusion/core/tests/fuzz_cases/mod.rs b/datafusion/core/tests/fuzz_cases/mod.rs
index 8ccc2a5bc1..9e01621c02 100644
--- a/datafusion/core/tests/fuzz_cases/mod.rs
+++ b/datafusion/core/tests/fuzz_cases/mod.rs
@@ -21,6 +21,7 @@ mod join_fuzz;
 mod merge_fuzz;
 mod sort_fuzz;
 mod sort_query_fuzz;
+mod topk_filter_pushdown;
 
 mod aggregation_fuzzer;
 mod equivalence;
diff --git a/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs
new file mode 100644
index 0000000000..a5934882cb
--- /dev/null
+++ b/datafusion/core/tests/fuzz_cases/topk_filter_pushdown.rs
@@ -0,0 +1,387 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use std::collections::HashMap;
+use std::sync::{Arc, LazyLock};
+
+use arrow::array::{Int32Array, StringArray, StringDictionaryBuilder};
+use arrow::datatypes::Int32Type;
+use arrow::record_batch::RecordBatch;
+use arrow::util::pretty::pretty_format_batches;
+use arrow_schema::{DataType, Field, Schema};
+use datafusion::datasource::listing::{ListingOptions, ListingTable, 
ListingTableConfig};
+use datafusion::prelude::{SessionConfig, SessionContext};
+use datafusion_datasource::ListingTableUrl;
+use datafusion_datasource_parquet::ParquetFormat;
+use datafusion_execution::object_store::ObjectStoreUrl;
+use itertools::Itertools;
+use object_store::memory::InMemory;
+use object_store::path::Path;
+use object_store::{ObjectStore, PutPayload};
+use parquet::arrow::ArrowWriter;
+use rand::rngs::StdRng;
+use rand::{Rng, SeedableRng};
+use tokio::sync::Mutex;
+use tokio::task::JoinSet;
+
+#[derive(Clone)]
+struct TestDataSet {
+    store: Arc<dyn ObjectStore>,
+    schema: Arc<Schema>,
+}
+
+/// Cache of generated test datasets (in-memory object stores holding parquet files)
+// The `LazyLock` only creates the empty list; a tokio `Mutex` guards it so `test_files` can fill it from async code
+static TESTFILES: LazyLock<Mutex<Vec<TestDataSet>>> =
+    LazyLock::new(|| Mutex::new(vec![]));
+
+async fn test_files() -> Vec<TestDataSet> {
+    let files_mutex = &TESTFILES;
+    let mut files = files_mutex.lock().await;
+    if !files.is_empty() {
+        return (*files).clone();
+    }
+
+    let mut rng = StdRng::seed_from_u64(0);
+
+    for nulls_in_ids in [false, true] {
+        for nulls_in_names in [false, true] {
+            for nulls_in_departments in [false, true] {
+                let store = Arc::new(InMemory::new());
+
+                let schema = Arc::new(Schema::new(vec![
+                    Field::new("id", DataType::Int32, nulls_in_ids),
+                    Field::new("name", DataType::Utf8, nulls_in_names),
+                    Field::new(
+                        "department",
+                        DataType::Dictionary(
+                            Box::new(DataType::Int32),
+                            Box::new(DataType::Utf8),
+                        ),
+                        nulls_in_departments,
+                    ),
+                ]));
+
+                let name_choices = if nulls_in_names {
+                    [Some("Alice"), Some("Bob"), None, Some("David"), None]
+                } else {
+                    [
+                        Some("Alice"),
+                        Some("Bob"),
+                        Some("Charlie"),
+                        Some("David"),
+                        Some("Eve"),
+                    ]
+                };
+
+                let department_choices = if nulls_in_departments {
+                    [
+                        Some("Theater"),
+                        Some("Engineering"),
+                        None,
+                        Some("Arts"),
+                        None,
+                    ]
+                } else {
+                    [
+                        Some("Theater"),
+                        Some("Engineering"),
+                        Some("Healthcare"),
+                        Some("Arts"),
+                        Some("Music"),
+                    ]
+                };
+
+                // Generate 5 files, some with overlapping or repeated ids some without
+                for i in 0..5 {
+                    let num_batches = rng.random_range(1..3);
+                    let mut batches = Vec::with_capacity(num_batches);
+                    for _ in 0..num_batches {
+                        let num_rows = 25;
+                        let ids = 
Int32Array::from_iter((0..num_rows).map(|file| {
+                            if nulls_in_ids {
+                                if rng.random_bool(1.0 / 10.0) {
+                                    None
+                                } else {
+                                    Some(rng.random_range(file..file + 5))
+                                }
+                            } else {
+                                Some(rng.random_range(file..file + 5))
+                            }
+                        }));
+                        let names = 
StringArray::from_iter((0..num_rows).map(|_| {
+                            // randomly select a name
+                            let idx = rng.random_range(0..name_choices.len());
+                            name_choices[idx].map(|s| s.to_string())
+                        }));
+                        let mut departments = 
StringDictionaryBuilder::<Int32Type>::new();
+                        for _ in 0..num_rows {
+                            // randomly select a department
+                            let idx = 
rng.random_range(0..department_choices.len());
+                            
departments.append_option(department_choices[idx].as_ref());
+                        }
+                        let batch = RecordBatch::try_new(
+                            schema.clone(),
+                            vec![
+                                Arc::new(ids),
+                                Arc::new(names),
+                                Arc::new(departments.finish()),
+                            ],
+                        )
+                        .unwrap();
+                        batches.push(batch);
+                    }
+                    let mut buf = vec![];
+                    {
+                        let mut writer =
+                            ArrowWriter::try_new(&mut buf, schema.clone(), 
None).unwrap();
+                        for batch in batches {
+                            writer.write(&batch).unwrap();
+                            writer.flush().unwrap();
+                        }
+                        writer.flush().unwrap();
+                        writer.finish().unwrap();
+                    }
+                    let payload = PutPayload::from(buf);
+                    let path = Path::from(format!("file_{i}.parquet"));
+                    store.put(&path, payload).await.unwrap();
+                }
+                files.push(TestDataSet { store, schema });
+            }
+        }
+    }
+    (*files).clone()
+}
+
+struct RunResult {
+    results: Vec<RecordBatch>,
+    explain_plan: String,
+}
+
+async fn run_query_with_config(
+    query: &str,
+    config: SessionConfig,
+    dataset: TestDataSet,
+) -> RunResult {
+    let store = dataset.store;
+    let schema = dataset.schema;
+    let ctx = SessionContext::new_with_config(config);
+    let url = ObjectStoreUrl::parse("memory://").unwrap();
+    ctx.register_object_store(url.as_ref(), store.clone());
+
+    let format = Arc::new(
+        ParquetFormat::default()
+            .with_options(ctx.state().table_options().parquet.clone()),
+    );
+    let options = ListingOptions::new(format);
+    let table_path = ListingTableUrl::parse("memory:///").unwrap();
+    let config = ListingTableConfig::new(table_path)
+        .with_listing_options(options)
+        .with_schema(schema);
+    let table = Arc::new(ListingTable::try_new(config).unwrap());
+
+    ctx.register_table("test_table", table).unwrap();
+
+    let results = ctx.sql(query).await.unwrap().collect().await.unwrap();
+    let explain_batches = ctx
+        .sql(&format!("EXPLAIN ANALYZE {query}"))
+        .await
+        .unwrap()
+        .collect()
+        .await
+        .unwrap();
+    let explain_plan = 
pretty_format_batches(&explain_batches).unwrap().to_string();
+    RunResult {
+        results,
+        explain_plan,
+    }
+}
+
+#[derive(Debug)]
+struct RunQueryResult {
+    query: String,
+    result: Vec<RecordBatch>,
+    expected: Vec<RecordBatch>,
+}
+
+impl RunQueryResult {
+    fn expected_formated(&self) -> String {
+        format!("{}", pretty_format_batches(&self.expected).unwrap())
+    }
+
+    fn result_formated(&self) -> String {
+        format!("{}", pretty_format_batches(&self.result).unwrap())
+    }
+
+    fn is_ok(&self) -> bool {
+        self.expected_formated() == self.result_formated()
+    }
+}
+
+/// Iterate over each line in the plan and check that one of them has `DataSourceExec` and `DynamicFilterPhysicalExpr` in the same line.
+fn has_dynamic_filter_expr_pushdown(plan: &str) -> bool {
+    for line in plan.lines() {
+        if line.contains("DataSourceExec") && 
line.contains("DynamicFilterPhysicalExpr") {
+            return true;
+        }
+    }
+    false
+}
+
+async fn run_query(
+    query: String,
+    cfg: SessionConfig,
+    dataset: TestDataSet,
+) -> RunQueryResult {
+    let cfg_with_dynamic_filters = cfg
+        .clone()
+        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", true);
+    let cfg_without_dynamic_filters = cfg
+        .clone()
+        .set_bool("datafusion.optimizer.enable_dynamic_filter_pushdown", 
false);
+
+    let expected_result =
+        run_query_with_config(&query, cfg_without_dynamic_filters, 
dataset.clone()).await;
+    let result =
+        run_query_with_config(&query, cfg_with_dynamic_filters, 
dataset.clone()).await;
+    // Check that dynamic filters were actually pushed down
+    if !has_dynamic_filter_expr_pushdown(&result.explain_plan) {
+        panic!(
+            "Dynamic filter was not pushed down in query: {query}\n\n{}",
+            result.explain_plan
+        );
+    }
+
+    RunQueryResult {
+        query: query.to_string(),
+        result: result.results,
+        expected: expected_result.results,
+    }
+}
+
+struct TestCase {
+    query: String,
+    cfg: SessionConfig,
+    dataset: TestDataSet,
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn test_fuzz_topk_filter_pushdown() {
+    let order_columns = ["id", "name", "department"];
+    let order_directions = ["ASC", "DESC"];
+    let null_orders = ["NULLS FIRST", "NULLS LAST"];
+
+    let start = datafusion_common::instant::Instant::now();
+    let mut orders: HashMap<String, Vec<String>> = HashMap::new();
+    for order_column in &order_columns {
+        for order_direction in &order_directions {
+            for null_order in &null_orders {
+                // if there is a vec for this column insert the order, otherwise create a new vec
+                let ordering = format!("{order_column} {order_direction} 
{null_order}");
+                match orders.get_mut(*order_column) {
+                    Some(order_vec) => {
+                        order_vec.push(ordering);
+                    }
+                    None => {
+                        orders.insert(order_column.to_string(), 
vec![ordering]);
+                    }
+                }
+            }
+        }
+    }
+
+    let mut queries = vec![];
+
+    for limit in [1, 10] {
+        for num_order_by_columns in [1, 2, 3] {
+            for order_columns in ["id", "name", "department"]
+                .iter()
+                .combinations(num_order_by_columns)
+            {
+                for orderings in order_columns
+                    .iter()
+                    .map(|col| orders.get(**col).unwrap())
+                    .multi_cartesian_product()
+                {
+                    let query = format!(
+                        "SELECT * FROM test_table ORDER BY {} LIMIT {}",
+                        orderings.into_iter().join(", "),
+                        limit
+                    );
+                    queries.push(query);
+                }
+            }
+        }
+    }
+
+    queries.sort_unstable();
+    println!(
+        "Generated {} queries in {:?}",
+        queries.len(),
+        start.elapsed()
+    );
+
+    let start = datafusion_common::instant::Instant::now();
+    let datasets = test_files().await;
+    println!("Generated test files in {:?}", start.elapsed());
+
+    let mut test_cases = vec![];
+    for enable_filter_pushdown in [true, false] {
+        for query in &queries {
+            for dataset in &datasets {
+                let mut cfg = SessionConfig::new();
+                cfg = cfg.set_bool(
+                    "datafusion.optimizer.enable_dynamic_filter_pushdown",
+                    enable_filter_pushdown,
+                );
+                test_cases.push(TestCase {
+                    query: query.to_string(),
+                    cfg,
+                    dataset: dataset.clone(),
+                });
+            }
+        }
+    }
+
+    let start = datafusion_common::instant::Instant::now();
+    let mut join_set = JoinSet::new();
+    for tc in test_cases {
+        join_set.spawn(run_query(tc.query, tc.cfg, tc.dataset));
+    }
+    let mut results = join_set.join_all().await;
+    results.sort_unstable_by(|a, b| a.query.cmp(&b.query));
+    println!("Ran {} test cases in {:?}", results.len(), start.elapsed());
+
+    let failures = results
+        .iter()
+        .filter(|result| !result.is_ok())
+        .collect::<Vec<_>>();
+
+    for failure in &failures {
+        println!("Failure:");
+        println!("Query:\n{}", failure.query);
+        println!("\nExpected:\n{}", failure.expected_formated());
+        println!("\nResult:\n{}", failure.result_formated());
+        println!("\n\n");
+    }
+
+    if !failures.is_empty() {
+        panic!("Some test cases failed");
+    } else {
+        println!("All test cases passed");
+    }
+}
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index 16dba7e3f2..f1ef365c92 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -17,28 +17,41 @@
 
 use std::sync::{Arc, LazyLock};
 
-use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
+use arrow::{
+    array::record_batch,
+    datatypes::{DataType, Field, Schema, SchemaRef},
+    util::pretty::pretty_format_batches,
+};
+use arrow_schema::SortOptions;
 use datafusion::{
     logical_expr::Operator,
     physical_plan::{
         expressions::{BinaryExpr, Column, Literal},
         PhysicalExpr,
     },
+    prelude::{ParquetReadOptions, SessionConfig, SessionContext},
     scalar::ScalarValue,
 };
 use datafusion_common::config::ConfigOptions;
+use datafusion_execution::object_store::ObjectStoreUrl;
 use datafusion_functions_aggregate::count::count_udaf;
-use datafusion_physical_expr::expressions::col;
 use datafusion_physical_expr::{aggregate::AggregateExprBuilder, Partitioning};
-use datafusion_physical_optimizer::filter_pushdown::FilterPushdown;
+use datafusion_physical_expr::{expressions::col, LexOrdering, 
PhysicalSortExpr};
+use datafusion_physical_optimizer::{
+    filter_pushdown::FilterPushdown, PhysicalOptimizerRule,
+};
 use datafusion_physical_plan::{
     aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
     coalesce_batches::CoalesceBatchesExec,
     filter::FilterExec,
     repartition::RepartitionExec,
+    sorts::sort::SortExec,
+    ExecutionPlan,
 };
 
-use util::{OptimizationTest, TestNode, TestScanBuilder};
+use futures::StreamExt;
+use object_store::{memory::InMemory, ObjectStore};
+use util::{format_plan_for_test, OptimizationTest, TestNode, TestScanBuilder};
 
 mod util;
 
@@ -50,7 +63,7 @@ fn test_pushdown_into_scan() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
         @r"
     OptimizationTest:
       input:
@@ -74,7 +87,7 @@ fn test_pushdown_into_scan_with_config_options() {
     insta::assert_snapshot!(
         OptimizationTest::new(
             Arc::clone(&plan),
-            FilterPushdown {},
+            FilterPushdown::new(),
             false
         ),
         @r"
@@ -93,7 +106,7 @@ fn test_pushdown_into_scan_with_config_options() {
     insta::assert_snapshot!(
         OptimizationTest::new(
             plan,
-            FilterPushdown {},
+            FilterPushdown::new(),
             true
         ),
         @r"
@@ -118,7 +131,7 @@ fn test_filter_collapse() {
     let plan = Arc::new(FilterExec::try_new(predicate2, filter1).unwrap());
 
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
         @r"
     OptimizationTest:
       input:
@@ -146,7 +159,7 @@ fn test_filter_with_projection() {
 
     // expect the predicate to be pushed down into the DataSource but the FilterExec to be converted to ProjectionExec
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
         @r"
     OptimizationTest:
       input:
@@ -169,7 +182,7 @@ fn test_filter_with_projection() {
             .unwrap(),
     );
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{},true),
+        OptimizationTest::new(plan, FilterPushdown::new(),true),
         @r"
     OptimizationTest:
       input:
@@ -198,7 +211,7 @@ fn test_push_down_through_transparent_nodes() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{},true),
+        OptimizationTest::new(plan, FilterPushdown::new(),true),
         @r"
     OptimizationTest:
       input:
@@ -262,7 +275,7 @@ fn test_no_pushdown_through_aggregates() {
 
     // expect the predicate to be pushed down into the DataSource
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
         @r"
     OptimizationTest:
       input:
@@ -293,7 +306,7 @@ fn test_node_handles_child_pushdown_result() {
     let predicate = col_lit_predicate("a", "foo", &schema());
     let plan = Arc::new(TestNode::new(true, Arc::clone(&scan), predicate));
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
         @r"
     OptimizationTest:
       input:
@@ -312,7 +325,7 @@ fn test_node_handles_child_pushdown_result() {
     let predicate = col_lit_predicate("a", "foo", &schema());
     let plan = Arc::new(TestNode::new(true, Arc::clone(&scan), predicate));
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
         @r"
     OptimizationTest:
       input:
@@ -332,7 +345,7 @@ fn test_node_handles_child_pushdown_result() {
     let predicate = col_lit_predicate("a", "foo", &schema());
     let plan = Arc::new(TestNode::new(false, Arc::clone(&scan), predicate));
     insta::assert_snapshot!(
-        OptimizationTest::new(plan, FilterPushdown{}, true),
+        OptimizationTest::new(plan, FilterPushdown::new(), true),
         @r"
     OptimizationTest:
       input:
@@ -346,6 +359,136 @@ fn test_node_handles_child_pushdown_result() {
     );
 }
 
+#[tokio::test]
+async fn test_topk_dynamic_filter_pushdown() {
+    let batches = vec![
+        record_batch!(
+            ("a", Utf8, ["aa", "ab"]),
+            ("b", Utf8, ["bd", "bc"]),
+            ("c", Float64, [1.0, 2.0])
+        )
+        .unwrap(),
+        record_batch!(
+            ("a", Utf8, ["ac", "ad"]),
+            ("b", Utf8, ["bb", "ba"]),
+            ("c", Float64, [2.0, 1.0])
+        )
+        .unwrap(),
+    ];
+    let scan = TestScanBuilder::new(schema())
+        .with_support(true)
+        .with_batches(batches)
+        .build();
+    let plan = Arc::new(
+        SortExec::new(
+            LexOrdering::new(vec![PhysicalSortExpr::new(
+                col("b", &schema()).unwrap(),
+                SortOptions::new(true, false), // descending, nulls_first
+            )])
+            .unwrap(),
+            Arc::clone(&scan),
+        )
+        .with_fetch(Some(1)),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // expect the predicate to be pushed down into the DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), 
FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], 
preserve_partitioning=[false]
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], 
preserve_partitioning=[false]
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true, 
predicate=DynamicFilterPhysicalExpr [ true ]
+    "
+    );
+
+    // Actually apply the optimization to the plan and put some data through it to check that the filter is updated to reflect the TopK state
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+    let config = SessionConfig::new().with_batch_size(2);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    // Iterate one batch
+    stream.next().await.unwrap().unwrap();
+    // Now check what our filter looks like
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - SortExec: TopK(fetch=1), expr=[b@1 DESC NULLS LAST], 
preserve_partitioning=[false], filter=[b@1 > bd]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true, 
predicate=DynamicFilterPhysicalExpr [ b@1 > bd ]
+    "
+    );
+}
+
+/// Integration test for dynamic filter pushdown with TopK.
+/// We use an integration test because there are complex interactions in the optimizer rules
+/// that the unit tests applying a single optimizer rule do not cover.
+#[tokio::test]
+async fn test_topk_dynamic_filter_pushdown_integration() {
+    let store = Arc::new(InMemory::new()) as Arc<dyn ObjectStore>;
+    let mut cfg = SessionConfig::new();
+    cfg.options_mut().execution.parquet.pushdown_filters = true;
+    cfg.options_mut().execution.parquet.max_row_group_size = 128;
+    let ctx = SessionContext::new_with_config(cfg);
+    ctx.register_object_store(
+        ObjectStoreUrl::parse("memory://").unwrap().as_ref(),
+        Arc::clone(&store),
+    );
+    ctx.sql(
+        r"
+COPY  (
+  SELECT 1372708800 + value AS t 
+  FROM generate_series(0, 99999)
+  ORDER BY t
+ ) TO 'memory:///1.parquet'
+STORED AS PARQUET;
+  ",
+    )
+    .await
+    .unwrap()
+    .collect()
+    .await
+    .unwrap();
+
+    // Register the file with the context
+    ctx.register_parquet(
+        "topk_pushdown",
+        "memory:///1.parquet",
+        ParquetReadOptions::default(),
+    )
+    .await
+    .unwrap();
+
+    // Create a TopK query that will use dynamic filter pushdown
+    let df = ctx
+        .sql(r"EXPLAIN ANALYZE SELECT t FROM topk_pushdown ORDER BY t LIMIT 
10;")
+        .await
+        .unwrap();
+    let batches = df.collect().await.unwrap();
+    let explain = format!("{}", pretty_format_batches(&batches).unwrap());
+
+    assert!(explain.contains("output_rows=128")); // Read 1 row group
+    assert!(explain.contains("t@0 < 1372708809")); // Dynamic filter was 
applied
+    assert!(
+        explain.contains("pushdown_rows_matched=128, 
pushdown_rows_pruned=99872"),
+        "{explain}"
+    );
+    // Pushdown pruned most rows
+}
+
 /// Schema:
 /// a: String
 /// b: String
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
index c60d2b4d81..e793af8ed4 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/util.rs
@@ -29,6 +29,7 @@ use datafusion_datasource::{
 use datafusion_physical_expr::conjunction;
 use datafusion_physical_expr_common::physical_expr::fmt_sql;
 use datafusion_physical_optimizer::PhysicalOptimizerRule;
+use datafusion_physical_plan::filter_pushdown::FilterPushdownPhase;
 use datafusion_physical_plan::{
     displayable,
     filter::FilterExec,
@@ -271,6 +272,11 @@ impl TestScanBuilder {
         self
     }
 
+    pub fn with_batches(mut self, batches: Vec<RecordBatch>) -> Self {
+        self.batches = batches;
+        self
+    }
+
     pub fn build(self) -> Arc<dyn ExecutionPlan> {
         let source = Arc::new(TestSource::new(self.support, self.batches));
         let base_config = FileScanConfigBuilder::new(
@@ -426,6 +432,15 @@ fn format_lines(s: &str) -> Vec<String> {
     s.trim().split('\n').map(|s| s.to_string()).collect()
 }
 
+pub fn format_plan_for_test(plan: &Arc<dyn ExecutionPlan>) -> String {
+    let mut out = String::new();
+    for line in format_execution_plan(plan) {
+        out.push_str(&format!("  - {line}\n"));
+    }
+    out.push('\n');
+    out
+}
+
 #[derive(Debug)]
 pub(crate) struct TestNode {
     inject_filter: bool,
@@ -496,6 +511,7 @@ impl ExecutionPlan for TestNode {
 
     fn gather_filters_for_pushdown(
         &self,
+        _phase: FilterPushdownPhase,
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
@@ -506,6 +522,7 @@ impl ExecutionPlan for TestNode {
 
     fn handle_child_pushdown_result(
         &self,
+        _phase: FilterPushdownPhase,
         child_pushdown_result: ChildPushdownResult,
         _config: &ConfigOptions,
     ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 29c3c9c3d7..a0f49ad7b1 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -36,7 +36,7 @@ use datafusion_execution::{SendableRecordBatchStream, TaskContext};
 use datafusion_physical_expr::{EquivalenceProperties, Partitioning, 
PhysicalExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 use datafusion_physical_plan::filter_pushdown::{
-    ChildPushdownResult, FilterPushdownPropagation,
+    ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
 };
 use datafusion_physical_plan::yield_stream::wrap_yield_stream;
 
@@ -318,6 +318,7 @@ impl ExecutionPlan for DataSourceExec {
 
     fn handle_child_pushdown_result(
         &self,
+        _phase: FilterPushdownPhase,
         child_pushdown_result: ChildPushdownResult,
         config: &ConfigOptions,
     ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
diff --git a/datafusion/physical-expr/src/expressions/mod.rs b/datafusion/physical-expr/src/expressions/mod.rs
index d77207fbbc..8f46133ed0 100644
--- a/datafusion/physical-expr/src/expressions/mod.rs
+++ b/datafusion/physical-expr/src/expressions/mod.rs
@@ -43,6 +43,7 @@ pub use case::{case, CaseExpr};
 pub use cast::{cast, CastExpr};
 pub use column::{col, with_new_schema, Column};
 pub use datafusion_expr::utils::format_state_name;
+pub use dynamic_filters::DynamicFilterPhysicalExpr;
 pub use in_list::{in_list, InListExpr};
 pub use is_not_null::{is_not_null, IsNotNullExpr};
 pub use is_null::{is_null, IsNullExpr};
diff --git a/datafusion/physical-optimizer/src/filter_pushdown.rs 
b/datafusion/physical-optimizer/src/filter_pushdown.rs
index 5b2d47106b..885280576b 100644
--- a/datafusion/physical-optimizer/src/filter_pushdown.rs
+++ b/datafusion/physical-optimizer/src/filter_pushdown.rs
@@ -22,7 +22,8 @@ use crate::PhysicalOptimizerRule;
 use datafusion_common::{config::ConfigOptions, Result};
 use datafusion_physical_expr::PhysicalExpr;
 use datafusion_physical_plan::filter_pushdown::{
-    ChildPushdownResult, FilterPushdownPropagation, PredicateSupport, 
PredicateSupports,
+    ChildPushdownResult, FilterPushdownPhase, FilterPushdownPropagation,
+    PredicateSupport, PredicateSupports,
 };
 use datafusion_physical_plan::{with_new_children_if_necessary, ExecutionPlan};
 
@@ -362,11 +363,31 @@ use itertools::izip;
 /// [`ProjectionExec`]: datafusion_physical_plan::projection::ProjectionExec
 /// [`AggregateExec`]: datafusion_physical_plan::aggregates::AggregateExec
 #[derive(Debug)]
-pub struct FilterPushdown {}
+pub struct FilterPushdown {
+    phase: FilterPushdownPhase,
+    name: String,
+}
 
 impl FilterPushdown {
+    /// Build the rule for `phase`. The rule name reported by `name()` is
+    /// `"FilterPushdown"` for `Pre` and `"FilterPushdown(Post)"` for `Post`.
+    fn new_with_phase(phase: FilterPushdownPhase) -> Self {
+        let name = match phase {
+            FilterPushdownPhase::Pre => "FilterPushdown",
+            FilterPushdownPhase::Post => "FilterPushdown(Post)",
+        }
+        .to_string();
+        Self { phase, name }
+    }
+
+    /// Create a new [`FilterPushdown`] optimizer rule that runs in the 
pre-optimization phase.
+    /// See [`FilterPushdownPhase`] for more details.
     pub fn new() -> Self {
-        Self {}
+        Self::new_with_phase(FilterPushdownPhase::Pre)
+    }
+
+    /// Create a new [`FilterPushdown`] optimizer rule that runs in the 
post-optimization phase.
+    /// See [`FilterPushdownPhase`] for more details.
+    pub fn new_post_optimization() -> Self {
+        Self::new_with_phase(FilterPushdownPhase::Post)
     }
 }
 
@@ -382,13 +403,15 @@ impl PhysicalOptimizerRule for FilterPushdown {
         plan: Arc<dyn ExecutionPlan>,
         config: &ConfigOptions,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(push_down_filters(Arc::clone(&plan), vec![], config)?
-            .updated_node
-            .unwrap_or(plan))
+        Ok(
+            push_down_filters(Arc::clone(&plan), vec![], config, self.phase)?
+                .updated_node
+                .unwrap_or(plan),
+        )
     }
 
     fn name(&self) -> &str {
-        "FilterPushdown"
+        &self.name
     }
 
     fn schema_check(&self) -> bool {
@@ -409,6 +432,7 @@ fn push_down_filters(
     node: Arc<dyn ExecutionPlan>,
     parent_predicates: Vec<Arc<dyn PhysicalExpr>>,
     config: &ConfigOptions,
+    phase: FilterPushdownPhase,
 ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
     // If the node has any child, these will be rewritten as supported or 
unsupported
     let mut parent_predicates_pushdown_states =
@@ -418,7 +442,7 @@ fn push_down_filters(
 
     let children = node.children();
     let filter_description =
-        node.gather_filters_for_pushdown(parent_predicates.clone(), config)?;
+        node.gather_filters_for_pushdown(phase, parent_predicates.clone(), 
config)?;
 
     for (child, parent_filters, self_filters) in izip!(
         children,
@@ -460,7 +484,7 @@ fn push_down_filters(
         }
 
         // Any filters that could not be pushed down to a child are marked as 
not-supported to our parents
-        let result = push_down_filters(Arc::clone(child), all_predicates, 
config)?;
+        let result = push_down_filters(Arc::clone(child), all_predicates, 
config, phase)?;
 
         if let Some(new_child) = result.updated_node {
             // If we have a filter pushdown result, we need to update our 
children
@@ -524,8 +548,11 @@ fn push_down_filters(
             })
             .collect(),
     );
-    // Check what the current node wants to do given the result of pushdown to 
it's children
+    // TODO: by calling `handle_child_pushdown_result` we are assuming that the
+    // `ExecutionPlan` implementation will not change the plan itself.
+    // Should we have a separate method for dynamic pushdown that does not 
allow modifying the plan?
     let mut res = updated_node.handle_child_pushdown_result(
+        phase,
         ChildPushdownResult {
             parent_filters: parent_pushdown_result,
             self_filters: self_filters_pushdown_supports,
diff --git a/datafusion/physical-optimizer/src/optimizer.rs 
b/datafusion/physical-optimizer/src/optimizer.rs
index d5129cea9d..aed8160691 100644
--- a/datafusion/physical-optimizer/src/optimizer.rs
+++ b/datafusion/physical-optimizer/src/optimizer.rs
@@ -97,8 +97,10 @@ impl PhysicalOptimizer {
             // Applying the rule early means only directly-connected 
AggregateExecs must be examined.
             Arc::new(LimitedDistinctAggregation::new()),
             // The FilterPushdown rule tries to push down filters as far as it 
can.
-            // For example, it will push down filtering from a `FilterExec` to
-            // a `DataSourceExec`, or from a `TopK`'s current state to a 
`DataSourceExec`.
+            // For example, it will push down filtering from a `FilterExec` to 
`DataSourceExec`.
+            // Note that this does not push down dynamic filters (such as 
those created by a `SortExec` operator in TopK mode),
+            // those are handled by the later `FilterPushdown` rule.
+            // See `FilterPushdownPhase` for more details.
             Arc::new(FilterPushdown::new()),
             // The EnforceDistribution rule is for adding essential 
repartitioning to satisfy distribution
             // requirements. Please make sure that the whole plan tree is 
determined before this rule.
@@ -139,6 +141,10 @@ impl PhysicalOptimizer {
             // reduced by narrowing their input tables.
             Arc::new(ProjectionPushdown::new()),
             Arc::new(InsertYieldExec::new()),
+            // This FilterPushdown handles dynamic filters that may have 
references to the source ExecutionPlan.
+            // Therefore it should be run at the end of the optimization 
process since any changes to the plan may break the dynamic filter's references.
+            // See `FilterPushdownPhase` for more details.
+            Arc::new(FilterPushdown::new_post_optimization()),
             // The SanityCheckPlan rule checks whether the order and
             // distribution requirements of each node in the plan
             // is satisfied. It will also reject non-runnable query
diff --git a/datafusion/physical-plan/src/coalesce_batches.rs 
b/datafusion/physical-plan/src/coalesce_batches.rs
index f35231fb6a..78bd4b4fc3 100644
--- a/datafusion/physical-plan/src/coalesce_batches.rs
+++ b/datafusion/physical-plan/src/coalesce_batches.rs
@@ -37,7 +37,8 @@ use datafusion_physical_expr::PhysicalExpr;
 use crate::coalesce::{BatchCoalescer, CoalescerState};
 use crate::execution_plan::CardinalityEffect;
 use crate::filter_pushdown::{
-    ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
+    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+    FilterPushdownPropagation,
 };
 use datafusion_common::config::ConfigOptions;
 use futures::ready;
@@ -229,6 +230,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
 
     fn gather_filters_for_pushdown(
         &self,
+        _phase: FilterPushdownPhase,
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
@@ -238,6 +240,7 @@ impl ExecutionPlan for CoalesceBatchesExec {
 
     fn handle_child_pushdown_result(
         &self,
+        _phase: FilterPushdownPhase,
         child_pushdown_result: ChildPushdownResult,
         _config: &ConfigOptions,
     ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
diff --git a/datafusion/physical-plan/src/execution_plan.rs 
b/datafusion/physical-plan/src/execution_plan.rs
index 5dd090c12c..75f1c90820 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -17,7 +17,8 @@
 
 pub use crate::display::{DefaultDisplay, DisplayAs, DisplayFormatType, 
VerboseDisplay};
 use crate::filter_pushdown::{
-    ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
+    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+    FilterPushdownPropagation,
 };
 pub use crate::metrics::Metric;
 pub use crate::ordering::InputOrderMode;
@@ -509,8 +510,13 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///
     /// The default implementation bars all parent filters from being pushed 
down and adds no new filters.
     /// This is the safest option, making filter pushdown opt-in on a per-node 
basis.
+    ///
+    /// There are two different phases in filter pushdown, which some 
operators may handle the same and some differently.
+    /// Depending on the phase the operator may or may not be allowed to 
modify the plan.
+    /// See [`FilterPushdownPhase`] for more details.
     fn gather_filters_for_pushdown(
         &self,
+        _phase: FilterPushdownPhase,
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
@@ -548,10 +554,15 @@ pub trait ExecutionPlan: Debug + DisplayAs + Send + Sync {
     ///   This can be used alongside 
[`FilterPushdownPropagation::with_filters`] and 
[`FilterPushdownPropagation::with_updated_node`]
     ///   to dynamically build a result with a mix of supported and 
unsupported filters.
     ///
+    /// There are two different phases in filter pushdown, which some 
operators may handle the same and some differently.
+    /// Depending on the phase the operator may or may not be allowed to 
modify the plan.
+    /// See [`FilterPushdownPhase`] for more details.
+    ///
     /// [`PredicateSupport::Supported`]: 
crate::filter_pushdown::PredicateSupport::Supported
     /// [`PredicateSupports::new_with_supported_check`]: 
crate::filter_pushdown::PredicateSupports::new_with_supported_check
     fn handle_child_pushdown_result(
         &self,
+        _phase: FilterPushdownPhase,
         child_pushdown_result: ChildPushdownResult,
         _config: &ConfigOptions,
     ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
diff --git a/datafusion/physical-plan/src/filter.rs 
b/datafusion/physical-plan/src/filter.rs
index 25eb98a61b..252af9ebcd 100644
--- a/datafusion/physical-plan/src/filter.rs
+++ b/datafusion/physical-plan/src/filter.rs
@@ -28,7 +28,8 @@ use super::{
 use crate::common::can_project;
 use crate::execution_plan::CardinalityEffect;
 use crate::filter_pushdown::{
-    ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
+    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+    FilterPushdownPropagation,
 };
 use crate::projection::{
     make_with_child, try_embed_projection, update_expr, EmbeddedProjection,
@@ -449,9 +450,14 @@ impl ExecutionPlan for FilterExec {
 
     fn gather_filters_for_pushdown(
         &self,
+        phase: FilterPushdownPhase,
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
+        if !matches!(phase, FilterPushdownPhase::Pre) {
+            return Ok(FilterDescription::new_with_child_count(1)
+                .all_parent_filters_supported(parent_filters));
+        }
         let self_filter = split_conjunction(&self.predicate)
             .into_iter()
             .cloned()
@@ -503,9 +509,15 @@ impl ExecutionPlan for FilterExec {
 
     fn handle_child_pushdown_result(
         &self,
+        phase: FilterPushdownPhase,
         child_pushdown_result: ChildPushdownResult,
         _config: &ConfigOptions,
     ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
+        if !matches!(phase, FilterPushdownPhase::Pre) {
+            return Ok(FilterPushdownPropagation::transparent(
+                child_pushdown_result,
+            ));
+        }
         // We absorb any parent filters that were not handled by our children
         let mut unhandled_filters =
             child_pushdown_result.parent_filters.collect_unsupported();
diff --git a/datafusion/physical-plan/src/filter_pushdown.rs 
b/datafusion/physical-plan/src/filter_pushdown.rs
index 5222a98e3d..3bbe3997fd 100644
--- a/datafusion/physical-plan/src/filter_pushdown.rs
+++ b/datafusion/physical-plan/src/filter_pushdown.rs
@@ -20,6 +20,45 @@ use std::vec::IntoIter;
 
 use datafusion_physical_expr_common::physical_expr::PhysicalExpr;
 
+/// The phase in which a filter pushdown pass is running.
+///
+/// The phase is handed to `ExecutionPlan::gather_filters_for_pushdown` and
+/// `ExecutionPlan::handle_child_pushdown_result` so that operators can behave
+/// differently in the pre- and post-optimization passes.
+#[derive(Debug, Clone, Copy)]
+pub enum FilterPushdownPhase {
+    /// Pushdown that happens before most other optimizations.
+    /// This pushdown allows static filters that do not reference any 
[`ExecutionPlan`]s to be pushed down.
+    /// Filters that reference an [`ExecutionPlan`] cannot be pushed down at 
this stage since the whole plan tree may be rewritten
+    /// by other optimizations.
+    /// Implementers are however allowed to modify the execution plan 
themselves during this phase, for example by returning a completely
+    /// different [`ExecutionPlan`] from 
[`ExecutionPlan::handle_child_pushdown_result`].
+    ///
+    /// Pushdown of [`FilterExec`] into `DataSourceExec` is an example of a 
pre-pushdown.
+    ///
+    /// [`ExecutionPlan`]: crate::ExecutionPlan
+    /// [`FilterExec`]: crate::filter::FilterExec
+    /// [`ExecutionPlan::handle_child_pushdown_result`]: 
crate::ExecutionPlan::handle_child_pushdown_result
+    Pre,
+    /// Pushdown that happens after most other optimizations.
+    /// This stage of filter pushdown allows filters that reference an 
[`ExecutionPlan`] to be pushed down.
+    /// Since subsequent optimizations should not change the structure of the 
plan tree except for calling [`ExecutionPlan::with_new_children`]
+    /// (which generally preserves internal references) it is safe for 
references between [`ExecutionPlan`]s to be established at this stage.
+    ///
+    /// This phase is used to link a [`SortExec`] (with a TopK operator) or a 
[`HashJoinExec`] to a `DataSourceExec`.
+    ///
+    /// [`ExecutionPlan`]: crate::ExecutionPlan
+    /// [`ExecutionPlan::with_new_children`]: 
crate::ExecutionPlan::with_new_children
+    /// [`SortExec`]: crate::sorts::sort::SortExec
+    /// [`HashJoinExec`]: crate::joins::HashJoinExec
+    /// [`ExecutionPlan::handle_child_pushdown_result`]: 
crate::ExecutionPlan::handle_child_pushdown_result
+    Post,
+}
+
+/// Formats the phase as its variant name: `Pre` or `Post`.
+impl std::fmt::Display for FilterPushdownPhase {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        match self {
+            FilterPushdownPhase::Pre => write!(f, "Pre"),
+            FilterPushdownPhase::Post => write!(f, "Post"),
+        }
+    }
+}
+
 /// The result of a plan for pushing down a filter into a child node.
 /// This contains references to filters so that nodes can mutate a filter
 /// before pushing it down to a child node (e.g. to adjust a projection)
diff --git a/datafusion/physical-plan/src/repartition/mod.rs 
b/datafusion/physical-plan/src/repartition/mod.rs
index 8b0f7e9784..d872a84d72 100644
--- a/datafusion/physical-plan/src/repartition/mod.rs
+++ b/datafusion/physical-plan/src/repartition/mod.rs
@@ -55,7 +55,8 @@ use datafusion_physical_expr::{EquivalenceProperties, 
PhysicalExpr};
 use datafusion_physical_expr_common::sort_expr::LexOrdering;
 
 use crate::filter_pushdown::{
-    ChildPushdownResult, FilterDescription, FilterPushdownPropagation,
+    ChildPushdownResult, FilterDescription, FilterPushdownPhase,
+    FilterPushdownPropagation,
 };
 use futures::stream::Stream;
 use futures::{FutureExt, StreamExt, TryStreamExt};
@@ -807,6 +808,7 @@ impl ExecutionPlan for RepartitionExec {
 
     fn gather_filters_for_pushdown(
         &self,
+        _phase: FilterPushdownPhase,
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
         _config: &ConfigOptions,
     ) -> Result<FilterDescription> {
@@ -816,6 +818,7 @@ impl ExecutionPlan for RepartitionExec {
 
     fn handle_child_pushdown_result(
         &self,
+        _phase: FilterPushdownPhase,
         child_pushdown_result: ChildPushdownResult,
         _config: &ConfigOptions,
     ) -> Result<FilterPushdownPropagation<Arc<dyn ExecutionPlan>>> {
diff --git a/datafusion/physical-plan/src/sorts/sort.rs 
b/datafusion/physical-plan/src/sorts/sort.rs
index c31f17291e..0aa284297b 100644
--- a/datafusion/physical-plan/src/sorts/sort.rs
+++ b/datafusion/physical-plan/src/sorts/sort.rs
@@ -27,6 +27,7 @@ use std::sync::Arc;
 use crate::common::spawn_buffered;
 use crate::execution_plan::{Boundedness, CardinalityEffect, EmissionType};
 use crate::expressions::PhysicalSortExpr;
+use crate::filter_pushdown::{FilterDescription, FilterPushdownPhase};
 use crate::limit::LimitStream;
 use crate::metrics::{
     BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, SpillMetrics,
@@ -52,7 +53,9 @@ use datafusion_execution::disk_manager::RefCountedTempFile;
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::runtime_env::RuntimeEnv;
 use datafusion_execution::TaskContext;
+use datafusion_physical_expr::expressions::{lit, DynamicFilterPhysicalExpr};
 use datafusion_physical_expr::LexOrdering;
+use datafusion_physical_expr::PhysicalExpr;
 
 use futures::{StreamExt, TryStreamExt};
 use log::{debug, trace};
@@ -843,6 +846,8 @@ pub struct SortExec {
     common_sort_prefix: Vec<PhysicalSortExpr>,
     /// Cache holding plan properties like equivalences, output partitioning 
etc.
     cache: PlanProperties,
+    /// Filter matching the state of the sort for dynamic filter pushdown
+    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
 }
 
 impl SortExec {
@@ -861,6 +866,7 @@ impl SortExec {
             fetch: None,
             common_sort_prefix: sort_prefix,
             cache,
+            filter: None,
         }
     }
 
@@ -906,6 +912,17 @@ impl SortExec {
         if fetch.is_some() && is_pipeline_friendly {
             cache = cache.with_boundedness(Boundedness::Bounded);
         }
+        let filter = fetch.is_some().then(|| {
+            // If we already have a filter, keep it. Otherwise, create a new 
one.
+            self.filter.clone().unwrap_or_else(|| {
+                let children = self
+                    .expr
+                    .iter()
+                    .map(|sort_expr| Arc::clone(&sort_expr.expr))
+                    .collect::<Vec<_>>();
+                Arc::new(DynamicFilterPhysicalExpr::new(children, lit(true)))
+            })
+        });
         SortExec {
             input: Arc::clone(&self.input),
             expr: self.expr.clone(),
@@ -914,9 +931,15 @@ impl SortExec {
             common_sort_prefix: self.common_sort_prefix.clone(),
             fetch,
             cache,
+            filter,
         }
     }
 
+    /// Set this node's dynamic filter to `filter`, replacing any existing one,
+    /// and return the modified `SortExec`.
+    pub fn with_filter(mut self, filter: Arc<DynamicFilterPhysicalExpr>) -> 
Self {
+        self.filter = Some(filter);
+        self
+    }
+
     /// Input schema
     pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
         &self.input
@@ -932,6 +955,11 @@ impl SortExec {
         self.fetch
     }
 
+    /// If `Some(filter)`, returns the filter expression that matches the 
state of the sort.
+    /// The returned `Arc` is a clone of the one stored on this node; `None` 
+    /// means no dynamic filter is set.
+    pub fn filter(&self) -> Option<Arc<DynamicFilterPhysicalExpr>> {
+        self.filter.clone()
+    }
+
     fn output_partitioning_helper(
         input: &Arc<dyn ExecutionPlan>,
         preserve_partitioning: bool,
@@ -1009,6 +1037,13 @@ impl DisplayAs for SortExec {
                 match self.fetch {
                     Some(fetch) => {
                         write!(f, "SortExec: TopK(fetch={fetch}), expr=[{}], 
preserve_partitioning=[{preserve_partitioning}]", self.expr)?;
+                        if let Some(filter) = &self.filter {
+                            if let Ok(current) = filter.current() {
+                                if !current.eq(&lit(true)) {
+                                    write!(f, ", filter=[{current}]")?;
+                                }
+                            }
+                        }
                         if !self.common_sort_prefix.is_empty() {
                             write!(f, ", sort_prefix=[")?;
                             let mut first = true;
@@ -1079,9 +1114,10 @@ impl ExecutionPlan for SortExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let new_sort = SortExec::new(self.expr.clone(), 
Arc::clone(&children[0]))
+        let mut new_sort = SortExec::new(self.expr.clone(), 
Arc::clone(&children[0]))
             .with_fetch(self.fetch)
             .with_preserve_partitioning(self.preserve_partitioning);
+        new_sort.filter = self.filter.clone();
 
         Ok(Arc::new(new_sort))
     }
@@ -1122,6 +1158,7 @@ impl ExecutionPlan for SortExec {
                     context.session_config().batch_size(),
                     context.runtime_env(),
                     &self.metrics_set,
+                    self.filter.clone(),
                 )?;
                 Ok(Box::pin(RecordBatchStreamAdapter::new(
                     self.schema(),
@@ -1228,6 +1265,28 @@ impl ExecutionPlan for SortExec {
                 .with_preserve_partitioning(self.preserve_partitioning()),
         )))
     }
+
+    /// Describe the filters this sort pushes to its single child.
+    ///
+    /// Parent filters are always forwarded via `all_parent_filters_supported`.
+    /// Only in [`FilterPushdownPhase::Post`], and only when this node holds a
+    /// dynamic filter and `optimizer.enable_dynamic_filter_pushdown` is set,
+    /// that filter is additionally registered as a self filter.
+    fn gather_filters_for_pushdown(
+        &self,
+        phase: FilterPushdownPhase,
+        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
+        config: &datafusion_common::config::ConfigOptions,
+    ) -> Result<FilterDescription> {
+        if !matches!(phase, FilterPushdownPhase::Post) {
+            return Ok(FilterDescription::new_with_child_count(1)
+                .all_parent_filters_supported(parent_filters));
+        }
+        if let Some(filter) = &self.filter {
+            if config.optimizer.enable_dynamic_filter_pushdown {
+                let filter = Arc::clone(filter) as Arc<dyn PhysicalExpr>;
+                return Ok(FilterDescription::new_with_child_count(1)
+                    .all_parent_filters_supported(parent_filters)
+                    .with_self_filter(filter));
+            }
+        }
+        Ok(FilterDescription::new_with_child_count(1)
+            .all_parent_filters_supported(parent_filters))
+    }
 }
 
 #[cfg(test)]
diff --git a/datafusion/physical-plan/src/topk/mod.rs 
b/datafusion/physical-plan/src/topk/mod.rs
index d8b6f0e400..8d06fa73ce 100644
--- a/datafusion/physical-plan/src/topk/mod.rs
+++ b/datafusion/physical-plan/src/topk/mod.rs
@@ -17,6 +17,12 @@
 
 //! TopK: Combination of Sort / LIMIT
 
+use arrow::{
+    array::{Array, AsArray},
+    compute::{interleave_record_batch, prep_null_mask_filter, FilterBuilder},
+    row::{RowConverter, Rows, SortField},
+};
+use datafusion_expr::{ColumnarValue, Operator};
 use std::mem::size_of;
 use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc};
 
@@ -25,14 +31,18 @@ use crate::spill::get_record_batch_memory_size;
 use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream};
 
 use arrow::array::{ArrayRef, RecordBatch};
-use arrow::compute::interleave_record_batch;
 use arrow::datatypes::SchemaRef;
-use arrow::row::{RowConverter, Rows, SortField};
-use datafusion_common::{internal_datafusion_err, HashMap, Result};
+use datafusion_common::{
+    internal_datafusion_err, internal_err, HashMap, Result, ScalarValue,
+};
 use datafusion_execution::{
     memory_pool::{MemoryConsumer, MemoryReservation},
     runtime_env::RuntimeEnv,
 };
+use datafusion_physical_expr::{
+    expressions::{is_not_null, is_null, lit, BinaryExpr, 
DynamicFilterPhysicalExpr},
+    PhysicalExpr,
+};
 use datafusion_physical_expr_common::sort_expr::{LexOrdering, 
PhysicalSortExpr};
 
 /// Global TopK
@@ -110,6 +120,8 @@ pub struct TopK {
     common_sort_prefix_converter: Option<RowConverter>,
     /// Common sort prefix between the input and the sort expressions to allow 
early exit optimization
     common_sort_prefix: Arc<[PhysicalSortExpr]>,
+    /// Filter matching the state of the `TopK` heap used for dynamic filter 
pushdown
+    filter: Option<Arc<DynamicFilterPhysicalExpr>>,
     /// If true, indicates that all rows of subsequent batches are guaranteed
     /// to be greater (by byte order, after row conversion) than the top K,
     /// which means the top K won't change and the computation can be finished 
early.
@@ -148,6 +160,7 @@ impl TopK {
         batch_size: usize,
         runtime: Arc<RuntimeEnv>,
         metrics: &ExecutionPlanMetricsSet,
+        filter: Option<Arc<DynamicFilterPhysicalExpr>>,
     ) -> Result<Self> {
         let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]"))
             .register(&runtime.memory_pool);
@@ -179,6 +192,7 @@ impl TopK {
             common_sort_prefix_converter: prefix_row_converter,
             common_sort_prefix: Arc::from(common_sort_prefix),
             finished: false,
+            filter,
         })
     }
 
@@ -189,7 +203,7 @@ impl TopK {
         let baseline = self.metrics.baseline.clone();
         let _timer = baseline.elapsed_compute().timer();
 
-        let sort_keys: Vec<ArrayRef> = self
+        let mut sort_keys: Vec<ArrayRef> = self
             .expr
             .iter()
             .map(|expr| {
@@ -198,39 +212,202 @@ impl TopK {
             })
             .collect::<Result<Vec<_>>>()?;
 
+        let mut selected_rows = None;
+
+        if let Some(filter) = self.filter.as_ref() {
+            // If a filter is provided, evaluate its current predicate against the batch to pre-select candidate rows
+            let filter = filter.current()?;
+            let filtered = filter.evaluate(&batch)?;
+            let num_rows = batch.num_rows();
+            let array = filtered.into_array(num_rows)?;
+            let mut filter = array.as_boolean().clone();
+            let true_count = filter.true_count();
+            if true_count == 0 {
+                // no rows pass the current filter, so there is nothing to insert or update
+                return Ok(());
+            }
+            // only update the keys / rows if the filter does not match all 
rows
+            if true_count < num_rows {
+                // Indices in `set_indices` should be correct if filter 
contains nulls
+                // So we prepare the filter here. Note this is also done in 
the `FilterBuilder`
+                // so there is no overhead to do this here.
+                if filter.nulls().is_some() {
+                    filter = prep_null_mask_filter(&filter);
+                }
+
+                let filter_predicate = FilterBuilder::new(&filter);
+                let filter_predicate = if sort_keys.len() > 1 {
+                    // Optimize filter when it has multiple sort keys
+                    filter_predicate.optimize().build()
+                } else {
+                    filter_predicate.build()
+                };
+                selected_rows = Some(filter);
+                sort_keys = sort_keys
+                    .iter()
+                    .map(|key| filter_predicate.filter(key).map_err(|x| 
x.into()))
+                    .collect::<Result<Vec<_>>>()?;
+            }
+        };
         // reuse existing `Rows` to avoid reallocations
         let rows = &mut self.scratch_rows;
         rows.clear();
         self.row_converter.append(rows, &sort_keys)?;
 
-        // TODO make this algorithmically better?:
-        // Idea: filter out rows >= self.heap.max() early (before passing to 
`RowConverter`)
-        //       this avoids some work and also might be better vectorizable.
         let mut batch_entry = self.heap.register_batch(batch.clone());
-        for (index, row) in rows.iter().enumerate() {
+
+        let replacements = match selected_rows {
+            Some(filter) => {
+                self.find_new_topk_items(filter.values().set_indices(), &mut 
batch_entry)
+            }
+            None => self.find_new_topk_items(0..sort_keys[0].len(), &mut 
batch_entry),
+        };
+
+        if replacements > 0 {
+            self.metrics.row_replacements.add(replacements);
+
+            self.heap.insert_batch_entry(batch_entry);
+
+            // conserve memory
+            self.heap.maybe_compact()?;
+
+            // update memory reservation
+            self.reservation.try_resize(self.size())?;
+
+            // flag the topK as finished if we know that all
+            // subsequent batches are guaranteed to be greater (by byte order, 
after row conversion) than the top K,
+            // which means the top K won't change and the computation can be 
finished early.
+            self.attempt_early_completion(&batch)?;
+
+            // update the filter representation of our TopK heap
+            self.update_filter()?;
+        }
+
+        Ok(())
+    }
+
+    /// Walk the candidate rows in `scratch_rows`, pairing each with its batch
+    /// row index from `items`, and add to the heap every row that is smaller
+    /// than the current heap max (or any row while the heap has no max yet).
+    ///
+    /// Returns the number of rows added to the heap.
+    fn find_new_topk_items(
+        &mut self,
+        items: impl Iterator<Item = usize>,
+        batch_entry: &mut RecordBatchEntry,
+    ) -> usize {
+        let mut replacements = 0;
+        let rows = &mut self.scratch_rows;
+        for (index, row) in items.zip(rows.iter()) {
             match self.heap.max() {
                 // heap has k items, and the new row is greater than the
                 // current max in the heap ==> it is not a new topk
                 Some(max_row) if row.as_ref() >= max_row.row() => {}
                 // don't yet have k items or new item is lower than the 
currently k low values
                 None | Some(_) => {
-                    self.heap.add(&mut batch_entry, row, index);
-                    self.metrics.row_replacements.add(1);
+                    self.heap.add(batch_entry, row, index);
+                    replacements += 1;
                 }
             }
         }
-        self.heap.insert_batch_entry(batch_entry);
+        replacements
+    }
 
-        // conserve memory
-        self.heap.maybe_compact()?;
+    /// Update the filter representation of our TopK heap.
+    ///
+    /// The filter is derived only from the heap's current max row (the
+    /// threshold); rows that do not sort before it cannot enter the top K.
+    /// For example, given the sort expression `ORDER BY a DESC, b ASC LIMIT 3`,
+    /// and the current heap values `[(2, 3), (1, 4), (1, 5)]` (threshold `(1, 5)`),
+    /// the filter will be updated to (NULL handling omitted):
+    ///
+    /// ```sql
+    /// a > 1 OR (a = 1 AND b < 5)
+    /// ```
+    fn update_filter(&mut self) -> Result<()> {
+        let Some(filter) = &self.filter else {
+            return Ok(());
+        };
+        let Some(thresholds) = self.heap.get_threshold_values(&self.expr)? 
else {
+            return Ok(());
+        };
+
+        // Create filter expressions for each threshold
+        let mut filters: Vec<Arc<dyn PhysicalExpr>> =
+            Vec::with_capacity(thresholds.len());
+
+        let mut prev_sort_expr: Option<Arc<dyn PhysicalExpr>> = None;
+        for (sort_expr, value) in self.expr.iter().zip(thresholds.iter()) {
+            // Create the appropriate operator based on sort order
+            let op = if sort_expr.options.descending {
+                // For descending sort, we want col > threshold (exclude 
smaller values)
+                Operator::Gt
+            } else {
+                // For ascending sort, we want col < threshold (exclude larger 
values)
+                Operator::Lt
+            };
+
+            let value_null = value.is_null();
+
+            let comparison = Arc::new(BinaryExpr::new(
+                Arc::clone(&sort_expr.expr),
+                op,
+                lit(value.clone()),
+            ));
+
+            let comparison_with_null = match (sort_expr.options.nulls_first, 
value_null) {
+                // For nulls first, transform to (threshold.value is not null) 
and (threshold.expr is null or comparison)
+                (true, true) => lit(false),
+                (true, false) => Arc::new(BinaryExpr::new(
+                    is_null(Arc::clone(&sort_expr.expr))?,
+                    Operator::Or,
+                    comparison,
+                )),
+                // For nulls last, transform to (threshold.value is null and 
threshold.expr is not null)
+                // or (threshold.value is not null and comparison)
+                (false, true) => is_not_null(Arc::clone(&sort_expr.expr))?,
+                (false, false) => comparison,
+            };
+
+            let mut eq_expr = Arc::new(BinaryExpr::new(
+                Arc::clone(&sort_expr.expr),
+                Operator::Eq,
+                lit(value.clone()),
+            ));
+
+            if value_null {
+                eq_expr = Arc::new(BinaryExpr::new(
+                    is_null(Arc::clone(&sort_expr.expr))?,
+                    Operator::Or,
+                    eq_expr,
+                ));
+            }
 
-        // update memory reservation
-        self.reservation.try_resize(self.size())?;
+            // For a query like order by a, b, the filter for column `b` is 
only applied if
+            // the condition a = threshold.value (considering null equality) 
is met.
+            // Therefore, we add equality predicates for all preceding fields 
to the filter logic of the current field,
+            // and include the current field's equality predicate in 
`prev_sort_expr` for use with subsequent fields.
+            match prev_sort_expr.take() {
+                None => {
+                    prev_sort_expr = Some(eq_expr);
+                    filters.push(comparison_with_null);
+                }
+                Some(p) => {
+                    filters.push(Arc::new(BinaryExpr::new(
+                        Arc::clone(&p),
+                        Operator::And,
+                        comparison_with_null,
+                    )));
+
+                    prev_sort_expr =
+                        Some(Arc::new(BinaryExpr::new(p, Operator::And, 
eq_expr)));
+                }
+            }
+        }
 
-        // flag the topK as finished if we know that all
-        // subsequent batches are guaranteed to be greater (by byte order, 
after row conversion) than the top K,
-        // which means the top K won't change and the computation can be 
finished early.
-        self.attempt_early_completion(&batch)?;
+        let dynamic_predicate = filters
+            .into_iter()
+            .reduce(|a, b| Arc::new(BinaryExpr::new(a, Operator::Or, b)));
+
+        if let Some(predicate) = dynamic_predicate {
+            if !predicate.eq(&lit(true)) {
+                filter.update(predicate)?;
+            }
+        }
 
         Ok(())
     }
@@ -324,6 +501,7 @@ impl TopK {
             common_sort_prefix_converter: _,
             common_sort_prefix: _,
             finished: _,
+            filter: _,
         } = self;
         let _timer = metrics.baseline.elapsed_compute().timer(); // time 
updated on drop
 
@@ -566,6 +744,47 @@ impl TopKHeap {
             + self.store.size()
             + self.owned_bytes
     }
+
+    fn get_threshold_values(
+        &self,
+        sort_exprs: &[PhysicalSortExpr],
+    ) -> Result<Option<Vec<ScalarValue>>> {
+        // Produces one threshold ScalarValue per entry of `sort_exprs` (in the same order), read from the row returned by `self.max()`. If the heap doesn't have k elements yet, `max()` yields nothing and we can't create thresholds, so return Ok(None)
+        let max_row = match self.max() {
+            Some(row) => row,
+            None => return Ok(None),
+        };
+
+        // Look up the stored batch that contains the max row via its batch id; a missing entry is reported as an internal error
+        let batch_entry = match self.store.get(max_row.batch_id) {
+            Some(entry) => entry,
+            None => return internal_err!("Invalid batch ID in TopKRow"),
+        };
+
+        // Extract the threshold value for each sort expression, collecting them in `sort_exprs` order
+        let mut scalar_values = Vec::with_capacity(sort_exprs.len());
+        for sort_expr in sort_exprs {
+            // Evaluate this sort expression on a one-row slice of the batch that starts at the max row's index
+            let expr = Arc::clone(&sort_expr.expr);
+            let value = expr.evaluate(&batch_entry.batch.slice(max_row.index, 
1))?;
+
+            // Convert to scalar value: scalars pass through, a one-element array is unwrapped, anything else is an internal error - it should be a single value since we're 
evaluating on a single row batch
+            let scalar = match value {
+                ColumnarValue::Scalar(scalar) => scalar,
+                ColumnarValue::Array(array) if array.len() == 1 => {
+                    // Extract the first (and only) value from the array; the `len() == 1` guard above makes index 0 the sole element
+                    ScalarValue::try_from_array(&array, 0)?
+                }
+                array => {
+                    return internal_err!("Expected a scalar value, got {:?}", 
array)
+                }
+            };
+
+            scalar_values.push(scalar);
+        }
+
+        Ok(Some(scalar_values))
+    }
 }
 
 /// Represents one of the top K rows held in this heap. Orders
@@ -834,6 +1053,7 @@ mod tests {
             2,
             runtime,
             &metrics,
+            None,
         )?;
 
         // Create the first batch with two columns:
diff --git a/datafusion/sqllogictest/test_files/explain.slt 
b/datafusion/sqllogictest/test_files/explain.slt
index 5606482710..e652250845 100644
--- a/datafusion/sqllogictest/test_files/explain.slt
+++ b/datafusion/sqllogictest/test_files/explain.slt
@@ -243,6 +243,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushdown SAME TEXT AS ABOVE
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after insert_yield_exec SAME TEXT AS ABOVE
+physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true
 physical_plan_with_stats DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/example.csv]]}, projection=[a, b, 
c], file_type=csv, has_header=true, statistics=[Rows=Absent, Bytes=Absent, 
[(Col[0]:),(Col[1]:),(Col[2]:)]]
@@ -321,6 +322,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after insert_yield_exec SAME TEXT AS ABOVE
+physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
 physical_plan_with_schema DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, schema=[id:Int32;N, bool_col:Boolean;N, tinyint_col:Int32;N, 
smallint_col:Int32;N, int_col:Int32;N, bigint_col:Int64;N, float_col:Float32;N, 
double_col:Float64;N, date_string_col:BinaryView;N, [...]
@@ -363,6 +365,7 @@ physical_plan after LimitAggregation SAME TEXT AS ABOVE
 physical_plan after LimitPushdown DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
 physical_plan after ProjectionPushdown SAME TEXT AS ABOVE
 physical_plan after insert_yield_exec SAME TEXT AS ABOVE
+physical_plan after FilterPushdown(Post) SAME TEXT AS ABOVE
 physical_plan after SanityCheckPlan SAME TEXT AS ABOVE
 physical_plan DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet
 physical_plan_with_stats DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/parquet-testing/data/alltypes_plain.parquet]]}, 
projection=[id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, 
float_col, double_col, date_string_col, string_col, timestamp_col], limit=10, 
file_type=parquet, statistics=[Rows=Exact(8), Bytes=Exact(671), 
[(Col[0]:),(Col[1]:),(Col[2]:),(Col[3]:),(Col[4]:),(Col[5]:),(Col[6]:),(Col[7]:),(Col[8]:),(Col[9]:),(Col[10]:)]]
diff --git a/datafusion/sqllogictest/test_files/information_schema.slt 
b/datafusion/sqllogictest/test_files/information_schema.slt
index f8c86df024..e2c166bc7d 100644
--- a/datafusion/sqllogictest/test_files/information_schema.slt
+++ b/datafusion/sqllogictest/test_files/information_schema.slt
@@ -285,6 +285,7 @@ datafusion.format.types_info false
 datafusion.optimizer.allow_symmetric_joins_without_pruning true
 datafusion.optimizer.default_filter_selectivity 20
 datafusion.optimizer.enable_distinct_aggregation_soft_limit true
+datafusion.optimizer.enable_dynamic_filter_pushdown true
 datafusion.optimizer.enable_round_robin_repartition true
 datafusion.optimizer.enable_topk_aggregation true
 datafusion.optimizer.expand_views_at_output false
@@ -396,6 +397,7 @@ datafusion.format.types_info false Show types in visual 
representation batches
 datafusion.optimizer.allow_symmetric_joins_without_pruning true Should 
DataFusion allow symmetric hash joins for unbounded data sources even when its 
inputs do not have any ordering or filtering If the flag is not enabled, the 
SymmetricHashJoin operator will be unable to prune its internal buffers, 
resulting in certain join types - such as Full, Left, LeftAnti, LeftSemi, 
Right, RightAnti, and RightSemi - being produced only at the end of the 
execution. This is not typical in stream proce [...]
 datafusion.optimizer.default_filter_selectivity 20 The default filter 
selectivity used by Filter Statistics when an exact selectivity cannot be 
determined. Valid values are between 0 (no selectivity) and 100 (all rows are 
selected).
 datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to 
true, the optimizer will push a limit operation into grouped aggregations which 
have no aggregate expressions, as a soft limit, emitting groups once the limit 
is reached, before all rows in the group are read.
+datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true 
attempts to push down dynamic filters generated by operators into the file scan 
phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp 
DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 
timestamps that the TopK operator references into the file scans. This means 
that if we already have 10 timestamps in the year 2025 any files that only have 
timestamps in the year 2024 ca [...]
 datafusion.optimizer.enable_round_robin_repartition true When set to true, the 
physical plan optimizer will try to add round robin repartitioning to increase 
parallelism to leverage more CPU cores
 datafusion.optimizer.enable_topk_aggregation true When set to true, the 
optimizer will attempt to perform limit operations during aggregations, if 
possible
 datafusion.optimizer.expand_views_at_output false When set to true, if the 
returned type is a view type then the output will be coerced to a non-view. 
Coerces `Utf8View` to `LargeUtf8`, and `BinaryView` to `LargeBinary`.
diff --git a/datafusion/sqllogictest/test_files/limit.slt 
b/datafusion/sqllogictest/test_files/limit.slt
index 9d5106bf2c..3398fa2901 100644
--- a/datafusion/sqllogictest/test_files/limit.slt
+++ b/datafusion/sqllogictest/test_files/limit.slt
@@ -854,7 +854,7 @@ physical_plan
 01)ProjectionExec: expr=[1 as foo]
 02)--SortPreservingMergeExec: [part_key@0 ASC NULLS LAST], fetch=1
 03)----SortExec: TopK(fetch=1), expr=[part_key@0 ASC NULLS LAST], 
preserve_partitioning=[true]
-04)------DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]},
 projection=[part_key], file_type=parquet
+04)------DataSourceExec: file_groups={3 groups: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-0.parquet:0..794],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-1.parquet:0..794],
 
[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/parquet/test_limit_with_partitions/part-2.parquet:0..794]]},
 projection=[part_key], file_type=parquet, predicate=DynamicFilterPhysicalExpr 
[ true ]
 
 query I
 with selection as (
diff --git a/datafusion/sqllogictest/test_files/topk.slt 
b/datafusion/sqllogictest/test_files/topk.slt
index 26fef6d666..9ff382d32a 100644
--- a/datafusion/sqllogictest/test_files/topk.slt
+++ b/datafusion/sqllogictest/test_files/topk.slt
@@ -316,7 +316,7 @@ explain select number, letter, age from partial_sorted 
order by number desc, let
 ----
 physical_plan
 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC NULLS LAST, 
age@2 DESC], preserve_partitioning=[false], sort_prefix=[number@0 DESC, 
letter@1 ASC NULLS LAST]
-02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
 
 
 # Explain variations of the above query with different orderings, and 
different sort prefixes.
@@ -326,28 +326,28 @@ explain select number, letter, age from partial_sorted 
order by age desc limit 3
 ----
 physical_plan
 01)SortExec: TopK(fetch=3), expr=[age@2 DESC], preserve_partitioning=[false]
-02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
 
 query TT
 explain select number, letter, age from partial_sorted order by number desc, 
letter desc limit 3;
 ----
 physical_plan
 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 DESC], 
preserve_partitioning=[false], sort_prefix=[number@0 DESC]
-02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
 
 query TT
 explain select number, letter, age from partial_sorted order by number asc 
limit 3;
 ----
 physical_plan
 01)SortExec: TopK(fetch=3), expr=[number@0 ASC NULLS LAST], 
preserve_partitioning=[false]
-02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
 
 query TT
 explain select number, letter, age from partial_sorted order by letter asc, 
number desc limit 3;
 ----
 physical_plan
 01)SortExec: TopK(fetch=3), expr=[letter@1 ASC NULLS LAST, number@0 DESC], 
preserve_partitioning=[false]
-02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
 
 # Explicit NULLS ordering cases (reversing the order of the NULLS on the 
number and letter orderings)
 query TT
@@ -355,14 +355,14 @@ explain select number, letter, age from partial_sorted 
order by number desc, let
 ----
 physical_plan
 01)SortExec: TopK(fetch=3), expr=[number@0 DESC, letter@1 ASC], 
preserve_partitioning=[false], sort_prefix=[number@0 DESC]
-02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
 
 query TT
 explain select number, letter, age from partial_sorted order by number desc 
NULLS LAST, letter asc limit 3;
 ----
 physical_plan
 01)SortExec: TopK(fetch=3), expr=[number@0 DESC NULLS LAST, letter@1 ASC NULLS 
LAST], preserve_partitioning=[false]
-02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/topk/partial_sorted/1.parquet]]},
 projection=[number, letter, age], output_ordering=[number@0 DESC, letter@1 ASC 
NULLS LAST], file_type=parquet, predicate=DynamicFilterPhysicalExpr [ true ]
 
 
 # Verify that the sort prefix is correctly computed on the normalized ordering 
(removing redundant aliased columns)
diff --git a/docs/source/user-guide/configs.md 
b/docs/source/user-guide/configs.md
index 5c80cbd563..1b8233a541 100644
--- a/docs/source/user-guide/configs.md
+++ b/docs/source/user-guide/configs.md
@@ -101,6 +101,7 @@ Environment variables are read during `SessionConfig` 
initialisation so they mus
 | datafusion.optimizer.enable_distinct_aggregation_soft_limit             | 
true                      | When set to true, the optimizer will push a limit 
operation into grouped aggregations which have no aggregate expressions, as a 
soft limit, emitting groups once the limit is reached, before all rows in the 
group are read.                                                                 
                                                                                
                       [...]
 | datafusion.optimizer.enable_round_robin_repartition                     | 
true                      | When set to true, the physical plan optimizer will 
try to add round robin repartitioning to increase parallelism to leverage more 
CPU cores                                                                       
                                                                                
                                                                                
                   [...]
 | datafusion.optimizer.enable_topk_aggregation                            | 
true                      | When set to true, the optimizer will attempt to 
perform limit operations during aggregations, if possible                       
                                                                                
                                                                                
                                                                                
                     [...]
+| datafusion.optimizer.enable_dynamic_filter_pushdown                     | 
true                      | When set to true attempts to push down dynamic 
filters generated by operators into the file scan phase. For example, for a 
query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer 
will attempt to push down the current top 10 timestamps that the TopK operator 
references into the file scans. This means that if we already have 10 
timestamps in the year 2025 any file [...]
 | datafusion.optimizer.filter_null_join_keys                              | 
false                     | When set to true, the optimizer will insert filters 
before a join between a nullable and non-nullable column to filter out nulls on 
the nullable side. This filter can add additional overhead when the file format 
does not fully support predicate push down.                                     
                                                                                
                 [...]
 | datafusion.optimizer.repartition_aggregations                           | 
true                      | Should DataFusion repartition data using the 
aggregate keys to execute aggregates in parallel using the provided 
`target_partitions` level                                                       
                                                                                
                                                                                
                                    [...]
 | datafusion.optimizer.repartition_file_min_size                          | 
10485760                  | Minimum total files size in bytes to perform file 
scan repartitioning.                                                            
                                                                                
                                                                                
                                                                                
                   [...]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org

Reply via email to