+Mikhail Gryzykhin +Rui Wang +Reza Rokni, who have all done some
investigations here.
On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský wrote:
On 11/22/19 7:54 PM, Reuven Lax wrote:
On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský wrote:
Hi Reuven,
I didn't investigate that particular one, but
looking into it now, it looks like it is (same as
the "classic" join library) built around CoGBK. Is
that correct? If yes, then it essentially means
that it:
- works only for cases where both sides have the
same windowfn (that is a limitation of the Flatten that
precedes CoGBK)
Correct. Did you want to join different windows? If so,
what are the semantics? If the lhs has FixedWindows and
the rhs has SessionWindows, what do you want the join
semantics to be? The only thing I could imagine would
be for the user to provide some function telling the
join how to map the windows together, but that could be
pretty complicated.
I don't want to go too far into details, but generally
both lhs and rhs can be put onto a timeline, and then a full
join can be defined as each pair of (lhs, first
preceding rhs) and (rhs, first preceding lhs). Then the
end of a window semantically just clears the joined
value (sets it to null, so at the end of the window
there will be a pair (lhs, null) or (null, rhs) in the case of a
full outer join). This way any combination of windows is
possible, because all a window does is "scope" the
validity of the respective values (lhs, rhs).
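A minimal, single-key sketch of the timeline join described above, in plain Java rather than Beam. Assumptions (not from the thread): all inputs for one key are in memory, each value carries its event timestamp and the exclusive end of its window, and the end of a window is modelled as a value-delete event that clears that side. All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical, single-key, in-memory sketch of the "timeline" full outer join.
final class TimelineJoin {

  enum Side { LEFT, RIGHT }

  // An input value with its event timestamp and the (exclusive) end of its window.
  record Element(Side side, long timestamp, long windowEnd, String value) {}

  // One join output; either side may be null (full outer join).
  record Pair(long timestamp, String left, String right) {}

  // The currently valid value on one side and the time its window ends.
  private record Valid(String value, long validUntil) {}

  // Either the arrival of an element or its end-of-window "value-delete" event.
  private record Event(long time, boolean isDelete, Element element) {}

  static List<Pair> join(List<Element> inputs) {
    List<Event> events = new ArrayList<>();
    for (Element e : inputs) {
      events.add(new Event(e.timestamp(), false, e));
      events.add(new Event(e.windowEnd(), true, e));
    }
    // Time order; at equal times deletes come first, since the window end is exclusive.
    events.sort(Comparator.comparingLong(Event::time)
        .thenComparingInt(ev -> ev.isDelete() ? 0 : 1));

    Valid left = null;
    Valid right = null;
    List<Pair> out = new ArrayList<>();
    for (Event ev : events) {
      Element e = ev.element();
      boolean isLeft = e.side() == Side.LEFT;
      if (ev.isDelete()) {
        // Clear a side only if its current value's window ends now
        // (a newer value with a later window end must survive).
        if (isLeft && left != null && left.validUntil() <= ev.time()) {
          left = null;
          if (right != null) out.add(new Pair(ev.time(), null, right.value()));
        } else if (!isLeft && right != null && right.validUntil() <= ev.time()) {
          right = null;
          if (left != null) out.add(new Pair(ev.time(), left.value(), null));
        }
      } else if (isLeft) {
        // (lhs, first preceding rhs), or (lhs, null) if no rhs is valid.
        left = new Valid(e.value(), e.windowEnd());
        out.add(new Pair(ev.time(), e.value(), right == null ? null : right.value()));
      } else {
        // (first preceding lhs, rhs), or (null, rhs) if no lhs is valid.
        right = new Valid(e.value(), e.windowEnd());
        out.add(new Pair(ev.time(), left == null ? null : left.value(), e.value()));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // lhs in a fixed window ending at 10; rhs in a session window ending at 8.
    List<Pair> out = join(List.of(
        new Element(Side.LEFT, 1, 10, "a"),
        new Element(Side.RIGHT, 3, 8, "x"),
        new Element(Side.LEFT, 5, 10, "b")));
    out.forEach(System.out::println);
  }
}
```

For the inputs in main, this emits (a, null) at 1, (a, x) at 3, (b, x) at 5, and (b, null) at 8 when the window of x ends, even though the two sides use different windows.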
I think it is very valid to hope to do a join in the sense
of a relational join where it is row-to-row. In this case,
Beam's concept of windowing may or may not make sense. It is
just a tool for the job. It is just a grouping key that
provides a time when state can be deleted. So I would say
your use case is more global window to global window join.
That is what I think of as a true stream-to-stream join
anyhow. You probably don't want to wait forever for output.
So you'll need to use some knob other than Beam windows or
triggers.
Reza has prototyped a join like you describe here:
https://github.com/apache/beam/pull/9032
If your join condition explicitly includes the event time
distance between elements, then it could "just work". If
that isn't really part of your join condition, then you will
have to see this restriction as a "knob" that you tweak on
your results.
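To make the "event time distance" knob concrete, here is a hedged sketch of an interval join in plain Java: two elements match only if their timestamps are at most maxDistance apart, which also bounds how long an element has to be kept in state, much like a window does. The names are hypothetical, and the eviction rule assumes no late data (every future element has a timestamp at or after the watermark).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: an inner join whose condition bounds the event-time distance.
final class IntervalJoin {

  record Element(long timestamp, String value) {}

  record Pair(String left, String right) {}

  // Pairs every lhs/rhs element whose timestamps differ by at most maxDistance.
  static List<Pair> join(List<Element> left, List<Element> right, long maxDistance) {
    List<Pair> out = new ArrayList<>();
    for (Element l : left) {
      for (Element r : right) {
        if (Math.abs(l.timestamp() - r.timestamp()) <= maxDistance) {
          out.add(new Pair(l.value(), r.value()));
        }
      }
    }
    return out;
  }

  // Assuming no late data, future elements have timestamps >= watermark, so once the
  // watermark passes timestamp + maxDistance, nothing can match this element any more.
  static boolean canDropFromState(long timestamp, long watermark, long maxDistance) {
    return watermark > timestamp + maxDistance;
  }

  public static void main(String[] args) {
    List<Pair> out = join(
        List.of(new Element(1, "a"), new Element(20, "b")),
        List.of(new Element(3, "x"), new Element(30, "y")),
        5);
    System.out.println(out + ", drop a at watermark 7: " + canDropFromState(1, 7, 5));
  }
}
```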
- when using the global window, there has to be a
trigger, and (AFAIK) there is no trigger that would
guarantee firing after each data element (for early
panes), because triggers are there to express a
cost-latency tradeoff, not semantics
Can you explain the use case where this matters? If you
do trigger elementCountAtLeast(1) on the join, then the
consumer will simply see a continuous stream of
outputs. I'm not sure I understand why the consumer
cares that some of those outputs were in a pane that
really held 3 outputs instead of 1.
What I'm trying to solve is basically this:
- lhs is an event stream
- rhs is a stream of "state updates"
The purpose of the join is "take each event, pair it with the
currently valid state and produce output and possibly
modified state". I cannot process two events at a time,
because the first event can modify the state and the
subsequent event should see this. It is not a "simple"
stateful ParDo either, because the state can be modified
externally (not going into too much detail here, but
e.g. by writing into a Kafka topic).
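The event vs. state join described above can be sketched for a single key, assuming both streams are merged and processed strictly in timestamp order (ties keep their input order). This is a hypothetical in-memory model, not a Beam transform; Event, StateUpdate and run are illustrative names.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical single-key sketch of an "event vs. state" join (KStream-KTable style).
final class EventStateJoin {

  interface Input { long timestamp(); }

  // An element of the lhs event stream.
  record Event(long timestamp, String payload) implements Input {}

  // An element of the rhs stream of external state updates.
  record StateUpdate(long timestamp, long newState) implements Input {}

  // What processing one event produces: an output and the (possibly modified) state.
  record Result(String output, long newState) {}

  static List<String> run(List<Input> inputs, long initialState,
      BiFunction<Event, Long, Result> process) {
    List<Input> ordered = new ArrayList<>(inputs);
    // Stable sort: inputs with equal timestamps keep their given order.
    ordered.sort(Comparator.comparingLong(Input::timestamp));
    long state = initialState;
    List<String> outputs = new ArrayList<>();
    for (Input in : ordered) {
      if (in instanceof StateUpdate update) {
        state = update.newState();               // external write replaces the state
      } else if (in instanceof Event event) {
        Result r = process.apply(event, state);  // pair event with currently valid state
        outputs.add(r.output());
        state = r.newState();                    // the next event sees this modification
      }
    }
    return outputs;
  }

  public static void main(String[] args) {
    // State is a balance, events are withdrawals, the update is an external reset.
    List<String> out = run(
        List.of(new Event(1, "7"), new Event(2, "5"), new StateUpdate(3, 20), new Event(4, "5")),
        10,
        (event, balance) -> {
          long amount = Long.parseLong(event.payload());
          return amount <= balance
              ? new Result("ok:" + amount, balance - amount)
              : new Result("rejected:" + amount, balance);
        });
    System.out.println(out);
  }
}
```

Starting from a balance of 10, the withdrawals 7 and 5 yield ok:7 and then rejected:5, because the second event sees the state left by the first; after the external update to 20, the last withdrawal yields ok:5. Processing the first two events against the same state would wrongly approve both.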
Reuven's explanation is missing some detail. If the CoGBK is
in discarding mode, then it will miss join results. If the
CoGBK is in accumulating mode, it will duplicate join
results. This is a known problem and the general solution is
retractions.
Basically, CoGBK-based joins just don't work with triggers
until we have retractions.
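A toy model of this problem: each pane is reduced to the lhs and rhs values it contains for one key, and every firing emits their cartesian product. This is a simplification with hypothetical names, not Beam's actual pane machinery.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a CoGBK-based join fired repeatedly by a trigger.
final class TriggeredCoGbkJoin {

  // The lhs and rhs values that arrived for one key since the previous firing.
  record Pane(List<String> left, List<String> right) {}

  record Pair(String left, String right) {}

  // Emits the cartesian product at every firing. Discarding mode forgets earlier
  // panes; accumulating mode re-includes everything seen so far.
  static List<Pair> fire(List<Pane> panes, boolean accumulating) {
    List<String> left = new ArrayList<>();
    List<String> right = new ArrayList<>();
    List<Pair> emitted = new ArrayList<>();
    for (Pane pane : panes) {
      if (!accumulating) {
        left.clear();
        right.clear();
      }
      left.addAll(pane.left());
      right.addAll(pane.right());
      for (String l : left) {
        for (String r : right) {
          emitted.add(new Pair(l, r));
        }
      }
    }
    return emitted;
  }

  public static void main(String[] args) {
    List<Pane> panes = List.of(
        new Pane(List.of("a"), List.of("x")),
        new Pane(List.of("b"), List.of()));
    System.out.println("discarding:   " + fire(panes, false));
    System.out.println("accumulating: " + fire(panes, true));
  }
}
```

The correct result is (a, x) and (b, x). Discarding mode emits only (a, x) and misses (b, x); accumulating mode emits (a, x) twice plus (b, x). Retractions would let the second firing correct the first.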
Moreover, I'd like to define the join semantics so
that when there are elements available from both
sides, the fired pane should be ON_TIME, not EARLY.
That essentially means that the fully general case
would not be built around (Co)GBK, but around a stateful
ParDo. It is true that there are specific cases where this fully
general case "degrades" into forms that can be
efficiently expressed using (Co)GBK.
BTW building this around stateful DoFn might be a
better fit. The main reason I didn't is because we
would need a good distributed MapState (something
discussed fairly recently on the list), and that is not
yet built. Once we had that, I might be inclined to
rewrite this join on stateful DoFn.
Yes, the sorted state helps for the streaming case. But I'd
be careful about that for the batch case, where this might
lead to high pressure on the state (and
InMemoryStateInternals might OOME, for instance).
However, can you explain what you are expecting from the
pane? An EARLY pane simply means that we are producing
output before the end of the window. If you are in the
global window triggering every element, then every
output is EARLY. It might seem weird if you are
interpreting EARLY as "outputting data that isn't
ready," however that's not what EARLY is defined to be.
Any change to the pane semantics would be a major
breaking change to very fundamental semantics.
I wonder if you are really objecting to the name EARLY
and ON_TIME? Maybe we would've been better off tagging
it BEFORE_WINDOW_END instead of EARLY, to make it clear
what is meant?
Essentially, I don't object to anything here. I'm missing a
solution to the "event vs. state" join described above.
I was thinking about how to make these types of problems
more user friendly, and it essentially leads to creating
a somewhat more generic join semantics, where
end-of-window is converted into "value-delete events"
and then just joining by the "previous" or "valid" value
(yes, this relates to the validity windows mentioned at Beam
Summit Europe). It actually turns out that with some
work we could quite "naturally" define a join on two
streams with the global window and no trigger. It would even
function with the lowest possible latency (but yes, at the
highest cost; it is actually the introduction of
(the same!) windows that enables certain optimizations). It
then correctly defines semantics for different windows,
although the result would be (probably unexpectedly)
windowed using the global window. But that doesn't seem to
be a breaking change, because it is currently not
possible (any such pipeline will not be validated).
Maybe for reference, the unwindowed join would be what
is described here [1]
[1]
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
Jan
On 11/22/19 6:47 PM, Reuven Lax wrote:
Have you seen the Join library that is part of
schemas? I'm curious whether this fits your needs,
or there's something lacking there.
On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský wrote:
Hi,
based on the roadmap [1], we would like to define and implement a full set
of (unified) stream-stream joins. That would include:
- joins (left, right, full outer) on the global window with an "immediate
trigger"
- joins with different windowing functions on the left and right sides
The approach would be to define these operations in a natural way, so
that the definition is aligned with how current joins work (same
windows, cartesian product of values with the same keys, output timestamp
projected to the end of the window, etc.). Because this should be a generic
approach, this effort should probably be part of the join library, which can
then be reused by other components, too (e.g. SQL).
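For comparison, the current same-window semantics (cartesian product per key and window, output timestamp projected to the end of the window) can be sketched as an inner join over fixed windows. Assumptions: fixed windows of a given size, all input in memory, and the output timestamp set to the window's last millisecond, start + size - 1, mirroring Beam's IntervalWindow.maxTimestamp(). All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of today's join semantics: same fixed windows on both sides.
final class WindowedInnerJoin {

  record Element(String key, long timestamp, String value) {}

  record Joined(String key, long timestamp, String left, String right) {}

  private record GroupKey(String key, long windowStart) {}

  // Start of the fixed window of the given size that contains ts.
  static long windowStart(long ts, long size) {
    return ts - Math.floorMod(ts, size);
  }

  // Groups one side by (key, window); LinkedHashMap keeps output order deterministic.
  private static Map<GroupKey, List<String>> group(List<Element> side, long size) {
    Map<GroupKey, List<String>> groups = new LinkedHashMap<>();
    for (Element e : side) {
      groups.computeIfAbsent(new GroupKey(e.key(), windowStart(e.timestamp(), size)),
          k -> new ArrayList<>()).add(e.value());
    }
    return groups;
  }

  static List<Joined> join(List<Element> left, List<Element> right, long size) {
    Map<GroupKey, List<String>> rhs = group(right, size);
    List<Joined> out = new ArrayList<>();
    for (Map.Entry<GroupKey, List<String>> entry : group(left, size).entrySet()) {
      GroupKey gk = entry.getKey();
      long outputTs = gk.windowStart() + size - 1;  // max timestamp of [start, start + size)
      for (String l : entry.getValue()) {
        for (String r : rhs.getOrDefault(gk, List.of())) {  // cartesian product
          out.add(new Joined(gk.key(), outputTs, l, r));
        }
      }
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(join(
        List.of(new Element("k", 1, "a"), new Element("k", 12, "b"), new Element("j", 3, "c")),
        List.of(new Element("k", 5, "x"), new Element("k", 7, "y"), new Element("k", 15, "z")),
        10));
  }
}
```

Because both sides must share the same windowfn for the (key, window) groups to line up, this shape cannot express the different-windows or global-window joins listed above.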
The question is - is (or was) there any effort
that we can build upon?
Or should this be designed from scratch?
Jan
[1] https://beam.apache.org/roadmap/euphoria/