On 2/27/24 16:36, Robert Burke wrote:
An "as fast as it can" runner with dynamic splits would ultimately split to the system's maximum available parallelism (for stateful DoFns, this is the number of keys; for SplittableDoFns, this is the maximum sharding of each input element's restriction). That's what would happen with a "normal" sleep.
I see. It is definitely possible for a runner to split all processing to maximum parallelism, but - provided this cannot be controlled by the user - can the semantics of the Throttle transform even be consistently defined in terms of processing time? It seems it would require coordination with the runner so that user code would at least be aware of the current parallelism. The situation is easier for runners that set parallelism upfront.

WRT Portability, this means adding a current ProcessingTime field to the ProcessBundleRequest, and likely also to the ProgressRequest so the runner could coordinate. ProgressResponse may then need an "asleepUntil" field to communicate back the state of the bundle, which the runner could then use to better time its next ProgressRequest, and potentially arrest dynamic splitting for that bundle. After all, the sleeping bundle is blocked until processing time has advanced anyway; no progress can be made.
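
A minimal sketch of how a runner might use such a field, assuming a hypothetical `asleep_until` value on the progress response. The class and function names below are illustrative only and are not part of the Beam Fn API.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProgressResponse:
    """Hypothetical message shape; the field below is an assumption."""
    bundle_id: str
    asleep_until: Optional[float] = None  # processing time in seconds


def next_progress_request_time(now: float, resp: ProgressResponse,
                               default_interval: float = 1.0) -> float:
    """Pick the processing time at which the runner polls this bundle again."""
    if resp.asleep_until is not None and resp.asleep_until > now:
        # The bundle cannot make progress before this instant, so polling
        # any earlier would be wasted work.
        return resp.asleep_until
    return now + default_interval


def may_split(now: float, resp: ProgressResponse) -> bool:
    """Arrest dynamic splitting while the bundle reports it is asleep."""
    return resp.asleep_until is None or resp.asleep_until <= now
```

With `now = 10.0` and a bundle asleep until `15.0`, the next poll would be scheduled at `15.0` and splitting would be withheld until then.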

I like moving the abstraction out of the timer space, as it better aligns with user intent for the throttle case, and it doesn't require a Stateful DoFn to operate (orthogonal!), meaning it's useful for [...]. It also solves the testing issue WRT ProcessingTime timers using an absolute time, rather than a relative time, as the SDK can rebuild its relative setters for output time on the new canonical processing time, without user code changing.
With what was said above - is the definition of sleep (pause) valid in the context of a bundle? By the same logic of splitting keys, a "fast and efficient enough" runner could delay only the paused bundle and start processing a different bundle (via a different DoFn). It might require splitting bundles by keys, but should be possible. It seems that would in the end make the feature useless as well.

The sleeping in-progress bundle naturally holds back the watermark too.

I suspect this mechanism would end up tending to over-throttle as Reuven described earlier, since the user is only pushing back on immediate processing for the current element, not necessarily all elements. This is particularly likely if there's a long gap between ProgressRequests for the bundle and the runner doesn't adapt its cadence.

An external source of rate doesn't really exist, other than some external source that can provide throttle information. There would remain time skew between the runner system and the external system though, but for a throttle that's likely fine.

A central notion of ProcessingTime also allows the runner to "smear" processing time so if there's a particularly long delay, it doesn't need to catch up at once. I don't think that's relevant for the throttle case though, since with the described clock mechanism and the communication back to the runner, the unblocking notion is probably fine.

We'd need a discussion of what an SDK must do if the runner doesn't support the central clock for completeness, and consistency.


On Tue, Feb 27, 2024, 6:58 AM Jan Lukavský <je...@seznam.cz> wrote:

    On 2/27/24 14:51, Kenneth Knowles wrote:
    I very much like the idea of processing time clock as a parameter
    to @ProcessElement. That will be obviously useful and remove a
    source of inconsistency, in addition to letting the runner/SDK
    harness control it. I also like the idea of passing a Sleeper or [...]
    to @ProcessElement. These are both good practices for testing and
    flexibility and runner/SDK language differences.
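
A minimal sketch of the clock-and-sleeper injection idea, in plain Python rather than any Beam SDK. The names `throttle_element` and `FakeTime` are hypothetical; the point is only that code which receives its clock and sleeper as parameters can be tested without real waiting.

```python
import time
from typing import Callable, List

Clock = Callable[[], float]        # returns current processing time in seconds
Sleeper = Callable[[float], None]  # blocks (or pretends to) for N seconds


def throttle_element(element, next_allowed: float,
                     clock: Clock = time.time,
                     sleeper: Sleeper = time.sleep):
    """Wait until processing time reaches `next_allowed`, then pass through."""
    delay = next_allowed - clock()
    if delay > 0:
        sleeper(delay)
    return element


class FakeTime:
    """Test double: sleeping advances the fake clock instantly."""

    def __init__(self, start: float = 0.0):
        self.now = start
        self.sleeps: List[float] = []

    def clock(self) -> float:
        return self.now

    def sleep(self, seconds: float) -> None:
        self.sleeps.append(seconds)
        self.now += seconds
```

In a test, `fake = FakeTime(100.0)` followed by `throttle_element("a", 105.0, fake.clock, fake.sleep)` records a single five-second sleep without blocking, while production code falls back to the wall clock defaults.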

    In your (a) (b) (c) can you be more specific about which
    watermarks you are referring to? Are they the same as in my
    opening email? If so, then what you describe is what we already have.
    Yes, we have that for streaming, but it does not work this way in
    batch. In my understanding we violate (a), we ignore (b) because
    we fire timers at GC time only and (c) is currently relevant only
    immediately preceding window GC time, but can be defined more
    generally. But essentially yes, I was just trying to restate the
    streaming processing time semantics in the limited batch case.

    Kenn

    On Tue, Feb 27, 2024 at 2:40 AM Jan Lukavský <je...@seznam.cz> wrote:

        I think that before we introduce a possibly somewhat
        duplicate new feature we should be certain that it is really
        semantically different. I'll rephrase the two cases:

         a) need to wait and block data (delay) - the use case is the
        motivating example of Throttle transform

         b) act without data, not block

        Provided we align processing time with local machine clock
        (or better, because of testing, make current processing time
        available via context to @ProcessElement) it seems possible
        to unify both cases under slightly updated semantics of
        processing time timer in batch:

         a) processing time timers fire on a best-effort basis, i.e.
        trying to minimize the delay between the firing timestamp and
        the timer's timestamp
         b) a timer is valid only in the context of the current
        key-window; once the watermark passes the window GC time for
        the particular window that created the timer, it is ignored
         c) if a timer has an output timestamp, this timestamp holds
        the watermark (but this is currently probably a no-op, because
        runners currently do not propagate the (per-key) watermark in
        batch, I assume)

        In case b) it might be necessary to distinguish cases where
        the timer has an output timestamp; if so, it probably should
        be taken into account.
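
A minimal sketch of rules a) to c) above, under stated assumptions: the `ProcessingTimeTimer` type and both helper functions are hypothetical, times are plain floats, and the open question about output timestamps in case b) is deliberately not modelled.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProcessingTimeTimer:
    fire_at: float                            # processing time, seconds
    window_gc_time: float                     # event time of window GC
    output_timestamp: Optional[float] = None  # event-time hold, if any


def should_fire(timer: ProcessingTimeTimer,
                processing_time: float, watermark: float) -> bool:
    # (b) Once the watermark has passed the window GC time, the timer
    # is ignored.
    if watermark > timer.window_gc_time:
        return False
    # (a) Best effort: fire as soon as processing time reaches fire_at.
    return processing_time >= timer.fire_at


def watermark_hold(timer: ProcessingTimeTimer) -> Optional[float]:
    # (c) A timer with an output timestamp holds the watermark there;
    # a timer without one holds nothing.
    return timer.output_timestamp
```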

        Now, such semantics should be quite aligned with what we do
        in streaming case and what users generally expect. The
        blocking part can be implemented in @ProcessElement using
        buffer & timer; once there is a need to wait, it can be
        implemented in user code using plain sleep(). That is due to
        the alignment between local time and definition of processing
        time. If we had some reason to be able to run
        faster-than-wall-clock (as I'm still not in favor of that),
        we could do that using ProcessContext.sleep(). Delaying
        processing in the @ProcessElement should result in
        backpressuring and backpropagation of this backpressure from
        the Throttle transform to the sources as mentioned (of course
        this is only for the streaming case).

        Is there anything missing in such definition that would still
        require splitting the timers into two distinct features?

         Jan

        On 2/26/24 21:22, Kenneth Knowles wrote:
        Yea I like DelayTimer, or SleepTimer, or WaitTimer or some
        such.

        OutputTime is always an event time timestamp so it isn't
        even allowed to be set outside the window (or you'd end up
        with an element assigned to a window that it isn't within,
        since OutputTime essentially represents reserving the right
        to output an element with that timestamp)

        Kenn

        On Mon, Feb 26, 2024 at 3:19 PM Robert Burke
        <rob...@frantil.com> wrote:

            Agreed that a retroactive behavior change would be bad,
            even if tied to a beam version change. I agree that it
            meshes well with the general theme of State & Timers
            exposing underlying primitives for implementing
            Windowing and similar. I'd say the distinction between
            the two might be additional complexity for users to
            grok, and would need to be documented well, as both
            operate in the ProcessingTime domain, but differently.

            What to call this new timer then? DelayTimer?

            "A DelayTimer sets an instant in ProcessingTime at which
            point computations can continue. Runners will prevent
            the EventTimer watermark from advancing past the set
            OutputTime until Processing Time has advanced to at
            least the provided instant to execute the timer's
            callback. This can be used to allow the runner to
            constrain pipeline throughput with user guidance."

            I'd probably add that a timer with an output time
            outside of the window would not be guaranteed to fire,
            and that OnWindowExpiry is the correct way to ensure
            cleanup occurs.

            No solution to the Looping Timers on Drain problem here,
            but I think that's ultimately an orthogonal discussion,
            and will restrain my thoughts on that for now.

            This isn't a proposal, but exploring the solution space
            within our problem. We'd want to break down exactly what is
            different and what is the same for the 3 kinds of timers...




            On Mon, Feb 26, 2024, 11:45 AM Kenneth Knowles
            <k...@apache.org> wrote:

                Pulling out focus points:

                On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via
                dev <dev@beam.apache.org> wrote:
                > I can't act on something yet [...] but I expect to
                be able to [...] at some time in the processing-time
                future.

                I like this as a clear and internally-consistent
                feature description. It describes
                ProcessContinuation and those timers which serve the
                same purpose as ProcessContinuation.

                On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via
                dev <dev@beam.apache.org> wrote:
                > I can't think of a batch or streaming scenario
                where it would be correct to not wait at least that long

                The main reason we created timers: to take action in
                the absence of data. The archetypal use case for
                processing time timers was/is "flush data from state
                if it has been sitting there too long". For this use
                case, the right behavior for batch is to skip the
                timer. It is actually basically incorrect to wait.

                On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
                <lostl...@apache.org> wrote:
                > It doesn't require a new primitive.

                IMO what's being proposed *is* a new primitive. I
                think it is a good primitive. It is the underlying
                primitive to ProcessContinuation. It would be
                user-friendly as a kind of timer. But if we made
                this the behavior of processing time timers
                retroactively, it would break everyone using them to
                flush data who is also reprocessing data.

                There are two very different use cases ("I need to
                wait, and block data" vs "I want to act without
                data, aka NOT wait for data") and I think we should
                serve both of them, but it doesn't have to be with
                the same low-level feature.

                Kenn


                On Fri, Feb 23, 2024 at 7:21 PM Robert Bradshaw via
                dev <dev@beam.apache.org> wrote:

                    On Fri, Feb 23, 2024 at 3:54 PM Robert Burke
                    <lostl...@apache.org> wrote:
                    >
                    > While I'm currently on the other side of the
                    fence, I would not be against changing/requiring
                    the semantics of ProcessingTime constructs to be
                    "must wait and execute" as such a solution, and
                    enables the Proposed "batch" process
                    continuation throttling mechanism to work as
                    hypothesized for both "batch" and "streaming"
                    execution.
                    >
                    > There's a lot to like, as it leans Beam
                    further into the unification of Batch and
                    Stream, with one fewer exception (eg. unifies
                    timer experience further). It doesn't require a
                    new primitive. It probably matches more with
                    user expectations anyway.
                    >
                    > It does cause looping timer execution with
                    processing time to be a problem for Drains however.

                    I think we have a problem with looping timers
                    plus drain (a mostly
                    streaming idea anyway) regardless.

                    > I'd argue though that in the case of a drain,
                    we could update the semantics as "move
                    watermark to infinity" "existing timers are
                    executed, but new timers are ignored",

                    I don't like the idea of dropping timers for
                    drain. I think correct
                    handling here requires user visibility into
                    whether a pipeline is
                    draining or not.

                    > and ensure/and update the requirements around
                    OnWindowExpiration callbacks to be a bit more
                    insistent on being implemented for correct
                    execution, which is currently the only "hard"
                    signal to the SDK side that the window's work is
                    guaranteed to be over, and remaining state needs
                    to be addressed by the transform or be garbage
                    collected. This remains critical for developing
                    a good pattern for ProcessingTime timers within
                    a Global Window too.

                    +1

                    >
                    > On 2024/02/23 19:48:22 Robert Bradshaw via dev
                    wrote:
                    > > Thanks for bringing this up.
                    > >
                    > > My position is that both batch and streaming
                    should wait for
                    > > processing time timers, according to local
                    time (with the exception of
                    > > tests that can accelerate this via faked
                    clocks).
                    > >
                    > > Both ProcessContinuations delays and
                    ProcessingTimeTimers are IMHO
                    > > isomorphic, and can be implemented in terms
                    of each other (at least in
                    > > one direction, and likely the other). Both
                    are an indication that I
                    > > can't act on something yet due to external
                    constraints (e.g. not all
                    > > the data has been published, or I lack
                    sufficient capacity/quota to
                    > > push things downstream) but I expect to be
                    able to (or at least would
                    > > like to check again) at some time in the
                    processing-time future. I
                    > > can't think of a batch or streaming scenario
                    where it would be correct
                    > > to not wait at least that long (even in
                    batch inputs, e.g. suppose I'm
                    > > tailing logs and was eagerly started before
                    they were fully written,
                    > > or waiting for some kind of
                    (non-data-dependent) quiescence or other
                    > > operation to finish).
                    > >
                    > >
                    > > On Fri, Feb 23, 2024 at 12:36 AM Jan
                    Lukavský <je...@seznam.cz> wrote:
                    > > >
                    > > > For me it always helps to seek analogy in
                    our physical reality. Stream
                    > > > processing actually has quite a good
                    analogy for both event-time and
                    > > > processing-time - the simplest model for
                    this being relativity theory.
                    > > > Event-time is the time at which events
                    occur _at distant locations_. Due
                    > > > to finite and invariant speed of light
                    (which is actually really
                    > > > involved in the explanation why any stream
                    processing is inevitably
                    > > > unordered) these events are observed
                    (processed) at different times
                    > > > (processing time, different for different
                    observers). It is perfectly
                    > > > possible for an observer to observe events
                    at a rate that is higher than
                    > > > one second per second. This also happens
                    in reality for observers that
                    > > > travel at relativistic speeds (which might
                    be an analogy for fast -
                    > > > batch - (re)processing). Besides the
                    invariant speed, there is also
                    > > > another invariant - local clock (wall
                    time) always ticks exactly at the
                    > > > rate of one second per second, no matter
                    what. It is not possible to
                    > > > "move faster or slower" through (local) time.
                    > > >
                    > > > In my understanding the reason why we do
                    not put any guarantees or
                    > > > bounds on the delay of firing processing
                    time timers is purely technical
                    > > > - the processing is (per key)
                    single-threaded, thus any timer has to
                    > > > wait before any element processing
                    finishes. This is only consequence of
                    > > > a technical solution, not something
                    fundamental.
                    > > >
                    > > > Having said that, my point is that
                    according to the above analogy, it
                    > > > should be perfectly fine to fire
                    processing time timers in batch based
                    > > > on (local wall) time only. There should be
                    no way of manipulating this
                    > > > local time (excluding tests). Watermarks
                    should be affected the same way
                    > > > as any buffering in a state that would
                    happen in a stateful DoFn (i.e.
                    > > > set timer holds output watermark). We
                    should probably pay attention to
                    > > > looping timers, but it seems possible to
                    define a valid stopping
                    > > > condition (input watermark at infinity).
                    > > >
                    > > >   Jan
                    > > >
                    > > > On 2/22/24 19:50, Kenneth Knowles wrote:
                    > > > > Forking this thread.
                    > > > >
                    > > > > The state of processing time timers in
                    this mode of processing is not
                    > > > > satisfactory and is discussed a lot but
                    we should make everything
                    > > > > explicit.
                    > > > >
                    > > > > Currently, a state and timer DoFn has a
                    number of logical watermarks:
                    > > > > (apologies for fixed width not coming
                    through in email lists). Treat
                    > > > > timers as a back edge.
                    > > > >
                    > > > > input --(A)----(C)--> ParDo(DoFn)
                    ----(D)---> output
                    > > > >  ^                      |
                    > > > > |--(B)-----------------|
                    > > > >                 timers
                    > > > >
                    > > > > (A) Input Element watermark: this is the
                    watermark that promises there
                    > > > > is no incoming element with a timestamp
                    earlier than it. Each input
                    > > > > element's timestamp holds this
                    watermark. Note that *event time timers
                    > > > > firing is according to this watermark*.
                    But a runner commits changes
                    > > > > to this watermark *whenever it wants*,
                    in a way that can be
                    > > > > consistent. So the runner can absolutely
                    process *all* the elements
                    > > > > before advancing the watermark (A), and
                    only afterwards start firing
                    > > > > timers.
                    > > > >
                    > > > > (B) Timer watermark: this is a watermark
                    that promises no timer is set
                    > > > > with an output timestamp earlier than
                    it. Each timer that has an
                    > > > > output timestamp holds this watermark.
                    Note that timers can set new
                    > > > > timers, indefinitely, so this may never
                    reach infinity even in a drain
                    > > > > scenario.
                    > > > >
                    > > > > (C) (derived) total input watermark:
                    this is a watermark that is the
                    > > > > minimum of the two above, and ensures
                    that all state for the DoFn for
                    > > > > expired windows can be GCd after calling
                    @OnWindowExpiration.
                    > > > >
                    > > > > (D) output watermark: this is a promise
                    that the DoFn will not output
                    > > > > earlier than the watermark. It is held
                    by the total input watermark.
                    > > > >
                    > > > > So any timer, processing or not, holds
                    the total input watermark
                    > > > > which prevents window GC, hence the
                    timer must be fired. You can set
                    > > > > timers without a timestamp and they will
                    not hold (B) hence not hold
                    > > > > the total input / GC watermark (C). Then
                    if a timer fires for an
                    > > > > expired window, it is ignored. But in
                    general a timer that sets an
                    > > > > output timestamp is saying that it may
                    produce output, so it *must* be
                    > > > > fired, even in batch, for data
                    integrity. There was a time before
                    > > > > timers had output timestamps that we
                    said that you *always* have to
                    > > > > have an @OnWindowExpiration callback for
                    data integrity, and
                    > > > > processing time timers could not hold
                    the watermark. That is changed now.
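
A minimal sketch of how watermarks (A) to (C) above relate, assuming timestamps are plain floats and that a timer set without an output timestamp is represented as `None`. The function names are illustrative and not taken from any Beam runner.

```python
import math
from typing import Iterable, Optional

INF = math.inf


def input_element_watermark(pending_element_timestamps: Iterable[float]) -> float:
    """(A): held by the timestamp of every input element not yet processed."""
    return min(pending_element_timestamps, default=INF)


def timer_watermark(timer_output_timestamps: Iterable[Optional[float]]) -> float:
    """(B): held only by timers that carry an output timestamp."""
    holds = (t for t in timer_output_timestamps if t is not None)
    return min(holds, default=INF)


def total_input_watermark(a: float, b: float) -> float:
    """(C): the minimum of (A) and (B); window GC waits on this value."""
    return min(a, b)

# (D), the output watermark, is held by (C), so it can never exceed it.
```

For example, pending elements at 20 and 30 with one timer holding at 25 give (C) = 20, while a timer without an output timestamp leaves (B) at infinity and so never delays window GC.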
                    > > > >
                    > > > > One main purpose of processing time
                    timers in streaming is to be a
                    > > > > "timeout" for data buffered in state, to
                    eventually flush. In this
                    > > > > case the output timestamp should be the
                    minimum of the elements in
                    > > > > state (or equivalent). In batch, of
                    course, this kind of timer is not
                    > > > > relevant and we should definitely not
                    wait for it, because the goal is
                    > > > > to just get through all the data. We can
                    justify this by saying that
                    > > > > the worker really has no business having
                    any idea what time it really
                    > > > > is, and the runner can just run the
                    clock at whatever speed it wants.
                    > > > >
                    > > > > Another purpose, brought up on the
                    Throttle thread, is to wait or
                    > > > > backoff. In this case it would be
                    desired for the timer to actually
                    > > > > cause batch processing to pause and
                    wait. This kind of behavior has
                    > > > > not been explored much. Notably the
                    runner can absolutely process all
                    > > > > elements first, then start to fire any
                    enqueued processing time
                    > > > > timers. In the same way that state in
                    batch can just be in memory,
                    > > > > this *could* just be a call to sleep().
                    It all seems a bit sketchy so
                    > > > > I'd love clearer opinions.
                    > > > >
                    > > > > These two are both operational effects -
                    as you would expect for
                    > > > > processing time timers - and they seem
                    to be in conflict. Maybe they
                    > > > > just need different features?
                    > > > >
                    > > > > I'd love to hear some more uses of
                    processing time timers from the
                    > > > > community.
                    > > > >
                    > > > > Kenn
                    > >
