echauchot commented on issue #24035:
URL: https://github.com/apache/beam/issues/24035#issuecomment-1323356158

   > @echauchot Thanks, here are my findings / thinking so far.
   > 
   > First, let's look at the side-input problem from a different perspective. 
Semantically it's in fact a co-group / join by window problem. Now, how do we 
implement that efficiently in Spark, considering we're dealing with a 
potentially very large dataset and a comparatively small side-input?
   > 
   > Here's a couple of possible ways:
   > 
   > * Naively implement it as a co-group / join by window. Though, that would 
mean shuffling both the large dataset and the side-input.
   > * We can be smarter of course, and implement it as a broadcast join. That 
avoids shuffling the large dataset. Instead, the smaller side-input is sent to 
all partitions of the larger one and the join happens locally in place. Sounds 
pretty much like what we want, right?
   
   Agree
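   For reference, in plain Spark that variant looks roughly like the sketch 
below (made-up `main` / `side` datasets keyed on a hypothetical `window` 
column, not the runner's actual code):

```java
import static org.apache.spark.sql.functions.broadcast;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BroadcastJoinSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]").appName("broadcast-join-sketch").getOrCreate();

    // Large dataset, with a fake "window" column standing in for the Beam window.
    Dataset<Row> main = spark.range(0, 1_000_000)
        .withColumn("window", col("id").mod(10));

    // Comparably small side-input, keyed by the same "window" column.
    Dataset<Row> side = spark.range(0, 10)
        .withColumnRenamed("id", "window")
        .withColumn("sideValue", col("window").multiply(100));

    // The broadcast hint makes Spark ship `side` to every partition of `main`
    // and join locally, so the large dataset is never shuffled.
    Dataset<Row> joined = main.join(broadcast(side), "window");
    joined.show(5);
  }
}
```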
   
   > * And of course, we can collect the side-input on the driver and broadcast 
it as a variable. That way we can also do a local join / lookup. Though, this 
has the disadvantage that we will need to evaluate (and broadcast) the 
side-input ahead of time during the translation of the Beam pipeline.
   > 
   > To also mention Spark temp views: these are in fact just named `Dataset`s 
for usage in SQL. All of the above applies to them as well. Their data is 
distributed the same way across the cluster, with the same implications.
   > 
   > Now, looking more closely, a couple of findings:
   > 
   > * To join on windows we have to first explode the dataset, so that it 
consists of rows of single windowed values. That's potentially costly.
   
   Yes, better to use another approach.
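   To make the cost concrete, here's a sketch of that explode step (with a 
hypothetical `MultiWindowedValue` standing in for Beam's `WindowedValue`); an 
element in overlapping windows is duplicated once per window:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import scala.Tuple2;

public class ExplodeWindowsSketch {

  // Hypothetical stand-in for Beam's WindowedValue: one value, many windows
  // (e.g. the sliding windows the element falls into).
  public static class MultiWindowedValue implements Serializable {
    public String value;
    public List<String> windows;
  }

  // Explode into (window, value) rows so a join on the window key is possible.
  // Each element is emitted once per window it belongs to.
  public static Dataset<Tuple2<String, String>> explode(Dataset<MultiWindowedValue> in) {
    return in.flatMap(
        (FlatMapFunction<MultiWindowedValue, Tuple2<String, String>>) mwv -> {
          List<Tuple2<String, String>> out = new ArrayList<>();
          for (String w : mwv.windows) {
            out.add(new Tuple2<>(w, mwv.value));
          }
          return out.iterator();
        },
        Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
  }
}
```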
   
   > * Broadcast joins in Spark are implemented as follows: 1) Collect data on 
the driver. 2) Broadcast it. Turns out this isn't much different from the 
manual broadcast.
   
   So then doing the manual broadcast seems better, as it gives you proper 
control over the broadcast and room for potential optimizations. Hence what 
you chose in the related PR.
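   For comparison, the manual variant would look something like this sketch 
(made-up side-input data; the point being that we control when and what gets 
broadcast, and each element does a local lookup instead of a join):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import scala.Tuple2;

public class ManualBroadcastSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]").appName("manual-broadcast-sketch").getOrCreate();
    JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

    // Hypothetical side-input, already evaluated and collected on the driver.
    // In the runner this is what has to happen ahead of time during translation.
    Map<String, Long> sideInput = new HashMap<>();
    sideInput.put("w1", 100L);
    sideInput.put("w2", 200L);

    // Broadcast once; every task then reads a local copy, no shuffle involved.
    Broadcast<Map<String, Long>> side = jsc.broadcast(sideInput);

    Dataset<String> main =
        spark.createDataset(Arrays.asList("w1", "w2", "w1"), Encoders.STRING());

    // Local lookup instead of a join: each element resolves its window's
    // side-input value directly from the broadcast variable.
    Dataset<Tuple2<String, Long>> joined = main.map(
        (MapFunction<String, Tuple2<String, Long>>) w ->
            new Tuple2<>(w, side.value().get(w)),
        Encoders.tuple(Encoders.STRING(), Encoders.LONG()));

    joined.show();
  }
}
```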
   
   > * Side-inputs are typically built using a `CombineGlobally`, meaning we 
need a global reduce step leaving just 1 partition. If we could do this final 
reduce step on the driver, the data would already be available to be 
broadcast. Unfortunately, there are additional (primitive) processing steps 
in-between that make this difficult.
   
   Yes, that would have been a good improvement, avoiding the extra collect to 
the driver. Let's see, maybe as a next step.
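   Roughly, the idea would be something like this sketch (a conceptual 
illustration, not runner code): combine per partition while still distributed, 
then collect only the small partials and finish the merge on the driver, where 
the result is immediately ready to broadcast:

```java
import java.util.Collections;
import java.util.List;

import org.apache.spark.api.java.function.MapPartitionsFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;

public class DriverSideCombineSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[*]").appName("driver-side-combine-sketch").getOrCreate();

    Dataset<Long> values = spark.range(0, 1_000_000);

    // 1) Partial combine within each partition (stays distributed, no
    //    reduce down to a single partition).
    Dataset<Long> partials = values.mapPartitions(
        (MapPartitionsFunction<Long, Long>) it -> {
          long sum = 0;
          while (it.hasNext()) {
            sum += it.next();
          }
          return Collections.singletonList(sum).iterator();
        },
        Encoders.LONG());

    // 2) Finish the global combine on the driver: only one small value per
    //    partition is collected, and the result is already local, ready to
    //    be broadcast.
    List<Long> onDriver = partials.collectAsList();
    long total = onDriver.stream().mapToLong(Long::longValue).sum();
    System.out.println(total);
  }
}
```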
   
   

