Hello, thanks Eugene for improving the documentation so we can close this thread.
Reuven, I understood the semantics of the methods. What surprised me was that I read the new documentation as saying that a runner could simply skip calling @Teardown, and we have already dealt with the issues of not doing this when there is an exception in the element methods (startBundle/processElement/finishBundle): we can leak resources by not calling teardown, as the Spark runner user reported in the link I sent. So, considering that a runner should try its best to call that method, I promoted some of the methods of ParDoLifecycleTest to ValidatesRunner to ensure that runners call teardown after exceptions, and I filed BEAM-3245 so that the DataflowRunner tries its best to respect the lifecycle when it can. (Note: I auto-assigned this JIRA, but it is up to you guys to reassign it to the person who can work on it.)

On Wed, Feb 21, 2018 at 7:26 AM, Reuven Lax <re...@google.com> wrote:
> To close the loop here:
>
> Romain, I think your actual concern was that the Javadoc made it sound like a runner could simply decide not to call Teardown. If so, then I agree with you - the Javadoc was misleading (and it appears it was confusing to Ismael as well). If a runner destroys a DoFn, it _must_ call TearDown before it calls Setup on a new DoFn.
>
> If so, then most of the back and forth on this thread had little to do with your actual concern. However, it did take almost three days of discussion before Eugene understood what your real concern was, leading to the side discussions.
>
> Reuven
>
> On Mon, Feb 19, 2018 at 6:08 PM, Reuven Lax <re...@google.com> wrote:
>>
>> +1 This PR clarifies the semantics quite a bit.
>>
>> On Mon, Feb 19, 2018 at 3:24 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>> I've sent out a PR editing the Javadoc: https://github.com/apache/beam/pull/4711 . Hopefully that should be sufficient.
>>>
>>> On Mon, Feb 19, 2018 at 3:20 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>> Ismael, your understanding is accurate for FinishBundle.
>>>>
>>>> One basic issue with this understanding is that the lifecycle of a DoFn is much longer than a single bundle (which I think you expressed by adding the *s). How long the DoFn lives is not defined. In fact, a runner is completely free to decide that it will _never_ destroy the DoFn, in which case TearDown is never called simply because the DoFn was never torn down.
>>>>
>>>> Also, as mentioned before, the runner can only call TearDown in cases where the shutdown is in its control. If the JVM is shut down externally, the runner has no chance to call TearDown. This means that while TearDown is appropriate for cleaning up in-process resources (open connections, etc.), it's not the right answer for cleaning up persistent resources. If you rely on TearDown to delete VMs or delete files, there will be cases in which those files or VMs are not deleted.
>>>>
>>>> What we are _not_ saying is that the runner is free to just ignore TearDown. If the runner is explicitly destroying a DoFn object, it should call TearDown.
>>>>
>>>> Reuven
>>>>
>>>> On Mon, Feb 19, 2018 at 2:35 PM, Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>
>>>>> I also had a different understanding of the lifecycle of a DoFn.
>>>>> My understanding of the use case for every method in the DoFn was clear and perfectly aligned with Thomas's explanation, but what I understood was that, in general terms, '@Setup was where I got resources/prepared connections and @Teardown where I freed them', so calling Teardown seemed essential to have a complete lifecycle:
>>>>> Setup → StartBundle* → ProcessElement* → FinishBundle* → Teardown
>>>>>
>>>>> The fact that @Teardown might not be called is a new detail for me too, and I also find it weird to have a method that may or may not be called as part of an API. Why would users implement teardown if it will not be called? In that case, probably a cleaner approach would be to get rid of that method altogether, no?
>>>>>
>>>>> But well, maybe that's not so easy either; there was another point: a user reported an issue with leaking resources using KafkaIO in the Spark runner, for ref.
>>>>> https://apachebeam.slack.com/archives/C1AAFJYMP/p1510596938000622
>>>>>
>>>>> At that moment my understanding was that there was something fishy, because we should be calling Teardown to correctly close the connections and free the resources in case of exceptions on start/process/finish, so I filed a JIRA and fixed this by enforcing the call of teardown for the Spark runner and the Flink runner:
>>>>> https://issues.apache.org/jira/browse/BEAM-3187
>>>>> https://issues.apache.org/jira/browse/BEAM-3244
>>>>>
>>>>> As you can see, not calling this method does have consequences, at least for non-containerized runners. Of course a runner that uses containers might not care about cleaning the resources this way, but a long-lived JVM in a Hadoop environment probably won't have the same luck. So I am not sure that having loose semantics there is the right option. I mean, runners could simply guarantee that they call teardown, and if teardown takes too long they can decide to send a signal or kill the process/container/etc. and go ahead; that way at least users would have a motivation to implement the teardown method, otherwise it doesn't make any sense to have it (API-wise).
>>>>>
>>>>> On Mon, Feb 19, 2018 at 11:30 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>> > Romain, would it be fair to say that currently the goal of your participation in this discussion is to identify situations where @Teardown in principle could have been called, but some of the current runners don't make a good enough effort to call it? If yes - as I said before, please, by all means, file bugs of the form "Runner X doesn't call @Teardown in situation Y" if you're aware of any, and feel free to send PRs fixing runner X to reliably call @Teardown in situation Y. I think we all agree that this would be a good improvement.
>>>>> >
>>>>> > On Mon, Feb 19, 2018 at 2:03 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>
>>>>> >> On 19 Feb 2018 at 22:56, "Reuven Lax" <re...@google.com> wrote:
>>>>> >>
>>>>> >> On Mon, Feb 19, 2018 at 1:51 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>
>>>>> >>> On 19 Feb 2018 at 21:28, "Reuven Lax" <re...@google.com> wrote:
>>>>> >>>
>>>>> >>> How do you call teardown? There are cases in which the Java code gets no indication that the restart is happening (e.g. cases where the machine itself is taken down).
>>>>> >>>
>>>>> >>> This is a bug; zero-downtime maintenance is very doable in 2018 ;). Crashes are bugs, and kill -9 to shut down is a bug too. Other cases can call shutdown with a hook in the worst case.
>>>>> >>
>>>>> >> What you say here is simply not true.
>>>>> >>
>>>>> >> There are many scenarios in which workers shut down with no opportunity for any sort of shutdown hook. Sometimes the entire machine gets shut down, and not even the OS will have much of a chance to do anything. At scale this will happen with some regularity, and a distributed system that assumes this will not happen is a poor distributed system.
>>>>> >>
>>>>> >> This is part of the infra, and there is no reason the machine is shut down without first shutting down what runs on it, except if it is a bug in the software or setup. I hear that you maybe don't do it everywhere, but there is no blocker to doing it. It means you can shut down the machines and guarantee teardown is called.
>>>>> >>
>>>>> >> Where I'm going is simply that it is doable, and the Beam SDK core can assume setup is done properly. If there is a best-effort downside due to that - with the meaning you defined - it is an implementation bug or a user installation issue.
>>>>> >>
>>>>> >> Technically, all of this holds.
>>>>> >>
>>>>> >> What can prevent teardown is a hardware failure or the like. That is fine and doesn't need to be in the doc, since it is everyday life in IT and obvious, or it must be made very explicit to avoid the current ambiguity.
>>>>> >>
>>>>> >>> On Mon, Feb 19, 2018, 12:24 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>
>>>>> >>>> Restarting doesn't mean you don't call teardown. Barring a bug, there is no reason - technically - that it happens, no reason.
>>>>> >>>>
>>>>> >>>> On 19 Feb 2018 at 21:14, "Reuven Lax" <re...@google.com> wrote:
>>>>> >>>>>
>>>>> >>>>> Workers restarting is not a bug; it's standard and often expected.
>>>>> >>>>>
>>>>> >>>>> On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>
>>>>> >>>>>> Nothing; as mentioned, it is a bug, so recovery is a bug-recovery (procedure).
>>>>> >>>>>>
>>>>> >>>>>> On 19 Feb 2018 at 19:42, "Eugene Kirpichov" <kirpic...@google.com> wrote:
>>>>> >>>>>>>
>>>>> >>>>>>> So what would you like to happen if there is a crash? The DoFn instance no longer exists because the JVM it ran on no longer exists. What should Teardown be called on?
>>>>> >>>>>>>
>>>>> >>>>>>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>
>>>>> >>>>>>>> This is what I want, and not 999999 teardowns for 1000000 setups until there is an unexpected crash (= a bug).
>>>>> >>>>>>>> On 19 Feb 2018 at 18:57, "Reuven Lax" <re...@google.com> wrote:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> @Reuven: in practice it is created by a pool of 256, but that leads to the same pattern; the teardown is just an "if (iCreatedThem) releaseThem();".
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> How do you control "256"? Even if you have a pool of 256 workers, nothing in Beam guarantees how many threads and DoFns are created per worker. In theory the runner might decide to create 1000 threads on each worker.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Nope, it was the other way around: in this case on AWS you can get 256 instances at once, but not 512 (which would be 2x256). So when you compute the distribution, you allocate to some fn the role of owning the instance lookup and releasing.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> I still don't understand. Let's be more precise. If you write the following code:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> pCollection.apply(ParDo.of(new MyDoFn()));
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> There is no way to control how many instances of MyDoFn are created. The runner might decide to create a million instances of this class across your worker pool, which means that you will get a million Setup and Teardown calls.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>> Anyway, this was just an example of an external resource you must release. The real topic is that Beam should define, ASAP, a guaranteed generic lifecycle to let users embrace its programming model.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>>> @Eugene:
>>>>> >>>>>>>>>>>> 1. the wait logic is about passing the value, which is not always possible (like 15% of cases from my raw estimate)
>>>>> >>>>>>>>>>>> 2. sdf: I'll try to detail why I mention SDF more here
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Concretely, Beam exposes a portable API (included in the SDK core). This API defines a *container* API and therefore implies bean lifecycles. I won't detail them all but will just use the sources and dofn (not sdf) to illustrate the idea I'm trying to develop.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> A. Source
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> A source computes a partition plan with 2 primitives: estimateSize and split.
>>>>> >>>>>>>>>>>> As a user you can expect both to be called on the same bean instance, to avoid paying the same connection cost(s) twice. Concretely:
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> connect()
>>>>> >>>>>>>>>>>> try {
>>>>> >>>>>>>>>>>>   estimateSize()
>>>>> >>>>>>>>>>>>   split()
>>>>> >>>>>>>>>>>> } finally {
>>>>> >>>>>>>>>>>>   disconnect()
>>>>> >>>>>>>>>>>> }
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> This is not guaranteed by the API, so you must do:
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> connect()
>>>>> >>>>>>>>>>>> try {
>>>>> >>>>>>>>>>>>   estimateSize()
>>>>> >>>>>>>>>>>> } finally {
>>>>> >>>>>>>>>>>>   disconnect()
>>>>> >>>>>>>>>>>> }
>>>>> >>>>>>>>>>>> connect()
>>>>> >>>>>>>>>>>> try {
>>>>> >>>>>>>>>>>>   split()
>>>>> >>>>>>>>>>>> } finally {
>>>>> >>>>>>>>>>>>   disconnect()
>>>>> >>>>>>>>>>>> }
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> ...plus a workaround with an internal estimated size, since this primitive is often called in split but you don't want to connect twice in the second phase.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Why do you need that? Simply because you want to define an API to implement sources which initializes the source bean and destroys it. I insist: it is a very, very basic concern for such an API. However, Beam doesn't embrace it and doesn't assume it, so building any API on top of Beam is very painful today, and direct Beam users hit the exact same issues - check how the IOs are implemented: static utilities create volatile connections, preventing the reuse of an existing connection within a single method
>>>>> >>>>>>>>>>>> (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> The same logic applies to the reader which is then created.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> B. DoFn & SDF
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> As a fn dev you expect the same from the Beam runtime: init(); try { while (...) process(); } finally { destroy(); }, and that it is executed on the exact same instance, to be able to be stateful at that level for expensive connections/operations/flow-state handling.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> As you mentioned with the million example, this sequence should happen for each single instance, so 1M times for your example.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Now, why did I mention SDF several times? Because SDF is a generalisation of both cases (source and dofn). Therefore it creates way more instances and requires a much stricter/more explicit definition of the exact lifecycle and of which instance does what. Since Beam handles the full lifecycle of the bean instances, it must provide init/destroy hooks (setup/teardown) which can be stateful.
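To make the lifecycle under discussion concrete, here is a minimal sketch of a DoFn using all the hooks in the order given earlier in this thread (Setup → (StartBundle → ProcessElement* → FinishBundle)* → Teardown). The executor is only a stand-in for an expensive per-instance resource; the Beam annotations and their ordering are the actual API, the rest is illustrative:

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.sdk.transforms.DoFn;

class LifecycleFn extends DoFn<String, String> {
  private transient ExecutorService executor; // per-instance resource
  private transient List<String> batch;       // per-bundle state

  @Setup
  public void setup() {
    // init(): once per DoFn instance, before any bundle.
    executor = Executors.newSingleThreadExecutor();
  }

  @StartBundle
  public void startBundle() {
    batch = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    batch.add(c.element());
    c.output(c.element());
  }

  @FinishBundle
  public void finishBundle() {
    batch.clear(); // flush per-bundle work; runs once per bundle
  }

  @Teardown
  public void teardown() {
    // destroy(): best-effort per the Javadoc - the guarantee being debated here.
    executor.shutdown();
  }
}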
>>>>> >>>>>>>>>>>> If you take the JDBC example that was mentioned earlier: today, because of the teardown issue, it uses bundles. Since bundle size is not defined - and will not be with SDF - it must use a pool to be able to reuse a connection instance so as not to hurt performance. Now, with SDF and the increase in splits, how do you handle the pool size? Generally in batch you use a single connection per thread, to avoid consuming all database connections. With a pool you have 2 choices: 1. use a pool of 1; 2. use a somewhat larger pool - but, multiplied by the number of beans, you will likely x2 or x3 the connection count and make the execution fail with "no more connection available". If you picked 1 (a pool of #1), then you still have to have a reliable teardown per pool instance (close(), generally) to ensure you release the pool and don't leak the connection information in the JVM. In all cases you come back to the init()/destroy() lifecycle, even if you fake getting connections with bundles.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Just to make it obvious: the SDF mentions are only because SDF implies all the current issues with the loose definition of the bean lifecycles at an exponential level, nothing else.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Romain Manni-Bucau
>>>>> >>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> The kind of whole-transform lifecycle you're mentioning can be accomplished using the Wait transform, as I suggested in the thread above, and I believe it should become the canonical way to do that.
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> (I would like to reiterate one more time, as the main author of most design documents related to SDF and of its implementation in the Java direct and dataflow runners, that SDF is fully unrelated to the topic of cleanup - I'm very confused as to why it keeps coming up.)
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> I kind of agree, except transforms lack a lifecycle too. My understanding is that sdf could be a way to unify it and clean the api.
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> Otherwise, how do you normalize - with a single api - the lifecycle of transforms?
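The "pool of 1" option described above might look like the following sketch. HikariCP and the JDBC URL are illustrative stand-ins (any DataSource with a close() would do); the point is only that @Teardown is what stops the pool - and its connection - from leaking:

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.Connection;
import java.sql.SQLException;
import org.apache.beam.sdk.transforms.DoFn;

class PooledJdbcFn extends DoFn<String, String> {
  private transient HikariDataSource pool;

  @Setup
  public void setup() {
    HikariConfig config = new HikariConfig();
    config.setJdbcUrl("jdbc:postgresql://host/db"); // placeholder URL
    config.setMaximumPoolSize(1); // one connection per DoFn instance
    pool = new HikariDataSource(config);
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws SQLException {
    try (Connection conn = pool.getConnection()) {
      c.output(c.element()); // real query against conn elided
    }
  }

  @Teardown
  public void teardown() {
    // If this is skipped, the pooled connection leaks for the JVM's lifetime.
    if (pool != null) {
      pool.close();
    }
  }
}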
>>>>> >>>>>>>>>>>>>> On 18 Feb 2018 at 21:32, "Ben Chambers" <bchamb...@apache.org> wrote:
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> Are you sure that focusing on the cleanup of specific DoFns is appropriate? In many cases where cleanup is necessary, it is around an entire composite PTransform. I think there have been discussions/proposals around a more methodical "cleanup" option, but those haven't been implemented, to the best of my knowledge.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> For instance, consider the steps of a FileIO:
>>>>> >>>>>>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files.
>>>>> >>>>>>>>>>>>>>> 2. When all temporary files are complete, attempt to do a bulk copy to put them in the final destination.
>>>>> >>>>>>>>>>>>>>> 3. Clean up all the temporary files.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> (This is often desirable because it minimizes the chance of seeing partial/incomplete results in the final destination.)
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> In the above, you'd want step 1 to execute on many workers, likely using a ParDo (say N different workers).
>>>>> >>>>>>>>>>>>>>> The move step should only happen once, so on one worker. This means it will be a different DoFn, likely with some stuff done to ensure it runs on one worker.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> In such a case, cleanup / @TearDown of the DoFn is not enough. We need an API for a PTransform to schedule some cleanup work for when the transform is "done". In batch this is relatively straightforward, but it doesn't exist. This is the source of some problems, such as the BigQuery sink leaving files around that have failed to import into BigQuery.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> In streaming this is less straightforward -- do you want to wait until the end of the pipeline? Or do you want to wait until the end of the window? In practice, you just want to wait until you know nobody will need the resource anymore.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> This led to some discussions around a "cleanup" API, where you could have a transform that output resource objects. Each resource object would have logic for cleaning it up. And there would be something that indicated what parts of the pipeline needed that resource, and what kind of temporal lifetime those objects had. As soon as that part of the pipeline had advanced far enough that it would no longer need the resources, they would get cleaned up.
>>>>> >>>>>>>>>>>>>>> This can be done at pipeline shutdown, or incrementally during a streaming pipeline, etc.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> Would something like this be a better fit for your use case? If not, why is handling teardown within a single DoFn sufficient?
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> Yes, 1M. Let me try to explain, simplifying the overall execution. Each instance - one fn, so likely in a thread of a worker - has its lifecycle. Caricaturally: "new" and garbage collection.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> In practice, new is often an unsafe allocate (deserialization), but it doesn't matter here.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> What I want is for any "new" to be followed by a setup before any process or startBundle, and, the last time Beam has the instance before it is GC-ed and after the last finishBundle, it calls teardown.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> It is as simple as that. This way there is no need to combine fns in a way that makes an fn not self-contained in order to implement basic transforms.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> On 18 Feb 2018 at 20:07, "Reuven Lax" <re...@google.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> On 18 Feb 2018 at 19:28, "Ben Chambers" <bchamb...@apache.org> wrote:
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather than focusing on the semantics of the existing methods -- which have been noted to meet many existing use cases -- it would be helpful to focus more on the reason you are looking for something with different semantics.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying to do):
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> 1. Clean up some external, global resource that was initialized once during the startup of the pipeline. If this is the case, how are you ensuring it was really only initialized once (and not once per worker, per thread, per instance, etc.)? How do you know when the pipeline should release it? If the answer is "when it reaches step X", then what about a streaming pipeline?
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> When the dofn is logically no longer needed, i.e. when the batch is done or the stream is stopped (manually or by a JVM shutdown).
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> I'm really not following what this means.
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each worker is running 1000 threads (each running a copy of the same DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M cleanups), and when do you want them called? When the entire pipeline is shut down? When an individual worker is about to shut down (which may be temporary - it may be about to start back up)? Something else?
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> 2. Finalize some resources that are used within some region of the pipeline. While the DoFn lifecycle methods are not a good fit for this (they are focused on managing resources within the DoFn), you could model this on how FileIO finalizes the files that it produced. For instance (a sketch of this shape follows after this list):
>>>>> >>>>>>>>>>>>>>>>>> a) A ParDo generates "resource IDs" (or some token that stores information about resources).
>>>>> >>>>>>>>>>>>>>>>>> b) "Require Deterministic Input" (to prevent retries from changing resource IDs).
>>>>> >>>>>>>>>>>>>>>>>> c) A ParDo that initializes the resources.
>>>>> >>>>>>>>>>>>>>>>>> d) Pipeline segments that use the resources and eventually output the fact that they're done.
>>>>> >>>>>>>>>>>>>>>>>> e) "Require Deterministic Input".
>>>>> >>>>>>>>>>>>>>>>>> f) A ParDo that frees the resources.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> By making the use of the resource part of the data, it is possible to "checkpoint" which resources may be in use or have been finished by using the require-deterministic-input steps. This is important to ensuring everything is actually cleaned up.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> I need that, but generic and not case by case, to industrialize some API on top of Beam.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is this case, could you elaborate on what you are trying to accomplish? That would help me understand both the problems with existing options and possibly what could be done to help.
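The resource-scoped cleanup sketched in steps (a)-(f) above can be approximated with the Wait transform mentioned earlier in the thread. The following is only a sketch of that shape: CreateResourceFn, UseResourceFn and DeleteResourceFn are hypothetical placeholder DoFns, while Wait.on is the real API.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;

public class CleanupPattern {
  static class CreateResourceFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element()); // allocate the resource, emit its ID
    }
  }

  static class UseResourceFn extends DoFn<String, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      c.output(c.element()); // do the real work with the resource
    }
  }

  static class DeleteResourceFn extends DoFn<String, Void> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      // free the resource identified by c.element()
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<String> resourceIds =
        p.apply(Create.of("resource-1", "resource-2"))
         .apply("InitResources", ParDo.of(new CreateResourceFn()));

    PCollection<String> results =
        resourceIds.apply("UseResources", ParDo.of(new UseResourceFn()));

    // Free each resource only once the consuming stage is done with it.
    resourceIds
        .apply(Wait.on(results))
        .apply("CleanupResources", ParDo.of(new DeleteResourceFn()));

    p.run().waitUntilFinish();
  }
}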
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> I understand there are workarounds for almost all cases, but it means each transform is different in its lifecycle handling, and I dislike that a lot at scale and as a user, since you can't put any unified practice on top of Beam; it also makes Beam very hard to integrate, or to use to build higher-level libraries or software.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> This is why I tried to not start the workaround discussions and just stay at the API level.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> -- Ben
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> "Machine state" is overly low-level, because many of the possible reasons can happen on a perfectly fine machine. If you'd like to rephrase it to "it will be called except in various situations where it's logically impossible or impractical to guarantee that it's called", that's fine. Or you can list some of the examples above.
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>> Sounds OK to me.
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> The main point for the user is: you *will* see non-preventable situations where it couldn't be called - it's not just intergalactic crashes - so if the logic is very important (e.g. cleaning up a large amount of temporary files, shutting down a large number of VMs you started, etc.), you have to express it using one of the other methods that have stricter guarantees (which obviously come at a cost, e.g. no pass-by-reference).
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>> FinishBundle has the exact same guarantee, sadly, so it's not clear which other method you're speaking about. Concretely, if you make it really unreliable - this is what "best effort" sounds like to me - then users can't use it to clean anything; but if you make it "can happen, but it is unexpected and means something happened", then it is fine to have a manual - or automatic, if fancy - recovery procedure.
>>>>> >>>>>>>>>>>>>>>>>>> This is where it makes all the difference and impacts the developers and ops (all users, basically).
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> Agreed, Eugene, except that "best effort" should mean exactly that. It is also often used to say "at will", and this is what triggered this thread.
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents it", but "best effort" is too open and can be very badly and wrongly perceived by users (like I did).
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>> >>>>>>>>>>>>>>>>>>>>> @rmannibucau | Blog | Old Blog | Github | LinkedIn | Book
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it: in the example situation you have (intergalactic crash), and in a number of more common cases: e.g. in case the worker container has crashed (e.g. user code in a different thread called a C library over JNI and it segfaulted), a JVM bug, a crash due to user-code OOM, in case the worker has lost network connectivity (then it may be called, but it won't be able to do anything useful), in case this is running on a preemptible VM and it was preempted by the underlying cluster manager without notice, or if the worker was too busy with other stuff (e.g. calling other Teardown functions) until the preemption timeout elapsed, in case the underlying hardware simply failed (which happens quite often at scale), and in many other conditions.
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such behavior. Please feel free to file bugs for cases where you observed a runner not call Teardown in a situation where it was possible to call it but the runner made insufficient effort.
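The kind of check invited here is what the ParDoLifecycleTest methods mentioned at the top of this message assert. A simplified sketch of such a test, assuming a single-JVM runner such as the direct runner (the static flag would not survive serialization across real workers):

import static org.junit.Assert.assertTrue;

import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.junit.Rule;
import org.junit.Test;

public class TeardownAfterExceptionTest {
  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  static class ThrowingFn extends DoFn<Integer, Integer> {
    static final AtomicBoolean TORN_DOWN = new AtomicBoolean(false);

    @ProcessElement
    public void processElement(ProcessContext c) {
      throw new RuntimeException("simulated element failure");
    }

    @Teardown
    public void teardown() {
      TORN_DOWN.set(true);
    }
  }

  @Test
  public void teardownRunsAfterProcessElementThrows() {
    pipeline.apply(Create.of(1, 2, 3)).apply(ParDo.of(new ThrowingFn()));
    try {
      pipeline.run().waitUntilFinish();
    } catch (RuntimeException expected) {
      // The pipeline is expected to fail; we only assert the lifecycle.
    }
    assertTrue(ThrowingFn.TORN_DOWN.get());
  }
}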
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On 18 Feb 2018 at 00:23, "Kenneth Knowles" <k...@google.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying to write an IO for system $x and it requires the following initialization and the following cleanup logic and the following processing in between"), I'll be better able to help you.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a connection. Using bundles is a perf killer since their size is not controlled. Using teardown doesn't allow you to release the connection, since it is a best-effort thing. Not releasing the connection makes you pay a lot - AWS ;) - or prevents you from launching other processing - concurrency limits.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things die so badly that @Teardown is not called, then nothing else can be called to close the connection either. What AWS service are you thinking of that stays open for a long time when everything at the other end has died?
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless, but some (proprietary) protocols require closing exchanges which are not only "I'm leaving".
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some services - machines - on the fly at pipeline startup and closing them at the end. If teardown is not called, you leak machines and money. You can say it can be done another way... as can the full pipeline ;).
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its components' lifecycle, it can't be used at scale for generic pipelines and is bound to some particular IOs.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> What prevents enforcing teardown - ignoring the interstellar-crash case, which can't be handled by any human system? Nothing, technically. Why do you push to not handle it? Is it due to some legacy code on dataflow, or something else?
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this way (best-effort). So I'm not sure what kind of change you're asking for.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not called, then it is a bug and we are done :).
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Also, what does it mean for the users? The direct runner does it, so if a user uses the RI in tests, will he get a different behavior in prod? Also, don't forget the user doesn't know what the IOs he composes use, so this is so impactful for the whole product that it must be handled, IMHO.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in the big data world, but it is not a reason to ignore what people did for years and do it wrong before doing it right ;).
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent guaranteeing - under normal IT conditions - the execution of teardown. Then we see if we can handle it, and only if there is a technical reason we can't do we make it experimental/unsupported in the API. I know Spark and Flink can; any unknown blocker for other runners?
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through Java shutdown hooks, otherwise your environment (the software enclosing Beam) is fully unhandled and your overall system is uncontrolled. The only case where this is not true is when the software is always owned by a vendor and never installed in a customer environment. In that case it belongs to the vendor to handle the Beam API, and not to Beam to adjust its API for a vendor - otherwise all features unsupported by one runner should be made optional, right?
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Not all state is about the network, even in distributed systems, so this is key to having an explicit and defined lifecycle.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Kenn
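As a footnote to the technical note above: a plain kill (SIGTERM) does run JVM shutdown hooks, while kill -9 (SIGKILL) does not - which is exactly the gap between an enforceable teardown and a best-effort one. A minimal standalone illustration:

public class ShutdownHookDemo {
  public static void main(String[] args) throws InterruptedException {
    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
      // Runs on normal exit and on SIGTERM, but never on SIGKILL (kill -9)
      // or on hardware/OS failure - the cases debated in this thread.
      System.out.println("shutdown hook: releasing resources");
    }));
    Thread.sleep(60_000); // try `kill <pid>` vs `kill -9 <pid>` while sleeping
  }
}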