This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 93d177db4e Extend dynamic filter to joins that preserve probe side ON 
(#20447)
93d177db4e is described below

commit 93d177db4ee5bf519d05b70a7753f71d3cb410ad
Author: Helgi Kristvin Sigurbjarnarson <[email protected]>
AuthorDate: Mon Mar 2 08:01:54 2026 -0800

    Extend dynamic filter to joins that preserve probe side ON (#20447)
    
    The dynamic filter from HashJoinExec was previously gated to Inner joins
    only.
    
    PR #20192 refactored the join filter pushdown infrastructure, which
    makes it trivial to extend self-generated filters to join types that
    preserve the probe side for ON-clause filters, as defined by the
    `on_lr_is_preserved` function.
    
    This PR promotes that function to a method on `JoinType` and uses it to
    determine which self-generated dynamic join filters to push down.
    
    This enables the dynamic filter for Left, LeftSemi, RightSemi, LeftAnti
    and LeftMark hash joins.
    
    ## Which issue does this PR close?
    
    This PR makes progress on #16973
    
    ## Rationale for this change
    
    The self-generated dynamic filter in HashJoinExec filters the probe side
    using build-side values. For Left and LeftSemi joins, the right-hand
    probe side has the same filtering semantics as Inner, so this PR relaxes
    the gate using `on_lr_is_preserved`.
    
    ## Are these changes tested?
    
    Yes.
    
    ## Are there any user-facing changes?
    
    No.
---
 datafusion/common/src/join_type.rs                 |  29 +++
 .../tests/physical_optimizer/filter_pushdown.rs    | 274 +++++++++++++++++++++
 datafusion/optimizer/src/push_down_filter.rs       |  22 +-
 .../physical-plan/src/joins/hash_join/exec.rs      |   5 +-
 .../test_files/dynamic_filter_pushdown_config.slt  |  99 ++++++--
 .../test_files/projection_pushdown.slt             |   2 +-
 6 files changed, 391 insertions(+), 40 deletions(-)

diff --git a/datafusion/common/src/join_type.rs 
b/datafusion/common/src/join_type.rs
index e6a90db2dc..8855e993f2 100644
--- a/datafusion/common/src/join_type.rs
+++ b/datafusion/common/src/join_type.rs
@@ -97,6 +97,35 @@ impl JoinType {
         }
     }
 
+    /// Whether each side of the join is preserved for ON-clause filter 
pushdown.
+    ///
+    /// It is only correct to push ON-clause filters below a join for preserved
+    /// inputs.
+    ///
+    /// # "Preserved" input definition
+    ///
+    /// A join side is preserved if the join returns all or a subset of the 
rows
+    /// from that side, such that each output row directly maps to an input 
row.
+    /// If a side is not preserved, the join can produce extra null rows that
+    /// don't map to any input row.
+    ///
+    /// # Return Value
+    ///
+    /// A tuple of booleans - (left_preserved, right_preserved).
+    pub fn on_lr_is_preserved(&self) -> (bool, bool) {
+        match self {
+            JoinType::Inner => (true, true),
+            JoinType::Left => (false, true),
+            JoinType::Right => (true, false),
+            JoinType::Full => (false, false),
+            JoinType::LeftSemi | JoinType::RightSemi => (true, true),
+            JoinType::LeftAnti => (false, true),
+            JoinType::RightAnti => (true, false),
+            JoinType::LeftMark => (false, true),
+            JoinType::RightMark => (true, false),
+        }
+    }
+
     /// Does the join type support swapping inputs?
     pub fn supports_swap(&self) -> bool {
         matches!(
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs 
b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
index 99db81d34d..ac64727481 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -4086,3 +4086,277 @@ async fn test_filter_with_projection_pushdown() {
     ];
     assert_batches_eq!(expected, &result);
 }
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown_left_join() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Create build side with limited values
+    let build_batches = vec![
+        record_batch!(
+            ("a", Utf8, ["aa", "ab"]),
+            ("b", Utf8, ["ba", "bb"]),
+            ("c", Float64, [1.0, 2.0])
+        )
+        .unwrap(),
+    ];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values (some won't match)
+    let probe_batches = vec![
+        record_batch!(
+            ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+            ("b", Utf8, ["ba", "bb", "bc", "bd"]),
+            ("e", Float64, [1.0, 2.0, 3.0, 4.0])
+        )
+        .unwrap(),
+    ];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("e", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec with Left join and CollectLeft mode
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let plan = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            Arc::clone(&probe_scan),
+            on,
+            None,
+            &JoinType::Left,
+            None,
+            PartitionMode::CollectLeft,
+            datafusion_common::NullEquality::NullEqualsNothing,
+            false,
+        )
+        .unwrap(),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // Expect the dynamic filter predicate to be pushed down into the probe 
side DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), 
FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), 
(b@1, b@1)]
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, e], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), 
(b@1, b@1)]
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, e], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ empty ]
+    ",
+    );
+
+    // Actually apply the optimization and execute the plan
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+
+    // Test that dynamic filter linking survives with_new_children
+    let children = plan.children().into_iter().map(Arc::clone).collect();
+    let plan = plan.with_new_children(children).unwrap();
+
+    let config = SessionConfig::new().with_batch_size(10);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // After execution, verify the dynamic filter was populated with bounds 
and IN-list
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1, 
b@1)]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, e], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb 
AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
+    "
+    );
+
+    // Verify result correctness: left join preserves all build (left) rows.
+    // All build rows match probe rows here, so we get 2 matched rows.
+    // The dynamic filter pruned unmatched probe rows (ac, ad) at scan time,
+    // which is safe because those probe rows would never match any build row.
+    let result = format!("{}", pretty_format_batches(&batches).unwrap());
+    insta::assert_snapshot!(
+        result,
+        @r"
+    +----+----+-----+----+----+-----+
+    | a  | b  | c   | a  | b  | e   |
+    +----+----+-----+----+----+-----+
+    | aa | ba | 1.0 | aa | ba | 1.0 |
+    | ab | bb | 2.0 | ab | bb | 2.0 |
+    +----+----+-----+----+----+-----+
+    "
+    );
+}
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() {
+    use datafusion_common::JoinType;
+    use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+    // Create build side with limited values
+    let build_batches = vec![
+        record_batch!(
+            ("a", Utf8, ["aa", "ab"]),
+            ("b", Utf8, ["ba", "bb"]),
+            ("c", Float64, [1.0, 2.0])
+        )
+        .unwrap(),
+    ];
+    let build_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("c", DataType::Float64, false),
+    ]));
+    let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+        .with_support(true)
+        .with_batches(build_batches)
+        .build();
+
+    // Create probe side with more values (some won't match)
+    let probe_batches = vec![
+        record_batch!(
+            ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+            ("b", Utf8, ["ba", "bb", "bc", "bd"]),
+            ("e", Float64, [1.0, 2.0, 3.0, 4.0])
+        )
+        .unwrap(),
+    ];
+    let probe_side_schema = Arc::new(Schema::new(vec![
+        Field::new("a", DataType::Utf8, false),
+        Field::new("b", DataType::Utf8, false),
+        Field::new("e", DataType::Float64, false),
+    ]));
+    let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+        .with_support(true)
+        .with_batches(probe_batches)
+        .build();
+
+    // Create HashJoinExec with LeftSemi join and CollectLeft mode
+    let on = vec![
+        (
+            col("a", &build_side_schema).unwrap(),
+            col("a", &probe_side_schema).unwrap(),
+        ),
+        (
+            col("b", &build_side_schema).unwrap(),
+            col("b", &probe_side_schema).unwrap(),
+        ),
+    ];
+    let plan = Arc::new(
+        HashJoinExec::try_new(
+            build_scan,
+            Arc::clone(&probe_scan),
+            on,
+            None,
+            &JoinType::LeftSemi,
+            None,
+            PartitionMode::CollectLeft,
+            datafusion_common::NullEquality::NullEqualsNothing,
+            false,
+        )
+        .unwrap(),
+    ) as Arc<dyn ExecutionPlan>;
+
+    // Expect the dynamic filter predicate to be pushed down into the probe 
side DataSource
+    insta::assert_snapshot!(
+        OptimizationTest::new(Arc::clone(&plan), 
FilterPushdown::new_post_optimization(), true),
+        @r"
+    OptimizationTest:
+      input:
+        - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), 
(b@1, b@1)]
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+        -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, e], file_type=test, pushdown_supported=true
+      output:
+        Ok:
+          - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, 
a@0), (b@1, b@1)]
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+          -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, e], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ empty ]
+    ",
+    );
+
+    // Actually apply the optimization and execute the plan
+    let mut config = ConfigOptions::default();
+    config.execution.parquet.pushdown_filters = true;
+    config.optimizer.enable_dynamic_filter_pushdown = true;
+    let plan = FilterPushdown::new_post_optimization()
+        .optimize(plan, &config)
+        .unwrap();
+
+    // Test that dynamic filter linking survives with_new_children
+    let children = plan.children().into_iter().map(Arc::clone).collect();
+    let plan = plan.with_new_children(children).unwrap();
+
+    let config = SessionConfig::new().with_batch_size(10);
+    let session_ctx = SessionContext::new_with_config(config);
+    session_ctx.register_object_store(
+        ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+        Arc::new(InMemory::new()),
+    );
+    let state = session_ctx.state();
+    let task_ctx = state.task_ctx();
+    let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+        .await
+        .unwrap();
+
+    // After execution, verify the dynamic filter was populated with bounds 
and IN-list
+    insta::assert_snapshot!(
+        format!("{}", format_plan_for_test(&plan)),
+        @r"
+    - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0), 
(b@1, b@1)]
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, c], file_type=test, pushdown_supported=true
+    -   DataSourceExec: file_groups={1 group: [[test.parquet]]}, 
projection=[a, b, e], file_type=test, pushdown_supported=true, 
predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb 
AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
+    "
+    );
+
+    // Verify result correctness: left semi join returns only build (left) rows
+    // that have at least one matching probe row. Output schema is build-side 
columns only.
+    let result = format!("{}", pretty_format_batches(&batches).unwrap());
+    insta::assert_snapshot!(
+        result,
+        @r"
+    +----+----+-----+
+    | a  | b  | c   |
+    +----+----+-----+
+    | aa | ba | 1.0 |
+    | ab | bb | 2.0 |
+    +----+----+-----+
+    "
+    );
+}
diff --git a/datafusion/optimizer/src/push_down_filter.rs 
b/datafusion/optimizer/src/push_down_filter.rs
index b1c0960386..f1664f267b 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -176,27 +176,9 @@ pub(crate) fn lr_is_preserved(join_type: JoinType) -> 
(bool, bool) {
     }
 }
 
