So what would you like to happen if there is a crash? The DoFn instance no longer exists because the JVM it ran on no longer exists. What should Teardown be called on?
On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

> This is what I want, and not 999,999 teardowns for 1,000,000 setups until there is an unexpected crash (= a bug).
>
> On Feb 19, 2018, at 18:57, "Reuven Lax" <re...@google.com> wrote:
>
>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>
>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>
>>>>> @Reuven: in practice it is created by pools of 256, but that leads to the same pattern; the teardown is just an "if (iCreatedThem) releaseThem();".
>>>>
>>>> How do you control "256"? Even if you have a pool of 256 workers, nothing in Beam guarantees how many threads and DoFns are created per worker. In theory the runner might decide to create 1000 threads on each worker.
>>>
>>> No, it was the other way around: in this case on AWS you can get 256 instances at once but not 512 (which would be 2x256). So when you compute the distribution, you allocate to some fn the role of owning the instance lookup and release.
>>
>> I still don't understand. Let's be more precise. If you write the following code:
>>
>> pCollection.apply(ParDo.of(new MyDoFn()));
>>
>> there is no way to control how many instances of MyDoFn are created. The runner might decide to create a million instances of this class across your worker pool, which means that you will get a million Setup and Teardown calls.
>>
>>> Anyway, this was just an example of an external resource you must release. The real topic is that Beam should define, ASAP, a guaranteed generic lifecycle to let users embrace its programming model.
>>>
>>>>> @Eugene:
>>>>> 1. The Wait logic is about passing the value, which is not always possible (around 15% of cases, from my rough estimate).
>>>>> 2. SDF: I'll try to detail below why I keep mentioning SDF.
>>>>>
>>>>> Concretely, Beam exposes a portable API (included in the SDK core). This API defines a *container* API and therefore implies bean lifecycles. I'll not detail them all, but just use sources and DoFn (not SDF) to illustrate the idea I'm trying to develop.
>>>>>
>>>>> A. Source
>>>>>
>>>>> A source computes a partition plan with 2 primitives: estimateSize and split. As a user you would expect both to be called on the same bean instance, to avoid paying the same connection cost(s) twice. Concretely:
>>>>>
>>>>> connect()
>>>>> try {
>>>>>   estimateSize()
>>>>>   split()
>>>>> } finally {
>>>>>   disconnect()
>>>>> }
>>>>>
>>>>> This is not guaranteed by the API, so you must do:
>>>>>
>>>>> connect()
>>>>> try {
>>>>>   estimateSize()
>>>>> } finally {
>>>>>   disconnect()
>>>>> }
>>>>> connect()
>>>>> try {
>>>>>   split()
>>>>> } finally {
>>>>>   disconnect()
>>>>> }
>>>>>
>>>>> plus a workaround with an internally cached estimated size, since this primitive is often called inside split but you don't want to connect twice in the second phase.
>>>>>
>>>>> Why do you need that? Simply because you want to define an API for implementing sources which initializes the source bean and destroys it. I insist: this is a very, very basic concern for such an API.
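>>>>>
>>>>> To make this concrete, here is a minimal Java sketch of the defensive pattern the current API forces (Client, Client.Shard and MyShardSource are hypothetical stand-ins for the backing system; only the two planning primitives are shown, the class stays abstract so the other Source methods can be elided):
>>>>>
>>>>> import java.util.ArrayList;
>>>>> import java.util.List;
>>>>> import org.apache.beam.sdk.io.BoundedSource;
>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>
>>>>> public abstract class MySource extends BoundedSource<String> {
>>>>>
>>>>>   @Override
>>>>>   public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
>>>>>     Client client = Client.connect();  // first connection
>>>>>     try {
>>>>>       return client.estimateSize();
>>>>>     } finally {
>>>>>       client.close();                  // no shared lifecycle, so disconnect immediately
>>>>>     }
>>>>>   }
>>>>>
>>>>>   @Override
>>>>>   public List<? extends BoundedSource<String>> split(
>>>>>       long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
>>>>>     Client client = Client.connect();  // second connection for the same planning phase
>>>>>     try {
>>>>>       List<MyShardSource> shards = new ArrayList<>();
>>>>>       for (Client.Shard shard : client.split(desiredBundleSizeBytes)) {
>>>>>         shards.add(new MyShardSource(shard));
>>>>>       }
>>>>>       return shards;
>>>>>     } finally {
>>>>>       client.close();
>>>>>     }
>>>>>   }
>>>>>
>>>>>   // createReader(), coder, validation etc. left abstract/elided
>>>>> }
>>>>>
>>>>> With a guaranteed init/destroy around the whole planning phase, both methods could share a single client instead.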
>>>>>
>>>>> However, Beam doesn't embrace it and doesn't assume it, so building any API on top of Beam is very painful today, and direct Beam users hit the exact same issues - check how the IOs are implemented: static utilities which create short-lived connections, preventing the reuse of an existing connection within a single method (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
>>>>>
>>>>> The same logic applies to the reader which is then created.
>>>>>
>>>>> B. DoFn & SDF
>>>>>
>>>>> As an fn dev you expect the same from the Beam runtime: init(); try { while (...) process(); } finally { destroy(); } - and that this is executed on the exact same instance, so the instance can be stateful at that level for expensive connections/operations/flow-state handling.
>>>>>
>>>>> As you mentioned with the million example, this sequence should happen for each single instance, so 1M times in your example.
>>>>>
>>>>> Now, why did I mention SDF several times? Because SDF is a generalisation of both cases (source and DoFn). Therefore it creates way more instances and requires a much stricter/more explicit definition of the exact lifecycle, and of which instance does what. Since Beam handles the full lifecycle of the bean instances, it must provide init/destroy hooks (setup/teardown) which can be stateful.
>>>>>
>>>>> Take the JDBC example which was mentioned earlier. Today, because of the teardown issue, it uses bundles. Since bundle size is not defined - and will not be with SDF - it must use a pool to be able to reuse a connection instance so as not to wreck performance. Now, with SDF and the increase in splits, how do you size the pool? Generally in batch you use a single connection per thread to avoid consuming all the database connections. With a pool you have 2 choices: 1. use a pool of 1; 2. use a slightly bigger pool, but, multiplied by the number of beans, you will likely 2x or 3x the connection count and make the execution fail with "no more connections available". If you picked 1 (a pool of 1), then you still have to have a reliable teardown per pool instance (close(), generally) to ensure you release the pool and don't leak the connection information in the JVM. In all cases you come back to the init()/destroy() lifecycle, even if you pretend to get connections per bundle.
>>>>>
>>>>> Just to make it obvious: the SDF mentions are only because SDF amplifies all the current issues caused by the loose definition of the bean lifecycle, nothing else.
>>>>>
>>>>> Romain Manni-Bucau
>>>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>
>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>
>>>>>> The kind of whole-transform lifecycle you're mentioning can be accomplished using the Wait transform, as I suggested in the thread above, and I believe it should become the canonical way to do that.
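>>>>>>
>>>>>> For instance, sequencing cleanup after a write looks roughly like this (a sketch only; WriteFn and DeleteFn are placeholder DoFns, and the write step has to emit a result collection to signal on):
>>>>>>
>>>>>> PCollection<String> tempFiles = ...;  // resources to clean up
>>>>>> PCollection<Void> writeDone =
>>>>>>     tempFiles.apply("Write", ParDo.of(new WriteFn()));
>>>>>>
>>>>>> tempFiles
>>>>>>     .apply(Wait.on(writeDone))        // holds elements until writeDone is complete for the window
>>>>>>     .apply("Cleanup", ParDo.of(new DeleteFn()));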
>>>>>>
>>>>>> (I would like to reiterate one more time, as the main author of most of the design documents related to SDF and of its implementation in the Java direct and Dataflow runners, that SDF is fully unrelated to the topic of cleanup - I'm very confused as to why it keeps coming up.)
>>>>>>
>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> I kind of agree, except that transforms lack a lifecycle too. My understanding is that SDF could be a way to unify it and clean up the API.
>>>>>>>
>>>>>>> Otherwise, how do we normalize - with a single API - the lifecycle of transforms?
>>>>>>>
>>>>>>> On Feb 18, 2018, at 21:32, "Ben Chambers" <bchamb...@apache.org> wrote:
>>>>>>>
>>>>>>>> Are you sure that focusing on the cleanup of specific DoFns is appropriate? In many cases where cleanup is necessary, it is around an entire composite PTransform. I think there have been discussions/proposals around a more methodical "cleanup" option, but those haven't been implemented, to the best of my knowledge.
>>>>>>>>
>>>>>>>> For instance, consider the steps of a FileIO:
>>>>>>>> 1. Write to a bunch (N shards) of temporary files.
>>>>>>>> 2. When all temporary files are complete, attempt a bulk copy to put them in the final destination.
>>>>>>>> 3. Clean up all the temporary files.
>>>>>>>>
>>>>>>>> (This is often desirable because it minimizes the chance of seeing partial/incomplete results in the final destination.)
>>>>>>>>
>>>>>>>> In the above, you'd want step 1 to execute on many workers, likely using a ParDo (say, N different workers). The move step should only happen once, so on one worker. This means it will be a different DoFn, likely with some extra work done to ensure it runs on one worker; see the sketch after this message.
>>>>>>>>
>>>>>>>> In such a case, cleanup / @Teardown of the DoFn is not enough. We need an API for a PTransform to schedule some cleanup work for when the transform is "done". In batch this is relatively straightforward, but it doesn't exist. This is the source of some problems, such as the BigQuery sink leaving around files that failed to import into BigQuery.
>>>>>>>>
>>>>>>>> In streaming this is less straightforward - do you want to wait until the end of the pipeline? Or until the end of the window? In practice, you just want to wait until you know nobody will need the resource anymore.
>>>>>>>>
>>>>>>>> This led to some discussions around a "cleanup" API, where you could have a transform that outputs resource objects. Each resource object would have logic for cleaning it up. And there would be something that indicated what parts of the pipeline needed that resource, and what kind of temporal lifetime those objects had. As soon as that part of the pipeline had advanced far enough that it would no longer need the resources, they would get cleaned up. This can be done at pipeline shutdown, or incrementally during a streaming pipeline, etc.
>>>>>>>>
>>>>>>>> Would something like this be a better fit for your use case? If not, why is handling teardown within a single DoFn sufficient?
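>>>>>>>>
>>>>>>>> (For reference, in batch today that shape gets hand-rolled roughly along these lines - a sketch only, where WriteShardFn and MoveAndCleanupFn are placeholder DoFns and the side input forces the writes to finish first:)
>>>>>>>>
>>>>>>>> PCollection<String> tempFiles =
>>>>>>>>     records.apply("WriteTempShards", ParDo.of(new WriteShardFn()));  // step 1, N workers
>>>>>>>>
>>>>>>>> PCollectionView<Iterable<String>> allShards =
>>>>>>>>     tempFiles.apply(View.asIterable());
>>>>>>>>
>>>>>>>> pipeline
>>>>>>>>     .apply(Create.of("finalize"))                                    // single dummy element
>>>>>>>>     .apply("MoveAndCleanup",                                         // steps 2 + 3, runs once
>>>>>>>>         ParDo.of(new MoveAndCleanupFn(allShards)).withSideInputs(allShards));
>>>>>>>>
>>>>>>>> Even then, a pipeline that fails mid-finalize still leaves the temporary files behind - exactly the gap a transform-level cleanup API would close.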
>>>>>>>>
>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall execution. Each instance - one fn, so likely in a thread of a worker - has its own lifecycle. Caricaturally: "new" and garbage collection.
>>>>>>>>>
>>>>>>>>> In practice, "new" is often an unsafe allocation (deserialization), but that doesn't matter here.
>>>>>>>>>
>>>>>>>>> What I want is for any "new" to be followed by a setup before any process or startBundle, and, the last time Beam holds the instance - before it is GC'd and after the last finishBundle - for teardown to be called.
>>>>>>>>>
>>>>>>>>> It is as simple as that. This way there is no need to combine fns in a way that makes an fn not self-contained just to implement basic transforms.
>>>>>>>>>
>>>>>>>>> On Feb 18, 2018, at 20:07, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Feb 18, 2018, at 19:28, "Ben Chambers" <bchamb...@apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather than focusing on the semantics of the existing methods - which have been noted to meet many existing use cases - it would be helpful to focus more on the reasons you are looking for something with different semantics.
>>>>>>>>>>>
>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying to do):
>>>>>>>>>>>
>>>>>>>>>>> 1. Clean up some external, global resource that was initialized once during the startup of the pipeline. If this is the case, how are you ensuring it was really only initialized once (and not once per worker, per thread, per instance, etc.)? How do you know when the pipeline should release it? If the answer is "when it reaches step X", then what about a streaming pipeline?
>>>>>>>>>>>
>>>>>>>>>>> When the DoFn is no longer needed logically, i.e. when the batch is done or the stream is stopped (manually or by a JVM shutdown).
>>>>>>>>>>>
>>>>>>>>>> I'm really not following what this means.
>>>>>>>>>>
>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each worker is running 1000 threads (each running a copy of the same DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M cleanups), and when do you want them called? When the entire pipeline is shut down? When an individual worker is about to shut down (which may be temporary - it may be about to start back up)? Something else?
>>>>>>>>>>
>>>>>>>>>>> 2. Finalize some resources that are used within some region of the pipeline. While the DoFn lifecycle methods are not a good fit for this (they are focused on managing resources within the DoFn), you could model this on how FileIO finalizes the files that it produced.
>>>>>>>>>>> For instance:
>>>>>>>>>>> a) A ParDo generates "resource IDs" (or some token that stores information about resources).
>>>>>>>>>>> b) "Require Deterministic Input" (to prevent retries from changing resource IDs).
>>>>>>>>>>> c) A ParDo that initializes the resources.
>>>>>>>>>>> d) Pipeline segments that use the resources, and eventually output the fact that they're done.
>>>>>>>>>>> e) "Require Deterministic Input".
>>>>>>>>>>> f) A ParDo that frees the resources.
>>>>>>>>>>>
>>>>>>>>>>> By making the use of the resource part of the data, it is possible to "checkpoint" which resources may be in use or have been finished, by using the deterministic-input requirement. This is important for ensuring everything is actually cleaned up.
>>>>>>>>>>>
>>>>>>>>>>> I need that, but generic and not case-by-case, to industrialize some API on top of Beam.
>>>>>>>>>>>
>>>>>>>>>>> 3. Some other use case that I may be missing? If so, could you elaborate on what you are trying to accomplish? That would help me understand both the problems with the existing options and possibly what could be done to help.
>>>>>>>>>>>
>>>>>>>>>>> I understand there are workarounds for almost all cases, but that means each transform is different in its lifecycle handling, and I dislike that a lot at scale and as a user, since you can't put any unified practice on top of Beam; it also makes Beam very hard to integrate, or to use to build higher-level libraries or software.
>>>>>>>>>>>
>>>>>>>>>>> This is why I tried not to start the workaround discussions and just stay at the API level.
>>>>>>>>>>>
>>>>>>>>>>> -- Ben
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> "Machine state" is overly low-level, because many of the possible reasons can happen on a perfectly fine machine. If you'd like to rephrase it to "it will be called except in various situations where it's logically impossible or impractical to guarantee that it's called", that's fine. Or you can list some of the examples above.
>>>>>>>>>>>>
>>>>>>>>>>>> Sounds OK to me.
>>>>>>>>>>>>
>>>>>>>>>>>>> The main point for the user is: you *will* see non-preventable situations where it couldn't be called - it's not just intergalactic crashes - so if the logic is very important (e.g. cleaning up a large number of temporary files, shutting down a large number of VMs you started, etc.), you have to express it using one of the other methods that have stricter guarantees (which obviously come at a cost, e.g. no pass-by-reference).
>>>>>>>>>>>>
>>>>>>>>>>>> FinishBundle has the exact same guarantee, sadly, so I'm not sure which other method you're speaking of.
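>>>>>>>>>>>>
>>>>>>>>>>>> (For concreteness, the kind of fn at stake - a minimal JDBC-flavoured sketch; the connection URL is a placeholder:)
>>>>>>>>>>>>
>>>>>>>>>>>> import java.sql.Connection;
>>>>>>>>>>>> import java.sql.DriverManager;
>>>>>>>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>>>>>>
>>>>>>>>>>>> class JdbcFn extends DoFn<String, Void> {
>>>>>>>>>>>>   private transient Connection connection;
>>>>>>>>>>>>
>>>>>>>>>>>>   @Setup
>>>>>>>>>>>>   public void setup() throws Exception {
>>>>>>>>>>>>     // expensive, once per DoFn instance
>>>>>>>>>>>>     connection = DriverManager.getConnection("jdbc:...");
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>   public void process(ProcessContext c) throws Exception {
>>>>>>>>>>>>     // use this.connection for each element
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   @Teardown
>>>>>>>>>>>>   public void teardown() throws Exception {
>>>>>>>>>>>>     if (connection != null) {
>>>>>>>>>>>>       connection.close();  // the release this whole thread is about
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> If teardown is only best-effort, that close() may never run and the connection leaks.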
>>>>>>>>>>>> Concretely, if you make it really unreliable - which is what "best effort" sounds like to me - then users can't use it to clean up anything; but if you make it "it can fail to happen, but that is unexpected and means something went wrong", then it is fine to have a manual - or automatic, if fancy - recovery procedure. This is where it makes all the difference and impacts developers and ops (all users, basically).
>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Agreed, Eugene, except that "best effort" means exactly that. It is also often used to mean "at will", and this is what triggered this thread.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents it", but "best effort" is too open and can be perceived very badly and wrongly by users (as I did).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It will not be called if it's impossible to call it: in the example situation you have (an intergalactic crash), and in a number of more common cases: e.g. if the worker container has crashed (say, user code in a different thread called a C library over JNI and it segfaulted); a JVM bug; a crash due to a user-code OOM; if the worker has lost network connectivity (then it may be called, but it won't be able to do anything useful); if this is running on a preemptible VM and it was preempted by the underlying cluster manager without notice, or the worker was too busy with other stuff (e.g. calling other Teardown functions) until the preemption timeout elapsed; if the underlying hardware simply failed (which happens quite often at scale); and in many other conditions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such behavior. Please feel free to file bugs for cases where you observed a runner not calling Teardown in a situation where it was possible to call it but the runner made insufficient effort.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Feb 18, 2018, at 00:23, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying to write an IO for system $x and it requires the following initialization, the following cleanup logic, and the following processing in between"), I'll be better able to help you.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Take the simple example of a transform requiring a connection. Using bundles is a perf killer, since their size is not controlled. Using teardown doesn't allow you to release the connection, since it is a best-effort thing. Not releasing the connection makes you pay a lot - AWS ;) - or prevents you from launching other processing - concurrency limits.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For this example, @Teardown is an exact fit. If things die so badly that @Teardown is not called, then nothing else can be called to close the connection either. What AWS service are you thinking of that stays open for a long time when everything at the other end has died?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless, but some (proprietary) protocols require closing exchanges which are more than just "I'm leaving".
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some services - machines - on the fly at pipeline startup and closing them at the end. If teardown is not called, you leak machines and money. You can say it can be done another way... as can the full pipeline ;).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its components' lifecycle, it can't be used at scale for generic pipelines and stays bound to some particular IOs.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What prevents enforcing teardown - ignoring the interstellar-crash case, which can't be handled by any human system? Nothing, technically. Why do you push for not handling it? Is it due to some legacy code in Dataflow, or something else?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this way (best-effort).
>>>>>>>>>>>>>>>>> So I'm not sure what kind of change you're asking for.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not called, then it is a bug and we are done :).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Also, what does it mean for users? The direct runner does it, so if a user uses the reference implementation in tests, will he get a different behavior in prod? Also, don't forget that the user doesn't know what the IOs he composes use, so this is so impactful for the whole product that it must be handled, IMHO.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I understand the portability culture is new in the big data world, but that is not a reason to ignore what people have done for years and do it wrong before doing it right ;).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent guaranteeing - under normal IT conditions - the execution of teardown. Then we see if we can handle each case, and only if there is a technical reason we can't do we make it experimental/unsupported in the API. I know Spark and Flink can; any known blocker for other runners?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through the Java shutdown hooks, otherwise your environment (the software enclosing Beam) is fully unhandled and your overall system is uncontrolled. The only case where that is not true is when the software is always owned by a vendor and never installed in a customer environment. In that case it is up to the vendor to handle the Beam API, and not up to Beam to adjust its API for a vendor - otherwise all features unsupported by one runner should be made optional, right?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Not all state is about the network, even in distributed systems, so having an explicit, defined lifecycle is key.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn