I've sent out a PR editing the Javadoc https://github.com/apache/beam/pull/4711 . Hopefully, that should be sufficient.
On Mon, Feb 19, 2018 at 3:20 PM Reuven Lax <re...@google.com> wrote: > Ismael, your understanding is appropriate for FinishBundle. > > One basic issue with this understanding is that the lifecycle of a DoFn > is much longer than a single bundle (which I think you expressed by adding > the *s). How long the DoFn lives is not defined. In fact a runner is > completely free to decide that it will _never_ destroy the DoFn, in which > case TearDown is never called simply because the DoFn was never torn down. > > Also, as mentioned before, the runner can only call TearDown in cases > where the shutdown is in its control. If the JVM is shut down externally, > the runner has no chance to call TearDown. This means that while TearDown > is appropriate for cleaning up in-process resources (open connections, > etc.), it's not the right answer for cleaning up persistent resources. If > you rely on TearDown to delete VMs or delete files, there will be cases in > which those files or VMs are not deleted. > > What we are _not_ saying is that the runner is free to just ignore > TearDown. If the runner is explicitly destroying a DoFn object, it should > call TearDown. > > Reuven > > > On Mon, Feb 19, 2018 at 2:35 PM, Ismaël Mejía <ieme...@gmail.com> wrote: > >> I also had a different understanding of the lifecycle of a DoFn. >> >> My understanding of the use case for every method in the DoFn was clear >> and >> perfectly aligned with Thomas's explanation, but what I understood was that >> in >> general terms ‘@Setup was where I acquired resources/prepared connections and >> @Teardown was where I freed them’, so calling Teardown seemed essential to >> have a >> complete lifecycle: >> Setup → StartBundle* → ProcessElement* → FinishBundle* → Teardown >> >> The fact that @Teardown might not be called is a new detail for me too, >> and I >> also find it weird to have a method that may or may not be called as part of an >> API, >> why would users implement teardown if it will not be called? 
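[Editor's note: the lifecycle chain quoted above (Setup → StartBundle* → ProcessElement* → FinishBundle* → Teardown) can be sketched in plain Java. This is an illustrative simulation of the call order under discussion, not the Beam API; the class and method names are made up for the sketch, and the "best effort" caveat is exactly the point of the thread.]

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the call sequence a runner makes on ONE DoFn
// instance. Names are stand-ins for Beam's @Setup/@StartBundle/etc.
public class LifecycleSketch {
    // Records the calls for `bundles` bundles of `elementsPerBundle` elements.
    static List<String> callSequence(int bundles, int elementsPerBundle) {
        List<String> calls = new ArrayList<>();
        calls.add("setup");                   // once per instance
        for (int b = 0; b < bundles; b++) {
            calls.add("startBundle");         // once per bundle
            for (int e = 0; e < elementsPerBundle; e++) {
                calls.add("processElement");  // once per element
            }
            calls.add("finishBundle");        // once per bundle
        }
        calls.add("teardown");                // best effort: skipped if the
                                              // worker/JVM dies abruptly
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(callSequence(2, 3));
    }
}
```

Note that "teardown" appears once per instance, not per bundle, which is why the thread keeps distinguishing it from FinishBundle.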
In that case >> probably a cleaner approach would be to get rid of that method >> altogether, no? >> >> But well, maybe that’s not so easy either; there was another point: Some user >> reported an issue with leaking resources using KafkaIO in the Spark >> runner, for >> ref. >> https://apachebeam.slack.com/archives/C1AAFJYMP/p1510596938000622 >> >> At that moment my understanding was that there was something fishy >> because we >> should be calling Teardown to correctly close the connections and free the >> resources in case of exceptions on start/process/finish, so I filed a >> JIRA and >> fixed this by enforcing the call of teardown for the Spark runner and the >> Flink >> runner: >> https://issues.apache.org/jira/browse/BEAM-3187 >> https://issues.apache.org/jira/browse/BEAM-3244 >> >> As you can see, not calling this method does have consequences, at least for >> non-containerized runners. Of course a runner that uses containers might not >> care about cleaning up the resources this way, but a long-lived JVM in a >> Hadoop >> environment probably won’t have the same luck. So I am not sure that >> having a >> loose semantic there is the right option; I mean, runners could simply >> guarantee >> that they call teardown, and if teardown takes too long they can decide to >> send a >> signal or kill the process/container/etc and go ahead. That way at least >> users >> would have a motivation to implement the teardown method; otherwise it >> doesn’t >> make any sense to have it (API-wise). >> >> On Mon, Feb 19, 2018 at 11:30 PM, Eugene Kirpichov <kirpic...@google.com> >> wrote: >> > Romain, would it be fair to say that currently the goal of your >> > participation in this discussion is to identify situations where >> @Teardown >> > in principle could have been called, but some of the current runners >> don't >> > make a good enough effort to call it? 
If yes - as I said before, >> please, by >> > all means, file bugs of the form "Runner X doesn't call @Teardown in >> > situation Y" if you're aware of any, and feel free to send PRs fixing >> runner >> > X to reliably call @Teardown in situation Y. I think we all agree that >> this >> > would be a good improvement. >> > >> > On Mon, Feb 19, 2018 at 2:03 PM Romain Manni-Bucau < >> rmannibu...@gmail.com> >> > wrote: >> >> >> >> >> >> >> >> Le 19 févr. 2018 22:56, "Reuven Lax" <re...@google.com> a écrit : >> >> >> >> >> >> >> >> On Mon, Feb 19, 2018 at 1:51 PM, Romain Manni-Bucau >> >> <rmannibu...@gmail.com> wrote: >> >>> >> >>> >> >>> >> >>> Le 19 févr. 2018 21:28, "Reuven Lax" <re...@google.com> a écrit : >> >>> >> >>> How do you call teardown? There are cases in which the Java code gets >> no >> >>> indication that the restart is happening (e.g. cases where the machine >> >>> itself is taken down) >> >>> >> >>> >> >>> This is a bug, 0 downtime maintenance is very doable in 2018 ;). >> Crashes >> >>> are bugs, kill -9 to shut down is a bug too. Other cases can call >> shutdown >> >>> with a hook, in the worst case. >> >> >> >> >> >> What you say here is simply not true. >> >> >> >> There are many scenarios in which workers shut down with no opportunity >> for >> >> any sort of shutdown hook. Sometimes the entire machine gets shut down, >> and >> >> not even the OS will have much of a chance to do anything. At scale >> this >> >> will happen with some regularity, and a distributed system that >> assumes this >> >> will not happen is a poor distributed system. >> >> >> >> >> >> This is part of the infra and there is no reason the machine is >> shut down >> >> without shutting down what runs on it first, except if it is a bug in >> the >> >> software or setup. I can hear you maybe don't do it everywhere but >> there is >> >> no blocker to do it. This means you can shut down the machines and guarantee >> >> teardown is called. 
>> >> >> >> Where I'm going is simply that it is doable and the Beam SDK core can assume >> setup >> is well done. If there is a best effort downside due to that - with the >> >> meaning you defined - it is an impl bug or a user installation issue. >> >> >> >> Technically all is true. >> >> >> >> What can prevent teardown is a hardware failure or so. This is fine and >> >> doesn't need to be in the doc since it is life in IT and obvious, or it must be >> very >> >> explicit to avoid the current ambiguity. >> >> >> >> >> >>> >> >>> >> >>> >> >>> >> >>> On Mon, Feb 19, 2018, 12:24 PM Romain Manni-Bucau < >> rmannibu...@gmail.com> >> >>> wrote: >> >>>> >> >>>> Restarting doesn't mean you don't call teardown. Except for a bug there is >> no >> >>>> reason - technically - that it happens, no reason. >> >>>> >> >>>> Le 19 févr. 2018 21:14, "Reuven Lax" <re...@google.com> a écrit : >> >>>>> >> >>>>> Workers restarting is not a bug, it's standard and often expected. >> >>>>> >> >>>>> On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau >> >>>>> <rmannibu...@gmail.com> wrote: >> >>>>>> >> >>>>>> Nothing, as mentioned it is a bug so recovery is a bug recovery >> >>>>>> (procedure) >> >>>>>> >> >>>>>> Le 19 févr. 2018 19:42, "Eugene Kirpichov" <kirpic...@google.com> >> a >> >>>>>> écrit : >> >>>>>>> >> >>>>>>> So what would you like to happen if there is a crash? The DoFn >> >>>>>>> instance no longer exists because the JVM it ran on no longer >> exists. What >> >>>>>>> should Teardown be called on? >> >>>>>>> >> >>>>>>> >> >>>>>>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau >> >>>>>>> <rmannibu...@gmail.com> wrote: >> >>>>>>>> >> >>>>>>>> This is what I want, and not 999999 teardowns for 1000000 setups >> >>>>>>>> until there is an unexpected crash (= a bug). >> >>>>>>>> >> >>>>>>>> Le 19 févr. 
2018 18:57, "Reuven Lax" <re...@google.com> a écrit >> : >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau >> >>>>>>>>> <rmannibu...@gmail.com> wrote: >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>: >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau >> >>>>>>>>>>> <rmannibu...@gmail.com> wrote: >> >>>>>>>>>>>> >> >>>>>>>>>>>> @Reuven: in practice it is created by a pool of 256 but leads >> to >> >>>>>>>>>>>> the same pattern, the teardown is just a "if (iCreatedThem) >> releaseThem();" >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> How do you control "256?" Even if you have a pool of 256 >> workers, >> >>>>>>>>>>> nothing in Beam guarantees how many threads and DoFns are >> created per >> >>>>>>>>>>> worker. In theory the runner might decide to create 1000 >> threads on each >> >>>>>>>>>>> worker. >> >>>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> No, it was the other way around: in this case on AWS you can get >> 256 >> >>>>>>>>>> instances at once but not 512 (which would be 2x256). So when >> you compute the >> >>>>>>>>>> distribution you allocate to some fn the role of owning the >> instance lookup and >> >>>>>>>>>> release. >> >>>>>>>>> >> >>>>>>>>> >> >>>>>>>>> I still don't understand. Let's be more precise. If you write >> the >> >>>>>>>>> following code: >> >>>>>>>>> >> >>>>>>>>> pCollection.apply(ParDo.of(new MyDoFn())); >> >>>>>>>>> >> >>>>>>>>> There is no way to control how many instances of MyDoFn are >> >>>>>>>>> created. The runner might decide to create a million instances >> of this >> >>>>>>>>> class across your worker pool, which means that you will get a >> million Setup >> >>>>>>>>> and Teardown calls. >> >>>>>>>>> >> >>>>>>>>>> >> >>>>>>>>>> Anyway this was just an example of an external resource you >> must >> >>>>>>>>>> release. 
The real topic is that Beam should define ASAP a >> guaranteed generic >> >>>>>>>>>> lifecycle to let users embrace its programming model. >> >>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> @Eugene: >> >>>>>>>>>>>> 1. wait logic is about passing the value, which is not always >> >>>>>>>>>>>> possible (like 15% of cases from my raw estimate) >> >>>>>>>>>>>> 2. sdf: I'll try to detail why I mention SDF more here >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> Concretely, Beam exposes a portable API (included in the SDK >> >>>>>>>>>>>> core). This API defines a *container* API and therefore >> implies bean >> >>>>>>>>>>>> lifecycles. I'll not detail them all but just use the >> sources and dofn (not >> >>>>>>>>>>>> sdf) to illustrate the idea I'm trying to develop. >> >>>>>>>>>>>> >> >>>>>>>>>>>> A. Source >> >>>>>>>>>>>> >> >>>>>>>>>>>> A source computes a partition plan with 2 primitives: >> >>>>>>>>>>>> estimateSize and split. As a user you would expect both to be >> called on the >> >>>>>>>>>>>> same bean instance to avoid paying the same connection >> cost(s) twice. >> >>>>>>>>>>>> Concretely: >> >>>>>>>>>>>> >> >>>>>>>>>>>> connect() >> >>>>>>>>>>>> try { >> >>>>>>>>>>>> estimateSize() >> >>>>>>>>>>>> split() >> >>>>>>>>>>>> } finally { >> >>>>>>>>>>>> disconnect() >> >>>>>>>>>>>> } >> >>>>>>>>>>>> >> >>>>>>>>>>>> this is not guaranteed by the API so you must do: >> >>>>>>>>>>>> >> >>>>>>>>>>>> connect() >> >>>>>>>>>>>> try { >> >>>>>>>>>>>> estimateSize() >> >>>>>>>>>>>> } finally { >> >>>>>>>>>>>> disconnect() >> >>>>>>>>>>>> } >> >>>>>>>>>>>> connect() >> >>>>>>>>>>>> try { >> >>>>>>>>>>>> split() >> >>>>>>>>>>>> } finally { >> >>>>>>>>>>>> disconnect() >> >>>>>>>>>>>> } >> >>>>>>>>>>>> >> >>>>>>>>>>>> + a workaround with an internal estimate size since this >> >>>>>>>>>>>> primitive is often called in split but you don't want to >> connect twice in the >> >>>>>>>>>>>> second phase. 
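[Editor's note: the two snippets above can be made concrete with a small self-contained sketch. FakeSource and both helper methods are hypothetical names for illustration, not the Beam Source API; the sketch just counts how many times each pattern connects.]

```java
// Sketch: without a lifecycle guarantee, a source must connect once per
// primitive call instead of once per instance.
public class SourceConnectCost {
    static class FakeSource {
        int connects = 0;                      // counts connection setups
        void connect() { connects++; }
        void disconnect() { /* release */ }
        long estimateSize() { return 42L; }    // dummy primitive
        int split() { return 4; }              // dummy primitive
    }

    // What you would like: one connection spanning both primitives.
    static int guaranteedLifecycle(FakeSource s) {
        s.connect();
        try { s.estimateSize(); s.split(); } finally { s.disconnect(); }
        return s.connects;
    }

    // What the loose API forces: one connection per primitive call.
    static int defensiveLifecycle(FakeSource s) {
        s.connect();
        try { s.estimateSize(); } finally { s.disconnect(); }
        s.connect();
        try { s.split(); } finally { s.disconnect(); }
        return s.connects;
    }

    public static void main(String[] args) {
        System.out.println(guaranteedLifecycle(new FakeSource())); // 1
        System.out.println(defensiveLifecycle(new FakeSource()));  // 2
    }
}
```

The doubled connect count is exactly the cost the email is arguing a guaranteed init/destroy lifecycle would remove.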
>> >>>>>>>>>>>> >> >>>>>>>>>>>> Why do you need that? Simply because you want to define an API >> to >> >>>>>>>>>>>> implement sources which initializes the source bean and >> destroys it. >> >>>>>>>>>>>> I insist it is a very, very basic concern for such an API. >> However >> >>>>>>>>>>>> Beam doesn't embrace it and doesn't assume it, so building >> any API on top of >> >>>>>>>>>>>> Beam is very painful today, and for direct Beam users you hit >> the exact same >> >>>>>>>>>>>> issues - check how IOs are implemented: the static utilities >> which create >> >>>>>>>>>>>> volatile connections prevent reusing an existing connection >> in a single >> >>>>>>>>>>>> method >> >>>>>>>>>>>> ( >> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862 >> ). >> >>>>>>>>>>>> >> >>>>>>>>>>>> The same logic applies to the reader which is then created. >> >>>>>>>>>>>> >> >>>>>>>>>>>> B. DoFn & SDF >> >>>>>>>>>>>> >> >>>>>>>>>>>> As a fn dev you expect the same from the Beam runtime: >> init(); >> >>>>>>>>>>>> try { while (...) process(); } finally { destroy(); } and >> that it is >> >>>>>>>>>>>> executed on the exact same instance, to be able to be >> stateful at that level >> >>>>>>>>>>>> for expensive connections/operations/flow state handling. >> >>>>>>>>>>>> >> >>>>>>>>>>>> As you mentioned with the million example, this sequence >> should >> >>>>>>>>>>>> happen for each single instance, so 1M times for your example. >> >>>>>>>>>>>> >> >>>>>>>>>>>> Now why did I mention SDF several times? Because SDF is a >> >>>>>>>>>>>> generalisation of both cases (source and dofn). Therefore it >> creates way >> >>>>>>>>>>>> more instances and requires a far more >> strict/explicit definition of >> >>>>>>>>>>>> the exact lifecycle and which instance does what. 
Since beam >> handles the >> >>>>>>>>>>>> full lifecycle of the bean instances it must provide >> init/destroy hooks >> >>>>>>>>>>>> (setup/teardown) which can be stateful. >> >>>>>>>>>>>> >> >>>>>>>>>>>> If you take the JDBC example which was mentionned earlier. >> >>>>>>>>>>>> Today, because of the teardown issue it uses bundles. Since >> bundles size is >> >>>>>>>>>>>> not defined - and will not with SDF, it must use a pool to >> be able to reuse >> >>>>>>>>>>>> a connection instance to not correct performances. Now with >> the SDF and the >> >>>>>>>>>>>> split increase, how do you handle the pool size? Generally >> in batch you use >> >>>>>>>>>>>> a single connection per thread to avoid to consume all >> database connections. >> >>>>>>>>>>>> With a pool you have 2 choices: 1. use a pool of 1, 2. use a >> pool a bit >> >>>>>>>>>>>> higher but multiplied by the number of beans you will likely >> x2 or 3 the >> >>>>>>>>>>>> connection count and make the execution fail with "no more >> connection >> >>>>>>>>>>>> available". I you picked 1 (pool of #1), then you still have >> to have a >> >>>>>>>>>>>> reliable teardown by pool instance (close() generally) to >> ensure you release >> >>>>>>>>>>>> the pool and don't leak the connection information in the >> JVM. In all case >> >>>>>>>>>>>> you come back to the init()/destroy() lifecycle even if you >> fake to get >> >>>>>>>>>>>> connections with bundles. >> >>>>>>>>>>>> >> >>>>>>>>>>>> Just to make it obvious: SDF mentions are just cause SDF >> imply >> >>>>>>>>>>>> all the current issues with the loose definition of the bean >> lifecycles at >> >>>>>>>>>>>> an exponential level, nothing else. 
>> >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> >> >>>>>>>>>>>> Romain Manni-Bucau >> >>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book >> >>>>>>>>>>>> >> >>>>>>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov >> >>>>>>>>>>>> <kirpic...@google.com>: >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> The kind of whole-transform lifecycle you're mentioning can >> be >> >>>>>>>>>>>>> accomplished using the Wait transform as I suggested in the >> thread above, >> >>>>>>>>>>>>> and I believe it should become the canonical way to do that. >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> (Would like to reiterate one more time, as the main author >> of >> >>>>>>>>>>>>> most design documents related to SDF and of its >> implementation in the Java >> >>>>>>>>>>>>> direct and dataflow runner that SDF is fully unrelated to >> the topic of >> >>>>>>>>>>>>> cleanup - I'm very confused as to why it keeps coming up) >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> >> >>>>>>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau >> >>>>>>>>>>>>> <rmannibu...@gmail.com> wrote: >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> I kind of agree except transforms lack a lifecycle too. My >> >>>>>>>>>>>>>> understanding is that sdf could be a way to unify it and >> clean the api. >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> Otherwise how to normalize - single api - lifecycle of >> >>>>>>>>>>>>>> transforms? >> >>>>>>>>>>>>>> >> >>>>>>>>>>>>>> Le 18 févr. 2018 21:32, "Ben Chambers" < >> bchamb...@apache.org> >> >>>>>>>>>>>>>> a écrit : >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Are you sure that focusing on the cleanup of specific >> DoFn's >> >>>>>>>>>>>>>>> is appropriate? Many cases where cleanup is necessary, it >> is around an >> >>>>>>>>>>>>>>> entire composite PTransform. I think there have been >> discussions/proposals >> >>>>>>>>>>>>>>> around a more methodical "cleanup" option, but those >> haven't been >> >>>>>>>>>>>>>>> implemented, to the best of my knowledge. 
>> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> For instance, consider the steps of a FileIO: >> >>>>>>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files >> >>>>>>>>>>>>>>> 2. When all temporary files are complete, attempt to do a >> >>>>>>>>>>>>>>> bulk copy to put them in the final destination. >> >>>>>>>>>>>>>>> 3. Cleanup all the temporary files. >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> (This is often desirable because it minimizes the chance >> of >> >>>>>>>>>>>>>>> seeing partial/incomplete results in the final >> destination). >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> In the above, you'd want step 1 to execute on many >> workers, >> >>>>>>>>>>>>>>> likely using a ParDo (say N different workers). >> >>>>>>>>>>>>>>> The move step should only happen once, so on one worker. >> This >> >>>>>>>>>>>>>>> means it will be a different DoFn, likely with some stuff >> done to ensure it >> >>>>>>>>>>>>>>> runs on one worker. >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> In such a case, cleanup / @TearDown of the DoFn is not >> >>>>>>>>>>>>>>> enough. We need an API for a PTransform to schedule some >> cleanup work for >> >>>>>>>>>>>>>>> when the transform is "done". In batch this is relatively >> straightforward, >> >>>>>>>>>>>>>>> but doesn't exist. This is the source of some problems, >> such as BigQuery >> >>>>>>>>>>>>>>> sink leaving files around that have failed to import into >> BigQuery. >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> In streaming this is less straightforward -- do you want >> to >> >>>>>>>>>>>>>>> wait until the end of the pipeline? Or do you want to >> wait until the end of >> >>>>>>>>>>>>>>> the window? In practice, you just want to wait until you >> know nobody will >> >>>>>>>>>>>>>>> need the resource anymore. >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> This led to some discussions around a "cleanup" API, where >> >>>>>>>>>>>>>>> you could have a transform that output resource objects. >> Each resource >> >>>>>>>>>>>>>>> object would have logic for cleaning it up. 
And there >> would be something >> >>>>>>>>>>>>>>> that indicated what parts of the pipeline needed that >> resource, and what >> >>>>>>>>>>>>>>> kind of temporal lifetime those objects had. As soon as >> that part of the >> >>>>>>>>>>>>>>> pipeline had advanced far enough that it would no longer >> need the resources, >> >>>>>>>>>>>>>>> they would get cleaned up. This can be done at pipeline >> shutdown, or >> >>>>>>>>>>>>>>> incrementally during a streaming pipeline, etc. >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> Would something like this be a better fit for your use >> case? >> >>>>>>>>>>>>>>> If not, why is handling teardown within a single DoFn >> sufficient? >> >>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau >> >>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote: >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall >> >>>>>>>>>>>>>>>> execution. Each instance - one fn, so likely in a thread >> of a worker - has >> >>>>>>>>>>>>>>>> its lifecycle. Caricaturing: "new" and garbage >> collection. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> In practice, "new" is often an unsafe allocate >> >>>>>>>>>>>>>>>> (deserialization) but it doesn't matter here. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> What I want is for any "new" to be followed by a setup before >> >>>>>>>>>>>>>>>> any processElement or startBundle, and the last time Beam has >> the instance before it >> >>>>>>>>>>>>>>>> is GC-ed, after the last finishBundle, it calls teardown. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> It is as simple as that. >> >>>>>>>>>>>>>>>> This way there is no need to combine fns in a way that makes a fn not >> self->> >>>>>>>>>>>>>>>> contained to implement basic transforms. >> >>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>> Le 18 févr. 
2018 20:07, "Reuven Lax" <re...@google.com> >> a >> >>>>>>>>>>>>>>>> écrit : >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau >> >>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote: >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> Le 18 févr. 2018 19:28, "Ben Chambers" >> >>>>>>>>>>>>>>>>>> <bchamb...@apache.org> a écrit : >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather >> >>>>>>>>>>>>>>>>>> than focusing on the semantics of the existing methods >> -- which have been >> >>>>>>>>>>>>>>>>>> noted to meet many existing use cases -- it would >> be helpful to focus >> >>>>>>>>>>>>>>>>>> more on the reason you are looking for something with >> different semantics. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are >> trying >> >>>>>>>>>>>>>>>>>> to do): >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> 1. Clean up some external, global resource that was >> >>>>>>>>>>>>>>>>>> initialized once during the startup of the pipeline. >> If this is the case, >> >>>>>>>>>>>>>>>>>> how are you ensuring it was really only initialized >> once (and not once per >> >>>>>>>>>>>>>>>>>> worker, per thread, per instance, etc.)? How do you >> know when the pipeline >> >>>>>>>>>>>>>>>>>> should release it? If the answer is "when it reaches >> step X", then what >> >>>>>>>>>>>>>>>>>> about a streaming pipeline? >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> When the dofn is no longer needed logically, i.e. when the >> >>>>>>>>>>>>>>>>>> batch is done or the stream is stopped (manually or by a >> JVM shutdown) >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> I'm really not following what this means. 
>> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and >> each >> >>>>>>>>>>>>>>>>> worker is running 1000 threads (each running a copy of >> the same DoFn). How >> >>>>>>>>>>>>>>>>> many cleanups do you want (do you want 1000 * 1000 = 1M >> cleanups) and when >> >>>>>>>>>>>>>>>>> do you want it called? When the entire pipeline is shut >> down? When an >> >>>>>>>>>>>>>>>>> individual worker is about to shut down (which may be >> temporary - may be >> >>>>>>>>>>>>>>>>> about to start back up)? Something else? >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> 2. Finalize some resources that are used within some >> >>>>>>>>>>>>>>>>>> region of the pipeline. While, the DoFn lifecycle >> methods are not a good fit >> >>>>>>>>>>>>>>>>>> for this (they are focused on managing resources >> within the DoFn), you could >> >>>>>>>>>>>>>>>>>> model this on how FileIO finalizes the files that it >> produced. For instance: >> >>>>>>>>>>>>>>>>>> a) ParDo generates "resource IDs" (or some token >> that >> >>>>>>>>>>>>>>>>>> stores information about resources) >> >>>>>>>>>>>>>>>>>> b) "Require Deterministic Input" (to prevent retries >> >>>>>>>>>>>>>>>>>> from changing resource IDs) >> >>>>>>>>>>>>>>>>>> c) ParDo that initializes the resources >> >>>>>>>>>>>>>>>>>> d) Pipeline segments that use the resources, and >> >>>>>>>>>>>>>>>>>> eventually output the fact they're done >> >>>>>>>>>>>>>>>>>> e) "Require Deterministic Input" >> >>>>>>>>>>>>>>>>>> f) ParDo that frees the resources >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> By making the use of the resource part of the data it >> is >> >>>>>>>>>>>>>>>>>> possible to "checkpoint" which resources may be in use >> or have been finished >> >>>>>>>>>>>>>>>>>> by using the require deterministic input. This is >> important to ensuring >> >>>>>>>>>>>>>>>>>> everything is actually cleaned up. 
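[Editor's note: Ben's steps (a)-(f) above can be sketched as plain data flow. This is an illustrative simulation with made-up names (runCleanup, resource IDs), not a real Beam pipeline; the point is only that because resource IDs travel as data, the final cleanup step knows exactly which resources were created.]

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Sketch of the resource-token pattern: create resources, record their IDs
// as data, then free exactly the resources the "done" tokens name.
public class ResourceTokenCleanup {
    // Returns the resources still alive after cleanup; empty means all freed.
    static Set<String> runCleanup(List<String> resourceIds) {
        Set<String> live = new HashSet<>(resourceIds);          // (a)+(c): generate IDs, init resources
        List<String> doneTokens = new ArrayList<>(resourceIds); // (d): segments finish, emit "done" tokens
        // (b)/(e): "Require Deterministic Input" would pin these tokens across retries
        for (String token : doneTokens) {
            live.remove(token);                                 // (f): free what each token names
        }
        return live;
    }

    public static void main(String[] args) {
        System.out.println(runCleanup(Arrays.asList("res-1", "res-2", "res-3")));
    }
}
```

The deterministic-input steps (b) and (e) are what make this safe under retries: a retry re-emits the same token, so the same resource is freed rather than a phantom one leaking.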
>> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> I need that, but generic and not case by case, to >> >>>>>>>>>>>>>>>>>> industrialize some API on top of Beam. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is >> >>>>>>>>>>>>>>>>>> this case, could you elaborate on what you are trying >> to accomplish? That >> >>>>>>>>>>>>>>>>>> would help me understand both the problems with >> existing options and >> >>>>>>>>>>>>>>>>>> possibly what could be done to help. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> I understand there are workarounds for almost all cases, >> but >> >>>>>>>>>>>>>>>>>> that means each transform is different in its lifecycle >> handling, and I >> >>>>>>>>>>>>>>>>>> dislike that a lot at scale and as a user, since you >> can't put any unified >> >>>>>>>>>>>>>>>>>> practice on top of Beam; it also makes Beam very hard >> to integrate or to use >> >>>>>>>>>>>>>>>>>> to build higher-level libraries or software. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> This is why I tried to not start the workaround >> >>>>>>>>>>>>>>>>>> discussions and just stay at the API level. >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> -- Ben >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau >> >>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote: >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov >> >>>>>>>>>>>>>>>>>>> <kirpic...@google.com>: >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> "Machine state" is overly low-level because many of >> the >> >>>>>>>>>>>>>>>>>>>> possible reasons can happen on a perfectly fine >> machine. 
If you'd like to rephrase it to "it will be called >> >>>>>>>>>>>>>>>>>>>> except in various situations where it's logically >> impossible or impractical >> >>>>>>>>>>>>>>>>>>>> to guarantee that it's called", that's fine. Or you >> can list some of the >> >>>>>>>>>>>>>>>>>>>> examples above. >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> Sounds ok to me >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> The main point for the user is, you *will* see >> >>>>>>>>>>>>>>>>>>>> non-preventable situations where it couldn't be >> called - it's not just >> >>>>>>>>>>>>>>>>>>>> intergalactic crashes - so if the logic is very >> important (e.g. cleaning up >> >>>>>>>>>>>>>>>>>>>> a large number of temporary files, shutting down a >> large number of VMs you >> >>>>>>>>>>>>>>>>>>>> started etc), you have to express it using one of >> the other methods that >> >>>>>>>>>>>>>>>>>>>> have stricter guarantees (which obviously come at a >> cost, e.g. no >> >>>>>>>>>>>>>>>>>>>> pass-by-reference). >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>> FinishBundle has the exact same guarantee sadly, so >> >>>>>>>>>>>>>>>>>>> I'm not sure which other method you speak about. Concretely, >> if you make it really >> >>>>>>>>>>>>>>>>>>> unreliable - this is what "best effort" sounds like to me - >> then users can't use it >> >>>>>>>>>>>>>>>>>>> to clean up anything; but if you make it "can happen but >> it is unexpected and >> >>>>>>>>>>>>>>>>>>> means something happened" then it is fine to have a >> manual - or automatic if fancy >> >>>>>>>>>>>>>>>>>>> - recovery procedure. This is where it makes all the >> difference and impacts >> >>>>>>>>>>>>>>>>>>> the developers, ops (all users basically). 
>> >>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau >> >>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote: >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>> Agree Eugene except that "best effort" means that. >> It >> >>>>>>>>>>>>>>>>>>>>> is also often used to say "at will" and this is >> what triggered this thread. >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents >> >>>>>>>>>>>>>>>>>>>>> it" but "best effort" is too open and can be very >> badly and wrongly >> >>>>>>>>>>>>>>>>>>>>> perceived by users (like I did). >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau >> >>>>>>>>>>>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn >> | >> >>>>>>>>>>>>>>>>>>>>> Book >> >>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov >> >>>>>>>>>>>>>>>>>>>>> <kirpic...@google.com>: >> >>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call >> it: >> >>>>>>>>>>>>>>>>>>>>>> in the example situation you have (intergalactic >> crash), and in a number of >> >>>>>>>>>>>>>>>>>>>>>> more common cases: eg in case the worker container >> has crashed (eg user code >> >>>>>>>>>>>>>>>>>>>>>> in a different thread called a C library over JNI >> and it segfaulted), JVM >> >>>>>>>>>>>>>>>>>>>>>> bug, crash due to user code OOM, in case the >> worker has lost network >> >>>>>>>>>>>>>>>>>>>>>> connectivity (then it may be called but it won't >> be able to do anything >> >>>>>>>>>>>>>>>>>>>>>> useful), in case this is running on a preemptible >> VM and it was preempted by >> >>>>>>>>>>>>>>>>>>>>>> the underlying cluster manager without notice or >> if the worker was too busy >> >>>>>>>>>>>>>>>>>>>>>> with other stuff (eg calling other Teardown >> functions) until the preemption >> >>>>>>>>>>>>>>>>>>>>>> timeout elapsed, in 
case the underlying hardware >> simply failed (which >> >>>>>>>>>>>>>>>>>>>>>> happens quite often at scale), and in many other >> conditions. >> >>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe >> >>>>>>>>>>>>>>>>>>>>>> such behavior. Please feel free to file bugs for >> cases where you observed a >> >>>>>>>>>>>>>>>>>>>>>> runner not call Teardown in a situation where it >> was possible to call it but >> >>>>>>>>>>>>>>>>>>>>>> the runner made insufficient effort. >> >>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau >> >>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote: >> >>>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov >> >>>>>>>>>>>>>>>>>>>>>>> <kirpic...@google.com>: >> >>>>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau >> >>>>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote: >> >>>>>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>>>>> Le 18 févr. 2018 00:23, "Kenneth Knowles" >> >>>>>>>>>>>>>>>>>>>>>>>>> <k...@google.com> a écrit : >> >>>>>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain >> Manni-Bucau >> >>>>>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote: >> >>>>>>>>>>>>>>>>>>>>>>>>>> >> >>>>>>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need >> (e.g. >> >>>>>>>>>>>>>>>>>>>>>>>>>> "I'm trying to write an IO for system $x and >> it requires the following >> >>>>>>>>>>>>>>>>>>>>>>>>>> initialization and the following cleanup logic >> and the following processing >> >>>>>>>>>>>>>>>>>>>>>>>>>> in between") I'll be better able to help you. 
>>>>>>>
>>>>>>> Take a simple example of a transform requiring a connection. Using bundles is a perf killer since their size is not controlled. Using teardown doesn't allow you to release the connection since it is a best-effort thing. Not releasing the connection makes you pay a lot - AWS ;) - or prevents you from launching other processing - concurrency limit.
>>>>>>
>>>>>> For this example @Teardown is an exact fit. If things die so badly that @Teardown is not called, then nothing else can be called to close the connection either. What AWS service are you thinking of that stays open for a long time when everything at the other end has died?
>>>>>
>>>>> You assume connections are kind of stateless, but some (proprietary) protocols require closing exchanges which are not only "I'm leaving".
>>>>>
>>>>> For AWS I was thinking about starting some services - machines - on the fly at pipeline startup and closing them at the end. If teardown is not called you leak machines and money. You can say it can be done another way... as can the full pipeline ;).
>>>>>
>>>>> I don't want to be picky, but if Beam can't handle its components' lifecycle it can't be used at scale for generic pipelines and is bound to some particular IOs.
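The connection pattern debated above - a DoFn holding one connection across many bundles, released only in @Teardown - can be sketched without the Beam SDK. This is a minimal, illustrative simulation of the runner's call sequence; the class and method names mirror the DoFn lifecycle hooks but are stand-ins, not Beam APIs:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch (not Beam APIs): a connection is acquired once in setup()
// and reused across bundles, so per-bundle cost stays low; teardown() releases
// it, but a runner may never reach teardown() if the worker dies, which is why
// persistent external resources also need a cleanup path outside the worker.
class ConnectionFn {
    static final List<String> log = new ArrayList<>();
    private boolean connected = false;

    void setup() { connected = true; log.add("open"); }   // like @Setup: acquire
    void startBundle() { log.add("startBundle"); }
    void processElement(String e) { log.add("process:" + e); }
    void finishBundle() { log.add("finishBundle"); }      // flush; keep connection
    void teardown() {                                     // like @Teardown: best-effort release
        if (connected) { connected = false; log.add("close"); }
    }
}

public class LifecycleSketch {
    public static void main(String[] args) {
        ConnectionFn fn = new ConnectionFn();
        fn.setup();
        for (int bundle = 0; bundle < 2; bundle++) {      // the DoFn outlives many bundles
            fn.startBundle();
            fn.processElement("e" + bundle);
            fn.finishBundle();
        }
        fn.teardown();                                    // may never happen on a crash
        System.out.println(String.join(",", ConnectionFn.log));
    }
}
```

Running it prints one `open`/`close` pair around two bundles, which is the cost profile Romain is after; a crash between the loop and the last call is exactly the leak being discussed.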
>>>>>
>>>>> What does prevent enforcing teardown - ignoring the interstellar crash case, which can't be handled by any human system? Nothing, technically. Why do you push to not handle it? Is it due to some legacy code on Dataflow or something else?
>>>>
>>>> Teardown *is* already documented and implemented this way (best-effort). So I'm not sure what kind of change you're asking for.
>>>
>>> Remove "best effort" from the javadoc. If it is not called then it is a bug and we are done :).
>>>
>>>>> Also, what does it mean for the users? The direct runner does it, so if a user uses the RI in tests, will he get a different behavior in prod? Also don't forget the user doesn't know what the IOs he composes use, so this is so impacting for the whole product that it must be handled IMHO.
>>>>>
>>>>> I understand the portability culture is new in the big data world, but it is not a reason to ignore what people did for years and do it wrong before doing it right ;).
>>>>>
>>>>> My proposal is to list what can prevent guaranteeing - in normal IT conditions - the execution of teardown. Then we see if we can handle it, and only if there is a technical reason we can't do we make it experimental/unsupported in the API. I know Spark and Flink can; any unknown blocker for other runners?
>>>>>
>>>>> Technical note: even a kill should go through Java shutdown hooks, otherwise your environment (the Beam-enclosing software) is fully unhandled and your overall system is uncontrolled. The only case where this is not true is when the software is always owned by a vendor and never installed on a customer environment. In that case it belongs to the vendor to handle the Beam API, not to Beam to adjust its API for a vendor - otherwise all features unsupported by one runner should be made optional, right?
>>>>>
>>>>> Not all state is about the network, even in distributed systems, so it is key to have an explicit and defined lifecycle.
>>>>>
>>>>>> Kenn
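Romain's technical note can be illustrated in plain Java. A JVM shutdown hook runs on normal termination and on SIGTERM, but not on `kill -9`, an OS-level OOM kill, or hardware failure - the same gap that makes @Teardown best-effort. The cleanup body below is a hypothetical placeholder:

```java
// Sketch of the shutdown-hook point above: Runtime.addShutdownHook registers a
// thread the JVM starts on orderly shutdown (normal exit, SIGTERM). It does NOT
// run on SIGKILL, an OS OOM kill, or hardware failure, so it narrows but cannot
// close the window in which external resources leak.
public class ShutdownHookSketch {
    public static void main(String[] args) {
        Thread cleanup = new Thread(() -> {
            // Hypothetical cleanup: close connections, stop rented machines, etc.
            System.out.println("releasing resources");
        });
        Runtime.getRuntime().addShutdownHook(cleanup);

        // ... pipeline work would run here ...

        // removeShutdownHook returns true only if the hook was still registered,
        // i.e. it had not yet run and was in place to cover a normal exit.
        boolean wasRegistered = Runtime.getRuntime().removeShutdownHook(cleanup);
        System.out.println("hook was registered: " + wasRegistered);
    }
}
```

This is why a hook (or @Teardown) is a reasonable place to close in-process connections, but not the sole safeguard for billable resources like VMs.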