This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new ff77b70200 Add dynamic filter (bounds) pushdown to HashJoinExec (#16445)
ff77b70200 is described below

commit ff77b702003b5bf6d9eaa759d37c4af3aefd9f4d
Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com>
AuthorDate: Mon Aug 11 12:58:20 2025 -0500

    Add dynamic filter (bounds) pushdown to HashJoinExec (#16445)
---
 Cargo.lock                                         |   1 +
 .../physical_optimizer/filter_pushdown/mod.rs      | 382 ++++++++++++++++++++-
 datafusion/physical-plan/Cargo.toml                |   1 +
 datafusion/physical-plan/src/joins/hash_join.rs    | 149 +++++++-
 4 files changed, 512 insertions(+), 21 deletions(-)
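
    In outline: HashJoinExec now owns a DynamicFilterPhysicalExpr over its
    probe-side join keys, initialized to `true`. Once collect_left_input has
    buffered the build side, it computes per-key min/max bounds with
    min_batch/max_batch and updates the filter to `key >= min AND key <= max`
    for every join key, ANDed together, so the probe-side scan receives a
    concrete pushdown predicate. The snippet below is an illustrative sketch
    (not part of the commit) of how one such bounds predicate is assembled
    with the same APIs the patch imports; the schema, the column name `d`,
    and the sample values are hypothetical.

    // Sketch only: mirrors the bounds-predicate construction added to
    // hash_join.rs below; names and data here are hypothetical.
    use std::sync::Arc;

    use arrow::array::{ArrayRef, StringArray};
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::Result;
    use datafusion_expr::Operator;
    use datafusion_functions_aggregate_common::min_max::{max_batch, min_batch};
    use datafusion_physical_expr::expressions::{col, lit, BinaryExpr};
    use datafusion_physical_expr::PhysicalExpr;

    fn main() -> Result<()> {
        // Build-side values of one join key (normally gathered while building the hash table).
        let build_key_values: ArrayRef = Arc::new(StringArray::from(vec!["aa", "ab"]));
        let min = min_batch(&build_key_values)?; // Utf8("aa")
        let max = max_batch(&build_key_values)?; // Utf8("ab")

        // Probe-side schema holding the matching join key column `d`.
        let probe_schema = Schema::new(vec![Field::new("d", DataType::Utf8, false)]);
        let key = col("d", &probe_schema)?;

        // d >= min AND d <= max -- the expression the dynamic filter is updated to.
        let ge = Arc::new(BinaryExpr::new(Arc::clone(&key), Operator::GtEq, lit(min)))
            as Arc<dyn PhysicalExpr>;
        let le = Arc::new(BinaryExpr::new(key, Operator::LtEq, lit(max)))
            as Arc<dyn PhysicalExpr>;
        let bounds = Arc::new(BinaryExpr::new(ge, Operator::And, le)) as Arc<dyn PhysicalExpr>;

        println!("{bounds}"); // e.g. d@0 >= aa AND d@0 <= ab
        Ok(())
    }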

diff --git a/Cargo.lock b/Cargo.lock
index 595966377f..c7c7dd804b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2505,6 +2505,7 @@ dependencies = [
  "datafusion-execution",
  "datafusion-expr",
  "datafusion-functions-aggregate",
+ "datafusion-functions-aggregate-common",
  "datafusion-functions-window",
  "datafusion-functions-window-common",
  "datafusion-physical-expr",
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
index 07d08b4f94..1639960fde 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs
@@ -169,13 +169,13 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
     // Create build side with limited values
     let build_batches = vec![record_batch!(
         ("a", Utf8, ["aa", "ab"]),
-        ("b", Utf8, ["ba", "bb"]),
+        ("b", Utf8View, ["ba", "bb"]),
         ("c", Float64, [1.0, 2.0])
     )
     .unwrap()];
     let build_side_schema = Arc::new(Schema::new(vec![
         Field::new("a", DataType::Utf8, false),
-        Field::new("b", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8View, false),
         Field::new("c", DataType::Float64, false),
     ]));
     let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
@@ -186,13 +186,13 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
     // Create probe side with more values
     let probe_batches = vec![record_batch!(
         ("d", Utf8, ["aa", "ab", "ac", "ad"]),
-        ("e", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("e", Utf8View, ["ba", "bb", "bc", "bd"]),
         ("f", Float64, [1.0, 2.0, 3.0, 4.0])
     )
     .unwrap()];
     let probe_side_schema = Arc::new(Schema::new(vec![
         Field::new("d", DataType::Utf8, false),
-        Field::new("e", DataType::Utf8, false),
+        Field::new("e", DataType::Utf8View, false),
         Field::new("f", DataType::Float64, false),
     ]));
     let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
@@ -245,7 +245,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
     - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false]
     -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
     -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
-    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] AND DynamicFilterPhysicalExpr [ true ]
     "
     );
 
@@ -266,9 +266,9 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() {
         format_plan_for_test(&plan),
         @r"
     - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb]
-    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
+    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)], filter=[d@0 >= aa AND d@0 <= ab]
     -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
-    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ]
     "
     );
 }
@@ -285,13 +285,13 @@ async fn test_static_filter_pushdown_through_hash_join() {
     // Create build side with limited values
     let build_batches = vec![record_batch!(
         ("a", Utf8, ["aa", "ab"]),
-        ("b", Utf8, ["ba", "bb"]),
+        ("b", Utf8View, ["ba", "bb"]),
         ("c", Float64, [1.0, 2.0])
     )
     .unwrap()];
     let build_side_schema = Arc::new(Schema::new(vec![
         Field::new("a", DataType::Utf8, false),
-        Field::new("b", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8View, false),
         Field::new("c", DataType::Float64, false),
     ]));
     let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
@@ -302,13 +302,13 @@ async fn test_static_filter_pushdown_through_hash_join() {
     // Create probe side with more values
     let probe_batches = vec![record_batch!(
         ("d", Utf8, ["aa", "ab", "ac", "ad"]),
-        ("e", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("e", Utf8View, ["ba", "bb", "bc", "bd"]),
         ("f", Float64, [1.0, 2.0, 3.0, 4.0])
     )
     .unwrap()];
     let probe_side_schema = Arc::new(Schema::new(vec![
         Field::new("d", DataType::Utf8, false),
-        Field::new("e", DataType::Utf8, false),
+        Field::new("e", DataType::Utf8View, false),
         Field::new("f", DataType::Float64, false),
     ]));
     let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
@@ -736,6 +736,366 @@ async fn test_topk_dynamic_filter_pushdown() {
     );
 }
 
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Create build side with limited values
+    let build_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab"]),
+        ("b", Utf8, ["ba", "bb"]),
+        ("c", Float64, [1.0, 2.0]) // Extra column not used in join
+    )
+    .unwrap()];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values
+    let probe_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+        ("b", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("e", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec with dynamic filter
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let plan = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            probe_scan,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // expect the predicate to be pushed down into the probe side DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)]
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+    ",
+    );
+
+    // Actually apply the optimization to the plan and execute to see the filter in action
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+    let config = SessionConfig::new().with_batch_size(10);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    // Iterate one batch
+    stream.next().await.unwrap().unwrap();
+
+    // Now check what our filter looks like
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], filter=[a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ]
+    "
+    );
+}
+
+#[tokio::test]
+async fn test_nested_hashjoin_dynamic_filter_pushdown() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Create test data for three tables: t1, t2, t3
+    // t1: small table with limited values (will be build side of outer join)
+    let t1_batches =
+        vec![
+            record_batch!(("a", Utf8, ["aa", "ab"]), ("x", Float64, [1.0, 2.0])).unwrap(),
+        ];
+    let t1_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("x", DataType::Float64, false),
+    ]));
+    let t1_scan = TestScanBuilder::new(Arc::clone(&t1_schema))
+        .with_support(true)
+        .with_batches(t1_batches)
+        .build();
+
+    // t2: larger table (will be probe side of inner join, build side of outer join)
+    let t2_batches = vec![record_batch!(
+        ("b", Utf8, ["aa", "ab", "ac", "ad", "ae"]),
+        ("c", Utf8, ["ca", "cb", "cc", "cd", "ce"]),
+        ("y", Float64, [1.0, 2.0, 3.0, 4.0, 5.0])
+    )
+    .unwrap()];
+    let t2_schema = Arc::new(Schema::new(vec![
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Utf8, false),
+        Field::new("y", DataType::Float64, false),
+    ]));
+    let t2_scan = TestScanBuilder::new(Arc::clone(&t2_schema))
+        .with_support(true)
+        .with_batches(t2_batches)
+        .build();
+
+    // t3: largest table (will be probe side of inner join)
+    let t3_batches = vec![record_batch!(
+        ("d", Utf8, ["ca", "cb", "cc", "cd", "ce", "cf", "cg", "ch"]),
+        ("z", Float64, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0])
+    )
+    .unwrap()];
+    let t3_schema = Arc::new(Schema::new(vec![
+        Field::new("d", DataType::Utf8, false),
+        Field::new("z", DataType::Float64, false),
+    ]));
+    let t3_scan = TestScanBuilder::new(Arc::clone(&t3_schema))
+        .with_support(true)
+        .with_batches(t3_batches)
+        .build();
+
+    // Create nested join structure:
+    // Join (t1.a = t2.b)
+    // /        \
+    // t1    Join(t2.c = t3.d)
+    //         /    \
+    //        t2   t3
+
+    // First create inner join: t2.c = t3.d
+    let inner_join_on =
+        vec![(col("c", &t2_schema).unwrap(), col("d", &t3_schema).unwrap())];
+    let inner_join = Arc::new(
+        HashJoinExec::try_new(
+            t2_scan,
+            t3_scan,
+            inner_join_on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    // Then create outer join: t1.a = t2.b (from inner join result)
+    let outer_join_on = vec![(
+        col("a", &t1_schema).unwrap(),
+        col("b", &inner_join.schema()).unwrap(),
+    )];
+    let outer_join = Arc::new(
+        HashJoinExec::try_new(
+            t1_scan,
+            inner_join as Arc<dyn ExecutionPlan>,
+            outer_join_on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // Test that dynamic filters are pushed down correctly through nested joins
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&outer_join), FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
+        -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true
+        -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)]
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
+          -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)]
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ]
+    ",
+    );
+
+    // Execute the plan to verify the dynamic filters are properly updated
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(outer_join, &config)
+        .unwrap();
+    let config = SessionConfig::new().with_batch_size(10);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap();
+    // Execute to populate the dynamic filters
+    stream.next().await.unwrap().unwrap();
+
+    // Verify that both the inner and outer join have updated dynamic filters
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true
+    -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ]
+    -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ]
+    "
+    );
+}
+
+#[tokio::test]
+async fn test_hashjoin_parent_filter_pushdown() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Create build side with limited values
+    let build_batches = vec![record_batch!(
+        ("a", Utf8, ["aa", "ab"]),
+        ("b", Utf8, ["ba", "bb"]),
+        ("c", Float64, [1.0, 2.0])
+    )
+    .unwrap()];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values
+    let probe_batches = vec![record_batch!(
+        ("d", Utf8, ["aa", "ab", "ac", "ad"]),
+        ("e", Utf8, ["ba", "bb", "bc", "bd"]),
+        ("f", Float64, [1.0, 2.0, 3.0, 4.0])
+    )
+    .unwrap()];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("d", DataType::Utf8, false),
+        Field::new("e", DataType::Utf8, false),
+        Field::new("f", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec
+    let on = vec![(
+        col("a", &build_side_schema).unwrap(),
+        col("d", &probe_side_schema).unwrap(),
+    )];
+    let join = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            probe_scan,
+            on,
+            None,
+            &JoinType::Inner,
+            None,
+            PartitionMode::Partitioned,
+            datafusion_common::NullEquality::NullEqualsNothing,
+        )
+        .unwrap(),
+    );
+
+    // Create filters that can be pushed down to different sides
+    // We need to create filters in the context of the join output schema
+    let join_schema = join.schema();
+
+    // Filter on build side column: a = 'aa'
+    let left_filter = col_lit_predicate("a", "aa", &join_schema);
+    // Filter on probe side column: e = 'ba'
+    let right_filter = col_lit_predicate("e", "ba", &join_schema);
+    // Filter that references both sides: a = d (should not be pushed down)
+    let cross_filter = Arc::new(BinaryExpr::new(
+        col("a", &join_schema).unwrap(),
+        Operator::Eq,
+        col("d", &join_schema).unwrap(),
+    )) as Arc<dyn PhysicalExpr>;
+
+    let filter =
+        Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap());
+    let filter = Arc::new(FilterExec::try_new(right_filter, filter).unwrap());
+    let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap())
+        as Arc<dyn ExecutionPlan>;
+
+    // Test that filters are pushed down correctly to each side of the join
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - FilterExec: a@0 = d@3
+        -   FilterExec: e@4 = ba
+        -     FilterExec: a@0 = aa
+        -       HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
+        -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true
+        -         DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - FilterExec: a@0 = d@3
+          -   HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)]
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa
+          -     DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba
+    "
+    );
+}
+
 /// Integration test for dynamic filter pushdown with TopK.
 /// We use an integration test because there are complex interactions in the optimizer rules
 /// that the unit tests applying a single optimizer rule do not cover.
diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml
index 97b1cff777..6bf3270a35 100644
--- a/datafusion/physical-plan/Cargo.toml
+++ b/datafusion/physical-plan/Cargo.toml
@@ -53,6 +53,7 @@ datafusion-common = { workspace = true, default-features = true }
 datafusion-common-runtime = { workspace = true, default-features = true }
 datafusion-execution = { workspace = true }
 datafusion-expr = { workspace = true }
+datafusion-functions-aggregate-common = { workspace = true }
 datafusion-functions-window-common = { workspace = true }
 datafusion-physical-expr = { workspace = true, default-features = true }
 datafusion-physical-expr-common = { workspace = true }
diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs
index 6058b7974e..d3999c6cd8 100644
--- a/datafusion/physical-plan/src/joins/hash_join.rs
+++ b/datafusion/physical-plan/src/joins/hash_join.rs
@@ -76,14 +76,16 @@ use datafusion_common::config::ConfigOptions;
 use datafusion_common::utils::memory::estimate_memory_size;
 use datafusion_common::{
     internal_datafusion_err, internal_err, plan_err, project_schema, JoinSide, JoinType,
-    NullEquality, Result,
+    NullEquality, Result, ScalarValue,
 };
 use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
 use datafusion_execution::TaskContext;
 use datafusion_expr::Operator;
+use datafusion_functions_aggregate_common::min_max::{max_batch, min_batch};
 use datafusion_physical_expr::equivalence::{
     join_equivalence_properties, ProjectionMapping,
 };
+use datafusion_physical_expr::expressions::{lit, BinaryExpr, DynamicFilterPhysicalExpr};
 use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef};
 use datafusion_physical_expr_common::datum::compare_op_for_nested;
 
@@ -363,6 +365,8 @@ pub struct HashJoinExec {
     pub null_equality: NullEquality,
     /// Cache holding plan properties like equivalences, output partitioning etc.
     cache: PlanProperties,
+    /// Dynamic filter for pushing down to the probe side
+    dynamic_filter: Arc<DynamicFilterPhysicalExpr>,
 }
 
 impl HashJoinExec {
@@ -409,6 +413,8 @@ impl HashJoinExec {
             projection.as_ref(),
         )?;
 
+        let dynamic_filter = Self::create_dynamic_filter(&on);
+
         Ok(HashJoinExec {
             left,
             right,
@@ -424,9 +430,17 @@ impl HashJoinExec {
             column_indices,
             null_equality,
             cache,
+            dynamic_filter,
         })
     }
 
+    fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> {
+        // Extract the right-side keys from the `on` clauses
+        let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect();
+        // Initialize with a placeholder expression (true) that will be updated when the hash table is built
+        Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true)))
+    }
+
     /// left (build) side which gets hashed
     pub fn left(&self) -> &Arc<dyn ExecutionPlan> {
         &self.left
@@ -672,10 +686,21 @@ impl DisplayAs for HashJoinExec {
                     .map(|(c1, c2)| format!("({c1}, {c2})"))
                     .collect::<Vec<String>>()
                     .join(", ");
+                let dynamic_filter_display = match self.dynamic_filter.current() {
+                    Ok(current) if current != lit(true) => {
+                        format!(", filter=[{current}]")
+                    }
+                    _ => "".to_string(),
+                };
                 write!(
                     f,
-                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}",
-                    self.mode, self.join_type, on, display_filter, display_projections
+                    "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}",
+                    self.mode,
+                    self.join_type,
+                    on,
+                    display_filter,
+                    display_projections,
+                    dynamic_filter_display
                 )
             }
             DisplayFormatType::TreeRender => {
@@ -762,7 +787,7 @@ impl ExecutionPlan for HashJoinExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(HashJoinExec::try_new(
+        let mut new_join = HashJoinExec::try_new(
             Arc::clone(&children[0]),
             Arc::clone(&children[1]),
             self.on.clone(),
@@ -771,7 +796,10 @@ impl ExecutionPlan for HashJoinExec {
             self.projection.clone(),
             self.mode,
             self.null_equality,
-        )?))
+        )?;
+        // Preserve the dynamic filter if it exists
+        new_join.dynamic_filter = Arc::clone(&self.dynamic_filter);
+        Ok(Arc::new(new_join))
     }
 
     fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> {
@@ -791,6 +819,7 @@ impl ExecutionPlan for HashJoinExec {
             column_indices: self.column_indices.clone(),
             null_equality: self.null_equality,
             cache: self.cache.clone(),
+            dynamic_filter: Self::create_dynamic_filter(&self.on),
         }))
     }
 
@@ -827,6 +856,12 @@ impl ExecutionPlan for HashJoinExec {
             );
         }
 
+        let enable_dynamic_filter_pushdown = context
+            .session_config()
+            .options()
+            .optimizer
+            .enable_dynamic_filter_pushdown;
+
         let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics);
         let left_fut = match self.mode {
             PartitionMode::CollectLeft => self.left_fut.try_once(|| {
@@ -843,6 +878,9 @@ impl ExecutionPlan for HashJoinExec {
                     reservation,
                     need_produce_result_in_final(self.join_type),
                     self.right().output_partitioning().partition_count(),
+                    enable_dynamic_filter_pushdown
+                        .then_some(Arc::clone(&self.dynamic_filter)),
+                    on_right.clone(),
                 ))
             })?,
             PartitionMode::Partitioned => {
@@ -860,6 +898,9 @@ impl ExecutionPlan for HashJoinExec {
                     reservation,
                     need_produce_result_in_final(self.join_type),
                     1,
+                    enable_dynamic_filter_pushdown
+                        .then_some(Arc::clone(&self.dynamic_filter)),
+                    on_right.clone(),
                 ))
             }
             PartitionMode::Auto => {
@@ -972,9 +1013,9 @@ impl ExecutionPlan for HashJoinExec {
 
     fn gather_filters_for_pushdown(
         &self,
-        _phase: FilterPushdownPhase,
+        phase: FilterPushdownPhase,
         parent_filters: Vec<Arc<dyn PhysicalExpr>>,
-        _config: &ConfigOptions,
+        config: &ConfigOptions,
     ) -> Result<FilterDescription> {
         // Other types of joins can support *some* filters, but restrictions are complex and error prone.
         // For now we don't support them.
@@ -986,8 +1027,30 @@ impl ExecutionPlan for HashJoinExec {
                 &self.children(),
             ));
         }
-        FilterDescription::from_children(parent_filters, &self.children())
-        // TODO: push down our self filters to children in the post optimization phase
+
+        // Get basic filter descriptions for both children
+        let left_child = crate::filter_pushdown::ChildFilterDescription::from_child(
+            &parent_filters,
+            self.left(),
+        )?;
+        let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child(
+            &parent_filters,
+            self.right(),
+        )?;
+
+        // Add dynamic filters in Post phase if enabled
+        if matches!(phase, FilterPushdownPhase::Post)
+            && config.optimizer.enable_dynamic_filter_pushdown
+        {
+            // Add actual dynamic filter to right side (probe side)
+            let dynamic_filter =
+                Arc::clone(&self.dynamic_filter) as Arc<dyn PhysicalExpr>;
+            right_child = right_child.with_self_filter(dynamic_filter);
+        }
+
+        Ok(FilterDescription::new()
+            .with_child(left_child)
+            .with_child(right_child))
     }
 
     fn handle_child_pushdown_result(
@@ -1012,8 +1075,31 @@ impl ExecutionPlan for HashJoinExec {
     }
 }
 
+/// Compute min/max bounds for each column in the given arrays
+fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<(ScalarValue, ScalarValue)>> {
+    arrays
+        .iter()
+        .map(|array| {
+            if array.is_empty() {
+                // Return NULL values for empty arrays
+                return Ok((
+                    ScalarValue::try_from(array.data_type())?,
+                    ScalarValue::try_from(array.data_type())?,
+                ));
+            }
+
+            // Use Arrow kernels for efficient min/max computation
+            let min_val = min_batch(array)?;
+            let max_val = max_batch(array)?;
+
+            Ok((min_val, max_val))
+        })
+        .collect()
+}
+
 /// Reads the left (build) side of the input, buffering it in memory, to build a
 /// hash table (`LeftJoinData`)
+#[expect(clippy::too_many_arguments)]
 async fn collect_left_input(
     random_state: RandomState,
     left_stream: SendableRecordBatchStream,
@@ -1022,6 +1108,8 @@ async fn collect_left_input(
     reservation: MemoryReservation,
     with_visited_indices_bitmap: bool,
     probe_threads_count: usize,
+    dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>>,
+    on_right: Vec<PhysicalExprRef>,
 ) -> Result<JoinLeftData> {
     let schema = left_stream.schema();
 
@@ -1114,12 +1202,53 @@ async fn collect_left_input(
     let data = JoinLeftData::new(
         hashmap,
         single_batch,
-        left_values,
+        left_values.clone(),
         Mutex::new(visited_indices_bitmap),
         AtomicUsize::new(probe_threads_count),
         reservation,
     );
 
+    // Update dynamic filter with min/max bounds if provided
+    if let Some(dynamic_filter) = dynamic_filter {
+        if num_rows > 0 {
+            let bounds = compute_bounds(&left_values)?;
+
+            // Create range predicates for each join key
+            let mut predicates = Vec::with_capacity(bounds.len());
+            for ((min_val, max_val), right_expr) in bounds.iter().zip(on_right.iter()) {
+                // Create predicate: col >= min AND col <= max
+                let min_expr = Arc::new(BinaryExpr::new(
+                    Arc::clone(right_expr),
+                    Operator::GtEq,
+                    lit(min_val.clone()),
+                )) as Arc<dyn PhysicalExpr>;
+
+                let max_expr = Arc::new(BinaryExpr::new(
+                    Arc::clone(right_expr),
+                    Operator::LtEq,
+                    lit(max_val.clone()),
+                )) as Arc<dyn PhysicalExpr>;
+
+                let range_expr =
+                    Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr))
+                        as Arc<dyn PhysicalExpr>;
+
+                predicates.push(range_expr);
+            }
+
+            // Combine all predicates with AND
+            let combined_predicate = predicates
+                .into_iter()
+                .reduce(|acc, pred| {
+                    Arc::new(BinaryExpr::new(acc, Operator::And, pred))
+                        as Arc<dyn PhysicalExpr>
+                })
+                .unwrap_or_else(|| lit(true));
+
+            dynamic_filter.update(combined_predicate)?;
+        }
+    }
+
     Ok(data)
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@datafusion.apache.org
For additional commands, e-mail: commits-h...@datafusion.apache.org
