> Watermarks shouldn't be (visibly) advanced until @FinishBundle is
committed, as there's no guarantee that this work won't be discarded.
There was a thread [1] where the conclusion seemed to be that updating
the watermark is possible even in the middle of a bundle. In practice,
handling of watermarks is runner-dependent (e.g. Flink does not store
watermarks in checkpoints; they are always recomputed from scratch on
restore).
[1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
On 9/22/23 21:47, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz> wrote:
On 9/22/23 18:07, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
<dev@beam.apache.org> wrote:
I've actually wondered about this specifically for
streaming... if you're writing a pipeline there it seems like
you're often going to want to put high-fixed-cost things like
database connections even outside of the bundle setup. You
really only want to do that once in the lifetime of the
worker itself, not the bundle. Seems like having that
boundary be somewhere other than an arbitrary (and, in
streaming, probably small to avoid latency) group of elements
might be more useful? I suppose this depends heavily on the
object lifecycle in the SDK worker, though.
+1. This is the difference between @Setup and @StartBundle. The
start/finish bundle operations should be used for bracketing
element processing that must be committed as a unit for
correct failure recovery (e.g. if elements are cached in
ProcessElement, they should all be emitted in FinishBundle). On
the other hand, things like open database connections can and
likely should be shared across bundles.
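(To make that distinction concrete, here is a minimal Java sketch of
the pattern; DatabaseConnection and its methods are hypothetical
placeholders, not a real client API:)

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

class BufferingWriteFn extends DoFn<String, Void> {
  private transient DatabaseConnection connection;  // hypothetical client type
  private transient List<String> buffer;

  @Setup
  public void setup() {
    // High fixed cost: paid once per DoFn instance, shared across bundles.
    connection = DatabaseConnection.open();
  }

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(@Element String element) {
    buffer.add(element);
  }

  @FinishBundle
  public void finishBundle() {
    // Everything cached during the bundle is flushed here, so the write
    // is committed (or retried) together with the bundle as a unit.
    connection.writeBatch(buffer);
  }

  @Teardown
  public void teardown() {
    connection.close();
  }
}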
This is correct, but the caching between @StartBundle and
@FinishBundle has some problems. First, users need to manually set a
watermark hold to min(timestamp in bundle), otherwise the watermark
might overtake the buffered elements.
Watermarks shouldn't be (visibly) advanced until @FinishBundle is
committed, as there's no guarantee that this work won't be discarded.
Users have no option other than using timer.withOutputTimestamp
for that, as we don't have a user-facing API to set a watermark hold
otherwise; thus in-bundle caching implies a stateful DoFn. The
question might then be: why not use "classical" stateful caching
involving state, since that gives full control over the caching in
user code? This prompted an idea: would it be useful to add
information about caching to the API (e.g., in Java,
@StartBundle(caching=true))? That could perhaps solve the above
issues (the runner would know to set the hold, and it could work with
"stateless" DoFns).
Really, this is one of the areas where the streaming/batch abstraction
leaks. In batch it was a common pattern to have local DoFn instance
state that persisted from start to finish bundle, and these were also
used as convenient entry points for other operations (like opening
database connections) 'cause bundles were often "as large as
possible." With the advent of streaming it makes sense to put this
in explicitly managed runner state to allow for cross-bundle
amortization, and there's more value in distinguishing between @Setup
and @StartBundle.
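(To illustrate the timer.withOutputTimestamp workaround Jan mentions
above, a minimal sketch of a stateful DoFn whose timer output
timestamp acts as the watermark hold. It is only a sketch: tracking
the true minimum timestamp across the buffer, among other details, is
elided.)

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;

class HoldingBufferFn extends DoFn<KV<String, String>, String> {
  @StateId("buffer")
  private final StateSpec<BagState<String>> bufferSpec = StateSpecs.bag();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @Timestamp Instant timestamp,
      @StateId("buffer") BagState<String> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(element.getValue());
    // withOutputTimestamp is the watermark hold: the output watermark
    // cannot pass this timestamp until the timer fires.
    flush.withOutputTimestamp(timestamp).offset(Duration.ZERO).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(
      @StateId("buffer") BagState<String> buffer, OutputReceiver<String> out) {
    for (String value : buffer.read()) {
      out.output(value);
    }
    buffer.clear();
  }
}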
(Were I to do things over, I'd probably encourage an API that
discouraged non-configuration instance state on DoFns altogether.
E.g., in the style of Python context managers (an equivalent API could
probably be put together with AutoCloseables in Java), one would have
something like
ParDo(X)
which would logically (though not necessarily physically) lead to an
execution like
with X.bundle_processor() as bundle_processor:
  for bundle in bundles:
    with bundle_processor.element_processor() as process:
      for element in bundle:
        process(element)
where the traditional setup/start_bundle/finish_bundle/teardown logic
would live in the __enter__ and __exit__ methods (made even easier
with coroutines.) For convenience one could of course provide a raw
bundle processor or element processor to ParDo if the enter/exit
contexts are trivial. But this is getting somewhat off-topic...
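(A rough Java analogue of that shape using try-with-resources; these
interfaces are hypothetical, sketching the API described above rather
than anything that exists in Beam:)

// Hypothetical interfaces, not part of Beam; close() plays the role
// of __exit__ and may narrow away AutoCloseable's checked exception.
interface DoFnLike {
  BundleProcessor bundleProcessor();  // acquisition = setup
}

interface BundleProcessor extends AutoCloseable {
  ElementProcessor elementProcessor();  // acquisition = start_bundle
  @Override void close();              // teardown
}

interface ElementProcessor extends AutoCloseable {
  void process(Object element);
  @Override void close();              // finish_bundle
}

void runBundles(DoFnLike x, Iterable<Iterable<Object>> bundles) {
  try (BundleProcessor bundleProcessor = x.bundleProcessor()) {
    for (Iterable<Object> bundle : bundles) {
      try (ElementProcessor process = bundleProcessor.elementProcessor()) {
        for (Object element : bundle) {
          process.process(element);
        }
      }
    }
  }
}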
Best,
B
On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles
<k...@apache.org> wrote:
(I notice that you replied only to yourself, but there
has been a whole thread of discussion on this - are you
subscribed to dev@beam?
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
It sounds like you want what everyone wants: to have the
biggest bundles possible.
So for bounded data, basically you make even splits of
the data and each split is one bundle. And then dynamic
splitting to redistribute work to eliminate stragglers,
if your engine has that capability.
For unbounded data, you more-or-less bundle as much as
you can without waiting too long, like Jan described.
Users know to put their high fixed costs in @StartBundle
and then it is the runner's job to put as many calls
to @ProcessElement as possible to amortize.
Kenn
On Fri, Sep 22, 2023 at 9:39 AM Joey Tran
<joey.t...@schrodinger.com> wrote:
Whoops, I typoed my last email. I meant to write
"this isn't the greatest strategy for high *fixed*
cost transforms", e.g. a transform that takes 5
minutes to get set up and then maybe a microsecond
per input.
I suppose one solution is to move the responsibility
for handling this kind of situation to the user and
expect users to use a bundling transform (e.g.
BatchElements [1]) followed by a Reshuffle+FlatMap.
Is this what other runners expect? Just want to make
sure I'm not missing some smart generic bundling
strategy that might handle this for users.
[1]
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
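(FWIW, the closest Java analogue I know of to Python's BatchElements
is GroupIntoBatches. A rough sketch of that user-side pattern, where
the keying and batch size are illustrative choices and
ExpensiveResource is a hypothetical stand-in for the 5-minute setup:)

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

// input: PCollection<String>
PCollection<String> results =
    input
        // Spread elements across a few keys so batches can form in parallel.
        .apply(WithKeys.of((String s) -> Math.floorMod(s.hashCode(), 10))
            .withKeyType(TypeDescriptors.integers()))
        .apply(GroupIntoBatches.<Integer, String>ofSize(100))
        .apply(ParDo.of(
            new DoFn<KV<Integer, Iterable<String>>, String>() {
              @ProcessElement
              public void process(
                  @Element KV<Integer, Iterable<String>> batch,
                  OutputReceiver<String> out) {
                // Hypothetical: pay the high fixed cost once per batch.
                ExpensiveResource resource = ExpensiveResource.create();
                for (String element : batch.getValue()) {
                  out.output(resource.transform(element));  // cheap per element
                }
              }
            }));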
On Thu, Sep 21, 2023 at 7:23 PM Joey Tran
<joey.t...@schrodinger.com> wrote:
I'm writing a runner, and the first strategy for
determining bundle size was to just start with
a bundle size of one and double it until we reach
a size that we expect to take some target
per-bundle runtime (e.g. maybe 10 minutes). I
realize that this isn't the greatest strategy for
high sized cost transforms. I'm curious what kinds
of strategies other runners take?
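(For concreteness, that doubling heuristic as a tiny sketch; the names
and the 10-minute target are just illustrative:)

import java.time.Duration;

// Sketch: start at one element per bundle and double until a bundle's
// measured runtime reaches the target.
final class BundleSizer {
  private static final Duration TARGET = Duration.ofMinutes(10);
  private long bundleSize = 1;

  long nextBundleSize(Duration lastBundleRuntime) {
    if (lastBundleRuntime.compareTo(TARGET) < 0) {
      bundleSize *= 2;  // under target: try a larger bundle next time
    }
    return bundleSize;
  }
}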