I didn't want to go too much into detail, but to describe the idea roughly (ignoring the problem of different window fns on both sides to keep it as simple as possible):

rhs -----\
          flatten (on global window) ---- stateful ParDo (sorted by event time) ---- output
lhs -----/

If we can guarantee that events arrive at the stateful ParDo in event-time order, then the whole complexity reduces to keeping the current value of the left and right elements and flushing them out each time there is an update. That is, the "knob" is actually when the watermark moves, because that is what tells the join operation that there will be no more (non-late) input. This is very, very simplified, but it depicts the solution. The "classical" windowed join reduces to this if all data in each window is projected onto the window end boundary. Then there will be a cartesian product, because all the elements have the same timestamp. I can put this into a design doc with all the details; I was trying to find out if there is or was any effort around this.
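A minimal sketch of the idea for a single key, in plain Python rather than Beam code. The names `Event` and `sorted_join` are made up for illustration, and the call to `sorted` only stands in for the runner delivering elements in event-time order. Equal timestamps (where the cartesian product would come from) are not handled.

```python
from dataclasses import dataclass
from typing import Any, Iterable, Iterator, Optional, Tuple


@dataclass(frozen=True)
class Event:
    timestamp: int
    side: str  # "lhs" or "rhs"
    value: Any


def sorted_join(
    events: Iterable[Event],
) -> Iterator[Tuple[int, Optional[Any], Optional[Any]]]:
    """Full outer join for one key over an event-time-ordered stream."""
    left = right = None
    # Stand-in for the (assumed) event-time ordering guarantee.
    for e in sorted(events, key=lambda e: e.timestamp):
        if e.side == "lhs":
            left = e.value
        else:
            right = e.value
        # Flush the current pair on every update.
        yield (e.timestamp, left, right)


events = [Event(3, "rhs", "r1"), Event(1, "lhs", "a"), Event(5, "lhs", "b")]
for row in sorted_join(events):
    print(row)
```

The only state kept is the latest value of each side, which is why the ordering guarantee carries all the complexity here.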

I was in touch with Reza on PR #9032; I think it currently has problems when run in batch mode.

I think I can even (partly) resolve the retraction issue (for joins), as described on the thread [1]. In short, there can be two copies of the stateful DoFn, one running at the watermark and the other at (watermark - allowed lateness). One would produce ON_TIME (maybe wrong) results, the other would produce LATE but correct ones. By comparing them, it would be possible to retract the wrong results.
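A rough sketch of the comparison step only, in plain Python. It assumes both copies' outputs are available as collections of hashable rows; the function name `reconcile` is hypothetical, and how the two outputs would actually be brought together in a pipeline is left open.

```python
from collections import Counter
from typing import Hashable, Iterable, List, Tuple


def reconcile(
    on_time: Iterable[Hashable], late_correct: Iterable[Hashable]
) -> Tuple[List[Hashable], List[Hashable]]:
    """Return (retractions, additions) that turn the on-time output
    into the late-but-correct output, treating both as multisets."""
    emitted = Counter(on_time)
    correct = Counter(late_correct)
    retractions = list((emitted - correct).elements())
    additions = list((correct - emitted).elements())
    return retractions, additions


on_time = [("k", "a", "r1"), ("k", "b", "r1")]
late_correct = [("k", "a", "r1"), ("k", "b", "r2")]
print(reconcile(on_time, late_correct))
```

Rows present in both outputs need no action; only the difference is retracted or added.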

Yes, this is also about providing more evidence of why I think event-time sorting should be (somehow) part of the model. :-)

Jan

[1] https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E

On 11/23/19 5:54 AM, Kenneth Knowles wrote:
+Mikhail Gryzykhin, +Rui Wang and +Reza Rokni, who have all done some investigations here.


On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský wrote:


    On 11/22/19 7:54 PM, Reuven Lax wrote:


    On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský wrote:

        Hi Reuven,

        I didn't investigate that particular one, but looking into
        that now, it looks like it (same as the "classic" join
        library) is built around CoGBK. Is that correct? If yes, then
        it essentially means that it:

         - works only for cases where both sides have the same
        windowfn (that is a limitation of the Flatten that precedes CoGBK)

    Correct. Did you want to join different windows? If so what are
    the semantics? If the lhs has FixedWindows and the rhs has
    SessionWindows, what do you want the join semantics to be? The
    only thing I could imagine would be for the user to provide some
    function telling the join how to map the windows together, but
    that could be pretty complicated.
    I don't want to go too far into details, but generally both lhs
    and rhs can be put onto a timeline and then a full join can be
    defined as each pair of (lhs, first preceding rhs) and (rhs, first
    preceding lhs). Then the end of a window semantically just
    clears the joined value (setting it to null, so at the end of the
    window there will be a pair (lhs, null) or (null, rhs) in the case
    of a full outer join). This way any combination of windows is
    possible, because all a window does is "scope" the validity of the
    respective values (lhs, rhs).
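One way to picture the "window scopes validity" idea is the plain-Python sketch below (not Beam code). It assumes each element carries its own window end, that windows on one side do not overlap, and that the window end is an exclusive integer timestamp. The names `to_updates` and `validity_join` are made up for illustration.

```python
from typing import Any, List, Optional, Tuple

# (timestamp, window_end, side, value); the two sides may use different windows
Element = Tuple[int, int, str, Any]
SET, DELETE = 1, 0


def to_updates(elements: List[Element]) -> List[Tuple[int, int, str, Any]]:
    """Expand each element into a SET at its timestamp and a DELETE at its window end."""
    updates = []
    for ts, window_end, side, value in elements:
        updates.append((ts, SET, side, value))
        updates.append((window_end, DELETE, side, None))
    # At equal timestamps a DELETE is applied before a SET.
    return sorted(updates, key=lambda u: (u[0], u[1]))


def validity_join(elements: List[Element]) -> List[Tuple[int, Optional[Any], Optional[Any]]]:
    """Full outer join for one key: pair each value with the currently valid other side."""
    left = right = None
    out = []
    for ts, kind, side, value in to_updates(elements):
        before = (left, right)
        if side == "lhs":
            left = value
        else:
            right = value
        changed = (left, right) != before
        # Emit on every new value, and on a delete that leaves one side valid.
        if kind == SET or (changed and (left, right) != (None, None)):
            out.append((ts, left, right))
    return out


# lhs in a window ending at 10, rhs in a shorter window ending at 6
print(validity_join([(1, 10, "lhs", "a"), (4, 6, "rhs", "r")]))
```

In this example the rhs value expires at 6, so the output ends with the pair ("a", None) even though the two sides were windowed differently.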


I think it is very valid to hope to do a join in the sense of a relational join where it is row-to-row. In this case, Beam's concept of windowing may or may not make sense. It is just a tool for the job. It is just a grouping key that provides a time when state can be deleted. So I would say your use case is more global window to global window join. That is what I think of as a true stream-to-stream join anyhow. You probably don't want to wait forever for output. So you'll need to use some knob other than Beam windows or triggers.

Reza has prototyped a join like you describe here: https://github.com/apache/beam/pull/9032

If your join condition explicitly includes the event time distance between elements, then it could "just work". If that isn't really part of your join condition, then you will have to see this restriction as a "knob" that you tweak on your results.

         - when using the global window, there has to be a trigger and
        (afaik) there is no trigger that would guarantee firing after
        each data element (for early panes) (because triggers are
        there to express a cost-latency tradeoff, not semantics)


    Can you explain the use case where this matters? If you do
    trigger elementCountAtLeast(1) on the join, then the consumer
    will simply see a continuous stream of outputs. I'm not sure I
    understand why the consumer cares that some of those outputs were
    in a pane that really held 3 outputs instead of 1.

    What I'm trying to solve is basically this:

     - lhs is event stream

     - rhs is a stream of "state updates"

    The purpose of the join is "take each event, pair it with the
    currently valid state and produce output and possibly modified
    state". I cannot process two events at a time, because the first
    event can modify the state and the subsequent event should see
    this. It is not a "simple" stateful ParDo either, because the
    state can be modified externally (not going into too much detail
    here, but e.g. by writing into a Kafka topic).
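To make the "event vs. state" use case concrete, here is a small plain-Python sketch (not Beam code). It assumes both streams are already merged and can be processed one element at a time in event-time order. The names `process` and `withdraw`, and the account-balance scenario, are invented for illustration.

```python
from typing import Any, Callable, Iterable, List, Tuple

# (timestamp, kind, payload), where kind is "event" or "state"
Update = Tuple[int, str, Any]


def process(
    updates: Iterable[Update],
    initial_state: Any,
    handler: Callable[[Any, Any], Tuple[Any, Any]],
) -> Tuple[List[Tuple[int, Any]], Any]:
    """Pair each event with the currently valid state, one element at a time."""
    state = initial_state
    outputs = []
    for ts, kind, payload in sorted(updates, key=lambda u: u[0]):
        if kind == "state":
            state = payload  # external state update
        else:
            # The handler may modify the state; the next event must see the change.
            output, state = handler(payload, state)
            outputs.append((ts, output))
    return outputs, state


def withdraw(amount: int, balance: int) -> Tuple[str, int]:
    """Hypothetical handler: accept a withdrawal only if the balance covers it."""
    if amount <= balance:
        return "ok", balance - amount
    return "rejected", balance


updates = [
    (0, "state", 100),
    (1, "event", 70),
    (2, "event", 50),   # sees balance 30 left by the previous event
    (3, "state", 200),  # external update
    (4, "event", 50),
]
print(process(updates, 0, withdraw))
```

Processing the events at timestamps 1 and 2 together against the same balance of 100 would wrongly accept both, which is the reason for the strict one-at-a-time ordering.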

Reuven's explanation is missing some detail. If the CoGBK is in discarding mode, then it will miss join results. If the CoGBK is in accumulating mode, it will duplicate join results. This is a known problem and the general solution is retractions.

Basically, CoGBK-based joins just don't work with triggers until we have retractions.
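The effect can be reproduced with a small simulation in plain Python (not Beam code). It models each trigger firing as a pair of lists holding the new lhs and rhs values for one key; the function name `cogbk_join_panes` is made up for illustration.

```python
from itertools import product
from typing import Any, List, Tuple

Pane = Tuple[List[Any], List[Any]]  # (new lhs values, new rhs values)


def cogbk_join_panes(panes: List[Pane], accumulating: bool) -> List[Tuple[Any, Any]]:
    """Join each fired pane by taking the cartesian product of its two sides."""
    results: List[Tuple[Any, Any]] = []
    seen_lhs: List[Any] = []
    seen_rhs: List[Any] = []
    for lhs, rhs in panes:
        if accumulating:
            # Each pane contains everything seen so far.
            seen_lhs = seen_lhs + lhs
            seen_rhs = seen_rhs + rhs
            results.extend(product(seen_lhs, seen_rhs))
        else:
            # Each pane contains only the elements new since the last firing.
            results.extend(product(lhs, rhs))
    return results


panes = [(["a"], ["x"]), (["b"], [])]
print(cogbk_join_panes(panes, accumulating=False))  # misses ("b", "x")
print(cogbk_join_panes(panes, accumulating=True))   # repeats ("a", "x")
```

The correct join result here is ("a", "x") and ("b", "x"), each exactly once. Neither mode produces that without some way to retract or deduplicate earlier output.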

        Moreover, I'd like to define the join semantics so that when
        there are available elements from both sides, the fired pane
        should be ON_TIME, not EARLY. That essentially means that the
        fully general case would not be built around (Co)GBK, but
        stateful ParDo. There are specific options where this fully
        general case "degrades" into forms that can be efficiently
        expressed using (Co)GBK, that is true.


    BTW building this around stateful DoFn might be a better fit. The
    main reason I didn't is because we would need a good distributed
    MapState (something discussed fairly recently on the list), and
    that is not yet built. Once we had that, I might be inclined to
    rewrite this join on stateful DoFn.
    Yes, the sorted state helps for the streaming case. But I'd be
    careful about that for the batch case, where this might lead to
    high pressure on the state (and InMemoryStateInternals might OOME,
    for instance).

    However can you explain what you are expecting from the pane? An
    EARLY pane simply means that we are producing output before the
    end of the window. If you are in the global window triggering
    every element, then every output is EARLY. It might seem weird if
    you are interpreting EARLY as "outputting data that isn't ready,"
    however that's not what EARLY is defined to be. Any change to the
    pane semantics would be a major breaking change to very
    fundamental semantics.

    I wonder if you are really objecting to the name EARLY and
    ON_TIME? Maybe we would've been better off tagging it
    BEFORE_WINDOW_END instead of EARLY, to make it clear what is meant?

    Essentially I don't object to anything here. I'm missing a
    solution to the "event vs. state" join described above. I was
    thinking about how to make these types of problems more user
    friendly, and it essentially leads to creating somewhat more
    generic join semantics, where end-of-window is converted into
    "value-delete events" and then just joining by the "previous" or
    "valid" value (yes, this relates to validity windows mentioned at
    Beam Summit Europe). It actually turns out that with some work we
    could define quite "naturally" a join on two streams with global
    window and no trigger. It would even function with the lowest
    latency possible (but yes, with the highest expenses; it is
    actually the introduction of (same!) windows that enables certain
    optimizations). It then correctly defines semantics for different
    windows, although the result would be (probably unexpectedly)
    windowed using the global window. But that doesn't seem to be a
    breaking change, because it is currently not possible (any such
    pipeline will not be validated).

    Maybe for reference, the unwindowed join would be what is
    described here [1]

    [1] https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin

        Jan

        On 11/22/19 6:47 PM, Reuven Lax wrote:
        Have you seen the Join library that is part of schemas? I'm
        curious whether this fits your needs, or there's something
        lacking there.

        On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský wrote:

            Hi,

            based on roadmap [1], we would like to define and
            implement a full set
            of (unified) stream-stream joins. That would include:

              - joins (left, right, full outer) on global window
            with "immediate
            trigger"

              - joins with different windowing functions on left and
            right side

            The approach would be to define these operations in a
            natural way, so
            that the definition is aligned with how current joins
            work (same
            windows, cartesian product of values with same keys,
            output timestamp
            projected to the end of window, etc.). Because this
            should be a generic
            approach, this effort should probably be part of the join
            library, that can
            then be reused by other components, too (e.g. SQL).

            The question is - is (or was) there any effort that we
            can build upon?
            Or should this be designed from scratch?

            Jan

            [1] https://beam.apache.org/roadmap/euphoria/