-/// For a given JOIN type, determine whether each input of the join is 
preserved
-/// for the join condition (`ON` clause filters).
-///
-/// It is only correct to push filters below a join for preserved inputs.
-///
-/// # Return Value
-/// A tuple of booleans - (left_preserved, right_preserved).
-///
-/// See [`lr_is_preserved`] for a definition of "preserved".
+/// See [`JoinType::on_lr_is_preserved`] for details.
 pub(crate) fn on_lr_is_preserved(join_type: JoinType) -> (bool, bool) {
-    match join_type {
-        JoinType::Inner => (true, true),
-        JoinType::Left => (false, true),
-        JoinType::Right => (true, false),
-        JoinType::Full => (false, false),
-        JoinType::LeftSemi | JoinType::RightSemi => (true, true),
-        JoinType::LeftAnti => (false, true),
-        JoinType::RightAnti => (true, false),
-        JoinType::LeftMark => (false, true),
-        JoinType::RightMark => (true, false),
-    }
+    join_type.on_lr_is_preserved()
 }
 
 /// Evaluates the columns referenced in the given expression to see if they 
refer
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs 
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index eda7e93eff..b6ce472bbe 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -837,9 +837,8 @@ impl HashJoinExec {
     }
 
     fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) -> 
bool {
-        if self.join_type != JoinType::Inner
-            || !config.optimizer.enable_join_dynamic_filter_pushdown
-        {
+        let (_, probe_preserved) = self.join_type.on_lr_is_preserved();
+        if !probe_preserved || 
!config.optimizer.enable_join_dynamic_filter_pushdown {
             return false;
         }
 
diff --git 
a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt 
b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
index cbf9f81e42..d5202a1d95 100644
--- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
@@ -222,8 +222,7 @@ ORDER BY l.id;
 5 left5 right5
 
 # RIGHT JOIN: optimizer swaps to physical Left join (build=right_parquet, 
probe=left_parquet).
-# No self-generated dynamic filter (only Inner joins get that), but parent 
filters
-# on the preserved (build) side can still push down.
+# Physical Left join generates a self-generated dynamic filter on the probe 
side.
 query TT
 EXPLAIN SELECT l.*, r.info
 FROM left_parquet l
@@ -240,7 +239,7 @@ physical_plan
 01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
 02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@0, id@0)], 
