comphead commented on code in PR #10892: URL: https://github.com/apache/datafusion/pull/10892#discussion_r1638430287
########## datafusion/physical-plan/src/joins/sort_merge_join.rs: ########## @@ -1313,51 +1354,37 @@ impl SMJStream { streamed_columns }; + // Push the streamed/buffered batch joined nulls to the output let null_joined_streamed_batch = RecordBatch::try_new(self.schema.clone(), columns.clone())?; self.output_record_batches.push(null_joined_streamed_batch); - // For full join, we also need to output the null joined rows from the buffered side + // For full join, we also need to output the null joined rows from the buffered side. + // Usually this is done by `freeze_buffered`. However, if a buffered row is joined with + // streamed side, it won't be outputted by `freeze_buffered`. + // We need to check if a buffered row is joined with streamed side and output. + // If it is joined with streamed side, but finally fails on the join filter, + // we need to output it with nulls as streamed side. if matches!(self.join_type, JoinType::Full) { - // Handle not mask for buffered side further. - // For buffered side, we want to output the rows that are not null joined with - // the streamed side. i.e. the rows that are not null in the `buffered_indices`. 
- let not_mask = if let Some(nulls) = buffered_indices.nulls() { - let mask = not_mask.values() & nulls.inner(); - BooleanArray::new(mask, None) - } else { - not_mask - }; - - let null_joined_batch = - compute::filter_record_batch(&output_batch, &not_mask)?; - - let mut streamed_columns = self - .streamed_schema - .fields() - .iter() - .map(|f| { - new_null_array( - f.data_type(), - null_joined_batch.num_rows(), - ) - }) - .collect::<Vec<_>>(); - - let buffered_columns = null_joined_batch - .columns() - .iter() - .skip(streamed_columns_length) - .cloned() - .collect::<Vec<_>>(); - - streamed_columns.extend(buffered_columns); - - let null_joined_buffered_batch = RecordBatch::try_new( - self.schema.clone(), - streamed_columns, - )?; - self.output_record_batches.push(null_joined_buffered_batch); + for i in 0..pre_mask.len() { + let buffered_batch = &mut self.buffered_data.batches + [chunk.buffered_batch_idx.unwrap()]; + let buffered_index = buffered_indices.value(i); + + if !pre_mask.value(i) { + // For a buffered row that is joined with streamed side but failed on the join filter, Review Comment: ```suggestion // For a buffered row that is joined with streamed side but doesn't satisfy the join filter, ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org