Hi Kenn and Reuven,

I agree with all these points. The only issue here seems to be that FlinkRunner does not fulfill these constraints. This is a bug that can be fixed, though we need to change some defaults, as the default 1000 ms bundle "duration" can be too much for lower-traffic pipelines. We are also probably missing some @ValidatesRunner tests for this. I created [1] and [2] to track this.

One question still remains: the bundle vs. element life-cycle is relevant only in cases where processing of element X can affect the processing of element Y later in the same bundle. Once this influence is ruled out (i.e. no caching), this information can enable runner optimizations that yield better performance. Should we consider propagating this information from user code to the runner?

[1] https://github.com/apache/beam/issues/28649

[2] https://github.com/apache/beam/issues/28650

On 9/25/23 18:31, Reuven Lax via dev wrote:


On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský <je...@seznam.cz> wrote:


    On 9/23/23 18:16, Reuven Lax via dev wrote:
    Two separate things here:

    1. Yes, a watermark can update in the middle of a bundle.
    2. The records in the bundle themselves will prevent the
    watermark from updating as they are still in flight until after
    finish bundle. Therefore simply caching the records should always
    be watermark safe, regardless of the runner. You will only run
    into problems if you try and move timestamps "backwards" - which
    is why Beam strongly discourages this.
    This is not aligned with FlinkRunner's implementation. And I
    actually think it is not aligned conceptually. As mentioned,
    Flink does not have the concept of bundles at all. It achieves
    fault tolerance via checkpointing, essentially checkpoint barrier
    flowing from sources to sinks, safely snapshotting state of each
    operator on the way. Bundles are implemented as a somewhat
    arbitrary set of elements between two consecutive checkpoints
    (there can be multiple bundles between checkpoints). A bundle is
    'committed' (i.e. persistently stored and guaranteed not to retry)
    only after the checkpoint barrier passes over the elements in the
    bundle (every bundle is finished at the very latest exactly before
    a checkpoint). But watermark propagation and bundle finalization
    are completely unrelated. This might be a bug in the runner, but
    requiring checkpoint for watermark propagation will introduce
    insane delays between processing time and watermarks, every
    executable stage will delay watermark propagation until a
    checkpoint (which is typically the order of seconds). This delay
    would add up after each stage.


It's not bundles that hold up processing, rather it is elements, and elements are not considered "processed" until FinishBundle.

You are right about Flink. In many cases this is fine - if Flink rolls back to the last checkpoint, the watermark will also roll back, and everything stays consistent. So in general, one does not need to wait for checkpoints for watermark propagation.

Where things get a bit weirder with Flink is whenever one has external side effects. In theory, one should wait for checkpoints before letting a Sink flush, otherwise one could end up with incorrect outputs (especially with a sink like TextIO). Flink itself recognizes this, and that's why they provide TwoPhaseCommitSinkFunction <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html> which waits for a checkpoint. In Beam, this is the reason we introduced RequiresStableInput. Of course in practice many Flink users don't do this - in which case they are prioritizing latency over data correctness.
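[Editor's note: a minimal plain-Python sketch of the two-phase-commit idea described above — stage output during processing and publish it only once the covering checkpoint completes. The class and method names are illustrative, not the Flink or Beam API.]

```python
# Two-phase-commit sketch: output is staged while elements are
# processed and only becomes externally visible once the checkpoint
# that covers it completes. On failure, staged output is discarded,
# so replayed input cannot produce duplicates.

class TwoPhaseSink:
    def __init__(self):
        self.staged = []      # written but not yet visible
        self.committed = []   # durable, externally visible output

    def write(self, record):
        # Phase 1: stage the record; readers must not see it yet.
        self.staged.append(record)

    def on_checkpoint_complete(self):
        # Phase 2: the checkpoint barrier has passed, so the input
        # that produced these records will never be replayed.
        self.committed.extend(self.staged)
        self.staged.clear()

    def on_failure(self):
        # Roll back: discard staged output before input is replayed.
        self.staged.clear()
```

Flushing directly in `write` instead (skipping the staging phase) is exactly the latency-over-correctness trade-off mentioned above.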


    Reuven

    On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz>
    wrote:

        > Watermarks shouldn't be (visibly) advanced until
        @FinishBundle is committed, as there's no guarantee that this
        work won't be discarded.

        There was a thread [1], where the conclusion seemed to be
        that updating watermark is possible even in the middle of a
        bundle. Actually, handling watermarks is runner-dependent
        (e.g. Flink does not store watermarks in checkpoints, they
        are always recomputed from scratch on restore).

        [1]
        https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

        On 9/22/23 21:47, Robert Bradshaw via dev wrote:
        On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský
        <je...@seznam.cz> wrote:

            On 9/22/23 18:07, Robert Bradshaw via dev wrote:

            On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
            <dev@beam.apache.org> wrote:

                I've actually wondered about this specifically for
                streaming... if you're writing a pipeline there it
                seems like you're often going to want to put high
                fixed cost things like database connections even
                outside of the bundle setup. You really only want
                to do that once in the lifetime of the worker
                itself, not the bundle. Seems like having that
                boundary be somewhere other than an arbitrarily
                (and probably small in streaming to avoid latency)
                group of elements might be more useful? I suppose
                this depends heavily on the object lifecycle in the
                sdk worker though.


            +1. This is the difference between @Setup
            and @StartBundle. The start/finish bundle operations
            should be used for bracketing element processing that
            must be committed as a unit for
            correct failure recovery (e.g. if elements are cached
            in ProcessElement, they should all be emitted in
            FinishBundle). On the other hand, things like open
            database connections can and likely should be shared
            across bundles.
            This is correct, but the caching between @StartBundle
            and @FinishBundle has some problems. First, users need
            to manually set a watermark hold for min(timestamp in
            bundle), otherwise the watermark might overtake the
            buffered elements.
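[Editor's note: a sketch of the "cache in ProcessElement, emit in FinishBundle" bracketing pattern discussed above. This mimics the DoFn lifecycle in plain Python; it is not actual Beam code, and the flush threshold is an arbitrary illustration.]

```python
# Elements cached during a bundle MUST all be emitted by
# finish_bundle; only after the bundle commits is the work durable.

class BufferingDoFn:
    def start_bundle(self):
        self.buffer = []

    def process(self, element):
        self.buffer.append(element)
        if len(self.buffer) >= 100:   # illustrative flush threshold
            yield from self.flush()

    def finish_bundle(self):
        # Anything still buffered is emitted here, so no element is
        # lost when the bundle is committed.
        yield from self.flush()

    def flush(self):
        batch, self.buffer = self.buffer, []
        if batch:
            yield batch
```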


        Watermarks shouldn't be (visibly) advanced until
        @FinishBundle is committed, as there's no guarantee that
        this work won't be discarded.

            Users have no option other than using
            timer.withOutputTimestamp for that, as we don't have a
            user-facing API to set a watermark hold otherwise; thus
            in-bundle caching implies a stateful DoFn. The question
            might then be: why not use "classical" stateful caching
            involving state, as that gives full control over the
            caching in user code? This gave me an idea: would it be
            useful to add the information about caching to the API
            (e.g. in Java @StartBundle(caching=true))? That could
            maybe solve the above issues (the runner would know to
            set the hold, and it could work with "stateless" DoFns).
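[Editor's note: a sketch of the watermark-hold bookkeeping a runner could do on the user's behalf if it knew a DoFn caches between @StartBundle and @FinishBundle — hold the output watermark at the minimum timestamp of any buffered element. Illustrative only, not a runner API.]

```python
# The output watermark may never overtake an element that is still
# buffered inside a bundle; holding it at min(buffered timestamps)
# is exactly the "hold" discussed above.

class WatermarkHold:
    def __init__(self):
        self.buffered_timestamps = []

    def on_buffer(self, timestamp):
        self.buffered_timestamps.append(timestamp)

    def on_flush(self):
        # finish_bundle emitted everything; the hold is released.
        self.buffered_timestamps.clear()

    def output_watermark(self, input_watermark):
        if self.buffered_timestamps:
            return min(input_watermark, min(self.buffered_timestamps))
        return input_watermark
```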


        Really, this is one of the areas that the streaming/batch
        abstraction leaks. In batch it was a common pattern to have
        local DoFn instance state that persisted from start to
        finish bundle, and these were also used as convenient entry
        points for other operations (like opening
        database connections) 'cause bundles were often "as large as
        possible." With the advent of streaming it makes sense to
        put this in explicitly managed runner state to allow for
        cross-bundle amortization and there's more value in
        distinguishing between @Setup and @StartBundle.

        (Were I do to things over I'd probably encourage an API that
        discouraged non-configuration instance state on DoFns
        altogether, e.g. in the notion of Python context managers
        (and an equivalent API could probably be put together with
        AutoClosables in Java) one would have something like

        ParDo(X)

        which would logically (though not necessarily physically)
        lead to an execution like

        with X.bundle_processor() as bundle_processor:
          for bundle in bundles:
            with bundle_processor.element_processor() as process:
              for element in bundle:
                process(element)

        where the traditional
        setup/start_bundle/finish_bundle/teardown logic would live
        in the __enter__ and __exit__ methods (made even easier with
        coroutines.) For convenience one could of course provide a
        raw bundle processor or element processor to ParDo if the
        enter/exit contexts are trivial. But this is getting
        somewhat off-topic...
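[Editor's note: a runnable fleshing-out of the context-manager sketch above, matching its enter/exit mapping (setup/teardown on the outer manager, start_bundle/finish_bundle on the inner one). All names here are hypothetical, not an actual Beam API.]

```python
import contextlib

class BundleProcessor:
    # __enter__/__exit__ play the role of @Setup/@Teardown.
    def __enter__(self):
        self.connection = "db-connection"   # expensive per-worker resource
        return self

    def __exit__(self, *exc):
        self.connection = None              # teardown: release resource
        return False

    @contextlib.contextmanager
    def element_processor(self):
        # enter = start_bundle, exit = finish_bundle
        buffer = []
        def process(element):
            buffer.append(element)
        yield process
        self.flushed = list(buffer)         # finish_bundle: emit buffer

class X:
    def bundle_processor(self):
        return BundleProcessor()

# The driver loop from the sketch above:
bundles = [[1, 2], [3]]
with X().bundle_processor() as bundle_processor:
    for bundle in bundles:
        with bundle_processor.element_processor() as process:
            for element in bundle:
                process(element)
```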


                Best,
                B

                On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles
                <k...@apache.org> wrote:

                    (I notice that you replied only to yourself,
                    but there has been a whole thread of discussion
                    on this - are you subscribed to dev@beam?
                    https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

                    It sounds like you want what everyone wants: to
                    have the biggest bundles possible.

                    So for bounded data, basically you make even
                    splits of the data and each split is one
                    bundle. And then dynamic splitting to
                    redistribute work to eliminate stragglers, if
                    your engine has that capability.

                    For unbounded data, you more-or-less bundle as
                    much as you can without waiting too long, like
                    Jan described.

                    Users know to put their high fixed costs
                    in @StartBundle and then it is the runner's job
                    to put as many calls to @ProcessElement as
                    possible to amortize.

                    Kenn

                    On Fri, Sep 22, 2023 at 9:39 AM Joey Tran
                    <joey.t...@schrodinger.com> wrote:

                        Whoops, I typoed my last email. I meant to
                        write "this isn't the greatest strategy for
                        high *fixed* cost transforms", e.g. a
                        transform that takes 5 minutes to get set
                        up and then maybe a microsecond per input

                        I suppose one solution is to move the
                        responsibility for handling this kind of
                        situation to the user and expect users to
                        use a bundling transform (e.g.
                        BatchElements [1]) followed by a
                        Reshuffle+FlatMap. Is this what other
                        runners expect? Just want to make sure I'm
                        not missing some smart generic bundling
                        strategy that might handle this for users.

                        [1]
                        https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
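[Editor's note: a plain-Python sketch of the batching idea behind BatchElements — group incoming elements so a downstream high-fixed-cost step pays its setup once per batch rather than once per element. This is not the actual apache_beam implementation, and the batch size is illustrative.]

```python
def batch_elements(elements, max_batch_size=10):
    """Group an element stream into lists of up to max_batch_size."""
    batch = []
    for element in elements:
        batch.append(element)
        if len(batch) >= max_batch_size:
            yield batch
            batch = []
    if batch:
        yield batch   # flush the final partial batch
```

In the pattern described above, a Reshuffle would redistribute these batches across workers, and a FlatMap would then process each batch, amortizing the expensive setup over its elements.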



                        On Thu, Sep 21, 2023 at 7:23 PM Joey Tran
                        <joey.t...@schrodinger.com> wrote:

                            Writing a runner and the first strategy
                            for determining bundling size was to
                            just start with a bundle size of one
                            and double it until we reach a size
                            that we expect to take some target
                            per-bundle runtime (e.g. maybe 10
                            minutes). I realize that this isn't the
                            greatest strategy for high sized cost
                            transforms. I'm curious what kind of
                            strategies other runners take?
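[Editor's note: a sketch of the doubling strategy described above — start with a bundle size of 1 and grow it until the measured per-bundle runtime reaches a target. The extrapolation cap is one possible refinement, not any particular runner's implementation.]

```python
def next_bundle_size(current_size, last_bundle_seconds,
                     target_seconds=600):
    """Pick the next bundle size given the last bundle's runtime."""
    if last_bundle_seconds >= target_seconds:
        return current_size           # target reached; stop growing
    if last_bundle_seconds > 0:
        # Double, but cap at the size the measured per-element cost
        # suggests would hit the target, to avoid overshooting.
        per_element = last_bundle_seconds / current_size
        cap = max(1, int(target_seconds / per_element))
        return min(current_size * 2, cap)
    return current_size * 2
```

Note this is exactly where high fixed-cost transforms break the heuristic: the first one-element bundle already pays the full setup cost, so the measured per-element cost wildly overestimates the marginal cost.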
