> One note - some people definitely use timer.withOutputTimestamp as a watermark hold.

Definitely.

> In fact, I do not view a "watermark hold" as a fundamental concept. The act of "set a timer with the intent that I am allowed to produce output with timestamp X" is the fundamental concept, and watermark hold is an implementation detail that should really never have been surfaced as an end-user concept, or really even as an SDK author concept.

Agreed that this need not be exposed explicitly, but given the causality-preserving invariant that elements arriving *before* the watermark *must not* leave after the watermark, I think that .withOutputTimestamp actually defines a watermark hold implicitly. I think there is no other valid implementation than to hold the output watermark so that it does not cross the output timestamp of any active per-key timer (actually, we could distinguish the case where the timer is set for already-late elements: there is no need, or possibility, to hold the watermark then).
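
As a rough illustration of that implicit hold, here is a minimal sketch. It is not Beam code: the class and method names are hypothetical, timestamps are plain java.time.Instant values, and it ignores both the already-late case and the monotonicity that a real runner would enforce on the watermark.

```java
import java.time.Instant;
import java.util.Collection;

// Hypothetical illustration only; none of these names exist in Beam.
final class ImplicitHoldSketch {
  /** Returns the earlier of two instants. */
  static Instant min(Instant a, Instant b) {
    return a.isBefore(b) ? a : b;
  }

  /**
   * Caps the output watermark at the earliest output timestamp of any
   * active timer for the key, so it never crosses a pending output.
   */
  static Instant outputWatermark(
      Instant inputWatermark, Collection<Instant> activeTimerOutputTimestamps) {
    Instant result = inputWatermark;
    for (Instant timerOutputTimestamp : activeTimerOutputTimestamps) {
      result = min(result, timerOutputTimestamp);
    }
    return result;
  }
}
```

With no active timers the output watermark simply follows the input watermark; a single timer with an earlier output timestamp is enough to keep it back.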

I'd also be supportive of associating any buffer output timestamp with a timer, rather than with the buffer itself, as that really feels like a better description of what is *really* going to happen.

This was probably discussed already, but I cannot find it in this thread: what keeps us from setting the output timestamp of a processing-time timer to something like min(endOfWindow, currentOutputWatermark)? Yes, the output watermark is not stable, but anything that is derived from _processing time_ is not stable by definition. For on-time elements, outputWatermark gives an estimate of the current position in event time, so it makes sense to me to use that. Are there any counterexamples?
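
To make the comparison concrete, here is a small sketch of this formula next to the one quoted further down in the thread, min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp)). It is plain Java on java.time.Instant, not Beam API; the class and method names are hypothetical and exist only for illustration.

```java
import java.time.Instant;

// Hypothetical illustration only; none of these names exist in Beam.
final class OutputTimestampSketch {
  static Instant min(Instant a, Instant b) {
    return a.isBefore(b) ? a : b;
  }

  static Instant max(Instant a, Instant b) {
    return a.isAfter(b) ? a : b;
  }

  /** min(endOfWindow, currentOutputWatermark) */
  static Instant fromOutputWatermark(Instant endOfWindow, Instant currentOutputWatermark) {
    return min(endOfWindow, currentOutputWatermark);
  }

  /** min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp)) */
  static Instant fromFiringTimestamp(
      Instant endOfWindow, Instant eventInputTimestamp, Instant computedFiringTimestamp) {
    return min(endOfWindow, max(eventInputTimestamp, computedFiringTimestamp));
  }
}
```

Taking the numbers from the quoted example (watermark lagging 3 hours, processing time 4pm, event input 1pm, window end 5pm) and assuming T = 30 minutes, the firing-timestamp variant yields 4:30pm, while the output-watermark variant yields 1pm, i.e. the current position in event time.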

 Jan

On 1/18/22 21:10, Kenneth Knowles wrote:
Yea, it makes sense. This is an issue for the global window where there isn't automatic cleanup of state. I've had a few user cases where they would like a good way of doing state cleanup in the global window too - something where whenever state gets buffered there is always a finite timer that will fire. There might be an opportunity here, if we attach the hold to that associated timer rather than the state. It sounds similar to what you describe where someone made a timer just to create a watermark hold associated with some state - I assume they actually do need to process and emit that state in some way related to the timer.

On Tue, Jan 18, 2022 at 9:35 AM Reuven Lax <re...@google.com> wrote:

    Correct.

    IIRC originally we didn't want to add "buffered data timestamps"
    because it was error prone. Leaking even one record in state
    holds up the watermark and can cause the entire pipeline to grind
    to a halt. Associating with a timer guarantees that holds are
    always cleared eventually.

    On Tue, Jan 18, 2022 at 9:13 AM Kenneth Knowles <k...@apache.org>
    wrote:

        This is an interesting case, and a legitimate counterexample
        to consider. I'd call it a workaround :-). The semantic thing
        they would want/need is "output timestamp" associated with
        buffered data (also implemented with watermark hold). I do
        know systems that designed their state with this built in.

        Kenn

        On Tue, Jan 18, 2022 at 8:57 AM Reuven Lax <re...@google.com>
        wrote:

            One note - some people definitely use
            timer.withOutputTimestamp as a watermark hold.


            This is a scenario in which one outputs (from
            processElement) a timestamp behind the current input
            element timestamp but knows that it is safe because there
            is already an extant timer with an earlier
            output timestamp (state can be used for this). In this
            case I've seen timers set simply for the hold - the actual
            onTimer never outputs anything.

            Reuven

            On Tue, Jan 18, 2022 at 6:42 AM Kenneth Knowles
            <k...@apache.org> wrote:



                On Tue, Dec 14, 2021 at 2:38 PM Steve Niemitz
                <sniem...@apache.org> wrote:

                    > I think this wouldn't be very robust to
                    different situations where processing time and
                    event time may not be that close to each other.

                    if you do something like `min(endOfWindow,
                    max(eventInputTimestamp,
                    computedFiringTimestamp))` the worst case is that
                    you set a watermark hold for somewhere in the
                    future, right?  For example, if the watermark is
                    lagging 3 hours, processing time = 4pm, event
                    input = 1pm, window end = 5pm, the watermark
                    hold/output time is set to 4pm + T. This would
                    make the timestamps "newer" than the input, but
                    shouldn't ever create late data, correct?

                    Also, imo, the timestamps really already cross
                    domains now, because the watermark (event time) is
                    held until the (processing time) timer fires.

                    The concrete issue that brought this up was a
                    pipeline with some state, and the state was
                    "cleaned up" periodically with a processing time
                    timer that fired every ~hour.  The author of the
                    pipeline was confused why the watermark wasn't
                    moving (and thus GBKs firing, etc).  The root
                    cause was the watermark being held by the timer.

                    > It would just save you
                    .withOutputTimestamp(elementTimestamp) on your
                    calls to setting the event time timer, right?

                    Correct, the main thing I'm trying to solve is
                    having to recalculate an output timestamp using
                    the same logic that the timer itself is using to
                    set its firing timestamp.


                It sounds like the main use case that you are dealing
                with is the case where the timer doesn't actually
                produce output (or set further timers that produce
                output) so it doesn't need (or want) a watermark hold.
                That makes sense.

                In fact, I do not view a "watermark hold" as a
                fundamental concept. The act of "set a timer with the
                intent that I am allowed to produce output with
                timestamp X" is the fundamental concept, and watermark
                hold is an implementation detail that should really
                never have been surfaced as an end-user concept, or
                really even as an SDK author concept. This is why in
                my proposal for adding output timestamps to timers, I
                called it "withOutputTimestamp", and this is why the
                design does not include any watermark holds - there is
                a self-loop on a transform where timers produce an
                input watermark distinct from the watermark on input
                elements, and that is enough. There is not now, and
                never has been, a need for the concept of a hold at
                the level of the Beam model.

                I wonder if we can automate this behavior by noticing
                that there are no OutputReceiver parameters to the
                timer callback, and also transitively. Or just work
                around it by saying ".withoutOutput" on the timer.

                Kenn




                    On Tue, Dec 14, 2021 at 4:10 PM Kenneth Knowles
                    <k...@apache.org> wrote:



                        On Tue, Dec 7, 2021 at 7:27 AM Steve Niemitz
                        <sniem...@apache.org> wrote:

                            If I have a processing time timer, is
                            there any way to automatically set the
                            output timestamp to the timer firing
                            timestamp (similar to how event-time
                            timers work)?

                            A common use case would be to do something
                            like:
                            timer.offset(X).align(Y).setRelative()

                            but have the output timestamp be the
                            firing timestamp.  In order to do this now
                            you need to re-calculate the output
                            timestamp (using the same logic as the
                            timer does internally) and manually use
                            withOutputTimestamp.


                        I think this wouldn't be very robust to
                        different situations where processing time and
                        event time may not be that close to each
                        other. In general I'm skeptical of reusing
                        timestamps across time domains, for just this
                        sort of reason. I wouldn't recommend doing
                        this manually either.

                            I'm not sure what the API would look like
                            here, but it would also be nice to allow
                            event-time timers to do the same in
                            reverse (use the element input timestamp
                            rather than the firing timestamp).  Maybe
                            something like
                            `withDefaultOutputTimestampFrom(...)` and
                            an enum of FIRING_TIMESTAMP,
                            ELEMENT_TIMESTAMP?


                        It would just save you
                        .withOutputTimestamp(elementTimestamp) on your
                        calls to setting the event time timer, right?
                        It doesn't work in general because a timer can
                        be set from other OnTimer methods, where there
                        is no "element" per se, but just the output
                        timestamp of the fired timer.

                        Kenn
