This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new bd95a6b8cb feat: Support swap for `RightMark` Join (#17651)
bd95a6b8cb is described below

commit bd95a6b8cbb97368f61b8924e8fbd99ed388e619
Author: Jonathan Chen <[email protected]>
AuthorDate: Wed Oct 1 15:52:09 2025 -0400

    feat: Support swap for `RightMark` Join (#17651)
    
    * feat: Support swap for `RightMark` Join
    
    * add flag
    
    * fmt
    
    * add comment + fix test
    
    * Update datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
    
    Co-authored-by: Oleks V <[email protected]>
    
    ---------
    
    Co-authored-by: Oleks V <[email protected]>
---
 datafusion/common/src/join_type.rs                 |  2 +
 datafusion/core/tests/fuzz_cases/join_fuzz.rs      |  8 +--
 .../tests/physical_optimizer/join_selection.rs     | 58 ++++++++++++++++-
 datafusion/expr/src/logical_plan/builder.rs        |  5 +-
 .../src/decorrelate_predicate_subquery.rs          | 49 ++++++++++++++-
 .../physical-optimizer/src/join_selection.rs       |  6 +-
 .../physical-plan/src/joins/hash_join/exec.rs      |  2 +
 .../physical-plan/src/joins/nested_loop_join.rs    |  2 +
 .../src/joins/sort_merge_join/stream.rs            | 73 +++++++++++++---------
 .../src/joins/sort_merge_join/tests.rs             | 40 ++++++++++--
 .../physical-plan/src/joins/symmetric_hash_join.rs |  9 +++
 datafusion/physical-plan/src/joins/utils.rs        | 10 ++-
 datafusion/sqllogictest/test_files/joins.slt       | 30 +++++++++
 datafusion/sqllogictest/test_files/subquery.slt    |  2 +-
 14 files changed, 249 insertions(+), 47 deletions(-)

diff --git a/datafusion/common/src/join_type.rs b/datafusion/common/src/join_type.rs
index d9a1478f02..e6a90db2dc 100644
--- a/datafusion/common/src/join_type.rs
+++ b/datafusion/common/src/join_type.rs
@@ -109,6 +109,8 @@ impl JoinType {
                 | JoinType::RightSemi
                 | JoinType::LeftAnti
                 | JoinType::RightAnti
+                | JoinType::LeftMark
+                | JoinType::RightMark
         )
     }
 }
diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index 5a2f9e9733..e8ff1ccf06 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -314,7 +314,7 @@ async fn test_right_mark_join_1k() {
         JoinType::RightMark,
         None,
     )
-    .run_test(&[NljHj], false)
+    .run_test(&[HjSmj, NljHj], false)
     .await
 }
 
@@ -326,7 +326,7 @@ async fn test_right_mark_join_1k_filtered() {
         JoinType::RightMark,
         Some(Box::new(col_lt_col_filter)),
     )
-    .run_test(&[NljHj], false)
+    .run_test(&[HjSmj, NljHj], false)
     .await
 }
 
@@ -555,7 +555,7 @@ async fn test_right_mark_join_1k_binary() {
         JoinType::RightMark,
         None,
     )
-    .run_test(&[NljHj], false)
+    .run_test(&[HjSmj, NljHj], false)
     .await
 }
 
@@ -567,7 +567,7 @@ async fn test_right_mark_join_1k_binary_filtered() {
         JoinType::RightMark,
         Some(Box::new(col_lt_col_filter)),
     )
-    .run_test(&[NljHj], false)
+    .run_test(&[HjSmj, NljHj], false)
     .await
 }
 
diff --git a/datafusion/core/tests/physical_optimizer/join_selection.rs b/datafusion/core/tests/physical_optimizer/join_selection.rs
index 7ae1d6e50d..551fde5d7f 100644
--- a/datafusion/core/tests/physical_optimizer/join_selection.rs
+++ b/datafusion/core/tests/physical_optimizer/join_selection.rs
@@ -369,6 +369,61 @@ async fn test_join_with_swap_semi() {
     }
 }
 
