Hi Rui,

> Hi Kenn, do you think a stateful DoFn based join can emit joined rows that never need to be retracted because, in the stateful DoFn case, joined rows are controlled by timers and emitted only once? If so, I agree with it. Generally speaking, whether rows are emitted only once is what determines whether retraction is needed.

That would imply buffering elements up until the watermark and then sorting, so it reduces to option a) again, is that true? This also has to deal with allowed lateness: with allowed lateness greater than zero, there can still be multiple firings, and so retractions are needed.

Jan

On 11/25/19 10:17 PM, Rui Wang wrote:


On Mon, Nov 25, 2019 at 11:29 AM Jan Lukavský <[email protected] <mailto:[email protected]>> wrote:


    On 11/25/19 7:47 PM, Kenneth Knowles wrote:


    On Sun, Nov 24, 2019 at 12:57 AM Jan Lukavský <[email protected]
    <mailto:[email protected]>> wrote:

        I can put down a design document, but before that I need to
        clarify some things for myself. I'm struggling to put all of
        this into a bigger picture. Sorry if the arguments are going
        in circles, but I didn't notice any proposal of how to solve
        these. If anyone can disprove any of this logic it would be
        very much appreciated, as I might be able to get out of a dead end:

         a) in the bi-temporal join you can either buffer until
        watermark, or emit false data that has to be retracted

    This is not the case. A stateful DoFn based join can emit
    immediately joined rows that will never need to be retracted. The
    need for retractions has to do with CoGBK-based implementation of
    a join.
    I fail to see how this could work. If I emit joined rows
    immediately without waiting for the watermark to pass, I can join
    two elements that don't belong to each other, because an element
    with a lower time distance can arrive later, and it should have
    been joined in place of the previously emitted one. This is a
    wrong result that has to be retracted. Or what am I missing?
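To make the objection above concrete, here is a minimal sketch in plain Python (not Beam code). It assumes the join pairs each lhs element with the rhs element that has the greatest event time not exceeding its own, which is one reading of the "lower time distance" condition; the function names and the sample data are hypothetical.

```python
def latest_preceding(rhs_seen, ts):
    """Return the (event_time, value) rhs entry with the greatest event_time <= ts."""
    candidates = [r for r in rhs_seen if r[0] <= ts]
    return max(candidates, key=lambda r: r[0], default=None)

# Elements in arrival order: (side, event_time, value). Hypothetical data.
arrivals = [
    ("rhs", 3, "state-a"),
    ("lhs", 10, "event-x"),
    ("rhs", 8, "state-b"),  # arrives after event-x but precedes it in event time
]

def join_eagerly(arrivals):
    """Emit a joined row as soon as an lhs element arrives."""
    rhs_seen, out = [], []
    for side, ts, value in arrivals:
        if side == "rhs":
            rhs_seen.append((ts, value))
        else:
            out.append((value, latest_preceding(rhs_seen, ts)))
    return out

def join_after_watermark(arrivals):
    """Buffer everything, then join once all input is known."""
    rhs_all = [(ts, v) for side, ts, v in arrivals if side == "rhs"]
    return [(v, latest_preceding(rhs_all, ts))
            for side, ts, v in arrivals if side == "lhs"]

eager = join_eagerly(arrivals)
buffered = join_after_watermark(arrivals)
```

Under this join definition the eager variant pairs event-x with state-a, while the buffered variant pairs it with state-b, so the eager row would have to be retracted. A join whose condition does not depend on later arrivals would not show this difference.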


Hi Kenn, do you think a stateful DoFn based join can emit joined rows that never need to be retracted because, in the stateful DoFn case, joined rows are controlled by timers and emitted only once? If so, I agree with it. Generally speaking, whether rows are emitted only once is what determines whether retraction is needed.

In past brainstorming, even with retractions ready, a streaming join with windowing was likely to be implemented in a CoGBK + stateful DoFn style.


    I suggest that you work out the definition of the join you are
    interested in, with a good amount of mathematical rigor, and then
    consider the ways you can implement it. That is where a design
    doc will probably clarify things.

    Kenn

         b) until retractions are 100% functional (and that is sort
        of holy grail for now), then the only solution is using a
        buffer holding data up to watermark *and then sort by event time*

         c) even if retractions were 100% functional, there would
        have to be a special implementation for the batch case,
        because otherwise this would simply blow up downstream
        processing with insanely many false additions and subsequent
        retractions

        Property b) means that if we want this feature now, we must
        sort by event time and there is no way around it. Property c)
        shows that even in the future, we must make (in certain
        cases) a distinction between batch and streaming code paths,
        which seems weird to me, but it might be an option. But
        still, there is no way to express this join in the batch case,
        because it would require either buffering (up to) the whole
        input on a local worker (doesn't look like a viable option) or
        providing a way in user code to signal the need for ordering of
        data inside GBK (and we are there again :)). Yes, we might
        shift this need from stateful DoFn to GBK like

         input.apply(GroupByKey.sorted())

        I cannot find good reasoning for why this would be better than
        giving these semantics to (stateful) ParDo.

        Maybe someone can help me out here?

        Jan

        On 11/24/19 5:05 AM, Kenneth Knowles wrote:
        I don't actually see how event time sorting simplifies this
        case much. You still need to buffer elements until they can
        no longer be matched in the join, and you still need to
        query that buffer for elements that might match. The general
        "bi-temporal join" (without sorting) requires one new state
        type and then it has identical API, does not require any
        novel data structures or reasoning, yields better latency
        (no sort buffer delay), and discards less data (no sort
        buffer cutoff; watermark is better). Perhaps a design
        document about this specific case would clarify.

        Kenn

        On Fri, Nov 22, 2019 at 10:08 PM Jan Lukavský
        <[email protected] <mailto:[email protected]>> wrote:

            I didn't want to go too much into detail, but to
            describe the idea roughly (ignoring the problem of
            different window fns on both sides to keep it as simple
            as possible):

            rhs ----- \
                       flatten (on global window) ---- stateful ParDo (sorted by event time) ---- output
            lhs ----- /

            If we can guarantee event time order arrival of events
            into the stateful ParDo, then the whole complexity
            reduces to keeping the current value of the left and right
            elements and just flushing them out each time there is an
            update. That is, the "knob" is actually when the watermark
            moves, because it is what tells the join operation that
            there will be no more (not late) input. This is very, very
            simplified, but depicts the solution. The "classical"
            windowed join reduces to this if all data in each window
            is projected onto the window end boundary. Then there will
            be a cartesian product, because all the elements have
            the same timestamp. I can put this into a design doc
            with all the details, I was trying to find out if there
            is or was any effort around this.
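The "keep the current value of each side and flush on every update" idea can be sketched in plain Python as follows. This is only an illustration for a single key and is not Beam code: the explicit sort stands in for the event-time ordering guarantee discussed above, and `sorted_join` is a hypothetical name.

```python
def sorted_join(lhs, rhs):
    """lhs, rhs: lists of (event_time, value) for one key.

    Returns one (event_time, left, right) row per update, in event-time order.
    """
    merged = sorted(
        [(ts, "lhs", v) for ts, v in lhs] + [(ts, "rhs", v) for ts, v in rhs],
        key=lambda e: e[0],  # stand-in for the ordering guarantee
    )
    current = {"lhs": None, "rhs": None}  # the only state that is needed
    out = []
    for ts, side, value in merged:
        current[side] = value
        out.append((ts, current["lhs"], current["rhs"]))
    return out

rows = sorted_join(lhs=[(1, "l1"), (5, "l2")], rhs=[(3, "r1")])
```

With the sample input, the rows are `(1, "l1", None)`, `(3, "l1", "r1")` and `(5, "l2", "r1")`. The state is two values per key, which is the simplification the ordering buys.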

            I was in touch with Reza in the PR #9032, I think that
            it currently suffers from problems with running this on
            batch.

            I think I can even (partly) resolve the retraction issue
            (for joins), as described on the thread [1]. In short,
            there can be two copies of the stateful DoFn, one
            running at the watermark and the other at (watermark -
            allowed lateness). One would produce ON_TIME (maybe
            wrong) results, the other would produce LATE but correct
            ones. By comparing them, it would be possible to retract
            the wrong results.
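The comparison step of the two-copy idea could look roughly like this in plain Python. It is a sketch under a strong assumption that is not stated in the thread: each joined row can be identified by a key (here, a hypothetical id of the lhs element), so the ON_TIME and LATE outputs can be matched up.

```python
def diff_outputs(on_time, late):
    """on_time, late: dicts mapping a row id to the joined row.

    Returns (retract, add): speculative rows to retract and corrected rows to emit.
    """
    retract, add = [], []
    for key in sorted(set(on_time) | set(late)):
        speculative = on_time.get(key)
        final = late.get(key)
        if speculative == final:
            continue  # the ON_TIME result was already correct
        if speculative is not None:
            retract.append((key, speculative))
        if final is not None:
            add.append((key, final))
    return retract, add

# Hypothetical outputs of the two copies.
on_time = {"event-x": ("event-x", "state-a"), "event-y": ("event-y", "state-b")}
late = {"event-x": ("event-x", "state-b"), "event-y": ("event-y", "state-b")}
retract, add = diff_outputs(on_time, late)
```

Only `event-x` differs between the two outputs, so it is the only row that is retracted and re-emitted.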

            Yes, this is also about providing more evidence of why I
            think event-time sorting should be (somehow) part of the
            model. :-)

            Jan

            [1]
            
https://lists.apache.org/thread.html/a736ebd6b0913d2e06dfa54797716d022adf2c45140530d5c3bd538d@%3Cdev.beam.apache.org%3E

            On 11/23/19 5:54 AM, Kenneth Knowles wrote:
            +Mikhail Gryzykhin +Rui Wang +Reza Rokni, who have all
            done some investigations here.


            On Fri, Nov 22, 2019 at 11:48 AM Jan Lukavský
            <[email protected] <mailto:[email protected]>> wrote:


                On 11/22/19 7:54 PM, Reuven Lax wrote:


                On Fri, Nov 22, 2019 at 10:19 AM Jan Lukavský
                <[email protected] <mailto:[email protected]>> wrote:

                    Hi Reuven,

                    I didn't investigate that particular one, but
                    looking into it now, it looks like it is (same
                    as the "classic" join library) built around
                    CoGBK. Is that correct? If yes, then it
                    essentially means that it:

                     - works only for cases where both sides have
                    the same windowfn (that is a limitation of the
                    Flatten that precedes CoGBK)

                Correct. Did you want to join different windows?
                If so what are the semantics? If the lhs has
                FixedWindows and the rhs has SessionWindows, what
                do you want the join semantics to be? The only
                thing I could imagine would be for the user to
                provide some function telling the join how to map
                the windows together, but that could be pretty
                complicated.
                I don't want to go too far into details, but
                generally both lhs and rhs can be put onto a
                timeline, and then the full join can be defined as
                each pair of (lhs, first preceding rhs) and (rhs,
                first preceding lhs). Then the end of a window
                semantically just clears the joined value
                (setting it to null, thus at the end of the window
                there will be a pair (lhs, null) or (null, rhs) in
                the case of a full outer join). This way any
                combination of windows is possible, because all a
                window does is "scope" the validity of the
                respective values (lhs, rhs).
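A small plain-Python sketch of the "window end clears the value" idea, assuming fixed windows of a given size on the rhs. The helper names are hypothetical, and this is not how Beam represents windows; it only shows a window being turned into a delete marker on the timeline.

```python
def scope_by_fixed_windows(elements, window_size):
    """Add a (window_end, None) marker for every window that holds an element."""
    ends = sorted({(ts // window_size + 1) * window_size for ts, _ in elements})
    # Markers go first so that, after the stable sort, a delete at time t
    # precedes a real element that also has time t.
    tagged = [(end, None) for end in ends] + list(elements)
    return sorted(tagged, key=lambda e: e[0])

def valid_at(scoped, ts):
    """Return the value that is valid at event time ts, or None."""
    value = None
    for t, v in scoped:
        if t > ts:
            break
        value = v
    return value

rhs = scope_by_fixed_windows([(3, "r1"), (10, "r2")], window_size=10)
pairs = [(ts, valid_at(rhs, ts)) for ts in (2, 5, 10, 25)]
```

An lhs element at time 5 is paired with `r1`, one at time 10 with `r2`, and elements at times 2 and 25 are paired with `None`, because no rhs value is in scope there.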


            I think it is very valid to hope to do a join in the
            sense of a relational join where it is row-to-row. In
            this case, Beam's concept of windowing may or may not
            make sense. It is just a tool for the job. It is just a
            grouping key that provides a time when state can be
            deleted. So I would say your use case is more global
            window to global window join. That is what I think of
            as a true stream-to-stream join anyhow. You probably
            don't want to wait forever for output. So you'll need
            to use some knob other than Beam windows or triggers.

            Reza has prototyped a join like you describe here:
            https://github.com/apache/beam/pull/9032

            If your join condition explicitly includes the event
            time distance between elements, then it could "just
            work". If that isn't really part of your join
            condition, then you will have to see this restriction
            as a "knob" that you tweak on your results.

                     - when using the global window, there has to be
                    a trigger and (afaik) there is no trigger that
                    would guarantee firing after each data element
                    (for early panes) (because triggers are there
                    to express a cost-latency tradeoff, not semantics)


                Can you explain the use case where this matters?
                If you do trigger elementCountAtLeast(1) on the
                join, then the consumer will simply see a
                continuous stream of outputs. I'm not sure I
                understand why the consumer cares that some of
                those outputs were in a pane that really held 3
                outputs instead of 1.

                What I'm trying to solve is basically this:

                 - lhs is an event stream

                 - rhs is a stream of "state updates"

                The purpose of the join is "take each event, pair it
                with the currently valid state and produce output and
                possibly modified state". I cannot process two
                events at a time, because the first event can modify
                the state and the subsequent event should see this.
                It is not a "simple" stateful ParDo either, because
                the state can be modified externally (not going
                into too much detail here, but e.g. by writing into
                a Kafka topic).

            Reuven's explanation is missing some detail. If the
            CoGBK is in discarding mode, then it will miss join
            results. If the CoGBK is in accumulating mode, it will
            duplicate join results. This is a known problem and the
            general solution is retractions.

            Basically, CoGBK-based joins just don't work with
            triggers until we have retractions.
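The discarding versus accumulating problem can be reproduced with a toy model in plain Python. The model is a deliberate simplification and not Beam's trigger machinery: a single key, one pane fired per arriving element, and an inner join computed as the cartesian product of each pane's contents.

```python
from itertools import product

def pane_join(arrivals, accumulating):
    """arrivals: list of (side, value). Returns all rows emitted across panes."""
    lhs, rhs, out = [], [], []
    for side, value in arrivals:
        if not accumulating:
            lhs, rhs = [], []  # discarding mode: a pane holds only new elements
        (lhs if side == "lhs" else rhs).append(value)
        out.extend(product(lhs, rhs))  # join within the fired pane
    return out

arrivals = [("lhs", "l1"), ("rhs", "r1"), ("lhs", "l2")]
discarding = pane_join(arrivals, accumulating=False)
accumulating = pane_join(arrivals, accumulating=True)
```

In this model the discarding run emits nothing, because no pane ever contains both sides, while the accumulating run emits `("l1", "r1")` twice. Retractions would allow the second pane's rows to be withdrawn before the third pane's rows are emitted.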

                    Moreover, I'd like to define the join
                    semantics so that when there are available
                    elements from both sides, the fired pane
                    should be ON_TIME, not EARLY. That essentially
                    means that the fully general case would not be
                    built around (Co)GBK, but stateful ParDo.
                    There are specific options where this fully
                    general case "degrades" into forms that can be
                    efficiently expressed using (Co)GBK, that is true.


                BTW building this around stateful DoFn might be a
                better fit. The main reason I didn't is because we
                would need a good distributed MapState (something
                discussed fairly recently on the list), and that
                is not yet built. Once we had that, I might be
                inclined to rewrite this join on stateful DoFn.
                Yes, the sorted state helps for the streaming case.
                But I'd be careful about that for the batch case,
                where this might lead to high pressure on the state
                (and InMemoryStateInternals might OOME for instance).

                However can you explain what you are expecting
                from the pane? An EARLY pane simply means that we
                are producing output before the end of the window.
                If you are in the global window triggering every
                element, then every output is EARLY. It might seem
                weird if you are interpreting EARLY as "outputting
                data that isn't ready," however that's not what
                EARLY is defined to be. Any change to the pane
                semantics would be a major breaking change to very
                fundamental semantics.

                I wonder if you are really objecting to the name
                EARLY and ON_TIME? Maybe we would've been better
                off tagging it BEFORE_WINDOW_END instead of EARLY,
                to make it clear what is meant?

                Essentially I don't object to anything here. I'm
                missing a solution to the "event vs. state" join
                described above. I was thinking about how to make
                these types of problems more user friendly and it
                essentially leads to creating somewhat more
                generic semantics of join, where end-of-window is
                converted into "value-delete events" and then just
                joining by the "previous" or "valid" value (yes,
                this relates to validity windows mentioned at Beam
                Summit Europe). It actually turns out that with
                some work we could define quite "naturally" a join
                on two streams with global window and no trigger.
                It would even function with the lowest latency
                possible (but yes, with the highest expenses; it is
                actually the introduction of (same!) windows that
                enables certain optimizations). It then correctly
                defines semantics for different windows, although the
                result would be (probably unexpectedly) windowed
                using the global window. But that doesn't seem to be
                a breaking change, because it is currently not
                possible (any such pipeline will not be validated).

                Maybe for reference, the unwindowed join would be
                what is described here [1]

                [1]
                
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics#KafkaStreamsJoinSemantics-KStream-KTableJoin

                    Jan

                    On 11/22/19 6:47 PM, Reuven Lax wrote:
                    Have you seen the Join library that is part
                    of schemas? I'm curious whether this fits
                    your needs, or there's something lacking there.

                    On Fri, Nov 22, 2019 at 12:31 AM Jan Lukavský
                    <[email protected] <mailto:[email protected]>> wrote:

                        Hi,

                        based on roadmap [1], we would like to
                        define and implement a full set
                        of (unified) stream-stream joins. That
                        would include:

                          - joins (left, right, full outer) on
                        global window with "immediate
                        trigger"

                          - joins with different windowing
                        functions on left and right side

                        The approach would be to define these
                        operations in a natural way, so
                        that the definition is aligned with how
                        current joins work (same
                        windows, cartesian product of values with
                        same keys, output timestamp
                        projected to the end of window, etc.).
                        Because this should be a generic
                        approach, this effort should probably be
                        part of the join library, which can
                        then be reused by other components, too
                        (e.g. SQL).

                        The question is - is (or was) there any
                        effort that we can build upon?
                        Or should this be designed from scratch?

                        Jan

                        [1] https://beam.apache.org/roadmap/euphoria/
