Romain, would it be fair to say that currently the goal of your participation in this discussion is to identify situations where @Teardown in principle could have been called, but some of the current runners don't make a good enough effort to call it? If yes - as I said before, please, by all means, file bugs of the form "Runner X doesn't call @Teardown in situation Y" if you're aware of any, and feel free to send PRs fixing runner X to reliably call @Teardown in situation Y. I think we all agree that this would be a good improvement.
On Mon, Feb 19, 2018 at 2:03 PM Romain Manni-Bucau <[email protected]> wrote:

> On Feb 19, 2018 at 22:56, "Reuven Lax" <[email protected]> wrote:
>> On Mon, Feb 19, 2018 at 1:51 PM, Romain Manni-Bucau <[email protected]> wrote:
>>> On Feb 19, 2018 at 21:28, "Reuven Lax" <[email protected]> wrote:
>>>> How do you call teardown? There are cases in which the Java code gets no indication that the restart is happening (e.g. cases where the machine itself is taken down).
>>> This is a bug; zero-downtime maintenance is very doable in 2018 ;). Crashes are bugs, and kill -9 as a shutdown is a bug too. For the other cases, worst case, call teardown from a shutdown hook.
>> What you say here is simply not true.
>> There are many scenarios in which workers shut down with no opportunity for any sort of shutdown hook. Sometimes the entire machine gets shut down, and not even the OS will have much of a chance to do anything. At scale this will happen with some regularity, and a distributed system that assumes this will not happen is a poor distributed system.
> This is part of the infrastructure, and there is no reason for a machine to be shut down without first shutting down what runs on it, except a bug in the software or the setup. I can hear that you maybe don't do it everywhere, but there is no blocker to doing it. That means you can shut down the machines and still guarantee that teardown is called.
> Where I am going is simply this: it is doable, and the Beam SDK core can assume setup is handled properly. If there is a best-effort downside because of that - with the meaning you defined - it is an implementation bug or a user installation issue.
> Technically, all of this holds.
> What can prevent teardown is a hardware failure or the like. That is fine, and it either doesn't need to be in the doc - since it is everyday life in IT and obvious - or it must be stated very explicitly, to avoid the current ambiguity.
>>>> On Mon, Feb 19, 2018, 12:24 PM Romain Manni-Bucau <[email protected]> wrote:
>>>>> Restarting doesn't mean you don't call teardown. Barring a bug there is no reason - technically - that it happens, no reason.
>>>>> On Feb 19, 2018 at 21:14, "Reuven Lax" <[email protected]> wrote:
>>>>>> Workers restarting is not a bug; it's standard and often expected.
>>>>>> On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau <[email protected]> wrote:
>>>>>>> Nothing; as mentioned, it is a bug, so the recovery is a bug-recovery procedure.
>>>>>>> On Feb 19, 2018 at 19:42, "Eugene Kirpichov" <[email protected]> wrote:
>>>>>>>> So what would you like to happen if there is a crash? The DoFn instance no longer exists, because the JVM it ran on no longer exists. What should Teardown be called on?
>>>>>>>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>> This is what I want - and not 999,999 teardowns for 1,000,000 setups until there is an unexpected crash (= a bug).
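For reference, the JVM shutdown-hook mechanism debated above covers normal exit and SIGTERM, but not kill -9, a hard OOM kill, or the machine disappearing - exactly the gap between the two positions. A minimal, self-contained sketch; the released resource is a placeholder:

    // Runs on normal JVM exit and on SIGTERM, but NOT on kill -9,
    // OOM kills, or hardware/VM loss.
    public class ShutdownHookSketch {
        public static void main(String[] args) throws Exception {
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                // Release external resources here (connections, leased machines, ...).
                System.out.println("releasing external resources");
            }));
            Thread.sleep(60_000); // simulate work; send SIGTERM to observe the hook
        }
    }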
2018 18:57, "Reuven Lax" <[email protected]> a écrit : >>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau < >>>>>>>> [email protected]> wrote: >>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <[email protected]>: >>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau < >>>>>>>>>> [email protected]> wrote: >>>>>>>>>> >>>>>>>>>>> @Reuven: in practise it is created by pool of 256 but leads to >>>>>>>>>>> the same pattern, the teardown is just a "if (iCreatedThem) >>>>>>>>>>> releaseThem();" >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> How do you control "256?" Even if you have a pool of 256 workers, >>>>>>>>>> nothing in Beam guarantees how many threads and DoFns are created per >>>>>>>>>> worker. In theory the runner might decide to create 1000 threads on >>>>>>>>>> each >>>>>>>>>> worker. >>>>>>>>>> >>>>>>>>> >>>>>>>>> Nop was the other way around, in this case on AWS you can get 256 >>>>>>>>> instances at once but not 512 (which will be 2x256). So when you >>>>>>>>> compute >>>>>>>>> the distribution you allocate to some fn the role to own the instance >>>>>>>>> lookup and releasing. >>>>>>>>> >>>>>>>> >>>>>>>> I still don't understand. Let's be more precise. If you write the >>>>>>>> following code: >>>>>>>> >>>>>>>> pCollection.apply(ParDo.of(new MyDoFn())); >>>>>>>> >>>>>>>> There is no way to control how many instances of MyDoFn are >>>>>>>> created. The runner might decided to create a million instances of this >>>>>>>> class across your worker pool, which means that you will get a million >>>>>>>> Setup and Teardown calls. >>>>>>>> >>>>>>>> >>>>>>>>> Anyway this was just an example of an external resource you must >>>>>>>>> release. Real topic is that beam should define asap a guaranteed >>>>>>>>> generic >>>>>>>>> lifecycle to let user embrace its programming model. >>>>>>>>> >>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>> >>>>>>>>>>> @Eugene: >>>>>>>>>>> 1. wait logic is about passing the value which is not always >>>>>>>>>>> possible (like 15% of cases from my raw estimate) >>>>>>>>>>> 2. sdf: i'll try to detail why i mention SDF more here >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Concretely beam exposes a portable API (included in the SDK >>>>>>>>>>> core). This API defines a *container* API and therefore implies bean >>>>>>>>>>> lifecycles. I'll not detail them all but just use the sources and >>>>>>>>>>> dofn (not >>>>>>>>>>> sdf) to illustrate the idea I'm trying to develop. >>>>>>>>>>> >>>>>>>>>>> A. Source >>>>>>>>>>> >>>>>>>>>>> A source computes a partition plan with 2 primitives: >>>>>>>>>>> estimateSize and split. As an user you can expect both to be called >>>>>>>>>>> on the >>>>>>>>>>> same bean instance to avoid to pay the same connection cost(s) >>>>>>>>>>> twice. >>>>>>>>>>> Concretely: >>>>>>>>>>> >>>>>>>>>>> connect() >>>>>>>>>>> try { >>>>>>>>>>> estimateSize() >>>>>>>>>>> split() >>>>>>>>>>> } finally { >>>>>>>>>>> disconnect() >>>>>>>>>>> } >>>>>>>>>>> >>>>>>>>>>> this is not guaranteed by the API so you must do: >>>>>>>>>>> >>>>>>>>>>> connect() >>>>>>>>>>> try { >>>>>>>>>>> estimateSize() >>>>>>>>>>> } finally { >>>>>>>>>>> disconnect() >>>>>>>>>>> } >>>>>>>>>>> connect() >>>>>>>>>>> try { >>>>>>>>>>> split() >>>>>>>>>>> } finally { >>>>>>>>>>> disconnect() >>>>>>>>>>> } >>>>>>>>>>> >>>>>>>>>>> + a workaround with an internal estimate size since this >>>>>>>>>>> primitive is often called in split but you dont want to connect >>>>>>>>>>> twice in >>>>>>>>>>> the second phase. 
>>>>>>>>>>>>> Why do you need that? Simply because you want to define an API for implementing sources that initializes the source bean and destroys it. I insist: this is a very, very basic concern for such an API. However, Beam doesn't embrace it and doesn't assume it, so building any API on top of Beam is very painful today, and direct Beam users hit the exact same issues - check how the IOs are implemented: static utilities that create short-lived connections, preventing the reuse of an existing connection within a single method (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
>>>>>>>>>>>>> The same logic applies to the reader which is then created.
>>>>>>>>>>>>> B. DoFn & SDF
>>>>>>>>>>>>> As a fn dev you expect the same from the Beam runtime: init(); try { while (...) process(); } finally { destroy(); } - and that it is executed on the exact same instance, so that you can be stateful at that level for expensive connections/operations/flow-state handling.
>>>>>>>>>>>>> As you mentioned with the million example, this sequence should happen for each single instance, so 1M times in your example.
>>>>>>>>>>>>> Now, why did I mention SDF several times? Because SDF is a generalisation of both cases (source and DoFn). Therefore it creates far more instances and requires a much stricter/more explicit definition of the exact lifecycle and of which instance does what. Since Beam handles the full lifecycle of the bean instances, it must provide init/destroy hooks (setup/teardown) which can be stateful.
>>>>>>>>>>>>> Take the JDBC example which was mentioned earlier. Today, because of the teardown issue, it uses bundles. Since bundle size is not defined - and will not be with SDF - it must use a pool to be able to reuse a connection instance so as not to wreck performance. Now, with SDF and the increase in splits, how do you handle the pool size? Generally in batch you use a single connection per thread, to avoid consuming all the database connections. With a pool you have 2 choices: 1. use a pool of 1; 2. use a somewhat bigger pool, but multiplied by the number of beans you will likely 2x or 3x the connection count and make the execution fail with "no more connections available". If you picked 1 (a pool of 1), then you still need a reliable teardown per pool instance (close(), generally) to ensure you release the pool and don't leak the connection information in the JVM. In any case you come back to the init()/destroy() lifecycle, even if you pretend to scope connections to bundles.
>>>>>>>>>>>>> Just to make it obvious: the SDF mentions are only because SDF implies all the current issues with the loose definition of the bean lifecycles, at an exponential level - nothing else.
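For concreteness, the @Setup/@Teardown shape under discussion looks like this in the Java SDK. A minimal sketch - the JDBC URL is a placeholder, and a real IO (e.g. JdbcIO) adds pooling and batching on top:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.Statement;
    import org.apache.beam.sdk.transforms.DoFn;

    class JdbcWriteFn extends DoFn<String, Void> {
        private final String jdbcUrl;            // placeholder connection string
        private transient Connection connection;

        JdbcWriteFn(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }

        @Setup
        public void setup() throws Exception {
            // One connection per DoFn instance: a million instances means a million of these.
            connection = DriverManager.getConnection(jdbcUrl);
        }

        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
            try (Statement stmt = connection.createStatement()) {
                stmt.execute(c.element()); // illustrative only
            }
        }

        @Teardown
        public void teardown() throws Exception {
            // Best-effort under the current contract: may never run if the worker is lost.
            if (connection != null) { connection.close(); connection = null; }
        }
    }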
>>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>>>>>>> On 2018-02-18 at 22:32 GMT+01:00, Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>>>>> The kind of whole-transform lifecycle you're mentioning can be accomplished using the Wait transform, as I suggested in the thread above, and I believe it should become the canonical way to do that.
>>>>>>>>>>>>>> (I would like to reiterate one more time, as the main author of most design documents related to SDF and of its implementation in the Java direct and Dataflow runners, that SDF is fully unrelated to the topic of cleanup - I'm very confused as to why it keeps coming up.)
>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>>>>>>>> I kind of agree, except that transforms lack a lifecycle too. My understanding is that SDF could be a way to unify it and clean up the API.
>>>>>>>>>>>>>>> Otherwise, how do we normalize - with a single API - the lifecycle of transforms?
>>>>>>>>>>>>>>> On Feb 18, 2018 at 21:32, "Ben Chambers" <[email protected]> wrote:
>>>>>>>>>>>>>>>> Are you sure that focusing on the cleanup of specific DoFns is appropriate? In many cases where cleanup is necessary, it is around an entire composite PTransform. I think there have been discussions/proposals around a more methodical "cleanup" option, but those haven't been implemented, to the best of my knowledge.
>>>>>>>>>>>>>>>> For instance, consider the steps of a FileIO:
>>>>>>>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files.
>>>>>>>>>>>>>>>> 2. When all temporary files are complete, attempt a bulk copy to put them in the final destination.
>>>>>>>>>>>>>>>> 3. Clean up all the temporary files.
>>>>>>>>>>>>>>>> (This is often desirable because it minimizes the chance of seeing partial/incomplete results in the final destination.)
>>>>>>>>>>>>>>>> In the above, you'd want step 1 to execute on many workers, likely using a ParDo (say N different workers). The move step should only happen once, so on one worker. This means it will be a different DoFn, likely with some stuff done to ensure it runs on one worker.
>>>>>>>>>>>>>>>> In such a case, cleanup / @TearDown of the DoFn is not enough. We need an API for a PTransform to schedule some cleanup work for when the transform is "done". In batch this is relatively straightforward, but it doesn't exist. This is the source of some problems, such as the BigQuery sink leaving around files that have failed to import into BigQuery.
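The Wait transform Eugene mentions is a real transform in the Java SDK (org.apache.beam.sdk.transforms.Wait). A hedged sketch of gating cleanup on a write's completion; the writeSignal collection and CleanupFn are illustrative placeholders:

    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;

    // CleanupFn is a hypothetical DoFn<String, Void> that deletes temp files.
    static void cleanupAfterWrite(PCollection<String> tempFiles, PCollection<Long> writeSignal) {
        tempFiles
            .apply(Wait.on(writeSignal))        // emit elements only once writeSignal is complete (per window)
            .apply(ParDo.of(new CleanupFn()));
    }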
>>>>>>>>>>>>>>>> In streaming this is less straightforward - do you want to wait until the end of the pipeline? Or until the end of the window? In practice, you just want to wait until you know nobody will need the resource anymore.
>>>>>>>>>>>>>>>> This led to some discussions around a "cleanup" API, where you could have a transform that outputs resource objects. Each resource object would have logic for cleaning it up. And there would be something that indicated which parts of the pipeline needed that resource, and what kind of temporal lifetime those objects had. As soon as that part of the pipeline had advanced far enough that it would no longer need the resources, they would get cleaned up. This can be done at pipeline shutdown, or incrementally during a streaming pipeline, etc.
>>>>>>>>>>>>>>>> Would something like this be a better fit for your use case? If not, why is handling teardown within a single DoFn sufficient?
>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall execution. Each instance - one fn, so likely in a thread of a worker - has its lifecycle. Caricaturally: "new" and garbage collection.
>>>>>>>>>>>>>>>>> In practice, "new" is often an unsafe allocate (deserialization), but that doesn't matter here.
>>>>>>>>>>>>>>>>> What I want is for any "new" to be followed by a setup before any process or startBundle, and, the last time Beam holds the instance before it is GC-ed and after the last finishBundle, for it to call teardown.
>>>>>>>>>>>>>>>>> It is as simple as that. This way there is no need to combine fns in a way that makes an fn not self-contained in order to implement basic transforms.
>>>>>>>>>>>>>>>>> On Feb 18, 2018 at 20:07, "Reuven Lax" <[email protected]> wrote:
>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>> On Feb 18, 2018 at 19:28, "Ben Chambers" <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather than focusing on the semantics of the existing methods - which have been noted to meet many existing use cases - it would be helpful to focus more on the reason you are looking for something with different semantics.
>>>>>>>>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying to do):
>>>>>>>>>>>>>>>>>>>> 1. Clean up some external, global resource that was initialized once during the startup of the pipeline. If this is the case, how are you ensuring it was really only initialized once (and not once per worker, per thread, per instance, etc.)? How do you know when the pipeline should release it? If the answer is "when it reaches step X", then what about a streaming pipeline?
>>>>>>>>>>>>>>>>>>> When the DoFn is no longer needed logically, i.e. when the batch is done or the stream is stopped (manually or by a JVM shutdown).
>>>>>>>>>>>>>>>>>> I'm really not following what this means.
>>>>>>>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each worker is running 1000 threads (each running a copy of the same DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M cleanups), and when do you want them called? When the entire pipeline is shut down? When an individual worker is about to shut down (which may be temporary - it may be about to start back up)? Something else?
>>>>>>>>>>>>>>>>>>>> 2. Finalize some resources that are used within some region of the pipeline. While the DoFn lifecycle methods are not a good fit for this (they are focused on managing resources within the DoFn), you could model this on how FileIO finalizes the files that it produced. For instance:
>>>>>>>>>>>>>>>>>>>> a) A ParDo generates "resource IDs" (or some token that stores information about the resources).
>>>>>>>>>>>>>>>>>>>> b) "Require Deterministic Input" (to prevent retries from changing resource IDs).
>>>>>>>>>>>>>>>>>>>> c) A ParDo that initializes the resources.
>>>>>>>>>>>>>>>>>>>> d) Pipeline segments that use the resources, and eventually output the fact that they're done.
>>>>>>>>>>>>>>>>>>>> e) "Require Deterministic Input".
>>>>>>>>>>>>>>>>>>>> f) A ParDo that frees the resources.
>>>>>>>>>>>>>>>>>>>> By making the use of the resource part of the data, it is possible to "checkpoint" which resources may be in use or have been finished, by using the require-deterministic input. This is important to ensuring everything is actually cleaned up.
>>>>>>>>>>>>>>>>>>> I need that, but generic and not case-by-case, to industrialize some API on top of Beam.
>>>>>>>>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is this case, could you elaborate on what you are trying to accomplish? That would help me understand both the problems with the existing options and possibly what could be done to help.
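A hedged sketch of the a)-f) pattern above in the Java SDK, with Reshuffle.viaRandomKey() standing in for the "Require Deterministic Input" steps (Reshuffle is the usual checkpointing workaround, though its stable-input guarantee is runner-specific; the Allocate/Use/Free DoFns are hypothetical):

    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    // AllocateResourceFn / UseResourceFn / FreeResourceFn are hypothetical DoFn<String, String>s.
    static void resourceLifecycle(PCollection<String> input) {
        input
            .apply("AllocateIds", ParDo.of(new AllocateResourceFn()))
            .apply("StableIds", Reshuffle.viaRandomKey())             // b) retries can no longer change the IDs
            .apply("UseResources", ParDo.of(new UseResourceFn()))     // c)+d) use resources, emit "done" tokens
            .apply("StableDone", Reshuffle.viaRandomKey())            // e)
            .apply("FreeResources", ParDo.of(new FreeResourceFn()));  // f) free only once tokens are stable
    }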
>>>>>>>>>>>>>>>>>>> I understand there are workarounds for almost all cases, but that means each transform is different in its lifecycle handling. I dislike that a lot at scale and as a user, since you can't put any unified practice on top of Beam; it also makes Beam very hard to integrate, or to use for building higher-level libraries or software.
>>>>>>>>>>>>>>>>>>> This is why I tried not to start the workaround discussions and just stay at the API level.
>>>>>>>>>>>>>>>>>>>> -- Ben
>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>> On 2018-02-18 at 18:36 GMT+01:00, Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>> "Machine state" is overly low-level, because many of the possible reasons can happen on a perfectly fine machine. If you'd like to rephrase it to "it will be called except in various situations where it's logically impossible or impractical to guarantee that it's called", that's fine. Or you can list some of the examples above.
>>>>>>>>>>>>>>>>>>>>> Sounds OK to me.
>>>>>>>>>>>>>>>>>>>>>> The main point for the user is: you *will* see non-preventable situations where it couldn't be called - it's not just intergalactic crashes - so if the logic is very important (e.g. cleaning up a large amount of temporary files, shutting down a large number of VMs you started, etc.), you have to express it using one of the other methods that have stricter guarantees (which obviously come at a cost, e.g. no pass-by-reference).
>>>>>>>>>>>>>>>>>>>>> FinishBundle has the exact same guarantee, sadly, so I'm not sure which other method you mean. Concretely, if you make it really unreliable - which is what "best effort" sounds like to me - then users can't use it to clean anything; but if you make it "can happen, but it is unexpected and means something went wrong", then it is fine to have a manual - or automatic, if fancy - recovery procedure. This is where it makes all the difference and impacts the developers and ops (all users, basically).
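For completeness, the bundle-scoped alternative referenced here looks like the sketch below. On runners that retry failed bundles, a bundle's output is not committed until @FinishBundle returns, which is the stricter guarantee being contrasted with @Teardown (minimal sketch; the flush target is hypothetical):

    import org.apache.beam.sdk.transforms.DoFn;

    class BufferingWriteFn extends DoFn<String, Void> {
        private transient java.util.List<String> buffer;

        @StartBundle
        public void startBundle() {
            buffer = new java.util.ArrayList<>();
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
            buffer.add(c.element());
        }

        @FinishBundle
        public void finishBundle() {
            // A failed bundle is retried, so this flush happens before the bundle's
            // output is considered committed - unlike @Teardown, which may be skipped.
            flushToExternalSystem(buffer);
            buffer = null;
        }

        // Hypothetical flush target.
        private void flushToExternalSystem(java.util.List<String> rows) { /* ... */ }
    }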
>>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>> Agreed, Eugene, except that "best effort" means exactly that. It is also often used to mean "at will", and this is what triggered this thread.
>>>>>>>>>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents it", but "best effort" is too open and can be very badly and wrongly perceived by users (as I did).
>>>>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>>>>>>>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>>>>>>>>>>>>>>>>> On 2018-02-18 at 18:13 GMT+01:00, Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it: in the example situation you gave (an intergalactic crash), and in a number of more common cases: e.g. when the worker container has crashed (e.g. user code in a different thread called a C library over JNI and it segfaulted), a JVM bug, a crash due to a user-code OOM, when the worker has lost network connectivity (then it may be called, but it won't be able to do anything useful), when this is running on a preemptible VM and it was preempted by the underlying cluster manager without notice, or when the worker was too busy with other things (e.g. calling other Teardown functions) until the preemption timeout elapsed, when the underlying hardware simply failed (which happens quite often at scale), and in many other conditions.
>>>>>>>>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such behavior. Please feel free to file bugs for cases where you observed a runner not call Teardown in a situation where it was possible to call it but the runner made insufficient effort.
>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>> On 2018-02-18 at 18:00 GMT+01:00, Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>> On Feb 18, 2018 at 00:23, "Kenneth Knowles" <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying to write an IO for system $x and it requires the following initialization and the following cleanup logic and the following processing in between"), I'll be better able to help you.
>>>>>>>>>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a connection. Using bundles is a perf killer, since their size is not controlled. Using teardown doesn't allow you to release the connection, since it is a best-effort thing. Not releasing the connection makes you pay a lot - AWS ;) - or prevents you from launching other processing - concurrency limits.
>>>>>>>>>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things die so badly that @Teardown is not called, then nothing else can be called to close the connection either. What AWS service are you thinking of that stays open for a long time when everything at the other end has died?
>>>>>>>>>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless, but some (proprietary) protocols require closing exchanges which are more than just "I'm leaving".
>>>>>>>>>>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some services - machines - on the fly at pipeline startup and closing them at the end. If teardown is not called, you leak machines and money. You can say it can be done another way... as can the full pipeline ;).
>>>>>>>>>>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its components' lifecycle, it can't be used at scale for generic pipelines and is bound to some particular IOs.
>>>>>>>>>>>>>>>>>>>>>>>>>>> What prevents enforcing teardown - ignoring the interstellar-crash case, which can't be handled by any human system? Nothing, technically. Why do you push to not handle it? Is it due to some legacy code on Dataflow, or something else?
>>>>>>>>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this way (best-effort). So I'm not sure what kind of change you're asking for.
>>>>>>>>>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not called, then it is a bug and we are done :).
Direct runner >>>>>>>>>>>>>>>>>>>>>>>> does it so if a user udes the RI in test, he will get >>>>>>>>>>>>>>>>>>>>>>>> a different behavior >>>>>>>>>>>>>>>>>>>>>>>> in prod? Also dont forget the user doesnt know what >>>>>>>>>>>>>>>>>>>>>>>> the IOs he composes use >>>>>>>>>>>>>>>>>>>>>>>> so this is so impacting for the whole product than he >>>>>>>>>>>>>>>>>>>>>>>> must be handled IMHO. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in big >>>>>>>>>>>>>>>>>>>>>>>> data world but it is not a reason to ignore what >>>>>>>>>>>>>>>>>>>>>>>> people did for years and >>>>>>>>>>>>>>>>>>>>>>>> do it wrong before doing right ;). >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent to >>>>>>>>>>>>>>>>>>>>>>>> guarantee - in the normal IT conditions - the >>>>>>>>>>>>>>>>>>>>>>>> execution of teardown. Then >>>>>>>>>>>>>>>>>>>>>>>> we see if we can handle it and only if there is a >>>>>>>>>>>>>>>>>>>>>>>> technical reason we cant >>>>>>>>>>>>>>>>>>>>>>>> we make it experimental/unsupported in the api. I know >>>>>>>>>>>>>>>>>>>>>>>> spark and flink can, >>>>>>>>>>>>>>>>>>>>>>>> any unknown blocker for other runners? >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through java >>>>>>>>>>>>>>>>>>>>>>>> shutdown hooks otherwise your environment (beam >>>>>>>>>>>>>>>>>>>>>>>> enclosing software) is >>>>>>>>>>>>>>>>>>>>>>>> fully unhandled and your overall system is >>>>>>>>>>>>>>>>>>>>>>>> uncontrolled. Only case where it >>>>>>>>>>>>>>>>>>>>>>>> is not true is when the software is always owned by a >>>>>>>>>>>>>>>>>>>>>>>> vendor and never >>>>>>>>>>>>>>>>>>>>>>>> installed on customer environment. In this case it >>>>>>>>>>>>>>>>>>>>>>>> belongd to the vendor to >>>>>>>>>>>>>>>>>>>>>>>> handle beam API and not to beam to adjust its API for >>>>>>>>>>>>>>>>>>>>>>>> a vendor - otherwise >>>>>>>>>>>>>>>>>>>>>>>> all unsupported features by one runner should be made >>>>>>>>>>>>>>>>>>>>>>>> optional right? >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> All state is not about network, even in distributed >>>>>>>>>>>>>>>>>>>>>>>> systems so this is key to have an explicit and defined >>>>>>>>>>>>>>>>>>>>>>>> lifecycle. >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Kenn >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>>> >>>>>>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>> >>>>>>>> >> > >
