viirya commented on code in PR #9080:
URL: https://github.com/apache/arrow-datafusion/pull/9080#discussion_r1478770963
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1121,16 +1151,138 @@ impl SMJStream {
.collect::<Vec<_>>()
};
+ let streamed_columns_length = streamed_columns.len();
+ let buffered_columns_length = buffered_columns.len();
+
+ // Prepare the columns we apply join filter on later.
+ // Only for joined rows between streamed and buffered.
+ let filter_columns = if chunk.buffered_batch_idx.is_some() {
+ if matches!(self.join_type, JoinType::Right) {
+ get_filter_column(&self.filter, &buffered_columns,
&streamed_columns)
+ } else {
+ get_filter_column(&self.filter, &streamed_columns,
&buffered_columns)
+ }
+ } else {
+ vec![]
+ };
+
let columns = if matches!(self.join_type, JoinType::Right) {
- buffered_columns.extend(streamed_columns);
+ buffered_columns.extend(streamed_columns.clone());
buffered_columns
} else {
streamed_columns.extend(buffered_columns);
streamed_columns
};
- self.output_record_batches
- .push(RecordBatch::try_new(self.schema.clone(), columns)?);
+ let output_batch =
+ RecordBatch::try_new(self.schema.clone(), columns.clone())?;
+
+ // Apply join filter if any
+ if !filter_columns.is_empty() {
+ if let Some(f) = &self.filter {
+ // Construct batch with only filter columns
+ let filter_batch = RecordBatch::try_new(
+ Arc::new(f.schema().clone()),
+ filter_columns,
+ )?;
+
+ let filter_result = f
+ .expression()
+ .evaluate(&filter_batch)?
+ .into_array(filter_batch.num_rows())?;
+
+ // The selection mask of the filter
+ let mask =
datafusion_common::cast::as_boolean_array(&filter_result)?;
+
+ // Push the filtered batch to the output
+ let filtered_batch =
+ compute::filter_record_batch(&output_batch, mask)?;
+ self.output_record_batches.push(filtered_batch);
+
+ // For outer joins, we need to push the null joined rows
to the output.
+ if matches!(
+ self.join_type,
+ JoinType::Left | JoinType::Right | JoinType::Full
+ ) {
+ // The reverse of the selection mask, which is for
null joined rows
Review Comment:
The rows that reach here have all already passed the equijoin predicates (their
`buffered_batch_idx` is `Some`). "Null joined rows" here means the rows that do not
pass the join filter; we are going to join them (on the left or right side) with
nulls. Let me add a few words to make this clear.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]