echauchot commented on issue #24035: URL: https://github.com/apache/beam/issues/24035#issuecomment-1323356158
> @echauchot Thanks, here are my findings / thinking so far.
>
> First, let's look at the side-input problem from a different perspective. Semantically it's in fact a co-group / join-by-window problem. Now, how do we implement that efficiently in Spark, considering we're dealing with a potentially very large dataset and a comparably small side-input?
>
> Here are a couple of possible ways:
>
> * Naively implement it as a co-group / join by window. Though, that would mean shuffling both the large dataset and the side-input.
> * We can be smarter, of course, and implement it as a broadcast join. That avoids shuffling the large dataset. Instead, the smaller side-input is sent to all partitions of the larger one and the join happens locally in place. Sounds pretty much like what we want, right?

Agree (see the first sketch at the end of this comment).

> * And of course, we can collect the side-input on the driver and broadcast it as a variable. That way we can also do a local join / lookup. Though, this has the disadvantage that we will need to evaluate (and broadcast) the side-input ahead of time, during the translation of the Beam pipeline.
>
> To also mention Spark temp views: these are in fact just named `Dataset`s for usage in SQL. All of the above applies to them as well. Their data is distributed across the cluster in the same way, with the same implications.
>
> Now, looking more closely, a couple of findings:
>
> * To join on windows we have to first explode the dataset, so it consists of rows of single windowed values. That's potentially costly.

Yes, better to use another approach.

> * Broadcast joins in Spark are implemented as follows: 1) Collect the data on the driver. 2) Broadcast it. Turns out this isn't much different from the manual broadcast.

So doing the manual broadcast seems better, as it allows proper control and potential optimizations of the broadcast. Hence what you chose in the related PR (see the second sketch below).

> * Side-inputs are typically built involving a CombineGlobally, meaning we need a global reduce step leaving just 1 partition. If we could do this final reduce step on the driver, the data would already be available to be broadcast. Unfortunately, there are additional (primitive) processing steps in-between that make this difficult.

Yes, that would have been a good improvement to avoid collecting to the driver. Let's see, maybe in a next step.
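For context, here's a minimal sketch of what the explode + broadcast-join variant would look like in plain Spark SQL. Everything here is a placeholder for illustration (the `t(value, windows)` / `s(window, sideValue)` schemas and the class name are made up), not the runner's actual translation code:

```java
import static org.apache.spark.sql.functions.broadcast;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class BroadcastJoinSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("broadcast-join-sketch").master("local[*]").getOrCreate();

    // Main input: each element belongs to one or more windows.
    Dataset<Row> mainInput = spark.sql(
        "SELECT * FROM VALUES ('a', array('w0','w1')), ('b', array('w1')) AS t(value, windows)");
    // Small side-input, keyed by window.
    Dataset<Row> side = spark.sql(
        "SELECT * FROM VALUES ('w0', 10), ('w1', 20) AS s(window, sideValue)");

    // Step 1: explode so every row carries a single window -- the potentially costly part.
    Dataset<Row> exploded = mainInput.withColumn("window", explode(col("windows")));

    // Step 2: broadcast() hints Spark to ship `side` to all partitions of `exploded`,
    // so the join happens locally and the large dataset is not shuffled.
    Dataset<Row> joined = exploded.join(broadcast(side), "window");
    joined.show();

    spark.stop();
  }
}
```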
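And a minimal sketch of the manual broadcast variable approach, again with placeholder types (a simple `Map` standing in for the materialized side-input, strings standing in for windowed values):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;

public class ManualBroadcastSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("manual-broadcast-sketch").master("local[*]").getOrCreate();
    JavaSparkContext jsc = JavaSparkContext.fromSparkContext(spark.sparkContext());

    // The side-input, already collected on the driver (window -> combined value).
    Map<String, Long> sideInput = new HashMap<>();
    sideInput.put("w0", 10L);
    sideInput.put("w1", 20L);

    // Broadcast it once; every executor gets a read-only copy.
    Broadcast<Map<String, Long>> bcSideInput = jsc.broadcast(sideInput);

    // The large main input; "window:value" strings stand in for windowed values.
    JavaRDD<String> mainInput = jsc.parallelize(Arrays.asList("w0:a", "w1:b"));

    // Local "join": look the window up in the broadcast map. No shuffle of mainInput.
    JavaRDD<String> joined = mainInput.map(v -> {
      String window = v.split(":")[0];
      return v + " -> " + bcSideInput.value().get(window);
    });

    joined.collect().forEach(System.out::println);
    spark.stop();
  }
}
```

The advantage of the manual variant is that we keep full control over when the side-input is materialized and broadcast, so there's room for optimizations such as reusing one broadcast across several transforms.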
