This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new b2ff63fdb0 Use prep_null_mask_filter to handle nulls in selection mask
(#9163)
b2ff63fdb0 is described below
commit b2ff63fdb061c8dc48b6fb1a0ac2b727a17e5831
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Sat Feb 10 11:39:53 2024 -0800
Use prep_null_mask_filter to handle nulls in selection mask (#9163)
* Use prep_null_mask_filter to handle nulls in selection mask
* Update datafusion/physical-plan/src/joins/sort_merge_join.rs
Co-authored-by: Andrew Lamb <[email protected]>
* Avoid unwrap
---------
Co-authored-by: Andrew Lamb <[email protected]>
---
.../physical-plan/src/joins/sort_merge_join.rs | 23 +++++++++++++++++++++-
1 file changed, 22 insertions(+), 1 deletion(-)
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs
b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 107fd7dde0..7af614e534 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -1209,7 +1209,15 @@ impl SMJStream {
) {
// The reverse of the selection mask. For the rows not
pass join filter above,
// we need to join them (left or right) with null rows
for outer joins.
- let not_mask = compute::not(mask)?;
+ let not_mask = if mask.null_count() > 0 {
+ // If the mask contains nulls, we need to use
`prep_null_mask_filter` to
+ // handle the nulls in the mask as false to
produce rows where the mask
+ // was null itself.
+
compute::not(&compute::prep_null_mask_filter(mask))?
+ } else {
+ compute::not(mask)?
+ };
+
let null_joined_batch =
compute::filter_record_batch(&output_batch,
&not_mask)?;
@@ -1254,6 +1262,19 @@ impl SMJStream {
// For full join, we also need to output the null
joined rows from the buffered side
if matches!(self.join_type, JoinType::Full) {
+ // Handle not mask for buffered side further.
+ // For buffered side, we want to output the rows
that are not null joined with
+ // the streamed side. i.e. the rows that are not
null in the `buffered_indices`.
+ let not_mask = if let Some(nulls) =
buffered_indices.nulls() {
+ let mask = not_mask.values() & nulls.inner();
+ BooleanArray::new(mask, None)
+ } else {
+ not_mask
+ };
+
+ let null_joined_batch =
+ compute::filter_record_batch(&output_batch,
&not_mask)?;
+
let mut streamed_columns = self
.streamed_schema
.fields()