This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 908a3a1d2f Minor: SMJ fuzz tests fix for rowcounts (#10891)
908a3a1d2f is described below

commit 908a3a1d2feea1b1ae8c6220dcdb9e8264dd27ad
Author: Oleks V <[email protected]>
AuthorDate: Wed Jun 12 14:46:50 2024 -0700

    Minor: SMJ fuzz tests fix for rowcounts (#10891)
    
    * Fix: Sort Merge Join crashes on TPCH Q21
    
    * Fix LeftAnti SMJ join when the join filter is set
    
    * rm dbg
    
    * Minor: Fix fuzz testing row counts
---
 datafusion/core/tests/fuzz_cases/join_fuzz.rs | 55 +++++++++++++++------------
 1 file changed, 31 insertions(+), 24 deletions(-)

diff --git a/datafusion/core/tests/fuzz_cases/join_fuzz.rs b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
index 7dbbfb25bf..a893e78058 100644
--- a/datafusion/core/tests/fuzz_cases/join_fuzz.rs
+++ b/datafusion/core/tests/fuzz_cases/join_fuzz.rs
@@ -55,7 +55,7 @@ async fn test_inner_join_1k() {
     .await
 }
 
-fn less_than_10_join_filter(schema1: Arc<Schema>, _schema2: Arc<Schema>) -> JoinFilter {
+fn less_than_100_join_filter(schema1: Arc<Schema>, _schema2: Arc<Schema>) -> JoinFilter {
     let less_than_100 = Arc::new(BinaryExpr::new(
         Arc::new(Column::new("a", 0)),
         Operator::Lt,
@@ -77,7 +77,7 @@ async fn test_inner_join_1k_filtered() {
         make_staggered_batches(1000),
         make_staggered_batches(1000),
         JoinType::Inner,
-        Some(Box::new(less_than_10_join_filter)),
+        Some(Box::new(less_than_100_join_filter)),
     )
     .run_test()
     .await
@@ -113,7 +113,7 @@ async fn test_left_join_1k_filtered() {
         make_staggered_batches(1000),
         make_staggered_batches(1000),
         JoinType::Left,
-        Some(Box::new(less_than_10_join_filter)),
+        Some(Box::new(less_than_100_join_filter)),
     )
     .run_test()
     .await
@@ -138,7 +138,7 @@ async fn test_right_join_1k_filtered() {
         make_staggered_batches(1000),
         make_staggered_batches(1000),
         JoinType::Right,
-        Some(Box::new(less_than_10_join_filter)),
+        Some(Box::new(less_than_100_join_filter)),
     )
     .run_test()
     .await
@@ -162,7 +162,7 @@ async fn test_full_join_1k_filtered() {
         make_staggered_batches(1000),
         make_staggered_batches(1000),
         JoinType::Full,
-        Some(Box::new(less_than_10_join_filter)),
+        Some(Box::new(less_than_100_join_filter)),
     )
     .run_test()
     .await
@@ -179,15 +179,14 @@ async fn test_semi_join_1k() {
     .run_test()
     .await
 }
-// See https://github.com/apache/datafusion/issues/10886
-#[ignore]
+
 #[tokio::test]
 async fn test_semi_join_1k_filtered() {
     JoinFuzzTestCase::new(
         make_staggered_batches(1000),
         make_staggered_batches(1000),
         JoinType::LeftSemi,
-        Some(Box::new(less_than_10_join_filter)),
+        Some(Box::new(less_than_100_join_filter)),
     )
     .run_test()
     .await
@@ -213,7 +212,7 @@ async fn test_anti_join_1k_filtered() {
         make_staggered_batches(1000),
         make_staggered_batches(1000),
         JoinType::LeftAnti,
-        Some(Box::new(less_than_10_join_filter)),
+        Some(Box::new(less_than_100_join_filter)),
     )
     .run_test()
     .await
@@ -392,6 +391,15 @@ impl JoinFuzzTestCase {
             let hj = self.hash_join();
             let hj_collected = collect(hj, task_ctx.clone()).await.unwrap();
 
+            // Get actual row counts(without formatting overhead) for HJ and SMJ
+            let hj_rows = hj_collected.iter().fold(0, |acc, b| acc + b.num_rows());
+            let smj_rows = smj_collected.iter().fold(0, |acc, b| acc + b.num_rows());
+
+            assert_eq!(
+                hj_rows, smj_rows,
+                "SortMergeJoinExec and HashJoinExec produced different row counts"
+            );
+
             let nlj = self.nested_loop_join();
             let nlj_collected = collect(nlj, task_ctx.clone()).await.unwrap();
 
@@ -414,21 +422,20 @@ impl JoinFuzzTestCase {
                 nlj_formatted.trim().lines().collect();
             nlj_formatted_sorted.sort_unstable();
 
-            assert_eq!(
-                smj_formatted_sorted.len(),
-                hj_formatted_sorted.len(),
-                "SortMergeJoinExec and HashJoinExec produced different row counts"
-            );
-            for (i, (smj_line, hj_line)) in smj_formatted_sorted
-                .iter()
-                .zip(&hj_formatted_sorted)
-                .enumerate()
-            {
-                assert_eq!(
-                    (i, smj_line),
-                    (i, hj_line),
-                    "SortMergeJoinExec and HashJoinExec produced different results"
-                );
+            // row level compare if any of joins returns the result
+            // the reason is different formatting when there is no rows
+            if smj_rows > 0 || hj_rows > 0 {
+                for (i, (smj_line, hj_line)) in smj_formatted_sorted
+                    .iter()
+                    .zip(&hj_formatted_sorted)
+                    .enumerate()
+                {
+                    assert_eq!(
+                        (i, smj_line),
+                        (i, hj_line),
+                        "SortMergeJoinExec and HashJoinExec produced different results"
+                    );
+                }
             }
 
             for (i, (nlj_line, hj_line)) in nlj_formatted_sorted


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to