On 9/22/23 18:07, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
<dev@beam.apache.org> wrote:
I've actually wondered about this specifically for streaming... if
you're writing a pipeline there it seems like you're often going
to want to put high fixed cost things like database connections
even outside of the bundle setup. You really only want to do that
once in the lifetime of the worker itself, not the bundle. Seems
like having that boundary be somewhere other than an arbitrary
(and, in streaming, probably small to avoid latency) group of
elements might be more useful? I suppose this depends heavily on
the object lifecycle in the sdk worker though.
+1. This is the difference between @Setup and @StartBundle. The
start/finish bundle operations should be used for bracketing element
processing that must be committed as a unit for
correct failure recovery (e.g. if elements are cached in
ProcessElement, they should all be emitted in FinishBundle). On the
other hand, things like open database connections can and likely
should be shared across bundles.
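
[Editor's note: the lifecycle split Robert describes can be sketched in plain Python, with no Beam dependency. The class and names below are illustrative stand-ins; a real Beam DoFn uses the setup/start_bundle/process/finish_bundle/teardown methods in the Python SDK, or @Setup/@StartBundle/@ProcessElement/@FinishBundle/@Teardown in Java.]

```python
# Illustrative sketch (not real Beam code) of the lifecycle distinction:
# high-fixed-cost resources belong in setup (once per worker instance),
# while start/finish bundle only bracket work that must commit as a unit.

class ExpensiveResourceDoFn:
    def setup(self):
        # Once per DoFn instance on the worker: open the expensive
        # resource (e.g. a database connection) so it is shared
        # across all bundles.
        self.connection = object()  # stand-in for a real connection

    def start_bundle(self):
        # Once per bundle: initialize per-bundle buffers only.
        self.buffer = []

    def process(self, element):
        # Cache the element instead of emitting it immediately.
        self.buffer.append(element)

    def finish_bundle(self):
        # Everything cached in process() must be emitted here so the
        # bundle commits as a unit for correct failure recovery.
        out, self.buffer = self.buffer, []
        return out

    def teardown(self):
        self.connection = None
```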
This is correct, but the caching between @StartBundle and @FinishBundle
has some problems. First, users need to manually set a watermark hold to
min(timestamp in bundle), otherwise the watermark might overtake the
buffered elements. Users have no option for that other than
timer.withOutputTimestamp, as we don't have any other user-facing API to
set a watermark hold, so in-bundle caching implies a stateful DoFn. The
question might then be: why not use "classical" stateful caching
involving state, where user code has full control over the caching? This
gave me the idea that it might be useful to add information about the
caching to the API (e.g. in Java, @StartBundle(caching=true)), which
could maybe solve the above issues (the runner would know to set the
hold, and it could work with "stateless" DoFns)?
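
[Editor's note: Jan's watermark-hold concern can be illustrated without Beam. While elements sit in a bundle buffer, the runner must not advance the output watermark past the earliest buffered timestamp. The sketch below is hypothetical plain Python; `watermark_hold` is an invented name for the value a runner would need, which today's API only approximates via timer.withOutputTimestamp.]

```python
class BufferingDoFn:
    """Buffers (timestamp, value) pairs between start and finish bundle.

    While the buffer is non-empty, watermark_hold() reports the earliest
    buffered timestamp; a runner honoring that hold would not let the
    output watermark overtake the buffered elements.
    """

    def start_bundle(self):
        self.buffer = []

    def process(self, timestamp, value):
        self.buffer.append((timestamp, value))

    def watermark_hold(self):
        # The hold must be min(timestamp in bundle); anything later and
        # the watermark might pass elements still waiting to be emitted.
        return min(ts for ts, _ in self.buffer) if self.buffer else None

    def finish_bundle(self):
        # Emit everything; the hold is released once the buffer drains.
        out, self.buffer = sorted(self.buffer), []
        return out
```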
Best,
B
On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles <k...@apache.org>
wrote:
(I notice that you replied only to yourself, but there has
been a whole thread of discussion on this - are you subscribed
to dev@beam?
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
It sounds like you want what everyone wants: to have the
biggest bundles possible.
So for bounded data, basically you make even splits of the
data and each split is one bundle. And then dynamic splitting
to redistribute work to eliminate stragglers, if your engine
has that capability.
For unbounded data, you more-or-less bundle as much as you can
without waiting too long, like Jan described.
Users know to put their high fixed costs in @StartBundle and
then it is the runner's job to put as many calls
to @ProcessElement as possible to amortize.
Kenn
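
[Editor's note: the amortization argument can be put in numbers. The function below is a hypothetical cost model, using the 5-minute-setup / microsecond-per-element figures that appear later in the thread.]

```python
def per_element_cost(setup_cost, marginal_cost, bundle_size):
    # Amortized cost per element when a fixed cost paid in @StartBundle
    # is spread over bundle_size calls to @ProcessElement.
    return setup_cost / bundle_size + marginal_cost

# With a 300s setup and 1e-6s per element, a bundle of 1 costs about
# 300s per element, while a bundle of 1,000,000 costs well under a
# millisecond per element: bigger bundles amortize the fixed cost away.
```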
On Fri, Sep 22, 2023 at 9:39 AM Joey Tran
<joey.t...@schrodinger.com> wrote:
Whoops, I typoed my last email. I meant to write "this
isn't the greatest strategy for high *fixed* cost
transforms", e.g. a transform that takes 5 minutes to get
set up and then maybe a microsecond per input
I suppose one solution is to move the responsibility for
handling this kind of situation to the user and expect
users to use a bundling transform (e.g. BatchElements [1])
followed by a Reshuffle+FlatMap. Is this what other
runners expect? Just want to make sure I'm not missing
some smart generic bundling strategy that might handle
this for users.
[1]
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
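
[Editor's note: a plain-Python illustration of the batch-then-fan-out pattern Joey describes. These helpers are illustrative, not Beam APIs: `batch_elements` mimics what BatchElements does (the real transform also adapts batch size to measured latency), and `flat_map` stands in for the FlatMap that, after a Reshuffle redistributes batches across workers, pays the fixed cost once per batch.]

```python
def batch_elements(iterable, max_batch_size):
    # Group a stream of elements into lists of up to max_batch_size,
    # as Beam's BatchElements transform does for a PCollection.
    batch = []
    for element in iterable:
        batch.append(element)
        if len(batch) >= max_batch_size:
            yield batch
            batch = []
    if batch:
        yield batch


def flat_map(batches, fn):
    # Apply the expensive per-batch function and re-flatten the results,
    # so the fixed setup cost is paid once per batch rather than once
    # per element.
    for batch in batches:
        yield from fn(batch)
```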
On Thu, Sep 21, 2023 at 7:23 PM Joey Tran
<joey.t...@schrodinger.com> wrote:
I'm writing a runner, and the first strategy for
determining bundle size was to just start with a
bundle size of one and double it until we reach a size
that we expect to take some target per-bundle runtime
(e.g. maybe 10 minutes). I realize that this isn't the
greatest strategy for high sized cost transforms. I'm
curious what kinds of strategies other runners take?
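
[Editor's note: the doubling strategy described above can be sketched as follows. The function and numbers are illustrative; the comment notes why the strategy stalls for high fixed cost transforms, which is Joey's correction in the follow-up email.]

```python
def next_bundle_size(current_size, measured_runtime, target_runtime):
    # Start at 1 and double the bundle size until a bundle's measured
    # runtime reaches the target (e.g. 600s). Note the failure mode for
    # high fixed cost transforms: if setup alone takes 5 minutes, even a
    # size-1 bundle looks "expensive", so the size never grows and the
    # fixed cost is never amortized.
    if measured_runtime < target_runtime:
        return current_size * 2
    return current_size

# Simulated growth toward a 600s target when each element takes ~1s:
size = 1
while size * 1.0 < 600:  # simulated runtime = size * per-element cost
    size = next_bundle_size(size, size * 1.0, 600)
```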