Interestingly, I'm currently also working on a proposal for generic join
semantics. I plan to send the proposal for review, but unfortunately
other things are still keeping me busy. I'll take this opportunity to
share the high-level thoughts; maybe someone can give some pointers.
The general idea is to define a join that incorporates all other join
types as special cases, for which the generic implementation can be
simplified or optimized while the semantics remain the same. As I plan to
write this up as a full design document, I will only outline the ideas
very roughly:
a) the generic semantics should be equivalent to running a relational
join against a set of tables _after each individual modification of a
relation_ and producing results with the timestamp of the last modification
b) windows "scope" the state of each "table", i.e. when time reaches
window.maxTimestamp(), the corresponding "table" is cleared
c) it should be possible to derive other types of joins from this
definition by certain manipulations (e.g. buffering multiple updates in a
single window and assigning all elements the timestamp of
window.maxTimestamp() yields the classical "windowed join", with the
requirement to have the same windows on both (all) sides, as otherwise the
result will be empty); the goal of these modifications is typically to
enable some optimization (e.g. the fully generic implementation must
include time sorting, either implicitly or explicitly, while optimized
variants can drop this requirement).
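To make points a) and b) a little more concrete, here is a minimal single-process sketch, not the proposed design itself. It assumes append-only modifications (inserts only) that already arrive sorted by event time, integer timestamps, an inner equi-join on a key, and fixed windows per side; `GenericJoin` and `apply` are hypothetical names, not Beam APIs.

```python
from collections import defaultdict
from itertools import product


class GenericJoin:
    """Hypothetical sketch of the 'generic join' semantics of points a) and b).

    Assumptions: inserts only, inputs already sorted by timestamp,
    integer timestamps, fixed windows per side, inner equi-join on a key.
    """

    def __init__(self, window_sizes):
        # window_sizes: side name -> fixed window size (integer time units)
        self.window_sizes = window_sizes
        # state[side][window_start] -> list of (key, value) rows, i.e. one "table" per window
        self.state = {side: defaultdict(list) for side in window_sizes}

    def _max_timestamp(self, side, start):
        # Analogue of window.maxTimestamp(): last instant inside [start, start + size)
        return start + self.window_sizes[side] - 1

    def _advance_time(self, now):
        # b) drop each side's per-window "table" once time has moved past its maxTimestamp
        for side, windows in self.state.items():
            expired = [s for s in windows if self._max_timestamp(side, s) < now]
            for start in expired:
                del windows[start]

    def _live_rows(self, side):
        # All rows of a side that are still within a live window
        return [row for rows in self.state[side].values() for row in rows]

    def apply(self, side, key, value, timestamp):
        """Apply one modification and return the full join result stamped with `timestamp`."""
        self._advance_time(timestamp)
        start = timestamp - timestamp % self.window_sizes[side]
        self.state[side][start].append((key, value))
        # a) re-run the relational join over all live tables after this single modification
        sides = list(self.state)
        results = []
        for combo in product(*(self._live_rows(s) for s in sides)):
            if len({k for k, _ in combo}) == 1:  # all sides agree on the join key
                results.append((combo[0][0], tuple(v for _, v in combo), timestamp))
        return results
```

For example, with `GenericJoin({"left": 10, "right": 10})`, inserting `("a", "L1")` on the left at time 3 yields `[]`, then `("a", "R1")` on the right at time 8 yields `[("a", ("L1", "R1"), 8)]`; a right-side insert at time 12 yields `[]` because both `[0, 10)` tables were cleared. Re-running the whole join on every modification is deliberately naive: it mirrors the reference semantics of a), and the optimized variants of c) are the ones that would avoid this recomputation.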
It would be great if someone has any comments on this bottom-up approach.
Jan
On 5/1/20 5:30 PM, Kenneth Knowles wrote:
+dev@beam and some people I talk with about joins
Interesting! It is a lot to take in and fully grok the code, so
calling in reinforcements...
Generally, I think there's agreement that for a lot of real use cases,
you have to roll your own join using the lower-level Beam primitives.
So I think it would be great to get some of these other approaches to
joins into Beam, perhaps as an extension of the Java SDK or even in
the core (since schema joins are in the core). In particular:
- "join in fixed window with repeater" sounds similar (but not
identical) to work by Mikhail
- "join in global window with cache" sounds similar (but not
identical) to work and discussions w/ Reza and Tyson
I want to be clear that I am *not* saying there's any duplication. I'm
guessing these all fit into a collection of different ways to
accomplish joins, and if everything comes to fruition we will have the
great opportunity to document how a user should choose between them.
Kenn
On Fri, May 1, 2020 at 7:56 AM Marcin Kuthan wrote:
Hi,
this is my first post here, but I've been reading the group for a while,
so thank you for sharing your knowledge!
I've been using Beam/Scio on Dataflow for about a year, mostly for
stream processing from unbounded sources like PubSub. In my
daily work I found that the built-in windowing is very generic and
provides rich watermark/late-event semantics, but there are a few
very annoying limitations, e.g.:
- both sides of the join must be defined within compatible windows
- for fixed windows, elements close to a window boundary (but in
different windows) won't be joined
- for sliding windows there is a huge overhead if the window duration is
much longer than the slide period (offset)
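The boundary and overhead issues can be illustrated without Beam at all. The sketch below is stdlib-only and assumes integer-second timestamps and a zero window offset; `fixed_window` and `sliding_windows` are hypothetical helpers modelled loosely on how fixed and sliding windows assign elements, not Beam APIs.

```python
def fixed_window(ts, size):
    """Hypothetical helper: the [start, end) fixed window containing ts (zero offset assumed)."""
    start = ts - ts % size
    return (start, start + size)


def sliding_windows(ts, size, period):
    """Hypothetical helper: every [start, start + size) window, with starts aligned
    to `period`, that contains ts. A window contains ts iff start > ts - size."""
    last_start = ts - ts % period
    return [(s, s + size) for s in range(last_start, ts - size, -period)]
```

For example, events at 59 s and 61 s with 60 s fixed windows land in `(0, 60)` and `(60, 120)` respectively, so a per-window join never sees them together. With a 1 h window sliding every 1 min, `len(sliding_windows(3601, 3600, 60))` is 60: each element is copied into duration / period windows, which is where the overhead comes from.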
I would like to ask you to review a few "join/windowing patterns"
built with custom stateful ParDos; they are not as generic as the Beam
built-ins but perhaps better suited to more specific needs. I published
the code with tests; feel free to comment via GitHub issues or on the
mailing list. Event-time processing with watermarks is so
demanding that I'm almost sure I overlooked many important
corner cases.
https://github.com/mkuthan/beam-examples
If you think the examples are somehow useful, I'll be glad to
write a blog post with more details :)
Marcin