Actually, the initialization should be handled with the Wait transform too.

So basically the pattern is just:
input.apply(Wait.on(...initialization result...))
  .apply(...your processing...)
  .apply(Wait.on(...finalization result...))

where initialization and finalization results can be computed using
arbitrary PTransforms.
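
A minimal sketch of that shape (Initialize, Finalize and ProcessFn here are
hypothetical placeholders for whatever transforms actually compute those
results):

PCollection<Foo> input = ...;

// Initialization result, computed by an arbitrary PTransform.
PCollection<Void> initDone =
    input.getPipeline().apply("Initialize", new Initialize());

PCollection<Bar> output =
    input
        .apply(Wait.on(initDone))    // elements held back until init completes
        .apply("Process", ParDo.of(new ProcessFn()));

// Finalization result, possibly computed from the output itself.
PCollection<Void> finalizeDone = output.apply("Finalize", new Finalize());

PCollection<Bar> finalizedOutput = output.apply(Wait.on(finalizeDone));
// By the time finalizedOutput is observable, finalization has already run.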

On Sat, Feb 17, 2018 at 11:11 AM Eugene Kirpichov <kirpic...@google.com>
wrote:

> "Single instance of the fn for the full pipeline execution", if taken
> literally, is incompatible:
> - with parallelization: requiring a single instance rules out multiple
> parallel/distributed instances
> - with fault tolerance: what if the worker running this "single instance"
> crashes or becomes a zombie - then, obviously, we'll need to create another
> instance
> - with infinite collections: "full pipeline execution" is moot. More
> likely than not, you'd want this per window rather than truly globally.
> Also, you probably want this sort of scoping at the level of arbitrary
> PTransforms, not DoFns: what if at some point you need to refactor the
> DoFn into a more complex transform?
>
> But I think I understand what you mean and at the core, it's a legitimate
> need. Please correct me if this is wrong: you want to be able to write
> per-window initialization/finalization code - "code that will run before
> this PTransform starts processing any elements in window W", and "code that
> will run at a point when it's guaranteed that this PTransform will never be
> asked to process any elements in window W".
>
> We can flip this to be instead about PCollections: "code that will run
> before any PTransform can observe any elements from this collection in
> window W" and "code that will run at a point when no PTransform can ever
> observe any elements from this collection in window W".
>
> We've faced this need multiple times in the past in various forms and we
> know how to address it. This need is faced e.g. by BigQueryIO.read() and
> write(), which need to create import/export jobs ("WorkerStartFn") and
> clean up temporary files once they are done ("WorkerEndFn"). However, this
> is a separate need from the one satisfied by @Setup/@Teardown or the bundle
> methods.
>
> - @ProcessElement is the only semantically necessary primitive. What you
> want can be achieved without either Start/FinishBundle or Setup/Teardown,
> in a way I describe below.
> - @Start/FinishBundle is a semantically no-op optimization that lets you
> amortize the cost of processing multiple elements together (batching), by
> telling you the scope over which you are and are not allowed to amortize.
> - @Setup/@Teardown is a semantically no-op optimization that allows you to
> share costly long-lived resources between bundles running in the same
> thread (see the sketch below).
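>
> A minimal sketch of how those methods typically divide the work (MyClient
> and the batch size of 100 are just placeholders, not part of Beam):
>
> class BatchingFn extends DoFn<String, Void> {
>   private transient MyClient client;     // costly long-lived resource
>   private transient List<String> batch;
>
>   @Setup
>   public void setup() {
>     // Shared across all bundles this DoFn instance ends up processing.
>     client = MyClient.connect();
>   }
>
>   @StartBundle
>   public void startBundle() {
>     batch = new ArrayList<>();
>   }
>
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     batch.add(c.element());
>     if (batch.size() >= 100) {
>       flush();                           // amortized write
>     }
>   }
>
>   @FinishBundle
>   public void finishBundle() {
>     // End of the amortization scope: anything still buffered must be
>     // flushed here; if this fails, the runner retries the bundle.
>     flush();
>   }
>
>   @Teardown
>   public void teardown() {
>     // Best-effort resource cleanup; not guaranteed to be called.
>     client.close();
>   }
>
>   private void flush() {
>     client.write(batch);
>     batch.clear();
>   }
> }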
>
> What you want can be achieved using side inputs and the Wait transform,
> e.g.:
>
> PCollection<Foo> rawInput = ...;
> PCollectionView<Void> initResult =
>     ...apply initialization transform...
>     .apply(View.asSingleton());
> PCollection<Foo> protectedInput =
>     rawInput.apply(ParDo.of(...identity...).withSideInputs(initResult));
> // protectedInput has the property that, by the time you process it,
> // the initialization transform has already run.
> PCollection<Bar> rawOutput = protectedInput.apply(...your processing...);
> PCollection<Void> finalizationResult =
>     ...apply finalization transform, possibly using "rawOutput"...;
> PCollection<Bar> finalizedOutput = rawOutput.apply(Wait.on(finalizationResult));
> // finalizedOutput has the property that, by the time you process it,
> // the finalization transform has already run.
>
>
>
> On Sat, Feb 17, 2018 at 8:59 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> Assuming a Pipeline.run(), the corresponding sequence is:
>>
>> WorkerStartFn();
>> WorkerEndFn();
>>
>> So a single instance of the fn for the full pipeline execution.
>>
>> On Feb 17, 2018, 17:42, "Reuven Lax" <re...@google.com> wrote:
>>
>>> " and a transform is by design bound to an execution"
>>>
>>> What do you mean by execution?
>>>
>>> On Sat, Feb 17, 2018 at 12:50 AM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> On Feb 16, 2018, 22:41, "Reuven Lax" <re...@google.com> wrote:
>>>>
>>>> Kenn is correct. Allowing Fn reuse across bundles was a major, major
>>>> performance improvement. Profiling on the old Dataflow SDKs consistently
>>>> showed Java serialization being the number one performance bottleneck for
>>>> streaming pipelines, and Beam fixed this.
>>>>
>>>>
>>>> Sorry, but this doesn't help me much to understand. Let me try to
>>>> explain. I read it as "we were slow somehow around serialization, so a
>>>> quick fix was caching".
>>>>
>>>> Not to be picky, but I have had plenty of very fast remote-EJB-over-RMI
>>>> setups, so yes, Java serialization is slower than alternative
>>>> serializations, but that doesn't justify caching most of the time.
>>>>
>>>> My main question is: isn't Beam designed to be slow in the way it
>>>> designed the DoFn/transform, and therefore serializing far more than it
>>>> needs to? You never really need to serialize the full transform and can,
>>>> in 95% of cases, do a writeReplace which is light and fast compared to
>>>> the default.
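>>>>
>>>> A minimal illustration of the writeReplace idea (FnProxy, HeavyConfig
>>>> and the reconstruction logic are hypothetical, not existing Beam code):
>>>>
>>>> class MyFn extends DoFn<String, String> {
>>>>   private final HeavyConfig config;    // large object graph
>>>>
>>>>   MyFn(HeavyConfig config) { this.config = config; }
>>>>
>>>>   @ProcessElement
>>>>   public void processElement(ProcessContext c) {
>>>>     c.output(c.element());
>>>>   }
>>>>
>>>>   // Standard Java serialization hook: ship a small proxy over the wire
>>>>   // instead of the full object graph.
>>>>   private Object writeReplace() {
>>>>     return new FnProxy(config.id());
>>>>   }
>>>>
>>>>   private static class FnProxy implements Serializable {
>>>>     private final String configId;
>>>>
>>>>     FnProxy(String configId) { this.configId = configId; }
>>>>
>>>>     // Rebuild the full fn on the worker from the lightweight reference.
>>>>     private Object readResolve() {
>>>>       return new MyFn(HeavyConfig.load(configId));
>>>>     }
>>>>   }
>>>> }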
>>>>
>>>> If so, the cache is an implementation workaround and not a fix.
>>>>
>>>> I hope my view is clearer now.
>>>>
>>>>
>>>>
>>>> Romain - can you state precisely what you want? I do think there is
>>>> still a gap - IMO there's a place for a longer-lived per-fn container;
>>>> evidence for this is that people still often need to use statics to store
>>>> things. However I'm not sure if this is what you're looking for.
>>>>
>>>>
>>>> Yes. I build a framework on top of Beam and must be able to provide a
>>>> clear and reliable lifecycle. The bare minimum for any user is
>>>> start-exec-stop, and a transform is by design bound to an execution
>>>> (stream or batch).
>>>>
>>>> Bundles are not an option, as explained, because they are not bound to
>>>> the execution but to an uncontrolled subpart of it. You can see them as
>>>> a Beam internal until runners unify this definition. And in any case
>>>> they are closer to a chunk notion than a lifecycle one.
>>>>
>>>> So setup and teardown must be symmetric.
>>>>
>>>> Note that a DoFn instance owns a config, so it is bound to an execution.
>>>>
>>>> This all leads to the need for a reliable teardown.
>>>>
>>>> Caching can be neat but requires its own API, like the passivation one
>>>> from EJBs.
>>>>
>>>>
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Feb 16, 2018 at 1:33 PM, Kenneth Knowles <k...@google.com>
>>>> wrote:
>>>>
>>>>> On Fri, Feb 16, 2018 at 1:00 PM, Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>> Since the fn is serialized only once per bundle, the perf impact is
>>>>>> only huge if there is a bug somewhere else; even Java serialization is
>>>>>> negligible on a big config compared to any small pipeline (seconds vs.
>>>>>> minutes).
>>>>>>
>>>>>
>>>>> Profiling made it clear that this has a huge performance impact. One of
>>>>> the most important backwards-incompatible changes we made for Beam 2.0.0
>>>>> was to allow Fn reuse across bundles.
>>>>>
>>>>> When we used a DoFn only for one bundle, there was no @Teardown because
>>>>> it had ~no use. You do everything in @FinishBundle. So for whatever use
>>>>> case you are working on, if your pipeline performs well enough doing it
>>>>> per bundle, you can put it in @FinishBundle. Of course it still might
>>>>> not get called, because guaranteeing that is a logical impossibility -
>>>>> you just know that for a given element, the element will be retried if
>>>>> @FinishBundle fails.
>>>>>
>>>>> If you have cleanup logic that absolutely must get executed, then you
>>>>> need to build a composite PTransform around it so it will be retried until
>>>>> cleanup succeeds. In Beam's sinks you can find many examples.
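>>>>>
>>>>> A minimal sketch of that composite shape (not code from Beam's actual
>>>>> sinks; ProcessFn and CleanupFn are hypothetical DoFns): Wait.on ensures
>>>>> that downstream consumers only observe the output once cleanup has
>>>>> succeeded, with the runner retrying the cleanup bundle on failure.
>>>>>
>>>>> class ProcessWithCleanup
>>>>>     extends PTransform<PCollection<Foo>, PCollection<Bar>> {
>>>>>   @Override
>>>>>   public PCollection<Bar> expand(PCollection<Foo> input) {
>>>>>     PCollection<Bar> output =
>>>>>         input.apply("Process", ParDo.of(new ProcessFn()));
>>>>>     // Derive a cleanup signal from the output; a real sink would
>>>>>     // typically combine the output to a single element per window first
>>>>>     // so cleanup runs once rather than once per element.
>>>>>     PCollection<Void> cleanupDone =
>>>>>         output.apply("Cleanup", ParDo.of(new CleanupFn()));
>>>>>     // The returned collection cannot be observed downstream until
>>>>>     // cleanup has completed (and been retried if it failed).
>>>>>     return output.apply(Wait.on(cleanupDone));
>>>>>   }
>>>>> }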
>>>>>
>>>>> Kenn
>>>>>
>>>>>
>>>>
>>>>
>>>
