2010YOUY01 commented on PR #16660:
URL: https://github.com/apache/datafusion/pull/16660#issuecomment-3174468965

   Please bear with me — I might need more time to digest and outline the tasks 
required to get this great operator merged. Here are some updates.
   
   I looked through the implementation, and I think the high-level idea is 
great! It's definitely efficient for joins with inequality (IE) predicates.
   
   ```
   # High-level control flow for PiecewiseMergeJoin
   
   Buffer all buffer-side batches
   
   for probe_batch in probe_side_input:
       sort(probe_batch)
       for probe_row in probe_batch:
           do a linear scan on the buffer side to find the pivot
           (since both sides are sorted, we can remember the previous
           position on the buffer side; only 1 scan is needed on each side!)
           output (probe_row x range_in_buffer_side (like [0, pivot]))
   ```
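
To make the control flow above concrete, here is a minimal sketch over plain `i32` keys. It assumes the predicate `buffer_key < probe_key` and an ascending sort on both sides; `piecewise_merge_join` is a hypothetical name for illustration, not a function from the PR.

```rust
/// Toy version of the control flow above for the predicate
/// `buffer_key < probe_key`. Returns `(probe_key, buffer_key)` pairs.
/// Hypothetical helper for illustration, not code from the PR.
fn piecewise_merge_join(
    mut buffer: Vec<i32>,
    probe_batches: Vec<Vec<i32>>,
) -> Vec<(i32, i32)> {
    // Buffer side: fully materialized and sorted once (ascending).
    buffer.sort();

    let mut out = Vec::new();
    for mut probe_batch in probe_batches {
        // Each probe batch is sorted on its own, so the pivot restarts
        // per batch and then only moves forward within that batch.
        probe_batch.sort();
        let mut pivot = 0;
        for &probe_key in &probe_batch {
            // Resume the linear scan from the previous pivot.
            while pivot < buffer.len() && buffer[pivot] < probe_key {
                pivot += 1;
            }
            // Every row in buffer[0..pivot] satisfies the predicate.
            for &buffer_key in &buffer[..pivot] {
                out.push((probe_key, buffer_key));
            }
        }
    }
    out
}
```

For example, `piecewise_merge_join(vec![5, 1, 3], vec![vec![4, 2], vec![6]])` yields six pairs: one for probe key 2, two for 4, and three for 6.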
   # Question
   Theoretically, as long as both the probe and build sides are sorted on the 
compare key (regardless of ASC or DESC), this operator should be able to 
execute, right?  
   I think the current implementation is designed to enforce a certain ordering 
according to the inequality join predicate (by inserting a `SortExec` during 
planning).  
   However, the buffer-side input might have the opposite order. For example, 
PMJ requires `t1` to be `ORDER BY c1 ASC`, but the existing order might be 
`ORDER BY c1 DESC`.  
   
   It would be more efficient to preserve any existing order, though it would 
require more logic in the implementation, which can be hard to wrap your head 
around. I’ll think about whether there’s a simpler way to implement this idea.
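
As a rough illustration of why either order could work (this is an assumption on my side, not how the PR is implemented): for the predicate `buffer_key < probe_key`, the matching rows form a prefix of an ASC-sorted buffer and a suffix of a DESC-sorted one, so only the emitted range changes. The helper name `matching_range` is hypothetical, and it uses a binary search via `partition_point` instead of the remembered linear scan to keep the sketch short.

```rust
use std::ops::Range;

/// Hypothetical helper: indices of `buffer` whose key satisfies
/// `key < probe_key`. `buffer` must already be sorted, ascending by
/// default or descending when `descending` is true.
fn matching_range(buffer: &[i32], probe_key: i32, descending: bool) -> Range<usize> {
    if descending {
        // DESC: keys >= probe_key come first, so matches are a suffix.
        let pivot = buffer.partition_point(|&b| b >= probe_key);
        pivot..buffer.len()
    } else {
        // ASC: keys < probe_key come first, so matches are a prefix.
        let pivot = buffer.partition_point(|&b| b < probe_key);
        0..pivot
    }
}
```

With probe key 4, an ASC buffer `[1, 3, 5]` gives `0..2` and a DESC buffer `[5, 3, 1]` gives `1..3`; both ranges cover the same two rows. Keeping the pivot monotonic across probe rows would additionally depend on the probe-side order, which this sketch does not cover.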
   
   # Suggestion
   I think the biggest problem in the implementation right now is that it might 
be buffering too many output results.  
   In the `ProcessStreamBatch` state, it is currently buffering all the pairs 
in `buffered_batches × single_right_batch` that pass the join predicate. I 
think we should change it so that it can yield output incrementally.  
   
   At the moment, this operator only tracks the memory size of all buffered 
batches, so any untracked extra memory usage has to stay around `constant * 
single_batch_mem_size`.  
   For `single_probe_side_row × buffered_batches`, the extra memory usage for 
materializing the output result can be `O(buffer_side_total_mem_size)`. For 
instance, if there are 100k rows in the buffer side and the join predicate is 
not very selective (e.g., 50%), joining it with a single probe-side row will 
output a 50k-row batch.  
   
   One approach to solve this is to add some state management after joining a 
single row in the probe side: for example, if the join result is `probe_row × 
buffered_batch[0..pivot]`, we can use a state to incrementally process this 
range, up to 8192 rows at a time, and put the final result inside the output 
buffer. Once the output buffer reaches 8192 rows, eagerly output it. Possibly 
it can jump to a new state for incremental output.
   
   This util can be helpful: 
https://docs.rs/arrow/latest/arrow/compute/struct.BatchCoalescer.html
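
The state-management idea above could look roughly like the sketch below. It uses only std types and index ranges rather than real Arrow batches or `BatchCoalescer`; the `PendingRange` struct and its `next_chunk` method are hypothetical names, and a real implementation would materialize each chunk into the output buffer.

```rust
use std::ops::Range;

/// Hypothetical state: the not-yet-emitted part of
/// `probe_row x buffered_batch[range]`.
struct PendingRange {
    probe_row: usize,
    remaining: Range<usize>,
}

impl PendingRange {
    /// Returns the next slice of at most `max_rows` buffer-side indices
    /// for this probe row, or `None` once the range is drained.
    fn next_chunk(&mut self, max_rows: usize) -> Option<(usize, Range<usize>)> {
        assert!(max_rows > 0, "max_rows must be positive");
        if self.remaining.is_empty() {
            return None;
        }
        let end = (self.remaining.start + max_rows).min(self.remaining.end);
        let chunk = self.remaining.start..end;
        // Remember where to resume on the next poll.
        self.remaining.start = end;
        Some((self.probe_row, chunk))
    }
}
```

With the 50k-row example above and a limit of 8192 rows, repeated calls to `next_chunk(8192)` produce seven chunks: six of 8192 rows and a final one of 848 rows, so the operator never has to hold the whole 50k-row result at once.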
   
   # Summary
   I still need some time to understand and think about the existence join 
cases, but so far I suggest including these two changes in the initial PR. I 
think they would be very hard to do as follow-up patches, since they need 
significant structural changes:
   1. Incremental result output mentioned above
   2. Support additional join predicates 
https://github.com/apache/datafusion/pull/16660#discussion_r2265310584


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

