This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new bd95a6b8cb feat: Support swap for `RightMark` Join (#17651)
bd95a6b8cb is described below
commit bd95a6b8cbb97368f61b8924e8fbd99ed388e619
Author: Jonathan Chen <[email protected]>
AuthorDate: Wed Oct 1 15:52:09 2025 -0400
feat: Support swap for `RightMark` Join (#17651)
* feat: Support swap for `RightMark` Join
* add flag
* fmt
* add comment + fix test
* Update datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
Co-authored-by: Oleks V <[email protected]>
---------
Co-authored-by: Oleks V <[email protected]>
---
datafusion/common/src/join_type.rs | 2 +
datafusion/core/tests/fuzz_cases/join_fuzz.rs | 8 +--
.../tests/physical_optimizer/join_selection.rs | 58 ++++++++++++++++-
datafusion/expr/src/logical_plan/builder.rs | 5 +-
.../src/decorrelate_predicate_subquery.rs | 49 ++++++++++++++-
.../physical-optimizer/src/join_selection.rs | 6 +-
.../physical-plan/src/joins/hash_join/exec.rs | 2 +
.../physical-plan/src/joins/nested_loop_join.rs | 2 +
.../src/joins/sort_merge_join/stream.rs | 73 +++++++++++++---------
.../src/joins/sort_merge_join/tests.rs | 40 ++++++++++--
.../physical-plan/src/joins/symmetric_hash_join.rs | 9 +++
datafusion/physical-plan/src/joins/utils.rs | 10 ++-
datafusion/sqllogictest/test_files/joins.slt | 30 +++++++++
datafusion/sqllogictest/test_files/subquery.slt | 2 +-
14 files changed, 249 insertions(+), 47 deletions(-)
diff --git a/datafusion/common/src/join_type.rs
b/datafusion/common/src/join_type.rs
index d9a1478f02..e6a90db2dc 100644
--- a/datafusion/common/src/join_type.rs
+++ b/datafusion/common/src/join_type.rs
@@ -109,6 +109,8 @@ impl JoinType {
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
+ | JoinType::LeftMark
+ | JoinType::RightMark
)
}
}
diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index 5a2f9e9733..e8ff1ccf06 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -314,7 +314,7 @@ async fn test_right_mark_join_1k() {
JoinType::RightMark,
None,
)
- .run_test(&[NljHj], false)
+ .run_test(&[HjSmj, NljHj], false)
.await
}
@@ -326,7 +326,7 @@ async fn test_right_mark_join_1k_filtered() {
JoinType::RightMark,
Some(Box::new(col_lt_col_filter)),
)
- .run_test(&[NljHj], false)
+ .run_test(&[HjSmj, NljHj], false)
.await
}
@@ -555,7 +555,7 @@ async fn test_right_mark_join_1k_binary() {
JoinType::RightMark,
None,
)
- .run_test(&[NljHj], false)
+ .run_test(&[HjSmj, NljHj], false)
.await
}
@@ -567,7 +567,7 @@ async fn test_right_mark_join_1k_binary_filtered() {
JoinType::RightMark,
Some(Box::new(col_lt_col_filter)),
)
- .run_test(&[NljHj], false)
+ .run_test(&[HjSmj, NljHj], false)
.await
}
diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs
b/datafusion/core/tests/physical_optimizer/join_selection.rs
index 7ae1d6e50d..551fde5d7f 100644
--- a/datafusion/core/tests/physical_optimizer/join_selection.rs
+++ b/datafusion/core/tests/physical_optimizer/join_selection.rs
@@ -369,6 +369,61 @@ async fn test_join_with_swap_semi() {
}
}
+#[tokio::test]
+async fn test_join_with_swap_mark() {
+ let join_types = [JoinType::LeftMark, JoinType::RightMark];
+ for join_type in join_types {
+ let (big, small) = create_big_and_small();
+
+ let join = HashJoinExec::try_new(
+ Arc::clone(&big),
+ Arc::clone(&small),
+ vec![(
+ Arc::new(Column::new_with_schema("big_col", &big.schema()).unwrap()),
+ Arc::new(Column::new_with_schema("small_col", &small.schema()).unwrap()),
+ )],
+ None,
+ &join_type,
+ None,
+ PartitionMode::Partitioned,
+ NullEquality::NullEqualsNothing,
+ )
+ .unwrap();
+
+ let original_schema = join.schema();
+
+ let optimized_join = JoinSelection::new()
+ .optimize(Arc::new(join), &ConfigOptions::new())
+ .unwrap();
+
+ let swapped_join = optimized_join
+ .as_any()
+ .downcast_ref::<HashJoinExec>()
+ .expect(
+ "A proj is not required to swap columns back to their original order",
+ );
+
+ assert_eq!(swapped_join.schema().fields().len(), 2);
+ assert_eq!(
+ swapped_join
+ .left()
+ .partition_statistics(None)
+ .unwrap()
+ .total_byte_size,
+ Precision::Inexact(8192)
+ );
+ assert_eq!(
+ swapped_join
+ .right()
+ .partition_statistics(None)
+ .unwrap()
+ .total_byte_size,
+ Precision::Inexact(2097152)
+ );
+ assert_eq!(original_schema, swapped_join.schema());
+ }
+}
+
/// Compare the input plan with the plan after running the probe order
optimizer.
macro_rules! assert_optimized {
($EXPECTED_LINES: expr, $PLAN: expr) => {
@@ -576,7 +631,8 @@ async fn test_nl_join_with_swap(join_type: JoinType) {
case::left_semi(JoinType::LeftSemi),
case::left_anti(JoinType::LeftAnti),
case::right_semi(JoinType::RightSemi),
- case::right_anti(JoinType::RightAnti)
+ case::right_anti(JoinType::RightAnti),
+ case::right_mark(JoinType::RightMark)
)]
#[tokio::test]
async fn test_nl_join_with_swap_no_proj(join_type: JoinType) {
diff --git a/datafusion/expr/src/logical_plan/builder.rs
b/datafusion/expr/src/logical_plan/builder.rs
index 7b57bce105..42eda4aea7 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1696,7 +1696,10 @@ pub fn build_join_schema(
);
let (schema1, schema2) = match join_type {
- JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
+ JoinType::Right
+ | JoinType::RightSemi
+ | JoinType::RightAnti
+ | JoinType::RightMark => (left, right),
_ => (right, left),
};
diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index a72657bf68..c8be689fc5 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -31,7 +31,7 @@ use datafusion_common::{internal_err, plan_err, Column,
Result};
use datafusion_expr::expr::{Exists, InSubquery};
use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
use datafusion_expr::logical_plan::{JoinType, Subquery};
-use datafusion_expr::utils::{conjunction, split_conjunction_owned};
+use datafusion_expr::utils::{conjunction, expr_to_columns, split_conjunction_owned};
use datafusion_expr::{
exists, in_subquery, lit, not, not_exists, not_in_subquery, BinaryExpr,
Expr, Filter,
LogicalPlan, LogicalPlanBuilder, Operator,
@@ -342,7 +342,7 @@ fn build_join(
replace_qualified_name(filter, &all_correlated_cols,
&alias).map(Some)
})?;
- let join_filter = match (join_filter_opt, in_predicate_opt) {
+ let join_filter = match (join_filter_opt, in_predicate_opt.clone()) {
(
Some(join_filter),
Some(Expr::BinaryExpr(BinaryExpr {
@@ -371,6 +371,51 @@ fn build_join(
(None, None) => lit(true),
_ => return Ok(None),
};
+
+ if matches!(join_type, JoinType::LeftMark | JoinType::RightMark) {
+ let right_schema = sub_query_alias.schema();
+
+ // Gather all columns needed for the join filter + predicates
+ let mut needed = std::collections::HashSet::new();
+ expr_to_columns(&join_filter, &mut needed)?;
+ if let Some(ref in_pred) = in_predicate_opt {
+ expr_to_columns(in_pred, &mut needed)?;
+ }
+
+ // Keep only columns that actually belong to the RIGHT child, and sort by their
+ // position in the right schema for deterministic order.
+ let mut right_cols_idx_and_col: Vec<(usize, Column)> = needed
+ .into_iter()
+ .filter_map(|c| right_schema.index_of_column(&c).ok().map(|idx| (idx, c)))
+ .collect();
+
+ right_cols_idx_and_col.sort_by_key(|(idx, _)| *idx);
+
+ let right_proj_exprs: Vec<Expr> = right_cols_idx_and_col
+ .into_iter()
+ .map(|(_, c)| Expr::Column(c))
+ .collect();
+
+ let right_projected = if !right_proj_exprs.is_empty() {
+ LogicalPlanBuilder::from(sub_query_alias.clone())
+ .project(right_proj_exprs)?
+ .build()?
+ } else {
+ // Degenerate case: no right columns referenced by the predicate(s)
+ sub_query_alias.clone()
+ };
+ let new_plan = LogicalPlanBuilder::from(left.clone())
+ .join_on(right_projected, join_type, Some(join_filter))?
+ .build()?;
+
+ debug!(
+ "predicate subquery optimized:\n{}",
+ new_plan.display_indent()
+ );
+
+ return Ok(Some(new_plan));
+ }
+
// join our sub query into the main plan
let new_plan = LogicalPlanBuilder::from(left.clone())
.join_on(sub_query_alias, join_type, Some(join_filter))?
diff --git a/datafusion/physical-optimizer/src/join_selection.rs
b/datafusion/physical-optimizer/src/join_selection.rs
index c2cfca681f..1db4d7b305 100644
--- a/datafusion/physical-optimizer/src/join_selection.rs
+++ b/datafusion/physical-optimizer/src/join_selection.rs
@@ -514,7 +514,11 @@ pub(crate) fn swap_join_according_to_unboundedness(
match (*partition_mode, *join_type) {
(
_,
- JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | JoinType::Full,
+ JoinType::Right
+ | JoinType::RightSemi
+ | JoinType::RightAnti
+ | JoinType::RightMark
+ | JoinType::Full,
) => internal_err!("{join_type} join cannot be swapped for unbounded
input."),
(PartitionMode::Partitioned, _) => {
hash_join.swap_inputs(PartitionMode::Partitioned)
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs
b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 728497444c..fd3962c6ae 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -690,6 +690,8 @@ impl HashJoinExec {
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
+ | JoinType::LeftMark
+ | JoinType::RightMark
) || self.projection.is_some()
{
Ok(Arc::new(new_join))
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs
b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 00d1613090..6cd39e5a40 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -379,6 +379,8 @@ impl NestedLoopJoinExec {
| JoinType::RightSemi
| JoinType::LeftAnti
| JoinType::RightAnti
+ | JoinType::LeftMark
+ | JoinType::RightMark
) || self.projection.is_some()
{
Arc::new(new_join)
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
index f16ef24fd1..879f47638d 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
@@ -430,7 +430,7 @@ pub(super) fn get_corrected_filter_mask(
corrected_mask.append_n(expected_size - corrected_mask.len(),
false);
Some(corrected_mask.finish())
}
- JoinType::LeftMark => {
+ JoinType::LeftMark | JoinType::RightMark => {
for i in 0..row_indices_length {
let last_index =
last_index_for_row(i, row_indices, batch_ids,
row_indices_length);
@@ -582,6 +582,7 @@ impl Stream for SortMergeJoinStream {
| JoinType::LeftMark
| JoinType::Right
| JoinType::RightSemi
+ | JoinType::RightMark
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::Full
@@ -691,6 +692,7 @@ impl Stream for SortMergeJoinStream {
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
+ | JoinType::RightMark
| JoinType::Full
)
{
@@ -718,6 +720,7 @@ impl Stream for SortMergeJoinStream {
| JoinType::RightAnti
| JoinType::Full
| JoinType::LeftMark
+ | JoinType::RightMark
)
{
let record_batch = self.filter_joined_batch()?;
@@ -1042,6 +1045,7 @@ impl SortMergeJoinStream {
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
+ | JoinType::RightMark
) {
join_streamed = !self.streamed_joined;
}
@@ -1049,9 +1053,15 @@ impl SortMergeJoinStream {
Ordering::Equal => {
if matches!(
self.join_type,
- JoinType::LeftSemi | JoinType::LeftMark | JoinType::RightSemi
+ JoinType::LeftSemi
+ | JoinType::LeftMark
+ | JoinType::RightSemi
+ | JoinType::RightMark
) {
- mark_row_as_match = matches!(self.join_type, JoinType::LeftMark);
+ mark_row_as_match = matches!(
+ self.join_type,
+ JoinType::LeftMark | JoinType::RightMark
+ );
// if the join filter is specified then its needed to
output the streamed index
// only if it has not been emitted before
// the `join_filter_matched_idxs` keeps track on if
streamed index has a successful
@@ -1266,31 +1276,32 @@ impl SortMergeJoinStream {
// The row indices of joined buffered batch
let right_indices: UInt64Array = chunk.buffered_indices.finish();
- let mut right_columns = if matches!(self.join_type, JoinType::LeftMark) {
- vec![Arc::new(is_not_null(&right_indices)?) as ArrayRef]
- } else if matches!(
- self.join_type,
- JoinType::LeftSemi
- | JoinType::LeftAnti
- | JoinType::RightAnti
- | JoinType::RightSemi
- ) {
- vec![]
- } else if let Some(buffered_idx) = chunk.buffered_batch_idx {
- fetch_right_columns_by_idxs(
- &self.buffered_data,
- buffered_idx,
- &right_indices,
- )?
- } else {
- // If buffered batch none, meaning it is null joined batch.
- // We need to create null arrays for buffered columns to join with streamed rows.
- create_unmatched_columns(
+ let mut right_columns =
+ if matches!(self.join_type, JoinType::LeftMark | JoinType::RightMark) {
+ vec![Arc::new(is_not_null(&right_indices)?) as ArrayRef]
+ } else if matches!(
self.join_type,
- &self.buffered_schema,
- right_indices.len(),
- )
- };
+ JoinType::LeftSemi
+ | JoinType::LeftAnti
+ | JoinType::RightAnti
+ | JoinType::RightSemi
+ ) {
+ vec![]
+ } else if let Some(buffered_idx) = chunk.buffered_batch_idx {
+ fetch_right_columns_by_idxs(
+ &self.buffered_data,
+ buffered_idx,
+ &right_indices,
+ )?
+ } else {
+ // If buffered batch none, meaning it is null joined batch.
+ // We need to create null arrays for buffered columns to join with streamed rows.
+ create_unmatched_columns(
+ self.join_type,
+ &self.buffered_schema,
+ right_indices.len(),
+ )
+ };
// Prepare the columns we apply join filter on later.
// Only for joined rows between streamed and buffered.
@@ -1309,7 +1320,7 @@ impl SortMergeJoinStream {
get_filter_column(&self.filter, &left_columns,
&right_cols)
} else if matches!(
self.join_type,
- JoinType::RightAnti | JoinType::RightSemi
+ JoinType::RightAnti | JoinType::RightSemi | JoinType::RightMark
) {
let right_cols = fetch_right_columns_by_idxs(
&self.buffered_data,
@@ -1375,6 +1386,7 @@ impl SortMergeJoinStream {
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
+ | JoinType::RightMark
| JoinType::Full
) {
self.staging_output_record_batches
@@ -1475,6 +1487,7 @@ impl SortMergeJoinStream {
| JoinType::LeftAnti
| JoinType::RightAnti
| JoinType::LeftMark
+ | JoinType::RightMark
| JoinType::Full
))
{
@@ -1537,7 +1550,7 @@ impl SortMergeJoinStream {
if matches!(
self.join_type,
- JoinType::Left | JoinType::LeftMark | JoinType::Right
+ JoinType::Left | JoinType::LeftMark | JoinType::Right | JoinType::RightMark
) {
let null_mask = compute::not(corrected_mask)?;
let null_joined_batch = filter_record_batch(&record_batch,
&null_mask)?;
@@ -1658,7 +1671,7 @@ fn create_unmatched_columns(
schema: &SchemaRef,
size: usize,
) -> Vec<ArrayRef> {
- if matches!(join_type, JoinType::LeftMark) {
+ if matches!(join_type, JoinType::LeftMark | JoinType::RightMark) {
vec![Arc::new(BooleanArray::from(vec![false; size])) as ArrayRef]
} else {
schema
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
index 002a46f97a..83a5c4041c 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
@@ -1314,6 +1314,38 @@ async fn join_left_mark() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn join_right_mark() -> Result<()> {
+ let left = build_table(
+ ("a1", &vec![1, 2, 2, 3]),
+ ("b1", &vec![4, 5, 5, 7]), // 7 does not exist on the right
+ ("c1", &vec![7, 8, 8, 9]),
+ );
+ let right = build_table(
+ ("a2", &vec![10, 20, 30, 40]),
+ ("b1", &vec![4, 4, 5, 6]), // 5 is double on the left
+ ("c2", &vec![60, 70, 80, 90]),
+ );
+ let on = vec![(
+ Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+ Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+ )];
+
+ let (_, batches) = join_collect(left, right, on, RightMark).await?;
+ // The output order is important as SMJ preserves sortedness
+ assert_snapshot!(batches_to_string(&batches), @r#"
+ +----+----+----+-------+
+ | a2 | b1 | c2 | mark |
+ +----+----+----+-------+
+ | 10 | 4 | 60 | true |
+ | 20 | 4 | 70 | true |
+ | 30 | 5 | 80 | true |
+ | 40 | 6 | 90 | false |
+ +----+----+----+-------+
+ "#);
+ Ok(())
+}
+
#[tokio::test]
async fn join_with_duplicated_column_names() -> Result<()> {
let left = build_table(
@@ -1736,7 +1768,7 @@ async fn overallocation_single_batch_no_spill() ->
Result<()> {
let sort_options = vec![SortOptions::default(); on.len()];
let join_types = vec![
- Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark,
+ Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, RightMark,
];
// Disable DiskManager to prevent spilling
@@ -1817,7 +1849,7 @@ async fn overallocation_multi_batch_no_spill() ->
Result<()> {
let sort_options = vec![SortOptions::default(); on.len()];
let join_types = vec![
- Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark,
+ Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, RightMark,
];
// Disable DiskManager to prevent spilling
@@ -1877,7 +1909,7 @@ async fn overallocation_single_batch_spill() ->
Result<()> {
let sort_options = vec![SortOptions::default(); on.len()];
let join_types = [
- Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark,
+ Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, RightMark,
];
// Enable DiskManager to allow spilling
@@ -1981,7 +2013,7 @@ async fn overallocation_multi_batch_spill() -> Result<()>
{
let sort_options = vec![SortOptions::default(); on.len()];
let join_types = [
- Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark,
+ Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, RightMark,
];
// Enable DiskManager to allow spilling
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index aedeb97186..b55b7e15f1 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -1018,6 +1018,7 @@ pub(crate) fn join_with_probe_batch(
| JoinType::LeftSemi
| JoinType::LeftMark
| JoinType::RightSemi
+ | JoinType::RightMark
) {
Ok(None)
} else {
@@ -1864,6 +1865,7 @@ mod tests {
JoinType::LeftAnti,
JoinType::LeftMark,
JoinType::RightAnti,
+ JoinType::RightMark,
JoinType::Full
)]
join_type: JoinType,
@@ -1952,6 +1954,7 @@ mod tests {
JoinType::LeftAnti,
JoinType::LeftMark,
JoinType::RightAnti,
+ JoinType::RightMark,
JoinType::Full
)]
join_type: JoinType,
@@ -2020,6 +2023,7 @@ mod tests {
JoinType::LeftAnti,
JoinType::LeftMark,
JoinType::RightAnti,
+ JoinType::RightMark,
JoinType::Full
)]
join_type: JoinType,
@@ -2073,6 +2077,7 @@ mod tests {
JoinType::LeftAnti,
JoinType::LeftMark,
JoinType::RightAnti,
+ JoinType::RightMark,
JoinType::Full
)]
join_type: JoinType,
@@ -2101,6 +2106,7 @@ mod tests {
JoinType::LeftAnti,
JoinType::LeftMark,
JoinType::RightAnti,
+ JoinType::RightMark,
JoinType::Full
)]
join_type: JoinType,
@@ -2485,6 +2491,7 @@ mod tests {
JoinType::LeftAnti,
JoinType::LeftMark,
JoinType::RightAnti,
+ JoinType::RightMark,
JoinType::Full
)]
join_type: JoinType,
@@ -2571,6 +2578,7 @@ mod tests {
JoinType::LeftAnti,
JoinType::LeftMark,
JoinType::RightAnti,
+ JoinType::RightMark,
JoinType::Full
)]
join_type: JoinType,
@@ -2649,6 +2657,7 @@ mod tests {
JoinType::LeftAnti,
JoinType::LeftMark,
JoinType::RightAnti,
+ JoinType::RightMark,
JoinType::Full
)]
join_type: JoinType,
diff --git a/datafusion/physical-plan/src/joins/utils.rs
b/datafusion/physical-plan/src/joins/utils.rs
index a62ae79635..c50bfce93a 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -306,7 +306,10 @@ pub fn build_join_schema(
};
let (schema1, schema2) = match join_type {
- JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, right),
+ JoinType::Right
+ | JoinType::RightSemi
+ | JoinType::RightAnti
+ | JoinType::RightMark => (left, right),
_ => (right, left),
};
@@ -1619,8 +1622,9 @@ pub fn swap_join_projection(
JoinType::LeftAnti
| JoinType::LeftSemi
| JoinType::RightAnti
- | JoinType::RightSemi => projection.cloned(),
-
+ | JoinType::RightSemi
+ | JoinType::LeftMark
+ | JoinType::RightMark => projection.cloned(),
_ => projection.map(|p| {
p.iter()
.map(|i| {
diff --git a/datafusion/sqllogictest/test_files/joins.slt
b/datafusion/sqllogictest/test_files/joins.slt
index c24b0777cc..96d2bad086 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -5160,6 +5160,36 @@ LEFT ANTI JOIN t2 ON k1 = k2
WHERE k1 < 0
----
+# Mark testing
+statement ok
+CREATE OR REPLACE TABLE t1(b INT, c INT, d INT);
+
+statement ok
+INSERT INTO t1 VALUES
+ (10, 5, 3),
+ ( 1, 7, 8),
+ ( 2, 9, 7),
+ ( 3, 8,10),
+ ( 5, 6, 6),
+ ( 0, 4, 9),
+ ( 4, 8, 7),
+ (100,6, 5);
+
+query I rowsort
+SELECT c
+ FROM t1
+ WHERE c > d
+ OR EXISTS(SELECT 1 FROM t1 AS x WHERE x.b<t1.b)
+ OR (c <= d-2 OR c >= d+2)
+----
+4
+5
+6
+6
+7
+8
+8
+9
statement ok
DROP TABLE t1;
diff --git a/datafusion/sqllogictest/test_files/subquery.slt
b/datafusion/sqllogictest/test_files/subquery.slt
index 43f85d1e20..dec9357495 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -1192,7 +1192,7 @@ physical_plan
01)CoalesceBatchesExec: target_batch_size=2
02)--FilterExec: t1_id@0 > 40 OR NOT mark@3, projection=[t1_id@0, t1_name@1,
t1_int@2]
03)----CoalesceBatchesExec: target_batch_size=2
-04)------HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(t1_id@0, t2_id@0)]
+04)------HashJoinExec: mode=CollectLeft, join_type=RightMark, on=[(t2_id@0, t1_id@0)]
05)--------DataSourceExec: partitions=1, partition_sizes=[2]
06)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)----------DataSourceExec: partitions=1, partition_sizes=[2]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]