On 9/22/23 18:07, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev <dev@beam.apache.org> wrote:

    I've actually wondered about this specifically for streaming... if
    you're writing a pipeline there it seems like you're often going
    to want to put high fixed cost things like database connections
    even outside of the bundle setup. You really only want to do that
    once in the lifetime of the worker itself, not the bundle. Seems
    like having that boundary be somewhere other than an arbitrarily
    (and probably small in streaming to avoid latency) group of
    elements might be more useful? I suppose this depends heavily on
    the object lifecycle in the sdk worker though.


+1. This is the difference between @Setup and @StartBundle. The start/finish bundle operations should be used for bracketing element processing that must be committed as a unit for correct failure recovery (e.g. if elements are cached in ProcessElement, they should all be emitted in FinishBundle). On the other hand, things like open database connections can and likely should be shared across bundles.
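To make the lifecycle distinction concrete, here is a minimal plain-Python sketch (deliberately *not* the Beam API; the mini "runner" driver and all names are illustrative) showing why a worker-lifetime resource belongs in setup() while per-bundle buffering belongs in start_bundle()/finish_bundle():

```python
# Hypothetical sketch of the DoFn lifecycle: setup() runs once per
# worker, start_bundle()/finish_bundle() bracket each bundle. Not the
# actual Beam API; a toy driver stands in for the runner.

class CachingDoFn:
    def __init__(self):
        self.connections_opened = 0

    def setup(self):
        # Stand-in for an expensive, worker-lifetime resource
        # (e.g. a database connection): opened once per worker.
        self.connections_opened += 1

    def start_bundle(self):
        # Per-bundle buffer: must be flushed in finish_bundle so the
        # bundle commits (or retries) as a unit.
        self.buffer = []

    def process(self, element):
        self.buffer.append(element * 2)

    def finish_bundle(self):
        out, self.buffer = self.buffer, []
        return out


def run_worker(fn, bundles):
    fn.setup()  # once per worker, shared across all bundles
    results = []
    for bundle in bundles:
        fn.start_bundle()
        for e in bundle:
            fn.process(e)
        results.extend(fn.finish_bundle())
    return results


fn = CachingDoFn()
out = run_worker(fn, [[1, 2], [3]])
```

Two bundles are processed, but the "connection" is opened exactly once, which is the sharing-across-bundles point above.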
This is correct, but caching between @StartBundle and @FinishBundle has some problems. First, users need to manually set a watermark hold to min(timestamp in bundle), otherwise the watermark might overtake the buffered elements. Users have no option other than timer.withOutputTimestamp for that, as we don't have a user-facing API to set a watermark hold otherwise, so in-bundle caching implies a stateful DoFn. The question might then be: why not use "classical" caching via state, where user code has full control over the caching? This triggered an idea: might it be useful to add caching information to the API (e.g. in Java, @StartBundle(caching=true))? That could perhaps solve the above issues (the runner would know to set the hold, and it could work with "stateless" DoFns).
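The required hold is easy to state precisely: it is the minimum timestamp over the buffered elements, recomputed as the buffer changes. A small sketch (function name and the (element, timestamp) representation are illustrative, not Beam API):

```python
# Hypothetical sketch: the watermark hold a runner would need while a
# DoFn buffers elements between start_bundle and finish_bundle. If no
# hold (or a later one) is set, the watermark can overtake the buffer
# and the flushed elements become late.

def required_hold(buffered):
    """Watermark hold for a buffer of (element, timestamp) pairs.

    Returns None when the buffer is empty (no hold needed).
    """
    if not buffered:
        return None
    return min(ts for _, ts in buffered)


buffer = [("a", 17), ("b", 9), ("c", 12)]
hold = required_hold(buffer)  # the earliest buffered timestamp, 9
```

A hypothetical @StartBundle(caching=true) would let the runner maintain exactly this hold on the user's behalf.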


    Best,
    B

    On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org>
    wrote:

        (I notice that you replied only to yourself, but there has
        been a whole thread of discussion on this - are you subscribed
        to dev@beam?
        https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

        It sounds like you want what everyone wants: to have the
        biggest bundles possible.

        So for bounded data, basically you make even splits of the
        data and each split is one bundle. And then dynamic splitting
        to redistribute work to eliminate stragglers, if your engine
        has that capability.

        For unbounded data, you more-or-less bundle as much as you can
        without waiting too long, like Jan described.

        Users know to put their high fixed costs in @StartBundle and
        then it is the runner's job to put as many calls
        to @ProcessElement as possible to amortize.
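The amortization arithmetic behind this is worth spelling out: per-element cost is fixed cost divided by bundle size, plus the marginal cost. A back-of-the-envelope sketch (numbers are illustrative, loosely based on the 5-minutes-setup example later in the thread):

```python
# Amortizing a high fixed per-bundle cost over many elements: as the
# bundle grows, per-element cost approaches the marginal cost.

def cost_per_element(fixed_cost_s, per_element_s, bundle_size):
    return fixed_cost_s / bundle_size + per_element_s


# 300 s of setup in @StartBundle, ~1 microsecond per element:
small = cost_per_element(300.0, 1e-6, 10)           # dominated by setup
large = cost_per_element(300.0, 1e-6, 10_000_000)   # setup amortized away
```

This is why the runner's job is simply "as many @ProcessElement calls per bundle as latency allows".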

        Kenn

        On Fri, Sep 22, 2023 at 9:39 AM Joey Tran
        <joey.t...@schrodinger.com> wrote:

            Whoops, I typoed my last email. I meant to write "this
            isn't the greatest strategy for high *fixed* cost
            transforms", e.g. a transform that takes 5 minutes to get
            set up and then maybe a microsecond per input

            I suppose one solution is to move the responsibility for
            handling this kind of situation to the user and expect
            users to use a bundling transform (e.g. BatchElements [1])
            followed by a Reshuffle+FlatMap. Is this what other
            runners expect? Just want to make sure I'm not missing
            some smart generic bundling strategy that might handle
            this for users.

            [1]
            
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
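The user-side pattern Joey describes can be sketched without the Beam API: group elements into batches so the fixed cost is paid once per batch, then flat-map the batches back into elements. A minimal plain-Python analogue of BatchElements followed by a per-batch transform (all names here are illustrative):

```python
# Hypothetical sketch of user-side batching: mimics
# BatchElements | Reshuffle | FlatMap with plain generators.

def batch_elements(iterable, max_batch_size):
    """Yield lists of up to max_batch_size elements."""
    batch = []
    for e in iterable:
        batch.append(e)
        if len(batch) >= max_batch_size:
            yield batch
            batch = []
    if batch:
        yield batch  # final partial batch


def expensive_per_batch(batch):
    # Pay the fixed cost once here (e.g. the 5-minute setup), then
    # process every element in the batch against it.
    return [e * 2 for e in batch]


out = [r for b in batch_elements(range(7), 3) for r in expensive_per_batch(b)]
```

In Beam the Reshuffle between batching and the expensive FlatMap is what redistributes the batches across workers.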



            On Thu, Sep 21, 2023 at 7:23 PM Joey Tran
            <joey.t...@schrodinger.com> wrote:

                I'm writing a runner, and the first strategy for
                determining bundle size was to just start with a
                bundle size of one and double it until we reach a size
                that we expect to take some target per-bundle runtime
                (e.g. maybe 10 minutes). I realize that this isn't the
                greatest strategy for high sized cost transforms. I'm
                curious what kinds of strategies other runners take?
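The doubling heuristic described above can be sketched in a few lines (the cost model and function names are illustrative; a real runner would time actual bundles rather than call a model):

```python
# Hypothetical sketch of the start-at-one-and-double strategy: grow
# the bundle size until the estimated per-bundle runtime reaches a
# target, with a cap to keep the search bounded.

def pick_bundle_size(estimate_bundle_seconds, target_seconds, max_size=1 << 20):
    size = 1
    while size < max_size and estimate_bundle_seconds(size) < target_seconds:
        size *= 2
    return size


# Illustrative cost model: 2 s fixed per bundle + 0.5 s per element.
cost = lambda n: 2.0 + 0.5 * n
size = pick_bundle_size(cost, target_seconds=600.0)  # a 10-minute target
```

Note the weakness Joey identifies: with a very high fixed cost (say 300 s and microseconds per element), the doubling probe itself repeatedly pays the fixed cost on tiny bundles before reaching a useful size.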
