Workers restarting is not a bug; it's standard and often expected.

On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> Nothing, as mentioned it is a bug, so recovery is a bug recovery (procedure).
>
> Le 19 févr. 2018 19:42, "Eugene Kirpichov" <kirpic...@google.com> a écrit :
>
>> So what would you like to happen if there is a crash? The DoFn instance no longer exists because the JVM it ran on no longer exists. What should Teardown be called on?
>>
>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>
>>> This is what I want, and not 999999 teardowns for 1000000 setups until there is an unexpected crash (= a bug).
>>>
>>> Le 19 févr. 2018 18:57, "Reuven Lax" <re...@google.com> a écrit :
>>>
>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>
>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>
>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> @Reuven: in practice it is created by pools of 256 but leads to the same pattern; the teardown is just an "if (iCreatedThem) releaseThem();"
>>>>>>
>>>>>> How do you control "256"? Even if you have a pool of 256 workers, nothing in Beam guarantees how many threads and DoFns are created per worker. In theory the runner might decide to create 1000 threads on each worker.
>>>>>
>>>>> Nope, it was the other way around: in this case on AWS you can get 256 instances at once but not 512 (which would be 2x256). So when you compute the distribution you allocate to some fn the role of owning the instance lookup and releasing.
>>>>
>>>> I still don't understand. Let's be more precise. If you write the following code:
>>>>
>>>> pCollection.apply(ParDo.of(new MyDoFn()));
>>>>
>>>> There is no way to control how many instances of MyDoFn are created.
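[Editor's note] Reuven's point above can be simulated in plain Java. This is a hedged sketch, not the Beam API: `MyDoFn`, `run`, and the counters are made-up names, and the loop stands in for the runner's (uncontrollable) choice of instance count. It shows why user code must tolerate an arbitrary number of setup/teardown pairs:

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleContract {
    // Stand-in for a user DoFn; the counters exist only for this demo.
    static class MyDoFn {
        static int setups = 0, teardowns = 0;
        void setup()    { setups++; }    // analogous to @Setup: once per instance
        void process()  { /* work */ }   // analogous to @ProcessElement
        void teardown() { teardowns++; } // analogous to @Teardown: best effort
    }

    /** The "runner" alone picks the instance count; user code cannot. */
    static void run(int instances) {
        List<MyDoFn> fns = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            MyDoFn fn = new MyDoFn();
            fn.setup();
            fn.process();
            fns.add(fn);
        }
        // In a healthy run each instance is torn down; a crash skips this loop.
        for (MyDoFn fn : fns) fn.teardown();
    }

    public static void main(String[] args) {
        run(1000); // could just as well be a million
        System.out.println(MyDoFn.setups + " setups, " + MyDoFn.teardowns + " teardowns");
    }
}
```

A JVM crash corresponds to the teardown loop never running, which is exactly the "best effort" caveat debated in this thread.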
>>>> The runner might decide to create a million instances of this class across your worker pool, which means that you will get a million Setup and Teardown calls.
>>>>
>>>>> Anyway, this was just an example of an external resource you must release. The real topic is that Beam should define ASAP a guaranteed generic lifecycle to let users embrace its programming model.
>>>>>
>>>>>>> @Eugene:
>>>>>>> 1. wait logic is about passing the value, which is not always possible (like 15% of cases from my raw estimate)
>>>>>>> 2. sdf: I'll try to detail why I mention SDF more here
>>>>>>>
>>>>>>> Concretely, Beam exposes a portable API (included in the SDK core). This API defines a *container* API and therefore implies bean lifecycles. I'll not detail them all but just use the sources and dofn (not sdf) to illustrate the idea I'm trying to develop.
>>>>>>>
>>>>>>> A. Source
>>>>>>>
>>>>>>> A source computes a partition plan with 2 primitives: estimateSize and split. As a user you can expect both to be called on the same bean instance, to avoid paying the same connection cost(s) twice. Concretely:
>>>>>>>
>>>>>>> connect()
>>>>>>> try {
>>>>>>>   estimateSize()
>>>>>>>   split()
>>>>>>> } finally {
>>>>>>>   disconnect()
>>>>>>> }
>>>>>>>
>>>>>>> This is not guaranteed by the API, so you must do:
>>>>>>>
>>>>>>> connect()
>>>>>>> try {
>>>>>>>   estimateSize()
>>>>>>> } finally {
>>>>>>>   disconnect()
>>>>>>> }
>>>>>>> connect()
>>>>>>> try {
>>>>>>>   split()
>>>>>>> } finally {
>>>>>>>   disconnect()
>>>>>>> }
>>>>>>>
>>>>>>> + a workaround with an internal estimated size, since this primitive is often called in split but you don't want to connect twice in the second phase.
>>>>>>>
>>>>>>> Why do you need that? Simply because you want to define an API to implement sources which initializes the source bean and destroys it.
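[Editor's note] The single-connection pattern Romain asks for can be sketched outside Beam. This is a hypothetical `CachingSource` (not Beam's `BoundedSource`; the names, the size `42L`, and the counter are invented for illustration): the connection is lazily opened once and shared by both primitives, instead of being opened and closed around each one:

```java
public class CachingSource implements AutoCloseable {
    static int connections = 0; // counts physical connect() calls, for the demo

    private Object connection;  // stands in for a real client/connection handle

    // Lazily connect once; every primitive reuses the same handle.
    private Object connection() {
        if (connection == null) {
            connections++;          // the expensive connect() happens here
            connection = new Object();
        }
        return connection;
    }

    long estimateSize()   { connection(); return 42L; }     // fake size
    int  split(int parts) { connection(); return parts; }   // fake split

    @Override public void close() { connection = null; }    // disconnect()

    public static void main(String[] args) {
        try (CachingSource src = new CachingSource()) {
            src.estimateSize();
            src.split(4); // split often re-reads the size: still one connect
        }
        System.out.println("connections=" + connections); // prints connections=1
    }
}
```

Without a guaranteed init/destroy pair, the caller cannot rely on `close()` ever running, which is the double-connect workaround described above.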
>>>>>>> I insist it is a very, very basic concern for such an API. However Beam doesn't embrace it and doesn't assume it, so building any API on top of Beam is very painful today, and direct Beam users hit the exact same issues - check how IOs are implemented: the static utilities which create volatile connections prevent reusing an existing connection in a single method (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
>>>>>>>
>>>>>>> The same logic applies to the reader which is then created.
>>>>>>>
>>>>>>> B. DoFn & SDF
>>>>>>>
>>>>>>> As a fn dev you expect the same from the Beam runtime: init(); try { while (...) process(); } finally { destroy(); }, and that it is executed on the exact same instance, to be able to be stateful at that level for expensive connections/operations/flow state handling.
>>>>>>>
>>>>>>> As you mentioned with the million example, this sequence should happen for each single instance, so 1M times in your example.
>>>>>>>
>>>>>>> Now why did I mention SDF several times? Because SDF is a generalisation of both cases (source and dofn). Therefore it creates way more instances and requires a way more strict/explicit definition of the exact lifecycle and of which instance does what. Since Beam handles the full lifecycle of the bean instances, it must provide init/destroy hooks (setup/teardown) which can be stateful.
>>>>>>>
>>>>>>> Take the JDBC example which was mentioned earlier. Today, because of the teardown issue, it uses bundles. Since bundle size is not defined - and will not be with SDF - it must use a pool to be able to reuse a connection instance so as not to hurt performance.
>>>>>>> Now with SDF and the split increase, how do you handle the pool size? Generally in batch you use a single connection per thread to avoid consuming all database connections. With a pool you have 2 choices: 1. use a pool of 1; 2. use a slightly bigger pool, but multiplied by the number of beans you will likely 2x or 3x the connection count and make the execution fail with "no more connection available". If you picked 1 (a pool of 1), then you still have to have a reliable teardown per pool instance (close() generally) to ensure you release the pool and don't leak the connection information in the JVM. In all cases you come back to the init()/destroy() lifecycle, even if you fake it by getting connections per bundle.
>>>>>>>
>>>>>>> Just to make it obvious: the SDF mentions are just because SDF implies all the current issues with the loose definition of the bean lifecycles at an exponential level, nothing else.
>>>>>>>
>>>>>>> Romain Manni-Bucau
>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>
>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>
>>>>>>>> The kind of whole-transform lifecycle you're mentioning can be accomplished using the Wait transform, as I suggested in the thread above, and I believe it should become the canonical way to do that.
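[Editor's note] The "pool of 1 with a reliable close()" option from the JDBC discussion above can be sketched in plain Java. `PoolOfOne` and its methods are hypothetical (no real pool library or JDBC driver involved); the point is that without a guaranteed `close()` in teardown, the lease state leaks:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class PoolOfOne implements AutoCloseable {
    private final AtomicBoolean leased = new AtomicBoolean(false);
    private boolean closed = false;

    /** Hands out the single "connection"; a second borrow fails. */
    Object borrow() {
        if (closed || !leased.compareAndSet(false, true)) {
            throw new IllegalStateException("no more connection available");
        }
        return new Object(); // stands in for a JDBC Connection
    }

    void release() { leased.set(false); }        // e.g. in finishBundle

    @Override public void close() { closed = true; } // must run in teardown

    public static void main(String[] args) {
        PoolOfOne pool = new PoolOfOne();
        Object c = pool.borrow(); // startBundle / processElement
        pool.release();           // finishBundle
        pool.close();             // teardown: skipping this leaks the pool
        System.out.println("closed cleanly");
    }
}
```

If teardown is skipped, `close()` never runs and the pooled connection (plus its server-side state) outlives the worker, which is the leak Romain describes.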
>>>>>>>> (I would like to reiterate one more time, as the main author of most design documents related to SDF and of its implementation in the Java direct and Dataflow runners, that SDF is fully unrelated to the topic of cleanup - I'm very confused as to why it keeps coming up.)
>>>>>>>>
>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I kind of agree, except transforms lack a lifecycle too. My understanding is that SDF could be a way to unify it and clean the API.
>>>>>>>>>
>>>>>>>>> Otherwise how do we normalize - with a single API - the lifecycle of transforms?
>>>>>>>>>
>>>>>>>>> Le 18 févr. 2018 21:32, "Ben Chambers" <bchamb...@apache.org> a écrit :
>>>>>>>>>
>>>>>>>>>> Are you sure that focusing on the cleanup of specific DoFns is appropriate? In many cases where cleanup is necessary, it is around an entire composite PTransform. I think there have been discussions/proposals around a more methodical "cleanup" option, but those haven't been implemented, to the best of my knowledge.
>>>>>>>>>>
>>>>>>>>>> For instance, consider the steps of a FileIO:
>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files.
>>>>>>>>>> 2. When all temporary files are complete, attempt to do a bulk copy to put them in the final destination.
>>>>>>>>>> 3. Clean up all the temporary files.
>>>>>>>>>>
>>>>>>>>>> (This is often desirable because it minimizes the chance of seeing partial/incomplete results in the final destination.)
>>>>>>>>>>
>>>>>>>>>> In the above, you'd want step 1 to execute on many workers, likely using a ParDo (say N different workers). The move step should only happen once, so on one worker. This means it will be a different DoFn, likely with some stuff done to ensure it runs on one worker.
>>>>>>>>>> In such a case, cleanup / @TearDown of the DoFn is not enough. We need an API for a PTransform to schedule some cleanup work for when the transform is "done". In batch this is relatively straightforward, but it doesn't exist. This is the source of some problems, such as the BigQuery sink leaving files around that have failed to import into BigQuery.
>>>>>>>>>>
>>>>>>>>>> In streaming this is less straightforward -- do you want to wait until the end of the pipeline? Or do you want to wait until the end of the window? In practice, you just want to wait until you know nobody will need the resource anymore.
>>>>>>>>>>
>>>>>>>>>> This led to some discussions around a "cleanup" API, where you could have a transform that output resource objects. Each resource object would have logic for cleaning it up. And there would be something that indicated what parts of the pipeline needed that resource, and what kind of temporal lifetime those objects had. As soon as that part of the pipeline had advanced far enough that it would no longer need the resources, they would get cleaned up. This can be done at pipeline shutdown, or incrementally during a streaming pipeline, etc.
>>>>>>>>>>
>>>>>>>>>> Would something like this be a better fit for your use case? If not, why is handling teardown within a single DoFn sufficient?
>>>>>>>>>>
>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall execution. Each instance - one fn, so likely in a thread of a worker - has its lifecycle. Caricaturally: "new" and garbage collection.
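[Editor's note] The per-instance ordering argued for in this thread (new, then setup, then startBundle/process/finishBundle, then teardown, then GC) can be simulated on a plain object. The method names mirror the DoFn annotations, but this is a sketch, not Beam code:

```java
import java.util.ArrayList;
import java.util.List;

public class OrderedLifecycle {
    // Records the call order so the contract is visible.
    final List<String> calls = new ArrayList<>();

    void setup()        { calls.add("setup"); }        // after "new", before any bundle
    void startBundle()  { calls.add("startBundle"); }
    void process()      { calls.add("process"); }
    void finishBundle() { calls.add("finishBundle"); }
    void teardown()     { calls.add("teardown"); }     // after last finishBundle, before GC

    public static void main(String[] args) {
        OrderedLifecycle fn = new OrderedLifecycle();  // "new"
        fn.setup();
        fn.startBundle();
        fn.process();
        fn.finishBundle();
        fn.teardown();                                 // last call Beam makes before the instance is dropped
        System.out.println(fn.calls);
        // prints [setup, startBundle, process, finishBundle, teardown]
    }
}
```

The dispute in the thread is precisely whether the final `teardown()` call is guaranteed or merely best-effort when the JVM dies before reaching it.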
>>>>>>>>>>> In practice, "new" is often an unsafe allocation (deserialization), but it doesn't matter here.
>>>>>>>>>>>
>>>>>>>>>>> What I want is for any "new" to be followed by setup before any process or startBundle, and, the last time Beam has the instance before it is GC'd and after the last finishBundle, for it to call teardown.
>>>>>>>>>>>
>>>>>>>>>>> It is as simple as that. This way there is no need to combine fns in a way that makes a fn not self-contained to implement basic transforms.
>>>>>>>>>>>
>>>>>>>>>>> Le 18 févr. 2018 20:07, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Le 18 févr. 2018 19:28, "Ben Chambers" <bchamb...@apache.org> a écrit :
>>>>>>>>>>>>>
>>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather than focusing on the semantics of the existing methods -- which have been noted to meet many existing use cases -- it would be helpful to focus more on the reason you are looking for something with different semantics.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying to do):
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Clean up some external, global resource that was initialized once during the startup of the pipeline. If this is the case, how are you ensuring it was really only initialized once (and not once per worker, per thread, per instance, etc.)? How do you know when the pipeline should release it? If the answer is "when it reaches step X", then what about a streaming pipeline?
>>>>>>>>>>>>> When the DoFn is logically no longer needed, i.e. when the batch is done or the stream is stopped (manually or by a JVM shutdown).
>>>>>>>>>>>>
>>>>>>>>>>>> I'm really not following what this means.
>>>>>>>>>>>>
>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each worker is running 1000 threads (each running a copy of the same DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M cleanups), and when do you want them called? When the entire pipeline is shut down? When an individual worker is about to shut down (which may be temporary - it may be about to start back up)? Something else?
>>>>>>>>>>>>
>>>>>>>>>>>>> 2. Finalize some resources that are used within some region of the pipeline. While the DoFn lifecycle methods are not a good fit for this (they are focused on managing resources within the DoFn), you could model this on how FileIO finalizes the files that it produced. For instance:
>>>>>>>>>>>>> a) ParDo generates "resource IDs" (or some token that stores information about resources)
>>>>>>>>>>>>> b) "Require Deterministic Input" (to prevent retries from changing resource IDs)
>>>>>>>>>>>>> c) ParDo that initializes the resources
>>>>>>>>>>>>> d) Pipeline segments that use the resources, and eventually output the fact they're done
>>>>>>>>>>>>> e) "Require Deterministic Input"
>>>>>>>>>>>>> f) ParDo that frees the resources
>>>>>>>>>>>>>
>>>>>>>>>>>>> By making the use of the resource part of the data, it is possible to "checkpoint" which resources may be in use or have been finished by using the require-deterministic input.
>>>>>>>>>>>>> This is important to ensure everything is actually cleaned up.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I need that, but generic and not case by case, to industrialize some API on top of Beam.
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is this case, could you elaborate on what you are trying to accomplish? That would help me understand both the problems with existing options and possibly what could be done to help.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I understand there are workarounds for almost all cases, but it means each transform is different in its lifecycle handling. I dislike that a lot at scale and as a user, since you can't put any unified practice on top of Beam; it also makes Beam very hard to integrate or to use to build higher-level libraries or software.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is why I tried not to start the workaround discussions and just stay at the API level.
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- Ben
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> "Machine state" is overly low-level because many of the possible reasons can happen on a perfectly fine machine. If you'd like to rephrase it to "it will be called except in various situations where it's logically impossible or impractical to guarantee that it's called", that's fine. Or you can list some of the examples above.
>>>>>>>>>>>>>> Sounds OK to me.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The main point for the user is: you *will* see non-preventable situations where it couldn't be called - it's not just intergalactic crashes - so if the logic is very important (e.g. cleaning up a large amount of temporary files, shutting down a large number of VMs you started, etc.), you have to express it using one of the other methods that have stricter guarantees (which obviously come at a cost, e.g. no pass-by-reference).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> FinishBundle has the exact same guarantee, sadly, so I'm not sure which other method you mean. Concretely, if you make it really unreliable - which is what "best effort" sounds like to me - then users can't use it to clean anything, but if you make it "can happen, but it is unexpected and means something happened" then it is fine to have a manual - or automatic if fancy - recovery procedure. This is where it makes all the difference and impacts the developers and ops (all users, basically).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Agreed, Eugene, except that "best effort" means that. It is also often used to say "at will", and this is what triggered this thread.
>>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents it", but "best effort" is too open and can be very badly and wrongly perceived by users (like I did).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it: in the example situation you have (intergalactic crash), and in a number of more common cases: e.g. in case the worker container has crashed (e.g. user code in a different thread called a C library over JNI and it segfaulted), a JVM bug, a crash due to user code OOM, in case the worker has lost network connectivity (then it may be called but it won't be able to do anything useful), in case this is running on a preemptible VM and it was preempted by the underlying cluster manager without notice, or the worker was too busy with other stuff (e.g. calling other Teardown functions) until the preemption timeout elapsed, in case the underlying hardware simply failed (which happens quite often at scale), and in many other
>>>>>>>>>>>>>>>>> conditions.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such behavior. Please feel free to file bugs for cases where you observed a runner not calling Teardown in a situation where it was possible to call it but the runner made insufficient effort.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Le 18 févr. 2018 00:23, "Kenneth Knowles" <k...@google.com> a écrit :
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying to write an IO for system $x and it requires the following initialization and the following cleanup logic and the following processing in between") I'll be better able to help you.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a connection. Using bundles is a perf killer since their size is not controlled. Using teardown doesn't allow you to release the connection since it is a best-effort thing.
>>>>>>>>>>>>>>>>>>>>> Not releasing the connection makes you pay a lot - AWS ;) - or prevents you from launching other processing - concurrency limits.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things die so badly that @Teardown is not called, then nothing else can be called to close the connection either. What AWS service are you thinking of that stays open for a long time when everything at the other end has died?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless, but some (proprietary) protocols require closing exchanges which are not only "I'm leaving".
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some services - machines - on the fly in a pipeline startup and closing them at the end. If teardown is not called, you leak machines and money. You can say it can be done another way... as can the full pipeline ;).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its components' lifecycle, it can't be used at scale for generic pipelines and ends up bound to some particular IOs.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What prevents enforcing teardown - ignoring the interstellar-crash case, which can't be handled by any human system? Nothing, technically. Why do you push to not handle it? Is it due to some legacy code on Dataflow or something else?
>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this way (best-effort). So I'm not sure what kind of change you're asking for.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not called then it is a bug and we are done :).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Also, what does it mean for users? The direct runner does it, so if a user uses the RI in tests, will he get a different behavior in prod? Also don't forget that the user doesn't know what the IOs he composes use, so this is so impactful for the whole product that it must be handled IMHO.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in the big data world, but it is not a reason to ignore what people did for years and do it wrong before doing it right ;).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent guaranteeing - in normal IT conditions - the execution of teardown. Then we see if we can handle it, and only if there is a technical reason we can't do we make it experimental/unsupported in the API. I know Spark and Flink can; any unknown blocker for other runners?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through Java shutdown hooks, otherwise your environment (the software enclosing Beam) is fully unhandled and your overall system is uncontrolled.
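[Editor's note] The shutdown-hook point can be illustrated with the standard JDK API (plain `java.lang.Runtime`, no Beam involved). Hooks run on normal exit and on an orderly `kill` (SIGTERM), but not on `kill -9`:

```java
public class HookDemo {
    public static void main(String[] args) {
        // Register cleanup that the JVM runs on orderly shutdown; a runner
        // could use the same mechanism to attempt teardown on JVM exit.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // place to close connections / release external resources
            System.out.println("teardown attempted on JVM exit");
        }));
        System.out.println("work done");
        // Normal return (or SIGTERM) triggers the hook; SIGKILL does not,
        // which is why even this mechanism remains best-effort.
    }
}
```

This is exactly the boundary debated in the thread: an orderly shutdown can run cleanup, but a hard kill or hardware failure can not, so some recovery procedure is still needed for the leftover resources.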
>>>>>>>>>>>>>>>>>>>> The only case where that is not true is when the software is always owned by a vendor and never installed on a customer environment. In that case it belongs to the vendor to handle the Beam API, and not to Beam to adjust its API for a vendor - otherwise all features unsupported by one runner should be made optional, right?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> All state is not about network, even in distributed systems, so this is key to having an explicit and defined lifecycle.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Kenn