What is the reason to rely on StartBundle and not Setup in this case? If the life-cycle of a bundle is not "closed" (i.e. start - finish), then it seems to be ill-defined, and Setup should do? I'm trying to think of non-caching use cases of StartBundle-FinishBundle; are there any? I'd say yes, but I'm struggling a little to find a specific example that cannot be solved using Setup or lazy init.

On 9/27/23 19:58, Reuven Lax via dev wrote:
DoFns are allowed to be non-deterministic, so they don't have to yield the "same" output.

The example I'm thinking of is where users perform some "best-effort" deduplication by creating a hashmap in StartBundle and removing duplicates. This is usually done purely for performance to reduce shuffle size, as opposed to a guaranteed RemoveDuplicates. This scenario doesn't require FinishBundle, though it does require a StartBundle.
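
To make the best-effort deduplication described above concrete, here is a minimal sketch. It is an assumption-laden illustration, not code from the thread: `BestEffortDedupFn` and `run_bundles` are hypothetical names, and the class is a plain-Python stand-in that only mirrors the `start_bundle` and `process` hook names of the Beam Python SDK's `DoFn`, so that it runs without Beam installed.

```python
class BestEffortDedupFn:
    """Plain-Python stand-in for a DoFn (hypothetical; no Beam dependency)."""

    def start_bundle(self):
        # Fresh cache per bundle: duplicates are only detected within one bundle.
        self._seen = set()

    def process(self, element):
        if element in self._seen:
            return  # drop an element already seen in this bundle
        self._seen.add(element)
        yield element


def run_bundles(fn, bundles):
    """Hypothetical driver that mimics a runner invoking the lifecycle hooks."""
    out = []
    for bundle in bundles:
        fn.start_bundle()
        for element in bundle:
            out.extend(fn.process(element))
    return out


deduped = run_bundles(BestEffortDedupFn(), [["a", "b", "a"], ["a", "c", "c"]])
print(deduped)
```

Note that "a" survives once per bundle: the filter only shrinks the data, and nothing needs to happen at the end of the bundle, which is why no FinishBundle is involved.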

On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles <k...@apache.org> wrote:



    On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev
    <dev@beam.apache.org> wrote:

        Yes, not including FinishBundle in ParDoPayload seems like a
        mistake. Though absence of FinishBundle doesn't mean that one
        can assume that elements in a bundle don't affect subsequent
        bundle elements (i.e. there might still be caching!)


    Well for a DoFn to be correct, it has to yield the same (or "the
    same as much as the user expects it to be the same") output
    regardless of order of processing or bundling so a runner or SDK
    harness can definitely take a bunch of elements and process them
    however it wants if there's no @FinishBundle. I think that's what
    Jan is getting at - adding a @FinishBundle is the user placing a
    new restriction on the runner. Technically we probably have to
    include @StartBundle in that consideration.

    Kenn


        On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles
        <k...@apache.org> wrote:



            On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský
            <je...@seznam.cz> wrote:

                Hi Kenn and Reuven,

                I agree with all these points. The only issue here
                seems to be that FlinkRunner does not fulfill these
                constraints. This is a bug that can be fixed, though
                we need to change some defaults, as 1000 ms default
                bundle "duration" for lower traffic Pipelines can be
                too much. We are also probably missing some
                @ValidatesRunner tests for this. I created [1] and
                [2] to track this.

                One question still remains, the bundle vs. element
                life-cycle is relevant only for cases where processing
                of element X can affect processing of element Y later
                in the same bundle. Once this influence is ruled out
                (i.e. no caching), this information can result in
                runner optimization that yields better performance.
                Should we consider propagating this information from
                user code to the runner?

            Yes!

            This was the explicit goal of the move to
            annotation-driven DoFn in https://s.apache.org/a-new-dofn
            to make it so that the SDK and runner can get good
            information about what the DoFn requirements are.

            When there is no @FinishBundle method, the runner can make
            additional optimizations. This should have been included
            in the ParDoPayload in the proto when we moved to portable
            pipelines. I cannot remember if there was a good reason
            that we did not do so. Maybe we (incorrectly) thought that
            this was an issue that only the Java SDK harness needed to
            know about.

            Kenn

                [1] https://github.com/apache/beam/issues/28649

                [2] https://github.com/apache/beam/issues/28650

                On 9/25/23 18:31, Reuven Lax via dev wrote:


                On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský
                <je...@seznam.cz> wrote:


                    On 9/23/23 18:16, Reuven Lax via dev wrote:
                    Two separate things here:

                    1. Yes, a watermark can update in the middle of
                    a bundle.
                    2. The records in the bundle themselves will
                    prevent the watermark from updating as they are
                    still in flight until after finish bundle.
                    Therefore simply caching the records should
                    always be watermark safe, regardless of the
                    runner. You will only run into problems if you
                    try and move timestamps "backwards" - which is
                    why Beam strongly discourages this.

                    This is not aligned with FlinkRunner's
                    implementation. And I actually think it is not
                    aligned conceptually.  As mentioned, Flink does
                    not have the concept of bundles at all. It
                    achieves fault tolerance via checkpointing,
                    essentially checkpoint barrier flowing from
                    sources to sinks, safely snapshotting state of
                    each operator on the way. Bundles are implemented
                    as a somewhat arbitrary set of elements between
                    two consecutive checkpoints (there can be
                    multiple bundles between checkpoints). A bundle
                    is 'committed' (i.e. persistently stored and
                    guaranteed not to retry) only after the
                    checkpoint barrier passes over the elements in
                    the bundle (every bundle is finished at the very
                    latest exactly before a checkpoint). But
                    watermark propagation and bundle finalization are
                    completely unrelated. This might be a bug in the
                    runner, but requiring a checkpoint for watermark
                    propagation will introduce insane delays between
                    processing time and watermarks: every executable
                    stage will delay watermark propagation until a
                    checkpoint (which is typically on the order of
                    seconds). This delay would add up after each stage.


                It's not bundles that hold up processing, rather it
                is elements, and elements are not considered
                "processed" until FinishBundle.

                You are right about Flink. In many cases this is fine
                - if Flink rolls back to the last checkpoint, the
                watermark will also roll back, and everything stays
                consistent. So in general, one does not need to wait
                for checkpoints for watermark propagation.

                Where things get a bit weirder with Flink is whenever
                one has external side effects. In theory, one should
                wait for checkpoints before letting a Sink flush,
                otherwise one could end up with incorrect outputs
                (especially with a sink like TextIO). Flink itself
                recognizes this, and that's why they provide
                TwoPhaseCommitSinkFunction
                <https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html> which
                waits for a checkpoint. In Beam, this is the reason
                we introduced RequiresStableInput. Of course in
                practice many Flink users don't do this - in which
                case they are prioritizing latency over data
                correctness.


                    Reuven

                    On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský
                    <je...@seznam.cz> wrote:

                        > Watermarks shouldn't be (visibly) advanced
                        until @FinishBundle is committed, as there's
                        no guarantee that this work won't be discarded.

                        There was a thread [1], where the conclusion
                        seemed to be that updating watermark is
                        possible even in the middle of a bundle.
                        Actually, handling watermarks is
                        runner-dependent (e.g. Flink does not store
                        watermarks in checkpoints, they are always
                        recomputed from scratch on restore).

                        [1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

                        On 9/22/23 21:47, Robert Bradshaw via dev wrote:
                        On Fri, Sep 22, 2023 at 10:58 AM Jan
                        Lukavský <je...@seznam.cz> wrote:

                            On 9/22/23 18:07, Robert Bradshaw via
                            dev wrote:

                            On Fri, Sep 22, 2023 at 7:23 AM Byron
                            Ellis via dev <dev@beam.apache.org> wrote:

                                I've actually wondered about this
                                specifically for streaming... if
                                you're writing a pipeline there it
                                seems like you're often going to
                                want to put high fixed cost things
                                like database connections even
                                outside of the bundle setup. You
                                really only want to do that once
                                in the lifetime of the worker
                                itself, not the bundle. Seems like
                                having that boundary be somewhere
                                other than an arbitrarily (and
                                probably small in streaming to
                                avoid latency) group of elements
                                might be more useful? I suppose
                                this depends heavily on the object
                                lifecycle in the sdk worker though.


                            +1. This is the difference
                            between @Setup and @StartBundle. The
                            start/finish bundle operations should
                            be used for bracketing element
                            processing that must be committed as a
                            unit for correct failure recovery
                            (e.g. if elements are cached in
                            ProcessElement, they should all be
                            emitted in FinishBundle). On the other
                            hand, things like open database
                            connections can and likely should be
                            shared across bundles.
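
A minimal sketch of the split described above, under stated assumptions: `BufferingWriterFn` and `FakeConnection` are hypothetical, the class is a plain-Python stand-in that mirrors the `setup`, `start_bundle`, `process`, `finish_bundle` and `teardown` hook names of the Beam Python SDK's `DoFn`, and the loop at the bottom plays the role of the runner. The expensive connection is opened once per instance, while buffered elements are flushed as a unit at the end of every bundle (here to a sink rather than to an output collection).

```python
class FakeConnection:
    """Hypothetical stand-in for an expensive resource such as a database."""
    opened = 0

    def __init__(self):
        FakeConnection.opened += 1
        self.batches = []
        self.closed = False

    def write_batch(self, batch):
        self.batches.append(list(batch))

    def close(self):
        self.closed = True


class BufferingWriterFn:
    """Plain-Python stand-in for a DoFn (no Beam dependency)."""

    def __init__(self, connect):
        self._connect = connect  # factory for the expensive connection
        self._connection = None

    def setup(self):
        # Once per instance: high fixed cost, shared across bundles.
        self._connection = self._connect()

    def start_bundle(self):
        # Per bundle: state that must not leak into the next bundle.
        self._buffer = []

    def process(self, element):
        self._buffer.append(element)

    def finish_bundle(self):
        # Everything cached during the bundle leaves as a unit.
        self._connection.write_batch(self._buffer)
        self._buffer = []

    def teardown(self):
        self._connection.close()


fn = BufferingWriterFn(FakeConnection)
fn.setup()
for bundle in [[1, 2], [3]]:
    fn.start_bundle()
    for element in bundle:
        fn.process(element)
    fn.finish_bundle()
conn = fn._connection
fn.teardown()
print(FakeConnection.opened, conn.batches)
```
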
                            This is correct, but the caching
                            between @StartBundle and @FinishBundle
                            has some problems. First, users need to
                            manually set watermark hold for
                            min(timestamp in bundle), otherwise
                            watermark might overtake the buffered
                            elements.


                        Watermarks shouldn't be (visibly) advanced
                        until @FinishBundle is committed, as
                        there's no guarantee that this work won't
                        be discarded.

                            Users have no option other than
                            using timer.withOutputTimestamp for
                            that, as we don't have a user-facing
                            API to set a watermark hold otherwise,
                            thus the in-bundle caching implies a
                            stateful DoFn. The question might then
                            be: why not use "classical" stateful
                            caching involving state, as there is
                            full control over the caching in user
                            code? This gave me an idea: would it
                            be useful to add the information
                            about caching to the API (e.g. in Java
                            @StartBundle(caching=true)), which
                            could maybe solve the above issues
                            (the runner would know to set the hold, and
                            it could work with "stateless" DoFns)?


                        Really, this is one of the areas that the
                        streaming/batch abstraction leaks. In batch
                        it was a common pattern to have local DoFn
                        instance state that persisted from start to
                        finish bundle, and these were also used as
                        convenient entry points for other
                        operations (like opening
                        database connections) 'cause bundles were
                        often "as large as possible." With the
                        advent of streaming it makes sense to put
                        this in explicitly managed runner state to
                        allow for cross-bundle amortization and
                        there's more value in
                        distinguishing between @Setup and @StartBundle.

                        (Were I to do things over I'd probably
                        encourage an API that discouraged
                        non-configuration instance state on DoFns
                        altogether, e.g. in the notion of Python
                        context managers (and an equivalent API
                        could probably be put together with
                        AutoCloseables in Java) one would have
                        something like

                        ParDo(X)

                        which would logically (though not
                        necessarily physically) lead to an
                        execution like

                        with X.bundle_processor() as bundle_processor:
                          for bundle in bundles:
                            with bundle_processor.element_processor() as process:
                              for element in bundle:
                                process(element)

                        where the traditional
                        setup/start_bundle/finish_bundle/teardown
                        logic would live in the __enter__ and
                        __exit__ methods (made even easier with
                        coroutines.) For convenience one could of
                        course provide a raw bundle processor or
                        element processor to ParDo if the
                        enter/exit contexts are trivial. But this
                        is getting somewhat off-topic...)
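
The context-manager idea above can be sketched in runnable form. This is a hypothetical illustration only: no such API exists in Beam, and the names `X`, `bundle_processor` and `element_processor` are simply taken from the pseudocode, with `contextlib.contextmanager` generators standing in for the `__enter__` and `__exit__` methods.

```python
from contextlib import contextmanager


class _BundleProcessor:
    def __init__(self, log):
        self._log = log

    @contextmanager
    def element_processor(self):
        self._log.append("start_bundle")
        buffered = []  # bundle-scoped state lives in the generator frame

        def process(element):
            buffered.append(element)

        yield process
        # Reached only if the with-body finished without raising,
        # so a failed bundle is never "finished".
        self._log.append(f"finish_bundle:{buffered}")


class X:
    def __init__(self):
        self.log = []

    @contextmanager
    def bundle_processor(self):
        self.log.append("setup")
        try:
            yield _BundleProcessor(self.log)
        finally:
            # Teardown is attempted even if a bundle fails.
            self.log.append("teardown")


x = X()
with x.bundle_processor() as bundle_processor:
    for bundle in [[1, 2], [3]]:
        with bundle_processor.element_processor() as process:
            for element in bundle:
                process(element)
print(x.log)
```

One consequence of this shape is that the DoFn object itself carries no mutable per-bundle fields; the buffer exists only for the duration of one element_processor context.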


                                Best,
                                B

                                On Fri, Sep 22, 2023 at 7:03 AM
                                Kenneth Knowles <k...@apache.org>
                                wrote:

                                    (I notice that you replied
                                    only to yourself, but there
                                    has been a whole thread of
                                    discussion on this - are you
                                    subscribed to dev@beam?
                                    https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

                                    It sounds like you want what
                                    everyone wants: to have the
                                    biggest bundles possible.

                                    So for bounded data, basically
                                    you make even splits of the
                                    data and each split is one
                                    bundle. And then dynamic
                                    splitting to redistribute work
                                    to eliminate stragglers, if
                                    your engine has that capability.

                                    For unbounded data, you
                                    more-or-less bundle as much as
                                    you can without waiting too
                                    long, like Jan described.

                                    Users know to put their high
                                    fixed costs in @StartBundle
                                    and then it is the runner's
                                    job to put as many calls
                                    to @ProcessElement as possible
                                    to amortize.

                                    Kenn

                                    On Fri, Sep 22, 2023 at
                                    9:39 AM Joey Tran
                                    <joey.t...@schrodinger.com> wrote:

                                        Whoops, I typoed my last
                                        email. I meant to write
                                        "this isn't the
                                        greatest strategy for high
                                        *fixed* cost transforms",
                                        e.g. a transform that
                                        takes 5 minutes to get set
                                        up and then maybe a
                                        microsecond per input

                                        I suppose one solution is
                                        to move the responsibility
                                        for handling this kind of
                                        situation to the user and
                                        expect users to use a
                                        bundling transform (e.g.
                                        BatchElements [1])
                                        followed by a
                                        Reshuffle+FlatMap. Is this
                                        what other runners expect?
                                        Just want to make sure I'm
                                        not missing some smart
                                        generic bundling strategy
                                        that might handle this for
                                        users.

                                        [1] https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements



                                        On Thu, Sep 21, 2023 at
                                        7:23 PM Joey Tran
                                        <joey.t...@schrodinger.com>
                                        wrote:

                                            I'm writing a runner, and
                                            the first strategy for
                                            determining bundle
                                            size was to just start
                                            with a bundle size of
                                            one and double it
                                            until we reach a size
                                            that we expect to take
                                            some target
                                            per-bundle runtime
                                            (e.g. maybe 10
                                            minutes). I realize
                                            that this isn't the
                                            greatest strategy for
                                            high sized cost
                                            transforms. I'm
                                            curious what kind of
                                            strategies other
                                            runners take?
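
The doubling strategy described above might look roughly like the following sketch. All names (`run_with_doubling_bundles`, `FakeClock`, the cost figures) are hypothetical, and a fake clock replaces wall-clock time so the behaviour is deterministic; each bundle is charged a fixed cost of 5 units plus 1 unit per element.

```python
import time


def run_with_doubling_bundles(elements, process_bundle, target_seconds,
                              clock=time.monotonic):
    """Start with bundles of one element and double the size until a
    bundle takes at least target_seconds. Returns the bundle sizes used."""
    size = 1
    start = 0
    sizes = []
    while start < len(elements):
        bundle = elements[start:start + size]
        began = clock()
        process_bundle(bundle)
        elapsed = clock() - began
        sizes.append(len(bundle))
        start += len(bundle)
        if elapsed < target_seconds:
            size *= 2  # still under target: try a larger bundle next time
    return sizes


class FakeClock:
    """Deterministic clock for the demo (hypothetical helper)."""

    def __init__(self):
        self.now = 0.0

    def __call__(self):
        return self.now


clock = FakeClock()


def process_bundle(bundle):
    # Simulated cost: 5 units of fixed setup plus 1 unit per element.
    clock.now += 5.0 + 1.0 * len(bundle)


sizes = run_with_doubling_bundles(list(range(20)), process_bundle,
                                  target_seconds=12.0, clock=clock)
print(sizes, clock.now)
```

In this run the fixed cost is paid five times for twenty elements, which illustrates why ramping up from a bundle size of one is expensive when setup dominates per-element work.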
