This is an automated email from the ASF dual-hosted git repository.

adriangb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push: new ff77b70200 Add dynamic filter (bounds) pushdown to HashJoinExec (#16445) ff77b70200 is described below commit ff77b702003b5bf6d9eaa759d37c4af3aefd9f4d Author: Adrian Garcia Badaracco <1755071+adria...@users.noreply.github.com> AuthorDate: Mon Aug 11 12:58:20 2025 -0500 Add dynamic filter (bounds) pushdown to HashJoinExec (#16445) --- Cargo.lock | 1 + .../physical_optimizer/filter_pushdown/mod.rs | 382 ++++++++++++++++++++- datafusion/physical-plan/Cargo.toml | 1 + datafusion/physical-plan/src/joins/hash_join.rs | 149 +++++++- 4 files changed, 512 insertions(+), 21 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 595966377f..c7c7dd804b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2505,6 +2505,7 @@ dependencies = [ "datafusion-execution", "datafusion-expr", "datafusion-functions-aggregate", + "datafusion-functions-aggregate-common", "datafusion-functions-window", "datafusion-functions-window-common", "datafusion-physical-expr", diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs index 07d08b4f94..1639960fde 100644 --- a/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs +++ b/datafusion/core/tests/physical_optimizer/filter_pushdown/mod.rs @@ -169,13 +169,13 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { // Create build side with limited values let build_batches = vec![record_batch!( ("a", Utf8, ["aa", "ab"]), - ("b", Utf8, ["ba", "bb"]), + ("b", Utf8View, ["ba", "bb"]), ("c", Float64, [1.0, 2.0]) ) .unwrap()]; let build_side_schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Utf8, false), + Field::new("b", DataType::Utf8View, false), Field::new("c", DataType::Float64, false), ])); let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) @@ -186,13 +186,13 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { // Create probe side with more values let probe_batches = vec![record_batch!( ("d", Utf8, ["aa", "ab", "ac", "ad"]), - ("e", Utf8, ["ba", "bb", "bc", "bd"]), + ("e", Utf8View, ["ba", "bb", "bc", "bd"]), ("f", Float64, [1.0, 2.0, 3.0, 4.0]) ) .unwrap()]; let probe_side_schema = Arc::new(Schema::new(vec![ Field::new("d", DataType::Utf8, false), - Field::new("e", DataType::Utf8, false), + Field::new("e", DataType::Utf8View, false), Field::new("f", DataType::Float64, false), ])); let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) @@ -245,7 +245,7 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false] - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] AND DynamicFilterPhysicalExpr [ true ] " ); @@ -266,9 +266,9 @@ async fn test_dynamic_filter_pushdown_through_hash_join_with_topk() { format_plan_for_test(&plan), @r" - SortExec: TopK(fetch=2), expr=[e@4 ASC], preserve_partitioning=[false], filter=[e@4 IS NULL OR e@4 < bb] - - 
HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)], filter=[d@0 >= aa AND d@0 <= ab] - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true - - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= aa AND d@0 <= ab ] AND DynamicFilterPhysicalExpr [ e@1 IS NULL OR e@1 < bb ] " ); } @@ -285,13 +285,13 @@ async fn test_static_filter_pushdown_through_hash_join() { // Create build side with limited values let build_batches = vec![record_batch!( ("a", Utf8, ["aa", "ab"]), - ("b", Utf8, ["ba", "bb"]), + ("b", Utf8View, ["ba", "bb"]), ("c", Float64, [1.0, 2.0]) ) .unwrap()]; let build_side_schema = Arc::new(Schema::new(vec![ Field::new("a", DataType::Utf8, false), - Field::new("b", DataType::Utf8, false), + Field::new("b", DataType::Utf8View, false), Field::new("c", DataType::Float64, false), ])); let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) @@ -302,13 +302,13 @@ async fn test_static_filter_pushdown_through_hash_join() { // Create probe side with more values let probe_batches = vec![record_batch!( ("d", Utf8, ["aa", "ab", "ac", "ad"]), - ("e", Utf8, ["ba", "bb", "bc", "bd"]), + ("e", Utf8View, ["ba", "bb", "bc", "bd"]), ("f", Float64, [1.0, 2.0, 3.0, 4.0]) ) .unwrap()]; let probe_side_schema = Arc::new(Schema::new(vec![ Field::new("d", DataType::Utf8, false), - Field::new("e", DataType::Utf8, false), + Field::new("e", DataType::Utf8View, false), Field::new("f", DataType::Float64, false), ])); let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) @@ -736,6 +736,366 @@ async fn test_topk_dynamic_filter_pushdown() { ); } +#[tokio::test] +async fn test_hashjoin_dynamic_filter_pushdown() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with limited values + let build_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) // Extra column not used in join + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values + let probe_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab", "ac", "ad"]), + ("b", Utf8, ["ba", "bb", "bc", "bd"]), + ("e", Float64, [1.0, 2.0, 3.0, 4.0]) // Extra column not used in join + ) + .unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("e", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create HashJoinExec with dynamic filter + let on = vec![ + ( + col("a", &build_side_schema).unwrap(), + col("a", &probe_side_schema).unwrap(), + ), + ( + col("b", &build_side_schema).unwrap(), + col("b", 
&probe_side_schema).unwrap(), + ), + ]; + let plan = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ) as Arc<dyn ExecutionPlan>; + + // expect the predicate to be pushed down into the probe side DataSource + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + ", + ); + + // Actually apply the optimization to the plan and execute to see the filter in action + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(plan, &config) + .unwrap(); + let config = SessionConfig::new().with_batch_size(10); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Iterate one batch + stream.next().await.unwrap().unwrap(); + + // Now check what our filter looks like + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, a@0), (b@1, b@1)], filter=[a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, e], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb ] + " + ); +} + +#[tokio::test] +async fn test_nested_hashjoin_dynamic_filter_pushdown() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create test data for three tables: t1, t2, t3 + // t1: small table with limited values (will be build side of outer join) + let t1_batches = + vec![ + record_batch!(("a", Utf8, ["aa", "ab"]), ("x", Float64, [1.0, 2.0])).unwrap(), + ]; + let t1_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("x", DataType::Float64, false), + ])); + let t1_scan = TestScanBuilder::new(Arc::clone(&t1_schema)) + .with_support(true) + .with_batches(t1_batches) + .build(); + + // t2: larger table (will be probe side of inner join, build side of outer join) + let t2_batches = vec![record_batch!( + ("b", Utf8, ["aa", "ab", "ac", "ad", "ae"]), + ("c", Utf8, ["ca", "cb", "cc", "cd", "ce"]), + 
("y", Float64, [1.0, 2.0, 3.0, 4.0, 5.0]) + ) + .unwrap()]; + let t2_schema = Arc::new(Schema::new(vec![ + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Utf8, false), + Field::new("y", DataType::Float64, false), + ])); + let t2_scan = TestScanBuilder::new(Arc::clone(&t2_schema)) + .with_support(true) + .with_batches(t2_batches) + .build(); + + // t3: largest table (will be probe side of inner join) + let t3_batches = vec![record_batch!( + ("d", Utf8, ["ca", "cb", "cc", "cd", "ce", "cf", "cg", "ch"]), + ("z", Float64, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]) + ) + .unwrap()]; + let t3_schema = Arc::new(Schema::new(vec![ + Field::new("d", DataType::Utf8, false), + Field::new("z", DataType::Float64, false), + ])); + let t3_scan = TestScanBuilder::new(Arc::clone(&t3_schema)) + .with_support(true) + .with_batches(t3_batches) + .build(); + + // Create nested join structure: + // Join (t1.a = t2.b) + // / \ + // t1 Join(t2.c = t3.d) + // / \ + // t2 t3 + + // First create inner join: t2.c = t3.d + let inner_join_on = + vec![(col("c", &t2_schema).unwrap(), col("d", &t3_schema).unwrap())]; + let inner_join = Arc::new( + HashJoinExec::try_new( + t2_scan, + t3_scan, + inner_join_on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + // Then create outer join: t1.a = t2.b (from inner join result) + let outer_join_on = vec![( + col("a", &t1_schema).unwrap(), + col("b", &inner_join.schema()).unwrap(), + )]; + let outer_join = Arc::new( + HashJoinExec::try_new( + t1_scan, + inner_join as Arc<dyn ExecutionPlan>, + outer_join_on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ) as Arc<dyn ExecutionPlan>; + + // Test that dynamic filters are pushed down correctly through nested joins + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&outer_join), FilterPushdown::new_post_optimization(), true), + @r" + OptimizationTest: + input: + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true + output: + Ok: + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ true ] + ", + ); + + // Execute the plan to verify the dynamic filters are properly updated + let mut config = ConfigOptions::default(); + config.execution.parquet.pushdown_filters = true; + config.optimizer.enable_dynamic_filter_pushdown = true; + let plan = FilterPushdown::new_post_optimization() + .optimize(outer_join, &config) + .unwrap(); + let config = 
SessionConfig::new().with_batch_size(10); + let session_ctx = SessionContext::new_with_config(config); + session_ctx.register_object_store( + ObjectStoreUrl::parse("test://").unwrap().as_ref(), + Arc::new(InMemory::new()), + ); + let state = session_ctx.state(); + let task_ctx = state.task_ctx(); + let mut stream = plan.execute(0, Arc::clone(&task_ctx)).unwrap(); + // Execute to populate the dynamic filters + stream.next().await.unwrap().unwrap(); + + // Verify that both the inner and outer join have updated dynamic filters + insta::assert_snapshot!( + format!("{}", format_plan_for_test(&plan)), + @r" + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, b@0)], filter=[b@0 >= aa AND b@0 <= ab] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, x], file_type=test, pushdown_supported=true + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(c@1, d@0)], filter=[d@0 >= ca AND d@0 <= ce] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[b, c, y], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ b@0 >= aa AND b@0 <= ab ] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, z], file_type=test, pushdown_supported=true, predicate=DynamicFilterPhysicalExpr [ d@0 >= ca AND d@0 <= ce ] + " + ); +} + +#[tokio::test] +async fn test_hashjoin_parent_filter_pushdown() { + use datafusion_common::JoinType; + use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode}; + + // Create build side with limited values + let build_batches = vec![record_batch!( + ("a", Utf8, ["aa", "ab"]), + ("b", Utf8, ["ba", "bb"]), + ("c", Float64, [1.0, 2.0]) + ) + .unwrap()]; + let build_side_schema = Arc::new(Schema::new(vec![ + Field::new("a", DataType::Utf8, false), + Field::new("b", DataType::Utf8, false), + Field::new("c", DataType::Float64, false), + ])); + let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema)) + .with_support(true) + .with_batches(build_batches) + .build(); + + // Create probe side with more values + let probe_batches = vec![record_batch!( + ("d", Utf8, ["aa", "ab", "ac", "ad"]), + ("e", Utf8, ["ba", "bb", "bc", "bd"]), + ("f", Float64, [1.0, 2.0, 3.0, 4.0]) + ) + .unwrap()]; + let probe_side_schema = Arc::new(Schema::new(vec![ + Field::new("d", DataType::Utf8, false), + Field::new("e", DataType::Utf8, false), + Field::new("f", DataType::Float64, false), + ])); + let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema)) + .with_support(true) + .with_batches(probe_batches) + .build(); + + // Create HashJoinExec + let on = vec![( + col("a", &build_side_schema).unwrap(), + col("d", &probe_side_schema).unwrap(), + )]; + let join = Arc::new( + HashJoinExec::try_new( + build_scan, + probe_scan, + on, + None, + &JoinType::Inner, + None, + PartitionMode::Partitioned, + datafusion_common::NullEquality::NullEqualsNothing, + ) + .unwrap(), + ); + + // Create filters that can be pushed down to different sides + // We need to create filters in the context of the join output schema + let join_schema = join.schema(); + + // Filter on build side column: a = 'aa' + let left_filter = col_lit_predicate("a", "aa", &join_schema); + // Filter on probe side column: e = 'ba' + let right_filter = col_lit_predicate("e", "ba", &join_schema); + // Filter that references both sides: a = d (should not be pushed down) + let cross_filter = Arc::new(BinaryExpr::new( + col("a", &join_schema).unwrap(), + Operator::Eq, + col("d", &join_schema).unwrap(), + )) as Arc<dyn PhysicalExpr>; + + let 
filter = + Arc::new(FilterExec::try_new(left_filter, Arc::clone(&join) as _).unwrap()); + let filter = Arc::new(FilterExec::try_new(right_filter, filter).unwrap()); + let plan = Arc::new(FilterExec::try_new(cross_filter, filter).unwrap()) + as Arc<dyn ExecutionPlan>; + + // Test that filters are pushed down correctly to each side of the join + insta::assert_snapshot!( + OptimizationTest::new(Arc::clone(&plan), FilterPushdown::new(), true), + @r" + OptimizationTest: + input: + - FilterExec: a@0 = d@3 + - FilterExec: e@4 = ba + - FilterExec: a@0 = aa + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true + output: + Ok: + - FilterExec: a@0 = d@3 + - HashJoinExec: mode=Partitioned, join_type=Inner, on=[(a@0, d@0)] + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[a, b, c], file_type=test, pushdown_supported=true, predicate=a@0 = aa + - DataSourceExec: file_groups={1 group: [[test.parquet]]}, projection=[d, e, f], file_type=test, pushdown_supported=true, predicate=e@1 = ba + " + ); +} + /// Integration test for dynamic filter pushdown with TopK. /// We use an integration test because there are complex interactions in the optimizer rules /// that the unit tests applying a single optimizer rule do not cover. diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 97b1cff777..6bf3270a35 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -53,6 +53,7 @@ datafusion-common = { workspace = true, default-features = true } datafusion-common-runtime = { workspace = true, default-features = true } datafusion-execution = { workspace = true } datafusion-expr = { workspace = true } +datafusion-functions-aggregate-common = { workspace = true } datafusion-functions-window-common = { workspace = true } datafusion-physical-expr = { workspace = true, default-features = true } datafusion-physical-expr-common = { workspace = true } diff --git a/datafusion/physical-plan/src/joins/hash_join.rs b/datafusion/physical-plan/src/joins/hash_join.rs index 6058b7974e..d3999c6cd8 100644 --- a/datafusion/physical-plan/src/joins/hash_join.rs +++ b/datafusion/physical-plan/src/joins/hash_join.rs @@ -76,14 +76,16 @@ use datafusion_common::config::ConfigOptions; use datafusion_common::utils::memory::estimate_memory_size; use datafusion_common::{ internal_datafusion_err, internal_err, plan_err, project_schema, JoinSide, JoinType, - NullEquality, Result, + NullEquality, Result, ScalarValue, }; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_execution::TaskContext; use datafusion_expr::Operator; +use datafusion_functions_aggregate_common::min_max::{max_batch, min_batch}; use datafusion_physical_expr::equivalence::{ join_equivalence_properties, ProjectionMapping, }; +use datafusion_physical_expr::expressions::{lit, BinaryExpr, DynamicFilterPhysicalExpr}; use datafusion_physical_expr::{PhysicalExpr, PhysicalExprRef}; use datafusion_physical_expr_common::datum::compare_op_for_nested; @@ -363,6 +365,8 @@ pub struct HashJoinExec { pub null_equality: NullEquality, /// Cache holding plan properties like equivalences, output partitioning etc. 
cache: PlanProperties, + /// Dynamic filter for pushing down to the probe side + dynamic_filter: Arc<DynamicFilterPhysicalExpr>, } impl HashJoinExec { @@ -409,6 +413,8 @@ impl HashJoinExec { projection.as_ref(), )?; + let dynamic_filter = Self::create_dynamic_filter(&on); + Ok(HashJoinExec { left, right, @@ -424,9 +430,17 @@ impl HashJoinExec { column_indices, null_equality, cache, + dynamic_filter, }) } + fn create_dynamic_filter(on: &JoinOn) -> Arc<DynamicFilterPhysicalExpr> { + // Extract the right-side keys from the `on` clauses + let right_keys: Vec<_> = on.iter().map(|(_, r)| Arc::clone(r)).collect(); + // Initialize with a placeholder expression (true) that will be updated when the hash table is built + Arc::new(DynamicFilterPhysicalExpr::new(right_keys, lit(true))) + } + /// left (build) side which gets hashed pub fn left(&self) -> &Arc<dyn ExecutionPlan> { &self.left @@ -672,10 +686,21 @@ impl DisplayAs for HashJoinExec { .map(|(c1, c2)| format!("({c1}, {c2})")) .collect::<Vec<String>>() .join(", "); + let dynamic_filter_display = match self.dynamic_filter.current() { + Ok(current) if current != lit(true) => { + format!(", filter=[{current}]") + } + _ => "".to_string(), + }; write!( f, - "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}", - self.mode, self.join_type, on, display_filter, display_projections + "HashJoinExec: mode={:?}, join_type={:?}, on=[{}]{}{}{}", + self.mode, + self.join_type, + on, + display_filter, + display_projections, + dynamic_filter_display ) } DisplayFormatType::TreeRender => { @@ -762,7 +787,7 @@ impl ExecutionPlan for HashJoinExec { self: Arc<Self>, children: Vec<Arc<dyn ExecutionPlan>>, ) -> Result<Arc<dyn ExecutionPlan>> { - Ok(Arc::new(HashJoinExec::try_new( + let mut new_join = HashJoinExec::try_new( Arc::clone(&children[0]), Arc::clone(&children[1]), self.on.clone(), @@ -771,7 +796,10 @@ impl ExecutionPlan for HashJoinExec { self.projection.clone(), self.mode, self.null_equality, - )?)) + )?; + // Preserve the dynamic filter if it exists + new_join.dynamic_filter = Arc::clone(&self.dynamic_filter); + Ok(Arc::new(new_join)) } fn reset_state(self: Arc<Self>) -> Result<Arc<dyn ExecutionPlan>> { @@ -791,6 +819,7 @@ impl ExecutionPlan for HashJoinExec { column_indices: self.column_indices.clone(), null_equality: self.null_equality, cache: self.cache.clone(), + dynamic_filter: Self::create_dynamic_filter(&self.on), })) } @@ -827,6 +856,12 @@ impl ExecutionPlan for HashJoinExec { ); } + let enable_dynamic_filter_pushdown = context + .session_config() + .options() + .optimizer + .enable_dynamic_filter_pushdown; + let join_metrics = BuildProbeJoinMetrics::new(partition, &self.metrics); let left_fut = match self.mode { PartitionMode::CollectLeft => self.left_fut.try_once(|| { @@ -843,6 +878,9 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), self.right().output_partitioning().partition_count(), + enable_dynamic_filter_pushdown + .then_some(Arc::clone(&self.dynamic_filter)), + on_right.clone(), )) })?, PartitionMode::Partitioned => { @@ -860,6 +898,9 @@ impl ExecutionPlan for HashJoinExec { reservation, need_produce_result_in_final(self.join_type), 1, + enable_dynamic_filter_pushdown + .then_some(Arc::clone(&self.dynamic_filter)), + on_right.clone(), )) } PartitionMode::Auto => { @@ -972,9 +1013,9 @@ impl ExecutionPlan for HashJoinExec { fn gather_filters_for_pushdown( &self, - _phase: FilterPushdownPhase, + phase: FilterPushdownPhase, parent_filters: Vec<Arc<dyn PhysicalExpr>>, - _config: &ConfigOptions, 
+ config: &ConfigOptions, ) -> Result<FilterDescription> { // Other types of joins can support *some* filters, but restrictions are complex and error prone. // For now we don't support them. @@ -986,8 +1027,30 @@ impl ExecutionPlan for HashJoinExec { &self.children(), )); } - FilterDescription::from_children(parent_filters, &self.children()) - // TODO: push down our self filters to children in the post optimization phase + + // Get basic filter descriptions for both children + let left_child = crate::filter_pushdown::ChildFilterDescription::from_child( + &parent_filters, + self.left(), + )?; + let mut right_child = crate::filter_pushdown::ChildFilterDescription::from_child( + &parent_filters, + self.right(), + )?; + + // Add dynamic filters in Post phase if enabled + if matches!(phase, FilterPushdownPhase::Post) + && config.optimizer.enable_dynamic_filter_pushdown + { + // Add actual dynamic filter to right side (probe side) + let dynamic_filter = + Arc::clone(&self.dynamic_filter) as Arc<dyn PhysicalExpr>; + right_child = right_child.with_self_filter(dynamic_filter); + } + + Ok(FilterDescription::new() + .with_child(left_child) + .with_child(right_child)) } fn handle_child_pushdown_result( @@ -1012,8 +1075,31 @@ impl ExecutionPlan for HashJoinExec { } } +/// Compute min/max bounds for each column in the given arrays +fn compute_bounds(arrays: &[ArrayRef]) -> Result<Vec<(ScalarValue, ScalarValue)>> { + arrays + .iter() + .map(|array| { + if array.is_empty() { + // Return NULL values for empty arrays + return Ok(( + ScalarValue::try_from(array.data_type())?, + ScalarValue::try_from(array.data_type())?, + )); + } + + // Use Arrow kernels for efficient min/max computation + let min_val = min_batch(array)?; + let max_val = max_batch(array)?; + + Ok((min_val, max_val)) + }) + .collect() +} + /// Reads the left (build) side of the input, buffering it in memory, to build a /// hash table (`LeftJoinData`) +#[expect(clippy::too_many_arguments)] async fn collect_left_input( random_state: RandomState, left_stream: SendableRecordBatchStream, @@ -1022,6 +1108,8 @@ async fn collect_left_input( reservation: MemoryReservation, with_visited_indices_bitmap: bool, probe_threads_count: usize, + dynamic_filter: Option<Arc<DynamicFilterPhysicalExpr>>, + on_right: Vec<PhysicalExprRef>, ) -> Result<JoinLeftData> { let schema = left_stream.schema(); @@ -1114,12 +1202,53 @@ async fn collect_left_input( let data = JoinLeftData::new( hashmap, single_batch, - left_values, + left_values.clone(), Mutex::new(visited_indices_bitmap), AtomicUsize::new(probe_threads_count), reservation, ); + // Update dynamic filter with min/max bounds if provided + if let Some(dynamic_filter) = dynamic_filter { + if num_rows > 0 { + let bounds = compute_bounds(&left_values)?; + + // Create range predicates for each join key + let mut predicates = Vec::with_capacity(bounds.len()); + for ((min_val, max_val), right_expr) in bounds.iter().zip(on_right.iter()) { + // Create predicate: col >= min AND col <= max + let min_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::GtEq, + lit(min_val.clone()), + )) as Arc<dyn PhysicalExpr>; + + let max_expr = Arc::new(BinaryExpr::new( + Arc::clone(right_expr), + Operator::LtEq, + lit(max_val.clone()), + )) as Arc<dyn PhysicalExpr>; + + let range_expr = + Arc::new(BinaryExpr::new(min_expr, Operator::And, max_expr)) + as Arc<dyn PhysicalExpr>; + + predicates.push(range_expr); + } + + // Combine all predicates with AND + let combined_predicate = predicates + .into_iter() + .reduce(|acc, 
pred| {
+                Arc::new(BinaryExpr::new(acc, Operator::And, pred))
+                    as Arc<dyn PhysicalExpr>
+            })
+            .unwrap_or_else(|| lit(true));
+
+            dynamic_filter.update(combined_predicate)?;
+        }
+    }
+
     Ok(data)
 }
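The core of the change is the bounds computation in `collect_left_input`: once the build side is buffered, `compute_bounds` takes the min/max of each join-key column (via `min_batch`/`max_batch`), and the join replaces the initial `lit(true)` placeholder in its `DynamicFilterPhysicalExpr` with a `key >= min AND key <= max` range predicate for the probe side. Below is a minimal, self-contained sketch of that bounds idea using only the `arrow` crate; the helper name `probe_side_range_predicate` is invented for illustration, and the predicate is rendered as a display string rather than built as a real `PhysicalExpr` tree the way the operator does.

// Sketch of the min/max bounds idea behind `compute_bounds`, assuming only
// the `arrow` crate. The real code returns `ScalarValue` pairs and feeds them
// into `DynamicFilterPhysicalExpr::update`; here we just format the predicate.
use arrow::array::StringArray;
use arrow::compute::kernels::aggregate::{max_string, min_string};

/// Compute min/max over one string join-key column of the build side and
/// render the range predicate that would be pushed to the probe side.
/// (Hypothetical helper, not part of the commit.)
fn probe_side_range_predicate(column_name: &str, build_keys: &StringArray) -> Option<String> {
    // An empty build side yields no bounds; the real code leaves the filter as `true`.
    let min = min_string(build_keys)?;
    let max = max_string(build_keys)?;
    // Same shape as the filter shown in the plan snapshots above,
    // e.g. `a@0 >= aa AND a@0 <= ab`.
    Some(format!("{column_name} >= {min} AND {column_name} <= {max}"))
}

fn main() {
    // Build-side join keys from the test above: ["aa", "ab"].
    let build_keys = StringArray::from(vec!["aa", "ab"]);
    let predicate = probe_side_range_predicate("a@0", &build_keys);
    assert_eq!(predicate.as_deref(), Some("a@0 >= aa AND a@0 <= ab"));
    println!("{predicate:?}");
}

In the operator itself the per-key range expressions are combined with AND and installed via `dynamic_filter.update(...)`, which is why the plan snapshots show `DynamicFilterPhysicalExpr [ true ]` before execution and the concrete range (e.g. `d@0 >= aa AND d@0 <= ab`) once the build side has been consumed.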