This is an automated email from the ASF dual-hosted git repository.

github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 984d210036 fix: Pick correct columns in Sort Merge Equijoin (#18772)
984d210036 is described below

commit 984d210036792a129fdaf3e8cead88da9e405662
Author: Tal Glanzman <[email protected]>
AuthorDate: Tue Nov 18 20:36:57 2025 +0200

    fix: Pick correct columns in Sort Merge Equijoin (#18772)
    
    ## Which issue does this PR close?
    
    - Closes #18804.
    
    ## Rationale for this change
    
    ## What changes are included in this PR?
    
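    Take the correct columns from the null-joined batch in the sort-merge
    join stream: use the left column count when taking the left-side
    columns, and the right column count when skipping past the right-side
    columns (the two counts were previously swapped). Join types that are
    not expected on this code path now return an execution error.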
    
    ## Are these changes tested?
    
    Yes:
    
    - added Rust unit tests that hit the previously invalid code paths
    - added sqllogictest cases (sort_merge_join.slt)
    - enhanced the fuzz tests to vary the number of columns on each side
    
    The fuzz tests are taken from @rluvaton's #18788, excluding those that
    this PR doesn't fix:
    
    ```
        fuzz_cases::join_fuzz::test_right_anti_join_1k_binary_filtered
        fuzz_cases::join_fuzz::test_right_anti_join_1k_filtered
        fuzz_cases::join_fuzz::test_right_semi_join_1k_filtered
    ```
    
    ## Are there any user-facing changes?
---
 datafusion/core/tests/fuzz_cases/join_fuzz.rs      | 697 ++++++++++++---------
 .../src/joins/sort_merge_join/stream.rs            |  46 +-
 .../src/joins/sort_merge_join/tests.rs             | 234 +++++++
 .../sqllogictest/test_files/sort_merge_join.slt    |  49 ++
 4 files changed, 699 insertions(+), 327 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs 
b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index e8ff1ccf06..b6a5e18da1 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -91,141 +91,163 @@ fn col_lt_col_filter(schema1: Arc<Schema>, schema2: 
Arc<Schema>) -> JoinFilter {
 
 #[tokio::test]
 async fn test_inner_join_1k_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::Inner,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::Inner,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_inner_join_1k() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::Inner,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::Inner,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_join_1k() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::Left,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::Left,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_join_1k_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::Left,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::Left,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_join_1k() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::Right,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::Right,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_join_1k_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::Right,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::Right,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_full_join_1k() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::Full,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::Full,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_full_join_1k_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::Full,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[NljHj, HjSmj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::Full,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[NljHj, HjSmj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_semi_join_1k() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::LeftSemi,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::LeftSemi,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_semi_join_1k_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::LeftSemi,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::LeftSemi,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_semi_join_1k() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::RightSemi,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::RightSemi,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_semi_join_1k_filtered() {
     JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
+        make_staggered_batches_i32(1000, false),
+        make_staggered_batches_i32(1000, false),
         JoinType::RightSemi,
         Some(Box::new(col_lt_col_filter)),
     )
@@ -235,45 +257,51 @@ async fn test_right_semi_join_1k_filtered() {
 
 #[tokio::test]
 async fn test_left_anti_join_1k() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::LeftAnti,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::LeftAnti,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_anti_join_1k_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::LeftAnti,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::LeftAnti,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_anti_join_1k() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::RightAnti,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::RightAnti,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_anti_join_1k_filtered() {
     JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
+        make_staggered_batches_i32(1000, false),
+        make_staggered_batches_i32(1000, false),
         JoinType::RightAnti,
         Some(Box::new(col_lt_col_filter)),
     )
@@ -283,226 +311,262 @@ async fn test_right_anti_join_1k_filtered() {
 
 #[tokio::test]
 async fn test_left_mark_join_1k() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::LeftMark,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::LeftMark,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_mark_join_1k_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::LeftMark,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::LeftMark,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 // todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support
 #[tokio::test]
 async fn test_right_mark_join_1k() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::RightMark,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::RightMark,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_mark_join_1k_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_i32(1000),
-        make_staggered_batches_i32(1000),
-        JoinType::RightMark,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_i32(1000, left_extra),
+            make_staggered_batches_i32(1000, right_extra),
+            JoinType::RightMark,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_inner_join_1k_binary_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::Inner,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::Inner,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_inner_join_1k_binary() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::Inner,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::Inner,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_join_1k_binary() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::Left,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::Left,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_join_1k_binary_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::Left,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::Left,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_join_1k_binary() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::Right,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::Right,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_join_1k_binary_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::Right,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::Right,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_full_join_1k_binary() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::Full,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::Full,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_full_join_1k_binary_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::Full,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[NljHj, HjSmj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::Full,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[NljHj, HjSmj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_semi_join_1k_binary() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::LeftSemi,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::LeftSemi,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_semi_join_1k_binary_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::LeftSemi,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::LeftSemi,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_semi_join_1k_binary() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::RightSemi,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::RightSemi,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_semi_join_1k_binary_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::RightSemi,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::RightSemi,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_anti_join_1k_binary() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::LeftAnti,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::LeftAnti,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_anti_join_1k_binary_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::LeftAnti,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::LeftAnti,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_anti_join_1k_binary() {
     JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
+        make_staggered_batches_binary(1000, false),
+        make_staggered_batches_binary(1000, false),
         JoinType::RightAnti,
         None,
     )
@@ -513,8 +577,8 @@ async fn test_right_anti_join_1k_binary() {
 #[tokio::test]
 async fn test_right_anti_join_1k_binary_filtered() {
     JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
+        make_staggered_batches_binary(1000, false),
+        make_staggered_batches_binary(1000, false),
         JoinType::RightAnti,
         Some(Box::new(col_lt_col_filter)),
     )
@@ -524,51 +588,59 @@ async fn test_right_anti_join_1k_binary_filtered() {
 
 #[tokio::test]
 async fn test_left_mark_join_1k_binary() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::LeftMark,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::LeftMark,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_left_mark_join_1k_binary_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::LeftMark,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::LeftMark,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 // todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support
 #[tokio::test]
 async fn test_right_mark_join_1k_binary() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::RightMark,
-        None,
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::RightMark,
+            None,
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 #[tokio::test]
 async fn test_right_mark_join_1k_binary_filtered() {
-    JoinFuzzTestCase::new(
-        make_staggered_batches_binary(1000),
-        make_staggered_batches_binary(1000),
-        JoinType::RightMark,
-        Some(Box::new(col_lt_col_filter)),
-    )
-    .run_test(&[HjSmj, NljHj], false)
-    .await
+    for (left_extra, right_extra) in [(true, true), (false, true), (true, 
false)] {
+        JoinFuzzTestCase::new(
+            make_staggered_batches_binary(1000, left_extra),
+            make_staggered_batches_binary(1000, right_extra),
+            JoinType::RightMark,
+            Some(Box::new(col_lt_col_filter)),
+        )
+        .run_test(&[HjSmj, NljHj], false)
+        .await
+    }
 }
 
 type JoinFilterBuilder = Box<dyn Fn(Arc<Schema>, Arc<Schema>) -> JoinFilter>;
@@ -1031,7 +1103,7 @@ impl JoinFuzzTestCase {
 /// Return randomly sized record batches with:
 /// two sorted int32 columns 'a', 'b' ranged from 0..99 as join columns
 /// two random int32 columns 'x', 'y' as other columns
-fn make_staggered_batches_i32(len: usize) -> Vec<RecordBatch> {
+fn make_staggered_batches_i32(len: usize, with_extra_column: bool) -> 
Vec<RecordBatch> {
     let mut rng = rand::rng();
     let mut input12: Vec<(i32, i32)> = vec![(0, 0); len];
     let mut input3: Vec<i32> = vec![0; len];
@@ -1047,14 +1119,18 @@ fn make_staggered_batches_i32(len: usize) -> 
Vec<RecordBatch> {
     let input3 = Int32Array::from_iter_values(input3);
     let input4 = Int32Array::from_iter_values(input4);
 
-    // split into several record batches
-    let batch = RecordBatch::try_from_iter(vec![
+    let mut columns = vec![
         ("a", Arc::new(input1) as ArrayRef),
         ("b", Arc::new(input2) as ArrayRef),
         ("x", Arc::new(input3) as ArrayRef),
-        ("y", Arc::new(input4) as ArrayRef),
-    ])
-    .unwrap();
+    ];
+
+    if with_extra_column {
+        columns.push(("y", Arc::new(input4) as ArrayRef));
+    }
+
+    // split into several record batches
+    let batch = RecordBatch::try_from_iter(columns).unwrap();
 
     // use a random number generator to pick a random sized output
     stagger_batch_with_seed(batch, 42)
@@ -1070,7 +1146,10 @@ fn rand_bytes<R: Rng>(rng: &mut R, min: usize, max: 
usize) -> Vec<u8> {
 /// Return randomly sized record batches with:
 /// two sorted binary columns 'a', 'b' (lexicographically) as join columns
 /// two random binary columns 'x', 'y' as other columns
-fn make_staggered_batches_binary(len: usize) -> Vec<RecordBatch> {
+fn make_staggered_batches_binary(
+    len: usize,
+    with_extra_column: bool,
+) -> Vec<RecordBatch> {
     let mut rng = rand::rng();
 
     // produce (a,b) pairs then sort lexicographically so SMJ has naturally 
sorted keys
@@ -1088,13 +1167,17 @@ fn make_staggered_batches_binary(len: usize) -> 
Vec<RecordBatch> {
     let x = BinaryArray::from_iter_values(input3.iter());
     let y = BinaryArray::from_iter_values(input4.iter());
 
-    let batch = RecordBatch::try_from_iter(vec![
+    let mut columns = vec![
         ("a", Arc::new(a) as ArrayRef),
         ("b", Arc::new(b) as ArrayRef),
         ("x", Arc::new(x) as ArrayRef),
-        ("y", Arc::new(y) as ArrayRef),
-    ])
-    .unwrap();
+    ];
+
+    if with_extra_column {
+        columns.push(("y", Arc::new(y) as ArrayRef));
+    }
+
+    let batch = RecordBatch::try_from_iter(columns).unwrap();
 
     // preserve your existing randomized partitioning
     stagger_batch_with_seed(batch, 42)
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs 
b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
index 28020450c4..0325e37d42 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
@@ -1559,26 +1559,32 @@ impl SortMergeJoinStream {
                 null_joined_batch.num_rows(),
             );
 
-            let columns = if !matches!(self.join_type, JoinType::Right) {
-                let mut left_columns = null_joined_batch
-                    .columns()
-                    .iter()
-                    .take(right_columns_length)
-                    .cloned()
-                    .collect::<Vec<_>>();
-
-                left_columns.extend(right_columns);
-                left_columns
-            } else {
-                let left_columns = null_joined_batch
-                    .columns()
-                    .iter()
-                    .skip(left_columns_length)
-                    .cloned()
-                    .collect::<Vec<_>>();
-
-                right_columns.extend(left_columns);
-                right_columns
+            let columns = match self.join_type {
+                JoinType::Right => {
+                    // The first columns are the right columns.
+                    let left_columns = null_joined_batch
+                        .columns()
+                        .iter()
+                        .skip(right_columns_length)
+                        .cloned()
+                        .collect::<Vec<_>>();
+
+                    right_columns.extend(left_columns);
+                    right_columns
+                }
+                JoinType::Left | JoinType::LeftMark | JoinType::RightMark => {
+                    // The first columns are the left columns.
+                    let mut left_columns = null_joined_batch
+                        .columns()
+                        .iter()
+                        .take(left_columns_length)
+                        .cloned()
+                        .collect::<Vec<_>>();
+
+                    left_columns.extend(right_columns);
+                    left_columns
+                }
+                _ => exec_err!("Did not expect join type {}", self.join_type)?,
             };
 
             // Push the streamed/buffered batch joined nulls to the output
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
index 83a5c4041c..f91bffbed7 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
@@ -644,6 +644,240 @@ async fn join_right_one() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn join_right_different_columns_count_with_filter() -> Result<()> {
+    // select *
+    // from t1
+    // right join t2 on t1.b1 = t2.b1 and t1.a1 > t2.a2
+
+    let left = build_table(
+        ("a1", &vec![1, 21, 3]), // 21(t1.a1) > 20(t2.a2)
+        ("b1", &vec![4, 5, 7]),
+        ("c1", &vec![7, 8, 9]),
+    );
+
+    let right = build_table_two_cols(
+        ("a2", &vec![10, 20, 30]),
+        ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
+    );
+
+    let on = vec![(
+        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+    )];
+
+    let filter = JoinFilter::new(
+        Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a1", 0)),
+            Operator::Gt,
+            Arc::new(Column::new("a2", 1)),
+        )),
+        vec![
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Right,
+            },
+        ],
+        Arc::new(Schema::new(vec![
+            Field::new("a1", DataType::Int32, true),
+            Field::new("a2", DataType::Int32, true),
+        ])),
+    );
+
+    let (_, batches) = join_collect_with_filter(left, right, on, filter, Right).await?;
+
+    assert_snapshot!(batches_to_string(&batches), @r#"
+            +----+----+----+----+----+
+            | a1 | b1 | c1 | a2 | b1 |
+            +----+----+----+----+----+
+            |    |    |    | 10 | 4  |
+            | 21 | 5  | 8  | 20 | 5  |
+            |    |    |    | 30 | 6  |
+            +----+----+----+----+----+
+            "#);
+    Ok(())
+}
+
+#[tokio::test]
+async fn join_left_different_columns_count_with_filter() -> Result<()> {
+    // select *
+    // from t2
+    // left join t1 on t2.b1 = t1.b1 and t2.a2 > t1.a1
+
+    let left = build_table_two_cols(
+        ("a2", &vec![10, 20, 30]),
+        ("b1", &vec![4, 5, 6]), // 6 does not exist on the right
+    );
+
+    let right = build_table(
+        ("a1", &vec![1, 21, 3]), // 20(t2.a2) > 1(t1.a1)
+        ("b1", &vec![4, 5, 7]),
+        ("c1", &vec![7, 8, 9]),
+    );
+
+    let on = vec![(
+        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+    )];
+
+    let filter = JoinFilter::new(
+        Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a2", 0)),
+            Operator::Gt,
+            Arc::new(Column::new("a1", 1)),
+        )),
+        vec![
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Right,
+            },
+        ],
+        Arc::new(Schema::new(vec![
+            Field::new("a2", DataType::Int32, true),
+            Field::new("a1", DataType::Int32, true),
+        ])),
+    );
+
+    let (_, batches) = join_collect_with_filter(left, right, on, filter, Left).await?;
+
+    assert_snapshot!(batches_to_string(&batches), @r#"
+            +----+----+----+----+----+
+            | a2 | b1 | a1 | b1 | c1 |
+            +----+----+----+----+----+
+            | 10 | 4  | 1  | 4  | 7  |
+            | 20 | 5  |    |    |    |
+            | 30 | 6  |    |    |    |
+            +----+----+----+----+----+
+            "#);
+    Ok(())
+}
+
+#[tokio::test]
+async fn join_left_mark_different_columns_count_with_filter() -> Result<()> {
+    // select *
+    // from t2
+    // left mark join t1 on t2.b1 = t1.b1 and t2.a2 > t1.a1
+
+    let left = build_table_two_cols(
+        ("a2", &vec![10, 20, 30]),
+        ("b1", &vec![4, 5, 6]), // 6 does not exist on the right
+    );
+
+    let right = build_table(
+        ("a1", &vec![1, 21, 3]), // 20(t2.a2) > 1(t1.a1)
+        ("b1", &vec![4, 5, 7]),
+        ("c1", &vec![7, 8, 9]),
+    );
+
+    let on = vec![(
+        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+    )];
+
+    let filter = JoinFilter::new(
+        Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a2", 0)),
+            Operator::Gt,
+            Arc::new(Column::new("a1", 1)),
+        )),
+        vec![
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Right,
+            },
+        ],
+        Arc::new(Schema::new(vec![
+            Field::new("a2", DataType::Int32, true),
+            Field::new("a1", DataType::Int32, true),
+        ])),
+    );
+
+    let (_, batches) =
+        join_collect_with_filter(left, right, on, filter, LeftMark).await?;
+
+    assert_snapshot!(batches_to_string(&batches), @r#"
+            +----+----+-------+
+            | a2 | b1 | mark  |
+            +----+----+-------+
+            | 10 | 4  | true  |
+            | 20 | 5  | false |
+            | 30 | 6  | false |
+            +----+----+-------+
+            "#);
+    Ok(())
+}
+
+#[tokio::test]
+async fn join_right_mark_different_columns_count_with_filter() -> Result<()> {
+    // select *
+    // from t1
+    // right mark join t2 on t1.b1 = t2.b1 and t1.a1 > t2.a2
+
+    let left = build_table(
+        ("a1", &vec![1, 21, 3]), // 21(t1.a1) > 20(t2.a2)
+        ("b1", &vec![4, 5, 7]),
+        ("c1", &vec![7, 8, 9]),
+    );
+
+    let right = build_table_two_cols(
+        ("a2", &vec![10, 20, 30]),
+        ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
+    );
+
+    let on = vec![(
+        Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+        Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+    )];
+
+    let filter = JoinFilter::new(
+        Arc::new(BinaryExpr::new(
+            Arc::new(Column::new("a1", 0)),
+            Operator::Gt,
+            Arc::new(Column::new("a2", 1)),
+        )),
+        vec![
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Left,
+            },
+            ColumnIndex {
+                index: 0,
+                side: JoinSide::Right,
+            },
+        ],
+        Arc::new(Schema::new(vec![
+            Field::new("a1", DataType::Int32, true),
+            Field::new("a2", DataType::Int32, true),
+        ])),
+    );
+
+    let (_, batches) =
+        join_collect_with_filter(left, right, on, filter, RightMark).await?;
+
+    assert_snapshot!(batches_to_string(&batches), @r#"
+            +----+----+-------+
+            | a2 | b1 | mark  |
+            +----+----+-------+
+            | 10 | 4  | false |
+            | 20 | 5  | true  |
+            | 30 | 6  | false |
+            +----+----+-------+
+            "#);
+    Ok(())
+}
+
 #[tokio::test]
 async fn join_full_one() -> Result<()> {
     let left = build_table(
diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt
index ed46333321..aa87026c5c 100644
--- a/datafusion/sqllogictest/test_files/sort_merge_join.slt
+++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt
@@ -891,3 +891,52 @@ drop table t2;
 # return sql params back to default values
 statement ok
 set datafusion.optimizer.prefer_hash_join = true;
+
+##########
+## Tests for equijoins with different column counts
+##########
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = false;
+
+statement ok
+DROP TABLE IF EXISTS t1;
+
+statement ok
+CREATE TABLE t1(a int, b int) AS VALUES (1, 100), (2, 200), (3, 300);
+
+statement ok
+DROP TABLE IF EXISTS t2;
+
+statement ok
+CREATE TABLE t2(a int, b int, c int) AS VALUES (4, 101, 1001), (3, 201, 2001), (2, 250, 3001);
+
+statement ok
+DROP TABLE IF EXISTS t3;
+
+statement ok
+CREATE TABLE t3(x int) AS VALUES (1);
+
+query IIIII
+SELECT * FROM t2 RIGHT JOIN t1 on t1.a = t2.a AND t1.b < t2.b
+----
+NULL NULL NULL 1 100
+2 250 3001 2 200
+NULL NULL NULL 3 300
+
+query IIIII
+SELECT * FROM t1 LEFT JOIN t2 on t1.a = t2.a AND t1.b < t2.b
+----
+1 100 NULL NULL NULL
+2 200 2 250 3001
+3 300 NULL NULL NULL
+
+# Small table for LeftMark
+
+# LeftMark equijoin with different columns count
+query III rowsort
+SELECT t2.a, t2.b, t2.c
+FROM t2
+WHERE t2.a > 3 OR t2.a IN (SELECT t3.x FROM t3 WHERE t2.b < 150)
+----
+4 101 1001
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to