On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský <[email protected]> wrote:
> I can put down a design document, but before that I need to clarify some things for me. I'm struggling to put all of this into a bigger picture. Sorry if the arguments are circulating, but I didn't notice any proposal of how to solve these. If anyone can disprove any of this logic it would be very much appreciated, as I might be able to get out of a dead end:
>
> a) in the bi-temporal join you can either buffer until the watermark, or emit false data that has to be retracted

This is not the case. A stateful DoFn based join can immediately emit joined rows that will never need to be retracted. The need for retractions has to do with the CoGBK-based implementation of a join.

I suggest that you work out the definition of the join you are interested in, with a good amount of mathematical rigor, and then consider the ways you can implement it. That is where a design doc will probably clarify things.

Kenn

> b) until retractions are 100% functional (and that is sort of a holy grail for now), the only solution is using a buffer holding data up to the watermark *and then sort by event time*
>
> c) even if retractions were 100% functional, there would have to be a special implementation for the batch case, because otherwise this would simply blow up downstream processing with insanely many false additions and subsequent retractions
>
> Property b) means that if we want this feature now, we must sort by event time and there is no way around it. Property c) shows that even in the future, we must make (in certain cases) a distinction between batch and streaming code paths, which seems weird to me, but it might be an option. But still, there is no way to express this join in the batch case, because it would require either buffering (up to) the whole input on a local worker (which doesn't look like a viable option) or providing a way in user code to signal the need for ordering of data inside GBK (and we are there again :)). Yes, we might shift this need from the stateful dofn to GBK, like
>
>   input.apply(GroupByKey.sorted())
>
> I cannot find a good reason why this would be better than giving these semantics to a (stateful) ParDo.
>
> Maybe someone can help me out here?
>
> Jan
>
> On 11/24/19 5:05 AM, Kenneth Knowles wrote:
>
> I don't actually see how event time sorting simplifies this case much. You still need to buffer elements until they can no longer be matched in the join, and you still need to query that buffer for elements that might match. The general "bi-temporal join" (without sorting) requires one new state type and then it has an identical API, does not require any novel data structures or reasoning, yields better latency (no sort buffer delay), and discards less data (no sort buffer cutoff; the watermark is better). Perhaps a design document about this specific case would clarify.
>
> Kenn
>
> On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský <[email protected]> wrote:
>
>> I didn't want to go too much into detail, but to describe the idea roughly (ignoring the problem of different window fns on both sides to keep it as simple as possible):
>>
>> rhs ----- \
>>
>>            flatten (on global window) ---- stateful par do (sorted by event time) ---- output
>>
>> lhs ----- /
>>
>> If we can guarantee event-time order of arrival of events into the stateful pardo, then the whole complexity reduces to keeping the current value of the left and right element and just flushing them out each time there is an update. That is, the "knob" is actually when the watermark moves, because that is what tells the join operation that there will be no more (non-late) input. This is very, very simplified, but it depicts the solution.
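>>
>> For illustration only, a minimal sketch of such a stateful pardo could look roughly like this (assuming event-time ordered input and that both inputs are first mapped to a common KV<key, KV<side, payload>> shape before the flatten; the names and types are just placeholders, not a working implementation):
>>
>> import org.apache.beam.sdk.state.StateSpec;
>> import org.apache.beam.sdk.state.StateSpecs;
>> import org.apache.beam.sdk.state.ValueState;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.values.KV;
>>
>> // Sketch: assumes elements arrive in event-time order and that both inputs were
>> // mapped to KV<key, KV<side, payload>> (side is "lhs" or "rhs") before the flatten.
>> class LatestValueJoinFn
>>     extends DoFn<KV<String, KV<String, String>>, KV<String, KV<String, String>>> {
>>
>>   @StateId("lastLhs")
>>   private final StateSpec<ValueState<String>> lastLhsSpec = StateSpecs.value();
>>
>>   @StateId("lastRhs")
>>   private final StateSpec<ValueState<String>> lastRhsSpec = StateSpecs.value();
>>
>>   @ProcessElement
>>   public void process(
>>       @Element KV<String, KV<String, String>> e,
>>       @StateId("lastLhs") ValueState<String> lastLhs,
>>       @StateId("lastRhs") ValueState<String> lastRhs,
>>       OutputReceiver<KV<String, KV<String, String>>> out) {
>>
>>     if ("lhs".equals(e.getValue().getKey())) {
>>       lastLhs.write(e.getValue().getValue());
>>     } else {
>>       lastRhs.write(e.getValue().getValue());
>>     }
>>     String l = lastLhs.read();
>>     String r = lastRhs.read();
>>     // Each update flushes the current (lhs, rhs) pair; a full outer join would also
>>     // emit pairs with a missing side (which needs nullable coders), omitted here.
>>     if (l != null && r != null) {
>>       out.output(KV.of(e.getKey(), KV.of(l, r)));
>>     }
>>   }
>> }
>>
>> The point is only that, with ordered input, the per-key state shrinks to the last seen value on each side.
>>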
>> The "classical" windowed join reduces to this if all data in each window is projected onto the window end boundary. Then there will be a Cartesian product, because all the elements have the same timestamp. I can put this into a design doc with all the details; I was trying to find out if there is or was any effort around this.
>>
>> I was in touch with Reza on PR #9032; I think it currently suffers from problems with running this on batch.
>>
>> I think I can even (partly) resolve the retraction issue (for joins), as described in the thread [1]. In short, there can be two copies of the stateful dofn, one running at the watermark and the other at (watermark - allowed lateness). One would produce ON_TIME (maybe wrong) results, the other would produce LATE but correct ones. Being able to compare them, it would then be possible to retract the wrong results.
>>
>> Yes, this is also about providing more evidence of why I think event-time sorting should be (somehow) part of the model. :-)
>>
>> Jan
>>
>> [1] https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E
>>
>> On 11/23/19 5:54 AM, Kenneth Knowles wrote:
>>
>> +Mikhail Gryzykhin <[email protected]> +Rui Wang <[email protected]> +Reza Rokni <[email protected]>, who have all done some investigations here.
>>
>> On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský <[email protected]> wrote:
>>>
>>> On 11/22/19 7:54 PM, Reuven Lax wrote:
>>>
>>> On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský <[email protected]> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> I didn't investigate that particular one, but looking into it now, it looks like it (same as the "classic" join library) builds around CoGBK. Is that correct? If yes, then it essentially means that it:
>>>>
>>>> - works only for cases where both sides have the same windowfn (that is a limitation of the Flatten that precedes the CoGBK)
>>>
>>> Correct. Did you want to join different windows? If so, what are the semantics? If the lhs has FixedWindows and the rhs has SessionWindows, what do you want the join semantics to be? The only thing I could imagine would be for the user to provide some function telling the join how to map the windows together, but that could be pretty complicated.
>>>
>>> I don't want to go too far into details, but generally both lhs and rhs can be put onto a time line, and then a full join can be defined as each pair of (lhs, first preceding rhs) and (rhs, first preceding lhs). Then the end of a window is semantically just clearing the joined value (setting it to null, thus at the end of the window there will be the pair (lhs, null) or (null, rhs) in the case of a full outer join). This way any combination of windows is possible, because all a window does is "scope" the validity of the respective values (lhs, rhs).
>>
>> I think it is very valid to hope to do a join in the sense of a relational join where it is row-to-row. In this case, Beam's concept of windowing may or may not make sense. It is just a tool for the job. It is just a grouping key that provides a time when state can be deleted. So I would say your use case is more a global window to global window join. That is what I think of as a true stream-to-stream join anyhow. You probably don't want to wait forever for output. So you'll need to use some knob other than Beam windows or triggers.
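>>
>> As a very rough sketch of that shape (this is only an illustration, not any existing implementation: the "knob" is assumed here to be a maximum event-time distance, one side is buffered in BagState, and the names and types are placeholders):
>>
>> import org.apache.beam.sdk.coders.StringUtf8Coder;
>> import org.apache.beam.sdk.state.BagState;
>> import org.apache.beam.sdk.state.StateSpec;
>> import org.apache.beam.sdk.state.StateSpecs;
>> import org.apache.beam.sdk.state.TimeDomain;
>> import org.apache.beam.sdk.state.Timer;
>> import org.apache.beam.sdk.state.TimerSpec;
>> import org.apache.beam.sdk.state.TimerSpecs;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.values.KV;
>> import org.apache.beam.sdk.values.TimestampedValue;
>> import org.joda.time.Duration;
>> import org.joda.time.Instant;
>>
>> // Sketch: rhs elements are buffered per key; each lhs element joins against the
>> // buffered rhs elements within MAX_DISTANCE and is emitted immediately, never
>> // retracted. The symmetric lhs buffer is omitted for brevity.
>> class BufferedStreamJoinFn
>>     extends DoFn<KV<String, KV<String, String>>, KV<String, KV<String, String>>> {
>>
>>   private static final Duration MAX_DISTANCE = Duration.standardHours(1);
>>
>>   @StateId("rhsBuffer")
>>   private final StateSpec<BagState<TimestampedValue<String>>> rhsBufferSpec =
>>       StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(StringUtf8Coder.of()));
>>
>>   @TimerId("gc")
>>   private final TimerSpec gcSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
>>
>>   @ProcessElement
>>   public void process(
>>       @Element KV<String, KV<String, String>> e,
>>       @Timestamp Instant ts,
>>       @StateId("rhsBuffer") BagState<TimestampedValue<String>> rhsBuffer,
>>       @TimerId("gc") Timer gc,
>>       OutputReceiver<KV<String, KV<String, String>>> out) {
>>
>>     if ("rhs".equals(e.getValue().getKey())) {
>>       rhsBuffer.add(TimestampedValue.of(e.getValue().getValue(), ts));
>>       // The "knob": buffered state can be dropped once the watermark passes
>>       // ts + MAX_DISTANCE, because nothing can match it any more.
>>       gc.set(ts.plus(MAX_DISTANCE));
>>     } else {
>>       for (TimestampedValue<String> rhs : rhsBuffer.read()) {
>>         long distanceMillis = Math.abs(ts.getMillis() - rhs.getTimestamp().getMillis());
>>         if (distanceMillis <= MAX_DISTANCE.getMillis()) {
>>           out.output(KV.of(e.getKey(), KV.of(e.getValue().getValue(), rhs.getValue())));
>>         }
>>       }
>>     }
>>   }
>>
>>   @OnTimer("gc")
>>   public void onGc(@StateId("rhsBuffer") BagState<TimestampedValue<String>> rhsBuffer) {
>>     // Simplified: a real implementation would evict only the expired elements.
>>     rhsBuffer.clear();
>>   }
>> }
>>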
>> Reza has prototyped a join like you describe here: https://github.com/apache/beam/pull/9032
>>
>> If your join condition explicitly includes the event time distance between elements, then it could "just work". If that isn't really part of your join condition, then you will have to see this restriction as a "knob" that you tweak on your results.
>>
>>>> - when using the global window, there has to be a trigger and (afaik) there is no trigger that would guarantee firing after each data element (for early panes) (because triggers are there to express a cost-latency tradeoff, not semantics)
>>>
>>> Can you explain the use case where this matters? If you do trigger elementCountAtLeast(1) on the join, then the consumer will simply see a continuous stream of outputs. I'm not sure I understand why the consumer cares that some of those outputs were in a pane that really held 3 outputs instead of 1.
>>>
>>> What I'm trying to solve is basically this:
>>>
>>> - lhs is an event stream
>>>
>>> - rhs is a stream of "state updates"
>>>
>>> The purpose of the join is "take each event, pair it with the currently valid state and produce output and possibly a modified state". I cannot process two events at a time, because the first event can modify the state and the subsequent event should see this. It is not a "simple" stateful pardo either, because the state can be modified externally (not going into too much detail here, but e.g. by writing into a Kafka topic).
>>
>> Reuven's explanation is missing some detail. If the CoGBK is in discarding mode, then it will miss join results. If the CoGBK is in accumulating mode, it will duplicate join results. This is a known problem and the general solution is retractions.
>>
>> Basically, CoGBK-based joins just don't work with triggers until we have retractions.
>>
>>>> Moreover, I'd like to define the join semantics so that when there are available elements from both sides, the fired pane should be ON_TIME, not EARLY. That essentially means that the fully general case would not be built around (Co)GBK, but around a stateful ParDo. There are specific options where this fully general case "degrades" into forms that can be efficiently expressed using (Co)GBK, that is true.
>>>
>>> BTW building this around a stateful DoFn might be a better fit. The main reason I didn't is that we would need a good distributed MapState (something discussed fairly recently on the list), and that is not yet built. Once we have that, I might be inclined to rewrite this join on a stateful DoFn.
>>>
>>> Yes, the sorted state helps for the streaming case. But I'd be careful about that for the batch case, where this might lead to high pressure on the state (and InMemoryStateInternals might OOME, for instance).
>>>
>>> However, can you explain what you are expecting from the pane? An EARLY pane simply means that we are producing output before the end of the window. If you are in the global window triggering on every element, then every output is EARLY. It might seem weird if you are interpreting EARLY as "outputting data that isn't ready," however that's not what EARLY is defined to be. Any change to the pane semantics would be a major breaking change to very fundamental semantics.
>>>
>>> I wonder if you are really objecting to the names EARLY and ON_TIME? Maybe we would've been better off tagging it BEFORE_WINDOW_END instead of EARLY, to make it clear what is meant?
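>>>
>>> For concreteness, the elementCountAtLeast(1) setup would look roughly like this (just a sketch; the key and value types are placeholders):
>>>
>>> import org.apache.beam.sdk.transforms.join.CoGbkResult;
>>> import org.apache.beam.sdk.transforms.join.CoGroupByKey;
>>> import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
>>> import org.apache.beam.sdk.transforms.windowing.AfterPane;
>>> import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
>>> import org.apache.beam.sdk.transforms.windowing.Repeatedly;
>>> import org.apache.beam.sdk.transforms.windowing.Trigger;
>>> import org.apache.beam.sdk.transforms.windowing.Window;
>>> import org.apache.beam.sdk.values.KV;
>>> import org.apache.beam.sdk.values.PCollection;
>>> import org.apache.beam.sdk.values.TupleTag;
>>> import org.joda.time.Duration;
>>>
>>> // Sketch: CoGBK-based join in the global window, firing on every element.
>>> static PCollection<KV<String, CoGbkResult>> joinWithImmediateTrigger(
>>>     PCollection<KV<String, Long>> lhs,     // events
>>>     PCollection<KV<String, String>> rhs,   // state updates
>>>     TupleTag<Long> lhsTag,
>>>     TupleTag<String> rhsTag) {
>>>
>>>   Trigger everyElement = Repeatedly.forever(AfterPane.elementCountAtLeast(1));
>>>
>>>   PCollection<KV<String, Long>> lhsTriggered =
>>>       lhs.apply("WindowLhs",
>>>           Window.<KV<String, Long>>into(new GlobalWindows())
>>>               .triggering(everyElement)
>>>               .discardingFiredPanes()
>>>               .withAllowedLateness(Duration.ZERO));
>>>
>>>   PCollection<KV<String, String>> rhsTriggered =
>>>       rhs.apply("WindowRhs",
>>>           Window.<KV<String, String>>into(new GlobalWindows())
>>>               .triggering(everyElement)
>>>               .discardingFiredPanes()
>>>               .withAllowedLateness(Duration.ZERO));
>>>
>>>   // Caveat: with discarding panes each firing only sees the new elements, so
>>>   // pairs can be missed; with accumulating panes pairs get re-emitted. Fully
>>>   // correct behavior here needs retractions.
>>>   return KeyedPCollectionTuple.of(lhsTag, lhsTriggered)
>>>       .and(rhsTag, rhsTriggered)
>>>       .apply(CoGroupByKey.create());
>>> }
>>>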
>>> Essentially I don't object to anything here. What I'm missing is a solution to the "event vs. state" join described above. I was thinking about how to make these types of problems more user friendly, and it essentially leads to creating somewhat more generic join semantics, where end-of-window is converted into "value-delete" events and then just joining by the "previous" or "valid" value (yes, this relates to the validity windows mentioned at Beam Summit Europe). It actually turns out that with some work we could quite "naturally" define a join on two streams with the global window and no trigger. It would even function with the lowest latency possible (but yes, at the highest expense; it is actually the introduction of (the same!) windows that enables certain optimizations). It then correctly defines semantics for different windows, although the result would be (probably unexpectedly) windowed using the global window. But that doesn't seem to be a breaking change, because it is currently not possible (any such pipeline will not be validated).
>>>
>>> Maybe for reference, the unwindowed join would be what is described here [1]
>>>
>>> [1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin
>>>
>>>> Jan
>>>>
>>>> On 11/22/19 6:47 PM, Reuven Lax wrote:
>>>>
>>>> Have you seen the Join library that is part of schemas? I'm curious whether this fits your needs, or whether there's something lacking there.
>>>>
>>>> On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> based on the roadmap [1], we would like to define and implement a full set of (unified) stream-stream joins. That would include:
>>>>>
>>>>> - joins (left, right, full outer) on the global window with an "immediate trigger"
>>>>>
>>>>> - joins with different windowing functions on the left and right side
>>>>>
>>>>> The approach would be to define these operations in a natural way, so that the definition is aligned with how current joins work (same windows, Cartesian product of values with the same keys, output timestamp projected to the end of the window, etc.). Because this should be a generic approach, this effort should probably be part of the join library, which can then be reused by other components, too (e.g. SQL).
>>>>>
>>>>> The question is - is (or was) there any effort that we can build upon? Or should this be designed from scratch?
>>>>>
>>>>> Jan
>>>>>
>>>>> [1] https://beam.apache.org/roadmap/euphoria/
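>>>>>
>>>>> For context, the existing CoGBK-based join library is used roughly like this today (just a sketch; the key and value types are placeholders), and it is this same-windowing, per-window behavior that the proposal above would generalize:
>>>>>
>>>>> import org.apache.beam.sdk.extensions.joinlibrary.Join;
>>>>> import org.apache.beam.sdk.values.KV;
>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>
>>>>> static void classicJoins(
>>>>>     PCollection<KV<String, Long>> lhs, PCollection<KV<String, String>> rhs) {
>>>>>   // Inner join: one output per matching (lhs, rhs) value pair, per key and window.
>>>>>   PCollection<KV<String, KV<Long, String>>> inner = Join.innerJoin(lhs, rhs);
>>>>>
>>>>>   // Outer joins take substitute values for the missing side.
>>>>>   PCollection<KV<String, KV<Long, String>>> left = Join.leftOuterJoin(lhs, rhs, "");
>>>>>   PCollection<KV<String, KV<Long, String>>> full = Join.fullOuterJoin(lhs, rhs, -1L, "");
>>>>> }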
