On 11/25/19 11:45 PM, Kenneth Knowles wrote:


On Mon, Nov 25, 2019 at 1:56 PM Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:

    Hi Rui,

    > Hi Kenn, you think stateful DoFn based join can emit joined rows
    that never to be retracted because in stateful DoFn case joined
    rows will be controlled by timers and emit will be only once? If
    so I will agree with it. Generally speaking, if only emit once is
    the factor of needing retraction or not.

    that would imply buffering elements up until watermark, then
    sorting and so reduces to the option a) again, is that true? This
    also has to deal with allowed lateness, that would mean, that with
    allowed lateness greater than zero, there can still be multiple
    firings and so retractions are needed.

Specifically, when I say "bi-temporal join" I mean unbounded-to-unbounded join where one of the join conditions is that elements are within event time distance d of one another. An element at time t will be saved until time t + 2d and then garbage collected. Every matching pair can be emitted immediately.

OK, this might simplify things a little. Is there a design doc for that? If there are multiple LHS elements within event time distance from RHS element, which one should be joined? I suppose all of them, but that is not "(time-varying-)relational" join semantics. In that semantics only the last element must be joined, because that is how a (classical) relational database would see the relation at time T (the old record would have been overwritten and not be part of the output). Because of the time distance constraint this is different from the join I have in mind, because that simply joins every LHS element(s) to most recent RHS element(s) and vice versa, without any additional time constraints (that is the RHS "update" can happen arbitrarily far in past).

Jan


In the triggered CoGBK + join-product implementation, you do need retractions as a model concept. But you don't need full support, since they only need to be shipped as deltas and only from the CoGBK to the join-product transform where they are all consumed to create only positive elements. Again a delay is not required; this yields correct results with the "always" trigger.

Neither case requires waiting or time sorting a whole buffer. The bi-temporal join requires something more, in a way, since you need to query by time range and GC time prefixes.

Kenn

    Jan

    On 11/25/19 10:17 PM, Rui Wang wrote:


    On Mon, Nov 25, 2019 at 11:29 AM Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:


        On 11/25/19 7:47 PM, Kenneth Knowles wrote:


        On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský
        <[email protected] <mailto:[email protected]>> wrote:

            I can put down a design document, but before that I need
            to clarify some things for me. I'm struggling to put all
            of this into a bigger picture. Sorry if the arguments
            are circulating, but I didn't notice any proposal of how
            to solve these. If anyone can disprove any of this logic
            it would be very much appreciated as I might be able to
            get from a dead end:

             a) in the bi-temporal join you can either buffer until
            watermark, or emit false data that has to be retracted

        This is not the case. A stateful DoFn based join can emit
        immediately joined rows that will never need to be
        retracted. The need for retractions has to do with
        CoGBK-based implementation of a join.
        I fail to see how this could work. If I emit joined rows
        immediately without waiting for watermark to pass, I can join
        two elements, that don't belong to each other, because later
        can arrive element with lower time distance, that should have
        been joint in the place of the previously emitted one. This
        is wrong result that has to be retracted. Or what I'm missing?


    Hi Kenn, you think stateful DoFn based join can emit joined rows
    that never to be retracted because in stateful DoFn case joined
    rows will be controlled by timers and emit will be only once? If
    so I will agree with it. Generally speaking, if only emit once is
    the factor of needing retraction or not.

    In the past brainstorming, even having retractions ready,
    streaming join with windowing are likely be implemented by a
    style of CoGBK + stateful DoFn.


        I suggest that you work out the definition of the join you
        are interested in, with a good amount of mathematical rigor,
        and then consider the ways you can implement it. That is
        where a design doc will probably clarify things.

        Kenn

             b) until retractions are 100% functional (and that is
            sort of holy grail for now), then the only solution is
            using a buffer holding data up to watermark *and then
            sort by event time*

             c) even if retractions were 100% functional, there
            would have to be special implementation for batch case,
            because otherwise this would simply blow up downstream
            processing with insanely many false additions and
            subsequent retractions

            Property b) means that if we want this feature now, we
            must sort by event time and there is no way around.
            Property c) shows that even in the future, we must make
            (in certain cases) distinction between batch and
            streaming code paths, which seems weird to me, but it
            might be an option. But still, there is no way to
            express this join in batch case, because it would
            require either buffering (up to) whole input on local
            worker (doesn't look like viable option) or provide a
            way in user code to signal the need for ordering of data
            inside GBK (and we are there again :)). Yes, we might
            shift this need from stateful dofn to GBK like

             input.apply(GroupByKey.sorted())

            I cannot find a good reasoning why this would be better
            than giving this semantics to (stateful) ParDo.

            Maybe someone can help me out here?

            Jan

            On 11/24/19 5:05 AM, Kenneth Knowles wrote:
            I don't actually see how event time sorting simplifies
            this case much. You still need to buffer elements until
            they can no longer be matched in the join, and you
            still need to query that buffer for elements that might
            match. The general "bi-temporal join" (without sorting)
            requires one new state type and then it has identical
            API, does not require any novel data structures or
            reasoning, yields better latency (no sort buffer
            delay), and discards less data (no sort buffer cutoff;
            watermark is better). Perhaps a design document about
            this specific case would clarify.

            Kenn

            On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:

                I didn't want to go too much into detail, but to
                describe the idea roughly (ignoring the problem of
                different window fns on both sides to keep it as
                simple as possible):

                rhs -----  \

                                flatten (on global window) ----
                stateful par do (sorted by event time)  ---- output

                lhs -----  /

                If we can guarantee event time order arrival of
                events into the stateful pardo, then the whole
                complexity reduces to keep current value of left
                and right element and just flush them out each time
                there is an update. That is the "knob" is actually
                when watermark moves, because it is what tells the
                join operation that there will be no more (not
                late) input. This is very, very simplified, but
                depicts the solution. The "classical" windowed join
                reduces to this if all data in each window is
                projected onto window end boundary. Then there will
                be a cartesian product, because all the elements
                have the same timestamp. I can put this into a
                design doc with all the details, I was trying to
                find out if there is or was any effort around this.

                I was in touch with Reza in the PR #9032, I think
                that it currently suffers from problems with
                running this on batch.

                I think I can even (partly) resolve the retraction
                issue (for joins), as described on the thread [1].
                Shortly, there can be two copies of the stateful
                dofn, one running at watermark and the other at
                (watermark - allowed lateness). One would produce
                ON_TIME (maybe wrong) results, the other would
                produce LATE but correct ones. Being able to
                compare them, the outcome would be that it would be
                possible to retract the wrong results.

                Yes, this is also about providing more evidence of
                why I think event-time sorting should be (somehow)
                part of the model. :-)

                Jan

                [1]
                
