alamb commented on code in PR #10304:
URL: https://github.com/apache/datafusion/pull/10304#discussion_r1598870577
##########
datafusion/sqllogictest/test_files/sort_merge_join.slt:
##########
@@ -263,5 +263,139 @@ DROP TABLE t1;
statement ok
DROP TABLE t2;
+
Review Comment:
To verify that these tests cover the code changes, I ran them locally
without the code changes in this PR and they failed as expected 👍
```
Running "sort_merge_join.slt"
thread 'tokio-runtime-worker' panicked at datafusion/physical-plan/src/joins/sort_merge_join.rs:1356:22:
index out of bounds: the len is 0 but the index is 1
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
External error: task 17 panicked
Error: Execution("1 failures")
error: test failed, to rerun pass `-p datafusion-sqllogictest --test sqllogictests`
Caused by:
process didn't exit successfully: `/Users/andrewlamb/Software/datafusion/target/debug/deps/sqllogictests-ce3a36cfeab74789 sort_merge` (exit status: 1)
```
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1363,6 +1380,57 @@ fn get_filter_column(
filter_columns
}
+// Get buffered data sliece by specific batch index and for specified column indices only
+#[inline(always)]
+fn get_buffered_columns(
+ buffered_data: &BufferedData,
+ buffered_batch_idx: usize,
+ buffered_indices: &UInt64Array,
+) -> Result<Vec<ArrayRef>, ArrowError> {
+ buffered_data.batches[buffered_batch_idx]
+ .batch
+ .columns()
+ .iter()
+ .map(|column| take(column, &buffered_indices, None))
+ .collect::<Result<Vec<_>, ArrowError>>()
+}
+
+// Calculate join filter bit mask considering join type specifics
Review Comment:
I think it would help to also document what the input parameters
`streamed_indices` and `mask` represent here (I think it is the rows in
`streamed_indices` that match the join predicate?)
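For reference, here is a std-only sketch of that reading (an assumption, not confirmed by the PR): `streamed_indices[i]` is the streamed-side row of the i-th candidate (streamed, buffered) pair, and `mask[i]` says whether the join filter was true for that pair. A LeftSemi join then emits each streamed row that has at least one passing pair. Plain slices stand in for `UInt64Array` / `BooleanArray`, and `semi_join_matched_rows` is a hypothetical name:

```rust
/// Hypothetical model of the inputs to `get_filtered_join_mask`:
/// - `streamed_indices[i]`: streamed-side row index of candidate pair `i`
///   (assumed sorted, so all pairs for one row are adjacent)
/// - `mask[i]`: whether the join filter was true for candidate pair `i`
///
/// Returns each streamed row with at least one passing pair, exactly once,
/// i.e. the rows a LeftSemi join should emit under this reading.
fn semi_join_matched_rows(streamed_indices: &[u64], mask: &[bool]) -> Vec<u64> {
    assert_eq!(streamed_indices.len(), mask.len(), "one mask bit per pair");
    let mut rows: Vec<u64> = Vec::new();
    for (&row, &passed) in streamed_indices.iter().zip(mask) {
        // Emit a row on its first passing pair; because pairs for a row are
        // adjacent, comparing with the last emitted row avoids duplicates.
        if passed && rows.last() != Some(&row) {
            rows.push(row);
        }
    }
    rows
}
```

With `streamed_indices = [0, 0, 1, 1, 2]` and `mask = [false, true, true, true, false]`, rows `0` and `1` are emitted once each and row `2` is dropped.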
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1363,6 +1380,57 @@ fn get_filter_column(
filter_columns
}
+// Get buffered data sliece by specific batch index and for specified column indices only
Review Comment:
I think `buffered_indices` are row indices (not column indices), which
confused me for a moment.
Maybe this would be clearer:
```suggestion
/// Get `buffered_indices` rows for `buffered_data[buffered_batch_idx]`
```
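Since `take` is applied to every column with the same `buffered_indices`, the indices select rows and all columns are kept. A minimal std-only model of that, where the hypothetical `Batch` type (a `Vec` of `i64` columns) stands in for a `RecordBatch`:

```rust
/// Hypothetical columnar batch: each inner Vec is one column, all equal length.
type Batch = Vec<Vec<i64>>;

/// Model of `get_buffered_columns`: gather the rows named by `row_indices`
/// from every column, the same way `take` is applied column by column.
/// Panics if a row index is out of range.
fn take_rows(batch: &Batch, row_indices: &[usize]) -> Batch {
    batch
        .iter()
        .map(|column| row_indices.iter().map(|&r| column[r]).collect::<Vec<i64>>())
        .collect()
}
```

For example, taking rows `[2, 0]` from columns `[10, 20, 30]` and `[1, 2, 3]` yields `[30, 10]` and `[3, 1]`: two columns in, two columns out.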
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -2639,6 +2710,70 @@ mod tests {
Ok(())
}
+
+ #[tokio::test]
+ async fn left_semi_join_filtered_mask() -> Result<()> {
+ assert_eq!(
+ get_filtered_join_mask(
+ LeftSemi,
Review Comment:
Maybe we should test a join type other than LeftSemi as negative test coverage 🤔
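A sketch of what that negative coverage could look like, using a hypothetical reduced `JoinType` enum and a std-only stand-in `filtered_join_mask` that, like the code under review, only returns a mask for `LeftSemi`. The stand-in keeps the first passing pair per streamed row with a `HashSet`, which is an illustrative choice and not necessarily how the PR does it:

```rust
use std::collections::HashSet;

/// Hypothetical reduced stand-in for DataFusion's `JoinType`.
#[derive(Clone, Copy, Debug)]
enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    LeftSemi,
    LeftAnti,
}

/// Hypothetical std-only stand-in: returns a corrected mask only for
/// `LeftSemi`, keeping the first passing pair for each streamed row;
/// every other join type gets `None`.
fn filtered_join_mask(
    join_type: JoinType,
    streamed_indices: &[u64],
    mask: &[bool],
) -> Option<Vec<bool>> {
    if !matches!(join_type, JoinType::LeftSemi) {
        return None;
    }
    let mut kept_rows = HashSet::new();
    Some(
        streamed_indices
            .iter()
            .zip(mask)
            // `insert` returns true only the first time a row is added, and
            // `&&` short-circuits so failing pairs never mark a row as kept.
            .map(|(&row, &passed)| passed && kept_rows.insert(row))
            .collect(),
    )
}
```

The negative test then just loops over the non-LeftSemi variants and asserts `None`, alongside the existing positive LeftSemi case.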
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1161,6 +1162,15 @@ impl SMJStream {
let filter_columns = if chunk.buffered_batch_idx.is_some() {
if matches!(self.join_type, JoinType::Right) {
get_filter_column(&self.filter, &buffered_columns, &streamed_columns)
+ } else if matches!(self.join_type, JoinType::LeftSemi) {
Review Comment:
I wonder if this should also check for `JoinType::Left` (and whether the clause
above should also check for `JoinType::RightSemi`) 🤔
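If the symmetric cases should share a branch, `matches!` accepts `|` patterns, which keeps the grouping in one place. The enum, the helper names, and the mapping itself are assumptions for illustration: the sketch assumes the right input is the streamed side for `Right` and `RightSemi`, so the buffered batch supplies the filter's left-side columns.

```rust
/// Hypothetical reduced stand-in for DataFusion's `JoinType`.
#[derive(Clone, Copy, Debug)]
enum JoinType {
    Inner,
    Left,
    Right,
    LeftSemi,
    RightSemi,
}

/// Hypothetical helper. Assumption: for `Right` and `RightSemi` the right
/// input is streamed, so the buffered batch holds the left-side columns.
fn buffered_is_left_input(join_type: JoinType) -> bool {
    // `|` groups the symmetric variants in a single pattern.
    matches!(join_type, JoinType::Right | JoinType::RightSemi)
}

/// Hypothetical helper: returns (left-side columns, right-side columns)
/// in the order the join filter expects them.
fn filter_column_order<'a, T>(
    join_type: JoinType,
    streamed: &'a [T],
    buffered: &'a [T],
) -> (&'a [T], &'a [T]) {
    if buffered_is_left_input(join_type) {
        (buffered, streamed)
    } else {
        (streamed, buffered)
    }
}
```

Adding a variant to the pattern then changes every caller's behavior together, rather than needing a new `else if` arm per join type.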
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1363,6 +1380,57 @@ fn get_filter_column(
filter_columns
}
+// Get buffered data sliece by specific batch index and for specified column indices only
+#[inline(always)]
+fn get_buffered_columns(
+ buffered_data: &BufferedData,
+ buffered_batch_idx: usize,
+ buffered_indices: &UInt64Array,
+) -> Result<Vec<ArrayRef>, ArrowError> {
+ buffered_data.batches[buffered_batch_idx]
+ .batch
+ .columns()
+ .iter()
+ .map(|column| take(column, &buffered_indices, None))
+ .collect::<Result<Vec<_>, ArrowError>>()
+}
+
+// Calculate join filter bit mask considering join type specifics
+fn get_filtered_join_mask(
+ join_type: JoinType,
+ streamed_indices: UInt64Array,
+ mask: &BooleanArray,
+) -> Option<BooleanArray> {
+ // for LeftSemi Join the filter mask should be calculated in its own way:
+ // if we find at least one matching row for specific streaming index
+ // we dont need to check any others for the same index
+ if matches!(join_type, JoinType::LeftSemi) {
+ // have we seen a filter match for a streaming index before
+ let mut seen_as_true: bool = false;
+ let streamed_indices_length = streamed_indices.len();
+ let mut corrected_mask: Vec<bool> = vec![false; streamed_indices_length];
+
+ #[allow(clippy::needless_range_loop)]
Review Comment:
I wonder why we ignore clippy here?
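Likely context: `needless_range_loop` typically fires when a `for i in 0..len` variable is used to index a slice or `Vec`, such as `corrected_mask[i]`. Here is a std-only sketch (hypothetical name, plain slices instead of Arrow arrays, pairs for one streamed row assumed adjacent) that iterates with `zip` and remembers the previous streamed row, so no indexing and no lint suppression is needed:

```rust
/// Hypothetical std-only LeftSemi mask correction without
/// `#[allow(clippy::needless_range_loop)]`: iterate with `zip` and track
/// the previous streamed row instead of looking ahead by index.
/// Assumes pairs for the same streamed row are adjacent (sorted input).
fn left_semi_corrected_mask(streamed_indices: &[u64], mask: &[bool]) -> Vec<bool> {
    assert_eq!(streamed_indices.len(), mask.len());
    let mut corrected = Vec::with_capacity(mask.len());
    let mut prev_row: Option<u64> = None;
    // Have we already kept a passing pair for the current streamed row?
    let mut seen_as_true = false;
    for (&row, &passed) in streamed_indices.iter().zip(mask) {
        if prev_row != Some(row) {
            // Moved on to a new streamed row: reset the flag.
            prev_row = Some(row);
            seen_as_true = false;
        }
        // Keep only the first passing pair for this row.
        corrected.push(passed && !seen_as_true);
        seen_as_true |= passed;
    }
    corrected
}
```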
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1363,6 +1380,57 @@ fn get_filter_column(
filter_columns
}
+// Get buffered data sliece by specific batch index and for specified column indices only
+#[inline(always)]
+fn get_buffered_columns(
+ buffered_data: &BufferedData,
+ buffered_batch_idx: usize,
+ buffered_indices: &UInt64Array,
+) -> Result<Vec<ArrayRef>, ArrowError> {
+ buffered_data.batches[buffered_batch_idx]
+ .batch
+ .columns()
+ .iter()
+ .map(|column| take(column, &buffered_indices, None))
+ .collect::<Result<Vec<_>, ArrowError>>()
+}
+
+// Calculate join filter bit mask considering join type specifics
+fn get_filtered_join_mask(
+ join_type: JoinType,
+ streamed_indices: UInt64Array,
+ mask: &BooleanArray,
+) -> Option<BooleanArray> {
+ // for LeftSemi Join the filter mask should be calculated in its own way:
+ // if we find at least one matching row for specific streaming index
+ // we dont need to check any others for the same index
+ if matches!(join_type, JoinType::LeftSemi) {
Review Comment:
I find it strange that this doesn't also handle RightSemi (as in, I would expect
the code to be symmetrical).
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1363,6 +1380,57 @@ fn get_filter_column(
filter_columns
}
+// Get buffered data sliece by specific batch index and for specified column indices only
+#[inline(always)]
+fn get_buffered_columns(
+ buffered_data: &BufferedData,
+ buffered_batch_idx: usize,
+ buffered_indices: &UInt64Array,
+) -> Result<Vec<ArrayRef>, ArrowError> {
+ buffered_data.batches[buffered_batch_idx]
+ .batch
+ .columns()
+ .iter()
+ .map(|column| take(column, &buffered_indices, None))
+ .collect::<Result<Vec<_>, ArrowError>>()
+}
+
+// Calculate join filter bit mask considering join type specifics
+fn get_filtered_join_mask(
+ join_type: JoinType,
+ streamed_indices: UInt64Array,
+ mask: &BooleanArray,
+) -> Option<BooleanArray> {
+ // for LeftSemi Join the filter mask should be calculated in its own way:
+ // if we find at least one matching row for specific streaming index
+ // we dont need to check any others for the same index
+ if matches!(join_type, JoinType::LeftSemi) {
+ // have we seen a filter match for a streaming index before
+ let mut seen_as_true: bool = false;
+ let streamed_indices_length = streamed_indices.len();
+ let mut corrected_mask: Vec<bool> = vec![false; streamed_indices_length];
Review Comment:
FWIW, it might be faster and easier to follow to create the `BooleanArray`
directly using
[`BooleanBuilder`](https://docs.rs/arrow/latest/arrow/array/struct.BooleanBuilder.html)
rather than a `Vec<bool>`.
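Part of why a builder can be cheaper: Arrow stores boolean values as a bitmap (one bit per value, LSB-first within each byte), so building a `Vec<bool>` first costs a byte per value plus a second pass to pack the bits. The std-only sketch below is a hypothetical `BitmapBuilder`, not Arrow's `BooleanBuilder` API (which also tracks nulls); it only illustrates appending straight into packed bits:

```rust
/// Hypothetical std-only builder that packs booleans into bytes as values
/// are appended, LSB-first within each byte (Arrow's bit order).
struct BitmapBuilder {
    bytes: Vec<u8>,
    len: usize,
}

impl BitmapBuilder {
    fn with_capacity(capacity: usize) -> Self {
        // Eight values per byte, rounded up.
        Self { bytes: Vec::with_capacity((capacity + 7) / 8), len: 0 }
    }

    fn append_value(&mut self, v: bool) {
        if self.len % 8 == 0 {
            self.bytes.push(0); // start a fresh byte every 8 values
        }
        if v {
            let last = self.bytes.len() - 1;
            self.bytes[last] |= 1 << (self.len % 8);
        }
        self.len += 1;
    }

    fn get(&self, i: usize) -> bool {
        assert!(i < self.len, "index out of bounds");
        self.bytes[i / 8] & (1 << (i % 8)) != 0
    }

    /// Returns the packed bytes and the number of values appended.
    fn finish(self) -> (Vec<u8>, usize) {
        (self.bytes, self.len)
    }
}
```

Appending `true, false, true, true`, four `false`s, then `true` produces the bytes `0b0000_1101` and `0b0000_0001` with a length of 9.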
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.