mbutrovich opened a new issue, #21197:
URL: https://github.com/apache/datafusion/issues/21197
### Description

In `SortMergeJoinExec`, when a LeftMark join has a join filter that references buffered (right) side columns, the stream crashes with:

`Arrow error: Invalid argument error: Column 'data' is declared as non-nullable but contains null values`

### Root cause

In `produce_buffered_results` ([stream.rs ~L1293](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs#L1293)), the `right_indices` `UInt64Array` contains null entries for unmatched streamed rows (nulls signal "no match" for the mark column).

At [L1332](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs#L1332), the filter evaluation path calls `fetch_right_columns_by_idxs` with these null-containing indices. The `take` kernel maps null indices to null values in the output arrays, but the buffered batch schema declares those columns as non-nullable.

When `RecordBatch::try_new` validates the filter batch at [L1376](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs#L1376), it fails because the arrays contain nulls while the schema declares those columns non-nullable.

### Reproduction

```sql
SET datafusion.optimizer.prefer_hash_join = false;

WITH t1 AS (
  SELECT value % 1000 as key, value as data
  FROM range(10000)
  ORDER BY key, data
),
t2 AS (
  SELECT value % 1000 as key, value as data
  FROM range(100000)
  ORDER BY key, data
)
SELECT count(*)
FROM t1
WHERE t1.data < 0
   OR EXISTS (
     SELECT 1
     FROM t2
     WHERE t2.key = t1.key
       AND t2.data <> t1.data
       AND t2.data % 100 = 0
   );
```

The `t2.data <> t1.data` predicate becomes a cross-table join filter, and the `OR` forces a `LeftMark` join. The error only surfaces at scale, once batches contain unmatched rows.

There is also a sqllogictest in [sort_merge_join.slt](https://github.com/apache/datafusion/blob/main/datafusion/sqllogictest/test_files/sort_merge_join.slt) that reproduces this (added in the fix PR).
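The root cause can be modeled without Arrow at all. The sketch below is a stdlib-only Rust model under stated assumptions: `Field`, `Batch`, `take`, and `try_new_batch` are hypothetical stand-ins (not arrow-rs or DataFusion APIs) for the `take` kernel's null-index behavior and the nullability check in `RecordBatch::try_new`. It also shows one hypothetical way to keep nulls out of the filter batch, gathering only matched rows; this is an illustration only, not the approach taken by the fix PR.

```rust
// Stdlib-only model of the failure. All types and functions here are
// hypothetical stand-ins, not arrow-rs or DataFusion APIs.

/// Simplified schema field: a name plus a nullability flag.
#[derive(Debug, Clone)]
struct Field {
    name: String,
    nullable: bool,
}

/// Simplified "record batch": one column of optional values per field.
#[derive(Debug)]
struct Batch {
    fields: Vec<Field>,
    columns: Vec<Vec<Option<i64>>>,
}

/// Models `take` semantics: a null index produces a null output slot.
fn take(values: &[Option<i64>], indices: &[Option<u64>]) -> Vec<Option<i64>> {
    indices
        .iter()
        .map(|idx| idx.and_then(|i| values[i as usize]))
        .collect()
}

/// Models the nullability validation done when a batch is constructed.
fn try_new_batch(fields: Vec<Field>, columns: Vec<Vec<Option<i64>>>) -> Result<Batch, String> {
    for (field, column) in fields.iter().zip(&columns) {
        if !field.nullable && column.iter().any(|v| v.is_none()) {
            return Err(format!(
                "Column '{}' is declared as non-nullable but contains null values",
                field.name
            ));
        }
    }
    Ok(Batch { fields, columns })
}

fn main() {
    // Buffered (right) side: `data` is declared non-nullable and holds no nulls.
    let fields = vec![Field { name: "data".to_string(), nullable: false }];
    let buffered_data = vec![Some(10), Some(20), Some(30)];

    // One entry per streamed row; `None` means "no match" for the mark column.
    let right_indices = vec![Some(0), None, Some(2)];

    // Failing path: gather with the null-containing indices, then validate.
    let gathered = take(&buffered_data, &right_indices);
    let err = try_new_batch(fields.clone(), vec![gathered]).unwrap_err();
    println!("{err}");

    // Hypothetical alternative: gather only matched rows, so the filter batch
    // never contains nulls; unmatched rows would get mark = false directly.
    let matched: Vec<Option<u64>> = right_indices.iter().copied().filter(Option::is_some).collect();
    let batch = try_new_batch(fields, vec![take(&buffered_data, &matched)]).unwrap();
    println!("filter batch rows: {}", batch.columns[0].len());
}
```

Running it prints the same message text as the reported Arrow error, followed by `filter batch rows: 2`. The second half mirrors, in spirit, what the semi-anti stream does: only matched rows ever reach filter evaluation.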
#### Why the semi-anti stream is not affected

The `SemiAntiSortMergeJoinStream` evaluates filters in `evaluate_filter_for_inner_row` against actual matched inner rows only, never constructing a batch with null indices for unmatched rows. Unmatched rows simply get `false` in the matched bitset without ever touching the filter path.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
