Understood, thanks. This is fairly unintuitive from the "checkpoint barrier" viewpoint, because when such a runner fails, it simply restarts from the checkpoint as if it were a fresh start - i.e. it calls Setup. It makes sense that a bundle-based runner might not do that.

It seems to follow that we cannot infer any optimizations purely from static analysis of the DoFn. Should we consider adding an opt-out parameter for bundle atomicity (which also has implications) and for the bundle in-flight element watermark hold? I'd say yes, because otherwise we might restrict some runners too much.

On 9/27/23 20:24, Reuven Lax via dev wrote:
Using Setup would cause data loss in this case. A runner can always retry a bundle, and I don't believe Setup is called again in that case. If the user initialized the hashmap in Setup, records would be completely lost whenever bundles retry.

On Wed, Sep 27, 2023 at 11:20 AM Jan Lukavský <je...@seznam.cz> wrote:

    What is the reason to rely on StartBundle and not Setup in this
    case? If the life-cycle of a bundle is not "closed" (i.e. start -
    finish), then it seems to be ill-defined and Setup should do?
    I'm trying to think of non-caching use-cases of
    StartBundle-FinishBundle - are there such cases? I'd say yes, but
    I'm struggling a little to find a specific example that cannot be
    solved using Setup or lazy init.

    On 9/27/23 19:58, Reuven Lax via dev wrote:
    DoFns are allowed to be non deterministic, so they don't have to
    yield the "same" output.

    The example I'm thinking of is where users perform some
    "best-effort" deduplication by creating a hashmap in StartBundle
    and removing duplicates. This is usually done purely for
    performance to reduce shuffle size, as opposed to a guaranteed
    RemoveDuplicates. This scenario doesn't require FinishBundle,
    though it does require a StartBundle.
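The best-effort dedup pattern described here can be sketched in plain Python; the DoFn-like class and the `run_bundle` harness below are illustrative stand-ins, not Beam's actual API:

```python
class BestEffortDedupFn:
    """Drops duplicates seen within a single bundle. This is purely a
    shuffle-size optimization, not a correctness guarantee like
    RemoveDuplicates."""

    def start_bundle(self):
        # A fresh set per bundle: if the runner retries the bundle,
        # start_bundle runs again, so no state leaks across attempts
        # and no records are silently dropped on retry. Initializing
        # this in setup() instead would make retried bundles drop
        # every record they had already seen.
        self.seen = set()

    def process(self, element):
        if element not in self.seen:
            self.seen.add(element)
            yield element


def run_bundle(fn, elements):
    """Toy stand-in for a runner processing one bundle."""
    fn.start_bundle()
    out = []
    for e in elements:
        out.extend(fn.process(e))
    return out
```

Note that duplicates are only removed within a bundle: a second bundle containing an element already emitted by the first will emit it again, which is exactly why this is only "best-effort".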

    On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles
    <k...@apache.org> wrote:



        On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev
        <dev@beam.apache.org> wrote:

            Yes, not including FinishBundle in ParDoPayload seems
            like a mistake. Though absence of FinishBundle doesn't
            mean that one can assume that elements in a bundle don't
            affect subsequent bundle elements (i.e. there might still
            be caching!)


        Well for a DoFn to be correct, it has to yield the same (or
        "the same as much as the user expects it to be the same")
        output regardless of order of processing or bundling so a
        runner or SDK harness can definitely take a bunch of elements
        and process them however it wants if there's
        no @FinishBundle. I think that's what Jan is getting at -
        adding a @FinishBundle is the user placing a new restriction
        on the runner. Technically we probably have to
        include @StartBundle in that consideration.

        Kenn


            On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles
            <k...@apache.org> wrote:



                On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský
                <je...@seznam.cz> wrote:

                    Hi Kenn and Reuven,

                    I agree with all these points. The only issue
                    here seems to be that FlinkRunner does not
                    fulfill these constraints. This is a bug that can
                    be fixed, though we need to change some defaults,
                    as the 1000 ms default bundle "duration" can be
                    too much for lower-traffic pipelines. We are also
                    probably missing some @ValidatesRunner tests for
                    this. I created [1] and [2] to track this.

                    One question still remains: the bundle vs.
                    element life-cycle is relevant only for cases
                    where processing of element X can affect
                    processing of element Y later in the same bundle.
                    Once this influence is ruled out (i.e. no
                    caching), this information can result in runner
                    optimizations that yield better performance.
                    Should we consider propagating this information
                    from user code to the runner?

                Yes!

                This was the explicit goal of the move to
                annotation-driven DoFn in
                https://s.apache.org/a-new-dofn to make it so that
                the SDK and runner can get good information about
                what the DoFn requirements are.

                When there is no @FinishBundle method, the runner can
                make additional optimizations. This should have been
                included in the ParDoPayload in the proto when we
                moved to portable pipelines. I cannot remember if
                there was a good reason that we did not do so. Maybe
                we (incorrectly) thought that this was an issue that
                only the Java SDK harness needed to know about.

                Kenn

                    [1] https://github.com/apache/beam/issues/28649

                    [2] https://github.com/apache/beam/issues/28650

                    On 9/25/23 18:31, Reuven Lax via dev wrote:


                    On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský
                    <je...@seznam.cz> wrote:


                        On 9/23/23 18:16, Reuven Lax via dev wrote:
                        Two separate things here:

                        1. Yes, a watermark can update in the
                        middle of a bundle.
                        2. The records in the bundle themselves
                        will prevent the watermark from updating as
                        they are still in flight until after finish
                        bundle. Therefore simply caching the
                        records should always be watermark safe,
                        regardless of the runner. You will only run
                        into problems if you try and move
                        timestamps "backwards" - which is why Beam
                        strongly discourages this.
                        This is not aligned with FlinkRunner's
                        implementation. And I actually think it is
                        not aligned conceptually.  As mentioned,
                        Flink does not have the concept of bundles
                        at all. It achieves fault tolerance via
                        checkpointing, essentially checkpoint
                        barrier flowing from sources to sinks,
                        safely snapshotting state of each operator
                        on the way. Bundles are implemented as a
                        somewhat arbitrary set of elements between
                        two consecutive checkpoints (there can be
                        multiple bundles between checkpoints). A
                        bundle is 'committed' (i.e. persistently
                        stored and guaranteed not to retry) only
                        after the checkpoint barrier passes over the
                        elements in the bundle (every bundle is
                        finished at the very latest exactly before a
                        checkpoint). But watermark propagation and
                        bundle finalization are completely unrelated.
                        This might be a bug in the runner, but
                        requiring a checkpoint for watermark
                        propagation would introduce insane delays
                        between processing time and watermarks,
                        every executable stage will delay watermark
                        propagation until a checkpoint (which is
                        typically the order of seconds). This delay
                        would add up after each stage.


                    It's not bundles that hold up processing, rather
                    it is elements, and elements are not considered
                    "processed" until FinishBundle.

                    You are right about Flink. In many cases this is
                    fine - if Flink rolls back to the last
                    checkpoint, the watermark will also roll back,
                    and everything stays consistent. So in general,
                    one does not need to wait for checkpoints for
                    watermark propagation.

                    Where things get a bit weirder with Flink is
                    whenever one has external side effects. In
                    theory, one should wait for checkpoints before
                    letting a Sink flush, otherwise one could end up
                    with incorrect outputs (especially with a sink
                    like TextIO). Flink itself recognizes this, and
                    that's why they provide
                    TwoPhaseCommitSinkFunction
                    <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>
                    which waits for a checkpoint. In Beam, this is the
                    reason we introduced RequiresStableInput. Of
                    course in practice many Flink users don't do
                    this - in which case they are prioritizing
                    latency over data correctness.


                        Reuven

                        On Sat, Sep 23, 2023 at 12:03 AM Jan
                        Lukavský <je...@seznam.cz> wrote:

                            > Watermarks shouldn't be (visibly)
                            advanced until @FinishBundle is
                            committed, as there's no guarantee that
                            this work won't be discarded.

                            There was a thread [1], where the
                            conclusion seemed to be that updating
                            watermark is possible even in the
                            middle of a bundle. Actually, handling
                            watermarks is runner-dependent (e.g.
                            Flink does not store watermarks in
                            checkpoints, they are always recomputed
                            from scratch on restore).

                            [1]
                            https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

                            On 9/22/23 21:47, Robert Bradshaw via
                            dev wrote:
                            On Fri, Sep 22, 2023 at 10:58 AM Jan
                            Lukavský <je...@seznam.cz> wrote:

                                On 9/22/23 18:07, Robert Bradshaw
                                via dev wrote:

                                On Fri, Sep 22, 2023 at 7:23 AM
                                Byron Ellis via dev
                                <dev@beam.apache.org> wrote:

                                    I've actually wondered about
                                    this specifically for
                                    streaming... if you're
                                    writing a pipeline there it
                                    seems like you're often going
                                    to want to put high fixed
                                    cost things like database
                                    connections even outside of
                                    the bundle setup. You really
                                    only want to do that once in
                                    the lifetime of the worker
                                    itself, not the bundle. Seems
                                    like having that boundary be
                                    somewhere other than an
                                    arbitrarily (and probably
                                    small in streaming to avoid
                                    latency) group of elements
                                    might be more useful? I
                                    suppose this depends heavily
                                    on the object lifecycle in
                                    the sdk worker though.


                                +1. This is the difference
                                between @Setup and @StartBundle.
                                The start/finish bundle
                                operations should be used for
                                bracketing element processing
                                that must be committed as a unit
                                for correct failure recovery
                                (e.g. if elements are cached in
                                ProcessElement, they should all
                                be emitted in FinishBundle). On
                                the other hand, things like open
                                database connections can and
                                likely should be shared across
                                bundles.
                                This is correct, but the caching
                                between @StartBundle and
                                @FinishBundle has some problems.
                                First, users need to manually set a
                                watermark hold to min(timestamp in
                                bundle), otherwise the watermark
                                might overtake the buffered elements.
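The bracketing pattern under discussion (buffer in ProcessElement, flush in FinishBundle) can be sketched in plain Python; the class and the `sink` interface are illustrative stand-ins, not Beam's actual API:

```python
class BatchingWriteFn:
    """Buffers elements during a bundle and emits them as one batch
    in finish_bundle, so the batch commits atomically with the
    bundle on failure recovery."""

    def __init__(self, sink):
        # The sink (e.g. a database client) is expensive to create,
        # so it is shared across bundles, not rebuilt per bundle.
        self.sink = sink

    def start_bundle(self):
        self.buffer = []

    def process(self, element):
        # Nothing is emitted here. The buffered elements are still
        # "in flight", which is why the runner must hold the
        # watermark until finish_bundle commits.
        self.buffer.append(element)

    def finish_bundle(self):
        if self.buffer:
            self.sink.write_batch(self.buffer)
        self.buffer = []
```

Because nothing leaves the DoFn until finish_bundle, the batch commits or retries atomically with the bundle - which is exactly why the watermark must be held for the buffered elements.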


                            Watermarks shouldn't be (visibly)
                            advanced until @FinishBundle is
                            committed, as there's no guarantee
                            that this work won't be discarded.

                                Users don't have other option than
                                using timer.withOutputTimestamp
                                for that, as we don't have a
                                user-facing API to set watermark
                                hold otherwise, thus the in-bundle
                                caching implies a stateful DoFn. The
                                question might then be, why not use
                                "classical" stateful caching
                                involving state, as there is full
                                control over the caching in user
                                code. This triggered an idea: would
                                it be useful to add information
                                about caching to the API (e.g. in
                                Java @StartBundle(caching=true))?
                                That could maybe solve the above
                                issues (the runner would know to set
                                the hold, and it could work with
                                "stateless" DoFns).


                            Really, this is one of the areas that
                            the streaming/batch abstraction leaks.
                            In batch it was a common pattern to
                            have local DoFn instance state that
                            persisted from start to finish bundle,
                            and these were also used as convenient
                            entry points for other operations
                            (like opening database connections)
                            'cause bundles were often "as large as
                            possible." WIth the advent of n
                            streaming it makes sense to put this
                            in explicitly managed runner state to
                            allow for cross-bundle amortization
                            and there's more value in
                            distinguishing between @Setup and
                            @StartBundle.

                            (Were I to do things over I'd probably
                            encourage an API that discouraged
                            non-configuration instance state on
                            DoFns altogether, e.g. in the notion
                            of Python context managers (and an
                            equivalent API could probably be put
                            together with AutoClosables in Java)
                            one would have something like

                            ParDo(X)

                            which would logically (though not
                            necessarily physically) lead to an
                            execution like

                            with X.bundle_processor() as bundle_processor:
                              for bundle in bundles:
                                with bundle_processor.element_processor() as process:
                                  for element in bundle:
                                    process(element)

                            where the traditional
                            setup/start_bundle/finish_bundle/teardown
                            logic would live in the __enter__ and
                            __exit__ methods (made even easier
                            with coroutines.) For convenience one
                            could of course provide a raw bundle
                            processor or element processor to
                            ParDo if the enter/exit contexts are
                            trivial. But this is getting somewhat
                            off-topic...
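A runnable toy version of the context-manager shape sketched above might look like the following; the API is entirely hypothetical (it is the idea being floated here, not anything Beam provides):

```python
from contextlib import contextmanager

class X:
    """Toy DoFn whose setup/teardown and start/finish_bundle logic
    lives in the __enter__/__exit__ of nested context managers,
    rather than as instance-state lifecycle methods."""

    def __init__(self):
        self.log = []  # records the lifecycle order for illustration

    @contextmanager
    def bundle_processor(self):
        self.log.append("setup")           # once per worker
        try:
            yield self
        finally:
            self.log.append("teardown")

    @contextmanager
    def element_processor(self):
        self.log.append("start_bundle")    # once per bundle
        try:
            yield self.process
        finally:
            self.log.append("finish_bundle")

    def process(self, element):
        self.log.append(("element", element))


# The runner-side loop from the sketch above:
bundles = [[1, 2], [3]]
x = X()
with x.bundle_processor() as bp:
    for bundle in bundles:
        with bp.element_processor() as process:
            for element in bundle:
                process(element)
```

The `try`/`finally` inside each generator is what makes the `__exit__` half reliable: finish_bundle and teardown run even if element processing raises.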


                                    Best,
                                    B

                                    On Fri, Sep 22, 2023 at
                                    7:03 AM Kenneth Knowles
                                    <k...@apache.org> wrote:

                                        (I notice that you
                                        replied only to yourself,
                                        but there has been a
                                        whole thread of
                                        discussion on this - are
                                        you subscribed to
                                        dev@beam?
                                        https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

                                        It sounds like you want
                                        what everyone wants: to
                                        have the biggest bundles
                                        possible.

                                        So for bounded data,
                                        basically you make even
                                        splits of the data and
                                        each split is one bundle.
                                        And then dynamic
                                        splitting to redistribute
                                        work to eliminate
                                        stragglers, if your
                                        engine has that capability.

                                        For unbounded data, you
                                        more-or-less bundle as
                                        much as you can without
                                        waiting too long, like
                                        Jan described.

                                        Users know to put their
                                        high fixed costs
                                        in @StartBundle and then
                                        it is the runner's job to
                                        put as many calls
                                        to @ProcessElement as
                                        possible to amortize.

                                        Kenn

                                        On Fri, Sep 22, 2023 at
                                        9:39 AM Joey Tran
                                        <joey.t...@schrodinger.com>
                                        wrote:

                                            Whoops, I typoed my
                                            last email. I meant
                                            to write "this isn't
                                            the greatest strategy
                                            for high *fixed* cost
                                            transforms", e.g. a
                                            transform that takes
                                            5 minutes to get set
                                            up and then maybe a
                                            microsecond per input

                                            I suppose one
                                            solution is to move
                                            the responsibility
                                            for handling this
                                            kind of situation to
                                            the user and expect
                                            users to use a
                                            bundling transform
                                            (e.g. BatchElements
                                            [1]) followed by a
                                            Reshuffle+FlatMap. Is
                                            this what other
                                            runners expect? Just
                                            want to make sure I'm
                                            not missing some
                                            smart generic
                                            bundling strategy
                                            that might handle
                                            this for users.

                                            [1]
                                            https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
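The user-side workaround described above (batch elements, then fan back out) can be sketched in plain Python; these helpers are illustrative stand-ins for BatchElements and the follow-up FlatMap, not Beam's implementations:

```python
def batch_elements(elements, max_batch_size):
    """Group a stream of elements into batches of at most
    max_batch_size, so a high-fixed-cost transform pays its setup
    cost once per batch instead of once per element."""
    batch = []
    for e in elements:
        batch.append(e)
        if len(batch) >= max_batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # flush the final partial batch


def flat_map_batches(batches, expensive_setup, per_element):
    """FlatMap-style fan-out: pay the fixed cost once per batch,
    then process each element in the batch with the result."""
    for batch in batches:
        resource = expensive_setup()  # the high fixed cost
        for e in batch:
            yield per_element(resource, e)
```

In a real pipeline the batches would typically be reshuffled between the two steps (as suggested above) so the expensive work redistributes across workers.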



                                            On Thu, Sep 21, 2023
                                            at 7:23 PM Joey Tran
                                            <joey.t...@schrodinger.com>
                                            wrote:

                                                Writing a runner
                                                and the first
                                                strategy for
                                                determining
                                                bundling size was
                                                to just start
                                                with a bundle
                                                size of one and
                                                double it until
                                                we reach a size
                                                that we expect to
                                                take some target
                                                per-bundle
                                                runtime (e.g.
                                                maybe 10
                                                minutes). I
                                                realize that this
                                                isn't the
                                                greatest strategy
                                                for high sized
                                                cost transforms.
                                                I'm curious what
                                                kind of
                                                strategies other
                                                runners take?
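The doubling strategy described above can be sketched as follows (purely illustrative, not any particular runner's code); the comment notes where the strategy breaks down for transforms with a high fixed cost:

```python
def next_bundle_size(current_size, last_bundle_seconds,
                     target_seconds=600.0, max_size=1_000_000):
    """Start at a bundle size of one and double until a bundle is
    expected to take roughly target_seconds (e.g. 10 minutes)."""
    if last_bundle_seconds <= 0:
        # No timing signal yet; keep growing.
        return min(current_size * 2, max_size)
    # Doubling the size roughly doubles the runtime, assuming cost
    # is per-element. Stop once that would overshoot the target.
    if last_bundle_seconds * 2 <= target_seconds:
        return min(current_size * 2, max_size)
    # Weakness: with a high fixed cost (e.g. a 5-minute setup and
    # microseconds per element), last_bundle_seconds is dominated by
    # the fixed cost, so growth stops long before the per-element
    # work justifies a larger bundle.
    return current_size
```

For example, once a 1024-element bundle already takes 400 s, doubling would overshoot a 600 s target, so the size stays put; at 200 s it doubles to 2048.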
