On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:

>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>> @Reuven: in practice it is created by a pool of 256, but it leads to the same pattern; the teardown is just a "if (iCreatedThem) releaseThem();"

>> How do you control "256"? Even if you have a pool of 256 workers, nothing in Beam guarantees how many threads and DoFns are created per worker. In theory the runner might decide to create 1000 threads on each worker.

> Nope, it was the other way around: in this case on AWS you can get 256 instances at once but not 512 (which would be 2x256). So when you compute the distribution you allocate to some fn the role of owning the instance lookup and releasing.

I still don't understand. Let's be more precise. If you write the following code:

    pCollection.apply(ParDo.of(new MyDoFn()));

there is no way to control how many instances of MyDoFn are created. The runner might decide to create a million instances of this class across your worker pool, which means that you will get a million Setup and Teardown calls.

> Anyway, this was just an example of an external resource you must release. The real topic is that Beam should define, asap, a guaranteed generic lifecycle to let users embrace its programming model.

>>> @Eugene:
>>> 1. the wait logic is about passing the value, which is not always possible (like 15% of cases from my raw estimate)
>>> 2. sdf: I'll try to detail why I mention SDF more here

>>> Concretely, Beam exposes a portable API (included in the SDK core). This API defines a *container* API and therefore implies bean lifecycles. I'll not detail them all, but just use the sources and the dofn (not the sdf) to illustrate the idea I'm trying to develop.

>>> A. Source

>>> A source computes a partition plan with 2 primitives: estimateSize and split. As a user you can expect both to be called on the same bean instance, to avoid paying the same connection cost(s) twice. Concretely:

>>>     connect()
>>>     try {
>>>       estimateSize()
>>>       split()
>>>     } finally {
>>>       disconnect()
>>>     }

>>> This is not guaranteed by the API, so you must do:

>>>     connect()
>>>     try {
>>>       estimateSize()
>>>     } finally {
>>>       disconnect()
>>>     }
>>>     connect()
>>>     try {
>>>       split()
>>>     } finally {
>>>       disconnect()
>>>     }

>>> plus a workaround with an internal estimated size, since this primitive is often called in split but you don't want to connect twice in the second phase.

>>> Why do you need that? Simply because you want to define an API to implement sources which initializes the source bean and destroys it. I insist this is a very, very basic concern for such an API. However Beam doesn't embrace it and doesn't assume it, so building any API on top of Beam is very painful today, and direct Beam users hit the exact same issues - check how the IOs are implemented, with static utilities that create throwaway connections and prevent reusing an existing connection within a single method (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).

>>> The same logic applies to the reader, which is then created.
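To make the workaround above concrete, here is a minimal sketch with a hypothetical MySource and MyClient (deliberately not the real BoundedSource API): each primitive has to pay its own connection cost because nothing guarantees both run on the same initialized instance, and the estimate is cached so split does not have to recompute it over a second connection.

    import java.util.Collections;
    import java.util.List;

    public class MySource {
      // Workaround: remember the estimate so split() does not have to recompute it.
      private transient volatile Long cachedEstimate;

      public long estimateSize() throws Exception {
        try (MyClient client = MyClient.connect()) {      // connect just for this call
          long estimate = client.estimateSizeBytes();
          cachedEstimate = estimate;
          return estimate;
        }
      }

      public List<MySource> split(long desiredBundleSizeBytes) throws Exception {
        try (MyClient client = MyClient.connect()) {      // ...and connect again for this one
          long estimate = cachedEstimate != null ? cachedEstimate : client.estimateSizeBytes();
          int shards = (int) Math.max(1, estimate / Math.max(1, desiredBundleSizeBytes));
          return Collections.nCopies(shards, this);       // stand-in for real sub-source planning
        }
      }

      // Stand-in for a real client; connect()/close() are the costs paid per primitive.
      interface MyClient extends AutoCloseable {
        static MyClient connect() { throw new UnsupportedOperationException("stand-in"); }
        long estimateSizeBytes();
      }
    }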
>>> B. DoFn & SDF

>>> As a fn dev you expect the same from the Beam runtime:

>>>     init();
>>>     try {
>>>       while (...) process();
>>>     } finally {
>>>       destroy();
>>>     }

>>> and that it is executed on the exact same instance, to be able to be stateful at that level for expensive connections/operations/flow-state handling.

>>> As you mentioned with the million example, this sequence should happen for each single instance, so 1M times for your example.

>>> Now why did I mention SDF several times? Because SDF is a generalisation of both cases (source and dofn). Therefore it creates way more instances and requires a much stricter/more explicit definition of the exact lifecycle and of which instance does what. Since Beam handles the full lifecycle of the bean instances, it must provide init/destroy hooks (setup/teardown) which can be stateful.

>>> Take the JDBC example which was mentioned earlier. Today, because of the teardown issue, it uses bundles. Since bundle size is not defined - and will not be with SDF - it must use a pool to be able to reuse a connection instance and not kill performance. Now, with SDF and the increase in splits, how do you handle the pool size? Generally in batch you use a single connection per thread to avoid consuming all database connections. With a pool you have 2 choices: 1. use a pool of 1; 2. use a slightly bigger pool, but multiplied by the number of beans you will likely x2 or x3 the connection count and make the execution fail with "no more connections available". If you picked 1 (a pool of 1), then you still need a reliable teardown per pool instance (close() generally) to ensure you release the pool and don't leak the connection information in the JVM. In all cases you come back to the init()/destroy() lifecycle, even if you pretend to scope connections to bundles.
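For reference, a minimal sketch of the per-instance pattern being debated, assuming a plain JDBC URL and a hypothetical table t (this is not JdbcIO): the connection is opened once per DoFn instance in @Setup and closed in @Teardown, so everything hinges on how strongly @Teardown is guaranteed.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import org.apache.beam.sdk.transforms.DoFn;

    class WriteRowFn extends DoFn<String, Void> {
      private final String jdbcUrl;              // assumption: credentials handled elsewhere
      private transient Connection connection;

      WriteRowFn(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }

      @Setup
      public void setup() throws Exception {
        connection = DriverManager.getConnection(jdbcUrl);  // one connection per DoFn instance
      }

      @ProcessElement
      public void processElement(ProcessContext ctx) throws Exception {
        try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO t(value) VALUES (?)")) {
          stmt.setString(1, ctx.element());
          stmt.executeUpdate();
        }
      }

      @Teardown
      public void teardown() throws Exception {
        if (connection != null) {
          connection.close();  // best-effort: may never run if the worker crashes or is preempted
        }
      }
    }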
>>> Just to make it obvious: the SDF mentions are only because SDF implies all the current issues with the loose definition of the bean lifecycles at an exponential scale, nothing else.

>>> Romain Manni-Bucau
>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>

>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

>>>> The kind of whole-transform lifecycle you're mentioning can be accomplished using the Wait transform, as I suggested in the thread above, and I believe it should become the canonical way to do that.

>>>> (Would like to reiterate one more time, as the main author of most design documents related to SDF and of its implementation in the Java direct and Dataflow runners, that SDF is fully unrelated to the topic of cleanup - I'm very confused as to why it keeps coming up.)
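A rough sketch of the Wait-based ordering Eugene refers to, assuming a hypothetical ReleaseFn and some signal collection produced by the preceding write step (whether a given IO exposes such a signal varies): cleanup runs only once the signal is complete for a window, rather than relying on @Teardown.

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;

    class CleanupAfterWrite {
      // Hypothetical cleanup DoFn: releases the resource named by each element.
      static class ReleaseFn extends DoFn<String, Void> {
        @ProcessElement
        public void process(ProcessContext ctx) {
          // release the resource identified by ctx.element() (assumed side effect)
        }
      }

      static PCollection<Void> cleanupWhenDone(
          PCollection<String> tempResources, PCollection<?> writeSignal) {
        return tempResources
            .apply(Wait.on(writeSignal))   // elements are held until writeSignal is complete for the window
            .apply("ReleaseResources", ParDo.of(new ReleaseFn()));
      }
    }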
>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>> I kind of agree, except transforms lack a lifecycle too. My understanding is that SDF could be a way to unify it and clean the API.

>>>>> Otherwise, how do you normalize - with a single API - the lifecycle of transforms?

>>>>> On 18 Feb 2018 at 21:32, "Ben Chambers" <bchamb...@apache.org> wrote:

>>>>>> Are you sure that focusing on the cleanup of specific DoFns is appropriate? In many cases where cleanup is necessary, it is around an entire composite PTransform. I think there have been discussions/proposals around a more methodical "cleanup" option, but those haven't been implemented, to the best of my knowledge.

>>>>>> For instance, consider the steps of a FileIO:
>>>>>> 1. Write to a bunch (N shards) of temporary files.
>>>>>> 2. When all temporary files are complete, attempt to do a bulk copy to put them in the final destination.
>>>>>> 3. Clean up all the temporary files.

>>>>>> (This is often desirable because it minimizes the chance of seeing partial/incomplete results in the final destination.)

>>>>>> In the above, you'd want step 1 to execute on many workers, likely using a ParDo (say N different workers). The move step should only happen once, so on one worker. This means it will be a different DoFn, likely with some stuff done to ensure it runs on one worker.

>>>>>> In such a case, cleanup / @Teardown of the DoFn is not enough. We need an API for a PTransform to schedule some cleanup work for when the transform is "done". In batch this is relatively straightforward, but it doesn't exist. This is the source of some problems, such as the BigQuery sink leaving around files that have failed to import into BigQuery.
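A rough sketch of those three steps (hypothetical WriteTempShardFn and MoveAndCleanupFn, not FileIO's actual implementation): many workers write temp shards, then the temp paths are gathered into a side input and a ParDo over a single element performs the move and cleanup exactly once.

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    class FinalizeExample {
      // Step 1: each worker writes its shard to a temp location and emits the temp path.
      static class WriteTempShardFn extends DoFn<String, String> {
        @ProcessElement
        public void process(ProcessContext ctx) {
          String tempPath = "/tmp/shard-" + ctx.element().hashCode();  // stand-in for a real temp write
          ctx.output(tempPath);
        }
      }

      // Steps 2 and 3: runs over a single element, so the copy + delete happen once.
      static class MoveAndCleanupFn extends DoFn<String, Void> {
        private final PCollectionView<Iterable<String>> tempFilesView;
        MoveAndCleanupFn(PCollectionView<Iterable<String>> tempFilesView) { this.tempFilesView = tempFilesView; }

        @ProcessElement
        public void process(ProcessContext ctx) {
          for (String tempPath : ctx.sideInput(tempFilesView)) {
            // copy tempPath to the final destination, then delete it (assumed side effect)
          }
        }
      }

      static void writeThenFinalizeOnce(Pipeline pipeline, PCollection<String> records) {
        PCollection<String> tempFiles =
            records.apply("WriteTempShards", ParDo.of(new WriteTempShardFn()));
        PCollectionView<Iterable<String>> allTempFiles = tempFiles.apply(View.asIterable());
        pipeline
            .apply("Once", Create.of("finalize"))
            .apply("MoveAndCleanup",
                ParDo.of(new MoveAndCleanupFn(allTempFiles)).withSideInputs(allTempFiles));
      }
    }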
>>>>>> In streaming this is less straightforward -- do you want to wait until the end of the pipeline? Or do you want to wait until the end of the window? In practice, you just want to wait until you know nobody will need the resource anymore.

>>>>>> This led to some discussions around a "cleanup" API, where you could have a transform that outputs resource objects. Each resource object would have logic for cleaning it up. And there would be something that indicated what parts of the pipeline needed that resource, and what kind of temporal lifetime those objects had. As soon as that part of the pipeline had advanced far enough that it would no longer need the resources, they would get cleaned up. This can be done at pipeline shutdown, or incrementally during a streaming pipeline, etc.

>>>>>> Would something like this be a better fit for your use case? If not, why is handling teardown within a single DoFn sufficient?

>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>> Yes, 1M. Let's try to explain, simplifying the overall execution. Each instance - one fn, so likely in a thread of a worker - has its lifecycle. Caricaturally: "new" and garbage collection.

>>>>>>> In practice, "new" is often an unsafe allocate (deserialization), but it doesn't matter here.

>>>>>>> What I want is for any "new" to be followed by a setup before any process or startBundle, and for the last time Beam has the instance - before it is gc-ed and after the last finishBundle - to call teardown.

>>>>>>> It is as simple as that. This way there is no need to combine fns in a way that makes a fn not self-contained just to implement basic transforms.
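To make the per-instance sequence being discussed concrete, here is a small DoFn that only logs its lifecycle methods. The documented order in the Java SDK is, per instance: deserialization, then @Setup once, then repeated @StartBundle / @ProcessElement* / @FinishBundle cycles, then @Teardown - with @Teardown being best-effort, which is the crux of this thread.

    import org.apache.beam.sdk.transforms.DoFn;

    class LifecycleLoggingFn extends DoFn<String, String> {
      @Setup
      public void setup() { System.out.println("setup: " + this); }

      @StartBundle
      public void startBundle() { System.out.println("startBundle: " + this); }

      @ProcessElement
      public void process(ProcessContext ctx) { ctx.output(ctx.element()); }

      @FinishBundle
      public void finishBundle() { System.out.println("finishBundle: " + this); }

      @Teardown
      public void teardown() { System.out.println("teardown: " + this); }
    }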
>>>>>>> On 18 Feb 2018 at 20:07, "Reuven Lax" <re...@google.com> wrote:

>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>> On 18 Feb 2018 at 19:28, "Ben Chambers" <bchamb...@apache.org> wrote:

>>>>>>>>> It feels like this thread may be a bit off-track. Rather than focusing on the semantics of the existing methods -- which have been noted to meet many existing use cases -- it would be helpful to focus more on the reason you are looking for something with different semantics.

>>>>>>>>> Some possibilities (I'm not sure which one you are trying to do):

>>>>>>>>> 1. Clean up some external, global resource that was initialized once during the startup of the pipeline. If this is the case, how are you ensuring it was really only initialized once (and not once per worker, per thread, per instance, etc.)? How do you know when the pipeline should release it? If the answer is "when it reaches step X", then what about a streaming pipeline?

>>>>>>>>> When the dofn is no longer needed logically, i.e. when the batch is done or the stream is stopped (manually or by a JVM shutdown).

>>>>>>>> I'm really not following what this means.

>>>>>>>> Let's say that a pipeline is running 1000 workers, and each worker is running 1000 threads (each running a copy of the same DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do you want them called? When the entire pipeline is shut down? When an individual worker is about to shut down (which may be temporary - it may be about to start back up)? Something else?

>>>>>>>>> 2. Finalize some resources that are used within some region of the pipeline. While the DoFn lifecycle methods are not a good fit for this (they are focused on managing resources within the DoFn), you could model this on how FileIO finalizes the files that it produced. For instance:
>>>>>>>>> a) A ParDo generates "resource IDs" (or some token that stores information about resources)
>>>>>>>>> b) "Require Deterministic Input" (to prevent retries from changing resource IDs)
>>>>>>>>> c) A ParDo that initializes the resources
>>>>>>>>> d) Pipeline segments that use the resources, and eventually output the fact they're done
>>>>>>>>> e) "Require Deterministic Input"
>>>>>>>>> f) A ParDo that frees the resources

>>>>>>>>> By making the use of the resource part of the data, it is possible to "checkpoint" which resources may be in use or have been finished by using the require-deterministic-input steps. This is important to ensuring everything is actually cleaned up.
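A rough sketch of the (a)-(f) shape above, using Reshuffle as a stand-in for the "Require Deterministic Input" steps (a common way to get stable, retry-proof input); all four DoFns passed in are hypothetical placeholders.

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.Reshuffle;
    import org.apache.beam.sdk.values.PCollection;

    class ResourceLifecyclePattern {
      static void addResourceLifecycle(
          PCollection<String> input,
          DoFn<String, String> generateIdsFn,     // (a) emits resource IDs
          DoFn<String, String> createResourceFn,  // (c) initializes the resource behind an ID
          DoFn<String, String> useResourceFn,     // (d) does the work, emits a "done" record per resource
          DoFn<String, Void> freeResourceFn) {    // (f) releases the resource
        input
            .apply("GenerateResourceIds", ParDo.of(generateIdsFn))
            .apply("StabilizeIds", Reshuffle.viaRandomKey())     // (b) stand-in for deterministic input
            .apply("CreateResources", ParDo.of(createResourceFn))
            .apply("UseResources", ParDo.of(useResourceFn))
            .apply("StabilizeDone", Reshuffle.viaRandomKey())    // (e)
            .apply("FreeResources", ParDo.of(freeResourceFn));
      }
    }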
>>>>>>>>> I need that, but generic and not case by case, so I can industrialize some API on top of Beam.

>>>>>>>>> 3. Some other use case that I may be missing? If it is this case, could you elaborate on what you are trying to accomplish? That would help me understand both the problems with the existing options and possibly what could be done to help.

>>>>>>>>> I understand there are workarounds for almost all cases, but it means each transform is different in its lifecycle handling, and I dislike that a lot at scale and as a user, since you can't put any unified practice on top of Beam; it also makes Beam very hard to integrate, or to use to build higher-level libraries or software.

>>>>>>>>> This is why I tried not to start the workaround discussions and just stay at the API level.

>>>>>>>>> -- Ben

>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

>>>>>>>>>>> "Machine state" is overly low-level, because many of the possible reasons can happen on a perfectly fine machine. If you'd like to rephrase it to "it will be called except in various situations where it's logically impossible or impractical to guarantee that it's called", that's fine. Or you can list some of the examples above.

>>>>>>>>>> Sounds ok to me.

>>>>>>>>>>> The main point for the user is, you *will* see non-preventable situations where it couldn't be called - it's not just intergalactic crashes - so if the logic is very important (e.g. cleaning up a large amount of temporary files, shutting down a large number of VMs you started, etc.), you have to express it using one of the other methods that have stricter guarantees (which obviously come at a cost, e.g. no pass-by-reference).

>>>>>>>>>> FinishBundle has the exact same guarantee, sadly, so I'm not sure which other method you speak about. Concretely, if you make it really unreliable - this is what "best effort" sounds like to me - then users can't rely on it to clean anything; but if you make it "it can fail to happen, but that is unexpected and means something went wrong", then it is fine to have a manual - or automatic, if fancy - recovery procedure. This is where it makes all the difference and impacts the developers and ops (all users, basically).

>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>>>>> Agree, Eugene, except that "best effort" means just that. It is also often used to say "at will", and this is what triggered this thread.

>>>>>>>>>>>> I'm fine using "except if the machine state prevents it", but "best effort" is too open and can be very badly and wrongly perceived by users (like I did).

>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

>>>>>>>>>>>>> It will not be called if it's impossible to call it: in the example situation you have (intergalactic crash), and in a number of more common cases: e.g. in case the worker container has crashed (e.g. user code in a different thread called a C library over JNI and it segfaulted), a JVM bug, a crash due to user code OOM, in case the worker has lost network connectivity (then it may be called but it won't be able to do anything useful), in case this is running on a preemptible VM and it was preempted by the underlying cluster manager without notice, or if the worker was too busy with other stuff (e.g. calling other Teardown functions) until the preemption timeout elapsed, in case the underlying hardware simply failed (which happens quite often at scale), and in many other conditions.

>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such behavior. Please feel free to file bugs for cases where you observed a runner not call Teardown in a situation where it was possible to call it but the runner made insufficient effort.

>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>>>>>>>>> On 18 Feb 2018 at 00:23, "Kenneth Knowles" <k...@google.com> wrote:

>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying to write an IO for system $x and it requires the following initialization and the following cleanup logic and the following processing in between"), I'll be better able to help you.

>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a connection. Using bundles is a perf killer since their size is not controlled. Using teardown doesn't allow you to release the connection, since it is a best-effort thing. Not releasing the connection makes you pay a lot - aws ;) - or prevents you from launching other processing - concurrency limits.

>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things die so badly that @Teardown is not called, then nothing else can be called to close the connection either. What AWS service are you thinking of that stays open for a long time when everything at the other end has died?

>>>>>>>>>>>>>>>> You assume connections are kind of stateless, but some (proprietary) protocols require closing exchanges which are not only "I'm leaving".

>>>>>>>>>>>>>>>> For aws I was thinking about starting some services - machines - on the fly at pipeline startup and closing them at the end. If teardown is not called, you leak machines and money. You can say it can be done another way... as could the full pipeline ;).

>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its components' lifecycle, it can't be used at scale for generic pipelines and is bound to some particular IOs.

>>>>>>>>>>>>>>>> What prevents enforcing teardown - ignoring the interstellar-crash case, which can't be handled by any human system? Nothing, technically. Why do you push to not handle it? Is it due to some legacy code on Dataflow or something else?

>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this way (best-effort). So I'm not sure what kind of change you're asking for.

>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not called then it is a bug and we are done :).

>>>>>>>>>>>>>>>> Also, what does it mean for the users? The direct runner does it, so if a user uses the RI in tests, he will get a different behavior in prod? Also don't forget the user doesn't know what the IOs he composes use, so this is so impacting for the whole product that it must be handled IMHO.

>>>>>>>>>>>>>>>> I understand the portability culture is new in the big data world, but it is not a reason to ignore what people did for years and do it wrong before doing it right ;).

>>>>>>>>>>>>>>>> My proposal is to list what can prevent guaranteeing - in normal IT conditions - the execution of teardown. Then we see if we can handle it, and only if there is a technical reason we can't do we make it experimental/unsupported in the API. I know Spark and Flink can; any unknown blocker for other runners?

>>>>>>>>>>>>>>>> Technical note: even a kill should go through Java shutdown hooks, otherwise your environment (the software enclosing Beam) is fully unhandled and your overall system is uncontrolled. The only case where this is not true is when the software is always owned by a vendor and never installed on a customer environment. In this case it belongs to the vendor to handle the Beam API, and not to Beam to adjust its API for a vendor - otherwise all features unsupported by one runner should be made optional, right?

>>>>>>>>>>>>>>>> All state is not about network, even in distributed systems, so this is key to having an explicit and defined lifecycle.

>>>>>>>>>>>>>>>> Kenn
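For reference, the JVM shutdown hook mechanism mentioned in the technical note above, in a minimal, Beam-independent form: the hook runs on normal termination and on SIGTERM ("kill"), but not on SIGKILL ("kill -9") or a hard crash - essentially the same caveat that applies to a best-effort @Teardown.

    public class ShutdownHookExample {
      public static void main(String[] args) throws Exception {
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
          // release external resources here (connections, temp files, leased machines, ...)
          System.out.println("shutdown hook: cleaning up");
        }));
        System.out.println("doing work...");
        Thread.sleep(10_000);  // send SIGTERM while this sleeps to see the hook fire
      }
    }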