viirya commented on code in PR #12159:
URL: https://github.com/apache/datafusion/pull/12159#discussion_r1730441508
##########
datafusion/physical-plan/src/joins/sort_merge_join.rs:
##########
@@ -1474,6 +1474,12 @@ impl SMJStream {
[chunk.buffered_batch_idx.unwrap()];
for i in 0..pre_mask.len() {
+ // If the buffered row is not joined with streamed side,
+ // skip it.
+ if buffered_indices.is_null(i) {
+ continue;
+ }
+
Review Comment:
I tried to replicate the Spark SQL test, but whether it fails depends on the
data distribution, and I could not reproduce the same distribution as in the Spark case.
The test case looks like this:
```sql
select * from (
with left as (
select N, L from (select unnest(make_array(1, 2, 3, 4)) N,
unnest(make_array('A', 'B', 'C', 'D')) L) where N <= 4
), right as (
select N, L from (select unnest(make_array(1, 2, 3, 4)) N,
unnest(make_array('A', 'B', 'C', 'D')) L) where N >= 3
)
select * from left full join right on left.N = right.N and left.N != 3
) order by 1, 2, 3, 4;
```
I want these values to be in the same partition so that sort merge join processes
them together. The joined batch then looks like:
```
1 A null null
2 B null null # this row and the one above are wrongly treated as passing the join filter for buffered row [3, C]
3 C 3 C # join filter => false
4 D 4 D # join filter => true
```
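To make the failure mode concrete, here is a minimal std-only Rust sketch of the filter-mask loop. It is not the DataFusion code: `buffered_rows_passing_filter`, the `skip_nulls` flag, and the use of `Option<usize>` in place of an Arrow index array are hypothetical simplifications. It also assumes that reading a null slot yields the placeholder index 0; Arrow does not guarantee what value a null slot holds.

```rust
/// Hypothetical, simplified model of the loop over `pre_mask`: returns, for each
/// buffered row, whether it passed the join filter for at least one streamed row.
/// `skip_nulls = true` models the `buffered_indices.is_null(i)` check in the diff.
fn buffered_rows_passing_filter(
    pre_mask: &[bool],
    buffered_indices: &[Option<usize>],
    num_buffered_rows: usize,
    skip_nulls: bool,
) -> Vec<bool> {
    let mut passed = vec![false; num_buffered_rows];
    for (&passes_filter, buffered_idx) in pre_mask.iter().zip(buffered_indices) {
        let idx = match *buffered_idx {
            Some(idx) => idx,
            // The streamed row did not join with any buffered row: ignore it.
            None if skip_nulls => continue,
            // Without the fix: fall back to the null slot's placeholder value
            // (assumed to be 0 here), which points at an unrelated buffered row.
            None => 0,
        };
        if passes_filter {
            passed[idx] = true;
        }
    }
    passed
}
```

Fed the four joined rows above (`pre_mask = [true, true, false, true]`, since `left.N != 3` holds for N = 1, 2, 4; buffered indices `[None, None, Some(0), Some(1)]`, with [3, C] at buffered index 0), the version without the null check marks both buffered rows as passing, so [3, C] is never emitted as an unmatched buffered row. With the null check, only [4, D] passes, and [3, C] can be output with a null streamed side.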
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]