2010YOUY01 commented on PR #16660: URL: https://github.com/apache/datafusion/pull/16660#issuecomment-3174468965
Please bear with me — I might need more time to digest and outline the tasks required to get this great operator merged. Here are some updates.

I looked through the implementation, and I think the high-level idea is great! It's definitely efficient for joins with an inequality (IE) predicate.

```
# High-level control flow for PiecewiseMergeJoin
Buffer all buffer-side batches
for probe_batch in probe_side_input:
    sort(probe_batch)
    for probe_row in probe_batch:
        # Do a linear scan on the buffer side to find the pivot.
        # (Since both sides are sorted, we can remember the previous
        #  position on the buffer side, so only 1 scan is needed on both sides!)
        output(probe_row x range_in_buffer_side(like [0, pivot]))
```

# Question

Theoretically, as long as both the probe and build sides are sorted on the compare key (regardless of ASC or DESC), this operator should be able to execute, right?

I think the current implementation is designed to enforce a certain ordering according to the inequality join predicate (by inserting a `SortExec` during planning). However, the buffer-side input might already have the opposite order. For example, PMJ requires `t1` to be `ORDER BY c1 ASC`, but the existing order might be `ORDER BY c1 DESC`. It would be more efficient to preserve any existing order, though that would require more logic in the implementation, which can be hard to wrap your head around. I'll think about whether there's a simpler way to implement this idea.

# Suggestion

I think the biggest problem in the implementation right now is that it might buffer too many output results. In the `ProcessStreamBatch` state, it currently buffers all the pairs in `buffered_batches × single_right_batch` that pass the join predicate. I think we should change it so that it yields output incrementally. At the moment, this operator only tracks the memory size of all buffered batches, so the untracked extra memory usage should stay around `constant * single_batch_mem_size`.
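To make the discussion concrete, here is how I understand the single-scan pivot idea from the control-flow outline above, sketched in plain Rust. This is a simplified sketch, not the PR's actual code: it assumes both sides are sorted ASC on integer keys and the predicate is `probe.v > buffer.v`; `pivots` is a hypothetical helper name.

```rust
// Sketch: single-pass pivot scan for the predicate `probe.v > buffer.v`,
// assuming both `buffer` and `probe` are sorted ASC.
// Because the probe keys are non-decreasing, the pivot only ever moves
// forward, so both sides are scanned exactly once in total.
fn pivots(buffer: &[i32], probe: &[i32]) -> Vec<usize> {
    let mut pivot = 0; // resumes where the previous probe row stopped
    probe
        .iter()
        .map(|&p| {
            // Advance while the buffer row still satisfies buffer.v < p.
            while pivot < buffer.len() && buffer[pivot] < p {
                pivot += 1;
            }
            // Rows buffer[0..pivot] all join with this probe row.
            pivot
        })
        .collect()
}

fn main() {
    let buffer = vec![1, 3, 5, 7];
    let probe = vec![2, 4, 6];
    // probe=2 matches buffer[0..1], probe=4 matches [0..2], probe=6 matches [0..3]
    assert_eq!(pivots(&buffer, &probe), vec![1, 2, 3]);
    println!("{:?}", pivots(&buffer, &probe));
}
```

A DESC-ordered buffer side could in principle be handled by the same scan walking from the other end, which is why preserving an existing order seems feasible, just fiddlier.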
For `single_probe_side_row × buffered_batches`, the extra memory used to materialize the output result can be `O(buffer_side_total_mem_size)`. For instance, if there are 100k rows on the buffer side and the join predicate is not very selective (e.g., 50%), joining them with a single probe-side row will output a 50k-row batch.

One approach to solve this is to add some state management after joining a single probe-side row: for example, if the join result is `probe_row × buffered_batch[0..pivot]`, we can use a state to process this range incrementally, up to 8192 rows at a time, and put the result into an output buffer. Once the output buffer reaches 8192 rows, eagerly output it. Possibly this can jump to a new state for incremental output. This util can be helpful: https://docs.rs/arrow/latest/arrow/compute/struct.BatchCoalescer.html

# Summary

I still need some time to understand and think about the existence-join cases, but so far I suggest including these two changes in the initial PR; I think they would be very hard to do as follow-up patches (they need significant structural changes):
1. The incremental result output mentioned above
2. Support for additional join predicates: https://github.com/apache/datafusion/pull/16660#discussion_r2265310584

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
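As a postscript, the chunked-emission idea can be sketched without any Arrow types. This is a hypothetical illustration (the `chunked_ranges` helper and `BATCH_SIZE` constant are mine, not the PR's): instead of materializing `probe_row × buffer[0..pivot]` at once, the state walks the range in sub-ranges of at most 8192 rows, which is the kind of bookkeeping `BatchCoalescer` would then turn into fixed-size output batches.

```rust
// Hypothetical sketch of incremental output: emit the match range
// `0..pivot` as sub-ranges of at most BATCH_SIZE rows instead of one
// giant batch, bounding the extra memory per step.
const BATCH_SIZE: usize = 8192;

/// Yields (start, end) sub-ranges covering `0..pivot`, each at most
/// BATCH_SIZE rows long.
fn chunked_ranges(pivot: usize) -> impl Iterator<Item = (usize, usize)> {
    (0..pivot)
        .step_by(BATCH_SIZE)
        .map(move |start| (start, (start + BATCH_SIZE).min(pivot)))
}

fn main() {
    // A 50k-row match range becomes 7 bounded batches instead of one
    // 50k-row batch (6 full chunks of 8192 rows plus a final partial one).
    let chunks: Vec<_> = chunked_ranges(50_000).collect();
    assert_eq!(chunks.len(), 7);
    assert_eq!(chunks[0], (0, 8192));
    assert_eq!(chunks[6], (49_152, 50_000));
    println!("{} chunks", chunks.len());
}
```

In the real operator this iterator position would live in the join's state machine, so the stream can yield a completed batch and resume from the saved `(probe_row, start)` on the next poll.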