Hi Reuven,

> Your first statement is partially true, but the second statement doesn't follow from that. Stateful DoFn is in some sense a more general transform, yes. However that doesn't mean that semantics should be defined in terms of stateful DoFn. There are other ways of implementing GroupByKey, and it's far from clear that stateful DoFn is always the best way.

These are two independent things - how semantics (correctness) is defined and how transform is implemented. I'm not suggesting implement GBK on top of stateful dofn (this might be default, but runners will override it to provide more efficient implementation). There is the same relation between Combine and GBK -> semantics of GBK are equal to GBK + Combine.groupedValues() (ParDo), but that is not how you want to implement and run it, because there exist more efficient implementation. The same holds true for stateful dofn -> GBK.

> For example, batch runners never implement GroupByKey on top of state.

This is just another kind of optimization that follows from the fact, that batch sources can be re-read and so state can be held locally and recomputed on failures. This is just optimization that follows from specific conditions in batch case and doesn't affect semantics.

> Even in streaming, the current released Beam does not have sufficient functionality in Stateful DoFn to properly implement GroupByKey. You would need watermark holds for instance (now added to Beam, but not yet released). To implement things somewhat efficiently you would also need dynamic states, and Beam currently supports only static state tags (hopefully dynamic is coming soon)

This is true, but it only shows that these features are actually missing in stateful dofn. Another example is missing support for merging windows.

> This is a valid point, but also problematic. The watermark cannot work when element times move backwards (partially because the watermark is defined to be monotonic). Usually such pipelines end up being restricted to using non-watermark techniques for aggregation - i.e. processing-time triggers or state+timers.

Two questions here:
 a) why not to recompute watermark when we reassign event time?
 b) why state+timers should be non-watermark approach? (event-time) timers AFAIK work on watermark.

Jan

On 1/23/20 10:00 AM, Reuven Lax wrote:


On Wed, Jan 22, 2020 at 11:37 PM Jan Lukavský <je...@seznam.cz <mailto:je...@seznam.cz>> wrote:

    Hi Kenn,

    I do not agree with the last part. We are talking about definition
    of semantics. If GBK can be implemented on top of stateful dofn,
    then stateful dofn is the more generic transform. Therefore,
    semantics should be defined on this transform, and _derived_ (or
    transferred) to the less generic ones.


Your first statement is partially true, but the second statement doesn't follow from that. Stateful DoFn is in some sense a more general transform, yes. However that doesn't mean that semantics should be defined in terms of stateful DoFn. There are other ways of implementing GroupByKey, and it's far from clear that stateful DoFn is always the best way. For example, batch runners never implement GroupByKey on top of state. Even in streaming, the current released Beam does not have sufficient functionality in Stateful DoFn to properly implement GroupByKey. You would need watermark holds for instance (now added to Beam, but not yet released). To implement things somewhat efficiently you would also need dynamic states, and Beam currently supports only static state tags (hopefully dynamic is coming soon)

    If you execute GBK as a stateful dofn or not (probably not) is
    just a runtime optimization (these optimizations are possible due
    to discrete - and predictable - movements of time defined by
    triggers). But semantics should adhere to the generic definition
    and not be affected by runtime optimizations.

    Last remark, yes, if we disallow moving element's timestamp to the
    past, then we don't need window.minTimestamp, because the
    minTimestamp is the defined implicitly by window open time. It
    opens a question if a droppable element should or should not be
    dropped not only when arriving too late after window close, but if
    arriving too late after window open.

    But disallowing timestamp to move back in time seems impractical,
    because I can imagine source assigning elements ingestion time
    timestamps (e.g. kafka by default), which are later remapped to
    event time in user code. That will necessarily mean moving time
    backwards.

