viirya commented on code in PR #10304:
URL: https://github.com/apache/datafusion/pull/10304#discussion_r1592978951


##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1195,11 +1207,45 @@ impl SMJStream {
                         .into_array(filter_batch.num_rows())?;
 
                     // The selection mask of the filter
-                    let mask = datafusion_common::cast::as_boolean_array(&filter_result)?;
+                    let mut mask =
+                        datafusion_common::cast::as_boolean_array(&filter_result)?;
+                    // for LeftSemi Join the filter mask should be calculated in its own way:
+                    // if we find at least one matching row for specific streaming key/filter we dont need to check others for the same key/filter
+                    let mut maybe_left_semi_mask: Option<BooleanArray> = None;
+                    if matches!(self.join_type, JoinType::LeftSemi) {
+                        // did we get a filter match for a streaming index
+                        let mut seen_as_true: bool = false;
+                        let streamed_indices_length = streamed_indices.len();
+                        let mut corrected_mask: Vec<bool> =
+                            vec![false; streamed_indices_length];
+
+                        #[allow(clippy::needless_range_loop)]
+                        for i in 0..streamed_indices_length {
+                            // if for a streaming index its a match first time, set it as true
+                            if mask.value(i) && !seen_as_true {
+                                seen_as_true = true;
+                                corrected_mask[i] = true;
+                            }
+
+                            // if switched to next streaming index(e.g from 0 to 1, or from 1 to 2), we reset seen_as_true flag
+                            if i < streamed_indices_length - 1
+                                && streamed_indices.value(i)
+                                    != streamed_indices.value(i + 1)
+                            {
+                                seen_as_true = false;
+                            }
+                        }
+                        maybe_left_semi_mask = Some(BooleanArray::from(corrected_mask))
+                    };
+
+                    if let Some(ref left_semi_mask) = maybe_left_semi_mask {
+                        mask = left_semi_mask;
+                    }
 
                     // Push the filtered batch to the output
                     let filtered_batch =
                         compute::filter_record_batch(&output_batch, mask)?;
+

Review Comment:
   Unnecessary change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to