On 11/25/19 7:47 PM, Kenneth Knowles wrote:


On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:

    I can put down a design document, but before that I need to
    clarify some things for me. I'm struggling to put all of this into
    a bigger picture. Sorry if the arguments are circulating, but I
    didn't notice any proposal of how to solve these. If anyone can
    disprove any of this logic it would be very much appreciated as I
    might be able to get from a dead end:

     a) in the bi-temporal join you can either buffer until watermark,
    or emit false data that has to be retracted

This is not the case. A stateful DoFn based join can emit immediately joined rows that will never need to be retracted. The need for retractions has to do with CoGBK-based implementation of a join.
I fail to see how this could work. If I emit joined rows immediately without waiting for watermark to pass, I can join two elements, that don't belong to each other, because later can arrive element with lower time distance, that should have been joint in the place of the previously emitted one. This is wrong result that has to be retracted. Or what I'm missing?

I suggest that you work out the definition of the join you are interested in, with a good amount of mathematical rigor, and then consider the ways you can implement it. That is where a design doc will probably clarify things.

Kenn

     b) until retractions are 100% functional (and that is sort of
    holy grail for now), then the only solution is using a buffer
    holding data up to watermark *and then sort by event time*

     c) even if retractions were 100% functional, there would have to
    be special implementation for batch case, because otherwise this
    would simply blow up downstream processing with insanely many
    false additions and subsequent retractions

    Property b) means that if we want this feature now, we must sort
    by event time and there is no way around. Property c) shows that
    even in the future, we must make (in certain cases) distinction
    between batch and streaming code paths, which seems weird to me,
    but it might be an option. But still, there is no way to express
    this join in batch case, because it would require either buffering
    (up to) whole input on local worker (doesn't look like viable
    option) or provide a way in user code to signal the need for
    ordering of data inside GBK (and we are there again :)). Yes, we
    might shift this need from stateful dofn to GBK like

     input.apply(GroupByKey.sorted())

    I cannot find a good reasoning why this would be better than
    giving this semantics to (stateful) ParDo.

    Maybe someone can help me out here?

    Jan

    On 11/24/19 5:05 AM, Kenneth Knowles wrote:
    I don't actually see how event time sorting simplifies this case
    much. You still need to buffer elements until they can no longer
    be matched in the join, and you still need to query that buffer
    for elements that might match. The general "bi-temporal join"
    (without sorting) requires one new state type and then it has
    identical API, does not require any novel data structures or
    reasoning, yields better latency (no sort buffer delay), and
    discards less data (no sort buffer cutoff; watermark is better).
    Perhaps a design document about this specific case would clarify.

    Kenn

    On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:

        I didn't want to go too much into detail, but to describe the
        idea roughly (ignoring the problem of different window fns on
        both sides to keep it as simple as possible):

        rhs -----  \

                        flatten (on global window) ---- stateful par
        do (sorted by event time)  ---- output

        lhs -----  /

        If we can guarantee event time order arrival of events into
        the stateful pardo, then the whole complexity reduces to keep
        current value of left and right element and just flush them
        out each time there is an update. That is the "knob" is
        actually when watermark moves, because it is what tells the
        join operation that there will be no more (not late) input.
        This is very, very simplified, but depicts the solution. The
        "classical" windowed join reduces to this if all data in each
        window is projected onto window end boundary. Then there will
        be a cartesian product, because all the elements have the
        same timestamp. I can put this into a design doc with all the
        details, I was trying to find out if there is or was any
        effort around this.

        I was in touch with Reza in the PR #9032, I think that it
        currently suffers from problems with running this on batch.

        I think I can even (partly) resolve the retraction issue (for
        joins), as described on the thread [1]. Shortly, there can be
        two copies of the stateful dofn, one running at watermark and
        the other at (watermark - allowed lateness). One would
        produce ON_TIME (maybe wrong) results, the other would
        produce LATE but correct ones. Being able to compare them,
        the outcome would be that it would be possible to retract the
        wrong results.

        Yes, this is also about providing more evidence of why I
        think event-time sorting should be (somehow) part of the
        model. :-)

        Jan

        [1]
        