+#[tokio::test]
+async fn test_join_with_swap_mark() {
+    let join_types = [JoinType::LeftMark, JoinType::RightMark];
+    for join_type in join_types {
+        let (big, small) = create_big_and_small();
+
+        let join = HashJoinExec::try_new(
+            Arc::clone(&big),
+            Arc::clone(&small),
+            vec![(
+                Arc::new(Column::new_with_schema("big_col", 
&big.schema()).unwrap()),
+                Arc::new(Column::new_with_schema("small_col", 
&small.schema()).unwrap()),
+            )],
+            None,
+            &join_type,
+            None,
+            PartitionMode::Partitioned,
+            NullEquality::NullEqualsNothing,
+        )
+        .unwrap();
+
+        let original_schema = join.schema();
+
+        let optimized_join = JoinSelection::new()
+            .optimize(Arc::new(join), &ConfigOptions::new())
+            .unwrap();
+
+        let swapped_join = optimized_join
+            .as_any()
+            .downcast_ref::<HashJoinExec>()
+            .expect(
+                "A proj is not required to swap columns back to their original 
order",
+            );
+
+        assert_eq!(swapped_join.schema().fields().len(), 2);
+        assert_eq!(
+            swapped_join
+                .left()
+                .partition_statistics(None)
+                .unwrap()
+                .total_byte_size,
+            Precision::Inexact(8192)
+        );
+        assert_eq!(
+            swapped_join
+                .right()
+                .partition_statistics(None)
+                .unwrap()
+                .total_byte_size,
+            Precision::Inexact(2097152)
+        );
+        assert_eq!(original_schema, swapped_join.schema());
+    }
+}
+
 /// Compare the input plan with the plan after running the probe order 
