This is an automated email from the ASF dual-hosted git repository.

comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new df5dab77c2 SMJ: Add more tests and improve comments (#10784)
df5dab77c2 is described below

commit df5dab77c24021d2b79089430a78c1dd4dc8d2fc
Author: Oleks V <[email protected]>
AuthorDate: Wed Jun 5 08:01:33 2024 -0700

    SMJ: Add more tests and improve comments (#10784)
    
    * SMJ: Add more tests and improve comments
---
 .../physical-plan/src/joins/sort_merge_join.rs     |  31 ++-----
 .../sqllogictest/test_files/sort_merge_join.slt    | 101 +++++++++++++++++++++
 2 files changed, 110 insertions(+), 22 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs 
b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 143a726d31..8da345cdfc 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -1244,8 +1244,7 @@ impl SMJStream {
                             streamed_indices,
                             mask,
                             &self.streamed_batch.join_filter_matched_idxs,
-                            &self.buffered_data.scanning_batch_idx,
-                            &self.buffered_data.batches.len(),
+                            &self.buffered_data.scanning_offset,
                         );
 
                     if let Some(ref filtered_join_mask) = 
maybe_filtered_join_mask {
@@ -1445,15 +1444,13 @@ fn get_buffered_columns(
 ///      false = the row index doesn't match the join filter
 /// `streamed_indices` have the same length as `mask`
 /// `matched_indices` array of streaming indices that already has a join 
filter match
-/// `scanning_batch_idx` current buffered batch
-/// `buffered_batches_len` how many batches are in buffered data
+/// `scanning_buffered_offset` current buffered offset across batches
 fn get_filtered_join_mask(
     join_type: JoinType,
     streamed_indices: UInt64Array,
     mask: &BooleanArray,
     matched_indices: &HashSet<u64>,
-    scanning_buffered_batch_idx: &usize,
-    buffered_batches_len: &usize,
+    scanning_buffered_offset: &usize,
 ) -> Option<(BooleanArray, Vec<u64>)> {
     let mut seen_as_true: bool = false;
     let streamed_indices_length = streamed_indices.len();
@@ -1489,8 +1486,8 @@ fn get_filtered_join_mask(
             }
             Some((corrected_mask.finish(), filter_matched_indices))
         }
-        // LeftAnti semantics: return true if for every x in the collection, 
p(x) is false.
-        // the true(if any) flag needs to be set only once per streaming index
+        // LeftAnti semantics: return true if for every x in the collection 
the join matching filter is false.
+        // `filter_matched_indices` needs to be set once per streaming index
         // to prevent duplicates in the output
         JoinType::LeftAnti => {
             // have we seen a filter match for a streaming index before
@@ -1500,11 +1497,13 @@ fn get_filtered_join_mask(
                     filter_matched_indices.push(streamed_indices.value(i));
                 }
 
-                // if switched to next streaming index(e.g. from 0 to 1, or 
from 1 to 2), we reset seen_as_true flag
+                // Reset `seen_as_true` flag and calculate mask for the 
current streaming index
+                // - if within the batch it switched to next streaming 
index(e.g. from 0 to 1, or from 1 to 2)
+                // - if it is at the end of the all buffered batches for the 
given streaming index, 0 index comes last
                 if (i < streamed_indices_length - 1
                     && streamed_indices.value(i) != streamed_indices.value(i + 
1))
                     || (i == streamed_indices_length - 1
-                        && *scanning_buffered_batch_idx == 
buffered_batches_len - 1)
+                        && *scanning_buffered_offset == 0)
                 {
                     corrected_mask.append_value(
                         !matched_indices.contains(&streamed_indices.value(i))
@@ -2813,7 +2812,6 @@ mod tests {
                 &BooleanArray::from(vec![true, true, false, false]),
                 &HashSet::new(),
                 &0,
-                &0
             ),
             Some((BooleanArray::from(vec![true, false, false, false]), 
vec![0]))
         );
@@ -2825,7 +2823,6 @@ mod tests {
                 &BooleanArray::from(vec![true, true]),
                 &HashSet::new(),
                 &0,
-                &0
             ),
             Some((BooleanArray::from(vec![true, true]), vec![0, 1]))
         );
@@ -2837,7 +2834,6 @@ mod tests {
                 &BooleanArray::from(vec![false, true]),
                 &HashSet::new(),
                 &0,
-                &0
             ),
             Some((BooleanArray::from(vec![false, true]), vec![1]))
         );
@@ -2849,7 +2845,6 @@ mod tests {
                 &BooleanArray::from(vec![true, false]),
                 &HashSet::new(),
                 &0,
-                &0
             ),
             Some((BooleanArray::from(vec![true, false]), vec![0]))
         );
@@ -2861,7 +2856,6 @@ mod tests {
                 &BooleanArray::from(vec![false, true, true, true, true, true]),
                 &HashSet::new(),
                 &0,
-                &0
             ),
             Some((
                 BooleanArray::from(vec![false, true, false, true, false, 
false]),
@@ -2876,7 +2870,6 @@ mod tests {
                 &BooleanArray::from(vec![false, false, false, false, false, 
true]),
                 &HashSet::new(),
                 &0,
-                &0
             ),
             Some((
                 BooleanArray::from(vec![false, false, false, false, false, 
true]),
@@ -2896,7 +2889,6 @@ mod tests {
                 &BooleanArray::from(vec![true, true, false, false]),
                 &HashSet::new(),
                 &0,
-                &1
             ),
             Some((BooleanArray::from(vec![false, false, false, true]), 
vec![0]))
         );
@@ -2908,7 +2900,6 @@ mod tests {
                 &BooleanArray::from(vec![true, true]),
                 &HashSet::new(),
                 &0,
-                &1
             ),
             Some((BooleanArray::from(vec![false, false]), vec![0, 1]))
         );
@@ -2920,7 +2911,6 @@ mod tests {
                 &BooleanArray::from(vec![false, true]),
                 &HashSet::new(),
                 &0,
-                &1
             ),
             Some((BooleanArray::from(vec![true, false]), vec![1]))
         );
@@ -2932,7 +2922,6 @@ mod tests {
                 &BooleanArray::from(vec![true, false]),
                 &HashSet::new(),
                 &0,
-                &1
             ),
             Some((BooleanArray::from(vec![false, true]), vec![0]))
         );
@@ -2944,7 +2933,6 @@ mod tests {
                 &BooleanArray::from(vec![false, true, true, true, true, true]),
                 &HashSet::new(),
                 &0,
-                &1
             ),
             Some((
                 BooleanArray::from(vec![false, false, false, false, false, 
false]),
@@ -2959,7 +2947,6 @@ mod tests {
                 &BooleanArray::from(vec![false, false, false, false, false, 
true]),
                 &HashSet::new(),
                 &0,
-                &1
             ),
             Some((
                 BooleanArray::from(vec![false, false, true, false, false, 
false]),
diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt 
b/datafusion/sqllogictest/test_files/sort_merge_join.slt
index 1fd8b0a346..b4deb43a72 100644
--- a/datafusion/sqllogictest/test_files/sort_merge_join.slt
+++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt
@@ -463,6 +463,64 @@ select t1.* from t1 where not exists (select 1 from t2 
where t2.a = t1.a and t2.
 11 12 1
 11 13 2
 
+query II
+select * from (
+with
+t1 as (
+    select 11 a, 12 b),
+t2 as (
+    select 11 a, 13 c union all
+    select 11 a, 14 c union all
+    select 11 a, 15 c
+    )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and 
t1.b > t2.c)
+) order by 1, 2
+----
+11 12
+
+query II
+select * from (
+with
+t1 as (
+    select 11 a, 12 b),
+t2 as (
+    select 11 a, 11 c union all
+    select 11 a, 14 c union all
+    select 11 a, 15 c
+    )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and 
t1.b > t2.c)
+) order by 1, 2
+----
+
+query II
+select * from (
+with
+t1 as (
+    select 11 a, 12 b),
+t2 as (
+    select 11 a, 12 c union all
+    select 11 a, 11 c union all
+    select 11 a, 15 c
+    )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and 
t1.b > t2.c)
+) order by 1, 2
+----
+
+query II
+select * from (
+with
+t1 as (
+    select 11 a, 12 b),
+t2 as (
+    select 11 a, 12 c union all
+    select 11 a, 14 c union all
+    select 11 a, 11 c
+    )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and 
t1.b > t2.c)
+) order by 1, 2
+----
+
+
 # Test LEFT ANTI with cross batch data distribution
 statement ok
 set datafusion.execution.batch_size = 1;
@@ -512,6 +570,49 @@ select t1.* from t1 where not exists (select 1 from t2 
where t2.a = t1.a and t2.
 11 12 1
 11 13 2
 
+query II
+select * from (
+with
+t1 as (
+    select 11 a, 12 b),
+t2 as (
+    select 11 a, 13 c union all
+    select 11 a, 14 c union all
+    select 11 a, 15 c
+    )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and 
t1.b > t2.c)
+) order by 1, 2
+----
+11 12
+
+query II
+select * from (
+with
+t1 as (
+    select 11 a, 12 b),
+t2 as (
+    select 11 a, 12 c union all
+    select 11 a, 11 c union all
+    select 11 a, 15 c
+    )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and 
t1.b > t2.c)
+) order by 1, 2
+----
+
+query II
+select * from (
+with
+t1 as (
+    select 11 a, 12 b),
+t2 as (
+    select 11 a, 12 c union all
+    select 11 a, 14 c union all
+    select 11 a, 11 c
+    )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and 
t1.b > t2.c)
+) order by 1, 2
+----
+
 # return sql params back to default values
 statement ok
 set datafusion.optimizer.prefer_hash_join = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to