A couple of things that are really nice here:

1- Domain specific (CTR in your example). We may find that eventually it's not possible / practical to build out generic joins for all situations. But with the primitives available in Beam and good 'patterns', domain-specific joins could be added for different industries.
2- Pros / Cons section. This is very nice and, as Kenn mentioned, it would be great for there to be a collection of joins that users can choose from based on the pros / cons. For example, I got pulled onto other work before I could complete this PR (https://github.com/apache/beam/pull/9032), but I hope to go back to it. It's specific to a time series use case from a specific industry, with pros and cons based on throughput etc.

Maybe we should consider adding something with links etc. to Beam patterns:
https://beam.apache.org/documentation/patterns/overview/

Perhaps a Joins section, and we do something that has not been done before and add an Industry / Domain flavour.

Cheers
Reza

On Sat, 2 May 2020 at 14:45, Marcin Kuthan wrote:

> @Kenneth, thanks for your response. For sure I was inspired a lot by earlier discussions on the group and the latest documentation updates about Timers:
> https://beam.apache.org/documentation/programming-guide/#timers
>
> In the limitations I forgot to mention SideInputs. They work quite well for scenarios where one side of the join is updated slowly, very slowly. But for scenarios where the main stream gets 50k+ events per second and the joined stream ~100 events per second, it simply does not work. This is especially true if there is no support for updates in a Map side input and the side input has to be updated/broadcast as a whole.
>
> @Jan, very interesting. As I understood it, the joins are already implemented (plenty of them in Scio: classic ones, sparse versions, etc.); the problem is with limited window semantics, triggering policy and the time of emitted events.
>
> Please look at LookupCacheDoFn. It looks like a left outer join, but it isn't. Only the latest Lookup value (the right side of the join) is cached, and the left side of the join is cached only until the first matching lookup is observed. Not so generic, but quite efficient.
>
> https://github.com/mkuthan/beam-examples/blob/master/src/main/scala/org/mkuthan/beam/examples/LookupCacheDoFn.scala
>
> Marcin
>
> On Fri, 1 May 2020 at 22:22, Jan Lukavský wrote:
>
>> Interestingly, I'm currently also working on a proposal for generic join semantics. I plan to send a proposal for review, but unfortunately there are still other things keeping me busy. I'll take this opportunity to outline my high-level thoughts; maybe someone can give some pointers.
>>
>> The general idea is to define a join that can incorporate all other types as special cases, where the generic implementation can be simplified or optimized, but the semantics remain the same. As I plan to put this down into a full design document, I will just very roughly outline the ideas:
>>
>> a) the generic semantics should be equivalent to running a relational join against a set of tables _after each individual modification of the relation_ and producing results with the timestamp of the last modification
>>
>> b) windows "scope" the state of each "table", i.e. when time reaches window.maxTimestamp(), the corresponding "table" is cleared
>>
>> c) it should be possible to derive other types of joins from this definition by certain manipulations (e.g. buffering multiple updates in a single window and assigning all elements the timestamp window.maxTimestamp() will yield the classical "windowed join", with the requirement to have the same windows on both (all) sides, as otherwise the result will be empty); the goal of these modifications is typically to enable some optimization (e.g. the fully generic implementation must include time sorting, either implicitly or explicitly, while optimized variants can drop this requirement).
>>
>> It would be great if someone has any comments on this bottom-up approach.
>>
>> Jan
>> On 5/1/20 5:30 PM, Kenneth Knowles wrote:
>>
>> +dev@beam and some people who I talk about joins with
>>
>> Interesting!
>> It is a lot to take in and fully grok the code, so calling in reinforcements...
>>
>> Generally, I think there's agreement that for a lot of real use cases, you have to roll your own join using the lower-level Beam primitives. So I think it would be great to get some of these other approaches to joins into Beam, perhaps as an extension of the Java SDK or even in the core (since schema joins are in the core). In particular:
>>
>> - "join in fixed window with repeater" sounds similar (but not identical) to work by Mikhail
>> - "join in global window with cache" sounds similar (but not identical) to work and discussions w/ Reza and Tyson
>>
>> I want to be clear that I am *not* saying there's any duplication. I'm guessing these all fit into a collection of different ways to accomplish joins, and if everything comes to fruition we will have the great opportunity to document how a user should choose between them.
>>
>> Kenn
>>
>> On Fri, May 1, 2020 at 7:56 AM Marcin Kuthan wrote:
>>
>>> Hi,
>>>
>>> It's my first post here, but I've been reading the group for a while, so thank you for sharing the knowledge!
>>>
>>> I've been using Beam/Scio on Dataflow for about a year, mostly for stream processing from unbounded sources like PubSub. During my daily work I found that built-in windowing is very generic and provides rich watermark/late-event semantics, but there are a few very annoying limitations, e.g.:
>>> - both sides of the join must be defined within compatible windows
>>> - for fixed windows, elements close to window boundaries (but in different windows) won't be joined
>>> - for sliding windows there is a huge overhead if the duration is much longer than the offset
>>>
>>> I would like to ask you to review a few "join/windowing patterns" with custom stateful ParDos, not as generic as the Beam built-ins but perhaps better crafted for more specific needs.
>>> I published code with tests; feel free to comment as GitHub issues or on the mailing list. Event-time processing with watermarks is so demanding that I'm almost sure I overlooked many important corner cases.
>>> https://github.com/mkuthan/beam-examples
>>>
>>> If you think that the examples are somehow useful, I'll be glad to write a blog post with more details :)
>>>
>>> Marcin
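
Marcin's description of LookupCacheDoFn (keep only the latest lookup value per key; buffer left-side events only until the first matching lookup arrives) can be illustrated as a small per-key state machine. This is a minimal plain-Java sketch under stated assumptions, not the linked Scala implementation: it uses no Beam APIs, stands in HashMaps for Beam's per-key ValueState/BagState, and has no timers, watermarks or state expiry. The class and method names (LookupCacheSketch, onLookup, onEvent) are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Beam-free simulation of the "lookup cache" join semantics:
// per key, keep only the LATEST right-side (lookup) value, and buffer
// left-side events only until the first matching lookup is observed.
// No timers: buffered events are never expired in this sketch.
public class LookupCacheSketch {

    // Stand-in for per-key ValueState holding the latest lookup value.
    private final Map<String, String> latestLookup = new HashMap<>();
    // Stand-in for per-key BagState buffering not-yet-matched left events.
    private final Map<String, List<String>> pendingEvents = new HashMap<>();

    /** Right side: cache the newest lookup and flush any buffered events. */
    public List<String> onLookup(String key, String lookup) {
        latestLookup.put(key, lookup); // older lookup values are overwritten
        List<String> out = new ArrayList<>();
        List<String> buffered = pendingEvents.remove(key);
        if (buffered != null) {
            for (String event : buffered) {
                out.add(event + "+" + lookup);
            }
        }
        return out;
    }

    /** Left side: join immediately if a lookup is cached, otherwise buffer. */
    public List<String> onEvent(String key, String event) {
        String lookup = latestLookup.get(key);
        if (lookup != null) {
            return List.of(event + "+" + lookup);
        }
        pendingEvents.computeIfAbsent(key, k -> new ArrayList<>()).add(event);
        return List.of();
    }

    public static void main(String[] args) {
        LookupCacheSketch join = new LookupCacheSketch();
        System.out.println(join.onEvent("k", "e1"));  // [] (buffered)
        System.out.println(join.onLookup("k", "v1")); // [e1+v1] (flushed)
        System.out.println(join.onEvent("k", "e2"));  // [e2+v1]
        System.out.println(join.onLookup("k", "v2")); // [] (nothing buffered)
        System.out.println(join.onEvent("k", "e3"));  // [e3+v2]
    }
}
```

The run shows why this is not a left outer join: e2 is never re-joined with v2 once the lookup changes, and in this sketch an event whose lookup never arrives is simply never emitted. The state per key is one lookup value plus a short buffer, which is where the efficiency comes from.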

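Jan's points (a) to (c) describe a proposal, not an existing Beam API. One possible reading of them is sketched below in plain Java, under loudly stated assumptions: only an inner join, each "table" is an upsert-by-key relation, windows are fixed windows of a hypothetical windowSize, and a window's max timestamp is taken as its end minus one (mirroring Beam's IntervalWindow.maxTimestamp()). The names GenericJoinSketch, Update and process are hypothetical. For clarity it naively re-runs the whole join after every modification, which is exactly the cost that the optimized variants in (c) would avoid.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Beam-free reading of points (a)-(c): an inner join over two
// upsert-by-key "tables" per fixed window, re-run after every modification.
public class GenericJoinSketch {

    /** One modification of a relation: upsert key -> value on side "L" or "R". */
    public static final class Update {
        final String side;
        final String key;
        final String value;
        final long ts;

        public Update(String side, String key, String value, long ts) {
            this.side = side;
            this.key = key;
            this.value = value;
            this.ts = ts;
        }
    }

    private final long windowSize; // hypothetical fixed-window length
    // Per-window "tables": window start -> (key -> latest value), one per side.
    private final Map<Long, Map<String, String>> left = new HashMap<>();
    private final Map<Long, Map<String, String>> right = new HashMap<>();

    public GenericJoinSketch(long windowSize) {
        this.windowSize = windowSize;
    }

    /** Returns join rows formatted as "ts:key=leftValue|rightValue". */
    public List<String> process(List<Update> updates) {
        // (c) The fully generic variant needs event-time sorting of its input.
        List<Update> sorted = new ArrayList<>(updates);
        sorted.sort(Comparator.comparingLong(x -> x.ts));
        List<String> out = new ArrayList<>();
        for (Update u : sorted) {
            // (b) Once time passes a window's max timestamp (end - 1), clear its tables.
            left.keySet().removeIf(w -> w + windowSize - 1 < u.ts);
            right.keySet().removeIf(w -> w + windowSize - 1 < u.ts);
            long start = u.ts - Math.floorMod(u.ts, windowSize);
            Map<Long, Map<String, String>> table = u.side.equals("L") ? left : right;
            table.computeIfAbsent(start, s -> new HashMap<>()).put(u.key, u.value);
            // (a) Re-run the join on the whole window after this single modification
            // and stamp every result row with this modification's timestamp.
            Map<String, String> l = left.getOrDefault(start, Map.of());
            Map<String, String> r = right.getOrDefault(start, Map.of());
            for (Map.Entry<String, String> e : new TreeMap<>(l).entrySet()) {
                String rv = r.get(e.getKey());
                if (rv != null) {
                    out.add(u.ts + ":" + e.getKey() + "=" + e.getValue() + "|" + rv);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Update> updates = List.of(
                new Update("R", "a", "r2", 12), // deliberately out of order
                new Update("L", "a", "l1", 1),
                new Update("R", "a", "r1", 3),
                new Update("L", "a", "l2", 5));
        System.out.println(new GenericJoinSketch(10).process(updates));
        // [3:a=l1|r1, 5:a=l2|r1]
    }
}
```

Here the l2 update at ts=5 replaces l1, so the re-run join emits a=l2|r1 stamped 5, while r2 at ts=12 lands in the next window, whose tables start empty because the first window was cleared once time passed its max timestamp (9).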