mbutrovich opened a new pull request, #21184:
URL: https://github.com/apache/datafusion/pull/21184
Draft while I leave the fuzz tests running overnight.
## Which issue does this PR close?
Partially addresses #20910.
## Rationale for this change
Sort-merge join with a filter on outer joins (LEFT/RIGHT/FULL) runs
`process_filtered_batches()` on every key transition in the Init state. With
near-unique keys (1:1 cardinality), this means running the full deferred
filtering pipeline (concat + `get_corrected_filter_mask` +
`filter_record_batch_by_join_type`) once per row — making filtered
LEFT/RIGHT/FULL **55x slower** than INNER for 10M unique keys.
Additionally, mark join logic in `SortMergeJoinStream` materializes full
`(streamed, buffered)` pairs only to discard most of them via
`get_corrected_filter_mask()`. Mark joins are structurally identical to semi
joins (one output row per outer row with a boolean result) and belong in
`SemiAntiMarkSortMergeJoinStream`, which avoids pair materialization entirely
using a per-outer-batch bitset.
## What changes are included in this PR?
Three areas of improvement, building on the specialized semi/anti stream
from #20806:
**1. Move mark joins to `SemiAntiMarkSortMergeJoinStream`**
- Rename `semi_anti_sort_merge_join` → `semi_anti_mark_sort_merge_join`
- Add `is_mark` flag; `emit_outer_batch()` emits all rows with the match
bitset as a boolean column (vs semi's filter / anti's invert-and-filter)
- Route `LeftMark`/`RightMark` from `SortMergeJoinExec::execute()` to the
renamed stream
- Remove all mark-specific logic from `SortMergeJoinStream`
(`mark_row_as_match`, `is_not_null` column generation, mark arms in filter
correction)
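The per-outer-batch bitset mentioned above can be sketched in plain Rust. This is an illustrative model of the idea, not DataFusion's actual code; `MatchBitset` and `emit_mark_column` are hypothetical names:

```rust
// Sketch of the per-outer-batch bitset behind the semi/anti/mark path.
// Illustrative only; not DataFusion's actual types or API.

/// One bit per streamed (outer) row: set when any buffered row joins it
/// and the join filter passes. No (streamed, buffered) pairs are kept.
struct MatchBitset {
    bits: Vec<u64>,
}

impl MatchBitset {
    fn new(num_rows: usize) -> Self {
        Self { bits: vec![0; (num_rows + 63) / 64] }
    }

    /// Record that outer row `row` had at least one filter-passing match.
    fn set(&mut self, row: usize) {
        self.bits[row / 64] |= 1 << (row % 64);
    }

    fn get(&self, row: usize) -> bool {
        (self.bits[row / 64] >> (row % 64)) & 1 == 1
    }
}

/// What emitting an outer batch does per join type, expressed over the bitset:
/// - semi: keep rows whose bit is set
/// - anti: keep rows whose bit is clear
/// - mark: keep every row and emit the bit itself as a boolean column
fn emit_mark_column(bitset: &MatchBitset, num_rows: usize) -> Vec<bool> {
    (0..num_rows).map(|i| bitset.get(i)).collect()
}
```

The payoff is that a mark join only ever touches one bit per outer row, so the `(streamed, buffered)` pair materialization the old path relied on disappears entirely.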
**2. Batch filter evaluation in `freeze_streamed()`**
- Split `freeze_streamed()` into null-joined classification +
`freeze_streamed_matched()` for batched materialization
- Collect indices across chunks, materialize left/right columns once using
tiered Arrow kernels (`slice` → `take` → `interleave`)
- Single `RecordBatch` construction and single `expression.evaluate()` per
freeze instead of per chunk
- Vectorize `append_filter_metadata()` using builder `extend()` instead of
per-element loop
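The tiered kernel choice above can be sketched as a selection over `(chunk, row)` gather indices. The enum and helper below are illustrative stand-ins, not the PR's actual code:

```rust
// Sketch of the tiered materialization choice: pick the cheapest Arrow
// kernel for a set of gather indices. Illustrative names only.

/// Which kernel to materialize gathered rows with, cheapest first.
#[derive(Debug, PartialEq)]
enum Kernel {
    /// All indices form one contiguous run in a single chunk: zero-copy `slice`.
    Slice { chunk: usize, offset: usize, len: usize },
    /// All indices fall in a single chunk but are scattered: `take`.
    Take { chunk: usize },
    /// Indices span multiple chunks: `interleave`.
    Interleave,
}

/// Pick the cheapest kernel for a non-empty list of (chunk, row) indices.
fn pick_kernel(indices: &[(usize, usize)]) -> Kernel {
    let chunk = indices[0].0;
    if indices.iter().any(|&(c, _)| c != chunk) {
        return Kernel::Interleave;
    }
    let contiguous = indices.windows(2).all(|w| w[1].1 == w[0].1 + 1);
    if contiguous {
        Kernel::Slice { chunk, offset: indices[0].1, len: indices.len() }
    } else {
        Kernel::Take { chunk }
    }
}
```

Collecting indices across chunks first, then materializing once, is also what allows a single `RecordBatch` and a single `expression.evaluate()` per freeze.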
**3. Batch deferred filtering in Init state**
- Gate `process_filtered_batches()` on accumulated rows >= `batch_size`
instead of running on every Init entry
- Accumulated data is bounded to ~2×batch_size (one batch from
`freeze_dequeuing_buffered`, one accumulating toward the next freeze), so this
does not reintroduce the unbounded buffering fixed by PR #20482
- `Exhausted` state flushes any remainder
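The gating logic can be sketched as a small accumulator. `DeferredFilter` and its methods are hypothetical names standing in for the real state machine; the `flushes` counter stands in for calls to `process_filtered_batches()`:

```rust
// Sketch of gating deferred filtering on batch_size. Illustrative only.

/// Accumulates rows and only runs the deferred-filtering pipeline once at
/// least `batch_size` rows are pending, instead of on every key transition.
struct DeferredFilter {
    batch_size: usize,
    pending_rows: usize,
    flushes: usize, // stand-in for process_filtered_batches() invocations
}

impl DeferredFilter {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, pending_rows: 0, flushes: 0 }
    }

    /// Called on each key transition in the Init state.
    fn on_key_transition(&mut self, rows: usize) {
        self.pending_rows += rows;
        if self.pending_rows >= self.batch_size {
            self.flush();
        }
    }

    /// Called from the Exhausted state to drain any remainder.
    fn flush(&mut self) {
        if self.pending_rows > 0 {
            self.flushes += 1;
            self.pending_rows = 0;
        }
    }
}
```

With 1:1 keys every transition carries roughly one row, so this turns one pipeline run per row into one per `batch_size` rows, which is where the Q22/Q23 speedups come from.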
**Cleanup:**
- `SortMergeJoinStream` now handles only Inner/Left/Right/Full — all
semi/anti/mark branching removed
- `get_corrected_filter_mask()`: merge identical Left/Right/Full branches;
add null-metadata passthrough for already-null-joined rows
- `filter_record_batch_by_join_type()`: rewrite from `filter(true) +
filter(false) + concat` to `zip()` for in-place null-joining — preserves row
ordering and removes `create_null_joined_batch()` entirely
- `filter_record_batch_by_join_type()`: use `compute::filter()` directly on
`BooleanArray` instead of wrapping in temporary `RecordBatch`
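The `zip()` rewrite above can be sketched with plain `Option`s standing in for Arrow arrays. This is a model of the semantics, with an illustrative function name, not the actual kernel call:

```rust
// Sketch of the zip()-style null-joining: instead of filter(mask) +
// filter(!mask) + concat (which reorders rows), replace unmatched rows'
// join-side values with null in place. Options stand in for Arrow arrays.

/// For each row, keep the value where `matched` is true and null it out
/// where false. Row order is untouched, so no separate null-joined batch
/// plus concat step is needed.
fn zip_null_join<T: Clone>(matched: &[bool], values: &[Option<T>]) -> Vec<Option<T>> {
    matched
        .iter()
        .zip(values)
        .map(|(&m, v)| if m { v.clone() } else { None })
        .collect()
}
```

Preserving row order in place is what lets `create_null_joined_batch()` be deleted outright.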
## Benchmarks
`cargo run --release --bin dfbench -- smj`
| Query | Join Type | Rows | Keys | Filter | Main (ms) | PR (ms) | Speedup |
|-------|-----------|------|------|--------|-----------|---------|---------|
| Q1 | INNER | 100K×100K | 1:1 | — | 1.7 | 1.7 | 1.0x |
| Q2 | INNER | 100K×1M | 1:10 | — | 12.2 | 11.6 | 1.0x |
| Q3 | INNER | 1M×1M | 1:100 | — | 64.2 | 64.9 | 1.0x |
| Q4 | INNER | 100K×1M | 1:10 | 1% | 2.2 | 2.2 | 1.0x |
| Q5 | INNER | 1M×1M | 1:100 | 10% | 12.8 | 12.7 | 1.0x |
| Q6 | LEFT | 100K×1M | 1:10 | — | 11.1 | 11.3 | 1.0x |
| Q7 | LEFT | 100K×1M | 1:10 | 50% | 13.4 | 14.1 | 1.0x |
| Q8 | FULL | 100K×100K | 1:10 | — | 2.2 | 2.2 | 1.0x |
| Q9 | FULL | 100K×1M | 1:10 | 10% | 14.5 | 14.8 | 1.0x |
| Q10 | LEFT SEMI | 100K×1M | 1:10 | — | 3.6 | 3.4 | 1.0x |
| Q11 | LEFT SEMI | 100K×1M | 1:10 | 1% | 2.0 | 2.3 | 1.0x |
| Q12 | LEFT SEMI | 100K×1M | 1:10 | 50% | 5.1 | 5.4 | 1.0x |
| Q13 | LEFT SEMI | 100K×1M | 1:10 | 90% | 9.9 | 10.1 | 1.0x |
| Q14 | LEFT ANTI | 100K×1M | 1:10 | — | 3.5 | 3.7 | 1.0x |
| Q15 | LEFT ANTI | 100K×1M | 1:10 | partial | 3.7 | 3.5 | 1.0x |
| Q16 | LEFT ANTI | 100K×100K | 1:1 | — | 1.6 | 1.7 | 1.0x |
| Q17 | INNER | 100K×5M | 1:50 | 5% | 7.4 | 7.8 | 1.0x |
| Q18 | LEFT SEMI | 100K×5M | 1:50 | 2% | 5.4 | 5.5 | 1.0x |
| Q19 | LEFT ANTI | 100K×5M | 1:50 | partial | 21.0 | 21.2 | 1.0x |
| Q20 | INNER | 1M×10M | 1:100 | GROUP BY | 759 | 761 | 1.0x |
| Q21 | INNER | 10M×10M | 1:1 | 50% | 181 | 173 | 1.0x |
| Q22 | LEFT | 10M×10M | 1:1 | 50% | 10,228 | 184 | **55x** |
| Q23 | FULL | 10M×10M | 1:1 | 50% | 9,884 | 228 | **43x** |
General workload (Q1-Q20, various join types/cardinalities/selectivities):
no regressions.
## Are these changes tested?
Yes:
- 48 SMJ unit tests (`cargo test -p datafusion-physical-plan --lib
joins::sort_merge_join`)
- 10 join sqllogictest files (`cargo test -p datafusion-sqllogictest
--test sqllogictests -- join`)
- Semi/anti/mark stream tests (`cargo test -p datafusion-physical-plan
--lib joins::semi_anti_mark_sort_merge_join`)
- New unit test for mark join with filter via the renamed stream
- New benchmark queries Q21-Q23: 10M×10M unique keys with 50% join filter
for INNER/LEFT/FULL — these exercise the degenerate case this PR fixes
## Are there any user-facing changes?
No.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]