This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new b2ff63fdb0 Use prep_null_mask_filter to handle nulls in selection mask (#9163)
b2ff63fdb0 is described below

commit b2ff63fdb061c8dc48b6fb1a0ac2b727a17e5831
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Feb 10 11:39:53 2024 -0800

    Use prep_null_mask_filter to handle nulls in selection mask (#9163)
    
    * Use prep_null_mask_filter to handle nulls in selection mask
    
    * Update datafusion/physical-plan/src/joins/sort_merge_join.rs
    
    Co-authored-by: Andrew Lamb <[email protected]>
    
    * Avoid unwrap
    
    ---------
    
    Co-authored-by: Andrew Lamb <[email protected]>
---
 .../physical-plan/src/joins/sort_merge_join.rs     | 23 +++++++++++++++++++++-
 1 file changed, 22 insertions(+), 1 deletion(-)

diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 107fd7dde0..7af614e534 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -1209,7 +1209,15 @@ impl SMJStream {
                     ) {
                         // The reverse of the selection mask. For the rows not 
pass join filter above,
                         // we need to join them (left or right) with null rows 
for outer joins.
-                        let not_mask = compute::not(mask)?;
+                        let not_mask = if mask.null_count() > 0 {
+                            // If the mask contains nulls, we need to use 
`prep_null_mask_filter` to
+                            // handle the nulls in the mask as false to 
produce rows where the mask
+                            // was null itself.
+                            
compute::not(&compute::prep_null_mask_filter(mask))?
+                        } else {
+                            compute::not(mask)?
+                        };
+
                         let null_joined_batch =
                             compute::filter_record_batch(&output_batch, 
&not_mask)?;
 
@@ -1254,6 +1262,19 @@ impl SMJStream {
 
                         // For full join, we also need to output the null 
joined rows from the buffered side
                         if matches!(self.join_type, JoinType::Full) {
+                            // Handle not mask for buffered side further.
+                            // For buffered side, we want to output the rows 
that are not null joined with
+                            // the streamed side. i.e. the rows that are not 
null in the `buffered_indices`.
+                            let not_mask = if let Some(nulls) = 
buffered_indices.nulls() {
+                                let mask = not_mask.values() & nulls.inner();
+                                BooleanArray::new(mask, None)
+                            } else {
+                                not_mask
+                            };
+
+                            let null_joined_batch =
+                                compute::filter_record_batch(&output_batch, 
&not_mask)?;
+
                             let mut streamed_columns = self
                                 .streamed_schema
                                 .fields()

Reply via email to