This is a valid point, but also problematic. The watermark cannot work when element times move backwards (partially because the watermark is defined to be monotonic). Usually such pipelines end up being restricted to using non-watermark techniques for aggregation - i.e. processing-time triggers or state+timers.

    Jan

    On 1/22/20 11:53 PM, Kenneth Knowles wrote:
    Had a lunch chat about this issue.

    Moving elements back in time can make them late or droppable. You
    just can't really do it safely.

    Moving elements into the future is fine up to the end of the
    window. It is not safe to move further. The watermark for a
    PCollection is based on the element timestamps. If an element's
    timestamp is in the future, the watermark can advance to that
    point in the future. This may cause the watermark to expire the
    window. So this can also make data late or droppable.

    It is actually not true that GBK is based on stateful DoFn. That
    is one way to implement it, but not the only way nor always the
    best way. They are qualitatively different.

    Kenn

    On Wed, Jan 22, 2020 at 1:52 AM Jan Lukavský <je...@seznam.cz
    <mailto:je...@seznam.cz>> wrote:

        I sense this discussion might be (remotely) related to [1]
        (and especially [2]). The common ground here is that we need
        a sound definition of window. I think people might be
        currently having different definitions, which leads to this
        sort of misunderstandings. The definition should be created
        in terms of stateful dofn (not GBK, which might probably be
        the case today), because that is the most low level
        transform, all the others are being built upon it. Looking at
        this with this optics, it seems that window actually scopes
        state of stateful dofn. The scope can be:

         (a) one sided (having only defined max timestamp)

         (b) both sided (having minimum and maximum)

        We have currently approach (a), which results in ability to
        move timestamp *arbitrarily far to the past*, which moving
        timestamp to future is limited by window's maxTimestamp. If
        we extend this to (b), then windowFn starts to create
        something like universe (actually multiverse, because it can
        return multiple windows). It should be invalid for element to
        escape its universe, that would be counter intuitive. If we
        disallow emission of data elements that are _late even when
        created_ (i.e. are emitted with timestamp less than output
        watermark) and we disallow setting timers with timestamp
        higher than window.maxTimestamp (which we currently do), then
        we have disallowed any element to escape its window
        (universe, range of validity). It would also require the
        output watermark of stateful dofn to be keyed and set to at
        least window.minTimestamp when window is opened. This would
        remove a sort of asymmetry (why to know maxTimestamp and not
        minTimestamp?). Also note that (a) is equal to (b) if and
        only if we disallow shifting time to past.

        Jan

        [1]
        
https://lists.apache.org/thread.html/c37dfb6c545fba7d794a13c507dccebb654bbd8b317dab748a6775dc%40%3Cdev.beam.apache.org%3E

        [2]
        
