A couple of things that are really nice here:

1- Domain-specific (CTR in your example). We may eventually find that it's
not possible / practical to build out generic joins for all situations. But
with the primitives available in Beam and good 'patterns', domain-specific
joins could be added for different industries.

2- Pros / Cons section. This is very nice and, as Kenn mentioned, it would be
great for there to be a collection of joins that users can choose from
based on the pros / cons.

For example, I got pulled onto other work before I could complete this PR
(<https://github.com/apache/beam/pull/9032>), but I hope to go back to it.
It's specific to a time series use case from a particular industry, with
pros and cons based on throughput etc.

Maybe we should consider adding something, with links etc., to the Beam
patterns:

https://beam.apache.org/documentation/patterns/overview/

Perhaps a Joins section, where we do something that has not been done before
and add an industry / domain flavour.

Cheers

Reza

On Sat, 2 May 2020 at 14:45, Marcin Kuthan <[email protected]> wrote:

> @Kenneth - thanks for your response. For sure I was inspired a lot by
> earlier discussions on the group and the latest documentation updates about
> Timers:
> https://beam.apache.org/documentation/programming-guide/#timers
>
> In the limitations I forgot to mention side inputs. They work quite
> well for scenarios where one side of the join is updated slowly, very
> slowly. But for scenarios where the main stream gets 50k+ events per
> second and the joined stream ~100 events per second, they simply do not
> work, especially if there is no support for updates in a Map side input
> and the side input has to be updated/broadcast as a whole.
>
> @Jan - very interesting. As I understand it, the joins are already
> implemented (plenty of them in Scio: classic ones, sparse versions, etc.);
> the problem is with the limited window semantics, the triggering policy and
> the time of emitted events.
>
> Please look at LookupCacheDoFn. It looks like a left outer join, but it
> isn't. Only the latest Lookup value (right side of the join) is cached, and
> the left side of the join is cached only until the first matching lookup is
> observed. Not so generic, but quite efficient.
>
>
> https://github.com/mkuthan/beam-examples/blob/master/src/main/scala/org/mkuthan/beam/examples/LookupCacheDoFn.scala
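The caching behaviour described above can be sketched in plain Python. This is only an illustration of the stated semantics, not the linked Scala DoFn: the class and method names (`LookupCache`, `on_left`, `on_lookup`) are hypothetical, and timers, state expiry and windowing are left out entirely.

```python
from collections import defaultdict


class LookupCache:
    """Hypothetical per-key cache: the latest lookup value plus left
    elements that are still waiting for their first matching lookup."""

    def __init__(self):
        self.latest_lookup = {}                # key -> most recent right-side value
        self.pending_left = defaultdict(list)  # key -> left values seen before any lookup

    def on_left(self, key, value):
        """Join immediately if a lookup is cached, otherwise buffer the element."""
        if key in self.latest_lookup:
            return [(key, value, self.latest_lookup[key])]
        self.pending_left[key].append(value)
        return []

    def on_lookup(self, key, lookup):
        """Keep only the newest lookup and flush buffered left elements for the key."""
        self.latest_lookup[key] = lookup
        return [(key, value, lookup) for value in self.pending_left.pop(key, [])]
```

Unlike a left outer join, a left element that arrives before any lookup is held back rather than emitted with an empty right side, and an updated lookup does not re-emit left elements that were already joined.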
>
> Marcin
>
> On Fri, 1 May 2020 at 22:22, Jan Lukavský <[email protected]> wrote:
>
>> Interestingly, I'm currently also working on a proposal for generic join
>> semantics. I plan to send a proposal for review, but unfortunately, there
>> are still other things keeping me busy. I'll take this opportunity to go
>> over some high-level thoughts; maybe someone can give some pointers.
>>
>> The general idea is to define a join that can incorporate all other types
>> as special cases, where the generic implementation can be simplified or
>> optimized, but the semantics remain the same. As I plan to write this up as
>> a full design document, I will just very roughly outline the ideas:
>>
>>  a) the generic semantics should be equivalent to running a relational
>> join against a set of tables _after each individual modification of the
>> relation_ and producing results with the timestamp of the last modification
>>
>>  b) windows "scope" the state of each "table" - i.e. when time reaches
>> window.maxTimestamp(), the corresponding "table" is cleared
>>
>>  c) it should be possible to derive other types of joins from this
>> definition by certain manipulations (e.g. buffering multiple updates in
>> a single window and assigning all elements the timestamp
>> window.maxTimestamp() will yield the classical "windowed join", with the
>> requirement to have the same windows on both (all) sides, as otherwise the
>> result will be empty) - the goal of these modifications is typically
>> enabling some optimization (e.g. the fully generic implementation must
>> include time sorting - either implicitly or explicitly - while optimized
>> variants can drop this requirement).
>>
>> It would be great if someone has any comments on this bottom-up approach.
>>
>> Jan
>> On 5/1/20 5:30 PM, Kenneth Knowles wrote:
>>
>> +dev <[email protected]>@beam and some people who I talk about joins
>> with
>>
>> Interesting! It is a lot to take in and fully grok the code, so calling
>> in reinforcements...
>>
>> Generally, I think there's agreement that for a lot of real use cases,
>> you have to roll your own join using the lower level Beam primitives. So I
>> think it would be great to get some of these other approaches to joins into
>> Beam, perhaps as an extension of the Java SDK or even in the core (since
>> schema joins are in the core). In particular:
>>
>>  - "join in fixed window with repeater" sounds similar (but not
>> identical) to work by Mikhail
>>  - "join in global window with cache" sounds similar (but not identical)
>> to work and discussions w/ Reza and Tyson
>>
>> I want to be clear that I am *not* saying there's any duplication. I'm
>> guessing these all fit into a collection of different ways to accomplish
>> joins, and if everything comes to fruition we will have the great
>> opportunity to document how a user should choose between them.
>>
>> Kenn
>>
>> On Fri, May 1, 2020 at 7:56 AM Marcin Kuthan <[email protected]>
>> wrote:
>>
>>> Hi,
>>>
>>> it's my first post here, but I've been reading the group for a while, so
>>> thank you for sharing the knowledge!
>>>
>>> I've been using Beam/Scio on Dataflow for about a year, mostly for
>>> stream processing from unbounded sources like PubSub. During my daily work I
>>> found that the built-in windowing is very generic and provides rich
>>> watermark/late event semantics, but there are a few very annoying
>>> limitations, e.g.:
>>> - both sides of the join must be defined within compatible windows
>>> - for fixed windows, elements close to window boundaries (but in
>>> different windows) won't be joined
>>> - for sliding windows there is a huge overhead if the duration is much
>>> longer than the offset
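The last two limitations come down to simple window arithmetic, sketched here in plain Python rather than with the Beam API. The sketch assumes integer timestamps in seconds, windows aligned at zero, and non-negative timestamps; `period` is what the list above calls the offset, and both function names are hypothetical.

```python
def fixed_window(ts, size):
    """Return the [start, end) fixed window that contains timestamp ts."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts, size, period):
    """Return every [start, end) sliding window that contains timestamp ts."""
    last_start = ts - ts % period
    return [(start, start + size) for start in range(last_start, ts - size, -period)]


# Two events 2 seconds apart fall into different 60-second fixed windows,
# so a windowed join never sees them together.
print(fixed_window(59, 60), fixed_window(61, 60))

# A 1-hour window sliding every minute places each element in size / period windows.
print(len(sliding_windows(5000, 3600, 60)))
```

With a one-hour duration and a one-minute period every element is assigned to 60 windows, which is where the overhead of long sliding windows comes from.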
>>>
>>> I would like to ask you to review a few "join/windowing patterns" with
>>> custom stateful ParDos, not as generic as the Beam built-ins but perhaps
>>> better crafted for more specific needs. I published the code with tests;
>>> feel free to comment via GitHub issues or on the mailing list. Event time
>>> processing with watermarks is so demanding that I'm almost sure I
>>> overlooked many important corner cases.
>>> https://github.com/mkuthan/beam-examples
>>>
>>> If you think that the examples are somehow useful, I'll be glad to write
>>> a blog post with more details :)
>>>
>>> Marcin
>>>
>>
