neilconway opened a new pull request, #22914:
URL: https://github.com/apache/datafusion/pull/22914

   ## Which issue does this PR close?
   
   - Closes #22854
   
   ## Rationale for this change
   
   Suppose we have A semijoin B, and the optimizer chooses to implement that as 
a `RightSemi` join. That means we build B and stream A, doing lookups 
against the B hash table. The current implementation does the following:
   
   1. Get the hash values for a chunk of A rows
   2. Look up all the B hash table entries with matching hash values
   3. Check the (A, B) pairs for true equality to rule out hash collisions; for 
each match, produce a `(probe_idx, build_idx)` pair.
   4. Apply the join filter if there's a non-equijoin filter
   5. Remove duplicates from the list of all `probe_idx` values
   6. Materialize the output `RecordBatch` using the distinct `probe_idx` values
   
   Some of this work is redundant for `RightSemi` joins (and `RightAnti` joins 
as well):
   
   1. We can stop once we hit the first matching B value, rather than walking 
the rest of the chain. This means we can both do less hash table traversal and 
do fewer equality comparisons.
   2. We never need `build_idx` values
   3. We can skip removing duplicates from the `probe_idx` array, since we 
never produce them in the first place
   
   This PR adds an existence-probe fast path that emits each probe row at most
   once and stops each row's hash-chain search at its first key-equal match,
   while keeping key comparison fully vectorized.
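
To make the difference concrete, here is a minimal sketch that contrasts the two strategies on a toy build table. It is not the code from this PR: `BuildTable`, `build_table`, `semi_via_pairs`, `semi_via_existence`, and `anti_via_existence` are hypothetical names, and a plain `std::collections::HashMap` stands in for the real join hash map.

```rust
use std::collections::HashMap;

/// Toy build side: join key -> indices of the build rows holding that key.
/// Hypothetical stand-in for the real join hash map.
type BuildTable = HashMap<i64, Vec<usize>>;

fn build_table(build_keys: &[i64]) -> BuildTable {
    let mut table = BuildTable::new();
    for (idx, key) in build_keys.iter().enumerate() {
        table.entry(*key).or_default().push(idx);
    }
    table
}

/// General path: emit every (probe_idx, build_idx) pair, then remove
/// duplicate probe indices afterwards.
fn semi_via_pairs(table: &BuildTable, probe_keys: &[i64]) -> Vec<usize> {
    let mut pairs = Vec::new();
    for (probe_idx, key) in probe_keys.iter().enumerate() {
        if let Some(build_rows) = table.get(key) {
            for &build_idx in build_rows {
                pairs.push((probe_idx, build_idx));
            }
        }
    }
    let mut probe_indices: Vec<usize> = pairs.iter().map(|&(p, _)| p).collect();
    // Pairs are grouped by probe_idx, so removing adjacent duplicates suffices.
    probe_indices.dedup();
    probe_indices
}

/// Existence path: only ask whether at least one build row matches, so no
/// build indices and no duplicates are ever produced.
fn semi_via_existence(table: &BuildTable, probe_keys: &[i64]) -> Vec<usize> {
    let mut out = Vec::new();
    for (probe_idx, key) in probe_keys.iter().enumerate() {
        if table.contains_key(key) {
            out.push(probe_idx);
        }
    }
    out
}

/// Anti join: the complement of the semi join over the same existence test.
fn anti_via_existence(table: &BuildTable, probe_keys: &[i64]) -> Vec<usize> {
    let mut out = Vec::new();
    for (probe_idx, key) in probe_keys.iter().enumerate() {
        if !table.contains_key(key) {
            out.push(probe_idx);
        }
    }
    out
}
```

With build keys `[1, 1, 1, 2]`, the pair-based path materializes three pairs for every probe row with key `1` before deduplicating, while the existence path does one lookup per probe row. That gap is what the high-fanout benchmarks exercise.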
   
   Benchmarks:
   ```
   benchmark                              before        after    change
   right_semi_d100_h100                 4.977 ms     0.981 ms    -80.3%
   right_anti_d100_h100                 1.892 ms     1.132 ms    -40.2%
   right_semi_d100_h10                  0.886 ms     0.438 ms    -50.6%
   right_anti_d100_h10                  3.281 ms     0.765 ms    -76.7%
   right_semi_d100_h50                  2.754 ms     0.751 ms    -72.7%
   right_anti_d100_h50                  2.739 ms     0.981 ms    -64.2%
   right_semi_d50_h100                  5.062 ms     0.999 ms    -80.3%
   right_anti_d50_h100                  1.921 ms     1.128 ms    -41.3%
   right_semi_d50_h10                   0.890 ms     0.441 ms    -50.5%
   right_anti_d50_h10                   3.311 ms     0.767 ms    -76.8%
   right_semi_d10_h100                  7.314 ms     3.840 ms    -47.5%
   right_anti_d10_h100                  4.260 ms     3.932 ms     -7.7%
   right_semi_d10_h10                   7.604 ms     7.253 ms     -4.6%
   right_anti_d10_h10                  10.111 ms     7.573 ms    -25.1%
   right_semi_d10_h50                   7.234 ms     4.705 ms    -35.0%
   right_anti_d10_h50                   7.305 ms     4.943 ms    -32.3%
   right_semi_fanout100_h1              2.903 ms     0.414 ms    -85.7%
   right_anti_fanout100_h1              6.070 ms     0.935 ms    -84.6%
   right_semi_fanout10_h50             11.007 ms     0.921 ms    -91.6%
   right_anti_fanout10_h50             10.840 ms     1.118 ms    -89.7%
   right_semi_fanout10_h50_hashmap     17.153 ms     2.622 ms    -84.7%
   right_anti_fanout10_h50_hashmap     16.945 ms     2.810 ms    -83.4%
   right_semi_skewed_h50_hashmap        8.621 ms     7.428 ms    -13.8%
   right_anti_skewed_h50_hashmap        8.752 ms     7.765 ms    -11.3%
   right_semi_utf8_h50                 11.757 ms     8.427 ms    -28.3%
   right_anti_utf8_h50                 11.789 ms     8.690 ms    -26.3%
   right_semi_utf8_fanout10_h50        53.618 ms     6.959 ms    -87.0%
   right_anti_utf8_fanout10_h50        53.025 ms     7.064 ms    -86.7%
   ```
   
   As expected, this is a massive win for high fanout scenarios (in which we 
would previously have produced many intermediate duplicates), but it's still a 
win across the board.
   
   ## What changes are included in this PR?
   
   **Core change**
   
   * Introduce a new "existence probe" code path, which we use for `RightSemi` 
and `RightAnti`, provided the join has no join filter and the build side is 
non-empty.
   * For `HashMap`s, the existence probe walks candidate hash chains in 
lockstep. In each round, we gather the next B candidates for every 
not-yet-matched probe row and do a single vectorized comparison. Any rows that 
did not match advance to the next round; since hash collisions are 
very rare in practice, this almost always takes a single round.
   * For `ArrayMap`s, we can do a single bucket probe per row.
   * Fast paths for several reasonably common scenarios: (1) every row in the 
probe chunk matches, (2) all `HashMap` chains are of length 1, and (3) emitted 
probe indices are a contiguous run.
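
The lockstep chain walk described above can be sketched roughly as follows. This is an illustrative toy, not the implementation in this PR: `ChainedTable`, `toy_hash`, and `existence_probe` are hypothetical names, the hash function is deliberately weak so that collisions occur, and the per-round `Vec<bool>` stands in for a vectorized equality kernel.

```rust
use std::collections::HashMap;

/// Marks the end of a hash chain.
const END: usize = usize::MAX;

/// Toy chained hash table (hypothetical layout for illustration only).
struct ChainedTable {
    heads: HashMap<u64, usize>, // hash -> first build row in the chain
    next: Vec<usize>,           // next[i] = next build row with the same hash, or END
    keys: Vec<i64>,             // build-side join keys
}

/// Deliberately weak hash so that distinct keys collide.
fn toy_hash(key: i64) -> u64 {
    key.rem_euclid(4) as u64
}

impl ChainedTable {
    fn build(keys: &[i64]) -> Self {
        let mut heads = HashMap::new();
        let mut next = vec![END; keys.len()];
        for (idx, &key) in keys.iter().enumerate() {
            // Prepend: the new row becomes the head and links to the old head.
            if let Some(old_head) = heads.insert(toy_hash(key), idx) {
                next[idx] = old_head;
            }
        }
        ChainedTable { heads, next, keys: keys.to_vec() }
    }
}

/// Returns a per-probe-row "matched" flag and the number of rounds taken.
fn existence_probe(table: &ChainedTable, probe_keys: &[i64]) -> (Vec<bool>, usize) {
    let mut matched = vec![false; probe_keys.len()];
    // Active set: (probe_idx, candidate build_idx) for rows still searching.
    let mut active: Vec<(usize, usize)> = probe_keys
        .iter()
        .enumerate()
        .filter_map(|(p, &k)| table.heads.get(&toy_hash(k)).map(|&b| (p, b)))
        .collect();
    let mut rounds = 0;
    while !active.is_empty() {
        rounds += 1;
        // One batched comparison per round over all active candidates.
        let equal: Vec<bool> = active
            .iter()
            .map(|&(p, b)| probe_keys[p] == table.keys[b])
            .collect();
        let mut still_active = Vec::new();
        for (&(p, b), &eq) in active.iter().zip(&equal) {
            if eq {
                matched[p] = true; // first key-equal match: stop walking this chain
            } else if table.next[b] != END {
                still_active.push((p, table.next[b])); // try the next chain entry
            }
        }
        active = still_active;
    }
    (matched, rounds)
}
```

A semi join would emit the probe rows whose flag is `true` and an anti join the rest. With a realistic hash function, most probe rows would resolve in the first round, which is the case the single-round fast path targets.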
   
   **Supporting changes**
   
   * Refactor and share `is_contiguous_range` from SMJ
   * Add an `ExistenceProbe` trait. `JoinHashMapU32`/`U64` implement it; 
`PruningJoinHashMap` does not
   * Add a `ProbeScratch` abstraction to reuse scratch space between probe 
chunks
   * Refactor `equal_rows_arr` so we can reuse the vectorized comparison logic
   * Significantly broaden semi/anti join benchmark coverage
   * Add unit tests
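
As an illustration of why a contiguity check is useful when materializing output, here is a small sketch. The names `contiguous_run` and `select_rows` and their signatures are assumptions for this example and do not reflect the actual `is_contiguous_range` helper; a plain slice stands in for an Arrow array.

```rust
use std::ops::Range;

/// Returns the range covered by `indices` if they form a contiguous
/// ascending run (e.g. 3, 4, 5), otherwise `None`.
/// Hypothetical helper; not the signature used in DataFusion.
fn contiguous_run(indices: &[usize]) -> Option<Range<usize>> {
    let first = *indices.first()?;
    let is_run = indices
        .iter()
        .enumerate()
        .all(|(offset, &idx)| idx == first + offset);
    is_run.then(|| first..first + indices.len())
}

/// Selects rows from a column: a contiguous run can be copied as one slice,
/// anything else falls back to an index-by-index gather.
fn select_rows(column: &[i64], indices: &[usize]) -> Vec<i64> {
    match contiguous_run(indices) {
        Some(range) => column[range].to_vec(),
        None => indices.iter().map(|&i| column[i]).collect(),
    }
}
```

In a columnar engine the contiguous case can typically be served by a cheap slice of the input batch rather than a gather, which is presumably why the fast path checks for it.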
   
   ## Are these changes tested?
   
   Yes; new tests added.
   
   ## Are there any user-facing changes?
   
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