optimizer.
 macro_rules! assert_optimized {
     ($EXPECTED_LINES: expr, $PLAN: expr) => {
@@ -576,7 +631,8 @@ async fn test_nl_join_with_swap(join_type: JoinType) {
     case::left_semi(JoinType::LeftSemi),
     case::left_anti(JoinType::LeftAnti),
     case::right_semi(JoinType::RightSemi),
-    case::right_anti(JoinType::RightAnti)
+    case::right_anti(JoinType::RightAnti),
+    case::right_mark(JoinType::RightMark)
 )]
 #[tokio::test]
 async fn test_nl_join_with_swap_no_proj(join_type: JoinType) {
diff --git a/datafusion/expr/src/logical_plan/builder.rs b/datafusion/expr/src/logical_plan/builder.rs
index 7b57bce105..42eda4aea7 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -1696,7 +1696,10 @@ pub fn build_join_schema(
     );
 
     let (schema1, schema2) = match join_type {
-        JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, 
right),
+        JoinType::Right
+        | JoinType::RightSemi
+        | JoinType::RightAnti
+        | JoinType::RightMark => (left, right),
         _ => (right, left),
     };
 
diff --git a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
index a72657bf68..c8be689fc5 100644
--- a/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
+++ b/datafusion/optimizer/src/decorrelate_predicate_subquery.rs
@@ -31,7 +31,7 @@ use datafusion_common::{internal_err, plan_err, Column, Result};
 use datafusion_expr::expr::{Exists, InSubquery};
 use datafusion_expr::expr_rewriter::create_col_from_scalar_expr;
 use datafusion_expr::logical_plan::{JoinType, Subquery};
-use datafusion_expr::utils::{conjunction, split_conjunction_owned};
+use datafusion_expr::utils::{conjunction, expr_to_columns, 
split_conjunction_owned};
 use datafusion_expr::{
     exists, in_subquery, lit, not, not_exists, not_in_subquery, BinaryExpr, 
Expr, Filter,
     LogicalPlan, LogicalPlanBuilder, Operator,
@@ -342,7 +342,7 @@ fn build_join(
             replace_qualified_name(filter, &all_correlated_cols, 
&alias).map(Some)
         })?;
 
-    let join_filter = match (join_filter_opt, in_predicate_opt) {
+    let join_filter = match (join_filter_opt, in_predicate_opt.clone()) {
         (
             Some(join_filter),
             Some(Expr::BinaryExpr(BinaryExpr {
@@ -371,6 +371,51 @@ fn build_join(
         (None, None) => lit(true),
         _ => return Ok(None),
     };
+
+    if matches!(join_type, JoinType::LeftMark | JoinType::RightMark) {
+        let right_schema = sub_query_alias.schema();
+
+        // Gather all columns needed for the join filter + predicates
+        let mut needed = std::collections::HashSet::new();
+        expr_to_columns(&join_filter, &mut needed)?;
+        if let Some(ref in_pred) = in_predicate_opt {
+            expr_to_columns(in_pred, &mut needed)?;
+        }
+
+        // Keep only columns that actually belong to the RIGHT child, and sort 
by their
+        // position in the right schema for deterministic order.
+        let mut right_cols_idx_and_col: Vec<(usize, Column)> = needed
+            .into_iter()
+            .filter_map(|c| right_schema.index_of_column(&c).ok().map(|idx| 
(idx, c)))
+            .collect();
+
+        right_cols_idx_and_col.sort_by_key(|(idx, _)| *idx);
+
+        let right_proj_exprs: Vec<Expr> = right_cols_idx_and_col
+            .into_iter()
+            .map(|(_, c)| Expr::Column(c))
+            .collect();
+
+        let right_projected = if !right_proj_exprs.is_empty() {
+            LogicalPlanBuilder::from(sub_query_alias.clone())
+                .project(right_proj_exprs)?
+                .build()?
+        } else {
+            // Degenerate case: no right columns referenced by the predicate(s)
+            sub_query_alias.clone()
+        };
+        let new_plan = LogicalPlanBuilder::from(left.clone())
+            .join_on(right_projected, join_type, Some(join_filter))?
+            .build()?;
+
+        debug!(
+            "predicate subquery optimized:\n{}",
+            new_plan.display_indent()
+        );
+
+        return Ok(Some(new_plan));
+    }
+
     // join our sub query into the main plan
     let new_plan = LogicalPlanBuilder::from(left.clone())
         .join_on(sub_query_alias, join_type, Some(join_filter))?
diff --git a/datafusion/physical-optimizer/src/join_selection.rs b/datafusion/physical-optimizer/src/join_selection.rs
index c2cfca681f..1db4d7b305 100644
--- a/datafusion/physical-optimizer/src/join_selection.rs
+++ b/datafusion/physical-optimizer/src/join_selection.rs
@@ -514,7 +514,11 @@ pub(crate) fn swap_join_according_to_unboundedness(
     match (*partition_mode, *join_type) {
         (
             _,
-            JoinType::Right | JoinType::RightSemi | JoinType::RightAnti | 
JoinType::Full,
+            JoinType::Right
+            | JoinType::RightSemi
+            | JoinType::RightAnti
+            | JoinType::RightMark
+            | JoinType::Full,
         ) => internal_err!("{join_type} join cannot be swapped for unbounded 
input."),
         (PartitionMode::Partitioned, _) => {
             hash_join.swap_inputs(PartitionMode::Partitioned)
diff --git a/datafusion/physical-plan/src/joins/hash_join/exec.rs b/datafusion/physical-plan/src/joins/hash_join/exec.rs
index 728497444c..fd3962c6ae 100644
--- a/datafusion/physical-plan/src/joins/hash_join/exec.rs
+++ b/datafusion/physical-plan/src/joins/hash_join/exec.rs
@@ -690,6 +690,8 @@ impl HashJoinExec {
                 | JoinType::RightSemi
                 | JoinType::LeftAnti
                 | JoinType::RightAnti
+                | JoinType::LeftMark
+                | JoinType::RightMark
         ) || self.projection.is_some()
         {
             Ok(Arc::new(new_join))
diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs
index 00d1613090..6cd39e5a40 100644
--- a/datafusion/physical-plan/src/joins/nested_loop_join.rs
+++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs
@@ -379,6 +379,8 @@ impl NestedLoopJoinExec {
                 | JoinType::RightSemi
                 | JoinType::LeftAnti
                 | JoinType::RightAnti
+                | JoinType::LeftMark
+                | JoinType::RightMark
         ) || self.projection.is_some()
         {
             Arc::new(new_join)
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
index f16ef24fd1..879f47638d 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
@@ -430,7 +430,7 @@ pub(super) fn get_corrected_filter_mask(
             corrected_mask.append_n(expected_size - corrected_mask.len(), 
false);
             Some(corrected_mask.finish())
         }
-        JoinType::LeftMark => {
+        JoinType::LeftMark | JoinType::RightMark => {
             for i in 0..row_indices_length {
                 let last_index =
                     last_index_for_row(i, row_indices, batch_ids, 
row_indices_length);
@@ -582,6 +582,7 @@ impl Stream for SortMergeJoinStream {
                                                 | JoinType::LeftMark
                                                 | JoinType::Right
                                                 | JoinType::RightSemi
+                                                | JoinType::RightMark
                                                 | JoinType::LeftAnti
                                                 | JoinType::RightAnti
                                                 | JoinType::Full
@@ -691,6 +692,7 @@ impl Stream for SortMergeJoinStream {
                                         | JoinType::LeftAnti
                                         | JoinType::RightAnti
                                         | JoinType::LeftMark
+                                        | JoinType::RightMark
                                         | JoinType::Full
                                 )
                             {
@@ -718,6 +720,7 @@ impl Stream for SortMergeJoinStream {
                                     | JoinType::RightAnti
                                     | JoinType::Full
                                     | JoinType::LeftMark
+                                    | JoinType::RightMark
                             )
                         {
                             let record_batch = self.filter_joined_batch()?;
@@ -1042,6 +1045,7 @@ impl SortMergeJoinStream {
                         | JoinType::LeftAnti
                         | JoinType::RightAnti
                         | JoinType::LeftMark
+                        | JoinType::RightMark
                 ) {
                     join_streamed = !self.streamed_joined;
                 }
@@ -1049,9 +1053,15 @@ impl SortMergeJoinStream {
             Ordering::Equal => {
                 if matches!(
                     self.join_type,
-                    JoinType::LeftSemi | JoinType::LeftMark | 
JoinType::RightSemi
+                    JoinType::LeftSemi
+                        | JoinType::LeftMark
+                        | JoinType::RightSemi
+                        | JoinType::RightMark
                 ) {
-                    mark_row_as_match = matches!(self.join_type, 
JoinType::LeftMark);
+                    mark_row_as_match = matches!(
+                        self.join_type,
+                        JoinType::LeftMark | JoinType::RightMark
+                    );
                     // if the join filter is specified then its needed to 
output the streamed index
                     // only if it has not been emitted before
                     // the `join_filter_matched_idxs` keeps track on if 
streamed index has a successful
@@ -1266,31 +1276,32 @@ impl SortMergeJoinStream {
 
             // The row indices of joined buffered batch
             let right_indices: UInt64Array = chunk.buffered_indices.finish();
-            let mut right_columns = if matches!(self.join_type, 
JoinType::LeftMark) {
-                vec![Arc::new(is_not_null(&right_indices)?) as ArrayRef]
-            } else if matches!(
-                self.join_type,
-                JoinType::LeftSemi
-                    | JoinType::LeftAnti
-                    | JoinType::RightAnti
-                    | JoinType::RightSemi
-            ) {
-                vec![]
-            } else if let Some(buffered_idx) = chunk.buffered_batch_idx {
-                fetch_right_columns_by_idxs(
-                    &self.buffered_data,
-                    buffered_idx,
-                    &right_indices,
-                )?
-            } else {
-                // If buffered batch none, meaning it is null joined batch.
-                // We need to create null arrays for buffered columns to join 
with streamed rows.
-                create_unmatched_columns(
+            let mut right_columns =
+                if matches!(self.join_type, JoinType::LeftMark | 
JoinType::RightMark) {
+                    vec![Arc::new(is_not_null(&right_indices)?) as ArrayRef]
+                } else if matches!(
                     self.join_type,
-                    &self.buffered_schema,
-                    right_indices.len(),
-                )
-            };
+                    JoinType::LeftSemi
+                        | JoinType::LeftAnti
+                        | JoinType::RightAnti
+                        | JoinType::RightSemi
+                ) {
+                    vec![]
+                } else if let Some(buffered_idx) = chunk.buffered_batch_idx {
+                    fetch_right_columns_by_idxs(
+                        &self.buffered_data,
+                        buffered_idx,
+                        &right_indices,
+                    )?
+                } else {
+                    // If buffered batch none, meaning it is null joined batch.
+                    // We need to create null arrays for buffered columns to 
join with streamed rows.
+                    create_unmatched_columns(
+                        self.join_type,
+                        &self.buffered_schema,
+                        right_indices.len(),
+                    )
+                };
 
             // Prepare the columns we apply join filter on later.
             // Only for joined rows between streamed and buffered.
@@ -1309,7 +1320,7 @@ impl SortMergeJoinStream {
                         get_filter_column(&self.filter, &left_columns, 
&right_cols)
                     } else if matches!(
                         self.join_type,
-                        JoinType::RightAnti | JoinType::RightSemi
+                        JoinType::RightAnti | JoinType::RightSemi | 
JoinType::RightMark
                     ) {
                         let right_cols = fetch_right_columns_by_idxs(
                             &self.buffered_data,
@@ -1375,6 +1386,7 @@ impl SortMergeJoinStream {
                             | JoinType::LeftAnti
                             | JoinType::RightAnti
                             | JoinType::LeftMark
+                            | JoinType::RightMark
                             | JoinType::Full
                     ) {
                         self.staging_output_record_batches
@@ -1475,6 +1487,7 @@ impl SortMergeJoinStream {
                     | JoinType::LeftAnti
                     | JoinType::RightAnti
                     | JoinType::LeftMark
+                    | JoinType::RightMark
                     | JoinType::Full
             ))
         {
@@ -1537,7 +1550,7 @@ impl SortMergeJoinStream {
 
         if matches!(
             self.join_type,
-            JoinType::Left | JoinType::LeftMark | JoinType::Right
+            JoinType::Left | JoinType::LeftMark | JoinType::Right | 
JoinType::RightMark
         ) {
             let null_mask = compute::not(corrected_mask)?;
             let null_joined_batch = filter_record_batch(&record_batch, 
&null_mask)?;
@@ -1658,7 +1671,7 @@ fn create_unmatched_columns(
     schema: &SchemaRef,
     size: usize,
 ) -> Vec<ArrayRef> {
-    if matches!(join_type, JoinType::LeftMark) {
+    if matches!(join_type, JoinType::LeftMark | JoinType::RightMark) {
         vec![Arc::new(BooleanArray::from(vec![false; size])) as ArrayRef]
     } else {
         schema
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
index 002a46f97a..83a5c4041c 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
@@ -1314,6 +1314,38 @@ async fn join_left_mark() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn join_right_mark() -> Result<()> {
+    let left = build_table(
+        ("a1", &vec![1, 2, 2, 3]),
+        ("b1", &vec![4, 5, 5, 7]), // 7 does not exist on the right
+        ("c1", &vec![7, 8, 8, 9]),
+    );
+    let right = build_table(
+        ("a2", &vec![10, 20, 30, 40]),
+        ("b1", &vec![4, 4, 5, 6]), // 5 is double on the left
+        ("c2", &vec![60, 70, 80, 90]),
+    );
+    let on = vec![(
+        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+    )];
+
+    let (_, batches) = join_collect(left, right, on, RightMark).await?;
+    // The output order is important as SMJ preserves sortedness
+    assert_snapshot!(batches_to_string(&batches), @r#"
+            +----+----+----+-------+
+            | a2 | b1 | c2 | mark  |
+            +----+----+----+-------+
+            | 10 | 4  | 60 | true  |
+            | 20 | 4  | 70 | true  |
+            | 30 | 5  | 80 | true  |
+            | 40 | 6  | 90 | false |
+            +----+----+----+-------+
+            "#);
+    Ok(())
+}
+
 #[tokio::test]
 async fn join_with_duplicated_column_names() -> Result<()> {
     let left = build_table(
@@ -1736,7 +1768,7 @@ async fn overallocation_single_batch_no_spill() -> Result<()> {
     let sort_options = vec![SortOptions::default(); on.len()];
 
     let join_types = vec![
-        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark,
+        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, 
RightMark,
     ];
 
     // Disable DiskManager to prevent spilling
@@ -1817,7 +1849,7 @@ async fn overallocation_multi_batch_no_spill() -> Result<()> {
     let sort_options = vec![SortOptions::default(); on.len()];
 
     let join_types = vec![
-        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark,
+        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, 
RightMark,
     ];
 
     // Disable DiskManager to prevent spilling
@@ -1877,7 +1909,7 @@ async fn overallocation_single_batch_spill() -> Result<()> {
     let sort_options = vec![SortOptions::default(); on.len()];
 
     let join_types = [
-        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark,
+        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, 
RightMark,
     ];
 
     // Enable DiskManager to allow spilling
@@ -1981,7 +2013,7 @@ async fn overallocation_multi_batch_spill() -> Result<()> {
     let sort_options = vec![SortOptions::default(); on.len()];
 
     let join_types = [
-        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark,
+        Inner, Left, Right, RightSemi, Full, LeftSemi, LeftAnti, LeftMark, 
RightMark,
     ];
 
     // Enable DiskManager to allow spilling
diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
index aedeb97186..b55b7e15f1 100644
--- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
+++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs
@@ -1018,6 +1018,7 @@ pub(crate) fn join_with_probe_batch(
             | JoinType::LeftSemi
             | JoinType::LeftMark
             | JoinType::RightSemi
+            | JoinType::RightMark
     ) {
         Ok(None)
     } else {
@@ -1864,6 +1865,7 @@ mod tests {
             JoinType::LeftAnti,
             JoinType::LeftMark,
             JoinType::RightAnti,
+            JoinType::RightMark,
             JoinType::Full
         )]
         join_type: JoinType,
@@ -1952,6 +1954,7 @@ mod tests {
             JoinType::LeftAnti,
             JoinType::LeftMark,
             JoinType::RightAnti,
+            JoinType::RightMark,
             JoinType::Full
         )]
         join_type: JoinType,
@@ -2020,6 +2023,7 @@ mod tests {
             JoinType::LeftAnti,
             JoinType::LeftMark,
             JoinType::RightAnti,
+            JoinType::RightMark,
             JoinType::Full
         )]
         join_type: JoinType,
@@ -2073,6 +2077,7 @@ mod tests {
             JoinType::LeftAnti,
             JoinType::LeftMark,
             JoinType::RightAnti,
+            JoinType::RightMark,
             JoinType::Full
         )]
         join_type: JoinType,
@@ -2101,6 +2106,7 @@ mod tests {
             JoinType::LeftAnti,
             JoinType::LeftMark,
             JoinType::RightAnti,
+            JoinType::RightMark,
             JoinType::Full
         )]
         join_type: JoinType,
@@ -2485,6 +2491,7 @@ mod tests {
             JoinType::LeftAnti,
             JoinType::LeftMark,
             JoinType::RightAnti,
+            JoinType::RightMark,
             JoinType::Full
         )]
         join_type: JoinType,
@@ -2571,6 +2578,7 @@ mod tests {
             JoinType::LeftAnti,
             JoinType::LeftMark,
             JoinType::RightAnti,
+            JoinType::RightMark,
             JoinType::Full
         )]
         join_type: JoinType,
@@ -2649,6 +2657,7 @@ mod tests {
             JoinType::LeftAnti,
             JoinType::LeftMark,
             JoinType::RightAnti,
+            JoinType::RightMark,
             JoinType::Full
         )]
         join_type: JoinType,
diff --git a/datafusion/physical-plan/src/joins/utils.rs b/datafusion/physical-plan/src/joins/utils.rs
index a62ae79635..c50bfce93a 100644
--- a/datafusion/physical-plan/src/joins/utils.rs
+++ b/datafusion/physical-plan/src/joins/utils.rs
@@ -306,7 +306,10 @@ pub fn build_join_schema(
     };
 
     let (schema1, schema2) = match join_type {
-        JoinType::Right | JoinType::RightSemi | JoinType::RightAnti => (left, 
right),
+        JoinType::Right
+        | JoinType::RightSemi
+        | JoinType::RightAnti
+        | JoinType::RightMark => (left, right),
         _ => (right, left),
     };
 
@@ -1619,8 +1622,9 @@ pub fn swap_join_projection(
         JoinType::LeftAnti
         | JoinType::LeftSemi
         | JoinType::RightAnti
-        | JoinType::RightSemi => projection.cloned(),
-
+        | JoinType::RightSemi
+        | JoinType::LeftMark
+        | JoinType::RightMark => projection.cloned(),
         _ => projection.map(|p| {
             p.iter()
                 .map(|i| {
diff --git a/datafusion/sqllogictest/test_files/joins.slt b/datafusion/sqllogictest/test_files/joins.slt
index c24b0777cc..96d2bad086 100644
--- a/datafusion/sqllogictest/test_files/joins.slt
+++ b/datafusion/sqllogictest/test_files/joins.slt
@@ -5160,6 +5160,36 @@ LEFT ANTI JOIN t2 ON k1 = k2
 WHERE k1 < 0
 ----
 
+# Mark testing
+statement ok
+CREATE OR REPLACE TABLE t1(b INT, c INT, d INT);
+
+statement ok
+INSERT INTO t1 VALUES
+  (10, 5, 3),   
+  ( 1, 7, 8),  
+  ( 2, 9, 7),   
+  ( 3, 8,10),   
+  ( 5, 6, 6),   
+  ( 0, 4, 9),   
+  ( 4, 8, 7),   
+  (100,6, 5);  
+
+query I rowsort
+SELECT c
+  FROM t1
+ WHERE c > d
+    OR EXISTS(SELECT 1 FROM t1 AS x WHERE x.b<t1.b)
+    OR (c <= d-2 OR c >= d+2)
+----
+4
+5
+6
+6
+7
+8
+8
+9
 
 statement ok
 DROP TABLE t1;
diff --git a/datafusion/sqllogictest/test_files/subquery.slt b/datafusion/sqllogictest/test_files/subquery.slt
index 43f85d1e20..dec9357495 100644
--- a/datafusion/sqllogictest/test_files/subquery.slt
+++ b/datafusion/sqllogictest/test_files/subquery.slt
@@ -1192,7 +1192,7 @@ physical_plan
 01)CoalesceBatchesExec: target_batch_size=2
 02)--FilterExec: t1_id@0 > 40 OR NOT mark@3, projection=[t1_id@0, t1_name@1, 
t1_int@2]
 03)----CoalesceBatchesExec: target_batch_size=2
-04)------HashJoinExec: mode=CollectLeft, join_type=LeftMark, on=[(t1_id@0, 
t2_id@0)]
+04)------HashJoinExec: mode=CollectLeft, join_type=RightMark, on=[(t2_id@0, 
t1_id@0)]
 05)--------DataSourceExec: partitions=1, partition_sizes=[2]
 06)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
 07)----------DataSourceExec: partitions=1, partition_sizes=[2]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to