projection=[info@1, id@2, data@3]
 03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
 projection=[id, info], file_type=parquet
-04)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
 projection=[id, data], file_type=parquet
+04)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
 projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
 
 # RIGHT JOIN correctness: all right rows appear, unmatched left rows produce 
NULLs
 query ITT
@@ -272,9 +271,8 @@ physical_plan
 03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
 projection=[id, info], file_type=parquet
 04)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
 projection=[id, data], file_type=parquet
 
-# LEFT SEMI JOIN: optimizer swaps to RightSemi (build=right_parquet, 
probe=left_parquet).
-# No self-generated dynamic filter (only Inner joins), but parent filters on
-# the preserved (probe) side can push down.
+# LEFT SEMI JOIN: optimizer swaps to RightSemi (build=right_parquet, 
probe=left_parquet)
+# and pushes the self-generated filter to the right side (left parquet).
 query TT
 EXPLAIN SELECT l.*
 FROM left_parquet l
@@ -290,10 +288,40 @@ logical_plan
 physical_plan
 01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)]
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
 projection=[id], file_type=parquet
-03)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
 projection=[id, data], file_type=parquet
+03)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
 projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
 
-# LEFT ANTI JOIN: no self-generated dynamic filter, but parent filters can push
-# to the preserved (left/build) side.
+# LEFT SEMI JOIN (physical LeftSemi): reverse table roles so optimizer keeps 
LeftSemi
+# (right_parquet has 3 rows < left_parquet has 5 rows, so no swap occurs).
+# Physical LeftSemi generates a self-generated dynamic filter on the probe 
side.
+query TT
+EXPLAIN SELECT r.*
+FROM right_parquet r
+WHERE r.id IN (SELECT l.id FROM left_parquet l);
+----
+logical_plan
+01)LeftSemi Join: r.id = __correlated_sq_1.id
+02)--SubqueryAlias: r
+03)----TableScan: right_parquet projection=[id, info]
+04)--SubqueryAlias: __correlated_sq_1
+05)----SubqueryAlias: l
+06)------TableScan: left_parquet projection=[id]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, id@0)]
+02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
 projection=[id, info], file_type=parquet
