Hi Kenn and Reuven,
I agree with all these points. The only issue here seems to be that
FlinkRunner does not fulfill these constraints. This is a bug that can
be fixed, though we need to change some defaults, as the default bundle
"duration" of 1000 ms can be too long for lower-traffic pipelines. We
are also probably missing some @ValidatesRunner tests for this. I
created [1] and [2] to track this.
One question still remains: the bundle vs. element life-cycle is
relevant only for cases where processing of element X can affect
processing of element Y later in the same bundle. Once this influence
is ruled out (i.e. no caching), this information could enable runner
optimizations that yield better performance. Should we consider
propagating this information from user code to the runner?
[1] https://github.com/apache/beam/issues/28649
[2] https://github.com/apache/beam/issues/28650
On 9/25/23 18:31, Reuven Lax via dev wrote:
On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský <je...@seznam.cz> wrote:
On 9/23/23 18:16, Reuven Lax via dev wrote:
Two separate things here:
1. Yes, a watermark can update in the middle of a bundle.
2. The records in the bundle themselves will prevent the
watermark from updating as they are still in flight until after
finish bundle. Therefore simply caching the records should always
be watermark safe, regardless of the runner. You will only run
into problems if you try and move timestamps "backwards" - which
is why Beam strongly discourages this.
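For illustration, a minimal Beam Java sketch of that pattern (the class
and names are made up, not from any real pipeline): buffer in
@ProcessElement, re-emit everything with the original timestamps in
@FinishBundle, so nothing moves "backwards".

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class BufferingDoFn extends DoFn<String, String> {

  private static class Buffered {
    final String element;
    final Instant timestamp;
    final BoundedWindow window;

    Buffered(String element, Instant timestamp, BoundedWindow window) {
      this.element = element;
      this.timestamp = timestamp;
      this.window = window;
    }
  }

  private transient List<Buffered> buffer;

  @StartBundle
  public void startBundle() {
    buffer = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(
      @Element String element, @Timestamp Instant ts, BoundedWindow window) {
    // Cache the element together with its original timestamp and window.
    buffer.add(new Buffered(element, ts, window));
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext ctx) {
    // Emit everything before the bundle commits; timestamps are unchanged,
    // so the in-flight elements themselves keep the watermark held back.
    for (Buffered b : buffer) {
      ctx.output(b.element, b.timestamp, b.window);
    }
    buffer = null;
  }
}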
This is not aligned with FlinkRunner's implementation, and I
actually think it is not aligned conceptually. As mentioned,
Flink does not have the concept of bundles at all. It achieves
fault tolerance via checkpointing: essentially, a checkpoint barrier
flows from sources to sinks, safely snapshotting the state of each
operator on the way. Bundles are implemented as a somewhat
arbitrary set of elements between two consecutive checkpoints
(there can be multiple bundles between checkpoints). A bundle is
'committed' (i.e. persistently stored and guaranteed not to retry)
only after the checkpoint barrier passes over the elements in the
bundle (every bundle is finished at the very latest exactly before
a checkpoint). But watermark propagation and bundle finalization
are completely unrelated. This might be a bug in the runner, but
requiring a checkpoint for watermark propagation would introduce
insane delays between processing time and watermarks: every
executable stage would delay watermark propagation until a
checkpoint (typically on the order of seconds), and this delay
would add up after each stage.
It's not bundles that hold up processing but rather elements, and
elements are not considered "processed" until FinishBundle.
You are right about Flink. In many cases this is fine - if Flink rolls
back to the last checkpoint, the watermark will also roll back, and
everything stays consistent. So in general, one does not need to wait
for checkpoints for watermark propagation.
Where things get a bit weirder with Flink is whenever one has external
side effects. In theory, one should wait for checkpoints before
letting a Sink flush, otherwise one could end up with incorrect
outputs (especially with a sink like TextIO). Flink itself recognizes
this, and that's why they provide TwoPhaseCommitSinkFunction
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>, which
waits for a checkpoint. In Beam, this is the reason we introduced
RequiresStableInput. Of course in practice many Flink users don't do
this - in which case they are prioritizing latency over data correctness.
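For reference, in Beam Java this looks roughly like the sketch below
(the DoFn and the external write are made-up placeholders); the
annotation tells the runner not to invoke the method until its input is
stable, i.e. durably committed (checkpointed, on Flink):

import org.apache.beam.sdk.transforms.DoFn;

class SideEffectingFn extends DoFn<String, Void> {
  @ProcessElement
  @RequiresStableInput
  public void processElement(@Element String element) {
    // The runner guarantees this exact input is replayed identically on
    // retry, so the external side effect cannot produce divergent output.
    writeToExternalSystem(element); // hypothetical helper
  }

  private void writeToExternalSystem(String element) {
    // ... write to e.g. a file or an external service ...
  }
}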
Reuven
On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský <je...@seznam.cz>
wrote:
> Watermarks shouldn't be (visibly) advanced until
@FinishBundle is committed, as there's no guarantee that this
work won't be discarded.
There was a thread [1], where the conclusion seemed to be
that updating the watermark is possible even in the middle of a
bundle. Actually, handling watermarks is runner-dependent
(e.g. Flink does not store watermarks in checkpoints; they
are always recomputed from scratch on restore).
[1]
https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
On 9/22/23 21:47, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 10:58 AM Jan Lukavský
<je...@seznam.cz> wrote:
On 9/22/23 18:07, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 7:23 AM Byron Ellis via dev
<dev@beam.apache.org> wrote:
I've actually wondered about this specifically for
streaming... if you're writing a pipeline there it
seems like you're often going to want to put high
fixed cost things like database connections even
outside of the bundle setup. You really only want
to do that once in the lifetime of the worker
itself, not the bundle. Seems like having that
boundary be somewhere other than an arbitrary
(and, in streaming, probably small to avoid
latency) group of elements might be more useful?
I suppose this depends heavily on the object
lifecycle in the SDK worker though.
+1. This is the difference between @Setup
and @StartBundle. The start/finish bundle operations
should be used for bracketing element processing that
must be committed as a unit for
correct failure recovery (e.g. if elements are cached
in ProcessElement, they should all be emitted in
FinishBundle). On the other hand, things like open
database connections can and likely should be shared
across bundles.
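A minimal sketch of that division (DatabaseClient is a
hypothetical stand-in for any expensive resource):

import org.apache.beam.sdk.transforms.DoFn;

class WriteFn extends DoFn<String, Void> {
  private transient DatabaseClient client; // hypothetical client class

  @Setup
  public void setup() {
    // Once per DoFn instance, shared across all bundles on this worker.
    client = DatabaseClient.connect("db.example.com");
  }

  @ProcessElement
  public void process(@Element String element) {
    client.write(element);
  }

  @Teardown
  public void teardown() {
    client.close();
  }
}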
This is correct, but the caching between @StartBundle
and @FinishBundle has some problems. First, users need
to manually set a watermark hold at min(timestamp in
bundle), otherwise the watermark might overtake the
buffered elements.
Watermarks shouldn't be (visibly) advanced until
@FinishBundle is committed, as there's no guarantee that
this work won't be discarded.
Users have no option other than
timer.withOutputTimestamp for that, as we don't have a
user-facing API to set a watermark hold otherwise; thus
in-bundle caching implies a stateful DoFn. The
question might then be: why not use "classical" stateful
caching involving state, as there is full control over
the caching in user code? This triggered an idea:
would it be useful to add information about caching
to the API (e.g. in Java @StartBundle(caching=true)),
which could maybe solve the above issues (the runner would
know to set the hold, and it could work with "stateless" DoFns)?
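A rough sketch of what that workaround looks like today (names
and the flush delay are illustrative); the timer's output
timestamp is what holds the output watermark at the minimum
buffered timestamp:

import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Duration;
import org.joda.time.Instant;

class CachingFn extends DoFn<KV<String, String>, String> {

  @StateId("cache")
  private final StateSpec<BagState<TimestampedValue<String>>> cacheSpec =
      StateSpecs.bag();

  @StateId("minTs")
  private final StateSpec<ValueState<Instant>> minTsSpec = StateSpecs.value();

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, String> element,
      @Timestamp Instant ts,
      @StateId("cache") BagState<TimestampedValue<String>> cache,
      @StateId("minTs") ValueState<Instant> minTs,
      @TimerId("flush") Timer flush) {
    cache.add(TimestampedValue.of(element.getValue(), ts));
    // Track min(timestamp in cache) and use it as the timer's output
    // timestamp; this is what acts as the watermark hold.
    Instant hold = minTs.read();
    if (hold == null || ts.isBefore(hold)) {
      hold = ts;
      minTs.write(hold);
    }
    flush.withOutputTimestamp(hold).offset(Duration.standardSeconds(10)).setRelative();
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext ctx,
      @StateId("cache") BagState<TimestampedValue<String>> cache,
      @StateId("minTs") ValueState<Instant> minTs) {
    // Emit buffered elements with their original timestamps, then release
    // the hold by clearing the state.
    for (TimestampedValue<String> tv : cache.read()) {
      ctx.outputWithTimestamp(tv.getValue(), tv.getTimestamp());
    }
    cache.clear();
    minTs.clear();
  }
}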
Really, this is one of the areas where the streaming/batch
abstraction leaks. In batch it was a common pattern to have
local DoFn instance state that persisted from start to
finish bundle, and these were also used as convenient entry
points for other operations (like opening
database connections) 'cause bundles were often "as large as
possible." With the advent of streaming it makes sense to
put this in explicitly managed runner state to allow for
cross-bundle amortization, and there's more value in
distinguishing between @Setup and @StartBundle.
(Were I to do things over, I'd probably encourage an API that
discouraged non-configuration instance state on DoFns
altogether, e.g. in the notion of Python context managers
(and an equivalent API could probably be put together with
AutoCloseables in Java) one would have something like
ParDo(X)
which would logically (though not necessarily physically)
lead to an execution like
with X.bundle_processor() as bundle_processor:
    for bundle in bundles:
        with bundle_processor.element_processor() as process:
            for element in bundle:
                process(element)
where the traditional
setup/start_bundle/finish_bundle/teardown logic would live
in the __enter__ and __exit__ methods (made even easier with
coroutines.) For convenience one could of course provide a
raw bundle processor or element processor to ParDo if the
enter/exit contexts are trivial. But this is getting
somewhat off-topic...
Best,
B
On Fri, Sep 22, 2023 at 7:03 AM Kenneth Knowles
<k...@apache.org> wrote:
(I notice that you replied only to yourself,
but there has been a whole thread of discussion
on this - are you subscribed to dev@beam?
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
It sounds like you want what everyone wants: to
have the biggest bundles possible.
So for bounded data, basically you make even
splits of the data and each split is one
bundle. And then dynamic splitting to
redistribute work to eliminate stragglers, if
your engine has that capability.
For unbounded data, you more-or-less bundle as
much as you can without waiting too long, like
Jan described.
Users know to put their high fixed costs
in @StartBundle, and then it is the runner's job
to make as many calls to @ProcessElement as
possible to amortize that cost.
Kenn
On Fri, Sep 22, 2023 at 9:39 AM Joey Tran
<joey.t...@schrodinger.com> wrote:
Whoops, I typoed my last email. I meant to
write "this isn't the greatest strategy for
high *fixed* cost transforms", e.g. a
transform that takes 5 minutes to get set
up and then maybe a microsecond per input.
I suppose one solution is to move the
responsibility for handling this kind of
situation to the user and expect users to
use a bundling transform (e.g.
BatchElements [1]) followed by a
Reshuffle+FlatMap. Is this what other
runners expect? Just want to make sure I'm
not missing some smart generic bundling
strategy that might handle this for users.
[1]
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
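For the Java SDK, I believe the rough analogue of that pattern is
GroupIntoBatches plus a Reshuffle; a sketch (ExpensiveResource,
the batch size, and the keyed input are made up):

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Pays the 5-minute fixed cost once per batch instead of once per element.
class ExpensiveBatchFn extends DoFn<KV<String, Iterable<String>>, String> {
  @ProcessElement
  public void process(
      @Element KV<String, Iterable<String>> batch, OutputReceiver<String> out) {
    ExpensiveResource resource = ExpensiveResource.create(); // hypothetical setup
    for (String element : batch.getValue()) {
      out.output(resource.process(element)); // ~microseconds per element
    }
  }
}

static PCollection<String> amortized(PCollection<KV<String, String>> keyedInput) {
  return keyedInput
      .apply(GroupIntoBatches.<String, String>ofSize(1000))
      .apply(Reshuffle.of())
      .apply(ParDo.of(new ExpensiveBatchFn()));
}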
On Thu, Sep 21, 2023 at 7:23 PM Joey Tran
<joey.t...@schrodinger.com> wrote:
Writing a runner, and the first strategy
for determining bundle size was to
just start with a bundle size of one
and double it until we reach a size
that we expect to take some target
per-bundle runtime (e.g. maybe 10
minutes). I realize that this isn't the
greatest strategy for high sized cost
transforms. I'm curious what kind of
strategies other runners take?
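For concreteness, a sketch of that doubling strategy
(Source, BundleRunner, and the helper methods are
hypothetical stand-ins, not from any actual runner):

import java.time.Duration;
import java.time.Instant;

static void runWithAdaptiveBundles(Source source, BundleRunner runner) {
  long bundleSize = 1;
  final Duration target = Duration.ofMinutes(10); // target per-bundle runtime
  while (source.hasMoreInput()) {
    Instant start = Instant.now();
    runner.processBundle(source.nextBundle(bundleSize));
    if (Duration.between(start, Instant.now()).compareTo(target) < 0) {
      bundleSize *= 2; // double until a bundle takes roughly the target time
    }
  }
}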