On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský <[email protected]> wrote:

> I can put down a design document, but before that I need to clarify some
> things for me. I'm struggling to put all of this into a bigger picture.
> Sorry if the arguments are going in circles, but I didn't notice any proposal of
> how to solve these. If anyone can disprove any of this logic it would be
> very much appreciated, as it might get me out of a dead end:
>
>  a) in the bi-temporal join you can either buffer until watermark, or emit
> false data that has to be retracted
>
This is not the case. A stateful DoFn based join can emit immediately
joined rows that will never need to be retracted. The need for retractions
has to do with CoGBK-based implementation of a join.
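
A minimal sketch of that point, assuming an inner equi-join and using plain Java collections in place of Beam's per-key state (the class and method names are hypothetical, not Beam API). Each arriving element is buffered and joined against what the other side has buffered so far, so every emitted row is final:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy symmetric inner join for illustration; plain Java, not the Beam state API.
class SymmetricJoinSketch {
  // Per-key buffers standing in for the per-key state of a stateful DoFn.
  private final Map<String, List<String>> lhsSeen = new HashMap<>();
  private final Map<String, List<String>> rhsSeen = new HashMap<>();

  // Buffers an lhs value and returns the rows it completes right now.
  List<String> onLhs(String key, String value) {
    lhsSeen.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    List<String> out = new ArrayList<>();
    for (String r : rhsSeen.getOrDefault(key, Collections.<String>emptyList())) {
      out.add(key + ":" + value + "," + r);
    }
    return out;
  }

  // Mirror image of onLhs for the other side.
  List<String> onRhs(String key, String value) {
    rhsSeen.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    List<String> out = new ArrayList<>();
    for (String l : lhsSeen.getOrDefault(key, Collections.<String>emptyList())) {
      out.add(key + ":" + l + "," + value);
    }
    return out;
  }

  public static void main(String[] args) {
    SymmetricJoinSketch join = new SymmetricJoinSketch();
    System.out.println(join.onLhs("k", "a1")); // nothing buffered on the rhs yet
    System.out.println(join.onRhs("k", "b1")); // pairs with the buffered a1
    System.out.println(join.onLhs("k", "a2")); // pairs with the buffered b1
  }
}
```

Each pair is produced exactly once, when the later of its two elements arrives, and later input only adds rows. The sketch leaves out state cleanup and the unmatched rows of an outer join, both of which would need a timer.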

I suggest that you work out the definition of the join you are interested
in, with a good amount of mathematical rigor, and then consider the ways
you can implement it. That is where a design doc will probably clarify
things.

Kenn

>  b) until retractions are 100% functional (and that is sort of a holy grail
> for now), the only solution is using a buffer holding data up to
> watermark *and then sort by event time*
>
>  c) even if retractions were 100% functional, there would have to be a
> special implementation for the batch case, because otherwise this would simply
> blow up downstream processing with insanely many false additions and
> subsequent retractions
>
> Property b) means that if we want this feature now, we must sort by event
> time and there is no way around it. Property c) shows that even in the future,
> we must make (in certain cases) a distinction between batch and streaming
> code paths, which seems weird to me, but it might be an option. But still,
> there is no way to express this join in the batch case, because it would
> require either buffering (up to) the whole input on a local worker (doesn't look
> like a viable option) or providing a way in user code to signal the need for
> ordering of data inside GBK (and we are there again :)). Yes, we might
> shift this need from stateful dofn to GBK like
>
>  input.apply(GroupByKey.sorted())
>
> I cannot find a good reason why this would be better than giving these
> semantics to (stateful) ParDo.
>
> Maybe someone can help me out here?
>
> Jan
> On 11/24/19 5:05 AM, Kenneth Knowles wrote:
>
> I don't actually see how event time sorting simplifies this case much. You
> still need to buffer elements until they can no longer be matched in the
> join, and you still need to query that buffer for elements that might
> match. The general "bi-temporal join" (without sorting) requires one new
> state type and then it has identical API, does not require any novel data
> structures or reasoning, yields better latency (no sort buffer delay), and
> discards less data (no sort buffer cutoff; watermark is better). Perhaps a
> design document about this specific case would clarify.
>
> Kenn
>
> On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský <[email protected]> wrote:
>
>> I didn't want to go too much into detail, but to describe the idea
>> roughly (ignoring the problem of different window fns on both sides to keep
>> it as simple as possible):
>>
>> rhs -----  \
>>
>>                 flatten (on global window) ---- stateful par do (sorted
>> by event time)  ---- output
>>
>> lhs -----  /
>>
>> If we can guarantee that events arrive at the stateful pardo in event time
>> order, then the whole complexity reduces to keeping the current value of the
>> left and right element and flushing them out each time there is an update.
>> That is, the "knob" is actually when the watermark moves, because that is what
>> tells the join operation that there will be no more (non-late) input. This is very,
>> very simplified, but depicts the solution. The "classical" windowed join
>> reduces to this if all data in each window is projected onto window end
>> boundary. Then there will be a cartesian product, because all the elements
>> have the same timestamp. I can put this into a design doc with all the
>> details, I was trying to find out if there is or was any effort around this.
>>
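
A minimal sketch of the sorted variant described above, assuming a single key and that the whole input is already available, so a plain sort stands in for the watermark-driven buffer. The `Event` type and the output format are hypothetical:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy "latest value" join over event-time-sorted input; not Beam API.
class SortedLatestValueJoinSketch {
  // One input element; side is 'L' for lhs or 'R' for rhs.
  static final class Event {
    final char side;
    final long timestamp;
    final String value;

    Event(char side, long timestamp, String value) {
      this.side = side;
      this.timestamp = timestamp;
      this.value = value;
    }
  }

  // Sorts by event time, then emits the current (lhs, rhs) pair after every update.
  static List<String> join(List<Event> input) {
    List<Event> sorted = new ArrayList<>(input);
    sorted.sort(Comparator.comparingLong((Event e) -> e.timestamp));
    String lhs = null; // current left value, null until the first lhs element
    String rhs = null; // current right value, null until the first rhs element
    List<String> out = new ArrayList<>();
    for (Event e : sorted) {
      if (e.side == 'L') {
        lhs = e.value;
      } else {
        rhs = e.value;
      }
      out.add(e.timestamp + ":(" + lhs + "," + rhs + ")");
    }
    return out;
  }

  public static void main(String[] args) {
    List<Event> input = new ArrayList<>();
    input.add(new Event('L', 5, "e1")); // arrives first, but is third in event time
    input.add(new Event('R', 1, "s1"));
    input.add(new Event('R', 3, "s2"));
    input.add(new Event('L', 7, "e2"));
    System.out.println(join(input));
  }
}
```

With the input in event time order, the operator needs only two values of state. All remaining cost sits in the sort, which is where the buffering discussed elsewhere in this thread comes from.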
>> I was in touch with Reza in the PR #9032, I think that it currently
>> suffers from problems with running this on batch.
>>
>> I think I can even (partly) resolve the retraction issue (for joins), as
>> described on the thread [1]. In short, there can be two copies of the
>> stateful dofn, one running at watermark and the other at (watermark -
>> allowed lateness). One would produce ON_TIME (maybe wrong) results, the
>> other would produce LATE but correct ones. Being able to compare them, the
>> outcome would be that it would be possible to retract the wrong results.
>>
>> Yes, this is also about providing more evidence of why I think event-time
>> sorting should be (somehow) part of the model. :-)
>>
>> Jan
>>
>> [1]
>> https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E
>> On 11/23/19 5:54 AM, Kenneth Knowles wrote:
>>
>> +Mikhail Gryzykhin <[email protected]> +Rui Wang <[email protected]> +Reza
>> Rokni <[email protected]> who have all done some investigations here.
>>
>>
>> On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>>
>>>
>>> On 11/22/19 7:54 PM, Reuven Lax wrote:
>>>
>>>
>>>
>>> On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský <[email protected]> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> I didn't investigate that particular one, but looking into it now, it
>>>> looks like it (same as the "classic" join library) is built around CoGBK. Is
>>>> that correct? If yes, then it essentially means that it:
>>>>
>>>>  - works only for cases where both sides have the same windowfn (that is a
>>>> limitation of Flatten that precedes CoGBK)
>>>>
>>> Correct. Did you want to join different windows? If so what are the
>>> semantics? If the lhs has FixedWindows and the rhs has SessionWindows, what
>>> do you want the join semantics to be? The only thing I could imagine would
>>> be for the user to provide some function telling the join how to map the
>>> windows together, but that could be pretty complicated.
>>>
>>> I don't want to go too far into details, but generally both lhs and rhs
>>> can be put onto a time line, and then a full join can be defined as each pair of
>>> (lhs, first preceding rhs) and (rhs, first preceding lhs). Then the end of
>>> window is semantically just clearing the joined value (setting it to null,
>>> thus at the end of window there will be a pair (lhs, null) or (null, rhs) in
>>> case of full outer join). This way any combination of windows is possible,
>>> because all a window does is "scope" the validity of the respective values
>>> (lhs, rhs).
>>>
>>
>> I think it is very valid to hope to do a join in the sense of a
>> relational join where it is row-to-row. In this case, Beam's concept of
>> windowing may or may not make sense. It is just a tool for the job. It is
>> just a grouping key that provides a time when state can be deleted. So I
>> would say your use case is more global window to global window join. That
>> is what I think of as a true stream-to-stream join anyhow. You probably
>> don't want to wait forever for output. So you'll need to use some knob
>> other than Beam windows or triggers.
>>
>> Reza has prototyped a join like you describe here:
>> https://github.com/apache/beam/pull/9032
>>
>> If your join condition explicitly includes the event time distance
>> between elements, then it could "just work". If that isn't really part of
>> your join condition, then you will have to see this restriction as a "knob"
>> that you tweak on your results.
>>
>>>>  - when using global window, there has to be a trigger and (afaik) there
>>>> is no trigger that would guarantee firing after each data element (for
>>>> early panes) (because triggers are there to express cost-latency tradeoff,
>>>> not semantics)
>>>>
>>>
>>> Can you explain the use case where this matters? If you do trigger
>>> elementCountAtLeast(1) on the join, then the consumer will simply see a
>>> continuous stream of outputs. I'm not sure I understand why the consumer
>>> cares that some of those outputs were in a pane that really held 3 outputs
>>> instead of 1.
>>>
>>> What I'm trying to solve is basically this:
>>>
>>>  - lhs is event stream
>>>
>>>  - rhs is a stream of "state updates"
>>>
>>> purpose of the join is "take each event, pair it with currently valid
>>> state and produce output and possibly modified state". I cannot process two
>>> events at a time, because first event can modify the state and the
>>> subsequent event should see this. It is not a "simple" stateful pardo
>>> either, because the state can be modified externally (not going into too
>>> much detail here, but e.g. by writing into kafka topic).
>>>
>> Reuven's explanation is missing some detail. If the CoGBK is in
>> discarding mode, then it will miss join results. If the CoGBK is in
>> accumulating mode, it will duplicate join results. This is a known problem
>> and the general solution is retractions.
>>
>> Basically, CoGBK-based joins just don't work with triggers until we have
>> retractions.
>>
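
A minimal sketch of that problem, assuming one key and modelling each trigger firing as a cartesian product over the pane contents. The class, method names and data are hypothetical; this simulates the semantics and is not Beam code:

```java
import java.util.ArrayList;
import java.util.List;

// Toy replay of a triggered CoGBK join under both accumulation modes.
class TriggeredCoGbkSketch {
  // Cartesian product of the two sides of one pane.
  static List<String> joinPane(List<String> lhs, List<String> rhs) {
    List<String> out = new ArrayList<>();
    for (String l : lhs) {
      for (String r : rhs) {
        out.add("(" + l + "," + r + ")");
      }
    }
    return out;
  }

  // Replays the firings; entry i of each list holds the elements new in firing i.
  static List<String> run(
      List<List<String>> lhsPerFiring, List<List<String>> rhsPerFiring, boolean accumulating) {
    List<String> lhsPane = new ArrayList<>();
    List<String> rhsPane = new ArrayList<>();
    List<String> out = new ArrayList<>();
    for (int i = 0; i < lhsPerFiring.size(); i++) {
      if (!accumulating) {
        // Discarding mode: a pane holds only what arrived since the last firing.
        lhsPane.clear();
        rhsPane.clear();
      }
      lhsPane.addAll(lhsPerFiring.get(i));
      rhsPane.addAll(rhsPerFiring.get(i));
      out.addAll(joinPane(lhsPane, rhsPane));
    }
    return out;
  }

  public static void main(String[] args) {
    List<List<String>> lhs = new ArrayList<>();
    List<List<String>> rhs = new ArrayList<>();
    lhs.add(List.of("a1")); // firing 0: a1 and b1 arrive together
    rhs.add(List.of("b1"));
    lhs.add(List.of());     // firing 1: only b2 arrives
    rhs.add(List.of("b2"));
    System.out.println("discarding:   " + run(lhs, rhs, false));
    System.out.println("accumulating: " + run(lhs, rhs, true));
  }
}
```

In this replay the discarding run never produces (a1,b2), because a1 is gone from the pane by the time b2 arrives. The accumulating run produces (a1,b1) twice, and without retractions a consumer cannot tell that the second copy replaces the first.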
>>
>>
>>>> Moreover, I'd like to define the join semantics so that when there are
>>>> available elements from both sides, the fired pane should be ON_TIME, not
>>>> EARLY. That essentially means that the fully general case would not be
>>>> built around (Co)GBK, but stateful ParDo. There are specific options where
>>>> this fully general case "degrades" into forms that can be efficiently
>>>> expressed using (Co)GBK, that is true.
>>>>
>>>
>>> BTW building this around stateful DoFn might be a better fit. The main
>>> reason I didn't is because we would need a good distributed MapState
>>> (something discussed fairly recently on the list), and that is not yet
>>> built. Once we had that, I might be inclined to rewrite this join on
>>> stateful DoFn.
>>>
>>> Yes, the sorted state helps for streaming case. But I'd be careful about
>>> that for batch case, where this might lead to high pressure on the state
>>> (and InMemoryStateInternals might OOME for instance).
>>>
>>>
>>> However can you explain what you are expecting from the pane? An EARLY
>>> pane simply means that we are producing output before the end of the
>>> window. If you are in the global window triggering every element, then
>>> every output is EARLY. It might seem weird if you are interpreting EARLY as
>>> "outputting data that isn't ready," however that's not what EARLY is
>>> defined to be. Any change to the pane semantics would be a major breaking
>>> change to very fundamental semantics.
>>>
>>> I wonder if you are really objecting to the name EARLY and ON_TIME?
>>> Maybe we would've been better off tagging it BEFORE_WINDOW_END instead of
>>> EARLY, to make it clear what is meant?
>>>
>>> Essentially I don't object to anything here. I'm missing a solution to the
>>> "event vs. state" join described above. I was thinking about how to make
>>> these types of problems more user friendly and it essentially leads to
>>> creating a somewhat more generic semantics of join, where end-of-window is
>>> converted into "value-delete events" and then just joining by the
>>> "previous" or "valid" value (yes, this relates to validity windows
>>> mentioned on Beam Summit Europe). It actually turns out that with some work
>>> we could define quite "naturally" a join on two streams with global window
>>> and no trigger. It would even function with lowest latency possible (but
>>> yes, with the highest expenses, it is actually the introduction of (same!)
>>> windows that enable certain optimizations). It then correctly defines
>>> semantics for different windows, although the result would be (probably
>>> unexpectedly) windowed using global window. But that doesn't seem to be any
>>> breaking change, because it is currently not possible (any such pipeline
>>> will not be validated).
>>>
>>> Maybe for reference, the unwindowed join would be what is described here
>>> [1]
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
>>>
>>>
>>>
>>>> Jan
>>>> On 11/22/19 6:47 PM, Reuven Lax wrote:
>>>>
>>>> Have you seen the Join library that is part of schemas? I'm curious
>>>> whether this fits your needs, or there's something lacking there.
>>>>
>>>> On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> based on roadmap [1], we would like to define and implement a full set
>>>>> of (unified) stream-stream joins. That would include:
>>>>>
>>>>>   - joins (left, right, full outer) on global window with "immediate
>>>>> trigger"
>>>>>
>>>>>   - joins with different windowing functions on left and right side
>>>>>
>>>>> The approach would be to define these operations in a natural way, so
>>>>> that the definition is aligned with how current joins work (same
>>>>> windows, cartesian product of values with same keys, output timestamp
>>>>> projected to the end of window, etc.). Because this should be a generic
>>>>> approach, this effort should probably be part of the join library, which can
>>>>> then be reused by other components, too (e.g. SQL).
>>>>>
>>>>> The question is - is (or was) there any effort that we can build upon?
>>>>> Or should this be designed from scratch?
>>>>>
>>>>> Jan
>>>>>
>>>>> [1] https://beam.apache.org/roadmap/euphoria/
>>>>>
>>>>>
