This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new 908a3a1d2f Minor: SMJ fuzz tests fix for rowcounts (#10891)
908a3a1d2f is described below
commit 908a3a1d2feea1b1ae8c6220dcdb9e8264dd27ad
Author: Oleks V <[email protected]>
AuthorDate: Wed Jun 12 14:46:50 2024 -0700
Minor: SMJ fuzz tests fix for rowcounts (#10891)
* Fix: Sort Merge Join crashes on TPCH Q21
* Fix LeftAnti SMJ join when the join filter is set
* rm dbg
* Minor: Fix fuzz testing row counts
---
datafusion/core/tests/fuzz_cases/join_fuzz.rs | 55 +++++++++++++++------------
1 file changed, 31 insertions(+), 24 deletions(-)
diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index 7dbbfb25bf..a893e78058 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -55,7 +55,7 @@ async fn test_inner_join_1k() {
.await
}
-fn less_than_10_join_filter(schema1: Arc<Schema>, _schema2: Arc<Schema>) ->
JoinFilter {
+fn less_than_100_join_filter(schema1: Arc<Schema>, _schema2: Arc<Schema>) ->
JoinFilter {
let less_than_100 = Arc::new(BinaryExpr::new(
Arc::new(Column::new("a", 0)),
Operator::Lt,
@@ -77,7 +77,7 @@ async fn test_inner_join_1k_filtered() {
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::Inner,
- Some(Box::new(less_than_10_join_filter)),
+ Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
@@ -113,7 +113,7 @@ async fn test_left_join_1k_filtered() {
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::Left,
- Some(Box::new(less_than_10_join_filter)),
+ Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
@@ -138,7 +138,7 @@ async fn test_right_join_1k_filtered() {
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::Right,
- Some(Box::new(less_than_10_join_filter)),
+ Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
@@ -162,7 +162,7 @@ async fn test_full_join_1k_filtered() {
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::Full,
- Some(Box::new(less_than_10_join_filter)),
+ Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
@@ -179,15 +179,14 @@ async fn test_semi_join_1k() {
.run_test()
.await
}
-// See https://github.com/apache/datafusion/issues/10886
-#[ignore]
+
#[tokio::test]
async fn test_semi_join_1k_filtered() {
JoinFuzzTestCase::new(
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::LeftSemi,
- Some(Box::new(less_than_10_join_filter)),
+ Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
@@ -213,7 +212,7 @@ async fn test_anti_join_1k_filtered() {
make_staggered_batches(1000),
make_staggered_batches(1000),
JoinType::LeftAnti,
- Some(Box::new(less_than_10_join_filter)),
+ Some(Box::new(less_than_100_join_filter)),
)
.run_test()
.await
@@ -392,6 +391,15 @@ impl JoinFuzzTestCase {
let hj = self.hash_join();
let hj_collected = collect(hj, task_ctx.clone()).await.unwrap();
+ // Get actual row counts(without formatting overhead) for HJ and
SMJ
+ let hj_rows = hj_collected.iter().fold(0, |acc, b| acc +
b.num_rows());
+ let smj_rows = smj_collected.iter().fold(0, |acc, b| acc +
b.num_rows());
+
+ assert_eq!(
+ hj_rows, smj_rows,
+ "SortMergeJoinExec and HashJoinExec produced different row
counts"
+ );
+
let nlj = self.nested_loop_join();
let nlj_collected = collect(nlj, task_ctx.clone()).await.unwrap();
@@ -414,21 +422,20 @@ impl JoinFuzzTestCase {
nlj_formatted.trim().lines().collect();
nlj_formatted_sorted.sort_unstable();
- assert_eq!(
- smj_formatted_sorted.len(),
- hj_formatted_sorted.len(),
- "SortMergeJoinExec and HashJoinExec produced different row
counts"
- );
- for (i, (smj_line, hj_line)) in smj_formatted_sorted
- .iter()
- .zip(&hj_formatted_sorted)
- .enumerate()
- {
- assert_eq!(
- (i, smj_line),
- (i, hj_line),
- "SortMergeJoinExec and HashJoinExec produced different
results"
- );
+ // row level compare if any of joins returns the result
+ // the reason is different formatting when there is no rows
+ if smj_rows > 0 || hj_rows > 0 {
+ for (i, (smj_line, hj_line)) in smj_formatted_sorted
+ .iter()
+ .zip(&hj_formatted_sorted)
+ .enumerate()
+ {
+ assert_eq!(
+ (i, smj_line),
+ (i, hj_line),
+ "SortMergeJoinExec and HashJoinExec produced different
results"
+ );
+ }
}
for (i, (nlj_line, hj_line)) in nlj_formatted_sorted
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]