On 2/27/24 14:51, Kenneth Knowles wrote:
I very much like the idea of processing time clock as a parameter to @ProcessElement. That will be obviously useful and remove a source of inconsistency, in addition to letting the runner/SDK harness control it. I also like the idea of passing a Sleeper or to @ProcessElement. These are both good practices for testing and flexibility and runner/SDK language differences.

In your (a) (b) (c) can you be more specific about which watermarks you are referring to? Are they the same as in my opening email? If so, then what you describe is what we already have.
Yes, we have that for streaming, but it does not work this way in batch. In my understanding we violate (a), we ignore (b) because we fire timers at GC time only and (c) is currently relevant only immediately preceding window GC time, but can be defined more generally. But essentially yes, I was just trying to restate the streaming processing time semantics in the limited batch case.

Kenn

On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský <je...@seznam.cz> wrote:

    I think that before we introduce a possibly somewhat duplicate new
    feature we should be certain that it is really semantically
    different. I'll rephrase the two cases:

     a) need to wait and block data (delay) - the use case is the
    motivating example of Throttle transform

     b) act without data, not block

    Provided we align processing time with local machine clock (or
    better, because of testing, make current processing time available
    via context to @ProcessElement) it seems to possble to unify both
    cases under slightly updated semantics of processing time timer in
    batch:

     a) processing time timers fire with best-effort, i.e. trying to
    minimize delay between firing timestamp and timer's timestamp
     b) timer is valid only in the context of current key-window, once
    watermark passes window GC time for the particular window that
    created the timer, it is ignored
     c) if timer has output timestamp, this timestamp holds watermark
    (but this is currently probably noop, because runners currently do
    no propagate (per-key) watermark in batch, I assume)

    In case b) there might be needed to distinguish cases when timer
    has output timestamp, if so, it probably should be taken into account.

    Now, such semantics should be quite aligned with what we do in
    streaming case and what users generally expect. The blocking part
    can be implemented in @ProcessElement using buffer & timer, once
    there is need to wait, it can be implemented in user code using
    plain sleep(). That is due to the alignment between local time and
    definition of processing time. If we had some reason to be able to
    run faster-than-wall-clock (as I'm still not in favor of that), we
    could do that using ProcessContext.sleep(). Delaying processing in
    the @ProcessElement should result in backpressuring and
    backpropagation of this backpressure from the Throttle transform
    to the sources as mentioned (of course this is only for the
    streaming case).

    Is there anything missing in such definition that would still
    require splitting the timers into two distinct features?

     Jan

    On 2/26/24 21:22, Kenneth Knowles wrote:
    Yea I like DelayTimer, or SleepTimer, or WaitTimer or some such.

    OutputTime is always an event time timestamp so it isn't even
    allowed to be set outside the window (or you'd end up with an
    element assigned to a window that it isn't within, since
    OutputTime essentially represents reserving the right to output
    an element with that timestamp)

    Kenn

    On Mon, Feb 26, 2024 at 3:19 PM Robert Burke <rob...@frantil.com>
    wrote:

        Agreed that a retroactive behavior change would be bad, even
        if tied to a beam version change. I agree that it meshes well
        with the general theme of State & Timers exposing underlying
        primitives for implementing Windowing and similar. I'd say
        the distinction between the two might be additional
        complexity for users to grok, and would need to be documented
        well, as both operate in the ProcessingTime domain, but
        differently.

        What to call this new timer then? DelayTimer?

        "A DelayTimer sets an instant in ProcessingTime at which
        point computations can continue. Runners will prevent the
        EventTimer watermark from advancing past the set OutputTime
        until Processing Time has advanced to at least the provided
        instant to execute the timers callback. This can be used to
        allow the runner to constrain pipeline throughput with user
        guidance."

        I'd probably add that a timer with an output time outside of
        the window would not be guaranteed to fire, and that
        OnWindowExpiry is the correct way to ensure cleanup occurs.

        No solution to the Looping Timers on Drain problem here, but
        i think that's ultimately an orthogonal discussion, and will
        restrain my thoughts on that for now.

        This isn't a proposal, but exploring the solution space
        within our problem. We'd want to break down exactly what
        different and the same for the 3 kinds of timers...




        On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles
        <k...@apache.org> wrote:

            Pulling out focus points:

            On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
            <dev@beam.apache.org> wrote:
            > I can't act on something yet [...] but I expect to be
            able to [...] at some time in the processing-time future.

            I like this as a clear and internally-consistent feature
            description. It describes ProcessContinuation and those
            timers which serve the same purpose as ProcessContinuation.

            On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
            <dev@beam.apache.org> wrote:
            > I can't think of a batch or streaming scenario where it
            would be correct to not wait at least that long

            The main reason we created timers: to take action in the
            absence of data. The archetypal use case for processing
            time timers was/is "flush data from state if it has been
            sitting there too long". For this use case, the right
            behavior for batch is to skip the timer. It is actually
            basically incorrect to wait.

            On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
            <lostl...@apache.org> wrote:
            > It doesn't require a new primitive.

            IMO what's being proposed *is* a new primitive. I think
            it is a good primitive. It is the underlying primitive to
            ProcessContinuation. It would be user-friendly as a kind
            of timer. But if we made this the behavior of processing
            time timers retroactively, it would break everyone using
            them to flush data who is also reprocessing data.

            There's two very different use cases ("I need to wait,
            and block data" vs "I want to act without data, aka NOT
            wait for data") and I think we should serve both of them,
            but it doesn't have to be with the same low-level feature.

            Kenn


            On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via dev
            <dev@beam.apache.org> wrote:

                On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
                <lostl...@apache.org> wrote:
                >
                > While I'm currently on the other side of the fence,
                I would not be against changing/requiring the
                semantics of ProcessingTime constructs to be "must
                wait and execute" as such a solution, and enables the
                Proposed "batch" process continuation throttling
                mechanism to work as hypothesized for both "batch"
                and "streaming" execution.
                >
                > There's a lot to like, as it leans Beam further
                into the unification of Batch and Stream, with one
                fewer exception (eg. unifies timer experience
                further). It doesn't require a new primitive. It
                probably matches more with user expectations anyway.
                >
                > It does cause looping timer execution with
                processing time to be a problem for Drains however.

                I think we have a problem with looping timers plus
                drain (a mostly
                streaming idea anyway) regardless.

                > I'd argue though that in the case of a drain, we
                could updated the semantics as "move watermark to
                infinity"  "existing timers are executed, but new
                timers are ignored",

                I don't like the idea of dropping timers for drain. I
                think correct
                handling here requires user visibility into whether a
                pipeline is
                draining or not.

                > and ensure/and update the requirements around
                OnWindowExpiration callbacks to be a bit more
                insistent on being implemented for correct execution,
                which is currently the only "hard" signal to the SDK
                side that the window's work is guaranteed to be over,
                and remaining state needs to be addressed by the
                transform or be garbage collected. This remains
                critical for developing a good pattern for
                ProcessingTime timers within a Global Window too.

                +1

                >
                > On 2024/02/23 19:48:22 Robert Bradshaw via dev wrote:
                > > Thanks for bringing this up.
                > >
                > > My position is that both batch and streaming
                should wait for
                > > processing time timers, according to local time
                (with the exception of
                > > tests that can accelerate this via faked clocks).
                > >
                > > Both ProcessContinuations delays and
                ProcessingTimeTimers are IMHO
                > > isomorphic, and can be implemented in terms of
                each other (at least in
                > > one direction, and likely the other). Both are an
                indication that I
                > > can't act on something yet due to external
                constraints (e.g. not all
                > > the data has been published, or I lack sufficient
                capacity/quota to
                > > push things downstream) but I expect to be able
                to (or at least would
                > > like to check again) at some time in the
                processing-time future. I
                > > can't think of a batch or streaming scenario
                where it would be correct
                > > to not wait at least that long (even in batch
                inputs, e.g. suppose I'm
                > > tailing logs and was eagerly started before they
                were fully written,
                > > or waiting for some kind of (non-data-dependent)
                quiessence or other
                > > operation to finish).
                > >
                > >
                > > On Fri, Feb 23, 2024 at 12:36 AM Jan Lukavský
                <je...@seznam.cz> wrote:
                > > >
                > > > For me it always helps to seek analogy in our
                physical reality. Stream
                > > > processing actually has quite a good analogy
                for both event-time and
                > > > processing-time - the simplest model for this
                being relativity theory.
                > > > Event-time is the time at which events occur
                _at distant locations_. Due
                > > > to finite and invariant speed of light (which
                is actually really
                > > > involved in the explanation why any stream
                processing is inevitably
                > > > unordered) these events are observed
                (processed) at different times
                > > > (processing time, different for different
                observers). It is perfectly
                > > > possible for an observer to observe events at a
                rate that is higher than
                > > > one second per second. This also happens in
                reality for observers that
                > > > travel at relativistic speeds (which might be
                an analogy for fast -
                > > > batch - (re)processing). Besides the invariant
                speed, there is also
                > > > another invariant - local clock (wall time)
                always ticks exactly at the
                > > > rate of one second per second, no matter what.
                It is not possible to
                > > > "move faster or slower" through (local) time.
                > > >
                > > > In my understanding the reason why we do not
                put any guarantees or
                > > > bounds on the delay of firing processing time
                timers is purely technical
                > > > - the processing is (per key) single-threaded,
                thus any timer has to
                > > > wait before any element processing finishes.
                This is only consequence of
                > > > a technical solution, not something fundamental.
                > > >
                > > > Having said that, my point is that according to
                the above analogy, it
                > > > should be perfectly fine to fire processing
                time timers in batch based
                > > > on (local wall) time only. There should be no
                way of manipulating this
                > > > local time (excluding tests). Watermarks should
                be affected the same way
                > > > as any buffering in a state that would happen
                in a stateful DoFn (i.e.
                > > > set timer holds output watermark). We should
                probably pay attention to
                > > > looping timers, but it seems possible to define
                a valid stopping
                > > > condition (input watermark at infinity).
                > > >
                > > >   Jan
                > > >
                > > > On 2/22/24 19:50, Kenneth Knowles wrote:
                > > > > Forking this thread.
                > > > >
                > > > > The state of processing time timers in this
                mode of processing is not
                > > > > satisfactory and is discussed a lot but we
                should make everything
                > > > > explicit.
                > > > >
                > > > > Currently, a state and timer DoFn has a
                number of logical watermarks:
                > > > > (apologies for fixed width not coming through
                in email lists). Treat
                > > > > timers as a back edge.
                > > > >
                > > > > input --(A)----(C)--> ParDo(DoFn) ----(D)--->
                output
                > > > >             ^               |
                > > > > |--(B)-----------------|
                > > > >       timers
                > > > >
                > > > > (A) Input Element watermark: this is the
                watermark that promises there
                > > > > is no incoming element with a timestamp
                earlier than it. Each input
                > > > > element's timestamp holds this watermark.
                Note that *event time timers
                > > > > firing is according to this watermark*. But a
                runner commits changes
                > > > > to this watermark *whenever it wants*, in a
                way that can be
                > > > > consistent. So the runner can absolute
                process *all* the elements
                > > > > before advancing the watermark (A), and only
                afterwards start firing
                > > > > timers.
                > > > >
                > > > > (B) Timer watermark: this is a watermark that
                promises no timer is set
                > > > > with an output timestamp earlier than it.
                Each timer that has an
                > > > > output timestamp holds this watermark. Note
                that timers can set new
                > > > > timers, indefinitely, so this may never reach
                infinity even in a drain
                > > > > scenario.
                > > > >
                > > > > (C) (derived) total input watermark: this is
                a watermark that is the
                > > > > minimum of the two above, and ensures that
                all state for the DoFn for
                > > > > expired windows can be GCd after calling
                @OnWindowExpiration.
                > > > >
                > > > > (D) output watermark: this is a promise that
                the DoFn will not output
                > > > > earlier than the watermark. It is held by the
                total input watermark.
                > > > >
                > > > > So a any timer, processing or not, holds the
                total input watermark
                > > > > which prevents window GC, hence the timer
                must be fired. You can set
                > > > > timers without a timestamp and they will not
                hold (B) hence not hold
                > > > > the total input / GC watermark (C). Then if a
                timer fires for an
                > > > > expired window, it is ignored. But in general
                a timer that sets an
                > > > > output timestamp is saying that it may
                produce output, so it *must* be
                > > > > fired, even in batch, for data integrity.
                There was a time before
                > > > > timers had output timestamps that we said
                that you *always* have to
                > > > > have an @OnWindowExpiration callback for data
                integrity, and
                > > > > processing time timers could not hold the
                watermark. That is changed now.
                > > > >
                > > > > One main purpose of processing time timers in
                streaming is to be a
                > > > > "timeout" for data buffered in state, to
                eventually flush. In this
                > > > > case the output timestamp should be the
                minimum of the elements in
                > > > > state (or equivalent). In batch, of course,
                this kind of timer is not
                > > > > relevant and we should definitely not wait
                for it, because the goal is
                > > > > to just get through all the data. We can
                justify this by saying that
                > > > > the worker really has no business having any
                idea what time it really
                > > > > is, and the runner can just run the clock at
                whatever speed it wants.
                > > > >
                > > > > Another purpose, brought up on the Throttle
                thread, is to wait or
                > > > > backoff. In this case it would be desired for
                the timer to actually
                > > > > cause batch processing to pause and wait.
                This kind of behavior has
                > > > > not been explored much. Notably the runner
                can absolutely process all
                > > > > elements first, then start to fire any
                enqueued processing time
                > > > > timers. In the same way that state in batch
                can just be in memory,
                > > > > this *could* just be a call to sleep(). It
                all seems a bit sketchy so
                > > > > I'd love clearer opinions.
                > > > >
                > > > > These two are both operational effects - as
                you would expect for
                > > > > processing time timers - and they seem to be
                in conflict. Maybe they
                > > > > just need different features?
                > > > >
                > > > > I'd love to hear some more uses of processing
                time timers from the
                > > > > community.
                > > > >
                > > > > Kenn
                > >

Reply via email to