On 9/23/23 18:16, Reuven Lax via dev wrote:
Two separate things here:

1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the watermark from updating, as they are still in flight until after finish bundle. Therefore, simply caching the records should always be watermark-safe, regardless of the runner. You will only run into problems if you try to move timestamps "backwards" - which is why Beam strongly discourages this.
This is not aligned with FlinkRunner's implementation, and I actually think it is not aligned conceptually. As mentioned, Flink does not have the concept of bundles at all. It achieves fault tolerance via checkpointing: essentially, a checkpoint barrier flows from sources to sinks, safely snapshotting the state of each operator on the way. Bundles are implemented as somewhat arbitrary sets of elements between two consecutive checkpoints (there can be multiple bundles between checkpoints). A bundle is 'committed' (i.e. persistently stored and guaranteed not to be retried) only after the checkpoint barrier passes over the elements in the bundle (every bundle is finished at the very latest exactly before a checkpoint). But watermark propagation and bundle finalization are completely unrelated. This might be a bug in the runner, but requiring a checkpoint for watermark propagation would introduce insane delays between processing time and watermarks - every executable stage would delay watermark propagation until a checkpoint (typically on the order of seconds), and this delay would add up after each stage.
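To make the delay concern concrete, here is a back-of-the-envelope model (plain Python, not Beam or Flink code; the 5-stage pipeline and 2-second checkpoint interval are assumed numbers for illustration): if every stage held watermarks until the next checkpoint, the worst-case watermark lag would grow linearly with pipeline depth.

```python
# Illustrative model (not runner code): if each executable stage delayed
# watermark propagation until the next checkpoint, each stage could add
# up to one full checkpoint interval of watermark lag.
def worst_case_watermark_lag(num_stages: int, checkpoint_interval_s: float) -> float:
    """Worst-case extra watermark lag if every stage waits for a checkpoint."""
    return num_stages * checkpoint_interval_s

# A 5-stage pipeline with 2-second checkpoints could lag by up to 10 seconds.
print(worst_case_watermark_lag(5, 2.0))
```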

Reuven

On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:

    > Watermarks shouldn't be (visibly) advanced until @FinishBundle
    is committed, as there's no guarantee that this work won't be
    discarded.

    There was a thread [1], where the conclusion seemed to be that
    updating watermark is possible even in the middle of a bundle.
    Actually, handling watermarks is runner-dependent (e.g. Flink does
    not store watermarks in checkpoints, they are always recomputed
    from scratch on restore).

    [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

    On 9/22/23 21:47, Robert Bradshaw via dev wrote:
    On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz>
    wrote:

        On 9/22/23 18:07, Robert Bradshaw via dev wrote:

        On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
        <dev@beam.apache.org> wrote:

            I've actually wondered about this specifically for
            streaming... if you're writing a pipeline there, it
            seems like you'll often want to put high fixed-cost
            things like database connections outside even the
            bundle setup. You really only want to do that once in
            the lifetime of the worker itself, not once per bundle.
            It seems like having that boundary be somewhere other
            than an arbitrary (and, in streaming, probably small to
            avoid latency) group of elements might be more useful?
            I suppose this depends heavily on the object lifecycle
            in the SDK worker, though.


        +1. This is the difference between @Setup and @StartBundle.
        The start/finish bundle operations should be used for
        bracketing element processing that must be committed as a
        unit for correct failure recovery (e.g. if elements are
        cached in ProcessElement, they should all be emitted in
        FinishBundle). On the other hand, things like open database
        connections can and likely should be shared across bundles.
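The distinction above can be modeled with a toy lifecycle sketch (plain Python standing in for the SDK harness; the method names mirror Beam's annotations, but this is not runner code): the expensive resource is created once per worker in setup and reused across bundles, while elements cached in process are only emitted as a unit in finish_bundle.

```python
# Toy model of the DoFn lifecycle (not actual Beam code): the "connection"
# is opened once per worker in setup() and shared across bundles, while
# elements buffered in process() are only emitted in finish_bundle(), so
# a retried bundle never half-emits its output.
class CachingDoFn:
    def __init__(self):
        self.setup_calls = 0

    def setup(self):
        self.connection = object()  # stands in for an expensive DB connection
        self.setup_calls += 1

    def start_bundle(self):
        self.buffer = []

    def process(self, element):
        self.buffer.append(element)  # cached, nothing emitted yet

    def finish_bundle(self):
        out, self.buffer = self.buffer, []
        return out  # emitted as a unit for correct failure recovery

fn = CachingDoFn()
fn.setup()
results = []
for bundle in [[1, 2], [3]]:
    fn.start_bundle()
    for e in bundle:
        fn.process(e)
    results.extend(fn.finish_bundle())
# setup ran once for the worker, regardless of how many bundles it saw.
```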
        This is correct, but caching between @StartBundle and
        @FinishBundle has some problems. First, users need to
        manually set a watermark hold at min(timestamp in bundle),
        otherwise the watermark might overtake the buffered elements.


    Watermarks shouldn't be (visibly) advanced until @FinishBundle is
    committed, as there's no guarantee that this work won't be
    discarded.

        Users have no option other than timer.withOutputTimestamp
        for that, as we don't have a user-facing API to set a
        watermark hold otherwise; thus, in-bundle caching implies a
        stateful DoFn. The question might then be: why not use
        "classical" stateful caching involving state, since that
        gives full control over the caching in user code? This gave
        me an idea: would it be useful to add information about the
        caching to the API (e.g. in Java,
        @StartBundle(caching=true))? That could maybe solve the
        above issues (the runner would know to set the hold, and it
        could work with "stateless" DoFns).
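To illustrate why the hold matters, here is a small model (plain Python, not the Beam API; the timestamps are made up, and watermark_hold stands in for what Java's timer.withOutputTimestamp achieves): while elements are buffered, the output watermark must be held at the minimum buffered timestamp so it cannot overtake them.

```python
# Toy model (not the Beam API): between start_bundle and finish_bundle,
# the output watermark must be held at the minimum buffered timestamp,
# which is roughly what timer.withOutputTimestamp(ts) accomplishes for
# stateful DoFns in the Java SDK.
class BufferingStage:
    def __init__(self):
        self.buffer = []  # (timestamp, element) pairs

    def process(self, timestamp, element):
        self.buffer.append((timestamp, element))

    def watermark_hold(self):
        # No hold is needed when nothing is buffered.
        return min(ts for ts, _ in self.buffer) if self.buffer else None

stage = BufferingStage()
stage.process(7, "a")
stage.process(3, "b")
stage.process(5, "c")
# The watermark may not advance past 3 until the buffer is flushed.
print(stage.watermark_hold())
```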


    Really, this is one of the areas where the streaming/batch
    abstraction leaks. In batch, it was a common pattern to have local
    DoFn instance state that persisted from start to finish bundle,
    and these were also used as convenient entry points for other
    operations (like opening database connections) because bundles
    were often "as large as possible." With the advent of streaming,
    it makes sense to put this in explicitly managed runner state to
    allow for cross-bundle amortization, and there's more value in
    distinguishing between @Setup and @StartBundle.

    (Were I to do things over, I'd probably encourage an API that
    discouraged non-configuration instance state on DoFns altogether,
    e.g. in the style of Python context managers (an equivalent API
    could probably be put together with AutoCloseables in Java); one
    would have something like

    ParDo(X)

    which would logically (though not necessarily physically) lead to
    an execution like

    with X.bundle_processor() as bundle_processor:
      for bundle in bundles:
        with bundle_processor.element_processor() as process:
          for element in bundle:
            process(element)

    where the traditional setup/start_bundle/finish_bundle/teardown
    logic would live in the __enter__ and __exit__ methods (made even
    easier with coroutines.) For convenience one could of course
    provide a raw bundle processor or element processor to ParDo if
    the enter/exit contexts are trivial. But this is getting somewhat
    off-topic...
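For the curious, the sketch above can be made runnable with contextlib (a toy harness, not an SDK proposal; the class X and the recorded call order are illustrative):

```python
from contextlib import contextmanager

calls = []  # records the lifecycle order for illustration

class X:
    # setup/teardown logic lives in the bundle_processor context,
    # start_bundle/finish_bundle in the element_processor context.
    @contextmanager
    def bundle_processor(self):
        calls.append("setup")
        try:
            yield self
        finally:
            calls.append("teardown")

    @contextmanager
    def element_processor(self):
        calls.append("start_bundle")
        try:
            yield lambda element: calls.append(f"process:{element}")
        finally:
            calls.append("finish_bundle")

bundles = [[1, 2], [3]]
with X().bundle_processor() as bundle_processor:
    for bundle in bundles:
        with bundle_processor.element_processor() as process:
            for element in bundle:
                process(element)
```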


            Best,
            B

            On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles
            <k...@apache.org> wrote:

                (I notice that you replied only to yourself, but
                there has been a whole thread of discussion on this
                - are you subscribed to dev@beam?
                
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

                It sounds like you want what everyone wants: to have
                the biggest bundles possible.

                So for bounded data, basically you make even splits
                of the data and each split is one bundle. And then
                dynamic splitting to redistribute work to eliminate
                stragglers, if your engine has that capability.

                For unbounded data, you more-or-less bundle as much
                as you can without waiting too long, like Jan described.

                Users know to put their high fixed costs
                in @StartBundle and then it is the runner's job to
                put as many calls to @ProcessElement as possible to
                amortize.

                Kenn

                On Fri, Sep 22, 2023 at 9:39 AM Joey Tran
                <joey.t...@schrodinger.com> wrote:

                    Whoops, I typoed my last email. I meant to write
                    "this isn't the greatest strategy for high
                    *fixed* cost transforms", e.g. a transform that
                    takes 5 minutes to get set up and then maybe a
                    microsecond per input

                    I suppose one solution is to move the
                    responsibility for handling this kind of
                    situation to the user and expect users to use a
                    bundling transform (e.g. BatchElements [1])
                    followed by a Reshuffle+FlatMap. Is this what
                    other runners expect? Just want to make sure I'm
                    not missing some smart generic bundling strategy
                    that might handle this for users.

                    [1]
                    
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
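The pattern being described - user-level batching in front of a high fixed-cost step - can be sketched without Beam (plain Python; the batch size of 2 and the cost function are arbitrary, and the flattening list comprehension plays the role of the FlatMap after the batching transform):

```python
from itertools import islice

def batch_elements(iterable, batch_size):
    """Group elements into fixed-size batches (a stand-in for Beam's BatchElements)."""
    it = iter(iterable)
    while batch := list(islice(it, batch_size)):
        yield batch

def process_batch(batch):
    # Pay the high fixed cost once per batch (e.g. opening a connection),
    # then the cheap per-element cost for everything in the batch.
    return [x * 10 for x in batch]

batches = list(batch_elements([1, 2, 3, 4, 5], 2))
# Flatten the per-batch results back into elements (the FlatMap step).
flattened = [y for b in batches for y in process_batch(b)]
```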



                    On Thu, Sep 21, 2023 at 7:23 PM Joey Tran
                    <joey.t...@schrodinger.com> wrote:

                        I'm writing a runner, and the first strategy
                        for determining bundle size was to just start
                        with a bundle size of one and double it until
                        we reach a size that we expect to take some
                        target per-bundle runtime (e.g. maybe
                        10 minutes). I realize that this isn't the
                        greatest strategy for high sized cost
                        transforms. I'm curious what kind of
                        strategies other runners take?
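The doubling strategy described above can be sketched as follows (plain Python; the per-element cost estimate and the target runtime are illustrative inputs, and a real runner would also account for fixed per-bundle overhead and dynamic rebalancing):

```python
def choose_bundle_size(per_element_seconds: float, target_seconds: float) -> int:
    """Start at a bundle size of 1 and double until a bundle is
    expected to take at least the target per-bundle runtime."""
    size = 1
    while size * per_element_seconds < target_seconds:
        size *= 2
    return size

# With an estimated 0.1 s per element and a 10-minute (600 s) target,
# the doubling settles on 8192 elements per bundle.
print(choose_bundle_size(0.1, 600))
```

As the follow-up email notes, this works poorly for high fixed-cost transforms, where the per-element estimate says almost nothing about actual bundle runtime.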