https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E

                On 11/23/19 5:54 AM, Kenneth Knowles wrote:
                +Mikhail Gryzykhin <mailto:[email protected]> +Rui
                Wang <mailto:[email protected]> +Reza Rokni
                <mailto:[email protected]> who have all done some
                investigations here.


                On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský
                <[email protected] <mailto:[email protected]>> wrote:


                    On 11/22/19 7:54 PM, Reuven Lax wrote:


                    On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský
                    <[email protected] <mailto:[email protected]>> wrote:

                        Hi Reuven,

                        I didn't investigate that particular one,
                        but looking into that now, it looks that
                        is (same as the "classic" join library)
                        builds around CoGBK. Is that correct? If
                        yes, then it essentially means that it:

                         - works only for cases where both sides
                        have the same windowfn (that is
                        limitation of Flatten that precedes CoGBK)

                    Correct. Did you want to join different
                    windows? If so what are the semantics? If the
                    lhs has FixedWindows and the rhs has
                    SessionWindows, what do you want the join
                    semantics to be? The only thing I could
                    imagine would be for the user to provide some
                    function telling the join how to map the
                    windows together, but that could be pretty
                    complicated.
                    I don't want to go too far into details, but
                    generally both lhs and rhs can be put onto
                    time line and then full join can be defined as
                    each pair of (lhs, first preceding rhs) and
                    (rhs, first preceding lhs). Then the end of
                    window is semantically just clearing the
                    joined value (setting it to null, thus at the
                    end of window there will be pair (lhs, null)
                    or (null, rhs) in case of full outer join).
                    This way any combination of windows is
                    possible, because all window does is that it
                    "scopes" validity of respective values (lhs, rhs).


                I think it is very valid to hope to do a join in
                the sense of a relational join where it is
                row-to-row. In this case, Beam's concept of
                windowing may or may not make sense. It is just a
                tool for the job. It is just a grouping key that
                provides a time when state can be deleted. So I
                would say your use case is more global window to
                global window join. That is what I think of as a
                true stream-to-stream join anyhow. You probably
                don't want to wait forever for output. So you'll
                need to use some knob other than Beam windows or
                triggers.

                Reza has prototyped a join like you describe here:
                https://github.com/apache/beam/pull/9032

                If your join condition explicitly includes the
                event time distance between elements, then it
                could "just work". If that isn't really part of
                your join condition, then you will have to see
                this restriction as a "knob" that you tweak on
                your results.

                         - when using global window, there has to
                        be trigger and (afaik) there is no
                        trigger that would guarantee firing after
                        each data element (for early panes)
                        (because triggers are there to express
                        cost-latency tradeoff, not semantics)


                    Can you explain the use case where this
                    matters? If you do trigger
                    elementCountAtLeast(1) on the join, then the
                    consumer will simply see a continuous stream
                    of outputs. I'm not sure I understand why the
                    consumer cares that some of those outputs
                    were in a pane that really held 3 outputs
                    instead of 1.

                    What I'm trying to solve is basically this:

                     - lhs is event stream

                     - rhs is stream of a "state updates"

                    purpose of the join is "take each event, pair
                    it with currently valid state and produce
                    output and possibly modified state". I cannot
                    process two events at a time, because first
                    event can modify the state and the subsequent
                    event should see this. It is not a "simple"
                    stateful pardo either, because the state can
                    be modified externally (not going into too
                    much detail here, but e.g. by writing into
                    kafka topic).

                Reuven's explanation is missing some detail. If
                the CoGBK is in discarding mode, then it will miss
                join results. If the CoGBK is in accumulating
                mode, it will duplicate join results. This is a
                known problem and the general solution is retractions.

                Basically, CoGBK-based joins just don't work with
                triggers until we have retractions.

                        Moreover, I'd like to define the join
                        semantics so that when there are
                        available elements from both sides, the
                        fired pane should be ON_TIME, not EARLY.
                        That essentially means that the fully
                        general case would not be built around
                        (Co)GBK, but stateful ParDo. There are
                        specific options where this fully general
                        case "degrades" into forms that can be
                        efficiently expressed using (Co)GBK, that
                        is true.


                    BTW building this around stateful DoFn might
                    be a better fit. The main reason I didn't is
                    because we would need a good distributed
                    MapState (something discussed fairly recently
                    on the list), and that is not yet built. Once
                    we had that, I might be inclined to rewrite
                    this join on stateful DoFn.
                    Yes, the sorted state helps for streaming
                    case. But I'd be careful about that for batch
                    case, where this might lead to high pressure
                    on the state (and InMemoryStateInternals might
                    OOME for instance).

                    However can you explain what you are
                    expecting from the pane? An EARLY pane simply
                    means that we are producing output before the
                    end of the window. If you are in the global
                    window triggering every element, then every
                    output is EARLY. It might seem weird if you
                    are interpreting EARLY as "outputting data
                    that isn't ready," however that's not what
                    EARLY is defined to be. Any change to the
                    pane semantics would be a major breaking
                    change to very fundamental semantics.

                    I wonder if you are really objecting to the
                    name EARLY and ON_TIME? Maybe we would've
                    been better off tagging it BEFORE_WINDOW_END
                    instead of EARLY, to make it clear what is meant?

                    Essentially I don't object anything here. I'm
                    missing solution to the "event vs. state" join
                    described above. I was thinking about how to
                    make these types of problems more user
                    friendly and it essentially leads to creating
                    a somewhat more generic semantics of join,
                    where end-of-window is converted into
                    "'value-delete events" and then just joining
                    by the "previous" or "valid" value (yes, this
                    relates to validity windows mentioned on Beam
                    Summit Europe). It actually turns out that
                    with some work we could define quite
                    "naturally" a join on two streams with global
                    window and no trigger. It would even function
                    with lowest latency possible (but yes, with
                    the highest expenses, it is actually the
                    introduction of (same!) windows that enable
                    certain optimizations). It the correctly
                    defines semantics for different windows,
                    although the result would be (probably
                    unexpectedly) windowed using global window.
                    But that doesn't seem to be any breaking
                    change, because it is currently not possible
                    (any such pipeline will not be validated).

                    Maybe for reference, the unwindowed join would
                    be what is described here [1]

                    [1]
                    
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin

                        Jan

                        On 11/22/19 6:47 PM, Reuven Lax wrote:
                        Have you seen the Join library that is
                        part of schemas? I'm curious whether
                        this fits your needs, or there's
                        something lacking there.

                        On Fri, Nov 22, 2019 at 12:31 AM Jan
                        Lukavský <[email protected]
                        <mailto:[email protected]>> wrote:

                            Hi,

                            based on roadmap [1], we would like
                            to define and implement a full set
                            of (unified) stream-stream joins.
                            That would include:

                              - joins (left, right, full outer)
                            on global window with "immediate
                            trigger"

                              - joins with different windowing
                            functions on left and right side

                            The approach would be to define
                            these operations in a natural way, so
                            that the definition is aligned with
                            how current joins work (same
                            windows, cartesian product of values
                            with same keys, output timestamp
                            projected to the end of window,
                            etc.). Because this should be a generic
                            approach, this effort should
                            probably be part of join library,
                            that can
                            the be reused by other components,
                            too (e.g. SQL).

                            The question is - is (or was) there
                            any effort that we can build upon?
                            Or should this be designed from scratch?

                            Jan

                            [1]
                            https://beam.apache.org/roadmap/euphoria/

Reply via email to