comphead commented on code in PR #10724:
URL: https://github.com/apache/datafusion/pull/10724#discussion_r1623243082


##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1419,51 +1438,87 @@ fn get_buffered_columns(
         .collect::<Result<Vec<_>, ArrowError>>()
 }
 
-// Calculate join filter bit mask considering join type specifics
-// `streamed_indices` - array of streamed datasource JOINED row indices
-// `mask` - array booleans representing computed join filter expression eval 
result:
-//      true = the row index matches the join filter
-//      false = the row index doesn't match the join filter
-// `streamed_indices` have the same length as `mask`
+/// Calculate join filter bit mask considering join type specifics
+/// `streamed_indices` - array of streamed datasource JOINED row indices
+/// `mask` - array booleans representing computed join filter expression eval 
result:
+///      true = the row index matches the join filter
+///      false = the row index doesn't match the join filter
+/// `streamed_indices` have the same length as `mask`
+/// `matched_indices` array of streaming indices that already has a join 
filter match
+/// `scanning_batch_idx` current buffered batch
+/// `buffered_batches_len` how many batches are in buffered data
 fn get_filtered_join_mask(
     join_type: JoinType,
     streamed_indices: UInt64Array,
     mask: &BooleanArray,
+    matched_indices: &HashSet<u64>,
+    scanning_buffered_batch_idx: &usize,
+    buffered_batches_len: &usize,
 ) -> Option<(BooleanArray, Vec<u64>)> {
-    // for LeftSemi Join the filter mask should be calculated in its own way:
-    // if we find at least one matching row for specific streaming index
-    // we don't need to check any others for the same index
-    if matches!(join_type, JoinType::LeftSemi) {
-        // have we seen a filter match for a streaming index before
-        let mut seen_as_true: bool = false;
-        let streamed_indices_length = streamed_indices.len();
-        let mut corrected_mask: BooleanBuilder =
-            BooleanBuilder::with_capacity(streamed_indices_length);
-
-        let mut filter_matched_indices: Vec<u64> = vec![];
-
-        #[allow(clippy::needless_range_loop)]
-        for i in 0..streamed_indices_length {
-            // LeftSemi respects only first true values for specific streaming 
index,
-            // others true values for the same index must be false
-            if mask.value(i) && !seen_as_true {
-                seen_as_true = true;
-                corrected_mask.append_value(true);
-                filter_matched_indices.push(streamed_indices.value(i));
-            } else {
-                corrected_mask.append_value(false);
+    let mut seen_as_true: bool = false;
+    let streamed_indices_length = streamed_indices.len();
+    let mut corrected_mask: BooleanBuilder =
+        BooleanBuilder::with_capacity(streamed_indices_length);
+
+    let mut filter_matched_indices: Vec<u64> = vec![];
+
+    #[allow(clippy::needless_range_loop)]
+    match join_type {
+        // for LeftSemi Join the filter mask should be calculated in its own 
way:
+        // if we find at least one matching row for specific streaming index
+        // we don't need to check any others for the same index
+        JoinType::LeftSemi => {
+            // have we seen a filter match for a streaming index before
+            for i in 0..streamed_indices_length {
+                // LeftSemi respects only first true values for specific 
streaming index,
+                // others true values for the same index must be false
+                if mask.value(i) && !seen_as_true {
+                    seen_as_true = true;
+                    corrected_mask.append_value(true);
+                    filter_matched_indices.push(streamed_indices.value(i));
+                } else {
+                    corrected_mask.append_value(false);
+                }
+
+                // if switched to next streaming index(e.g. from 0 to 1, or 
from 1 to 2), we reset seen_as_true flag
+                if i < streamed_indices_length - 1
+                    && streamed_indices.value(i) != streamed_indices.value(i + 
1)
+                {
+                    seen_as_true = false;
+                }
             }
+            Some((corrected_mask.finish(), filter_matched_indices))
+        }
+        // LeftAnti semantics: return true if for every x in the collection, 
p(x) is false.
+        // the true(if any) flag needs to be set only once per streaming index
+        // to prevent duplicates in the output
+        JoinType::LeftAnti => {
+            // have we seen a filter match for a streaming index before
+            for i in 0..streamed_indices_length {
+                if mask.value(i) && !seen_as_true {
+                    seen_as_true = true;
+                    filter_matched_indices.push(streamed_indices.value(i));
+                }
 
-            // if switched to next streaming index(e.g. from 0 to 1, or from 1 
to 2), we reset seen_as_true flag
-            if i < streamed_indices_length - 1
-                && streamed_indices.value(i) != streamed_indices.value(i + 1)
-            {
-                seen_as_true = false;
+                // if switched to next streaming index(e.g. from 0 to 1, or 
from 1 to 2), we reset seen_as_true flag
+                if (i < streamed_indices_length - 1
+                    && streamed_indices.value(i) != streamed_indices.value(i + 
1))
+                    || (i == streamed_indices_length - 1
+                        && *scanning_buffered_batch_idx == 
buffered_batches_len - 1)

Review Comment:
   > Hmm, the second condition is for the last index in `streamed_indices` and 
the last buffered batch. But the last buffered batch can contain many buffered 
indices, so we may be in the last buffered batch while there are still more 
buffered indices to process. Doesn't this put a value into `corrected_mask` 
too early, which could make it incorrect?
   
   Good point, I'll check that



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to