+Mikhail Gryzykhin +Rui Wang +Reza Rokni, who have all done some
investigations here.
On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský wrote:
On 11/22/19 7:54 PM, Reuven Lax wrote:
On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský wrote:
Hi Reuven,
I didn't investigate that particular one, but
looking into it now, it looks like it (same
as the "classic" join library) is built around
CoGBK. Is that correct? If yes, then it
essentially means that it:
- works only for cases where both sides have
the same windowfn (that is a limitation of the
Flatten that precedes the CoGBK)
Correct. Did you want to join different windows?
If so, what are the semantics? If the lhs has
FixedWindows and the rhs has SessionWindows, what
do you want the join semantics to be? The only
thing I could imagine would be for the user to
provide some function telling the join how to map
the windows together, but that could be pretty
complicated.
I don't want to go too far into details, but
generally both lhs and rhs can be put onto a
timeline, and then a full join can be defined as each pair
of (lhs, first preceding rhs) and (rhs, first
preceding lhs). Then the end of a window is
semantically just clearing the joined value
(setting it to null, so at the end of the window
there will be a pair (lhs, null) or (null, rhs) in
the case of a full outer join). This way any combination
of windows is possible, because all a window does is
"scope" the validity of the respective values
(lhs, rhs).
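To make the timeline semantics above concrete, here is a minimal single-key, in-memory Python sketch. It is not Beam API: the `Element` type and `timeline_full_outer_join` function are hypothetical. It assumes each element already carries the end of the window it was assigned to, turns every window end into a "value-delete" event, and emits a full-outer pair whenever either side's currently valid value changes.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass(frozen=True)
class Element:
    """Hypothetical input: a value on one side, valid until its window ends."""
    side: str        # "L" (lhs) or "R" (rhs)
    ts: int          # event timestamp
    value: str
    window_end: int  # end of the window this element was assigned to


def timeline_full_outer_join(
    elements: List[Element],
) -> List[Tuple[int, Optional[str], Optional[str]]]:
    # Each element becomes a "set" event at its timestamp and a "delete"
    # event at its window end. Deletes sort first at equal timestamps,
    # because windows are half-open [start, end).
    events = []
    for e in elements:
        events.append((e.ts, 1, e))
        events.append((e.window_end, 0, e))
    events.sort(key=lambda ev: (ev[0], ev[1]))

    current = {"L": None, "R": None}  # currently valid element per side
    out = []
    for ts, is_set, e in events:
        if is_set:
            current[e.side] = e
        elif current[e.side] is e:
            current[e.side] = None  # window ended: clear the joined value
        else:
            continue  # already superseded by a newer value; nothing changes
        lv = current["L"].value if current["L"] is not None else None
        rv = current["R"].value if current["R"] is not None else None
        if lv is not None or rv is not None:
            out.append((ts, lv, rv))
    return out


print(timeline_full_outer_join([
    Element("L", 1, "a", 10),  # lhs in window [0, 10)
    Element("R", 3, "x", 5),   # rhs in a shorter window [0, 5)
    Element("L", 4, "b", 10),
]))
# [(1, 'a', None), (3, 'a', 'x'), (4, 'b', 'x'), (5, 'b', None)]
```

Because each side's validity comes from its own window, lhs and rhs do not need the same windowfn in this sketch; the rhs value simply stops being joinable once its window ends.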
I think it is perfectly valid to want a join in the
sense of a relational join where it is row-to-row. In
this case, Beam's concept of windowing may or may not
make sense. It is just a tool for the job: a
grouping key that provides a time when state can be
deleted. So I would say your use case is more of a
global-window-to-global-window join. That is what I think of
as a true stream-to-stream join anyhow. You probably
don't want to wait forever for output. So you'll need
to use some knob other than Beam windows or triggers.
Reza has prototyped a join like you describe here:
https://github.com/apache/beam/pull/9032
If your join condition explicitly includes the event
time distance between elements, then it could "just
work". If that isn't really part of your join
condition, then you will have to see this restriction
as a "knob" that you tweak on your results.
- when using the global window, there has to be a
trigger, and (afaik) there is no trigger that
would guarantee firing after each data element
(for early panes) (because triggers are there
to express cost-latency tradeoff, not semantics)
Can you explain the use case where this matters?
If you do trigger elementCountAtLeast(1) on the
join, then the consumer will simply see a
continuous stream of outputs. I'm not sure I
understand why the consumer cares that some of
those outputs were in a pane that really held 3
outputs instead of 1.
What I'm trying to solve is basically this:
- lhs is an event stream
- rhs is a stream of "state updates"
The purpose of the join is to "take each event, pair it
with the currently valid state and produce output and
possibly modified state". I cannot process two
events at a time, because the first event can modify
the state and the subsequent event should see this.
It is not a "simple" stateful ParDo either, because
the state can be modified externally (not going
into too much detail here, but e.g. by writing into a
Kafka topic).
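A minimal Python sketch of the "event vs. state" join described above, assuming both streams for a single key can be merged in timestamp order and that an external update at the same timestamp as an event is applied first. `join_events_with_state` and its `process` callback are hypothetical names; the point is that events are folded strictly one at a time, so each event sees the state left by the previous one, while the rhs stream can still overwrite that state.

```python
from typing import Any, Callable, List, Optional, Tuple


def join_events_with_state(
    events: List[Tuple[int, Any]],
    updates: List[Tuple[int, Any]],
    process: Callable[[Any, Optional[Any]], Tuple[Any, Any]],
) -> List[Tuple[int, Any]]:
    """Hypothetical helper: `process(event, state)` returns `(output, new_state)`."""
    # Merge both streams by timestamp; at equal timestamps the external
    # update (kind 0) is applied before the event (an assumed tie-break).
    merged = [(ts, 0, u) for ts, u in updates] + [(ts, 1, e) for ts, e in events]
    merged.sort(key=lambda m: (m[0], m[1]))

    state: Optional[Any] = None
    out = []
    for ts, kind, payload in merged:
        if kind == 0:
            state = payload  # external update overwrites the current state
        else:
            # Events run one at a time, so this one sees the previous event's changes.
            result, state = process(payload, state)
            out.append((ts, result))
    return out


# Usage: events are deposits; state is a balance that can also be reset externally.
print(join_events_with_state(
    events=[(1, 5), (2, 7), (6, 1)],
    updates=[(0, 100), (5, 10)],
    process=lambda amount, balance: ((balance or 0) + amount, (balance or 0) + amount),
))
# [(1, 105), (2, 112), (6, 11)]
```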
Reuven's explanation is missing some detail. If the
CoGBK is in discarding mode, then it will miss join
results. If the CoGBK is in accumulating mode, it will
duplicate join results. This is a known problem and the
general solution is retractions.
Basically, CoGBK-based joins just don't work with
triggers until we have retractions.
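A toy model of the problem, assuming a single key and window and that each trigger firing hands the join only what the CoGBK pane contains. `join_per_pane` is a hypothetical helper, not Beam API. With an lhs element arriving before the first firing, an rhs element before the second, and another lhs element before the third, the correct inner join is `[("l1", "r1"), ("l2", "r1")]`.

```python
from itertools import product
from typing import List, Tuple

# (new lhs values, new rhs values) that arrived since the previous firing.
Pane = Tuple[List[str], List[str]]


def join_per_pane(panes: List[Pane], accumulating: bool) -> List[Tuple[str, str]]:
    out: List[Tuple[str, str]] = []
    all_l: List[str] = []
    all_r: List[str] = []
    for new_l, new_r in panes:
        all_l += new_l
        all_r += new_r
        # Accumulating: the pane holds everything so far, so old pairs repeat.
        # Discarding: the pane holds only new elements, so cross-pane pairs are lost.
        left, right = (all_l, all_r) if accumulating else (new_l, new_r)
        out.extend(product(left, right))
    return out


panes = [(["l1"], []), ([], ["r1"]), (["l2"], [])]
print(join_per_pane(panes, accumulating=False))  # []
print(join_per_pane(panes, accumulating=True))   # [('l1', 'r1'), ('l1', 'r1'), ('l2', 'r1')]
```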
Moreover, I'd like to define the join
semantics so that when there are available
elements from both sides, the fired pane
should be ON_TIME, not EARLY. That essentially
means that the fully general case would not be
built around (Co)GBK, but stateful ParDo.
There are specific options where this fully
general case "degrades" into forms that can be
efficiently expressed using (Co)GBK, that is true.
BTW, building this around a stateful DoFn might be a
better fit. The main reason I didn't is that we
would need a good distributed MapState (something
discussed fairly recently on the list), and that
is not yet built. Once we had that, I might be
inclined to rewrite this join on a stateful DoFn.
Yes, sorted state helps in the streaming case. But
I'd be careful about it in the batch case, where
it might put high pressure on the state (and
InMemoryStateInternals might throw an OOME, for instance).
However, can you explain what you are expecting
from the pane? An EARLY pane simply means that we
are producing output before the end of the window.
If you are in the global window triggering every
element, then every output is EARLY. It might seem
weird if you are interpreting EARLY as "outputting
data that isn't ready," however that's not what
EARLY is defined to be. Any change to the pane
semantics would be a major breaking change to very
fundamental semantics.
I wonder if you are really objecting to the names
EARLY and ON_TIME? Maybe we would've been better
off tagging it BEFORE_WINDOW_END instead of EARLY,
to make it clear what is meant?
Essentially I don't object to anything here. I'm
missing a solution to the "event vs. state" join
described above. I was thinking about how to make
these types of problems more user friendly, and it
essentially leads to creating somewhat more
generic join semantics, where end-of-window is
converted into "value-delete events" and then we just
join by the "previous" or "valid" value (yes,
this relates to validity windows mentioned at Beam
Summit Europe). It actually turns out that with
some work we could quite "naturally" define a join
on two streams with the global window and no trigger.
It would even function with the lowest latency possible
(but yes, with the highest expense; it is actually
the introduction of (the same!) windows that enables
certain optimizations). It then correctly defines
semantics for different windows, although the
result would (probably unexpectedly) be windowed
using the global window. But that doesn't seem to be
any breaking change, because it is currently not
possible (any such pipeline will not be validated).
For reference, the unwindowed join would be
what is described here [1].
[1]
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
Jan
On 11/22/19 6:47 PM, Reuven Lax wrote:
Have you seen the Join library that is part
of schemas? I'm curious whether this fits
your needs, or there's something lacking there.
On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský wrote:
Hi,
based on the roadmap [1], we would like to
define and implement a full set
of (unified) stream-stream joins. That
would include:
- joins (left, right, full outer) on the
global window with an "immediate
trigger"
- joins with different windowing
functions on the left and right sides
The approach would be to define these
operations in a natural way, so
that the definition is aligned with how
current joins work (same
windows, Cartesian product of values with
the same key, output timestamp
projected to the end of the window, etc.).
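For comparison, the current windowed semantics (same windows, Cartesian product per key, timestamp at the end of the window) can be sketched in plain Python. This assumes fixed windows of `size`, integer millisecond timestamps and an inner join; `windowed_inner_join` is a hypothetical name, and the output timestamp uses `end - 1`, mirroring Beam's `IntervalWindow.maxTimestamp()` (window end minus one millisecond).

```python
from collections import defaultdict
from itertools import product
from typing import Dict, List, Tuple

Record = Tuple[str, int, str]  # (key, timestamp_millis, value)


def windowed_inner_join(
    lhs: List[Record], rhs: List[Record], size: int
) -> List[Tuple[str, int, str, str]]:
    def window_of(ts: int) -> Tuple[int, int]:
        start = ts - ts % size  # fixed windows [start, start + size)
        return (start, start + size)

    # Group both sides by (key, window), as Flatten + CoGBK would.
    groups: Dict[Tuple[str, Tuple[int, int]], Tuple[List[str], List[str]]] = defaultdict(
        lambda: ([], [])
    )
    for key, ts, v in lhs:
        groups[(key, window_of(ts))][0].append(v)
    for key, ts, v in rhs:
        groups[(key, window_of(ts))][1].append(v)

    out = []
    for (key, (_, end)), (lvals, rvals) in sorted(groups.items()):
        # Cartesian product of same-key values, stamped at the window's last instant.
        for lv, rv in product(lvals, rvals):
            out.append((key, end - 1, lv, rv))
    return out


print(windowed_inner_join(
    lhs=[("a", 1, "l1"), ("a", 12, "l2"), ("b", 3, "l3")],
    rhs=[("a", 5, "r1"), ("a", 7, "r2"), ("a", 15, "r3")],
    size=10,
))
# [('a', 9, 'l1', 'r1'), ('a', 9, 'l1', 'r2'), ('a', 19, 'l2', 'r3')]
```

Note that this only works because both sides share one windowfn; the `("b", 3, "l3")` record finds no partner and, being an inner join, produces nothing.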
Because this should be a generic
approach, this effort should probably be
part of the join library, which can
then be reused by other components, too
(e.g. SQL).
The question is - is (or was) there any
effort that we can build upon?
Or should this be designed from scratch?
Jan
[1] https://beam.apache.org/roadmap/euphoria/