mbutrovich opened a new pull request, #20806:
URL: https://github.com/apache/datafusion/pull/20806
## Which issue does this PR close?

- N/A.

## Rationale for this change

DataFusion's `SortMergeJoinExec` handles semi/anti joins by materializing `(outer, inner)` row pairs, applying a filter, then deduplicating with a corrected filter mask. Semi/anti joins only need one boolean per outer row, not pairs. The pair-based approach allocates unnecessary intermediate batches and index arrays just to materialize output.

Recent PRs have improved SMJ performance within the existing pair-based framework:

- [#18875](https://github.com/apache/datafusion/pull/18875) (BatchCoalescer to reduce concatenation overhead)
- [#20463](https://github.com/apache/datafusion/pull/20463) (zero-copy slice instead of take for contiguous indices)
- [#20478](https://github.com/apache/datafusion/pull/20478) (cached row counts to avoid O(n) recalculation)

The fundamental mismatch remains, however: semi/anti joins don't need pairs at all.

## What changes are included in this PR?

A new `SemiAntiSortMergeJoinExec` operator for `LeftSemi`, `LeftAnti`, `RightSemi`, and `RightAnti` sort-merge joins. Instead of materializing row pairs, it maintains a per-outer-batch bitset (`BooleanBufferBuilder`) recording which outer rows have a matching inner row, then emits output via `filter_record_batch`.

**Algorithm:** Merge-scan across the two sorted inputs. On a key match without a filter, set the matched bits for the whole outer key group. With a filter, buffer the inner key group and evaluate the filter as outer_slice × inner_scalar (the outer key group as an array, against one inner row at a time), OR-ing the results into the bitset with `apply_bitwise_binary_op` (64 bits per iteration). Short-circuit as soon as every outer row in the group is matched.
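The merge-scan described above can be illustrated with a small, std-only Rust sketch. This is a hypothetical simplification, not the PR's code: `Bitset` stands in for Arrow's `BooleanBufferBuilder`, its word-wise `or_assign` plays the role of `apply_bitwise_binary_op`, and the final row selection plays the role of `filter_record_batch`. It assumes a single outer batch (no key groups spanning batch boundaries), non-null `i64` keys, and inputs already sorted by key; the names `JoinKind`, `Bitset`, and `semi_anti_smj` are invented for this example.

```rust
/// Which outer rows to emit: those with a match (semi) or those without (anti).
#[derive(Clone, Copy, Debug, PartialEq)]
enum JoinKind {
    Semi,
    Anti,
}

/// Fixed-length bitset stored as 64-bit words (hypothetical stand-in for
/// Arrow's `BooleanBufferBuilder`).
struct Bitset {
    words: Vec<u64>,
    len: usize,
}

impl Bitset {
    fn new(len: usize) -> Self {
        Bitset { words: vec![0; (len + 63) / 64], len }
    }

    fn set(&mut self, i: usize) {
        self.words[i / 64] |= 1u64 << (i % 64);
    }

    fn get(&self, i: usize) -> bool {
        (self.words[i / 64] & (1u64 << (i % 64))) != 0
    }

    /// OR `other` into `self` one 64-bit word at a time.
    fn or_assign(&mut self, other: &Bitset) {
        for (w, o) in self.words.iter_mut().zip(&other.words) {
            *w |= *o;
        }
    }

    /// True once every bit in `0..len` is set (bits past `len` are never set).
    fn all_set(&self) -> bool {
        self.words.iter().map(|w| w.count_ones() as usize).sum::<usize>() == self.len
    }
}

/// Sort-merge semi/anti join over `(key, value)` rows already sorted by key.
/// `filter`, if present, is called as `filter(outer_value, inner_value)`.
fn semi_anti_smj(
    outer: &[(i64, i64)],
    inner: &[(i64, i64)],
    kind: JoinKind,
    filter: Option<&dyn Fn(i64, i64) -> bool>,
) -> Vec<(i64, i64)> {
    // One bit per outer row: "has at least one matching inner row".
    let mut matched = Bitset::new(outer.len());
    let (mut i, mut j) = (0, 0);
    while i < outer.len() && j < inner.len() {
        let key = outer[i].0;
        if key < inner[j].0 {
            i += 1;
        } else if key > inner[j].0 {
            j += 1;
        } else {
            // Extent of the equal-key group on each side.
            let i_end = i + outer[i..].iter().take_while(|r| r.0 == key).count();
            let j_end = j + inner[j..].iter().take_while(|r| r.0 == key).count();
            match filter {
                // No filter: every outer row in the group matches.
                None => (i..i_end).for_each(|r| matched.set(r)),
                Some(f) => {
                    let n = i_end - i;
                    let mut group = Bitset::new(n);
                    // outer_slice x inner_scalar: one inner row against the whole outer group.
                    for &(_, inner_val) in &inner[j..j_end] {
                        let mut mask = Bitset::new(n);
                        for (k, &(_, outer_val)) in outer[i..i_end].iter().enumerate() {
                            if f(outer_val, inner_val) {
                                mask.set(k);
                            }
                        }
                        group.or_assign(&mask);
                        // Short-circuit: remaining inner rows cannot change the result.
                        if group.all_set() {
                            break;
                        }
                    }
                    (0..n).filter(|&k| group.get(k)).for_each(|k| matched.set(i + k));
                }
            }
            i = i_end;
            j = j_end;
        }
    }
    // Stand-in for `filter_record_batch`: keep rows whose bit matches the join kind.
    let keep_matched = kind == JoinKind::Semi;
    outer
        .iter()
        .enumerate()
        .filter(|&(r, _)| matched.get(r) == keep_matched)
        .map(|(_, &row)| row)
        .collect()
}
```

For example, with outer rows `(1,10), (2,20), (2,30), (3,40)`, inner rows `(2,25), (2,27), (4,50)`, and the filter `outer.value > inner.value`, the semi variant keeps only `(2,30)` and the anti variant keeps `(1,10), (2,20), (3,40)`; without the filter, the semi variant keeps both key-2 rows. The per-row state stays at one bit per outer row no matter how many inner rows share a key, and no `(outer, inner)` pairs are ever built.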
**Files:**

- `semi_anti_sort_merge_join/{exec,stream,mod,tests}.rs`: new operator (1035 + 1302 lines)
- `physical_planner.rs`: route semi/anti joins to the new operator when `enable_semi_anti_sort_merge_join` is `true` (the default)
- `config.rs`: session config flag `datafusion.optimizer.enable_semi_anti_sort_merge_join`
- `enforce_distribution.rs`: partition key reordering support for the new operator
- `sort_pushdown.rs`: TODO for future sort pushdown support
- `join_fuzz.rs`: fuzz tests comparing `HashJoinExec` (HJ), `NestedLoopJoinExec` (NLJ), `SortMergeJoinExec` (SMJ), and the new `SemiAntiSortMergeJoinExec` (SaSmj)
- `smj.rs`: benchmark fix, changing the Q13 filter to a cross-side predicate

**Benchmark results** (best of 3, `dfbench smj`; Old = `SortMergeJoinExec`, New = `SemiAntiSortMergeJoinExec`):

| Query | Type | Old (ms) | New (ms) | Speedup |
|-------|------|----------|----------|---------|
| Q10 | LEFT SEMI, no filter | 4.79 | 4.27 | 1.12x |
| Q11 | LEFT SEMI, 1% filter | 3.00 | 2.30 | 1.31x |
| Q12 | LEFT SEMI, 50% filter | 38.1 | 5.52 | 6.9x |
| Q13 | LEFT SEMI, 90% filter | 66.9 | 10.2 | 6.6x |
| Q14 | LEFT ANTI, no filter | 4.96 | 4.13 | 1.20x |
| Q15 | LEFT ANTI, partial match | 4.80 | 4.22 | 1.14x |
| Q16 | LEFT ANTI, stress | 1.63 | 1.64 | 1.00x |
| Q18 | LEFT SEMI, 2% filter | 7.61 | 5.34 | 1.42x |
| Q19 | LEFT ANTI, partial match | 24.1 | 21.8 | 1.10x |

Non-semi/anti queries are unaffected because they still run on `SortMergeJoinExec`.

## Are these changes tested?

- 21 unit tests covering all join types, filters, nulls, batch boundaries, and async re-entry, comparing results against the existing SMJ operator.
- Fuzz tests (`join_fuzz.rs`) compare output against `HashJoinExec`, `NestedLoopJoinExec`, and `SortMergeJoinExec`. I ran thousands of iterations locally (with random seeds, not just repeated runs of the same tests).
- Existing `sort_merge_join.slt` sqllogictest tests pass (the operator is transparent to the SQL layer).

## Are there any user-facing changes?

New session config `datafusion.optimizer.enable_semi_anti_sort_merge_join` (default `true`). Set it to `false` to use the previous `SortMergeJoinExec` for semi/anti joins.
It can also be toggled via the environment variable `DATAFUSION_OPTIMIZER_ENABLE_SEMI_ANTI_SORT_MERGE_JOIN=false`.

Known limitations to address in follow-up PRs:

- Sort pushdown through the new operator is not yet implemented (TODO in `sort_pushdown.rs`), so some plans may include an unnecessary `SortExec` above the join that the optimizer could otherwise eliminate.
- The inner key group buffer for filtered joins is unbounded (TODO in `stream.rs`). A single highly skewed join key with many inner rows will buffer all of them in memory. This matches the existing `SortMergeJoinExec` behavior but could benefit from spill support.

Claude wrote the initial draft of the PR description and aspects of the implementation, but I understand the implementation.
