I kind of agree, except transforms lack a lifecycle too. My understanding is that SDF could be a way to unify it and clean up the API.
Otherwise, how do we normalize - with a single API - the lifecycle of transforms?

On Feb 18, 2018 at 21:32, "Ben Chambers" <bchamb...@apache.org> wrote:

> Are you sure that focusing on the cleanup of specific DoFns is appropriate? In many cases where cleanup is necessary, it is around an entire composite PTransform. I think there have been discussions/proposals around a more methodical "cleanup" option, but those haven't been implemented, to the best of my knowledge.

> For instance, consider the steps of a FileIO:
> 1. Write to a bunch (N shards) of temporary files.
> 2. When all temporary files are complete, attempt to do a bulk copy to put them in the final destination.
> 3. Clean up all the temporary files.

> (This is often desirable because it minimizes the chance of seeing partial/incomplete results in the final destination.)

> In the above, you'd want step 1 to execute on many workers, likely using a ParDo (say N different workers). The move step should only happen once, so on one worker. This means it will be a different DoFn, likely with some work done to ensure it runs on one worker.

> In such a case, cleanup / @Teardown of the DoFn is not enough. We need an API for a PTransform to schedule some cleanup work for when the transform is "done". In batch this is relatively straightforward, but it doesn't exist. This is the source of some problems, such as the BigQuery sink leaving around files that have failed to import into BigQuery.

> In streaming this is less straightforward -- do you want to wait until the end of the pipeline? Or do you want to wait until the end of the window? In practice, you just want to wait until you know nobody will need the resource anymore.

> This led to some discussions around a "cleanup" API, where you could have a transform that outputs resource objects. Each resource object would have logic for cleaning it up. And there would be something that indicated what parts of the pipeline needed that resource, and what kind of temporal lifetime those objects had. As soon as that part of the pipeline had advanced far enough that it would no longer need the resources, they would get cleaned up. This can be done at pipeline shutdown, or incrementally during a streaming pipeline, etc.

> Would something like this be a better fit for your use case? If not, why is handling teardown within a single DoFn sufficient?

> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>> Yes, 1M. Let's try to explain it by simplifying the overall execution. Each instance - one fn, so likely in a thread of a worker - has its lifecycle. Caricaturally: "new" and garbage collection.

>> In practice, "new" is often an unsafe allocate (deserialization), but that doesn't matter here.

>> What I want is for any "new" to be followed by a setup before any process or startBundle, and, the last time Beam has the instance before it is GC-ed and after the last finishBundle, for it to call teardown.

>> It is as simple as that. This way there is no need to combine fns in a way that makes a fn not self-contained in order to implement basic transforms.
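For reference, the per-instance lifecycle described above maps onto the DoFn lifecycle annotations in the Beam Java SDK. A minimal sketch - the JDBC connection is just a stand-in resource and the URL is a placeholder:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.beam.sdk.transforms.DoFn;

// Minimal sketch of the per-instance lifecycle discussed above, using a JDBC
// connection as the stand-in resource (the JDBC URL is a placeholder).
class ConnectionDoFn extends DoFn<String, String> {
  private transient Connection connection;

  @Setup
  public void setup() throws SQLException {
    // Runs once per DoFn instance, after it is created/deserialized and before any bundle.
    connection = DriverManager.getConnection("jdbc:example://placeholder");
  }

  @StartBundle
  public void startBundle() {
    // Runs before each bundle; reset any per-bundle state here.
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    // Use the shared connection for each element.
    c.output(c.element());
  }

  @FinishBundle
  public void finishBundle() {
    // Runs after each bundle; flush anything buffered during the bundle.
  }

  @Teardown
  public void teardown() throws SQLException {
    // Runs best-effort, once per instance, before the instance is discarded.
    if (connection != null) {
      connection.close();
    }
  }
}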
>> On Feb 18, 2018 at 20:07, "Reuven Lax" <re...@google.com> wrote:

>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>> On Feb 18, 2018 at 19:28, "Ben Chambers" <bchamb...@apache.org> wrote:

>>>> It feels like this thread may be a bit off-track. Rather than focusing on the semantics of the existing methods -- which have been noted to meet many existing use cases -- it would be helpful to focus more on the reason you are looking for something with different semantics.

>>>> Some possibilities (I'm not sure which one you are trying to do):

>>>> 1. Clean up some external, global resource that was initialized once during the startup of the pipeline. If this is the case, how are you ensuring it was really only initialized once (and not once per worker, per thread, per instance, etc.)? How do you know when the pipeline should release it? If the answer is "when it reaches step X", then what about a streaming pipeline?

>>>> When the DoFn is no longer needed logically, i.e. when the batch is done or the stream is stopped (manually or by a JVM shutdown).

>>> I'm really not following what this means.

>>> Let's say that a pipeline is running 1000 workers, and each worker is running 1000 threads (each running a copy of the same DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M cleanups), and when do you want them called? When the entire pipeline is shut down? When an individual worker is about to shut down (which may be temporary - it may be about to start back up)? Something else?

>>>> 2. Finalize some resources that are used within some region of the pipeline. While the DoFn lifecycle methods are not a good fit for this (they are focused on managing resources within the DoFn), you could model this on how FileIO finalizes the files that it produced. For instance:
>>>> a) ParDo generates "resource IDs" (or some token that stores information about resources)
>>>> b) "Require Deterministic Input" (to prevent retries from changing resource IDs)
>>>> c) ParDo that initializes the resources
>>>> d) Pipeline segments that use the resources, and eventually output the fact they're done
>>>> e) "Require Deterministic Input"
>>>> f) ParDo that frees the resources

>>>> By making the use of the resource part of the data, it is possible to "checkpoint" which resources may be in use or have been finished by using the require-deterministic-input steps. This is important to ensuring everything is actually cleaned up.
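A rough sketch of the (a)-(f) shape above in the Java SDK; the DoFns are placeholders, and Reshuffle is used here only as a stand-in for the "require deterministic input" steps (it forces a checkpoint of the tokens on runners that support it), not as an exact rendering of the proposal:

import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;

public class ResourceTokenPipeline {
  static PCollection<String> apply(PCollection<String> work) {
    PCollection<String> resourceIds = work
        .apply("GenerateResourceIds", ParDo.of(new GenerateResourceIdsFn()))   // (a)
        .apply("StabilizeIds", Reshuffle.viaRandomKey());                      // (b)
    PCollection<String> initialized = resourceIds
        .apply("InitializeResources", ParDo.of(new InitializeResourcesFn()));  // (c)
    PCollection<String> done = initialized
        .apply("UseResources", ParDo.of(new UseResourcesFn()));                // (d)
    return done
        .apply("StabilizeDone", Reshuffle.viaRandomKey())                      // (e)
        .apply("FreeResources", ParDo.of(new FreeResourcesFn()));              // (f)
  }

  // Placeholder DoFns; real implementations would create, track, and delete the resources.
  static class GenerateResourceIdsFn extends DoFn<String, String> {
    @ProcessElement public void process(ProcessContext c) { c.output("resource-" + c.element()); }
  }
  static class InitializeResourcesFn extends DoFn<String, String> {
    @ProcessElement public void process(ProcessContext c) { c.output(c.element()); }
  }
  static class UseResourcesFn extends DoFn<String, String> {
    @ProcessElement public void process(ProcessContext c) { c.output(c.element()); }
  }
  static class FreeResourcesFn extends DoFn<String, String> {
    @ProcessElement public void process(ProcessContext c) { c.output(c.element()); }
  }
}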
>>>> I need that, but generic and not case by case, in order to industrialize some API on top of Beam.

>>>> 3. Some other use case that I may be missing? If it is this case, could you elaborate on what you are trying to accomplish? That would help me understand both the problems with existing options and possibly what could be done to help.

>>>> I understand there are workarounds for almost all cases, but that means each transform is different in its lifecycle handling. I dislike that a lot at scale and as a user, since you can't put any unified practice on top of Beam; it also makes Beam very hard to integrate or to use to build higher-level libraries or software.

>>>> This is why I tried not to start the workaround discussions and to just stay at the API level.

>>>> -- Ben

>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

>>>>>> "Machine state" is overly low-level because many of the possible reasons can happen on a perfectly fine machine. If you'd like to rephrase it to "it will be called except in various situations where it's logically impossible or impractical to guarantee that it's called", that's fine. Or you can list some of the examples above.

>>>>> Sounds OK to me.

>>>>>> The main point for the user is, you *will* see non-preventable situations where it couldn't be called - it's not just intergalactic crashes - so if the logic is very important (e.g. cleaning up a large amount of temporary files, shutting down a large number of VMs you started, etc.), you have to express it using one of the other methods that have stricter guarantees (which obviously come at a cost, e.g. no pass-by-reference).

>>>>> FinishBundle sadly has the exact same guarantee, so I'm not sure which other method you speak of. Concretely, if you make it really unreliable - which is what "best effort" sounds like to me - then users can't use it to clean up anything; but if you make it "it can fail to happen, but that is unexpected and means something went wrong", then it is fine to have a manual - or automatic if fancy - recovery procedure. This is where it makes all the difference and impacts the developers and ops (all users, basically).

>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>> Agree, Eugene, except that "best effort" means exactly that. It is also often used to say "at will", and this is what triggered this thread.

>>>>>>> I'm fine using "except if the machine state prevents it", but "best effort" is too open and can be very badly and wrongly perceived by users (like I did).

>>>>>>> Romain Manni-Bucau
>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>

>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

>>>>>>>> It will not be called if it's impossible to call it: in the example situation you have (intergalactic crash), and in a number of more common cases: e.g. the worker container has crashed (e.g. user code in a different thread called a C library over JNI and it segfaulted), a JVM bug, a crash due to user-code OOM, the worker has lost network connectivity (then it may be called but it won't be able to do anything useful), the pipeline is running on a preemptible VM and it was preempted by the underlying cluster manager without notice, or the worker was too busy with other work (e.g. calling other Teardown functions) until the preemption timeout elapsed, the underlying hardware simply failed (which happens quite often at scale), and many other conditions.

>>>>>>>> "Best effort" is the commonly used term to describe such behavior. Please feel free to file bugs for cases where you observed a runner not call Teardown in a situation where it was possible to call it but the runner made insufficient effort.
>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>>>> On Feb 18, 2018 at 00:23, "Kenneth Knowles" <k...@google.com> wrote:

>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying to write an IO for system $x and it requires the following initialization and the following cleanup logic and the following processing in between") I'll be better able to help you.

>>>>>>>>>>>> Take a simple example of a transform requiring a connection. Using bundles is a perf killer since their size is not controlled. Using teardown doesn't allow you to release the connection, since it is a best-effort thing. Not releasing the connection makes you pay a lot - AWS ;) - or prevents you from launching other processing - concurrency limit.

>>>>>>>>>>> For this example @Teardown is an exact fit. If things die so badly that @Teardown is not called, then nothing else can be called to close the connection either. What AWS service are you thinking of that stays open for a long time when everything at the other end has died?

>>>>>>>>>>> You assume connections are kind of stateless, but some (proprietary) protocols require closing exchanges that are not only "I'm leaving".

>>>>>>>>>>> For AWS I was thinking about starting some services - machines - on the fly at pipeline startup and closing them at the end. If teardown is not called, you leak machines and money. You can say it can be done another way... as can the full pipeline ;).

>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its components' lifecycle, it can't be used at scale for generic pipelines and is bound to some particular IOs.

>>>>>>>>>>> What prevents enforcing teardown - ignoring the interstellar-crash case, which can't be handled by any human system? Nothing, technically. Why do you push to not handle it? Is it due to some legacy code on Dataflow, or something else?

>>>>>>>>>> Teardown *is* already documented and implemented this way (best-effort). So I'm not sure what kind of change you're asking for.

>>>>>>>>> Remove "best effort" from the javadoc. If it is not called then it is a bug, and we are done :).

>>>>>>>>>>> Also, what does it mean for the users? The direct runner does it, so if a user uses the RI in tests, will he get a different behavior in prod? Also, don't forget the user doesn't know what the IOs he composes use, so this is so impactful for the whole product that it must be handled, IMHO.
>>>>>>>>>>> I understand the portability culture is new in the big data world, but that is not a reason to ignore what people did for years and to do it wrong before doing it right ;).

>>>>>>>>>>> My proposal is to list what can prevent guaranteeing - under normal IT conditions - the execution of teardown. Then we see if we can handle it, and only if there is a technical reason we can't do we make it experimental/unsupported in the API. I know Spark and Flink can; are there any blockers I'm not aware of for other runners?

>>>>>>>>>>> Technical note: even a kill should go through Java shutdown hooks, otherwise your environment (the software enclosing Beam) is fully unhandled and your overall system is uncontrolled. The only case where that is not true is when the software is always owned by a vendor and never installed in a customer environment. In that case it belongs to the vendor to handle the Beam API, not to Beam to adjust its API for a vendor - otherwise all features unsupported by one runner should be made optional, right?

>>>>>>>>>>> Not all state is about the network, even in distributed systems, so it is key to have an explicit and defined lifecycle.

>>>>>>>>>>> Kenn
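For reference, the shutdown hooks mentioned in the technical note above are the standard JVM mechanism sketched below; they fire on normal exit and on SIGTERM (kill), but not on SIGKILL or a hard crash, which is why they cannot make teardown a hard guarantee on their own:

// Minimal sketch of the JVM shutdown-hook mechanism referred to above.
public class ShutdownHookExample {
  public static void main(String[] args) throws InterruptedException {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      // Release external resources here (close connections, delete temp files, ...).
      System.out.println("shutdown hook: cleaning up");
    }));
    Thread.sleep(60_000); // simulate work; send SIGTERM to see the hook fire
  }
}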