"Single instance of the fn for the full pipeline execution", if taken
literally, is incompatible:
- with parallelization: requiring a single instance rules out multiple
parallel/distributed instances
- with fault tolerance: what if the worker running this "single instance"
crashes or becomes a zombie - then, obviously, we'll need to create another
instance
- with infinite collections: "full pipeline execution" is moot. More likely
than not, you'd want this per window rather than truly globally
Also, you probably want this sort of scoping at the level of arbitrary
PTransforms, not DoFns: what if at some point you need to refactor the
DoFn into a more complex transform?

But I think I understand what you mean, and at its core it's a legitimate
need. Please correct me if this is wrong: you want to be able to write
per-window initialization/finalization code - "code that will run before
this PTransform starts processing any elements in window W", and "code that
will run at a point when it's guaranteed that this PTransform will never be
asked to process any elements in window W".

We can flip this to be instead about PCollections: "code that will run
before any PTransform can observe any elements from this collection in
window W" and "code that will run at a point when no PTransform can ever
observe any elements from this collection in window W".

We've faced this need multiple times in the past in various forms and we
know how to address it. This need is faced e.g. by BigQueryIO.read() and
write(), which need to create import/export jobs ("WorkerStartFn") and
clean up temporary files once they are done ("WorkerEndFn"). However, this
is a separate need from the one satisfied by @Setup/@Teardown or the bundle
methods.

- @ProcessElement is the only semantically necessary primitive. What you
want can be achieved without either @StartBundle/@FinishBundle or
@Setup/@Teardown, in a way I describe below.
- @StartBundle/@FinishBundle is a semantically no-op optimization that
allows you to amortize the cost of processing multiple elements that can
be processed together (batching), by telling you the scope over which you
are and aren't allowed to amortize.
- @Setup/@Teardown is a semantically no-op optimization that allows you to
share costly long-lived resources between bundles running in the same
thread. A sketch contrasting the three pairs follows this list.
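
As a concrete illustration of that division of labor, here is a minimal
sketch ("ExpensiveClient" is an assumption standing in for any costly
long-lived resource, e.g. a connection pool; only @ProcessElement carries
semantics, the other four methods are amortization hooks):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

class AmortizingFn extends DoFn<String, Void> {

  // Hypothetical stand-in for a costly resource (connection pool, loaded
  // model, ...).
  static class ExpensiveClient {
    static ExpensiveClient open() { return new ExpensiveClient(); }
    void flush(List<String> batch) { /* bulk write, hypothetical */ }
    void close() {}
  }

  private transient ExpensiveClient client; // shared across bundles
  private transient List<String> batch;     // amortized within one bundle

  @Setup
  public void setup() {
    // Optimization only: runs once per DoFn instance, and the instance
    // may serve many bundles.
    client = ExpensiveClient.open();
  }

  @StartBundle
  public void startBundle() {
    batch = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // The only semantically required method; work is deferred here so it
    // can be batched.
    batch.add(c.element());
  }

  @FinishBundle
  public void finishBundle() {
    // Cost amortized over the bundle; if this fails, the bundle is
    // retried.
    client.flush(batch);
  }

  @Teardown
  public void teardown() {
    // Best effort only: may never run if the worker crashes, so nothing
    // semantically important may live here.
    client.close();
  }
}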

What you want can be achieved using side inputs and the Wait transform,
e.g.:

PCollection<Foo> rawInput = ...;
PCollectionView<Void> initResult =
    ...apply initialization transform...
        .apply(View.asSingleton());
PCollection<Foo> protectedInput =
    rawInput.apply(ParDo.of(...identity...).withSideInputs(initResult));
// protectedInput has the property that, by the time you process it, the
// initialization transform has already run.
PCollection<Bar> rawOutput = protectedInput.apply(...your processing...);
PCollection<Void> finalizationResult =
    ...apply finalization transform, possibly using "rawOutput"...;
PCollection<Bar> finalizedOutput =
    rawOutput.apply(Wait.on(finalizationResult));
// finalizedOutput has the property that, by the time you process it, the
// finalization transform has already run.
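
Note that Wait.on works per window: elements of rawOutput in window W are
emitted only once the finalization signal is complete in W, which gives
exactly the per-window scoping discussed above. And since both ends of the
pattern are ordinary transforms, the initialization and finalization steps
can themselves be arbitrary composite PTransforms.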



On Sat, Feb 17, 2018 at 8:59 AM Romain Manni-Bucau <[email protected]>
wrote:

> Assuming a Pipeline.run(), the corresponding sequence is:
>
> WorkerStartFn();
> WorkerEndFn();
>
> So a single instance of the fn for the full pipeline execution.
>
> On Sat, Feb 17, 2018 at 5:42 PM, Reuven Lax <[email protected]> wrote:
>
>> " and a transform is by design bound to an execution"
>>
>> What do you mean by execution?
>>
>> On Sat, Feb 17, 2018 at 12:50 AM, Romain Manni-Bucau <
>> [email protected]> wrote:
>>
>>>
>>>
>>> On Fri, Feb 16, 2018 at 10:41 PM, Reuven Lax <[email protected]> wrote:
>>>
>>> Kenn is correct. Allowing Fn reuse across bundles was a major, major
>>> performance improvement. Profiling on the old Dataflow SDKs consistently
>>> showed Java serialization being the number one performance bottleneck for
>>> streaming pipelines, and Beam fixed this.
>>>
>>>
>>> Sorry, but this doesn't help me much to understand. Let me try to
>>> explain. I read it as "we were slow somehow around serialization, so a
>>> quick fix was caching".
>>>
>>> Not to be picky, but I have had a lot of very fast remote-EJB-over-RMI
>>> setups; so yes, Java serialization is slower than alternative
>>> serializations, but that doesn't justify caching most of the time.
>>>
>>> My main question is: isn't Beam designed to be slow in the way it
>>> designed the DoFn/transform, and therefore serializing way more than it
>>> requires? You never need to serialize the full transform and can, in 95%
>>> of cases, do a writeReplace which is light and fast compared to the
>>> default.
>>>
>>> If so, the cache is an implementation workaround and not a fix.
>>>
>>> Hope my view is clearer on it.
>>>
>>>
>>>
>>> Romain - can you state precisely what you want? I do think there is
>>> still a gap - IMO there's a place for a longer-lived per-fn container;
>>> evidence for this is that people still often need to use statics to store
>>> things. However I'm not sure if this is what you're looking for.
>>>
>>>
>>> Yes. I build a framework on top of Beam and must be able to provide a
>>> clear and reliable lifecycle. The bare minimum for any user is
>>> start-exec-stop, and a transform is by design bound to an execution
>>> (stream or batch).
>>>
>>> Bundles are not an option, as explained, because they are bound not to
>>> the execution but to an uncontrolled subpart of it. You can see them as
>>> a Beam internal until runners unify this definition. And in any case,
>>> they are closer to a chunk notion than a lifecycle one.
>>>
>>> So setup and teardown must be symmetric.
>>>
>>> Note that a DoFn instance owns a config, so it is bound to an execution.
>>>
>>> This all leads to the need for a reliable teardown.
>>>
>>> Caching can be neat, but it requires its own API, like the passivation
>>> one of EJBs.
>>>
>>>
>>>
>>> Reuven
>>>
>>> On Fri, Feb 16, 2018 at 1:33 PM, Kenneth Knowles <[email protected]> wrote:
>>>
>>>> On Fri, Feb 16, 2018 at 1:00 PM, Romain Manni-Bucau <
>>>> [email protected]> wrote:
>>>>>
>>>>> The serialization of the fn happening once per bundle, the perf impact
>>>>> is only huge if there is a bug somewhere else; even Java serialization
>>>>> is negligible on a big config compared to any small pipeline (seconds
>>>>> vs minutes).
>>>>>
>>>>
>>>> Profiling is clear that this is a huge performance impact. One of the
>>>> most important backwards-incompatible changes we made for Beam 2.0.0 was to
>>>> allow Fn reuse across bundles.
>>>>
>>>> When we used a DoFn only for one bundle, there was no @Teardown because
>>>> it had ~no use: you did everything in @FinishBundle. So for whatever
>>>> use case you are working on, if your pipeline performs well enough
>>>> doing it per bundle, you can put it in @FinishBundle. Of course it
>>>> still might not get called, since guaranteeing that is a logical
>>>> impossibility - you just know that a given element will be retried if
>>>> @FinishBundle fails.
>>>>
>>>> If you have cleanup logic that absolutely must get executed, then you
>>>> need to build a composite PTransform around it so it will be retried until
>>>> cleanup succeeds. In Beam's sinks you can find many examples.
>>>>
>>>> Kenn
>>>>
>>>>
>>>
>>>
>>
