This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 780639329f fix: Produce buffered null join row only if all joined rows 
are failed on join filter in SMJ full join (#12090)
780639329f is described below

commit 780639329f1664cce184165a9d4653319b893cb0
Author: Liang-Chi Hsieh <[email protected]>
AuthorDate: Fri Aug 23 00:12:24 2024 -0700

    fix: Produce buffered null join row only if all joined rows are failed on 
join filter in SMJ full join (#12090)
    
    * Add test case
    
    * fix
    
    * Update test
    
    * fix
    
    * Remove duplicate
    
    * fix clippy
    
    * fix clippy again
    
    * For review
---
 .../physical-plan/src/joins/sort_merge_join.rs     | 48 ++++++++++++----------
 .../sqllogictest/test_files/sort_merge_join.slt    | 27 +++++++++++-
 2 files changed, 53 insertions(+), 22 deletions(-)

diff --git a/datafusion/physical-plan/src/joins/sort_merge_join.rs b/datafusion/physical-plan/src/joins/sort_merge_join.rs
index 96d5ba728a..b5994d16d2 100644
--- a/datafusion/physical-plan/src/joins/sort_merge_join.rs
+++ b/datafusion/physical-plan/src/joins/sort_merge_join.rs
@@ -22,7 +22,7 @@
 
 use std::any::Any;
 use std::cmp::Ordering;
-use std::collections::VecDeque;
+use std::collections::{HashMap, VecDeque};
 use std::fmt::Formatter;
 use std::fs::File;
 use std::io::BufReader;
@@ -595,8 +595,10 @@ struct BufferedBatch {
     /// Size estimation used for reserving / releasing memory
     pub size_estimation: usize,
     /// The indices of buffered batch that failed the join filter.
+    /// This is a map between buffered row index and a boolean value indicating whether all joined row
+    /// of the buffered row failed the join filter.
     /// When dequeuing the buffered batch, we need to produce null joined rows for these indices.
-    pub join_filter_failed_idxs: HashSet<u64>,
+    pub join_filter_failed_map: HashMap<u64, bool>,
     /// Current buffered batch number of rows. Equal to batch.num_rows()
     /// but if batch is spilled to disk this property is preferable
     /// and less expensive
@@ -637,7 +639,7 @@ impl BufferedBatch {
             join_arrays,
             null_joined: vec![],
             size_estimation,
-            join_filter_failed_idxs: HashSet::new(),
+            join_filter_failed_map: HashMap::new(),
             num_rows,
             spill_file: None,
         }
@@ -1229,11 +1231,19 @@ impl SMJStream {
             }
             buffered_batch.null_joined.clear();
 
-            // For buffered rows which are joined with streamed side but doesn't satisfy the join filter
+            // For buffered row which is joined with streamed side rows but all joined rows
+            // don't satisfy the join filter
             if output_not_matched_filter {
+                let not_matched_buffered_indices = buffered_batch
+                    .join_filter_failed_map
+                    .iter()
+                    .filter_map(|(idx, failed)| if *failed { Some(*idx) } else { None })
+                    .collect::<Vec<_>>();
+
                 let buffered_indices = UInt64Array::from_iter_values(
-                    buffered_batch.join_filter_failed_idxs.iter().copied(),
+                    not_matched_buffered_indices.iter().copied(),
                 );
+
                 if let Some(record_batch) = produce_buffered_null_batch(
                     &self.schema,
                     &self.streamed_schema,
@@ -1242,7 +1252,7 @@ impl SMJStream {
                 )? {
                     self.output_record_batches.push(record_batch);
                 }
-                buffered_batch.join_filter_failed_idxs.clear();
+                buffered_batch.join_filter_failed_map.clear();
             }
         }
         Ok(())
@@ -1459,24 +1469,20 @@ impl SMJStream {
                         // If it is joined with streamed side, but doesn't match the join filter,
                         // we need to output it with nulls as streamed side.
                         if matches!(self.join_type, JoinType::Full) {
+                            let buffered_batch = &mut self.buffered_data.batches
+                                [chunk.buffered_batch_idx.unwrap()];
+
                             for i in 0..pre_mask.len() {
-                                let buffered_batch = &mut self.buffered_data.batches
-                                    [chunk.buffered_batch_idx.unwrap()];
                                 let buffered_index = buffered_indices.value(i);
 
-                                if !pre_mask.value(i) {
-                                    // For a buffered row that is joined with streamed side but doesn't satisfy the join filter,
-                                    buffered_batch
-                                        .join_filter_failed_idxs
-                                        .insert(buffered_index);
-                                } else if buffered_batch
-                                    .join_filter_failed_idxs
-                                    .contains(&buffered_index)
-                                {
-                                    buffered_batch
-                                        .join_filter_failed_idxs
-                                        .remove(&buffered_index);
-                                }
+                                buffered_batch.join_filter_failed_map.insert(
+                                    buffered_index,
+                                    *buffered_batch
+                                        .join_filter_failed_map
+                                        .get(&buffered_index)
+                                        .unwrap_or(&true)
+                                        && !pre_mask.value(i),
+                                );
                             }
                         }
                     }
diff --git a/datafusion/sqllogictest/test_files/sort_merge_join.slt b/datafusion/sqllogictest/test_files/sort_merge_join.slt
index ea3088e696..2c28a5fead 100644
--- a/datafusion/sqllogictest/test_files/sort_merge_join.slt
+++ b/datafusion/sqllogictest/test_files/sort_merge_join.slt
@@ -134,7 +134,6 @@ Alice 100 NULL NULL
 Alice 50 Alice 2
 Bob 1 NULL NULL
 NULL NULL Alice 1
-NULL NULL Alice 2
 
 query TITI rowsort
 SELECT * FROM t1 FULL JOIN t2 ON t1.a = t2.a AND t1.b > t2.b + 50
@@ -624,6 +623,32 @@ NULL NULL 7 9
 NULL NULL 8 10
 NULL NULL 9 11
 
+query IIII
+select * from (
+with t as (
+    select id_a id_a_1, id_a % 5 id_a_2 from (select unnest(make_array(5, 6, 7, 8, 9, 0, 1, 2, 3, 4)) id_a)
+), t1 as (
+    select id_b % 10 id_b_1, id_b + 2 id_b_2 from (select unnest(make_array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)) id_b)
+)
+select * from t full join t1 on t.id_a_2 = t1.id_b_1 and t.id_a_1 > t1.id_b_2
+) order by 1, 2, 3, 4
+----
+0 0 NULL NULL
+1 1 NULL NULL
+2 2 NULL NULL
+3 3 NULL NULL
+4 4 NULL NULL
+5 0 0 2
+6 1 1 3
+7 2 2 4
+8 3 3 5
+9 4 4 6
+NULL NULL 5 7
+NULL NULL 6 8
+NULL NULL 7 9
+NULL NULL 8 10
+NULL NULL 9 11
+
 # return sql params back to default values
 statement ok
 set datafusion.optimizer.prefer_hash_join = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to