mbutrovich opened a new issue, #21197:
URL: https://github.com/apache/datafusion/issues/21197

   ### Description
   In `SortMergeJoinExec`, when a LeftMark join has a join filter that 
references buffered (right) side columns, the stream fails with:
   
   `Arrow error: Invalid argument error: Column 'data' is declared as 
non-nullable but contains null values`
   
   ### Root cause
   
   In `produce_buffered_results` ([stream.rs 
~L1293](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs#L1293)),
 the `right_indices` `UInt64Array` contains null entries for unmatched streamed 
rows (nulls signal "no match" for the mark column). At 
[L1332](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs#L1332),
 the filter evaluation path calls `fetch_right_columns_by_idxs` with these 
null-containing indices. The `take` kernel maps null indices to null values in 
the output arrays, but the buffered batch schema declares those columns as 
non-nullable. When `RecordBatch::try_new` validates the filter batch at 
[L1376](https://github.com/apache/datafusion/blob/main/datafusion/physical-plan/src/joins/sort_merge_join/stream.rs#L1376),
 it fails because the array contains nulls but the schema says non-nullable.
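
The failure mode can be illustrated with a minimal, std-only sketch. This is a simplified model and not the arrow-rs API: a column is a `Vec<Option<i64>>`, and the hypothetical helpers `take` and `validate` stand in for the `take` kernel and for the nullability check performed by `RecordBatch::try_new`.

```rust
// Simplified model, not the arrow-rs API: a column is a Vec<Option<i64>>
// and nullability is a plain flag that would come from the schema.

/// Model of a `take` kernel: a null index yields a null output value.
fn take(values: &[Option<i64>], indices: &[Option<usize>]) -> Vec<Option<i64>> {
    indices
        .iter()
        .map(|idx| idx.and_then(|i| values[i]))
        .collect()
}

/// Model of the schema check done when a batch is built: a column declared
/// non-nullable must not contain nulls.
fn validate(name: &str, nullable: bool, column: &[Option<i64>]) -> Result<(), String> {
    if !nullable && column.iter().any(|v| v.is_none()) {
        return Err(format!(
            "Column '{}' is declared as non-nullable but contains null values",
            name
        ));
    }
    Ok(())
}

fn main() {
    // Buffered (right) side column `data`, declared non-nullable.
    let buffered_data = vec![Some(10), Some(20), Some(30)];
    // One index per streamed row; `None` marks a streamed row with no match.
    let right_indices = vec![Some(0), None, Some(2)];

    let taken = take(&buffered_data, &right_indices);
    assert_eq!(taken, vec![Some(10), None, Some(30)]);

    // Reusing the buffered side's non-nullable declaration rejects the column.
    assert!(validate("data", false, &taken).is_err());
    // The same column passes if it is declared nullable.
    assert!(validate("data", true, &taken).is_ok());
}
```

The last assertion only shows that the check depends on the declared nullability; it is not a statement about how the fix PR resolves the issue.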
   
   ### Reproduction
   
   ```sql
   SET datafusion.optimizer.prefer_hash_join = false;
   WITH t1 AS (
       SELECT value % 1000 as key, value as data
       FROM range(10000) ORDER BY key, data
   ),
   t2 AS (
       SELECT value % 1000 as key, value as data
       FROM range(100000) ORDER BY key, data
   )
   SELECT count(*) FROM t1
   WHERE t1.data < 0
      OR EXISTS (
       SELECT 1 FROM t2
       WHERE t2.key = t1.key
         AND t2.data <> t1.data
         AND t2.data % 100 = 0
   );
   ```
   The predicate `t2.data <> t1.data` references both tables, so it becomes a 
join filter. The `OR` forces a `LeftMark` join. The error surfaces at scale, 
when batches contain unmatched rows.
   
   There is also a test in 
https://github.com/apache/datafusion/blob/main/datafusion/sqllogictest/test_files/sort_merge_join.slt
 that reproduces this (added in the fix PR).
   
   #### Why the semi-anti stream is not affected
   
   The `SemiAntiSortMergeJoinStream` evaluates filters in 
`evaluate_filter_for_inner_row` against actual matched inner rows only, never 
constructing a batch with null indices for unmatched rows. Unmatched rows 
simply get false in the matched bitset without ever touching the filter path.
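
A rough sketch of that behavior, under stated assumptions: `mark_matched` is a hypothetical helper, rows are `(key, data)` tuples, and a nested loop replaces the sorted merge to keep the example short. It is not the code of `SemiAntiSortMergeJoinStream`; it only shows that the filter runs on key-matched pairs and that unmatched rows keep their default `false`.

```rust
// Hypothetical sketch, not DataFusion's code: the filter is only evaluated
// for (outer, inner) pairs whose keys already matched.

/// Returns one flag per outer row: true if some inner row with the same key
/// also passes `filter(outer_data, inner_data)`.
fn mark_matched(
    outer: &[(i64, i64)],
    inner: &[(i64, i64)],
    filter: impl Fn(i64, i64) -> bool,
) -> Vec<bool> {
    // Every outer row starts as "not matched".
    let mut matched = vec![false; outer.len()];
    for (i, &(outer_key, outer_data)) in outer.iter().enumerate() {
        for &(inner_key, inner_data) in inner {
            // `&&` short-circuits, so the filter never sees a missing inner row.
            if inner_key == outer_key && filter(outer_data, inner_data) {
                matched[i] = true;
                break;
            }
        }
    }
    matched
}

fn main() {
    // Rows are (key, data) pairs.
    let outer = [(1, 100), (2, 200), (3, 300)];
    let inner = [(1, 100), (1, 101), (3, 300)];

    // Filter modeled on `t2.data <> t1.data`.
    let matched = mark_matched(&outer, &inner, |o, i| o != i);

    // Key 1 has an inner row with different data; key 2 has no inner row;
    // key 3 only has an inner row with equal data.
    assert_eq!(matched, vec![true, false, false]);
}
```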


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

