On 9/23/23 18:16, Reuven Lax via dev wrote:
Two separate things here:
1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the watermark
from updating, as they are still in flight until after finish bundle.
Therefore simply caching the records should always be watermark-safe,
regardless of the runner. You will only run into problems if you try
to move timestamps "backwards", which is why Beam strongly
discourages this.
This is not aligned with FlinkRunner's implementation, and I actually
think it is not aligned conceptually either. As mentioned, Flink does not
have the concept of bundles at all. It achieves fault tolerance via
checkpointing: essentially, a checkpoint barrier flows from sources to
sinks, safely snapshotting the state of each operator on the way. Bundles
are implemented as a somewhat arbitrary set of elements between two
consecutive checkpoints (there can be multiple bundles between
checkpoints). A bundle is 'committed' (i.e. persistently stored and
guaranteed not to be retried) only after the checkpoint barrier passes
over the elements in the bundle (every bundle is finished, at the very
latest, just before a checkpoint). But watermark propagation and bundle
finalization are completely unrelated. This might be a bug in the runner,
but requiring a checkpoint for watermark propagation would introduce insane
delays between processing time and watermarks: every executable stage
would delay watermark propagation until a checkpoint (typically on
the order of seconds), and this delay would add up after each stage.
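A back-of-the-envelope model of that last point, under loudly hypothetical assumptions: each executable stage may forward a watermark only at the first checkpoint completion strictly after the watermark reaches it, and checkpoints complete instantly at exact multiples of a fixed interval. This is not FlinkRunner code; it only shows why the lag grows roughly linearly with the number of stages.

```python
# Hypothetical model (not FlinkRunner code): a stage may forward a watermark
# only once the next checkpoint completes after the watermark arrives.
# Times are integer milliseconds to avoid floating-point noise.

def watermark_release_time_ms(arrival_ms, checkpoint_interval_ms, num_stages):
    """Time at which the last of `num_stages` stages releases the watermark."""
    t = arrival_ms
    for _ in range(num_stages):
        # Wait for the first checkpoint completion strictly after time t.
        t = (t // checkpoint_interval_ms + 1) * checkpoint_interval_ms
    return t


for stages in (1, 2, 3):
    released = watermark_release_time_ms(100, 1000, stages)
    print(stages, "stage(s): released at", released, "ms, lag", released - 100, "ms")
```

With a 1 s checkpoint interval, a watermark arriving at t = 100 ms is released by the third stage at t = 3000 ms, i.e. about one extra checkpoint interval of lag per stage.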
Reuven
On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz> wrote:
> Watermarks shouldn't be (visibly) advanced until @FinishBundle
is committed, as there's no guarantee that this work won't be
discarded.
There was a thread [1], where the conclusion seemed to be that
updating watermark is possible even in the middle of a bundle.
Actually, handling watermarks is runner-dependent (e.g. Flink does
not store watermarks in checkpoints, they are always recomputed
from scratch on restore).
[1] https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
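A toy simulation of a watermark advancing in the middle of a bundle while a DoFn caches elements until finish bundle. Everything here is hypothetical and no Beam API is used: with `hold_buffered=False` the output watermark overtakes the cached elements, so they are late when finally emitted; with `hold_buffered=True` the output watermark is held at min(timestamp in bundle), the hold discussed further down in this thread.

```python
# Toy simulation (hypothetical, not a Beam API). Events are processed in
# order within one bundle; cached elements are only emitted at the end.

def run_bundle(events, hold_buffered):
    """events: list of ("element", ts) or ("watermark", wm) tuples.

    Returns (output_watermark, late_elements) at finish-bundle time.
    """
    buffered = []
    output_watermark = float("-inf")
    for kind, value in events:
        if kind == "element":
            buffered.append(value)  # cached, not yet emitted
        else:
            # The input watermark advanced in the middle of the bundle.
            candidate = value
            if hold_buffered and buffered:
                # Hold the output watermark at min(timestamp in bundle).
                candidate = min(candidate, min(buffered))
            output_watermark = max(output_watermark, candidate)  # monotonic
    # "finish bundle": cached elements behind the watermark are late.
    late = [ts for ts in buffered if ts < output_watermark]
    return output_watermark, late


events = [("element", 5), ("element", 7), ("watermark", 10)]
print(run_bundle(events, hold_buffered=False))
print(run_bundle(events, hold_buffered=True))
```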
On 9/22/23 21:47, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský <je...@seznam.cz>
wrote:
On 9/22/23 18:07, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
<dev@beam.apache.org> wrote:
I've actually wondered about this specifically for
streaming... if you're writing a pipeline there, it seems
like you're often going to want to put high-fixed-cost
things like database connections even outside of the
bundle setup. You really only want to do that once in
the lifetime of the worker itself, not the bundle. It seems
like having that boundary be somewhere other than an
arbitrary (and, in streaming, probably small to avoid
latency) group of elements might be more useful? I
suppose this depends heavily on the object lifecycle in
the SDK worker, though.
+1. This is the difference between @Setup and @StartBundle.
The start/finish bundle operations should be used for
bracketing element processing that must be committed as a
unit for correct failure recovery (e.g. if elements are
cached in ProcessElement, they should all be emitted in
FinishBundle). On the other hand, things like open database
connections can and likely should be shared across bundles.
This is correct, but the caching between @StartBundle and
@FinishBundle has some problems. First, users need to
manually set a watermark hold at min(timestamp in bundle);
otherwise the watermark might overtake the buffered elements.
Watermarks shouldn't be (visibly) advanced until @FinishBundle is
committed, as there's no guarantee that this work won't be
discarded.
Users have no option other than using
timer.withOutputTimestamp for that, as we don't have a
user-facing API to set a watermark hold otherwise, so
in-bundle caching implies a stateful DoFn. The question might
then be: why not use "classical" stateful caching involving
state, as there is full control over the caching in user
code? This gave me an idea: would it be useful to add
the information about caching to the API (e.g. in Java
@StartBundle(caching=true)), which could maybe solve the above
issues (the runner would know to set the hold, and it could
work with "stateless" DoFns)?
Really, this is one of the areas where the streaming/batch
abstraction leaks. In batch it was a common pattern to have local
DoFn instance state that persisted from start to finish bundle,
and these were also used as convenient entry points for other
operations (like opening database connections) 'cause bundles
were often "as large as possible." With the advent of streaming,
it makes sense to put this in explicitly managed runner state to
allow for cross-bundle amortization, and there's more value in
distinguishing between @Setup and @StartBundle.
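A minimal simulation of the @Setup versus @StartBundle split described above. The method names mirror the Python DoFn lifecycle (`setup`, `start_bundle`, `process`, `finish_bundle`, `teardown`), but `FakeConnection`, `BufferingWriter` and the `run` loop are hypothetical stand-ins, not Beam classes or a real runner: the expensive connection is opened once per instance and shared across bundles, while the per-bundle cache is flushed in `finish_bundle`.

```python
class FakeConnection:
    """Hypothetical stand-in for an expensive database connection."""

    created = 0  # how many connections were ever opened

    def __init__(self):
        FakeConnection.created += 1
        self.batches = []

    def write_batch(self, rows):
        self.batches.append(list(rows))


class BufferingWriter:
    """DoFn-like class; method names mirror Beam's Python DoFn lifecycle."""

    def setup(self):
        # Once per instance: the connection is shared across bundles.
        self.conn = FakeConnection()

    def start_bundle(self):
        # Once per bundle: start a fresh cache.
        self.buffer = []

    def process(self, element):
        # Cache the element; nothing is written yet.
        self.buffer.append(element)

    def finish_bundle(self):
        # Flush everything cached during this bundle before it is committed.
        self.conn.write_batch(self.buffer)
        self.buffer = []

    def teardown(self):
        self.conn = None


def run(dofn, bundles):
    """Hypothetical runner loop: one setup, many bundles, one teardown."""
    dofn.setup()
    for bundle in bundles:
        dofn.start_bundle()
        for element in bundle:
            dofn.process(element)
        dofn.finish_bundle()
    conn = dofn.conn
    dofn.teardown()
    return conn


conn = run(BufferingWriter(), [[1, 2], [3], [4, 5, 6]])
print(FakeConnection.created, conn.batches)
```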
(Were I to do things over, I'd probably encourage an API that
discouraged non-configuration instance state on DoFns altogether,
e.g. in the notion of Python context managers (and an equivalent
API could probably be put together with AutoCloseables in Java).
One would have something like
    ParDo(X)
which would logically (though not necessarily physically) lead to
an execution like
    with X.bundle_processor() as bundle_processor:
        for bundle in bundles:
            with bundle_processor.element_processor() as process:
                for element in bundle:
                    process(element)
where the traditional setup/start_bundle/finish_bundle/teardown
logic would live in the __enter__ and __exit__ methods (made even
easier with coroutines). For convenience one could of course
provide a raw bundle processor or element processor to ParDo if
the enter/exit contexts are trivial. But this is getting somewhat
off-topic...)
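A runnable sketch of the context-manager style execution above, assuming generator-based context managers (`contextlib.contextmanager`) stand in for the coroutines. `Doubler` and `execute` are hypothetical names for illustration, not a proposed Beam API. The code after the inner `yield` only runs when the bundle completes without an exception, which matches "finish bundle only if processing succeeded".

```python
from contextlib import contextmanager


class Doubler:
    """Hypothetical ParDo body written as nested context managers."""

    def __init__(self):
        self.log = []

    @contextmanager
    def bundle_processor(self):
        self.log.append("setup")  # the role of @Setup
        try:
            yield self
        finally:
            self.log.append("teardown")  # the role of @Teardown

    @contextmanager
    def element_processor(self):
        self.log.append("start_bundle")
        outputs = []
        yield lambda element: outputs.append(element * 2)
        # Only reached on a clean exit: if processing raised, the exception
        # propagates from the yield and the bundle is never "finished".
        self.log.append(f"finish_bundle {outputs}")


def execute(x, bundles):
    # Mirrors the execution sketch in the message above.
    with x.bundle_processor() as bundle_processor:
        for bundle in bundles:
            with bundle_processor.element_processor() as process:
                for element in bundle:
                    process(element)
    return x.log


print(execute(Doubler(), [[1, 2], [3]]))
```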
Best,
B
On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles
<k...@apache.org> wrote:
(I notice that you replied only to yourself, but
there has been a whole thread of discussion on this
- are you subscribed to dev@beam?
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
It sounds like you want what everyone wants: to have
the biggest bundles possible.
So for bounded data, basically you make even splits
of the data and each split is one bundle. And then
dynamic splitting to redistribute work to eliminate
stragglers, if your engine has that capability.
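A toy sketch of the bounded case: even static splits, plus a dynamic split that hands the back half of a straggler's remaining work to another worker. The function names are hypothetical; in Beam the dynamic part is done through mechanisms such as a restriction tracker's `try_split(fraction_of_remainder)`, which this only loosely imitates with a fixed fraction of 0.5.

```python
def even_splits(num_elements, num_splits):
    """Split [0, num_elements) into num_splits contiguous, near-equal ranges."""
    base, extra = divmod(num_elements, num_splits)
    splits, start = [], 0
    for i in range(num_splits):
        end = start + base + (1 if i < extra else 0)
        splits.append((start, end))
        start = end
    return splits


def split_remainder(split, position):
    """Toy dynamic split: a straggler working on `split` has reached
    `position`; it keeps [start, mid) and gives away [mid, end)."""
    start, end = split
    mid = position + (end - position) // 2
    return (start, mid), (mid, end)


print(even_splits(10, 3))
print(split_remainder((0, 100), 20))
```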
For unbounded data, you more-or-less bundle as much
as you can without waiting too long, like Jan described.
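A toy version of "bundle as much as you can without waiting too long" for unbounded input, assuming a simple size-or-age policy (a hypothetical choice, not any particular runner's). For simplicity the age limit is only checked when a new element arrives; a real runner would close the bundle with a processing-time timer.

```python
def bundle_stream(arrivals, max_size, max_wait):
    """Group (arrival_time, element) pairs into bundles, closing a bundle when
    it reaches max_size elements or its first element has waited max_wait."""
    bundles, current, opened_at = [], [], None
    for t, element in arrivals:
        if current and t - opened_at >= max_wait:
            bundles.append(current)  # waited too long: close the bundle
            current, opened_at = [], None
        if not current:
            opened_at = t
        current.append(element)
        if len(current) >= max_size:
            bundles.append(current)  # big enough: close the bundle
            current, opened_at = [], None
    if current:
        bundles.append(current)
    return bundles


arrivals = [(0, "a"), (1, "b"), (2, "c"), (10, "d"), (11, "e")]
print(bundle_stream(arrivals, max_size=2, max_wait=5))
```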
Users know to put their high fixed costs
in @StartBundle, and then it is the runner's job to
put as many calls to @ProcessElement as possible in
each bundle to amortize those costs.
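A worked example of the amortization argument, using the numbers from Joey's example below (5 minutes of setup, about a microsecond per input) and assuming the setup runs once per bundle. Times are integer microseconds so the arithmetic is exact.

```python
def setup_fraction(setup_us, per_element_us, bundle_size):
    """Fraction of a bundle's wall time spent on a fixed per-bundle setup."""
    total_us = setup_us + per_element_us * bundle_size
    return setup_us / total_us


SETUP_US = 5 * 60 * 1_000_000  # 5 minutes of setup (Joey's example)
PER_ELEMENT_US = 1             # about 1 microsecond per input

for n in (1, 1_000, 1_000_000, 300_000_000):
    frac = setup_fraction(SETUP_US, PER_ELEMENT_US, n)
    print(f"{n:>11,} elements/bundle -> {frac:.6f} of time in setup")
```

Even a million-element bundle would still spend over 99% of its time in setup under this assumption; only at 300 million elements per bundle does setup drop to half. This is why a cost of that size fits better in @Setup than in @StartBundle.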
Kenn
On Fri, Sep 22, 2023 at 9:39 AM Joey Tran
<joey.t...@schrodinger.com> wrote:
Whoops, I typoed my last email. I meant to write
"this isn't the greatest strategy for high
*fixed* cost transforms", e.g. a transform that
takes 5 minutes to get set up and then maybe a
microsecond per input.
I suppose one solution is to move the
responsibility for handling this kind of
situation to the user and expect users to use a
bundling transform (e.g. BatchElements [1])
followed by a Reshuffle+FlatMap. Is this what
other runners expect? Just want to make sure I'm
not missing some smart generic bundling strategy
that might handle this for users.
[1]
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
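A pure-Python emulation (no Beam) of the batch-then-FlatMap shape, so the fixed cost is paid once per batch instead of once per element. `batch_elements` uses a fixed batch size as a hypothetical stand-in; the real BatchElements transform, per its documentation, sizes batches adaptively. The Reshuffle step is only noted in a comment, since redistribution across workers has no meaning in a single process.

```python
from itertools import islice


def batch_elements(elements, batch_size):
    """Fixed-size stand-in for BatchElements (the real one adapts the size)."""
    it = iter(elements)
    while batch := list(islice(it, batch_size)):
        yield batch


def expensive_transform(batch, setup_calls):
    # Hypothetical high-fixed-cost step: pay the setup once per batch.
    setup_calls.append(len(batch))
    return [x * x for x in batch]


def batch_then_flatmap(elements, batch_size):
    setup_calls, outputs = [], []
    for batch in batch_elements(elements, batch_size):
        # In a pipeline, a Reshuffle here would let the runner spread the
        # batches across workers; in this emulation they run serially.
        outputs.extend(expensive_transform(batch, setup_calls))  # FlatMap step
    return outputs, setup_calls


print(batch_then_flatmap(range(10), 4))
```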
On Thu, Sep 21, 2023 at 7:23 PM Joey Tran
<joey.t...@schrodinger.com> wrote:
I'm writing a runner, and the first strategy for
determining bundle size was to just start
with a bundle size of one and double it
until we reach a size that we expect to take
some target per-bundle runtime (e.g. maybe
10 minutes). I realize that this isn't the
greatest strategy for high sized cost
transforms. I'm curious what kinds of
strategies other runners take?
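A sketch of the doubling strategy described above, assuming a hypothetical cost model `cost_of(n)` for the runtime of an n-element bundle and a hypothetical cap on bundle size. With a purely per-element cost it converges quickly; with a 5-minute fixed cost paid per bundle it never reaches a 10-minute target before the cap, and every one of those bundles pays the fixed cost.

```python
def doubling_bundle_sizes(cost_of, target_s, max_size=1 << 20):
    """Bundle sizes tried: start at 1 and double until the (modelled)
    bundle runtime reaches target_s, or until max_size is exceeded."""
    sizes, size = [], 1
    while size <= max_size:
        sizes.append(size)
        if cost_of(size) >= target_s:
            break
        size *= 2
    return sizes


# Cheap per-element work, no fixed cost: 1 s per element, 10 s target.
print(doubling_bundle_sizes(lambda n: 1.0 * n, 10.0))

# High fixed cost: 300 s setup per bundle, 1 us per element, 600 s target.
fixed = doubling_bundle_sizes(lambda n: 300 + 1e-6 * n, 600)
print(len(fixed), fixed[-1])
```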