> Watermarks shouldn't be (visibly) advanced until @FinishBundle is committed, as there's no guarantee that this work won't be discarded.

There was a thread [1] where the conclusion seemed to be that updating the watermark is possible even in the middle of a bundle. In practice, handling watermarks is runner-dependent (e.g. Flink does not store watermarks in checkpoints; they are always recomputed from scratch on restore).

[1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv

On 9/22/23 21:47, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz> wrote:

    On 9/22/23 18:07, Robert Bradshaw via dev wrote:

    On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
    <dev@beam.apache.org> wrote:

        I've actually wondered about this specifically for
        streaming... if you're writing a pipeline there it seems like
        you're often going to want to put high fixed cost things like
        database connections even outside of the bundle setup. You
        really only want to do that once in the lifetime of the
        worker itself, not the bundle. Seems like having that
        boundary be somewhere other than an arbitrary (and probably
        small in streaming, to avoid latency) group of elements might
        be more useful? I suppose this depends heavily on the object
        lifecycle in the sdk worker though.


    +1. This is the difference between @Setup and @StartBundle. The
    start/finish bundle operations should be used for bracketing
    element processing that must be committed as a unit for
    correct failure recovery (e.g. if elements are cached in
    ProcessElement, they should all be emitted in FinishBundle). On
    the other hand, things like open database connections can and
    likely should be shared across bundles.
    This is correct, but the caching between @StartBundle and
    @FinishBundle has some problems. First, users need to manually set
    a watermark hold to min(timestamp in bundle), otherwise the
    watermark might overtake the buffered elements.
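
As a sketch of the buffering pattern under discussion (plain Python with hypothetical names like BufferingFn, not the actual Beam SDK API): elements cached in process() are flushed in finish_bundle(), and the hold the runner would need while the bundle is open is the minimum buffered timestamp.

```python
# Plain-Python sketch (hypothetical names, not the real Beam SDK API)
# of in-bundle buffering: elements cached in process() must be flushed
# in finish_bundle(), and the watermark must be held at the minimum
# buffered timestamp so it cannot overtake the cached elements.

class BufferingFn:
    def start_bundle(self):
        # Per-bundle state, cleared at each bundle boundary.
        self.buffer = []

    def process(self, element, timestamp):
        # Cache instead of emitting immediately.
        self.buffer.append((timestamp, element))

    def finish_bundle(self):
        # Emit everything cached in this bundle as one unit.
        out = sorted(self.buffer)
        self.buffer = []
        return out

    def watermark_hold(self):
        # The hold the runner would need while the bundle is open.
        return min((ts for ts, _ in self.buffer), default=None)


fn = BufferingFn()
fn.start_bundle()
fn.process("a", timestamp=10)
fn.process("b", timestamp=3)
hold = fn.watermark_hold()   # the watermark must not pass this
emitted = fn.finish_bundle()
```

If the work is discarded and retried, start_bundle() resets the buffer, so nothing from the failed attempt leaks into the retry.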


Watermarks shouldn't be (visibly) advanced until @FinishBundle is committed, as there's no guarantee that this work won't be discarded.

    Users don't have any option other than using
    timer.withOutputTimestamp for that, as we don't have a user-facing
    API to set a watermark hold otherwise, so the in-bundle caching
    implies a stateful DoFn. The question might then be: why not use
    "classical" stateful caching involving state, as there is full
    control over the caching in user code? This prompted an idea:
    would it be useful to add information about caching to the API
    (e.g. in Java, @StartBundle(caching=true))? That could perhaps
    solve the above issues (the runner would know to set the hold, and
    it could work with "stateless" DoFns).


Really, this is one of the areas where the streaming/batch abstraction leaks. In batch it was a common pattern to have local DoFn instance state that persisted from start to finish bundle, and these were also used as convenient entry points for other operations (like opening database connections) 'cause bundles were often "as large as possible." With the advent of streaming it makes sense to put this in explicitly managed runner state to allow for cross-bundle amortization, and there's more value in distinguishing between @Setup and @StartBundle.

(Were I to do things over I'd probably encourage an API that discouraged non-configuration instance state on DoFns altogether, e.g. in the style of Python context managers (and an equivalent API could probably be put together with AutoCloseables in Java) one would have something like

ParDo(X)

which would logically (though not necessarily physically) lead to an execution like

with X.bundle_processor() as bundle_processor:
  for bundle in bundles:
    with bundle_processor.element_processor() as process:
      for element in bundle:
        process(element)

where the traditional setup/start_bundle/finish_bundle/teardown logic would live in the __enter__ and __exit__ methods (made even easier with coroutines). For convenience one could of course provide a raw bundle processor or element processor to ParDo if the enter/exit contexts are trivial. But this is getting somewhat off-topic...
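
As a rough, runnable illustration of that execution shape (stdlib contextlib only; X, bundle_processor, and element_processor are the hypothetical names from the sketch above, and the log list is just there to show the ordering):

```python
from contextlib import contextmanager

# Hypothetical DoFn-as-context-managers sketch: @Setup/@Teardown live
# in bundle_processor's enter/exit, @StartBundle/@FinishBundle in
# element_processor's enter/exit.

log = []

class X:
    @contextmanager
    def bundle_processor(self):
        log.append("setup")               # @Setup equivalent
        try:
            yield self
        finally:
            log.append("teardown")        # @Teardown equivalent

    @contextmanager
    def element_processor(self):
        log.append("start_bundle")        # @StartBundle equivalent
        try:
            yield lambda e: log.append(f"process({e})")
        finally:
            log.append("finish_bundle")   # @FinishBundle equivalent


bundles = [[1, 2], [3]]
with X().bundle_processor() as bp:
    for bundle in bundles:
        with bp.element_processor() as process:
            for element in bundle:
                process(element)
```

The nesting makes the lifecycle explicit: one setup/teardown around the whole worker lifetime, one start/finish pair per bundle, with exceptions naturally propagating through the __exit__ paths.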


        Best,
        B

        On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles
        <k...@apache.org> wrote:

            (I notice that you replied only to yourself, but there
            has been a whole thread of discussion on this - are you
            subscribed to dev@beam?
            https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)

            It sounds like you want what everyone wants: to have the
            biggest bundles possible.

            So for bounded data, basically you make even splits of
            the data and each split is one bundle. And then dynamic
            splitting to redistribute work to eliminate stragglers,
            if your engine has that capability.

            For unbounded data, you more-or-less bundle as much as
            you can without waiting too long, like Jan described.

            Users know to put their high fixed costs in @StartBundle
            and then it is the runner's job to put as many calls
            to @ProcessElement as possible to amortize.
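
The amortization arithmetic can be made concrete with a small sketch (the numbers are illustrative assumptions taken from the thread: roughly 5 minutes of fixed per-bundle cost and a microsecond per element):

```python
# Illustrative arithmetic: the per-element share of @StartBundle work
# shrinks as the runner packs more @ProcessElement calls into a bundle.
# Numbers are assumptions, not measurements.

fixed_cost_s = 300.0     # one-time cost paid per bundle (e.g. setup)
per_element_s = 1e-6     # marginal cost per element

def amortized_cost(bundle_size):
    return fixed_cost_s / bundle_size + per_element_s

# With tiny bundles the fixed cost dominates; with large ones it
# becomes negligible.
small = amortized_cost(10)          # fixed cost dominates
large = amortized_cost(10_000_000)  # fixed cost nearly vanishes
```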

            Kenn

            On Fri, Sep 22, 2023 at 9:39 AM Joey Tran
            <joey.t...@schrodinger.com> wrote:

                Whoops, I typoed my last email. I meant to write
                "this isn't the greatest strategy for high *fixed*
                cost transforms", e.g. a transform that takes 5
                minutes to get set up and then maybe a microsecond
                per input.

                I suppose one solution is to move the responsibility
                for handling this kind of situation to the user and
                expect users to use a bundling transform (e.g.
                BatchElements [1]) followed by a Reshuffle+FlatMap.
                Is this what other runners expect? Just want to make
                sure I'm not missing some smart generic bundling
                strategy that might handle this for users.

                [1]
                https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
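
The batching half of that pattern can be sketched in plain Python (a simplified stand-in; the real BatchElements additionally tunes its batch size at runtime rather than using a fixed cap):

```python
# Simplified stand-in for a BatchElements-style step: groups a stream
# of elements into lists of bounded size so a downstream high
# fixed-cost step pays its cost once per batch instead of per element.

def batch_elements(iterable, max_batch_size=100):
    batch = []
    for element in iterable:
        batch.append(element)
        if len(batch) >= max_batch_size:
            yield batch
            batch = []
    if batch:  # flush the trailing partial batch
        yield batch


batches = list(batch_elements(range(7), max_batch_size=3))
```

In the pipeline described above, a Reshuffle after this step redistributes the batches across workers before the FlatMap unpacks them.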



                On Thu, Sep 21, 2023 at 7:23 PM Joey Tran
                <joey.t...@schrodinger.com> wrote:

                    Writing a runner and the first strategy for
                    determining bundling size was to just start with
                    a bundle size of one and double it until we reach
                    a size that we expect to take some target
                    per-bundle runtime (e.g. maybe 10 minutes). I
                    realize that this isn't the greatest strategy for
                    high sized cost transforms. I'm curious what kind
                    of strategies other runners take?
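
The doubling strategy described here can be sketched as follows (pure Python; the cost function and target are made-up parameters, and the workload is simulated rather than timed):

```python
# Sketch of the bundle-sizing strategy described above: start at one
# element per bundle and double until the measured per-bundle runtime
# reaches a target. Names and numbers are illustrative.

def choose_bundle_size(measure_bundle_seconds, target_seconds,
                       max_size=1 << 20):
    size = 1
    while size < max_size:
        if measure_bundle_seconds(size) >= target_seconds:
            break
        size *= 2
    return size

# Simulated workload: 2 ms per element, no fixed cost.
per_element_s = 0.002
size = choose_bundle_size(lambda n: n * per_element_s,
                          target_seconds=10.0)
```

Note the weakness acknowledged in the follow-up email: if the measured time includes a large fixed cost (say 5 minutes of setup per bundle), the very first measurement already exceeds a modest target and the loop stops at a bundle size of 1, amortizing nothing.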
