comphead commented on code in PR #10892:
URL: https://github.com/apache/datafusion/pull/10892#discussion_r1638430287


##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1313,51 +1354,37 @@ impl SMJStream {
                             streamed_columns
                         };
 
+                        // Push the streamed/buffered batch joined nulls to the output
                         let null_joined_streamed_batch =
                             RecordBatch::try_new(self.schema.clone(), 
columns.clone())?;
                         
self.output_record_batches.push(null_joined_streamed_batch);
 
-                        // For full join, we also need to output the null 
joined rows from the buffered side
+                        // For full join, we also need to output the null joined rows from the buffered side.
+                        // Usually this is done by `freeze_buffered`. However, if a buffered row is joined with
+                        // streamed side, it won't be outputted by `freeze_buffered`.
+                        // We need to check if a buffered row is joined with streamed side and output.
+                        // If it is joined with streamed side, but finally fails on the join filter,
+                        // we need to output it with nulls as streamed side.
                         if matches!(self.join_type, JoinType::Full) {
-                            // Handle not mask for buffered side further.
-                            // For buffered side, we want to output the rows 
that are not null joined with
-                            // the streamed side. i.e. the rows that are not 
null in the `buffered_indices`.
-                            let not_mask = if let Some(nulls) = 
buffered_indices.nulls() {
-                                let mask = not_mask.values() & nulls.inner();
-                                BooleanArray::new(mask, None)
-                            } else {
-                                not_mask
-                            };
-
-                            let null_joined_batch =
-                                compute::filter_record_batch(&output_batch, 
&not_mask)?;
-
-                            let mut streamed_columns = self
-                                .streamed_schema
-                                .fields()
-                                .iter()
-                                .map(|f| {
-                                    new_null_array(
-                                        f.data_type(),
-                                        null_joined_batch.num_rows(),
-                                    )
-                                })
-                                .collect::<Vec<_>>();
-
-                            let buffered_columns = null_joined_batch
-                                .columns()
-                                .iter()
-                                .skip(streamed_columns_length)
-                                .cloned()
-                                .collect::<Vec<_>>();
-
-                            streamed_columns.extend(buffered_columns);
-
-                            let null_joined_buffered_batch = 
RecordBatch::try_new(
-                                self.schema.clone(),
-                                streamed_columns,
-                            )?;
-                            
self.output_record_batches.push(null_joined_buffered_batch);
+                            for i in 0..pre_mask.len() {
+                                let buffered_batch = &mut 
self.buffered_data.batches
+                                    [chunk.buffered_batch_idx.unwrap()];
+                                let buffered_index = buffered_indices.value(i);
+
+                                if !pre_mask.value(i) {
+                                    // For a buffered row that is joined with streamed side but failed on the join filter,

Review Comment:
   ```suggestion
                                       // For a buffered row that is joined with streamed side but doesn't satisfy the join filter,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to