This is an automated email from the ASF dual-hosted git repository.
comphead pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new df5dab77c2 SMJ: Add more tests and improve comments (#10784)
df5dab77c2 is described below
commit df5dab77c24021d2b79089430a78c1dd4dc8d2fc
Author: Oleks V <[email protected]>
AuthorDate: Wed Jun 5 08:01:33 2024 -0700
SMJ: Add more tests and improve comments (#10784)
* SMJ: Add more tests and improve comments
---
.../physical-plan/src/joins/sort_merge_join.rs | 31 ++-----
.../sqllogictest/test_files/sort_merge_join.slt | 101 +++++++++++++++++++++
2 files changed, 110 insertions(+), 22 deletions(-)
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 143a726d31..8da345cdfc 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -1244,8 +1244,7 @@ impl SMJStream {
streamed_indices,
mask,
&self.streamed_batch.join_filter_matched_idxs,
- &self.buffered_data.scanning_batch_idx,
- &self.buffered_data.batches.len(),
+ &self.buffered_data.scanning_offset,
);
if let Some(ref filtered_join_mask) = maybe_filtered_join_mask {
@@ -1445,15 +1444,13 @@ fn get_buffered_columns(
/// false = the row index doesn't match the join filter
/// `streamed_indices` have the same length as `mask`
/// `matched_indices` array of streaming indices that already has a join filter match
-/// `scanning_batch_idx` current buffered batch
-/// `buffered_batches_len` how many batches are in buffered data
+/// `scanning_buffered_offset` current buffered offset across batches
fn get_filtered_join_mask(
join_type: JoinType,
streamed_indices: UInt64Array,
mask: &BooleanArray,
matched_indices: &HashSet<u64>,
- scanning_buffered_batch_idx: &usize,
- buffered_batches_len: &usize,
+ scanning_buffered_offset: &usize,
) -> Option<(BooleanArray, Vec<u64>)> {
let mut seen_as_true: bool = false;
let streamed_indices_length = streamed_indices.len();
@@ -1489,8 +1486,8 @@ fn get_filtered_join_mask(
}
Some((corrected_mask.finish(), filter_matched_indices))
}
- // LeftAnti semantics: return true if for every x in the collection, p(x) is false.
- // the true(if any) flag needs to be set only once per streaming index
+ // LeftAnti semantics: return true if for every x in the collection the join matching filter is false.
+ // `filter_matched_indices` needs to be set once per streaming index
// to prevent duplicates in the output
JoinType::LeftAnti => {
// have we seen a filter match for a streaming index before
@@ -1500,11 +1497,13 @@ fn get_filtered_join_mask(
filter_matched_indices.push(streamed_indices.value(i));
}
- // if switched to next streaming index(e.g. from 0 to 1, or from 1 to 2), we reset seen_as_true flag
+ // Reset `seen_as_true` flag and calculate mask for the current streaming index
+ // - if within the batch it switched to next streaming index(e.g. from 0 to 1, or from 1 to 2)
+ // - if it is at the end of the all buffered batches for the given streaming index, 0 index comes last
if (i < streamed_indices_length - 1
&& streamed_indices.value(i) != streamed_indices.value(i + 1))
|| (i == streamed_indices_length - 1
- && *scanning_buffered_batch_idx == buffered_batches_len - 1)
+ && *scanning_buffered_offset == 0)
{
corrected_mask.append_value(
!matched_indices.contains(&streamed_indices.value(i))
@@ -2813,7 +2812,6 @@ mod tests {
&BooleanArray::from(vec![true, true, false, false]),
&HashSet::new(),
&0,
- &0
),
Some((BooleanArray::from(vec![true, false, false, false]), vec![0]))
);
@@ -2825,7 +2823,6 @@ mod tests {
&BooleanArray::from(vec![true, true]),
&HashSet::new(),
&0,
- &0
),
Some((BooleanArray::from(vec![true, true]), vec![0, 1]))
);
@@ -2837,7 +2834,6 @@ mod tests {
&BooleanArray::from(vec![false, true]),
&HashSet::new(),
&0,
- &0
),
Some((BooleanArray::from(vec![false, true]), vec![1]))
);
@@ -2849,7 +2845,6 @@ mod tests {
&BooleanArray::from(vec![true, false]),
&HashSet::new(),
&0,
- &0
),
Some((BooleanArray::from(vec![true, false]), vec![0]))
);
@@ -2861,7 +2856,6 @@ mod tests {
&BooleanArray::from(vec![false, true, true, true, true, true]),
&HashSet::new(),
&0,
- &0
),
Some((
BooleanArray::from(vec![false, true, false, true, false, false]),
@@ -2876,7 +2870,6 @@ mod tests {
&BooleanArray::from(vec![false, false, false, false, false, true]),
&HashSet::new(),
&0,
- &0
),
Some((
BooleanArray::from(vec![false, false, false, false, false, true]),
@@ -2896,7 +2889,6 @@ mod tests {
&BooleanArray::from(vec![true, true, false, false]),
&HashSet::new(),
&0,
- &1
),
Some((BooleanArray::from(vec![false, false, false, true]), vec![0]))
);
@@ -2908,7 +2900,6 @@ mod tests {
&BooleanArray::from(vec![true, true]),
&HashSet::new(),
&0,
- &1
),
Some((BooleanArray::from(vec![false, false]), vec![0, 1]))
);
@@ -2920,7 +2911,6 @@ mod tests {
&BooleanArray::from(vec![false, true]),
&HashSet::new(),
&0,
- &1
),
Some((BooleanArray::from(vec![true, false]), vec![1]))
);
@@ -2932,7 +2922,6 @@ mod tests {
&BooleanArray::from(vec![true, false]),
&HashSet::new(),
&0,
- &1
),
Some((BooleanArray::from(vec![false, true]), vec![0]))
);
@@ -2944,7 +2933,6 @@ mod tests {
&BooleanArray::from(vec![false, true, true, true, true, true]),
&HashSet::new(),
&0,
- &1
),
Some((
BooleanArray::from(vec![false, false, false, false, false, false]),
@@ -2959,7 +2947,6 @@ mod tests {
&BooleanArray::from(vec![false, false, false, false, false, true]),
&HashSet::new(),
&0,
- &1
),
Some((
BooleanArray::from(vec![false, false, true, false, false, false]),
diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt
index 1fd8b0a346..b4deb43a72 100644
--- a/datafusion/sqllogictest/test_files/sort_merge_join.slt
+++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt
@@ -463,6 +463,64 @@ select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and t2.
11 12 1
11 13 2

+query II
+select * from (
+with
+t1 as (
+ select 11 a, 12 b),
+t2 as (
+ select 11 a, 13 c union all
+ select 11 a, 14 c union all
+ select 11 a, 15 c
+ )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and t1.b > t2.c)
+) order by 1, 2
+----
+11 12
+
+query II
+select * from (
+with
+t1 as (
+ select 11 a, 12 b),
+t2 as (
+ select 11 a, 11 c union all
+ select 11 a, 14 c union all
+ select 11 a, 15 c
+ )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and t1.b > t2.c)
+) order by 1, 2
+----
+
+query II
+select * from (
+with
+t1 as (
+ select 11 a, 12 b),
+t2 as (
+ select 11 a, 12 c union all
+ select 11 a, 11 c union all
+ select 11 a, 15 c
+ )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and t1.b > t2.c)
+) order by 1, 2
+----
+
+query II
+select * from (
+with
+t1 as (
+ select 11 a, 12 b),
+t2 as (
+ select 11 a, 12 c union all
+ select 11 a, 14 c union all
+ select 11 a, 11 c
+ )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and t1.b > t2.c)
+) order by 1, 2
+----
+
+
# Test LEFT ANTI with cross batch data distribution
statement ok
set datafusion.execution.batch_size = 1;
@@ -512,6 +570,49 @@ select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and t2.
11 12 1
11 13 2

+query II
+select * from (
+with
+t1 as (
+ select 11 a, 12 b),
+t2 as (
+ select 11 a, 13 c union all
+ select 11 a, 14 c union all
+ select 11 a, 15 c
+ )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and t1.b > t2.c)
+) order by 1, 2
+----
+11 12
+
+query II
+select * from (
+with
+t1 as (
+ select 11 a, 12 b),
+t2 as (
+ select 11 a, 12 c union all
+ select 11 a, 11 c union all
+ select 11 a, 15 c
+ )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and t1.b > t2.c)
+) order by 1, 2
+----
+
+query II
+select * from (
+with
+t1 as (
+ select 11 a, 12 b),
+t2 as (
+ select 11 a, 12 c union all
+ select 11 a, 14 c union all
+ select 11 a, 11 c
+ )
+select t1.* from t1 where not exists (select 1 from t2 where t2.a = t1.a and t1.b > t2.c)
+) order by 1, 2
+----
+
# return sql params back to default values
statement ok
set datafusion.optimizer.prefer_hash_join = true;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]