What is the reason to rely on StartBundle rather than Setup in this case? If
the life-cycle of a bundle is not "closed" (i.e. start - finish), then it
seems to be ill-defined and Setup should do?
I'm trying to think of non-caching use cases of
StartBundle-FinishBundle - are there such cases? I'd say yes, but I'm
struggling a little to find a specific example that cannot be solved using
Setup or lazy init.
On 9/27/23 19:58, Reuven Lax via dev wrote:
DoFns are allowed to be non-deterministic, so they don't have to yield
the "same" output.
The example I'm thinking of is where users perform some "best-effort"
deduplication by creating a hashmap in StartBundle and removing
duplicates. This is usually done purely for performance to reduce
shuffle size, as opposed to a guaranteed RemoveDuplicates. This
scenario doesn't require FinishBundle, though it does require a
StartBundle.
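Roughly, in the Python SDK, that pattern might look like this (a minimal
sketch assuming hashable elements; the class name is illustrative):

    import apache_beam as beam

    class BestEffortDedup(beam.DoFn):
        def start_bundle(self):
            # Fresh set per bundle: duplicates that span bundles still
            # pass through, so this only reduces shuffle size - it is
            # not a guaranteed RemoveDuplicates.
            self._seen = set()

        def process(self, element):
            if element not in self._seen:
                self._seen.add(element)
                yield element

Note there is no finish_bundle here - the per-bundle state can simply be
dropped when the bundle ends.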
On Tue, Sep 26, 2023 at 11:59 AM Kenneth Knowles <k...@apache.org> wrote:
On Tue, Sep 26, 2023 at 1:33 PM Reuven Lax via dev
<dev@beam.apache.org> wrote:
Yes, not including FinishBundle in ParDoPayload seems like a
mistake. Though absence of FinishBundle doesn't mean that one
can assume that elements in a bundle don't affect subsequent
bundle elements (i.e. there might still be caching!)
Well, for a DoFn to be correct, it has to yield the same (or "the
same as much as the user expects it to be the same") output
regardless of order of processing or bundling, so a runner or SDK
harness can definitely take a bunch of elements and process them
however it wants if there's no @FinishBundle. I think that's what
Jan is getting at - adding a @FinishBundle is the user placing a
new restriction on the runner. Technically we probably have to
include @StartBundle in that consideration as well.
Kenn
On Tue, Sep 26, 2023 at 8:54 AM Kenneth Knowles
<k...@apache.org> wrote:
On Mon, Sep 25, 2023 at 1:19 PM Jan Lukavský
<je...@seznam.cz> wrote:
Hi Kenn and Reuven,
I agree with all these points. The only issue here
seems to be that FlinkRunner does not fulfill these
constraints. This is a bug that can be fixed, though
we need to change some defaults, as the 1000 ms default
bundle "duration" can be too long for lower-traffic
Pipelines. We are also probably missing some
@ValidatesRunner tests for this. I created [1] and
[2] to track this.
One question still remains: the bundle vs. element
life-cycle is relevant only for cases where processing
of element X can affect processing of element Y later
in the same bundle. Once this influence is ruled out
(i.e. no caching), this information can enable
runner optimizations that yield better performance.
Should we consider propagating this information from
user code to the runner?
Yes!
This was the explicit goal of the move to
annotation-driven DoFn in https://s.apache.org/a-new-dofn
to make it so that the SDK and runner can get good
information about what the DoFn requirements are.
When there is no @FinishBundle method, the runner can make
additional optimizations. This should have been included
in the ParDoPayload in the proto when we moved to portable
pipelines. I cannot remember if there was a good reason
that we did not do so. Maybe we (incorrectly) thought that
this was an issue that only the Java SDK harness needed to
know about.
Kenn
[1] https://github.com/apache/beam/issues/28649
[2] https://github.com/apache/beam/issues/28650
On 9/25/23 18:31, Reuven Lax via dev wrote:
On Mon, Sep 25, 2023 at 6:19 AM Jan Lukavský
<je...@seznam.cz> wrote:
On 9/23/23 18:16, Reuven Lax via dev wrote:
Two separate things here:
1. Yes, a watermark can update in the middle of
a bundle.
2. The records in the bundle themselves will
prevent the watermark from updating as they are
still in flight until after finish bundle.
Therefore simply caching the records should
always be watermark safe, regardless of the
runner. You will only run into problems if you
try and move timestamps "backwards" - which is
why Beam strongly discourages this.
This is not aligned with FlinkRunner's
implementation. And I actually think it is not
aligned conceptually. As mentioned, Flink does
not have the concept of bundles at all. It
achieves fault tolerance via checkpointing,
essentially a checkpoint barrier flowing from
sources to sinks, safely snapshotting the state of
each operator on the way. Bundles are implemented
as a somewhat arbitrary set of elements between
two consecutive checkpoints (there can be
multiple bundles between checkpoints). A bundle
is 'committed' (i.e. persistently stored and
guaranteed not to be retried) only after the
checkpoint barrier passes over the elements in
the bundle (every bundle is finished at the very
latest exactly before a checkpoint). But
watermark propagation and bundle finalization are
completely unrelated. This might be a bug in the
runner, but requiring a checkpoint for watermark
propagation would introduce insane delays between
processing time and watermarks - every executable
stage would delay watermark propagation until a
checkpoint (which is typically on the order of
seconds), and this delay would add up after each stage.
It's not bundles that hold up processing, rather it
is elements, and elements are not considered
"processed" until FinishBundle.
You are right about Flink. In many cases this is fine
- if Flink rolls back to the last checkpoint, the
watermark will also roll back, and everything stays
consistent. So in general, one does not need to wait
for checkpoints for watermark propagation.
Where things get a bit weirder with Flink is whenever
one has external side effects. In theory, one should
wait for checkpoints before letting a Sink flush,
otherwise one could end up with incorrect outputs
(especially with a sink like TextIO). Flink itself
recognizes this, and that's why they provide
TwoPhaseCommitSinkFunction
<https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/functions/sink/TwoPhaseCommitSinkFunction.html>
which
waits for a checkpoint. In Beam, this is the reason
we introduced RequiresStableInput. Of course in
practice many Flink users don't do this - in which
case they are prioritizing latency over data
correctness.
Reuven
On Sat, Sep 23, 2023 at 12:03 AM Jan Lukavský
<je...@seznam.cz> wrote:
> Watermarks shouldn't be (visibly) advanced
until @FinishBundle is committed, as there's
no guarantee that this work won't be discarded.
There was a thread [1], where the conclusion
seemed to be that updating the watermark is
possible even in the middle of a bundle.
Actually, handling watermarks is
runner-dependent (e.g. Flink does not store
watermarks in checkpoints; they are always
recomputed from scratch on restore).
[1]
https://lists.apache.org/thread/10db7l9bhnhmo484myps723sfxtjwwmv
On 9/22/23 21:47, Robert Bradshaw via dev wrote:
On Fri, Sep 22, 2023 at 10:58 AM Jan
Lukavský <je...@seznam.cz> wrote:
On 9/22/23 18:07, Robert Bradshaw via
dev wrote:
On Fri, Sep 22, 2023 at 7:23 AM Byron
Ellis via dev <dev@beam.apache.org> wrote:
I've actually wondered about this
specifically for streaming... if
you're writing a pipeline there, it
seems like you're often going to
want to put high-fixed-cost things
like database connections outside of
even the bundle setup. You
really only want to do that once
in the lifetime of the worker
itself, not the bundle. It seems like
having that boundary be somewhere
other than an arbitrary (and, in
streaming, probably small to
avoid latency) group of elements
might be more useful? I suppose
this depends heavily on the object
lifecycle in the SDK worker, though.
+1. This is the difference
between @Setup and @StartBundle. The
start/finish bundle operations should
be used for bracketing element
processing that must be committed as a
unit for correct failure recovery
(e.g. if elements are cached in
ProcessElement, they should all be
emitted in FinishBundle). On the other
hand, things like open database
connections can and likely should be
shared across bundles.
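For illustration, a sketch of that bracketing in the Python SDK (a
minimal example assuming elements in the global window; the
min-timestamp bookkeeping anticipates the watermark-hold issue
discussed just below):

    import apache_beam as beam
    from apache_beam.transforms.window import GlobalWindow
    from apache_beam.utils.windowed_value import WindowedValue

    class BufferingDoFn(beam.DoFn):
        def start_bundle(self):
            self._buffer = []
            self._min_ts = None

        def process(self, element, timestamp=beam.DoFn.TimestampParam):
            # Cache instead of emitting; nothing leaves until the
            # bundle finishes.
            self._buffer.append(element)
            if self._min_ts is None or timestamp < self._min_ts:
                self._min_ts = timestamp

        def finish_bundle(self):
            # Everything buffered must be emitted here, since the
            # bundle's output is only committed after finish_bundle;
            # finish_bundle must yield WindowedValues explicitly.
            if self._buffer:
                yield WindowedValue(
                    self._buffer, self._min_ts, [GlobalWindow()])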
This is correct, but the caching
between @StartBundle and @FinishBundle
has some problems. First, users need to
manually set a watermark hold at
min(timestamp in bundle), otherwise
the watermark might overtake the buffered
elements.
Watermarks shouldn't be (visibly) advanced
until @FinishBundle is committed, as
there's no guarantee that this work won't
be discarded.
Users don't have any option other than
using timer.withOutputTimestamp for
that, as we don't have a user-facing
API to set a watermark hold otherwise;
thus in-bundle caching implies a
stateful DoFn. The question might then
be: why not use "classical" stateful
caching involving state, as there is
full control over the caching in user
code? This triggered an idea: would it
be useful to add the information
about caching to the API (e.g. in Java,
@StartBundle(caching=true))? That
could maybe solve the above issues
(the runner would know to set the hold,
and it could work with "stateless" DoFns).
Really, this is one of the areas that the
streaming/batch abstraction leaks. In batch
it was a common pattern to have local DoFn
instance state that persisted from start to
finish bundle, and these were also used as
convenient entry points for other
operations (like opening
database connections) 'cause bundles were
often "as large as possible." WIth the
advent of n streaming it makes sense to put
this in explicitly managed runner state to
allow for cross-bundle amortization and
there's more value in
distinguishing between @Setup and @StartBundle.
(Were I to do things over, I'd probably
encourage an API that discouraged
non-configuration instance state on DoFns
altogether; e.g. using the notion of Python
context managers (and an equivalent API
could probably be put together with
AutoCloseables in Java) one would have
something like
ParDo(X)
which would logically (though not
necessarily physically) lead to an
execution like
    with X.bundle_processor() as bundle_processor:
      for bundle in bundles:
        with bundle_processor.element_processor() as process:
          for element in bundle:
            process(element)
where the traditional
setup/start_bundle/finish_bundle/teardown
logic would live in the __enter__ and
__exit__ methods (made even easier with
coroutines). For convenience one could of
course provide a raw bundle processor or
element processor to ParDo if the
enter/exit contexts are trivial. But this
is getting somewhat off-topic...
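As a purely hypothetical rendering of that sketch in Python - none of
these names exist in Beam today, and the file sink is just a stand-in
for a worker-lifetime resource:

    from contextlib import contextmanager

    class X:
        @contextmanager
        def bundle_processor(self):
            self.sink = open('out.txt', 'w')  # "setup": worker-lifetime resource
            try:
                yield self
            finally:
                self.sink.close()             # "teardown"

        @contextmanager
        def element_processor(self):
            buffer = []                       # "start_bundle": fresh per-bundle state
            try:
                yield buffer.append           # "process": called once per element
            finally:
                # "finish_bundle": flush everything buffered for this bundle.
                self.sink.writelines(f'{e}\n' for e in buffer)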
Best,
B
On Fri, Sep 22, 2023 at 7:03 AM
Kenneth Knowles <k...@apache.org>
wrote:
(I notice that you replied
only to yourself, but there
has been a whole thread of
discussion on this - are you
subscribed to dev@beam?
https://lists.apache.org/thread/k81fq301ypwmjowknzyqq2qc63844rbd)
It sounds like you want what
everyone wants: to have the
biggest bundles possible.
So for bounded data, basically
you make even splits of the
data and each split is one
bundle. And then dynamic
splitting to redistribute work
to eliminate stragglers, if
your engine has that capability.
For unbounded data, you
more-or-less bundle as much as
you can without waiting too
long, like Jan described.
Users know to put their high
fixed costs in @StartBundle
and then it is the runner's
job to put as many calls
to @ProcessElement as possible
to amortize.
Kenn
On Fri, Sep 22, 2023 at
9:39 AM Joey Tran
<joey.t...@schrodinger.com> wrote:
Whoops, I typoed my last
email. I meant to write
"this isn't the
greatest strategy for high
*fixed* cost transforms",
e.g. a transform that
takes 5 minutes to get set
up and then maybe a
microsecond per input.
I suppose one solution is
to move the responsibility
for handling this kind of
situation to the user and
expect users to use a
bundling transform (e.g.
BatchElements [1])
followed by a
Reshuffle+FlatMap. Is this
what other runners expect?
Just want to make sure I'm
not missing some smart
generic bundling strategy
that might handle this for
users.
[1]
https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.util.html#apache_beam.transforms.util.BatchElements
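For example, something like this (run_batch is a placeholder for the
expensive transform; the batch-size bounds are illustrative):

    import apache_beam as beam

    def run_batch(batch):
        # Placeholder: pay the high fixed cost once per batch here,
        # then process each element cheaply.
        return [x * 2 for x in batch]

    with beam.Pipeline() as p:
        _ = (p
             | beam.Create(range(100))
             | beam.BatchElements(min_batch_size=10, max_batch_size=1000)
             | beam.Reshuffle()           # spread the batches across workers
             | beam.FlatMap(run_batch))   # flatten back to individual elements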
On Thu, Sep 21, 2023 at
7:23 PM Joey Tran
<joey.t...@schrodinger.com>
wrote:
I'm writing a runner,
and the first strategy for
determining bundle
size was to just start
with a bundle size of
one and double it
until we reach a size
that we expect to take
some target
per-bundle runtime
(e.g. maybe 10
minutes). I realize
that this isn't the
greatest strategy for
high sized cost
transforms. I'm
curious what kind of
strategies other
runners take?
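For concreteness, a toy sketch of that doubling heuristic (the names
and the 10-minute target are just illustrative):

    TARGET_SECONDS = 10 * 60  # target per-bundle runtime, e.g. 10 minutes

    def next_bundle_size(size, observed_seconds):
        # Start at a bundle size of one and double until a bundle
        # takes roughly the target runtime.
        return size * 2 if observed_seconds < TARGET_SECONDS else size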