https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E

        On 11/23/19 5:54 AM, Kenneth Knowles wrote:
        +Mikhail Gryzykhin <mailto:[email protected]> +Rui Wang
        <mailto:[email protected]> +Reza Rokni
        <mailto:[email protected]> who have all done some
        investigations here.


        On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský
        <[email protected] <mailto:[email protected]>> wrote:


            On 11/22/19 7:54 PM, Reuven Lax wrote:


            On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:

                Hi Reuven,

                I didn't investigate that particular one, but
                looking into that now, it looks that is (same as
                the "classic" join library) builds around CoGBK. Is
                that correct? If yes, then it essentially means
                that it:

                 - works only for cases where both sides have the
                same windowfn (that is limitation of Flatten that
                precedes CoGBK)

            Correct. Did you want to join different windows? If so
            what are the semantics? If the lhs has FixedWindows and
            the rhs has SessionWindows, what do you want the join
            semantics to be? The only thing I could imagine would
            be for the user to provide some function telling the
            join how to map the windows together, but that could be
            pretty complicated.
            I don't want to go too far into details, but generally
            both lhs and rhs can be put onto time line and then full
            join can be defined as each pair of (lhs, first
            preceding rhs) and (rhs, first preceding lhs). Then the
            end of window is semantically just clearing the joined
            value (setting it to null, thus at the end of window
            there will be pair (lhs, null) or (null, rhs) in case of
            full outer join). This way any combination of windows is
            possible, because all window does is that it "scopes"
            validity of respective values (lhs, rhs).


        I think it is very valid to hope to do a join in the sense
        of a relational join where it is row-to-row. In this case,
        Beam's concept of windowing may or may not make sense. It is
        just a tool for the job. It is just a grouping key that
        provides a time when state can be deleted. So I would say
        your use case is more global window to global window join.
        That is what I think of as a true stream-to-stream join
        anyhow. You probably don't want to wait forever for output.
        So you'll need to use some knob other than Beam windows or
        triggers.

        Reza has prototyped a join like you describe here:
        https://github.com/apache/beam/pull/9032

        If your join condition explicitly includes the event time
        distance between elements, then it could "just work". If
        that isn't really part of your join condition, then you will
        have to see this restriction as a "knob" that you tweak on
        your results.

                 - when using global window, there has to be
                trigger and (afaik) there is no trigger that would
                guarantee firing after each data element (for early
                panes) (because triggers are there to express
                cost-latency tradeoff, not semantics)


            Can you explain the use case where this matters? If you
            do trigger elementCountAtLeast(1) on the join, then the
            consumer will simply see a continuous stream of
            outputs. I'm not sure I understand why the consumer
            cares that some of those outputs were in a pane that
            really held 3 outputs instead of 1.

            What I'm trying to solve is basically this:

             - lhs is event stream

             - rhs is stream of a "state updates"

            purpose of the join is "take each event, pair it with
            currently valid state and produce output and possibly
            modified state". I cannot process two events at a time,
            because first event can modify the state and the
            subsequent event should see this. It is not a "simple"
            stateful pardo either, because the state can be modified
            externally (not going into too much detail here, but
            e.g. by writing into kafka topic).

        Reuven's explanation is missing some detail. If the CoGBK is
        in discarding mode, then it will miss join results. If the
        CoGBK is in accumulating mode, it will duplicate join
        results. This is a known problem and the general solution is
        retractions.

        Basically, CoGBK-based joins just don't work with triggers
        until we have retractions.

                Moreover, I'd like to define the join semantics so
                that when there are available elements from both
                sides, the fired pane should be ON_TIME, not EARLY.
                That essentially means that the fully general case
                would not be built around (Co)GBK, but stateful
                ParDo. There are specific options where this fully
                general case "degrades" into forms that can be
                efficiently expressed using (Co)GBK, that is true.


            BTW building this around stateful DoFn might be a
            better fit. The main reason I didn't is because we
            would need a good distributed MapState (something
            discussed fairly recently on the list), and that is not
            yet built. Once we had that, I might be inclined to
            rewrite this join on stateful DoFn.
            Yes, the sorted state helps for streaming case. But I'd
            be careful about that for batch case, where this might
            lead to high pressure on the state (and
            InMemoryStateInternals might OOME for instance).

            However can you explain what you are expecting from the
            pane? An EARLY pane simply means that we are producing
            output before the end of the window. If you are in the
            global window triggering every element, then every
            output is EARLY. It might seem weird if you are
            interpreting EARLY as "outputting data that isn't
            ready," however that's not what EARLY is defined to be.
            Any change to the pane semantics would be a major
            breaking change to very fundamental semantics.

            I wonder if you are really objecting to the name EARLY
            and ON_TIME? Maybe we would've been better off tagging
            it BEFORE_WINDOW_END instead of EARLY, to make it clear
            what is meant?

            Essentially I don't object anything here. I'm missing
            solution to the "event vs. state" join described above.
            I was thinking about how to make these types of problems
            more user friendly and it essentially leads to creating
            a somewhat more generic semantics of join, where
            end-of-window is converted into "'value-delete events"
            and then just joining by the "previous" or "valid" value
            (yes, this relates to validity windows mentioned on Beam
            Summit Europe). It actually turns out that with some
            work we could define quite "naturally" a join on two
            streams with global window and no trigger. It would even
            function with lowest latency possible (but yes, with the
            highest expenses, it is actually the introduction of
            (same!) windows that enable certain optimizations). It
            the correctly defines semantics for different windows,
            although the result would be (probably unexpectedly)
            windowed using global window. But that doesn't seem to
            be any breaking change, because it is currently not
            possible (any such pipeline will not be validated).

            Maybe for reference, the unwindowed join would be what
            is described here [1]

            [1]
            
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin

                Jan

                On 11/22/19 6:47 PM, Reuven Lax wrote:
                Have you seen the Join library that is part of
                schemas? I'm curious whether this fits your needs,
                or there's something lacking there.

                On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský
                <[email protected] <mailto:[email protected]>> wrote:

                    Hi,

                    based on roadmap [1], we would like to define
                    and implement a full set
                    of (unified) stream-stream joins. That would
                    include:

                      - joins (left, right, full outer) on global
                    window with "immediate
                    trigger"

                      - joins with different windowing functions
                    on left and right side

                    The approach would be to define these
                    operations in a natural way, so
                    that the definition is aligned with how
                    current joins work (same
                    windows, cartesian product of values with same
                    keys, output timestamp
                    projected to the end of window, etc.). Because
                    this should be a generic
                    approach, this effort should probably be part
                    of join library, that can
                    the be reused by other components, too (e.g. SQL).

                    The question is - is (or was) there any effort
                    that we can build upon?
                    Or should this be designed from scratch?

                    Jan

                    [1] https://beam.apache.org/roadmap/euphoria/

Reply via email to