+03)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
 projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# LEFT SEMI (physical LeftSemi) correctness: only right rows with matching 
left ids
+query IT rowsort
+SELECT r.*
+FROM right_parquet r
+WHERE r.id IN (SELECT l.id FROM left_parquet l);
+----
+1 right1
+3 right3
+5 right5
+
+# LEFT ANTI JOIN: both self generated and parent filters can push to the
+# preserved (left/build) side.
 query TT
 EXPLAIN SELECT l.*
 FROM left_parquet l
@@ -309,13 +337,50 @@ logical_plan
 physical_plan
 01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)]
 02)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
 projection=[id, data], file_type=parquet
-03)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
 projection=[id], file_type=parquet
+03)--DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
 projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# LEFT MARK JOIN: the OR prevents decorrelation to LeftSemi, so the optimizer
+# uses LeftMark. Self-generated dynamic filter pushes to the probe side.
+query TT
+EXPLAIN SELECT r.id, r.info
+FROM right_parquet r
+WHERE EXISTS (SELECT 1 FROM left_parquet l WHERE r.id = l.id)
+   OR r.id = 999;
+----
+logical_plan
+01)Projection: r.id, r.info
+02)--Filter: __correlated_sq_1.mark OR r.id = Int32(999)
+03)----LeftMark Join: r.id = __correlated_sq_1.id
+04)------SubqueryAlias: r
+05)--------TableScan: right_parquet projection=[id, info]
+06)------SubqueryAlias: __correlated_sq_1
+07)--------SubqueryAlias: l
+08)----------TableScan: left_parquet projection=[id]
+physical_plan
+01)FilterExec: mark@2 OR id@0 = 999, projection=[id@0, info@1]
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(id@0, id@0)]
+04)------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
 projection=[id, info], file_type=parquet
