Restarting doesn't mean you don't call teardown. Barring a bug, there is technically no reason for teardown to be skipped.
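
As a minimal sketch of that point (the Worker and Fn classes below are hypothetical stand-ins, not Beam's actual API): a planned restart can tear down every live instance before creating new ones, so setups and teardowns stay balanced. Only a hard crash of the JVM would skip stop().

```java
import java.util.ArrayList;
import java.util.List;

public class RestartSketch {

    /** Hypothetical fn with setup/teardown hooks; not Beam's DoFn. */
    static final class Fn {
        boolean open;
        void setup() { open = true; }
        void teardown() { open = false; }
    }

    /** Hypothetical worker that owns the lifecycle of its fn instances. */
    static final class Worker {
        private final List<Fn> live = new ArrayList<>();
        int setups;
        int teardowns; // counts teardown attempts, including failed ones

        void start(int instances) {
            for (int i = 0; i < instances; i++) {
                Fn fn = new Fn();
                fn.setup();
                setups++;
                live.add(fn);
            }
        }

        /** Tears down every live instance; one failure does not stop the others. */
        void stop() {
            for (Fn fn : live) {
                try {
                    fn.teardown();
                } catch (RuntimeException e) {
                    System.err.println("teardown failed: " + e);
                } finally {
                    teardowns++;
                }
            }
            live.clear();
        }

        /** A planned restart is just stop() followed by start(). */
        void restart(int instances) {
            stop();
            start(instances);
        }
    }

    public static void main(String[] args) {
        Worker worker = new Worker();
        worker.start(3);
        worker.restart(3);
        worker.stop();
        System.out.println(worker.setups + " setups, " + worker.teardowns + " teardowns");
    }
}
```

The design choice here is that the worker, not the fn, owns the ordering: an instance is never dropped from the live list without a teardown attempt.
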
On Feb 19, 2018 at 21:14, "Reuven Lax" <re...@google.com> wrote:

> Workers restarting is not a bug; it's standard and often expected.
>
> On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
>> Nothing. As mentioned, it is a bug, so the recovery is a bug-recovery procedure.
>>
>> On Feb 19, 2018 at 19:42, "Eugene Kirpichov" <kirpic...@google.com> wrote:
>>
>>> So what would you like to happen if there is a crash? The DoFn instance no longer exists because the JVM it ran on no longer exists. What should Teardown be called on?
>>>
>>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>
>>>> This is what I want, and not 999999 teardowns for 1000000 setups until there is an unexpected crash (= a bug).
>>>>
>>>> On Feb 19, 2018 at 18:57, "Reuven Lax" <re...@google.com> wrote:
>>>>
>>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>
>>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> @Reuven: in practice it is created by pools of 256, but it leads to the same pattern; the teardown is just an "if (iCreatedThem) releaseThem();"
>>>>>>>
>>>>>>> How do you control "256"? Even if you have a pool of 256 workers, nothing in Beam guarantees how many threads and DoFns are created per worker. In theory the runner might decide to create 1000 threads on each worker.
>>>>>>
>>>>>> No, it was the other way around: in this case on AWS you can get 256 instances at once but not 512 (which would be 2x256). So when you compute the distribution, you assign to some fn the role of owning the instance lookup and release.
>>>>>
>>>>> I still don't understand. Let's be more precise.
>>>>> If you write the following code:
>>>>>
>>>>>     pCollection.apply(ParDo.of(new MyDoFn()));
>>>>>
>>>>> There is no way to control how many instances of MyDoFn are created. The runner might decide to create a million instances of this class across your worker pool, which means that you will get a million Setup and Teardown calls.
>>>>>
>>>>>> Anyway, this was just an example of an external resource you must release. The real topic is that Beam should define, as soon as possible, a guaranteed generic lifecycle to let users embrace its programming model.
>>>>>>
>>>>>>>> @Eugene:
>>>>>>>> 1. The Wait logic is about passing the value, which is not always possible (in about 15% of cases, by my rough estimate).
>>>>>>>> 2. SDF: I'll try to detail here why I keep mentioning SDF.
>>>>>>>>
>>>>>>>> Concretely, Beam exposes a portable API (included in the SDK core). This API defines a *container* API and therefore implies bean lifecycles. I won't detail them all but will just use sources and DoFn (not SDF) to illustrate the idea I'm trying to develop.
>>>>>>>>
>>>>>>>> A. Source
>>>>>>>>
>>>>>>>> A source computes a partition plan with 2 primitives: estimateSize and split. As a user, you can expect both to be called on the same bean instance, to avoid paying the same connection cost(s) twice.
>>>>>>>> Concretely:
>>>>>>>>
>>>>>>>>     connect()
>>>>>>>>     try {
>>>>>>>>       estimateSize()
>>>>>>>>       split()
>>>>>>>>     } finally {
>>>>>>>>       disconnect()
>>>>>>>>     }
>>>>>>>>
>>>>>>>> This is not guaranteed by the API, so you must do:
>>>>>>>>
>>>>>>>>     connect()
>>>>>>>>     try {
>>>>>>>>       estimateSize()
>>>>>>>>     } finally {
>>>>>>>>       disconnect()
>>>>>>>>     }
>>>>>>>>     connect()
>>>>>>>>     try {
>>>>>>>>       split()
>>>>>>>>     } finally {
>>>>>>>>       disconnect()
>>>>>>>>     }
>>>>>>>>
>>>>>>>> plus a workaround with an internal size estimate, since this primitive is often called in split but you don't want to connect twice in the second phase.
>>>>>>>>
>>>>>>>> Why do you need that? Simply because you want to define an API to implement sources which initializes the source bean and destroys it. I insist this is a very basic concern for such an API. However, Beam doesn't embrace it and doesn't assume it, so building any API on top of Beam is very painful today, and direct Beam users hit the exact same issues: check how the IOs are implemented, with static utilities that create short-lived connections and prevent reusing an existing connection within a single method (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
>>>>>>>>
>>>>>>>> The same logic applies to the reader which is then created.
>>>>>>>>
>>>>>>>> B. DoFn & SDF
>>>>>>>>
>>>>>>>> As a fn developer you expect the same from the Beam runtime: init(); try { while (...) process(); } finally { destroy(); }, executed on the exact same instance, so that the fn can be stateful at that level for expensive connections/operations/flow state handling.
>>>>>>>>
>>>>>>>> As you mentioned with the million example, this sequence should happen for every single instance, so 1M times in your example.
>>>>>>>>
>>>>>>>> Now, why did I mention SDF several times? Because SDF is a generalisation of both cases (source and DoFn). Therefore it creates far more instances and requires a much stricter and more explicit definition of the exact lifecycle and of which instance does what. Since Beam handles the full lifecycle of the bean instances, it must provide init/destroy hooks (setup/teardown) which can be stateful.
>>>>>>>>
>>>>>>>> Take the JDBC example which was mentioned earlier. Today, because of the teardown issue, it uses bundles. Since bundle size is not defined (and will not be with SDF), it must use a pool to be able to reuse a connection instance and keep performance acceptable. Now, with SDF and the increase in splits, how do you handle the pool size? Generally in batch you use a single connection per thread to avoid consuming all database connections. With a pool you have 2 choices:
>>>>>>>> 1. use a pool of 1;
>>>>>>>> 2. use a slightly bigger pool, but multiplied by the number of beans you will likely double or triple the connection count and make the execution fail with "no more connection available".
>>>>>>>> If you picked 1 (pool of 1), then you still need a reliable teardown per pool instance (generally close()) to ensure you release the pool and don't leak the connection information in the JVM. In all cases you come back to the init()/destroy() lifecycle, even if you fake it by getting connections per bundle.
>>>>>>>>
>>>>>>>> Just to make it obvious: I mention SDF only because SDF amplifies all the current issues with the loose definition of the bean lifecycles to an exponential level, nothing else.
>>>>>>>>
>>>>>>>> Romain Manni-Bucau
>>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>>
>>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>
>>>>>>>>> The kind of whole-transform lifecycle you're mentioning can be accomplished using the Wait transform as I suggested in the thread above, and I believe it should become the canonical way to do that.
>>>>>>>>>
>>>>>>>>> (I would like to reiterate one more time, as the main author of most design documents related to SDF and of its implementation in the Java direct and Dataflow runners, that SDF is fully unrelated to the topic of cleanup. I'm very confused as to why it keeps coming up.)
>>>>>>>>>
>>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I kind of agree, except that transforms lack a lifecycle too. My understanding is that SDF could be a way to unify it and clean up the API.
>>>>>>>>>>
>>>>>>>>>> Otherwise, how do we normalize the lifecycle of transforms behind a single API?
>>>>>>>>>>
>>>>>>>>>> On Feb 18, 2018 at 21:32, "Ben Chambers" <bchamb...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you sure that focusing on the cleanup of specific DoFns is appropriate? In many cases where cleanup is necessary, it is around an entire composite PTransform. I think there have been discussions/proposals around a more methodical "cleanup" option, but those haven't been implemented, to the best of my knowledge.
>>>>>>>>>>>
>>>>>>>>>>> For instance, consider the steps of a FileIO:
>>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files.
>>>>>>>>>>> 2. When all temporary files are complete, attempt to do a bulk copy to put them in the final destination.
>>>>>>>>>>> 3. Clean up all the temporary files.
>>>>>>>>>>>
>>>>>>>>>>> (This is often desirable because it minimizes the chance of seeing partial/incomplete results in the final destination.)
>>>>>>>>>>>
>>>>>>>>>>> In the above, you'd want step 1 to execute on many workers, likely using a ParDo (say N different workers). The move step should only happen once, so on one worker. This means it will be a different DoFn, likely with some stuff done to ensure it runs on one worker.
>>>>>>>>>>>
>>>>>>>>>>> In such a case, cleanup / @TearDown of the DoFn is not enough. We need an API for a PTransform to schedule some cleanup work for when the transform is "done". In batch this is relatively straightforward, but doesn't exist. This is the source of some problems, such as the BigQuery sink leaving files around that have failed to import into BigQuery.
>>>>>>>>>>>
>>>>>>>>>>> In streaming this is less straightforward -- do you want to wait until the end of the pipeline? Or do you want to wait until the end of the window? In practice, you just want to wait until you know nobody will need the resource anymore.
>>>>>>>>>>>
>>>>>>>>>>> This led to some discussions around a "cleanup" API, where you could have a transform that output resource objects. Each resource object would have logic for cleaning it up.
>>>>>>>>>>> And there would be something that indicated what parts of the pipeline needed that resource, and what kind of temporal lifetime those objects had. As soon as that part of the pipeline had advanced far enough that it would no longer need the resources, they would get cleaned up. This can be done at pipeline shutdown, or incrementally during a streaming pipeline, etc.
>>>>>>>>>>>
>>>>>>>>>>> Would something like this be a better fit for your use case? If not, why is handling teardown within a single DoFn sufficient?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall execution. Each instance (one fn, so likely in one thread of a worker) has its own lifecycle. Caricaturing: "new" and garbage collection.
>>>>>>>>>>>>
>>>>>>>>>>>> In practice, "new" is often an unsafe allocate (deserialization), but it doesn't matter here.
>>>>>>>>>>>>
>>>>>>>>>>>> What I want is for any "new" to be followed by a setup before any process or startBundle call, and, the last time Beam holds the instance, after the last finishBundle and before it is GC-ed, for Beam to call teardown.
>>>>>>>>>>>>
>>>>>>>>>>>> It is as simple as that. This way there is no need to combine fns in a way that makes a fn not self-contained just to implement basic transforms.
>>>>>>>>>>>>
>>>>>>>>>>>> On Feb 18, 2018 at 20:07, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Feb 18, 2018 at 19:28, "Ben Chambers" <bchamb...@apache.org> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather than focusing on the semantics of the existing methods -- which have been noted to meet many existing use cases -- it would be helpful to focus more on the reason you are looking for something with different semantics.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying to do):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Clean up some external, global resource that was initialized once during the startup of the pipeline. If this is the case, how are you ensuring it was really only initialized once (and not once per worker, per thread, per instance, etc.)? How do you know when the pipeline should release it? If the answer is "when it reaches step X", then what about a streaming pipeline?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When the DoFn is logically no longer needed, i.e. when the batch is done or the stream is stopped (manually or by a JVM shutdown).
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm really not following what this means.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each worker is running 1000 threads (each running a copy of the same DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do you want it called? When the entire pipeline is shut down? When an individual worker is about to shut down (which may be temporary - may be about to start back up)? Something else?
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. Finalize some resources that are used within some region of the pipeline. While the DoFn lifecycle methods are not a good fit for this (they are focused on managing resources within the DoFn), you could model this on how FileIO finalizes the files that it produced. For instance:
>>>>>>>>>>>>>>    a) ParDo generates "resource IDs" (or some token that stores information about resources)
>>>>>>>>>>>>>>    b) "Require Deterministic Input" (to prevent retries from changing resource IDs)
>>>>>>>>>>>>>>    c) ParDo that initializes the resources
>>>>>>>>>>>>>>    d) Pipeline segments that use the resources, and eventually output the fact they're done
>>>>>>>>>>>>>>    e) "Require Deterministic Input"
>>>>>>>>>>>>>>    f) ParDo that frees the resources
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> By making the use of the resource part of the data it is possible to "checkpoint" which resources may be in use or have been finished by using the require deterministic input. This is important to ensuring everything is actually cleaned up.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I need that, but generic and not case by case, in order to industrialize an API on top of Beam.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is this case, could you elaborate on what you are trying to accomplish? That would help me understand both the problems with existing options and possibly what could be done to help.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I understand there are workarounds for almost all cases, but that means each transform handles its lifecycle differently. I dislike that a lot, both at scale and as a user, since you can't put any unified practice on top of Beam; it also makes Beam very hard to integrate or to use to build higher-level libraries or software.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is why I tried not to start the workaround discussions and to just stay at the API level.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- Ben
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> "Machine state" is overly low-level because many of the possible reasons can happen on a perfectly fine machine. If you'd like to rephrase it to "it will be called except in various situations where it's logically impossible or impractical to guarantee that it's called", that's fine. Or you can list some of the examples above.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sounds OK to me.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The main point for the user is, you *will* see non-preventable situations where it couldn't be called - it's not just intergalactic crashes - so if the logic is very important (e.g. cleaning up a large amount of temporary files, shutting down a large number of VMs you started etc), you have to express it using one of the other methods that have stricter guarantees (which obviously come at a cost, e.g. no pass-by-reference).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> FinishBundle sadly has the exact same guarantee, so I'm not sure which other method you are talking about. Concretely, if you make it really unreliable (which is what "best effort" sounds like to me), then users cannot use it to clean anything; but if you make it "it can happen, but it is unexpected and means something went wrong", then it is fine to have a manual (or automatic, if fancy) recovery procedure. This is where it makes all the difference, and it impacts developers and ops (all users, basically).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Agreed, Eugene, except that "best effort" does not only mean that. It is also often used to say "at will", and this is what triggered this thread.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm fine with using "except if the machine state prevents it", but "best effort" is too open and can be very badly and wrongly perceived by users (as it was by me).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it: in the example situation you have (intergalactic crash), and in a number of more common cases: e.g. in case the worker container has crashed (e.g. user code in a different thread called a C library over JNI and it segfaulted), JVM bug, crash due to user code OOM, in case the worker has lost network connectivity (then it may be called but it won't be able to do anything useful), in case this is running on a preemptible VM and it was preempted by the underlying cluster manager without notice or if the worker was too busy with other stuff (e.g. calling other Teardown functions) until the preemption timeout elapsed, in case the underlying hardware simply failed (which happens quite often at scale), and in many other conditions.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such behavior.
>>>>>>>>>>>>>>>>>> Please feel free to file bugs for cases where you observed a runner not call Teardown in a situation where it was possible to call it but the runner made insufficient effort.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Feb 18, 2018 at 00:23, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying to write an IO for system $x and it requires the following initialization and the following cleanup logic and the following processing in between") I'll be better able to help you.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a connection. Using bundles is a performance killer since their size is not controlled. Using teardown doesn't allow you to release the connection since it is a best-effort thing.
>>>>>>>>>>>>>>>>>>>>>> Not releasing the connection makes you pay a lot (AWS ;)) or prevents you from launching other processing because of concurrency limits.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things die so badly that @Teardown is not called then nothing else can be called to close the connection either. What AWS service are you thinking of that stays open for a long time when everything at the other end has died?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> You assume connections are more or less stateless, but some (proprietary) protocols require closing exchanges that are more than just "I'm leaving".
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For AWS, I was thinking about starting some services (machines) on the fly at pipeline startup and shutting them down at the end. If teardown is not called, you leak machines and money. You can say it can be done another way... as can the full pipeline ;).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle the lifecycle of its components, it cannot be used at scale for generic pipelines and is bound to some particular IOs.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> What prevents enforcing teardown, ignoring the interstellar crash case which can't be handled by any human system? Nothing, technically. Why do you push for not handling it? Is it due to some legacy code in Dataflow or something else?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this way (best-effort). So I'm not sure what kind of change you're asking for.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not called, then it is a bug and we are done :).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Also, what does it mean for users? The direct runner does it, so if a user uses the RI in tests, will they get a different behavior in prod? Also, don't forget that users don't know what the IOs they compose use internally, so this has such an impact on the whole product that it must be handled, IMHO.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in the big data world, but that is not a reason to ignore what people did for years and to do it wrong before doing it right ;).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent us from guaranteeing the execution of teardown under normal IT conditions. Then we see if we can handle it, and only if there is a technical reason why we can't do we make it experimental/unsupported in the API. I know Spark and Flink can; is there any unknown blocker for other runners?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through Java shutdown hooks, otherwise your environment (the software enclosing Beam) is fully unhandled and your overall system is uncontrolled.
>>>>>>>>>>>>>>>>>>>>> The only case where this is not true is when the software is always owned by a vendor and never installed in a customer environment. In this case it is up to the vendor to handle the Beam API, and not up to Beam to adjust its API for a vendor. Otherwise all features unsupported by one runner should be made optional, right?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Not all state is about the network, even in distributed systems, so it is key to have an explicit and well-defined lifecycle.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Kenn
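
The shutdown-hook remark in the technical note quoted above can be sketched as follows. This is a minimal illustration, not Beam code: TeardownRegistry is a hypothetical helper, and the only real API used is Runtime.addShutdownHook. Note the limits: the JVM runs shutdown hooks on normal exit and on a termination signal such as SIGTERM, but not when the process is killed with SIGKILL, halted with Runtime.halt, or crashes hard.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class ShutdownHookSketch {

    /** Hypothetical registry of resources to release when the JVM shuts down. */
    static final class TeardownRegistry {
        private final Deque<AutoCloseable> resources = new ArrayDeque<>();
        private int closed; // cumulative count of successful close() calls

        synchronized void register(AutoCloseable resource) {
            resources.push(resource);
        }

        /** Closes resources in reverse registration order; returns the cumulative success count. */
        synchronized int closeAll() {
            while (!resources.isEmpty()) {
                try {
                    resources.pop().close();
                    closed++;
                } catch (Exception e) {
                    // Keep going: one failing teardown must not block the others.
                    System.err.println("teardown failed: " + e);
                }
            }
            return closed;
        }
    }

    public static void main(String[] args) {
        TeardownRegistry registry = new TeardownRegistry();
        registry.register(() -> System.out.println("connection released"));

        // Runs on normal exit and on SIGTERM; never on SIGKILL or a hard crash.
        Runtime.getRuntime().addShutdownHook(new Thread(registry::closeAll));

        System.out.println("working...");
    }
}
```

A hook like this narrows the set of cases where cleanup is skipped; it does not remove the cases listed earlier in the thread (preemption without notice, hardware failure, segfaults).
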