https://lists.apache.org/thread.html/r7f38860557d6571869e8e0989275f6ed610cf8c99b2f56fc6418a1d1%40%3Cdev.beam.apache.org%3E

        On 1/21/20 10:08 PM, Ankur Goenka wrote:


        On Thu, Jan 16, 2020 at 9:52 PM Kenneth Knowles
        <k...@apache.org <mailto:k...@apache.org>> wrote:



            On Thu, Jan 16, 2020 at 11:38 AM Robert Bradshaw
            <rober...@google.com <mailto:rober...@google.com>> wrote:

                On Thu, Jan 16, 2020 at 11:00 AM Kenneth Knowles
                <k...@apache.org <mailto:k...@apache.org>> wrote:
                >
                > IIRC in Java it is forbidden to output an element
                with a timestamp outside its current window.

                I don't think this is checked anywhere. (Not sure
                how you would check
                it, as there's not generic window containment
                function--I suppose you
                could check if it's past the end of the window (and
                of course skew
                limits how far you can go back). I suppose you could
                try re-windowing
                and then fail if it didn't agree with what was
                already there.


            I think you are right. This is governed by how a runner
            invoked utilities from runners-core (output ultimately
            reaches this point without validation:
            
https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java#L258)

                > An exception is outputs from @FinishBundle, where
                the output timestamp is required and the window is
                applied. TBH it seems more of an artifact of a
                mismatch between the pre-windowing and
                post-windowing worlds.

                Elements are always in some window, even if just the
                global window.


            I mean that the existence of a
            window-unaware @FinishBundle method is an artifact of
            the method existing prior to windowing as a concept. The
            idea that a user can use a DoFn's local variables to
            buffer stuff and then output in @FinishBundle predates
            the existence of windowing.

                > Most of the time, mixing processing across windows
                is simply wrong. But there are fears that calling
                @FinishBundle once per window would be a performance
                problem. On the other hand, don't most correct
                implementations have to separate processing for each
                window anyhow?

                Processing needs to be done per window iff the
                result depends on the
                window or if there are side effects.

                > Anyhow I think the Java behavior is better, so
                window assignment happens exactly and only at window
                transforms.

                But then one ends up with timestamps that are
                unrelated to the windows, right?


            As far as the model goes, I think windows provide an
            upper bound but not a lower bound. If we take the
            approach that windows are a "secondary key with a max
            timestamp" then the timestamps should be related to the
            window in the sense that they are <= the window's max
            timestamp.

        A window only makes sense when a trigger or timer is fired.
        And the timestamp of the elements in the window should be
        within the window's time range when a trigger is set. For
        consistency, I think element timestamp should remain within
        the corresponding time range at every stage of the graph.
        IIUC based on the discussion, users can violate this
        requirement easily in the pipeline code which might give
        inconsistent behavior across runners.

        I think we should stick to a consistent behavior across
        languages and runners. We have multiple options here like
        1. Don't have any promised correlation between element
        timestamp and window. Window will just behave like a
        secondary key for the element.
        2. Making it explicit that the last window function can be
        applied out of order anytime on the elements.
        3. Not letting users change the timestamp without applying a
        windowing function after the changed timestamp and before a
        trigger. Though, this can only be validated at the runtime
        in python.
        4. Revalidating the window after changing the timestamp.
        Also provide additional methods to explicitly change the
        timestamp and window in oneshot.
        5. etc....


            Kenn

                > Kenn
                >
                > On Wed, Jan 15, 2020 at 4:59 PM Ankur Goenka
                <goe...@google.com <mailto:goe...@google.com>> wrote:
                >>
                >> The case where a plan vanilla value or a windowed
                value is emitted seems as expected as the user
                intent is honored without any surprises.
                >>
                >> If I understand correctly in the case when
                timestamp is changed then applying window function
                again can have unintended behavior in following cases
                >> * Custom windows: User code can be executed in
                unintended order.
                >> * User emit a windowed value in a previous
                transform: Timestamping the value in this case would
                overwrite the user assigned window in earlier step
                even when the actual timestamp is the same.
                Semantically, emitting an element or a timestamped
                value with the same timestamp should have the same
                behaviour.
                >>
                >> What do you think?
                >>
                >>
                >> On Wed, Jan 15, 2020 at 4:04 PM Robert Bradshaw
                <rober...@google.com <mailto:rober...@google.com>>
                wrote:
                >>>
                >>> If an element is emitted with a timestamp, the
                window assignment is
                >>> re-applied at that time. At least that's how it
                is in Python. You can
                >>> emit the full windowed value (accepted without
                checking...), a
                >>> timestamped value (in which case the window will
                be computed), or a
                >>> plain old element (in which case the window and
                timestamp will be
                >>> computed (really, propagated)).
                >>>
                >>> On Wed, Jan 15, 2020 at 3:51 PM Ankur Goenka
                <goe...@google.com <mailto:goe...@google.com>> wrote:
                >>> >
                >>> > Yup, This might result in unintended behavior
                as timestamp is changed after the window assignment
                as elements in windows do not have timestamp in the
                window time range.
                >>> >
                >>> > Shall we start validating atleast one window
                assignment between timestamp assignment and
                GBK/triggers to avoid unintended behaviors mentioned
                above?
                >>> >
                >>> > On Wed, Jan 15, 2020 at 1:24 PM Luke Cwik
                <lc...@google.com <mailto:lc...@google.com>> wrote:
                >>> >>
                >>> >> Window assignment happens at the point in the
                pipeline the WindowInto transform was applied. So in
                this case the window would have been assigned using
                the original timestamp.
                >>> >>
                >>> >> Grouping is by key and window.
                >>> >>
                >>> >> On Tue, Jan 14, 2020 at 7:30 PM Ankur Goenka
                <goe...@google.com <mailto:goe...@google.com>> wrote:
                >>> >>>
                >>> >>> Hi,
                >>> >>>
                >>> >>> I am not sure about the effect of the order
                of element timestamp change and window association
                has on a group by key.
                >>> >>> More specifically, what would be the
                behavior if we apply window -> change element
                timestamp -> Group By key.
                >>> >>> I think we should always apply window
                function after changing the timestamp of elements.
                Though this is neither checked nor a recommended
                practice in Beam.
                >>> >>>
                >>> >>> Example pipeline would look like this:
                >>> >>>
                >>> >>>       def applyTimestamp(value):
                >>> >>>  return window.TimestampedValue((key,
                value), int(time.time())
                >>> >>>
                >>> >>>         p \
                >>> >>>  | 'Create' >> beam.Create(range(0, 10)) \
                >>> >>>  | 'Fixed Window' >>
                beam.WindowInto(window.FixedWindows(5)) \
                >>> >>>  | 'Apply Timestamp' >>
                beam.Map(applyTimestamp) \ # Timestamp is changed
                after windowing and before GBK
                >>> >>>  | 'Group By Key' >> beam.GroupByKey() \
                >>> >>>  | 'Print' >> beam.Map(print)
                >>> >>>
                >>> >>> Thanks,
                >>> >>> Ankur

Reply via email to