This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 93d177db4e Extend dynamic filter to joins that preserve probe side ON
(#20447)
93d177db4e is described below
commit 93d177db4ee5bf519d05b70a7753f71d3cb410ad
Author: Helgi Kristvin Sigurbjarnarson <[email protected]>
AuthorDate: Mon Mar 2 08:01:54 2026 -0800
Extend dynamic filter to joins that preserve probe side ON (#20447)
The dynamic filter from HashJoinExec was previously gated to Inner joins
only.
PR #20192 refactored the join filter pushdown infrastructure, which
makes it trivial to extend self-generated filters to join types whose probe
side is preserved for ON-clause filters, as defined by the `on_lr_is_preserved` function.
This PR promotes that function to a method on `JoinType` and uses it to
decide whether HashJoinExec pushes down its self-generated dynamic filter.
This enables the self-generated dynamic filter for Left, LeftSemi, RightSemi,
LeftAnti and LeftMark hash joins.
## Which issue does this PR close?
This PR makes progress on #16973
## Rationale for this change
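The self-generated dynamic filter in HashJoinExec filters the probe side
using build-side values. For every join type whose probe (right) side is
preserved for ON-clause filters (Left, LeftSemi, RightSemi, LeftAnti and
LeftMark), probe rows that match no build-side key can never contribute to the
output, so they can be pruned exactly as for Inner joins. This change therefore
relaxes the gate from Inner-only to any join type for which
`on_lr_is_preserved` reports the probe side as preserved.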
## Are these changes tested?
Yes.
## Are there any user-facing changes?
No.
---
datafusion/common/src/join_type.rs | 29 +++
.../tests/physical_optimizer/filter_pushdown.rs | 274 +++++++++++++++++++++
datafusion/optimizer/src/push_down_filter.rs | 22 +-
.../physical-plan/src/joins/hash_join/exec.rs | 5 +-
.../test_files/dynamic_filter_pushdown_config.slt | 99 ++++++--
.../test_files/projection_pushdown.slt | 2 +-
6 files changed, 391 insertions(+), 40 deletions(-)
diff --git a/datafusion/common/src/join_type.rs
b/datafusion/common/src/join_type.rs
index e6a90db2dc..8855e993f2 100644
--- a/datafusion/common/src/join_type.rs
+++ b/datafusion/common/src/join_type.rs
@@ -97,6 +97,35 @@ impl JoinType {
}
}
+ /// Whether each side of the join is preserved for ON-clause filter
pushdown.
+ ///
+ /// It is only correct to push ON-clause filters below a join for preserved
+ /// inputs.
+ ///
+ /// # "Preserved" input definition
+ ///
+ /// A side is preserved if rows from that side that fail the ON clause can
+ /// never contribute to the join output, so filtering them out below the
+ /// join is safe. An unpreserved side can still emit such rows (null-padded,
+ /// anti, or mark output); e.g. in a `Left` join only the right side is preserved.
+ ///
+ /// # Return Value
+ ///
+ /// A tuple of booleans - (left_preserved, right_preserved).
+ pub fn on_lr_is_preserved(&self) -> (bool, bool) {
+ match self {
+ JoinType::Inner => (true, true),
+ JoinType::Left => (false, true),
+ JoinType::Right => (true, false),
+ JoinType::Full => (false, false),
+ JoinType::LeftSemi | JoinType::RightSemi => (true, true),
+ JoinType::LeftAnti => (false, true),
+ JoinType::RightAnti => (true, false),
+ JoinType::LeftMark => (false, true),
+ JoinType::RightMark => (true, false),
+ }
+ }
+
/// Does the join type support swapping inputs?
pub fn supports_swap(&self) -> bool {
matches!(
diff --git a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
index 99db81d34d..ac64727481 100644
--- a/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
+++ b/datafusion/core/tests/physical_optimizer/filter_pushdown.rs
@@ -4086,3 +4086,277 @@ async fn test_filter_with_projection_pushdown() {
];
assert_batches_eq!(expected, &result);
}
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown_left_join() {
+ use datafusion_common::JoinType;
+ use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+ // Create build side with limited values
+ let build_batches = vec![
+ record_batch!(
+ ("a", Utf8, ["aa", "ab"]),
+ ("b", Utf8, ["ba", "bb"]),
+ ("c", Float64, [1.0, 2.0])
+ )
+ .unwrap(),
+ ];
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ]));
+ let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+ .with_support(true)
+ .with_batches(build_batches)
+ .build();
+
+ // Create probe side with more values (some won't match)
+ let probe_batches = vec![
+ record_batch!(
+ ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+ ("b", Utf8, ["ba", "bb", "bc", "bd"]),
+ ("e", Float64, [1.0, 2.0, 3.0, 4.0])
+ )
+ .unwrap(),
+ ];
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("e", DataType::Float64, false),
+ ]));
+ let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+ .with_support(true)
+ .with_batches(probe_batches)
+ .build();
+
+ // Create HashJoinExec with Left join and CollectLeft mode
+ let on = vec![
+ (
+ col("a", &build_side_schema).unwrap(),
+ col("a", &probe_side_schema).unwrap(),
+ ),
+ (
+ col("b", &build_side_schema).unwrap(),
+ col("b", &probe_side_schema).unwrap(),
+ ),
+ ];
+ let plan = Arc::new(
+ HashJoinExec::try_new(
+ build_scan,
+ Arc::clone(&probe_scan),
+ on,
+ None,
+ &JoinType::Left,
+ None,
+ PartitionMode::CollectLeft,
+ datafusion_common::NullEquality::NullEqualsNothing,
+ false,
+ )
+ .unwrap(),
+ ) as Arc<dyn ExecutionPlan>;
+
+ // Expect the dynamic filter predicate to be pushed down into the probe
side DataSource
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan),
FilterPushdown::new_post_optimization(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0),
(b@1, b@1)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0),
(b@1, b@1)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ empty ]
+ ",
+ );
+
+ // Actually apply the optimization and execute the plan
+ let mut config = ConfigOptions::default();
+ config.execution.parquet.pushdown_filters = true;
+ config.optimizer.enable_dynamic_filter_pushdown = true;
+ let plan = FilterPushdown::new_post_optimization()
+ .optimize(plan, &config)
+ .unwrap();
+
+ // Test that dynamic filter linking survives with_new_children
+ let children = plan.children().into_iter().map(Arc::clone).collect();
+ let plan = plan.with_new_children(children).unwrap();
+
+ let config = SessionConfig::new().with_batch_size(10);
+ let session_ctx = SessionContext::new_with_config(config);
+ session_ctx.register_object_store(
+ ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+ Arc::new(InMemory::new()),
+ );
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+ .await
+ .unwrap();
+
+ // After execution, verify the dynamic filter was populated with bounds
and IN-list
+ insta::assert_snapshot!(
+ format!("{}", format_plan_for_test(&plan)),
+ @r"
+ - HashJoinExec: mode=CollectLeft, join_type=Left, on=[(a@0, a@0), (b@1,
b@1)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb
AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
+ "
+ );
+
+ // Verify result correctness: left join preserves all build (left) rows.
+ // All build rows match probe rows here, so we get 2 matched rows.
+ // The dynamic filter pruned unmatched probe rows (ac, ad) at scan time,
+ // which is safe because those probe rows would never match any build row.
+ let result = format!("{}", pretty_format_batches(&batches).unwrap());
+ insta::assert_snapshot!(
+ result,
+ @r"
+ +----+----+-----+----+----+-----+
+ | a | b | c | a | b | e |
+ +----+----+-----+----+----+-----+
+ | aa | ba | 1.0 | aa | ba | 1.0 |
+ | ab | bb | 2.0 | ab | bb | 2.0 |
+ +----+----+-----+----+----+-----+
+ "
+ );
+}
+
+#[tokio::test]
+async fn test_hashjoin_dynamic_filter_pushdown_left_semi_join() {
+ use datafusion_common::JoinType;
+ use datafusion_physical_plan::joins::{HashJoinExec, PartitionMode};
+
+ // Create build side with limited values
+ let build_batches = vec![
+ record_batch!(
+ ("a", Utf8, ["aa", "ab"]),
+ ("b", Utf8, ["ba", "bb"]),
+ ("c", Float64, [1.0, 2.0])
+ )
+ .unwrap(),
+ ];
+ let build_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("c", DataType::Float64, false),
+ ]));
+ let build_scan = TestScanBuilder::new(Arc::clone(&build_side_schema))
+ .with_support(true)
+ .with_batches(build_batches)
+ .build();
+
+ // Create probe side with more values (some won't match)
+ let probe_batches = vec![
+ record_batch!(
+ ("a", Utf8, ["aa", "ab", "ac", "ad"]),
+ ("b", Utf8, ["ba", "bb", "bc", "bd"]),
+ ("e", Float64, [1.0, 2.0, 3.0, 4.0])
+ )
+ .unwrap(),
+ ];
+ let probe_side_schema = Arc::new(Schema::new(vec![
+ Field::new("a", DataType::Utf8, false),
+ Field::new("b", DataType::Utf8, false),
+ Field::new("e", DataType::Float64, false),
+ ]));
+ let probe_scan = TestScanBuilder::new(Arc::clone(&probe_side_schema))
+ .with_support(true)
+ .with_batches(probe_batches)
+ .build();
+
+ // Create HashJoinExec with LeftSemi join and CollectLeft mode
+ let on = vec![
+ (
+ col("a", &build_side_schema).unwrap(),
+ col("a", &probe_side_schema).unwrap(),
+ ),
+ (
+ col("b", &build_side_schema).unwrap(),
+ col("b", &probe_side_schema).unwrap(),
+ ),
+ ];
+ let plan = Arc::new(
+ HashJoinExec::try_new(
+ build_scan,
+ Arc::clone(&probe_scan),
+ on,
+ None,
+ &JoinType::LeftSemi,
+ None,
+ PartitionMode::CollectLeft,
+ datafusion_common::NullEquality::NullEqualsNothing,
+ false,
+ )
+ .unwrap(),
+ ) as Arc<dyn ExecutionPlan>;
+
+ // Expect the dynamic filter predicate to be pushed down into the probe
side DataSource
+ insta::assert_snapshot!(
+ OptimizationTest::new(Arc::clone(&plan),
FilterPushdown::new_post_optimization(), true),
+ @r"
+ OptimizationTest:
+ input:
+ - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0),
(b@1, b@1)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true
+ output:
+ Ok:
+ - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0,
a@0), (b@1, b@1)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ empty ]
+ ",
+ );
+
+ // Actually apply the optimization and execute the plan
+ let mut config = ConfigOptions::default();
+ config.execution.parquet.pushdown_filters = true;
+ config.optimizer.enable_dynamic_filter_pushdown = true;
+ let plan = FilterPushdown::new_post_optimization()
+ .optimize(plan, &config)
+ .unwrap();
+
+ // Test that dynamic filter linking survives with_new_children
+ let children = plan.children().into_iter().map(Arc::clone).collect();
+ let plan = plan.with_new_children(children).unwrap();
+
+ let config = SessionConfig::new().with_batch_size(10);
+ let session_ctx = SessionContext::new_with_config(config);
+ session_ctx.register_object_store(
+ ObjectStoreUrl::parse("test://").unwrap().as_ref(),
+ Arc::new(InMemory::new()),
+ );
+ let state = session_ctx.state();
+ let task_ctx = state.task_ctx();
+ let batches = collect(Arc::clone(&plan), Arc::clone(&task_ctx))
+ .await
+ .unwrap();
+
+ // After execution, verify the dynamic filter was populated with bounds
and IN-list
+ insta::assert_snapshot!(
+ format!("{}", format_plan_for_test(&plan)),
+ @r"
+ - HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(a@0, a@0),
(b@1, b@1)]
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, c], file_type=test, pushdown_supported=true
+ - DataSourceExec: file_groups={1 group: [[test.parquet]]},
projection=[a, b, e], file_type=test, pushdown_supported=true,
predicate=DynamicFilter [ a@0 >= aa AND a@0 <= ab AND b@1 >= ba AND b@1 <= bb
AND struct(a@0, b@1) IN (SET) ([{c0:aa,c1:ba}, {c0:ab,c1:bb}]) ]
+ "
+ );
+
+ // Verify result correctness: left semi join returns only build (left) rows
+ // that have at least one matching probe row. Output schema is build-side
columns only.
+ let result = format!("{}", pretty_format_batches(&batches).unwrap());
+ insta::assert_snapshot!(
+ result,
+ @r"
+ +----+----+-----+
+ | a | b | c |
+ +----+----+-----+
+ | aa | ba | 1.0 |
+ | ab | bb | 2.0 |
+ +----+----+-----+
+ "
+ );
+}
diff --git a/datafusion/optimizer/src/push_down_filter.rs
b/datafusion/optimizer/src/push_down_filter.rs
index b1c0960386..f1664f267b 100644
--- a/datafusion/optimizer/src/push_down_filter.rs
+++ b/datafusion/optimizer/src/push_down_filter.rs
@@ -176,27 +176,9 @@ pub(crate) fn lr_is_preserved(join_type: JoinType) ->
(bool, bool) {
}
}
-/// For a given JOIN type, determine whether each input of the join is
preserved
-/// for the join condition (`ON` clause filters).
-///
-/// It is only correct to push filters below a join for preserved inputs.
-///
-/// # Return Value
-/// A tuple of booleans - (left_preserved, right_preserved).
-///
-/// See [`lr_is_preserved`] for a definition of "preserved".
+/// See [`JoinType::on_lr_is_preserved`] for details.
pub(crate) fn on_lr_is_preserved(join_type: JoinType) -> (bool, bool) {
- match join_type {
- JoinType::Inner => (true, true),
- JoinType::Left => (false, true),
- JoinType::Right => (true, false),
- JoinType::Full => (false, false),
- JoinType::LeftSemi | JoinType::RightSemi => (true, true),
- JoinType::LeftAnti => (false, true),
- JoinType::RightAnti => (true, false),
- JoinType::LeftMark => (false, true),
- JoinType::RightMark => (true, false),
- }
+ join_type.on_lr_is_preserved()
}
/// Evaluates the columns referenced in the given expression to see if they
refer
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index eda7e93eff..b6ce472bbe 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -837,9 +837,8 @@ impl HashJoinExec {
}
fn allow_join_dynamic_filter_pushdown(&self, config: &ConfigOptions) ->
bool {
- if self.join_type != JoinType::Inner
- || !config.optimizer.enable_join_dynamic_filter_pushdown
- {
+ let (_, probe_preserved) = self.join_type.on_lr_is_preserved();
+ if !probe_preserved ||
!config.optimizer.enable_join_dynamic_filter_pushdown {
return false;
}
diff --git
a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
index cbf9f81e42..d5202a1d95 100644
--- a/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
+++ b/datafusion/sqllogictest/test_files/dynamic_filter_pushdown_config.slt
@@ -222,8 +222,7 @@ ORDER BY l.id;
5 left5 right5
# RIGHT JOIN: optimizer swaps to physical Left join (build=right_parquet,
probe=left_parquet).
-# No self-generated dynamic filter (only Inner joins get that), but parent
filters
-# on the preserved (build) side can still push down.
+# The physical Left join pushes its self-generated dynamic filter to the probe side.
query TT
EXPLAIN SELECT l.*, r.info
FROM left_parquet l
@@ -240,7 +239,7 @@ physical_plan
01)ProjectionExec: expr=[id@1 as id, data@2 as data, info@0 as info]
02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@0, id@0)],
projection=[info@1, id@2, data@3]
03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
projection=[id, info], file_type=parquet
-04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
projection=[id, data], file_type=parquet
+04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
# RIGHT JOIN correctness: all right rows appear, unmatched left rows produce
NULLs
query ITT
@@ -272,9 +271,8 @@ physical_plan
03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
projection=[id, info], file_type=parquet
04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
projection=[id, data], file_type=parquet
-# LEFT SEMI JOIN: optimizer swaps to RightSemi (build=right_parquet,
probe=left_parquet).
-# No self-generated dynamic filter (only Inner joins), but parent filters on
-# the preserved (probe) side can push down.
+# LEFT SEMI JOIN: optimizer swaps to RightSemi (build=right_parquet, probe=left_parquet)
+# and pushes the self-generated filter to the probe side (left_parquet).
query TT
EXPLAIN SELECT l.*
FROM left_parquet l
@@ -290,10 +288,40 @@ logical_plan
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)]
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
projection=[id], file_type=parquet
-03)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
projection=[id, data], file_type=parquet
+03)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
-# LEFT ANTI JOIN: no self-generated dynamic filter, but parent filters can push
-# to the preserved (left/build) side.
+# LEFT SEMI JOIN (physical LeftSemi): reverse the table roles so the optimizer keeps LeftSemi
+# (right_parquet has 3 rows < left_parquet has 5 rows, so no swap occurs).
+# The physical LeftSemi join pushes its self-generated dynamic filter to the probe side.
+query TT
+EXPLAIN SELECT r.*
+FROM right_parquet r
+WHERE r.id IN (SELECT l.id FROM left_parquet l);
+----
+logical_plan
+01)LeftSemi Join: r.id = __correlated_sq_1.id
+02)--SubqueryAlias: r
+03)----TableScan: right_parquet projection=[id, info]
+04)--SubqueryAlias: __correlated_sq_1
+05)----SubqueryAlias: l
+06)------TableScan: left_parquet projection=[id]
+physical_plan
+01)HashJoinExec: mode=CollectLeft, join_type=LeftSemi, on=[(id@0, id@0)]
+02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
projection=[id, info], file_type=parquet
+03)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# LEFT SEMI (physical LeftSemi) correctness: only right rows with matching
left ids
+query IT rowsort
+SELECT r.*
+FROM right_parquet r
+WHERE r.id IN (SELECT l.id FROM left_parquet l);
+----
+1 right1
+3 right3
+5 right5
+
+# LEFT ANTI JOIN: the self-generated filter pushes to the probe (right) side,
+# while parent filters can push to the preserved (left/build) side.
query TT
EXPLAIN SELECT l.*
FROM left_parquet l
@@ -309,13 +337,50 @@ logical_plan
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)]
02)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
projection=[id, data], file_type=parquet
-03)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
projection=[id], file_type=parquet
+03)--DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# LEFT MARK JOIN: the OR prevents decorrelation to LeftSemi, so the optimizer
+# uses LeftMark. Self-generated dynamic filter pushes to the probe side.
+query TT
+EXPLAIN SELECT r.id, r.info
+FROM right_parquet r
+WHERE EXISTS (SELECT 1 FROM left_parquet l WHERE r.id = l.id)
+ OR r.id = 999;
+----
+logical_plan
+01)Projection: r.id, r.info
+02)--Filter: __correlated_sq_1.mark OR r.id = Int32(999)
+03)----LeftMark Join: r.id = __correlated_sq_1.id
+04)------SubqueryAlias: r
+05)--------TableScan: right_parquet projection=[id, info]
+06)------SubqueryAlias: __correlated_sq_1
+07)--------SubqueryAlias: l
+08)----------TableScan: left_parquet projection=[id]
+physical_plan
+01)FilterExec: mark@2 OR id@0 = 999, projection=[id@0, info@1]
+02)--RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
+03)----HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(id@0, id@0)]
+04)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
projection=[id, info], file_type=parquet
+05)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
+
+# LEFT MARK correctness: all right rows match EXISTS, so all 3 appear
+query IT rowsort
+SELECT r.id, r.info
+FROM right_parquet r
+WHERE EXISTS (SELECT 1 FROM left_parquet l WHERE r.id = l.id)
+ OR r.id = 999;
+----
+1 right1
+3 right3
+5 right5
# Test 2c: Parent dynamic filter (from TopK) pushed through semi/anti joins
# Sort on the join key (id) so the TopK dynamic filter pushes to BOTH sides.
-# SEMI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join
key)
-# that pushes through the RightSemi join to both the build and probe sides.
+# SEMI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join
+# key) that pushes through the RightSemi join to both the build and probe sides.
+# In addition, the HashJoinExec pushes its self-generated filter to the probe
+# (right-hand) side of the join.
query TT
EXPLAIN SELECT l.*
FROM left_parquet l
@@ -334,7 +399,7 @@ physical_plan
01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
02)--HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(id@0, id@0)]
03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
-04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
+04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
AND DynamicFilter [ empty ]
# Correctness check
query IT
@@ -346,8 +411,10 @@ ORDER BY l.id LIMIT 2;
1 left1
3 left3
-# ANTI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join
key)
-# that pushes through the LeftAnti join to both the preserved and
non-preserved sides.
+# ANTI JOIN with TopK parent: TopK generates a dynamic filter on `id` (join
+# key) that pushes through the LeftAnti join to both the preserved and
+# non-preserved sides. The HashJoinExec also pushes its self-generated filter
+# to the probe (right-hand) side of the LeftAnti join.
query TT
EXPLAIN SELECT l.*
FROM left_parquet l
@@ -366,7 +433,7 @@ physical_plan
01)SortExec: TopK(fetch=2), expr=[id@0 ASC NULLS LAST],
preserve_partitioning=[false]
02)--HashJoinExec: mode=CollectLeft, join_type=LeftAnti, on=[(id@0, id@0)]
03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_left.parquet]]},
projection=[id, data], file_type=parquet, predicate=DynamicFilter [ empty ]
-04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ]
+04)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/dynamic_filter_pushdown_config/join_right.parquet]]},
projection=[id], file_type=parquet, predicate=DynamicFilter [ empty ] AND
DynamicFilter [ empty ]
# Correctness check
query IT
diff --git a/datafusion/sqllogictest/test_files/projection_pushdown.slt
b/datafusion/sqllogictest/test_files/projection_pushdown.slt
index dbb77b33c2..1c89923080 100644
--- a/datafusion/sqllogictest/test_files/projection_pushdown.slt
+++ b/datafusion/sqllogictest/test_files/projection_pushdown.slt
@@ -1604,7 +1604,7 @@ physical_plan
02)--HashJoinExec: mode=CollectLeft, join_type=Left, on=[(id@1, id@0)],
projection=[__datafusion_extracted_2@0, id@1, __datafusion_extracted_3@3]
03)----DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/simple.parquet]]},
projection=[get_field(s@1, value) as __datafusion_extracted_2, id],
file_type=parquet
04)----FilterExec: __datafusion_extracted_1@0 > 5, projection=[id@1,
__datafusion_extracted_3@2]
-05)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]},
projection=[get_field(s@1, level) as __datafusion_extracted_1, id,
get_field(s@1, level) as __datafusion_extracted_3], file_type=parquet
+05)------DataSourceExec: file_groups={1 group:
[[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/projection_pushdown/join_right.parquet]]},
projection=[get_field(s@1, level) as __datafusion_extracted_1, id,
get_field(s@1, level) as __datafusion_extracted_3], file_type=parquet,
predicate=DynamicFilter [ empty ]
# Verify correctness - left join with level > 5 condition
# Only join_right rows with level > 5 are matched: id=1 (level=10), id=4
(level=8)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]