2010YOUY01 commented on PR #16660:
URL: https://github.com/apache/datafusion/pull/16660#issuecomment-3174468965
Please bear with me — I might need more time to digest and outline the tasks
required to get this great operator merged. Here are some updates.
I looked through the implementation, and I think the high-level idea is great!
It's definitely efficient for joins with an IE (inequality) predicate.
```
# High-level control flow for PiecewiseMergeJoin
Buffer all buffer-side batches
for probe_batch in probe_side_input:
    sort(probe_batch)
    for probe_row in probe_batch:
        do a linear scan on the buffer side to find the pivot
        (since both sides are sorted, we can remember the previous
         position on the buffer side, so only one scan is needed on each side!)
        output (probe_row x range_in_buffer_side (like [0, pivot]))
```
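To double-check my understanding, here is a minimal Rust sketch of that single-pass pivot scan over plain `i64` keys. It assumes both sides are sorted ascending and the predicate is `probe < buffered`; all names are illustrative, not the PR's actual code:
```rust
/// Hypothetical sketch: for each (ascending) probe key, return the range of
/// buffered rows satisfying `probe < buffered`. Because both inputs are
/// sorted, the remembered pivot only moves forward, so the whole probe batch
/// needs just one pass over the buffered side.
fn pivot_ranges(buffered: &[i64], probe: &[i64]) -> Vec<(usize, usize)> {
    let mut pivot = 0; // remembered position on the buffered side
    let mut ranges = Vec::with_capacity(probe.len());
    for &p in probe {
        // Buffered rows that failed for an earlier (smaller) probe key can
        // never match a later one, so the pivot never moves backwards.
        while pivot < buffered.len() && buffered[pivot] <= p {
            pivot += 1;
        }
        // Every buffered row in [pivot, buffered.len()) satisfies `p < buffered[i]`.
        ranges.push((pivot, buffered.len()));
    }
    ranges
}

fn main() {
    let buffered = vec![1, 3, 5, 7, 9];
    let probe = vec![2, 4, 8];
    // Probe key 2 matches buffered rows {3, 5, 7, 9} -> range (1, 5), etc.
    println!("{:?}", pivot_ranges(&buffered, &probe));
}
```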
# Question
Theoretically, as long as both the probe and build sides are sorted on the
compare key (regardless of ASC or DESC), this operator should be able to
execute, right?
I think the current implementation is designed to enforce a certain ordering
according to the inequality join predicate (by inserting a `SortExec` during
planning).
However, the buffer-side input might have the opposite order. For example,
PMJ requires `t1` to be `ORDER BY c1 ASC`, but the existing order might be
`ORDER BY c1 DESC`.
It would be more efficient to preserve any existing order, though it would
require more logic in the implementation, which can be hard to wrap your head
around. I’ll think about whether there’s a simpler way to implement this idea.
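For what it's worth, here is a hypothetical sketch of why a DESC-ordered buffer side should also work without re-sorting: with `probe < buffered` and the buffered side sorted descending, the matching rows form a prefix instead of a suffix, and the pivot still only moves in one direction. Again, the names are illustrative and not the PR's API:
```rust
/// Hypothetical sketch: same predicate (`probe < buffered`), but the buffered
/// side is sorted DESC. Matching rows are now the prefix [0, pivot), and the
/// pivot only shrinks as the (ascending) probe keys grow, so it is still a
/// single overall scan of the buffered side.
fn pivot_ranges_desc(buffered_desc: &[i64], probe_asc: &[i64]) -> Vec<(usize, usize)> {
    let mut pivot = buffered_desc.len();
    let mut ranges = Vec::with_capacity(probe_asc.len());
    for &p in probe_asc {
        // As p grows, fewer buffered rows are strictly greater than it,
        // so the pivot only moves backwards.
        while pivot > 0 && buffered_desc[pivot - 1] <= p {
            pivot -= 1;
        }
        ranges.push((0, pivot));
    }
    ranges
}
```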
# Suggestion
I think the biggest problem in the implementation right now is that it might
be buffering too many output results.
In the `ProcessStreamBatch` state, it is currently buffering all the pairs
in `buffered_batches × single_right_batch` that pass the join predicate. I
think we should change it so that it can yield output incrementally.
At the moment, this operator only tracks the memory size of all buffered
batches, so any extra, untracked memory usage should stay within roughly
`constant * single_batch_mem_size`.
For `single_probe_side_row × buffered_batches`, the extra memory usage for
materializing the output result can be `O(buffer_side_total_mem_size)`. For
instance, if there are 100k rows in the buffer side and the join predicate is
not very selective (e.g., 50%), joining it with a single probe-side row will
output a 50k-row batch.
One approach to solve this is to add some state management after joining a
single probe-side row: for example, if the join result is `probe_row ×
buffered_batch[0..pivot]`, we can use a state to incrementally process this
range, up to 8192 rows at a time, and put the result into an output buffer.
Once the output buffer reaches 8192 rows, output it eagerly. Possibly the
operator can jump to a new state dedicated to this incremental output.
This util can be helpful (rough sketch after the link):
https://docs.rs/arrow/latest/arrow/compute/struct.BatchCoalescer.html
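If I'm reading those docs right, the usage could look roughly like the sketch below. Everything except the `BatchCoalescer` calls (`new`, `push_batch`, `next_completed_batch`, `finish_buffered_batch`) is hypothetical, and building the real output columns (the probe row's values repeated next to the buffered slice) is elided for brevity:
```rust
use arrow::compute::BatchCoalescer;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

const TARGET_BATCH_SIZE: usize = 8192;

/// Hypothetical helper: emit the output for one probe row incrementally.
/// `buffered` stands in for the already-joined output rows for this probe row
/// (the real operator would also attach the probe row's values), and
/// `[0, pivot)` is the matching range on the buffered side.
fn emit_range(
    coalescer: &mut BatchCoalescer, // created via BatchCoalescer::new(schema, TARGET_BATCH_SIZE)
    buffered: &RecordBatch,
    pivot: usize,
    output: &mut Vec<RecordBatch>,
) -> Result<(), ArrowError> {
    let mut offset = 0;
    while offset < pivot {
        let len = TARGET_BATCH_SIZE.min(pivot - offset);
        // `RecordBatch::slice` is zero-copy, so only the coalescer's internal
        // buffer (at most one target-sized batch) is materialized at a time.
        coalescer.push_batch(buffered.slice(offset, len))?;
        // Eagerly drain any completed (target-sized) batches.
        while let Some(batch) = coalescer.next_completed_batch() {
            output.push(batch);
        }
        offset += len;
    }
    // The caller would invoke `coalescer.finish_buffered_batch()` after the
    // whole probe batch has been processed, to flush the final partial batch.
    Ok(())
}
```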
# Summary
I still need some time to understand and think about the existence join
cases, but so far I suggest including these two changes in the initial PR; I
think they would be very hard to do as follow-up patches (they need
significant structural changes):
1. Incremental result output mentioned above
2. Support additional join predicates
https://github.com/apache/datafusion/pull/16660#discussion_r2265310584