Actually, the initialization should be handled with the Wait transform too. So basically the pattern is just:

input
    .apply(Wait.on(...initialization result...))
    .apply(...your processing...)
    .apply(Wait.on(...finalization result...))
where initialization and finalization results can be computed using arbitrary PTransforms.

On Sat, Feb 17, 2018 at 11:11 AM Eugene Kirpichov <kirpic...@google.com> wrote:
> "Single instance of the fn for the full pipeline execution", if taken
> literally, is incompatible:
> - with parallelization: requiring a single instance rules out multiple
> parallel/distributed instances
> - with fault tolerance: what if the worker running this "single instance"
> crashes or becomes a zombie - then, obviously, we'll need to create another
> instance
> - with infinite collections: "full pipeline execution" is moot. More
> likely than not, you'd want this per window rather than truly globally.
> Also, you probably want this sort of scoping at the level of arbitrary
> PTransforms, not DoFns: what if at some point you need to refactor the
> DoFn into a more complex transform?
>
> But I think I understand what you mean and at the core, it's a legitimate
> need. Please correct me if this is wrong: you want to be able to write
> per-window initialization/finalization code - "code that will run before
> this PTransform starts processing any elements in window W", and "code that
> will run at a point when it's guaranteed that this PTransform will never be
> asked to process any elements in window W".
>
> We can flip this to be instead about PCollections: "code that will run
> before any PTransform can observe any elements from this collection in
> window W" and "code that will run at a point when no PTransform can ever
> observe any elements from this collection in window W".
>
> We've faced this need multiple times in the past in various forms and we
> know how to address it. This need is faced e.g. by BigQueryIO.read() and
> write(), which need to create import/export jobs ("WorkerStartFn") and
> clean up temporary files once they are done ("WorkerEndFn"). However, this
> is a separate need from the one satisfied by @Setup/@Teardown or the bundle
> methods.
>
> - @ProcessElement is the only semantically necessary primitive. What you
> want can be achieved without either of Start/FinishBundle or
> Setup/Teardown, in a way I describe below.
> - @Start/FinishBundle is a semantically no-op optimization that allows you
> to amortize the cost of processing multiple elements that can be processed
> together (batching), by letting you know the scope over which you are and
> aren't allowed to amortize.
> - @Setup/@Teardown is a semantically no-op optimization that allows you to
> share costly long-lived resources between bundles running in the same
> thread.
>
> What you want can be achieved using side inputs and the Wait transform,
> e.g.:
>
> PCollection<Foo> rawInput = ...;
> PCollectionView<Void> initResult = ...apply initialization transform...
>     .apply(View.asSingleton());
> PCollection<Foo> protectedInput =
>     rawInput.apply(ParDo.of(...identity...).withSideInputs(initResult));
> // protectedInput has the property that, by the time you process it, the
> // initialization transform has already run.
> PCollection<Bar> rawOutput = protectedInput.apply(...your processing...);
> PCollection<Void> finalizationResult = ...apply finalization transform,
>     possibly using "rawOutput"...;
> PCollection<Bar> finalizedOutput =
>     rawOutput.apply(Wait.on(finalizationResult));
> // finalizedOutput has the property that, by the time you process it, the
> // finalization transform has already run.
>
> On Sat, Feb 17, 2018 at 8:59 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> Assuming a Pipeline.run(), the corresponding sequence is:
>>
>> WorkerStartFn();
>> WorkerEndFn();
>>
>> So a single instance of the fn for the full pipeline execution.
>>
>> On Feb 17, 2018 at 17:42, "Reuven Lax" <re...@google.com> wrote:
>>
>>> "and a transform is by design bound to an execution"
>>>
>>> What do you mean by execution?
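Outside the quoting, the Wait-only variant from the first message can be spelled out as a sketch. This is not compilable as-is: the initialization and finalization transforms are placeholders for whatever your job needs, while `Wait.on` and `PCollection` are real Beam APIs.

```java
PCollection<Foo> input = ...;

// Initialization as an ordinary transform; its output is only a signal.
PCollection<Void> initResult = pipeline.apply(/* initialization transform */);

// Wait.on holds elements per window until the signal collection is done,
// so processing starts only after initialization has completed.
PCollection<Bar> output =
    input
        .apply(Wait.on(initResult))
        .apply(/* your processing */);

// Finalization can consume the processing output (e.g. temp file names).
PCollection<Void> finalizationResult =
    output.apply(/* finalization transform */);

// Downstream consumers of finalizedOutput observe a window's elements only
// once finalization for that window has completed.
PCollection<Bar> finalizedOutput = output.apply(Wait.on(finalizationResult));
```

Eugene's side-input version above achieves the same gating for initialization; using Wait on both ends simply makes the pattern symmetric.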
>>>
>>> On Sat, Feb 17, 2018 at 12:50 AM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>>
>>>> On Feb 16, 2018 at 22:41, "Reuven Lax" <re...@google.com> wrote:
>>>>
>>>> Kenn is correct. Allowing Fn reuse across bundles was a major, major
>>>> performance improvement. Profiling on the old Dataflow SDKs consistently
>>>> showed Java serialization being the number one performance bottleneck for
>>>> streaming pipelines, and Beam fixed this.
>>>>
>>>>
>>>> Sorry, but this doesn't help me much to understand. Let me try to
>>>> explain. I read it as "we were slow somehow around serialization, so a
>>>> quick fix was caching".
>>>>
>>>> Not to be picky, but I have had plenty of super-fast remote-EJB-over-RMI
>>>> setups, so yes, Java serialization is slower than alternative
>>>> serializations, but that doesn't justify caching most of the time.
>>>>
>>>> My main question is: isn't Beam, by the way it designed the
>>>> DoFn/transform, serializing way more than it requires? You never need to
>>>> serialize the full transform and can in 95% of cases do a writeReplace,
>>>> which is light and fast compared to the default.
>>>>
>>>> If so, the cache is an implementation workaround and not a fix.
>>>>
>>>> Hope my view on it is clearer now.
>>>>
>>>>
>>>> Romain - can you state precisely what you want? I do think there is
>>>> still a gap - IMO there's a place for a longer-lived per-fn container;
>>>> evidence for this is that people still often need to use statics to store
>>>> things. However I'm not sure if this is what you're looking for.
>>>>
>>>>
>>>> Yes. I build a framework on top of Beam and must be able to provide a
>>>> clear and reliable lifecycle. The bare minimum for any user is
>>>> start-exec-stop, and a transform is by design bound to an execution
>>>> (stream or batch).
>>>>
>>>> Bundles are not an option, as explained, because they are bound not to
>>>> the execution but to an uncontrolled subpart of it.
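The writeReplace mechanism Romain mentions is plain java.io serialization, independent of Beam. A minimal sketch (class names hypothetical): a heavy object serializes a small proxy carrying only its spec, and the proxy rebuilds the full object on deserialization via readResolve.

```java
import java.io.*;

// Hypothetical "transform": holds a large expanded config in memory, but
// only the small spec is needed to rebuild it on a worker, so writeReplace()
// serializes a tiny proxy instead of the full object graph.
public class HeavyTransform implements Serializable {
  private static final long serialVersionUID = 1L;

  final String spec;               // small: enough to rebuild everything
  transient byte[] expandedConfig; // large: recomputed, never serialized

  HeavyTransform(String spec) {
    this.spec = spec;
    this.expandedConfig = new byte[1 << 20]; // pretend this is expensive
  }

  // Java serialization writes the returned proxy instead of this instance.
  private Object writeReplace() {
    return new Proxy(spec);
  }

  private static class Proxy implements Serializable {
    private static final long serialVersionUID = 1L;
    final String spec;
    Proxy(String spec) { this.spec = spec; }

    // On deserialization, rebuild the full transform from the spec.
    private Object readResolve() {
      return new HeavyTransform(spec);
    }
  }

  static HeavyTransform roundTrip(HeavyTransform t) throws Exception {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(t); // only the proxy actually hits the wire
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
      return (HeavyTransform) in.readObject();
    }
  }

  public static void main(String[] args) throws Exception {
    HeavyTransform copy = roundTrip(new HeavyTransform("select * from input"));
    System.out.println(copy.spec);
    System.out.println(copy.expandedConfig.length == (1 << 20));
  }
}
```

The serialized form stays small no matter how big `expandedConfig` grows, which is the point Romain is making about not serializing the full transform.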
>>>> You can see it as a Beam internal until
>>>> runners unify this definition. And in any case it is closer to a chunk
>>>> notion than a lifecycle one.
>>>>
>>>> So setup and teardown must be symmetric.
>>>>
>>>> Note that a DoFn instance owns a config, so it is bound to an execution.
>>>>
>>>> This all leads to the need for a reliable teardown.
>>>>
>>>> Caching can be neat but requires its own API, like the passivation one
>>>> of EJBs.
>>>>
>>>>
>>>> Reuven
>>>>
>>>> On Fri, Feb 16, 2018 at 1:33 PM, Kenneth Knowles <k...@google.com>
>>>> wrote:
>>>>
>>>>> On Fri, Feb 16, 2018 at 1:00 PM, Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>> The serialization of the fn being once per bundle, the perf impact is
>>>>>> only huge if there is a bug somewhere else; even Java serialization is
>>>>>> negligible on a big config compared to any small pipeline (seconds vs
>>>>>> minutes).
>>>>>>
>>>>>
>>>>> Profiling is clear that this is a huge performance impact. One of the
>>>>> most important backwards-incompatible changes we made for Beam 2.0.0 was
>>>>> to allow Fn reuse across bundles.
>>>>>
>>>>> When we used a DoFn only for one bundle, there was no @Teardown
>>>>> because it had ~no use. You do everything in @FinishBundle. So for
>>>>> whatever use case you are working on, if your pipeline performs well
>>>>> enough doing it per bundle, you can put it in @FinishBundle. Of course
>>>>> it still might not get called, because that is a logical impossibility -
>>>>> you just know that for a given element, the element will be retried if
>>>>> @FinishBundle fails.
>>>>>
>>>>> If you have cleanup logic that absolutely must get executed, then you
>>>>> need to build a composite PTransform around it so it will be retried
>>>>> until cleanup succeeds. In Beam's sinks you can find many examples.
>>>>>
>>>>> Kenn
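Kenn's last point can be sketched as follows: wrap the work and its cleanup in a composite PTransform so that cleanup is an ordinary pipeline step the runner retries until it succeeds, unlike best-effort @Teardown. The names and the placeholder steps below are hypothetical; Beam's built-in sinks use variations of this shape.

```java
// Sketch only: cleanup expressed as a regular step, so a failed deletion
// fails the step and the runner retries it like any other step.
class WriteThenCleanup extends PTransform<PCollection<Foo>, PCollection<Void>> {
  @Override
  public PCollection<Void> expand(PCollection<Foo> input) {
    // Step 1: do the work, emitting the resources that will need cleanup.
    PCollection<String> tempFiles = input.apply(/* write to temporary files */);

    // Step 2: once the final outputs for the window are committed, delete
    // the temporary files; retried until it succeeds.
    return tempFiles
        .apply(Wait.on(/* signal that final outputs are committed */))
        .apply(ParDo.of(/* DoFn that deletes temp files */));
  }
}
```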