Interestingly, I'm currently also working on a proposal for generic join semantics. I plan to send it out for review, but unfortunately other things are still keeping me busy. I'll take this opportunity to share some high-level thoughts; maybe someone can give some pointers.

The general idea is to define a join that can incorporate all other types as special cases, where the generic implementation can be simplified or optimized, but the semantics remain the same. As I plan to write this up as a full design document, I will only roughly outline the ideas:

 a) the generic semantics should be equivalent to running a relational join against a set of tables _after each individual modification of the relation_ and producing results with the timestamp of the last modification

 b) windows "scope" the state of each "table", i.e. when time reaches window.maxTimestamp() the corresponding "table" is cleared

 c) it should be possible to derive other types of joins from this definition by certain manipulations. For example, buffering multiple updates in a single window and assigning all elements the timestamp window.maxTimestamp() will yield the classical "windowed join", with the requirement to have the same windows on both (all) sides, as otherwise the result will be empty. The goal of these modifications is typically to enable some optimization (e.g. the fully generic implementation must include time sorting, either implicitly or explicitly, while optimized variants can drop this requirement).
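
To make points a) to c) a bit more concrete, here is a minimal in-memory sketch in Python. It is not Beam code and not the proposed design: the names Update, generic_join and windowed_join are hypothetical, and it assumes append-only "tables", an inner join on key, fixed windows of window_size time units, and that only the rows newly produced by each modification are emitted.

```python
from collections import defaultdict
from typing import NamedTuple


class Update(NamedTuple):
    side: str       # "L" or "R": which "table" the update modifies
    key: str
    value: str
    timestamp: int  # event time


def generic_join(updates, window_size=None):
    """Join after every single update; results carry that update's timestamp."""
    tables = {"L": defaultdict(list), "R": defaultdict(list)}
    current_window = None
    out = []
    # (c) the fully generic variant needs time sorting; here it is explicit.
    for u in sorted(updates, key=lambda x: x.timestamp):
        if window_size is not None:
            window = u.timestamp // window_size
            if window != current_window:
                # (b) the previous window's max timestamp has passed:
                # the state of both "tables" is cleared.
                tables["L"].clear()
                tables["R"].clear()
                current_window = window
        tables[u.side][u.key].append(u.value)
        other = "R" if u.side == "L" else "L"
        # (a) join the modified row against the current state of the other side.
        for v in tables[other][u.key]:
            pair = (u.value, v) if u.side == "L" else (v, u.value)
            out.append((u.key, *pair, u.timestamp))
    return out


def windowed_join(updates, window_size):
    """Derived variant: stamp every element with its window's max timestamp."""
    restamped = [
        u._replace(timestamp=(u.timestamp // window_size + 1) * window_size - 1)
        for u in updates
    ]
    return generic_join(restamped, window_size)


updates = [
    Update("L", "k", "a", 1),
    Update("R", "k", "x", 3),
    Update("L", "k", "b", 12),
    Update("R", "k", "y", 8),
]
print(generic_join(updates))       # unwindowed: every matching pair
print(generic_join(updates, 10))   # windowed state: "b" finds an empty table
print(windowed_join(updates, 10))  # classical windowed join, all stamped 9
```

In the windowed variant the order of elements inside a window no longer affects the result, which is why an optimized implementation could drop the sorting step there.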

It would be great if someone has any comments on this bottom-up approach.

Jan

On 5/1/20 5:30 PM, Kenneth Knowles wrote:
+dev <mailto:[email protected]>@beam and some people who I talk about joins with

Interesting! It is a lot to take in and fully grok the code, so calling in reinforcements...

Generally, I think there's agreement that for a lot of real use cases, you have to roll your own join using the lower level Beam primitives. So I think it would be great to get some of these other approaches to joins into Beam, perhaps as an extension of the Java SDK or even in the core (since schema joins are in the core). In particular:

 - "join in fixed window with repeater" sounds similar (but not identical) to work by Mikhail
 - "join in global window with cache" sounds similar (but not identical) to work and discussions w/ Reza and Tyson

I want to be clear that I am *not* saying there's any duplication. I'm guessing these all fit into a collection of different ways to accomplish joins, and if everything comes to fruition we will have the great opportunity to document how a user should choose between them.

Kenn

On Fri, May 1, 2020 at 7:56 AM Marcin Kuthan <[email protected] <mailto:[email protected]>> wrote:

    Hi,

    it's my first post here, but I've been reading the group for a
    while, so thank you for sharing the knowledge!

    I've been using Beam/Scio on Dataflow for about a year, mostly for
    stream processing from unbounded sources like PubSub. During my
    daily work I found that built-in windowing is very generic and
    provides rich watermark/late event semantics, but there are a few
    very annoying limitations, e.g.:
    - both sides of the join must be defined within compatible windows
    - for fixed windows, elements close to window boundaries (but in
    different windows) won't be joined
    - for sliding windows there is a huge overhead if the duration is
    much longer than offset
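
A small illustration of the last two limitations, as a sketch only (plain Python, not Beam and not code from the linked repository). The helper names fixed_window and sliding_windows are hypothetical, windows are half-open [start, end) intervals, and "offset" is taken to mean the slide period.

```python
def fixed_window(ts, size):
    """Return the [start, end) fixed window that contains timestamp ts."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts, size, period):
    """Return every [start, end) sliding window that contains timestamp ts."""
    last_start = ts - ts % period
    # Walk back one period at a time while the window still covers ts.
    return [(s, s + size) for s in range(last_start, ts - size, -period)]


# Two events only 2 time units apart land in different fixed windows,
# so a windowed join never pairs them.
print(fixed_window(59, 60), fixed_window(61, 60))

# With a duration of 3600 and a period of 60, each element is assigned
# to size / period windows.
print(len(sliding_windows(3600, 3600, 60)))
```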

    I would like to ask you to review a few "join/windowing patterns"
    with custom stateful ParDos, not as generic as the Beam built-ins
    but perhaps better crafted for more specific needs. I published code
    with tests, feel free to comment as GitHub issues or on the
    mailing list. The event time processing with watermarks is so
    demanding that I'm almost sure that I overlooked many important
    corner cases.
    https://github.com/mkuthan/beam-examples

    If you think that the examples are somehow useful I'll be glad to
    write a blog post with more details :)

    Marcin
