mbutrovich opened a new pull request, #21184:
URL: https://github.com/apache/datafusion/pull/21184

   Draft while I leave the fuzz tests running overnight.
   
   ## Which issue does this PR close?
   
   Partially addresses #20910.
   
   ## Rationale for this change
   
   Sort-merge join with a filter on outer joins (LEFT/RIGHT/FULL) runs 
`process_filtered_batches()` on every key transition in the Init state. With 
near-unique keys (1:1 cardinality), this means running the full deferred 
filtering pipeline (concat + `get_corrected_filter_mask` + 
`filter_record_batch_by_join_type`) once per row — making filtered 
LEFT/RIGHT/FULL **55x slower** than INNER for 10M unique keys.
   
   Additionally, mark join logic in `SortMergeJoinStream` materializes full 
`(streamed, buffered)` pairs only to discard most of them via 
`get_corrected_filter_mask()`. Mark joins are structurally identical to semi 
joins (one output row per outer row with a boolean result) and belong in 
`SemiAntiMarkSortMergeJoinStream`, which avoids pair materialization entirely 
using a per-outer-batch bitset.
   
   ## What changes are included in this PR?
   
   Three areas of improvement, building on the specialized semi/anti stream 
from #20806:
   
   **1. Move mark joins to `SemiAntiMarkSortMergeJoinStream`**
   - Rename `semi_anti_sort_merge_join` → `semi_anti_mark_sort_merge_join`
   - Add `is_mark` flag; `emit_outer_batch()` emits all rows with the match 
bitset as a boolean column (vs semi's filter / anti's invert-and-filter)
   - Route `LeftMark`/`RightMark` from `SortMergeJoinExec::execute()` to the 
renamed stream
   - Remove all mark-specific logic from `SortMergeJoinStream` 
(`mark_row_as_match`, `is_not_null` column generation, mark arms in filter 
correction)
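
To illustrate why mark joins fit the semi/anti stream, here is a minimal sketch of emitting one outer batch from a per-row match bitset. It uses plain slices rather than Arrow arrays, and the `Mode` enum and this `emit_outer_batch` signature are hypothetical stand-ins, not the code in this PR.

```rust
/// Illustrative join flavor; not DataFusion's `JoinType`.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Mode {
    Semi,
    Anti,
    Mark,
}

/// Emit one outer batch given a per-row match bitset.
/// Semi keeps matched rows, anti keeps unmatched rows, and mark keeps
/// every row and attaches the bit as a boolean column value.
fn emit_outer_batch(outer: &[i64], matched: &[bool], mode: Mode) -> Vec<(i64, Option<bool>)> {
    assert_eq!(outer.len(), matched.len());
    outer
        .iter()
        .zip(matched)
        .filter_map(|(&row, &m)| match mode {
            Mode::Semi => m.then_some((row, None)),
            Mode::Anti => (!m).then_some((row, None)),
            Mode::Mark => Some((row, Some(m))),
        })
        .collect()
}
```

In all three modes the output is derived from the outer rows and the bitset alone, so no `(streamed, buffered)` pairs need to be materialized.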
   
   **2. Batch filter evaluation in `freeze_streamed()`**
   - Split `freeze_streamed()` into null-joined classification + 
`freeze_streamed_matched()` for batched materialization
   - Collect indices across chunks, materialize left/right columns once using 
tiered Arrow kernels (`slice` → `take` → `interleave`)
   - Single `RecordBatch` construction and single `expression.evaluate()` per 
freeze instead of per chunk
   - Vectorize `append_filter_metadata()` using builder `extend()` instead of 
per-element loop
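
The tiered kernel choice can be sketched roughly as follows. The conditions for each tier are an assumption made for illustration (this description does not spell them out), and `Gather` and `choose_gather` are hypothetical names. Indices are `(batch_idx, row_idx)` pairs.

```rust
/// How a set of (batch_idx, row_idx) pairs could be materialized.
#[derive(Debug, PartialEq)]
enum Gather {
    /// One batch, consecutive rows: a zero-copy slice(offset, len).
    Slice { batch: usize, offset: usize, len: usize },
    /// One batch, arbitrary rows: a take with row indices.
    Take { batch: usize, rows: Vec<usize> },
    /// Rows span several batches: an interleave over (batch, row) pairs.
    Interleave(Vec<(usize, usize)>),
}

/// Pick the cheapest tier that can express the requested rows.
/// Returns `None` when there is nothing to materialize.
fn choose_gather(indices: &[(usize, usize)]) -> Option<Gather> {
    let &(first_batch, first_row) = indices.first()?;
    if indices.iter().any(|&(b, _)| b != first_batch) {
        return Some(Gather::Interleave(indices.to_vec()));
    }
    // Same batch throughout: check whether rows are first_row, first_row + 1, ...
    let contiguous = indices
        .iter()
        .enumerate()
        .all(|(i, &(_, r))| r == first_row + i);
    if contiguous {
        Some(Gather::Slice { batch: first_batch, offset: first_row, len: indices.len() })
    } else {
        Some(Gather::Take {
            batch: first_batch,
            rows: indices.iter().map(|&(_, r)| r).collect(),
        })
    }
}
```

With near-unique keys the streamed side is typically consecutive rows of a single batch, which is the case where the cheapest tier applies.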
   
   **3. Batch deferred filtering in Init state**
   - Gate `process_filtered_batches()` on accumulated rows >= `batch_size` 
instead of running on every Init entry
   - Accumulated data is bounded to roughly 2×batch_size rows (one batch from 
`freeze_dequeuing_buffered`, one accumulating toward the next freeze), so this does 
not reintroduce the unbounded buffering fixed by PR #20482
   - `Exhausted` state flushes any remainder
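
A minimal sketch of the gating idea, assuming a simple row counter. `DeferredFilter` and its methods are hypothetical, and `process()` stands in for the real concat + mask correction + filtering pipeline.

```rust
/// Tracks pending joined rows and decides when to run the deferred filter pass.
struct DeferredFilter {
    batch_size: usize,
    pending_rows: usize,
    /// Number of times the (expensive) filtering pipeline ran.
    runs: usize,
}

impl DeferredFilter {
    fn new(batch_size: usize) -> Self {
        Self { batch_size, pending_rows: 0, runs: 0 }
    }

    /// Called on each key transition (Init state) with the rows frozen since the last call.
    /// The pipeline only runs once enough rows have accumulated.
    fn on_init(&mut self, new_rows: usize) {
        self.pending_rows += new_rows;
        if self.pending_rows >= self.batch_size {
            self.process();
        }
    }

    /// Called once the inputs are exhausted: flush whatever remains.
    fn on_exhausted(&mut self) {
        if self.pending_rows > 0 {
            self.process();
        }
    }

    /// Stand-in for the real filtering pipeline; here it only counts invocations.
    fn process(&mut self) {
        self.runs += 1;
        self.pending_rows = 0;
    }
}
```

With `batch_size = 4` and ten key transitions of one row each, this runs the pipeline three times (two full batches plus the final flush) instead of ten.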
   
   **Cleanup:**
   - `SortMergeJoinStream` now handles only Inner/Left/Right/Full — all 
semi/anti/mark branching removed
   - `get_corrected_filter_mask()`: merge identical Left/Right/Full branches; 
add null-metadata passthrough for already-null-joined rows
   - `filter_record_batch_by_join_type()`: rewrite from `filter(true) + 
filter(false) + concat` to `zip()` for in-place null-joining — preserves row 
ordering and removes `create_null_joined_batch()` entirely
   - `filter_record_batch_by_join_type()`: use `compute::filter()` directly on 
`BooleanArray` instead of wrapping in temporary `RecordBatch`
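
The ordering difference between the two shapes of `filter_record_batch_by_join_type()` can be sketched on plain vectors. This is a simplification that assumes an already-corrected mask where every row is either kept or null-joined (rows that would be dropped entirely are ignored), and both function names are hypothetical.

```rust
/// zip-style null-joining: every row stays in place; the right side becomes
/// NULL (None) where the mask is false.
fn null_join_zip(left: &[i32], right: &[i32], mask: &[bool]) -> Vec<(i32, Option<i32>)> {
    left.iter()
        .zip(right)
        .zip(mask)
        .map(|((&l, &r), &keep)| (l, keep.then_some(r)))
        .collect()
}

/// Previous shape: filter(true) + filter(false) + concat.
/// Rows that fail the filter are null-joined and moved to the end.
fn null_join_concat(left: &[i32], right: &[i32], mask: &[bool]) -> Vec<(i32, Option<i32>)> {
    let mut passed = Vec::new();
    let mut failed = Vec::new();
    for i in 0..mask.len() {
        if mask[i] {
            passed.push((left[i], Some(right[i])));
        } else {
            failed.push((left[i], None));
        }
    }
    passed.extend(failed);
    passed
}
```

Both produce the same set of rows, but only the zip form keeps them in input order, which is why the separate null-joined batch is no longer needed.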
   
   ## Benchmarks
   
   `cargo run --release --bin dfbench -- smj`
   
     | Query | Join Type | Rows | Keys | Filter | Main (ms) | PR (ms) | Speedup |
     |-------|-----------|------|------|--------|-----------|---------|---------|
     | Q1 | INNER | 100K×100K | 1:1 | — | 1.7 | 1.7 | 1.0x |
     | Q2 | INNER | 100K×1M | 1:10 | — | 12.2 | 11.6 | 1.0x |
     | Q3 | INNER | 1M×1M | 1:100 | — | 64.2 | 64.9 | 1.0x |
     | Q4 | INNER | 100K×1M | 1:10 | 1% | 2.2 | 2.2 | 1.0x |
     | Q5 | INNER | 1M×1M | 1:100 | 10% | 12.8 | 12.7 | 1.0x |
     | Q6 | LEFT | 100K×1M | 1:10 | — | 11.1 | 11.3 | 1.0x |
     | Q7 | LEFT | 100K×1M | 1:10 | 50% | 13.4 | 14.1 | 1.0x |
     | Q8 | FULL | 100K×100K | 1:10 | — | 2.2 | 2.2 | 1.0x |
     | Q9 | FULL | 100K×1M | 1:10 | 10% | 14.5 | 14.8 | 1.0x |
     | Q10 | LEFT SEMI | 100K×1M | 1:10 | — | 3.6 | 3.4 | 1.0x |
     | Q11 | LEFT SEMI | 100K×1M | 1:10 | 1% | 2.0 | 2.3 | 1.0x |
     | Q12 | LEFT SEMI | 100K×1M | 1:10 | 50% | 5.1 | 5.4 | 1.0x |
     | Q13 | LEFT SEMI | 100K×1M | 1:10 | 90% | 9.9 | 10.1 | 1.0x |
     | Q14 | LEFT ANTI | 100K×1M | 1:10 | — | 3.5 | 3.7 | 1.0x |
     | Q15 | LEFT ANTI | 100K×1M | 1:10 | partial | 3.7 | 3.5 | 1.0x |
     | Q16 | LEFT ANTI | 100K×100K | 1:1 | — | 1.6 | 1.7 | 1.0x |
     | Q17 | INNER | 100K×5M | 1:50 | 5% | 7.4 | 7.8 | 1.0x |
     | Q18 | LEFT SEMI | 100K×5M | 1:50 | 2% | 5.4 | 5.5 | 1.0x |
     | Q19 | LEFT ANTI | 100K×5M | 1:50 | partial | 21.0 | 21.2 | 1.0x |
     | Q20 | INNER | 1M×10M | 1:100 | GROUP BY | 759 | 761 | 1.0x |
     | Q21 | INNER | 10M×10M | 1:1 | 50% | 181 | 173 | 1.0x |
     | Q22 | LEFT | 10M×10M | 1:1 | 50% | 10,228 | 184 | **55x** |
     | Q23 | FULL | 10M×10M | 1:1 | 50% | 9,884 | 228 | **43x** |
   
   General workload (Q1-Q20, various join types/cardinalities/selectivities): 
no regressions.
   
   ## Are these changes tested?
   
   Yes:
   
     - 48 SMJ unit tests (`cargo test -p datafusion-physical-plan --lib 
joins::sort_merge_join`)
     - 10 join sqllogictest files (`cargo test -p datafusion-sqllogictest 
--test sqllogictests -- join`)
     - Semi/anti/mark stream tests (`cargo test -p datafusion-physical-plan 
--lib joins::semi_anti_mark_sort_merge_join`)
     - New unit test for mark join with filter via the renamed stream
     - New benchmark queries Q21-Q23: 10M×10M unique keys with 50% join filter 
for INNER/LEFT/FULL — exercises the degenerate case this PR fixes
   
   ## Are there any user-facing changes?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

