This is an automated email from the ASF dual-hosted git repository.
github-bot pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 984d210036 fix: Pick correct columns in Sort Merge Equijoin (#18772)
984d210036 is described below
commit 984d210036792a129fdaf3e8cead88da9e405662
Author: Tal Glanzman <[email protected]>
AuthorDate: Tue Nov 18 20:36:57 2025 +0200
fix: Pick correct columns in Sort Merge Equijoin (#18772)
## Which issue does this PR close?
- Closes #18804.
## Rationale for this change
## What changes are included in this PR?
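Take the correct columns when building the null-joined output batch in the
sort-merge join stream: for `JoinType::Right`, skip `right_columns_length`
columns (previously `left_columns_length`); for `Left`, `LeftMark` and
`RightMark`, take `left_columns_length` columns (previously
`right_columns_length`). Any other join type reaching this code path now
returns an execution error.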
## Are these changes tested?
Yes:
- added Rust unit tests that hit the previously invalid code paths
- added sqllogictest cases
- enhanced the fuzz tests to vary the column count of each join input
Fuzz tests are taken from @rluvaton's #18788, excluding those this PR
doesn't fix:
```
fuzz_cases::join_fuzz::test_right_anti_join_1k_binary_filtered
fuzz_cases::join_fuzz::test_right_anti_join_1k_filtered
fuzz_cases::join_fuzz::test_right_semi_join_1k_filtered
```
## Are there any user-facing changes?
---
datafusion/core/tests/fuzz_cases/join_fuzz.rs | 697 ++++++++++++---------
.../src/joins/sort_merge_join/stream.rs | 46 +-
.../src/joins/sort_merge_join/tests.rs | 234 +++++++
.../sqllogictest/test_files/sort_merge_join.slt | 49 ++
4 files changed, 699 insertions(+), 327 deletions(-)
diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index e8ff1ccf06..b6a5e18da1 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -91,141 +91,163 @@ fn col_lt_col_filter(schema1: Arc<Schema>, schema2:
Arc<Schema>) -> JoinFilter {
#[tokio::test]
async fn test_inner_join_1k_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::Inner,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::Inner,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_inner_join_1k() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::Inner,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::Inner,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_join_1k() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::Left,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::Left,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_join_1k_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::Left,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::Left,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_join_1k() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::Right,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::Right,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_join_1k_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::Right,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::Right,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_full_join_1k() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::Full,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::Full,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_full_join_1k_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::Full,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[NljHj, HjSmj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::Full,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[NljHj, HjSmj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_semi_join_1k() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::LeftSemi,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::LeftSemi,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_semi_join_1k_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::LeftSemi,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::LeftSemi,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_semi_join_1k() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::RightSemi,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::RightSemi,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_semi_join_1k_filtered() {
JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
+ make_staggered_batches_i32(1000, false),
+ make_staggered_batches_i32(1000, false),
JoinType::RightSemi,
Some(Box::new(col_lt_col_filter)),
)
@@ -235,45 +257,51 @@ async fn test_right_semi_join_1k_filtered() {
#[tokio::test]
async fn test_left_anti_join_1k() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::LeftAnti,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::LeftAnti,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_anti_join_1k_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::LeftAnti,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::LeftAnti,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_anti_join_1k() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::RightAnti,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::RightAnti,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_anti_join_1k_filtered() {
JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
+ make_staggered_batches_i32(1000, false),
+ make_staggered_batches_i32(1000, false),
JoinType::RightAnti,
Some(Box::new(col_lt_col_filter)),
)
@@ -283,226 +311,262 @@ async fn test_right_anti_join_1k_filtered() {
#[tokio::test]
async fn test_left_mark_join_1k() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::LeftMark,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::LeftMark,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_mark_join_1k_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::LeftMark,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::LeftMark,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
// todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support
#[tokio::test]
async fn test_right_mark_join_1k() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::RightMark,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::RightMark,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_mark_join_1k_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_i32(1000),
- make_staggered_batches_i32(1000),
- JoinType::RightMark,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_i32(1000, left_extra),
+ make_staggered_batches_i32(1000, right_extra),
+ JoinType::RightMark,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_inner_join_1k_binary_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::Inner,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::Inner,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_inner_join_1k_binary() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::Inner,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::Inner,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_join_1k_binary() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::Left,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::Left,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_join_1k_binary_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::Left,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::Left,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_join_1k_binary() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::Right,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::Right,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_join_1k_binary_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::Right,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::Right,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_full_join_1k_binary() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::Full,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::Full,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_full_join_1k_binary_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::Full,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[NljHj, HjSmj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::Full,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[NljHj, HjSmj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_semi_join_1k_binary() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::LeftSemi,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::LeftSemi,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_semi_join_1k_binary_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::LeftSemi,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::LeftSemi,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_semi_join_1k_binary() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::RightSemi,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::RightSemi,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_semi_join_1k_binary_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::RightSemi,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::RightSemi,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_anti_join_1k_binary() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::LeftAnti,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::LeftAnti,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_anti_join_1k_binary_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::LeftAnti,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::LeftAnti,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_anti_join_1k_binary() {
JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
+ make_staggered_batches_binary(1000, false),
+ make_staggered_batches_binary(1000, false),
JoinType::RightAnti,
None,
)
@@ -513,8 +577,8 @@ async fn test_right_anti_join_1k_binary() {
#[tokio::test]
async fn test_right_anti_join_1k_binary_filtered() {
JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
+ make_staggered_batches_binary(1000, false),
+ make_staggered_batches_binary(1000, false),
JoinType::RightAnti,
Some(Box::new(col_lt_col_filter)),
)
@@ -524,51 +588,59 @@ async fn test_right_anti_join_1k_binary_filtered() {
#[tokio::test]
async fn test_left_mark_join_1k_binary() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::LeftMark,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::LeftMark,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_left_mark_join_1k_binary_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::LeftMark,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::LeftMark,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
// todo: add JoinTestType::HjSmj after Right mark SortMergeJoin support
#[tokio::test]
async fn test_right_mark_join_1k_binary() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::RightMark,
- None,
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::RightMark,
+ None,
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
#[tokio::test]
async fn test_right_mark_join_1k_binary_filtered() {
- JoinFuzzTestCase::new(
- make_staggered_batches_binary(1000),
- make_staggered_batches_binary(1000),
- JoinType::RightMark,
- Some(Box::new(col_lt_col_filter)),
- )
- .run_test(&[HjSmj, NljHj], false)
- .await
+ for (left_extra, right_extra) in [(true, true), (false, true), (true,
false)] {
+ JoinFuzzTestCase::new(
+ make_staggered_batches_binary(1000, left_extra),
+ make_staggered_batches_binary(1000, right_extra),
+ JoinType::RightMark,
+ Some(Box::new(col_lt_col_filter)),
+ )
+ .run_test(&[HjSmj, NljHj], false)
+ .await
+ }
}
type JoinFilterBuilder = Box<dyn Fn(Arc<Schema>, Arc<Schema>) -> JoinFilter>;
@@ -1031,7 +1103,7 @@ impl JoinFuzzTestCase {
/// Return randomly sized record batches with:
/// two sorted int32 columns 'a', 'b' ranged from 0..99 as join columns
/// two random int32 columns 'x', 'y' as other columns
-fn make_staggered_batches_i32(len: usize) -> Vec<RecordBatch> {
+fn make_staggered_batches_i32(len: usize, with_extra_column: bool) ->
Vec<RecordBatch> {
let mut rng = rand::rng();
let mut input12: Vec<(i32, i32)> = vec![(0, 0); len];
let mut input3: Vec<i32> = vec![0; len];
@@ -1047,14 +1119,18 @@ fn make_staggered_batches_i32(len: usize) ->
Vec<RecordBatch> {
let input3 = Int32Array::from_iter_values(input3);
let input4 = Int32Array::from_iter_values(input4);
- // split into several record batches
- let batch = RecordBatch::try_from_iter(vec![
+ let mut columns = vec![
("a", Arc::new(input1) as ArrayRef),
("b", Arc::new(input2) as ArrayRef),
("x", Arc::new(input3) as ArrayRef),
- ("y", Arc::new(input4) as ArrayRef),
- ])
- .unwrap();
+ ];
+
+ if with_extra_column {
+ columns.push(("y", Arc::new(input4) as ArrayRef));
+ }
+
+ // split into several record batches
+ let batch = RecordBatch::try_from_iter(columns).unwrap();
// use a random number generator to pick a random sized output
stagger_batch_with_seed(batch, 42)
@@ -1070,7 +1146,10 @@ fn rand_bytes<R: Rng>(rng: &mut R, min: usize, max:
usize) -> Vec<u8> {
/// Return randomly sized record batches with:
/// two sorted binary columns 'a', 'b' (lexicographically) as join columns
/// two random binary columns 'x', 'y' as other columns
-fn make_staggered_batches_binary(len: usize) -> Vec<RecordBatch> {
+fn make_staggered_batches_binary(
+ len: usize,
+ with_extra_column: bool,
+) -> Vec<RecordBatch> {
let mut rng = rand::rng();
// produce (a,b) pairs then sort lexicographically so SMJ has naturally
sorted keys
@@ -1088,13 +1167,17 @@ fn make_staggered_batches_binary(len: usize) ->
Vec<RecordBatch> {
let x = BinaryArray::from_iter_values(input3.iter());
let y = BinaryArray::from_iter_values(input4.iter());
- let batch = RecordBatch::try_from_iter(vec![
+ let mut columns = vec![
("a", Arc::new(a) as ArrayRef),
("b", Arc::new(b) as ArrayRef),
("x", Arc::new(x) as ArrayRef),
- ("y", Arc::new(y) as ArrayRef),
- ])
- .unwrap();
+ ];
+
+ if with_extra_column {
+ columns.push(("y", Arc::new(y) as ArrayRef));
+ }
+
+ let batch = RecordBatch::try_from_iter(columns).unwrap();
// preserve your existing randomized partitioning
stagger_batch_with_seed(batch, 42)
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
index 28020450c4..0325e37d42 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs
@@ -1559,26 +1559,32 @@ impl SortMergeJoinStream {
null_joined_batch.num_rows(),
);
- let columns = if !matches!(self.join_type, JoinType::Right) {
- let mut left_columns = null_joined_batch
- .columns()
- .iter()
- .take(right_columns_length)
- .cloned()
- .collect::<Vec<_>>();
-
- left_columns.extend(right_columns);
- left_columns
- } else {
- let left_columns = null_joined_batch
- .columns()
- .iter()
- .skip(left_columns_length)
- .cloned()
- .collect::<Vec<_>>();
-
- right_columns.extend(left_columns);
- right_columns
+ let columns = match self.join_type {
+ JoinType::Right => {
+ // The first columns are the right columns.
+ let left_columns = null_joined_batch
+ .columns()
+ .iter()
+ .skip(right_columns_length)
+ .cloned()
+ .collect::<Vec<_>>();
+
+ right_columns.extend(left_columns);
+ right_columns
+ }
+ JoinType::Left | JoinType::LeftMark | JoinType::RightMark => {
+ // The first columns are the left columns.
+ let mut left_columns = null_joined_batch
+ .columns()
+ .iter()
+ .take(left_columns_length)
+ .cloned()
+ .collect::<Vec<_>>();
+
+ left_columns.extend(right_columns);
+ left_columns
+ }
+ _ => exec_err!("Did not expect join type {}", self.join_type)?,
};
// Push the streamed/buffered batch joined nulls to the output
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
index 83a5c4041c..f91bffbed7 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join/tests.rs
@@ -644,6 +644,240 @@ async fn join_right_one() -> Result<()> {
Ok(())
}
+#[tokio::test]
+async fn join_right_different_columns_count_with_filter() -> Result<()> {
+ // select *
+ // from t1
+ // right join t2 on t1.b1 = t2.b1 and t1.a1 > t2.a2
+
+ let left = build_table(
+ ("a1", &vec![1, 21, 3]), // 21(t1.a1) > 20(t2.a2)
+ ("b1", &vec![4, 5, 7]),
+ ("c1", &vec![7, 8, 9]),
+ );
+
+ let right = build_table_two_cols(
+ ("a2", &vec![10, 20, 30]),
+ ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
+ );
+
+ let on = vec![(
+ Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+ Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+ )];
+
+ let filter = JoinFilter::new(
+ Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a1", 0)),
+ Operator::Gt,
+ Arc::new(Column::new("a2", 1)),
+ )),
+ vec![
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Right,
+ },
+ ],
+ Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::Int32, true),
+ Field::new("a2", DataType::Int32, true),
+ ])),
+ );
+
+ let (_, batches) = join_collect_with_filter(left, right, on, filter,
Right).await?;
+
+ assert_snapshot!(batches_to_string(&batches), @r#"
+ +----+----+----+----+----+
+ | a1 | b1 | c1 | a2 | b1 |
+ +----+----+----+----+----+
+ | | | | 10 | 4 |
+ | 21 | 5 | 8 | 20 | 5 |
+ | | | | 30 | 6 |
+ +----+----+----+----+----+
+ "#);
+ Ok(())
+}
+
+#[tokio::test]
+async fn join_left_different_columns_count_with_filter() -> Result<()> {
+ // select *
+ // from t2
+ // left join t1 on t2.b1 = t1.b1 and t2.a2 > t1.a1
+
+ let left = build_table_two_cols(
+ ("a2", &vec![10, 20, 30]),
+ ("b1", &vec![4, 5, 6]), // 6 does not exist on the right
+ );
+
+ let right = build_table(
+ ("a1", &vec![1, 21, 3]), // 20(t2.a2) > 1(t1.a1)
+ ("b1", &vec![4, 5, 7]),
+ ("c1", &vec![7, 8, 9]),
+ );
+
+ let on = vec![(
+ Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+ Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+ )];
+
+ let filter = JoinFilter::new(
+ Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a2", 0)),
+ Operator::Gt,
+ Arc::new(Column::new("a1", 1)),
+ )),
+ vec![
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Right,
+ },
+ ],
+ Arc::new(Schema::new(vec![
+ Field::new("a2", DataType::Int32, true),
+ Field::new("a1", DataType::Int32, true),
+ ])),
+ );
+
+ let (_, batches) = join_collect_with_filter(left, right, on, filter,
Left).await?;
+
+ assert_snapshot!(batches_to_string(&batches), @r#"
+ +----+----+----+----+----+
+ | a2 | b1 | a1 | b1 | c1 |
+ +----+----+----+----+----+
+ | 10 | 4 | 1 | 4 | 7 |
+ | 20 | 5 | | | |
+ | 30 | 6 | | | |
+ +----+----+----+----+----+
+ "#);
+ Ok(())
+}
+
+#[tokio::test]
+async fn join_left_mark_different_columns_count_with_filter() -> Result<()> {
+ // select *
+ // from t2
+ // left mark join t1 on t2.b1 = t1.b1 and t2.a2 > t1.a1
+
+ let left = build_table_two_cols(
+ ("a2", &vec![10, 20, 30]),
+ ("b1", &vec![4, 5, 6]), // 6 does not exist on the right
+ );
+
+ let right = build_table(
+ ("a1", &vec![1, 21, 3]), // 20(t2.a2) > 1(t1.a1)
+ ("b1", &vec![4, 5, 7]),
+ ("c1", &vec![7, 8, 9]),
+ );
+
+ let on = vec![(
+ Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+ Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+ )];
+
+ let filter = JoinFilter::new(
+ Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a2", 0)),
+ Operator::Gt,
+ Arc::new(Column::new("a1", 1)),
+ )),
+ vec![
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Right,
+ },
+ ],
+ Arc::new(Schema::new(vec![
+ Field::new("a2", DataType::Int32, true),
+ Field::new("a1", DataType::Int32, true),
+ ])),
+ );
+
+ let (_, batches) =
+ join_collect_with_filter(left, right, on, filter, LeftMark).await?;
+
+ assert_snapshot!(batches_to_string(&batches), @r#"
+ +----+----+-------+
+ | a2 | b1 | mark |
+ +----+----+-------+
+ | 10 | 4 | true |
+ | 20 | 5 | false |
+ | 30 | 6 | false |
+ +----+----+-------+
+ "#);
+ Ok(())
+}
+
+#[tokio::test]
+async fn join_right_mark_different_columns_count_with_filter() -> Result<()> {
+ // select *
+ // from t1
+ // right mark join t2 on t1.b1 = t2.b1 and t1.a1 > t2.a2
+
+ let left = build_table(
+ ("a1", &vec![1, 21, 3]), // 21(t1.a1) > 20(t2.a2)
+ ("b1", &vec![4, 5, 7]),
+ ("c1", &vec![7, 8, 9]),
+ );
+
+ let right = build_table_two_cols(
+ ("a2", &vec![10, 20, 30]),
+ ("b1", &vec![4, 5, 6]), // 6 does not exist on the left
+ );
+
+ let on = vec![(
+ Arc::new(Column::new_with_schema("b1", &left.schema())?) as _,
+ Arc::new(Column::new_with_schema("b1", &right.schema())?) as _,
+ )];
+
+ let filter = JoinFilter::new(
+ Arc::new(BinaryExpr::new(
+ Arc::new(Column::new("a1", 0)),
+ Operator::Gt,
+ Arc::new(Column::new("a2", 1)),
+ )),
+ vec![
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Left,
+ },
+ ColumnIndex {
+ index: 0,
+ side: JoinSide::Right,
+ },
+ ],
+ Arc::new(Schema::new(vec![
+ Field::new("a1", DataType::Int32, true),
+ Field::new("a2", DataType::Int32, true),
+ ])),
+ );
+
+ let (_, batches) =
+ join_collect_with_filter(left, right, on, filter, RightMark).await?;
+
+ assert_snapshot!(batches_to_string(&batches), @r#"
+ +----+----+-------+
+ | a2 | b1 | mark |
+ +----+----+-------+
+ | 10 | 4 | false |
+ | 20 | 5 | true |
+ | 30 | 6 | false |
+ +----+----+-------+
+ "#);
+ Ok(())
+}
+
#[tokio::test]
async fn join_full_one() -> Result<()> {
let left = build_table(
diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt
b/datafusion/sqllogictest/test_files/sort_merge_join.slt
index ed46333321..aa87026c5c 100644
--- a/datafusion/sqllogictest/test_files/sort_merge_join.slt
+++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt
@@ -891,3 +891,52 @@ drop table t2;
# return sql params back to default values
statement ok
set datafusion.optimizer.prefer_hash_join = true;
+
+##########
+## Tests for equijoins with different column counts
+##########
+
+statement ok
+set datafusion.optimizer.prefer_hash_join = false;
+
+statement ok
+DROP TABLE IF EXISTS t1;
+
+statement ok
+CREATE TABLE t1(a int, b int) AS VALUES (1, 100), (2, 200), (3, 300);
+
+statement ok
+DROP TABLE IF EXISTS t2;
+
+statement ok
+CREATE TABLE t2(a int, b int, c int) AS VALUES (4, 101, 1001), (3, 201, 2001),
(2, 250, 3001);
+
+statement ok
+DROP TABLE IF EXISTS t3;
+
+statement ok
+CREATE TABLE t3(x int) AS VALUES (1);
+
+query IIIII
+SELECT * FROM t2 RIGHT JOIN t1 on t1.a = t2.a AND t1.b < t2.b
+----
+NULL NULL NULL 1 100
+2 250 3001 2 200
+NULL NULL NULL 3 300
+
+query IIIII
+SELECT * FROM t1 LEFT JOIN t2 on t1.a = t2.a AND t1.b < t2.b
+----
+1 100 NULL NULL NULL
+2 200 2 250 3001
+3 300 NULL NULL NULL
+
+# Small table for LeftMark
+
+# LeftMark equijoin with different columns count
+query III rowsort
+SELECT t2.a, t2.b, t2.c
+FROM t2
+WHERE t2.a > 3 OR t2.a IN (SELECT t3.x FROM t3 WHERE t2.b < 150)
+----
+4 101 1001
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]