+05)------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
 projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# LEFT MARK correctness: all right rows match EXISTS, so all 3 appear
+query IT rowsort
+SELECT r.id, r.info
+FROM right_parquet r
+WHERE EXISTS (SELECT 1 FROM left_parquet l WHERE r.id = l.id)
+   OR r.id = 999;
+----
+1 right1
+3 right3
+5 right5
 
 # Test 2c: Parent dynamic filter (from TopK) pushed through semi/anti joins
 # Sort on the join key (id) so the TopK dynamic filter pushes to BOTH sides.
 
-# SEMI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join 
key)
-# that pushes through the RightSemi join to both the build and probe sides.
+# SEMI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join
+# key) that pushes through the RightSemi join to both the build and probe sides
+# as well as the HashJoinExec pushing the self-generated filter to the
+# right-hand side of the join.
 query TT
 EXPLAIN SELECT l.*
 FROM left_parquet l
@@ -334,7 +399,7 @@ physical_plan
 01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST], 
preserve_partitioning=[false]
 02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)]
 03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
 projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
-04)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
 projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
+04)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
 projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ] 
AND DynamicFilter [ empty ]
 
 # Correctness check
 query IT
@@ -346,8 +411,10 @@ ORDER BY l.id LIMIT 2;
 1 left1
 3 left3
 
-# ANTI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join 
key)
-# that pushes through the LeftAnti join to both the preserved and 
non-preserved sides.
+# ANTI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join
+# key) that pushes through the LeftAnti join to both the preserved and
+# non-preserved sides. The HashJoin pushes the self-generated filter to the
+# right hand side of the LeftAnti join.
 query TT
 EXPLAIN SELECT l.*
 FROM left_parquet l
@@ -366,7 +433,7 @@ physical_plan
 01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST], 
preserve_partitioning=[false]
 02)--HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)]
 03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
 projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
-04)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
 projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
+04)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
 projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] AND 
DynamicFilter [ empty ]
 
 # Correctness check
 query IT
diff --git a/datafusion/sqllogictest/test_files/projection_pushdown.slt 
b/datafusion/sqllogictest/test_files/projection_pushdown.slt
index dbb77b33c2..1c89923080 100644
--- a/datafusion/sqllogictest/test_files/projection_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/projection_pushdown.slt
@@ -1604,7 +1604,7 @@ physical_plan
 02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@1, id@0)], 
projection=[__datafusion_extracted_2@0, id@1, __datafusion_extracted_3@3]
 03)----DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
 projection=[get_field(s@1, value) as __datafusion_extracted_2, id], 
file_type=parquet
 04)----FilterExec: __datafusion_extracted_1@0 > 5, projection=[id@1, 
__datafusion_extracted_3@2]
-05)------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]},
 projection=[get_field(s@1, level) as __datafusion_extracted_1, id, 
get_field(s@1, level) as __datafusion_extracted_3], file_type=parquet
+05)------DataSourceExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]},
 projection=[get_field(s@1, level) as __datafusion_extracted_1, id, 
get_field(s@1, level) as __datafusion_extracted_3], file_type=parquet, 
predicate=DynamicFilter [ empty ]
 
 # Verify correctness - left join with level > 5 condition
 # Only join_right rows with level > 5 are matched: id=1 (level=10), id=4 
(level=8)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to