mbutrovich opened a new pull request, #20806:
URL: https://github.com/apache/datafusion/pull/20806

   ## Which issue does this PR close?
   - N/A.
   
   ## Rationale for this change
   DataFusion's `SortMergeJoinExec` handles semi/anti joins by materializing 
`(outer, inner)` row pairs, applying a filter, then deduplicating with a 
corrected filter mask. Semi/anti joins only need a boolean per outer row — not 
pairs. The pair-based approach allocates unnecessary intermediate batches and 
index arrays to materialize output.
   
   Recent PRs have improved SMJ performance within the existing pair-based 
framework — [#18875](https://github.com/apache/datafusion/pull/18875) 
(BatchCoalescer to reduce concatenation overhead), 
[#20463](https://github.com/apache/datafusion/pull/20463) (zero-copy slice 
instead of take for contiguous indices), 
[#20478](https://github.com/apache/datafusion/pull/20478) (cached row counts to 
avoid O(n) recalculation) — but the fundamental mismatch remains: semi/anti 
joins don't need pairs at all.
   
   ## What changes are included in this PR?
   
   A new `SemiAntiSortMergeJoinExec` operator for `LeftSemi`, `LeftAnti`, 
`RightSemi`, and `RightAnti` sort-merge joins. Instead of materializing row 
pairs, it maintains a per-outer-batch bitset (`BooleanBufferBuilder`) recording 
which outer rows have a matching inner row, then emits output via 
`filter_record_batch`.
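
   The bitset idea can be sketched in plain Rust, outside of Arrow. This is an illustration under stated assumptions, not the PR's code: `mark_matches` and `semi_anti` are hypothetical names, keys are plain `i32` values, a `Vec<bool>` stands in for `BooleanBufferBuilder`, and the final loop stands in for `filter_record_batch`.

```rust
use std::cmp::Ordering;

/// Hypothetical helper: one flag per outer row, true when some inner row
/// has the same key. Both slices are assumed to be sorted ascending.
fn mark_matches(outer: &[i32], inner: &[i32]) -> Vec<bool> {
    let mut matched = vec![false; outer.len()];
    let (mut o, mut i) = (0, 0);
    while o < outer.len() && i < inner.len() {
        match outer[o].cmp(&inner[i]) {
            Ordering::Less => o += 1,
            Ordering::Greater => i += 1,
            Ordering::Equal => {
                // Mark the whole outer key group. No (outer, inner) pairs
                // are built; the inner group is skipped by later iterations.
                let key = outer[o];
                while o < outer.len() && outer[o] == key {
                    matched[o] = true;
                    o += 1;
                }
            }
        }
    }
    matched
}

/// Hypothetical helper: keep matched rows for a semi join, or unmatched
/// rows for an anti join (`anti == true`).
fn semi_anti(outer: &[i32], inner: &[i32], anti: bool) -> Vec<i32> {
    let matched = mark_matches(outer, inner);
    let mut out = Vec::new();
    for (idx, &key) in outer.iter().enumerate() {
        if matched[idx] != anti {
            out.push(key);
        }
    }
    out
}
```

   With `outer = [1, 2, 2, 3, 5]` and `inner = [2, 2, 4, 5]`, the semi variant keeps `[2, 2, 5]` and the anti variant keeps `[1, 3]`. The only per-row state is one flag per outer row.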
   
   **Algorithm:** Merge-scan across two sorted inputs. On key match without 
filter, set matched bits for the outer key group. With filter, buffer the inner 
key group and evaluate the filter as outer_slice × inner_scalar, OR-ing results 
into the bitset with `apply_bitwise_binary_op` (64 bits per iteration). 
Short-circuit when all outer rows in the group are matched.
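
   The filtered path for a single key group can be sketched the same way. This is a simplified stand-in under assumptions: `or_into`, `count_set`, and `match_group` are hypothetical names, the filter is an ordinary closure over `i64` values, and a `Vec<u64>` of bit words replaces the Arrow buffer that the operator updates with `apply_bitwise_binary_op`.

```rust
/// Hypothetical helper: OR `mask` into `bits`, one 64-bit word per iteration.
fn or_into(bits: &mut [u64], mask: &[u64]) {
    for (b, m) in bits.iter_mut().zip(mask) {
        *b |= *m;
    }
}

/// Hypothetical helper: number of set bits across all words.
fn count_set(bits: &[u64]) -> usize {
    bits.iter().map(|w| w.count_ones() as usize).sum()
}

/// For one key group, return a bit per outer row that is set when the
/// filter passes against at least one buffered inner row.
fn match_group<F>(outer_vals: &[i64], inner_vals: &[i64], filter: F) -> Vec<u64>
where
    F: Fn(i64, i64) -> bool,
{
    let words = (outer_vals.len() + 63) / 64;
    let mut matched = vec![0u64; words];
    for &inner in inner_vals {
        // Evaluate the filter for the whole outer slice against one inner row.
        let mut mask = vec![0u64; words];
        for (row, &outer) in outer_vals.iter().enumerate() {
            if filter(outer, inner) {
                mask[row / 64] |= 1u64 << (row % 64);
            }
        }
        or_into(&mut matched, &mask);
        // Short-circuit: once every outer row is matched, the remaining
        // inner rows cannot change the result.
        if count_set(&matched) == outer_vals.len() {
            break;
        }
    }
    matched
}
```

   For outer values `[10, 20, 30]`, inner values `[15, 25]`, and the filter `outer < inner`, rows 0 and 1 end up matched. If the first inner row already satisfies the filter for every outer row, the loop stops without evaluating the rest of the inner group.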
   
   **Files:**
   - `semi_anti_sort_merge_join/{exec,stream,mod,tests}.rs` — new operator 
(1035 + 1302 lines)
   - `physical_planner.rs` — route semi/anti joins to the new operator when 
`enable_semi_anti_sort_merge_join` is true (default)
   - `config.rs` — session config flag 
`datafusion.optimizer.enable_semi_anti_sort_merge_join`
   - `enforce_distribution.rs` — partition key reordering support for the new 
operator
   - `sort_pushdown.rs` — TODO for future sort pushdown support
   - `join_fuzz.rs` — fuzz tests comparing HJ, NLJ, SMJ, and SaSmj
   - `smj.rs` — benchmark fix: Q13 filter changed to cross-side predicate
   
   **Benchmark results** (best of 3, `dfbench smj`):
   
   | Query | Type | Old (ms) | New (ms) | Speedup |
   |-------|------|----------|----------|---------|
   | Q10 | LEFT SEMI, no filter | 4.79 | 4.27 | 1.12x |
   | Q11 | LEFT SEMI, 1% filter | 3.00 | 2.30 | 1.31x |
   | Q12 | LEFT SEMI, 50% filter | 38.1 | 5.52 | 6.9x |
   | Q13 | LEFT SEMI, 90% filter | 66.9 | 10.2 | 6.6x |
   | Q14 | LEFT ANTI, no filter | 4.96 | 4.13 | 1.20x |
   | Q15 | LEFT ANTI, partial match | 4.80 | 4.22 | 1.14x |
   | Q16 | LEFT ANTI, stress | 1.63 | 1.64 | 1.00x |
   | Q18 | LEFT SEMI, 2% filter | 7.61 | 5.34 | 1.42x |
   | Q19 | LEFT ANTI, partial match | 24.1 | 21.8 | 1.10x |
   
   Queries with other join types are unaffected: they still run on the existing 
`SortMergeJoinExec`.
   
   ## Are these changes tested?
   
   - 21 unit tests covering all join types, filters, nulls, batch boundaries, 
and async re-entry. The results are compared against the existing 
`SortMergeJoinExec`.
   - Fuzz tests (`join_fuzz.rs`) compare output against `HashJoinExec`, 
`NestedLoopJoinExec`, and `SortMergeJoinExec`. I ran thousands of iterations 
locally with random seeds, rather than repeating the same inputs.
   - Existing `sort_merge_join.slt` sqllogic tests pass (the operator is 
transparent to the SQL layer)
   
   ## Are there any user-facing changes?
   New session config `datafusion.optimizer.enable_semi_anti_sort_merge_join` 
(default `true`). Set it to `false` to use the previous `SortMergeJoinExec` for 
semi/anti joins. It can also be set through the environment variable 
`DATAFUSION_OPTIMIZER_ENABLE_SEMI_ANTI_SORT_MERGE_JOIN=false`.
   
   Known limitations to address in follow-up PRs:
   - Sort pushdown through the new operator is not yet implemented (TODO in 
`sort_pushdown.rs`), so some plans may include an unnecessary `SortExec` above 
the join that the optimizer could otherwise eliminate.
   - The inner key group buffer for filtered joins is unbounded (TODO in 
`stream.rs`). A single highly-skewed join key with many inner rows will buffer 
all of them in memory. This matches the existing `SortMergeJoinExec` behavior 
but could benefit from spill support.
   
   Claude wrote the initial draft of the PR description and parts of the 
implementation. I understand the implementation myself.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

