On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
> @Reuven: in practice it is created by a pool of 256, but that leads to the same pattern; the teardown is just an "if (iCreatedThem) releaseThem();".

How do you control "256"? Even if you have a pool of 256 workers, nothing in Beam guarantees how many threads and DoFns are created per worker. In theory the runner might decide to create 1000 threads on each worker.

> @Eugene:
> 1. the wait logic is about passing the value, which is not always possible (roughly 15% of cases from my rough estimate)
> 2. sdf: I'll try to detail why I mention SDF more below

> Concretely, Beam exposes a portable API (included in the SDK core). This API defines a *container* API and therefore implies bean lifecycles. I'll not detail them all, but just use sources and DoFns (not SDF) to illustrate the idea I'm trying to develop.

> A. Source

> A source computes a partition plan with 2 primitives: estimateSize and split. As a user you expect both to be called on the same bean instance, to avoid paying the same connection cost(s) twice. Concretely:

> connect()
> try {
>   estimateSize()
>   split()
> } finally {
>   disconnect()
> }

> This is not guaranteed by the API, so you must do:

> connect()
> try {
>   estimateSize()
> } finally {
>   disconnect()
> }
> connect()
> try {
>   split()
> } finally {
>   disconnect()
> }

> plus a workaround with an internal estimated size, since this primitive is often called in split but you don't want to connect twice in the second phase.

> Why do you need that? Simply because you want to define an API for implementing sources which initializes the source bean and destroys it. I insist: this is a very, very basic concern for such an API. However Beam doesn't embrace it and doesn't assume it, so building any API on top of Beam is very hurtful today, and direct Beam users hit the exact same issues - check how the IOs are implemented, with the static utilities creating volatile connections and preventing the reuse of an existing connection within a single method (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).

> The same logic applies to the reader which is then created.

> B. DoFn & SDF

> As a fn dev you expect the same from the Beam runtime: init(); try { while (...) process(); } finally { destroy(); } - and that it is executed on the exact same instance, so that you can be stateful at that level for expensive connections/operations/flow-state handling.

> As you mentioned with the million example, this sequence should happen for each single instance, so 1M times in your example.

> Now why did I mention SDF several times? Because SDF is a generalisation of both cases (source and DoFn). Therefore it creates way more instances and requires a far stricter and more explicit definition of the exact lifecycle and of which instance does what. Since Beam handles the full lifecycle of the bean instances, it must provide init/destroy hooks (setup/teardown) which can be stateful.

> Take the JDBC example which was mentioned earlier. Today, because of the teardown issue, it uses bundles. Since bundle size is not defined - and will not be with SDF - it must use a pool to be able to reuse a connection instance without hurting performance. Now, with SDF and the increase in splits, how do you size the pool? Generally in batch you use a single connection per thread, to avoid consuming all the database connections. With a pool you have 2 choices: 1. use a pool of 1; 2. use a somewhat bigger pool, but multiplied by the number of beans you will likely double or triple the connection count and make the execution fail with "no more connections available". If you picked 1 (a pool of 1), then you still need a reliable teardown per pool instance (close(), generally) to ensure you release the pool and don't leak the connection information in the JVM. In all cases you come back to the init()/destroy() lifecycle, even if you fake it by acquiring connections per bundle.
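> To make it concrete, here is the minimal shape I want to be able to write reliably, using the existing @Setup/@Teardown hooks - an illustrative sketch only, not an existing IO (the table name and JDBC URL are made up):

> import java.sql.Connection;
> import java.sql.DriverManager;
> import java.sql.PreparedStatement;
> import java.sql.SQLException;
> import org.apache.beam.sdk.transforms.DoFn;
>
> // Illustrative only: one connection held per DoFn instance.
> class JdbcWriteFn extends DoFn<String, Void> {
>   private final String jdbcUrl;            // made-up URL passed at construction
>   private transient Connection connection;
>
>   JdbcWriteFn(String jdbcUrl) { this.jdbcUrl = jdbcUrl; }
>
>   @Setup
>   public void setup() throws SQLException {
>     // init(): pay the connection cost once per instance, not once per bundle
>     connection = DriverManager.getConnection(jdbcUrl);
>   }
>
>   @ProcessElement
>   public void processElement(ProcessContext c) throws SQLException {
>     try (PreparedStatement st =
>         connection.prepareStatement("INSERT INTO t(v) VALUES (?)")) {
>       st.setString(1, c.element());
>       st.executeUpdate();
>     }
>   }
>
>   @Teardown
>   public void teardown() throws SQLException {
>     // destroy(): only reliable if the runner actually guarantees this call
>     if (connection != null) {
>       connection.close();
>     }
>   }
> }

> This only releases the connection deterministically if teardown is guaranteed per instance - which is the whole point of this thread.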
> Just to make it obvious: the SDF mentions are only because SDF implies all the current issues caused by the loose definition of the bean lifecycles, at an exponential level - nothing else.

> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>

> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

>> The kind of whole-transform lifecycle you're mentioning can be accomplished using the Wait transform, as I suggested in the thread above, and I believe it should become the canonical way to do that.

>> (I would like to reiterate one more time, as the main author of most design documents related to SDF and of its implementation in the Java direct and Dataflow runners, that SDF is fully unrelated to the topic of cleanup - I'm very confused as to why it keeps coming up.)
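>> For reference, the shape of that pattern is roughly the following (a sketch only - the side-effecting step and the cleanup fn are made up):

>> import org.apache.beam.sdk.Pipeline;
>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>> import org.apache.beam.sdk.transforms.Create;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.transforms.MapElements;
>> import org.apache.beam.sdk.transforms.ParDo;
>> import org.apache.beam.sdk.transforms.Wait;
>> import org.apache.beam.sdk.values.PCollection;
>> import org.apache.beam.sdk.values.TypeDescriptors;
>>
>> Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
>> PCollection<String> data = p.apply(Create.of("a", "b", "c"));
>>
>> // "signal" completes once the side effect has run for every element.
>> PCollection<String> signal =
>>     data.apply("SideEffect", MapElements
>>         .into(TypeDescriptors.strings())
>>         .via(x -> { /* e.g. write x to the external system */ return x; }));
>>
>> // Elements of "data" are only emitted after "signal" is complete
>> // (per window), so the cleanup below runs once the writes are done.
>> data.apply(Wait.on(signal))
>>     .apply("Cleanup", ParDo.of(new DoFn<String, Void>() {
>>       @ProcessElement
>>       public void processElement(ProcessContext c) {
>>         // made-up cleanup of whatever the side effect created
>>       }
>>     }));
>>
>> p.run();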
>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>> I kind of agree, except transforms lack a lifecycle too. My understanding is that SDF could be a way to unify it and clean the API.

>>> Otherwise, how do you normalize - with a single API - the lifecycle of transforms?

>>> On 18 Feb 2018 at 21:32, "Ben Chambers" <bchamb...@apache.org> wrote:

>>>> Are you sure that focusing on the cleanup of specific DoFns is appropriate? In many cases where cleanup is necessary, it is around an entire composite PTransform. I think there have been discussions/proposals around a more methodical "cleanup" option, but those haven't been implemented, to the best of my knowledge.

>>>> For instance, consider the steps of a FileIO:
>>>> 1. Write to a bunch (N shards) of temporary files.
>>>> 2. When all temporary files are complete, attempt to do a bulk copy to put them in the final destination.
>>>> 3. Clean up all the temporary files.

>>>> (This is often desirable because it minimizes the chance of seeing partial/incomplete results in the final destination.)

>>>> In the above, you'd want step 1 to execute on many workers, likely using a ParDo (say N different workers). The move step should only happen once, so on one worker. This means it will be a different DoFn, likely with some stuff done to ensure it runs on one worker.

>>>> In such a case, cleanup / @TearDown of the DoFn is not enough. We need an API for a PTransform to schedule some cleanup work for when the transform is "done". In batch this is relatively straightforward, but it doesn't exist. This is the source of some problems, such as the BigQuery sink leaving around files that have failed to import into BigQuery.

>>>> In streaming this is less straightforward -- do you want to wait until the end of the pipeline? Or do you want to wait until the end of the window? In practice, you just want to wait until you know nobody will need the resource anymore.

>>>> This led to some discussions around a "cleanup" API, where you could have a transform that outputs resource objects. Each resource object would have logic for cleaning it up. And there would be something that indicated which parts of the pipeline needed that resource, and what kind of temporal lifetime those objects had. As soon as that part of the pipeline had advanced far enough that it would no longer need the resources, they would get cleaned up. This can be done at pipeline shutdown, or incrementally during a streaming pipeline, etc.

>>>> Would something like this be a better fit for your use case? If not, why is handling teardown within a single DoFn sufficient?

>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>> Yes, 1M. Let me try to explain by simplifying the overall execution. Each instance - one fn, so likely in a thread of a worker - has its own lifecycle. Caricaturally: "new" and garbage collection.

>>>>> In practice, "new" is often an unsafe allocation (deserialization), but that doesn't matter here.

>>>>> What I want is for any "new" to be followed by a setup before any process or startBundle, and for Beam to call teardown the last time it holds the instance, before it is GC-ed and after the last finishBundle.

>>>>> It is as simple as that. This way there is no need to combine fns - making a fn not self-contained - to implement basic transforms.

>>>>> On 18 Feb 2018 at 20:07, "Reuven Lax" <re...@google.com> wrote:

>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>> On 18 Feb 2018 at 19:28, "Ben Chambers" <bchamb...@apache.org> wrote:

>>>>>>> It feels like this thread may be a bit off-track. Rather than focusing on the semantics of the existing methods -- which have been noted to meet many existing use cases -- it would be helpful to focus more on the reason you are looking for something with different semantics.

>>>>>>> Some possibilities (I'm not sure which one you are trying to do):

>>>>>>> 1. Clean up some external, global resource that was initialized once during the startup of the pipeline. If this is the case, how are you ensuring it was really only initialized once (and not once per worker, per thread, per instance, etc.)? How do you know when the pipeline should release it? If the answer is "when it reaches step X", then what about a streaming pipeline?

>>>>>>> When the DoFn is no longer needed logically, i.e. when the batch is done or the stream is stopped (manually or by a JVM shutdown).

>>>>>> I'm really not following what this means.

>>>>>> Let's say that a pipeline is running 1000 workers, and each worker is running 1000 threads (each running a copy of the same DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M cleanups), and when do you want them called? When the entire pipeline is shut down? When an individual worker is about to shut down (which may be temporary - it may be about to start back up)? Something else?

>>>>>>> 2. Finalize some resources that are used within some region of the pipeline. While the DoFn lifecycle methods are not a good fit for this (they are focused on managing resources within the DoFn), you could model this on how FileIO finalizes the files that it produced. For instance (sketched in code below):
>>>>>>>   a) a ParDo generates "resource IDs" (or some token that stores information about the resources)
>>>>>>>   b) "Require Deterministic Input" (to prevent retries from changing resource IDs)
>>>>>>>   c) a ParDo that initializes the resources
>>>>>>>   d) pipeline segments that use the resources, and eventually output the fact they're done
>>>>>>>   e) "Require Deterministic Input"
>>>>>>>   f) a ParDo that frees the resources
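>>>>>>> As a rough sketch of that shape - with Reshuffle.viaRandomKey() standing in for "Require Deterministic Input" and placeholder lambdas standing in for real DoFns:

>>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>>> import org.apache.beam.sdk.transforms.Create;
>>>>>>> import org.apache.beam.sdk.transforms.MapElements;
>>>>>>> import org.apache.beam.sdk.transforms.Reshuffle;
>>>>>>> import org.apache.beam.sdk.values.PCollection;
>>>>>>> import org.apache.beam.sdk.values.TypeDescriptors;
>>>>>>>
>>>>>>> Pipeline p = Pipeline.create(PipelineOptionsFactory.create());
>>>>>>>
>>>>>>> PCollection<String> ids =
>>>>>>>     p.apply(Create.of("res-1", "res-2"));                         // (a)
>>>>>>> PCollection<String> initialized =
>>>>>>>     ids.apply(Reshuffle.viaRandomKey())                           // (b)
>>>>>>>        .apply("Init", MapElements.into(TypeDescriptors.strings())
>>>>>>>            .via(id -> { /* create the resource */ return id; })); // (c)
>>>>>>> PCollection<String> done =
>>>>>>>     initialized.apply("Use", MapElements.into(TypeDescriptors.strings())
>>>>>>>        .via(id -> { /* use the resource */ return id; }));        // (d)
>>>>>>> done.apply(Reshuffle.viaRandomKey())                              // (e)
>>>>>>>     .apply("Free", MapElements.into(TypeDescriptors.strings())
>>>>>>>        .via(id -> { /* free the resource */ return id; }));       // (f)
>>>>>>>
>>>>>>> p.run();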
>>>>>>> By making the use of the resource part of the data, it is possible to "checkpoint" which resources may be in use or have been finished, by using the require-deterministic-input steps. This is important to ensuring everything is actually cleaned up.

>>>>>>> I need that, but generic and not case by case, to industrialize some API on top of Beam.

>>>>>>> 3. Some other use case that I may be missing? If it is this case, could you elaborate on what you are trying to accomplish? That would help me understand both the problems with the existing options and possibly what could be done to help.

>>>>>>> I understand there are workarounds for almost all cases, but that means each transform is different in its lifecycle handling, which I dislike a lot at scale and as a user, since you can't put any unified practice on top of Beam; it also makes Beam very hard to integrate, or to use to build higher-level libraries or software.

>>>>>>> This is why I tried not to start the workaround discussion and to just stay at the API level.

>>>>>>> -- Ben

>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

>>>>>>>>> "Machine state" is overly low-level, because many of the possible reasons can happen on a perfectly fine machine. If you'd like to rephrase it to "it will be called except in various situations where it's logically impossible or impractical to guarantee that it's called", that's fine. Or you can list some of the examples above.

>>>>>>>> Sounds ok to me.

>>>>>>>>> The main point for the user is: you *will* see non-preventable situations where it couldn't be called - it's not just intergalactic crashes - so if the logic is very important (e.g. cleaning up a large number of temporary files, shutting down a large number of VMs you started, etc.), you have to express it using one of the other methods that have stricter guarantees (which obviously come at a cost, e.g. no pass-by-reference).

>>>>>>>> FinishBundle has the exact same guarantee, sadly, so I'm not sure which other method you mean. Concretely, if you make it really unreliable - which is what "best effort" sounds like to me - then users can't use it to clean anything; but if you make it "can happen, but it is unexpected and means something happened", then it is fine to have a manual - or automatic, if fancy - recovery procedure. This is where it makes all the difference and impacts developers and ops (all users, basically).
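>>>>>>>> (For context, the bundle-scoped workaround I keep referring to looks roughly like this - a sketch, not any IO's actual code, with a made-up JDBC URL - and it pays the connection cost on every bundle:)

>>>>>>>> import java.sql.Connection;
>>>>>>>> import java.sql.DriverManager;
>>>>>>>> import java.sql.SQLException;
>>>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>>>>
>>>>>>>> class BundleScopedFn extends DoFn<String, String> {
>>>>>>>>   private transient Connection connection;
>>>>>>>>
>>>>>>>>   @StartBundle
>>>>>>>>   public void startBundle() throws SQLException {
>>>>>>>>     // paid again on every bundle, and bundle size is runner-defined
>>>>>>>>     connection = DriverManager.getConnection("jdbc:..."); // made-up URL
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @ProcessElement
>>>>>>>>   public void processElement(ProcessContext c) {
>>>>>>>>     // use the connection, then pass the element through
>>>>>>>>     c.output(c.element());
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   @FinishBundle
>>>>>>>>   public void finishBundle() throws SQLException {
>>>>>>>>     connection.close();
>>>>>>>>     connection = null;
>>>>>>>>   }
>>>>>>>> }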
>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>>> Agreed, Eugene, except that "best effort" means exactly that. It is also often used to mean "at will", and this is what triggered this thread.

>>>>>>>>>> I'm fine using "except if the machine state prevents it", but "best effort" is too open and can be very badly and wrongly perceived by users (like I did).

>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

>>>>>>>>>>> It will not be called if it's impossible to call it: in the example situation you have (an intergalactic crash), and in a number of more common cases: e.g. the worker container has crashed (e.g. user code in a different thread called a C library over JNI and it segfaulted), a JVM bug, a crash due to user-code OOM, the worker has lost network connectivity (then it may be called but won't be able to do anything useful), this is running on a preemptible VM and it was preempted by the underlying cluster manager without notice, or the worker was too busy with other stuff (e.g. calling other Teardown functions) until the preemption timeout elapsed, the underlying hardware simply failed (which happens quite often at scale), and many other conditions.

>>>>>>>>>>> "Best effort" is the commonly used term to describe such behavior. Please feel free to file bugs for cases where you observed a runner not calling Teardown in a situation where it was possible to call it but the runner made insufficient effort.

>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>>>>>>> On 18 Feb 2018 at 00:23, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying to write an IO for system $x and it requires the following initialization, the following cleanup logic, and the following processing in between"), I'll be better able to help you.

>>>>>>>>>>>>>>> Take the simple example of a transform requiring a connection. Using bundles is a perf killer, since their size is not controlled. Using teardown doesn't allow you to release the connection, since it is a best-effort thing. Not releasing the connection makes you pay a lot - AWS ;) - or prevents you from launching other processing - concurrency limits.

>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things die so badly that @Teardown is not called, then nothing else can be called to close the connection either. What AWS service are you thinking of that stays open for a long time when everything at the other end has died?

>>>>>>>>>>>>>> You assume connections are kind of stateless, but some (proprietary) protocols require closing exchanges which are not just "I'm leaving".

>>>>>>>>>>>>>> For AWS, I was thinking about starting some services - machines - on the fly at pipeline startup and closing them at the end. If teardown is not called you leak machines and money. You can say it can be done another way... as can the full pipeline ;).

>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its components' lifecycle, it can't be used at scale for generic pipelines and is bound to some particular IOs.

>>>>>>>>>>>>>> What prevents us from enforcing teardown - ignoring the interstellar-crash case, which can't be handled by any human system? Nothing, technically. So why push not to handle it? Is it due to some legacy code on Dataflow, or something else?

>>>>>>>>>>>>> Teardown *is* already documented and implemented this way (best-effort). So I'm not sure what kind of change you're asking for.

>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not called, then it is a bug and we are done :).

>>>>>>>>>>>>>> Also, what does it mean for users? The direct runner does it, so if a user uses the RI in tests, will he get a different behavior in prod? Also, don't forget that the user doesn't know which IOs the transforms he composes use, so this is so impactful for the whole product that it must be handled IMHO.

>>>>>>>>>>>>>> I understand the portability culture is new in the big data world, but that is not a reason to ignore what people have done for years, and to do it wrong before doing it right ;).

>>>>>>>>>>>>>> My proposal is to list what can prevent guaranteeing - under normal IT conditions - the execution of teardown. Then we see if we can handle each case, and only if there is a technical reason we can't do we make it experimental/unsupported in the API. I know Spark and Flink can; any unknown blocker for other runners?

>>>>>>>>>>>>>> Technical note: even a kill should go through the Java shutdown hooks, otherwise your environment (the software enclosing Beam) is fully unhandled and your overall system is uncontrolled. The only case where that is not true is when the software is always owned by a vendor and never installed in a customer environment. In that case it belongs to the vendor to handle the Beam API, and not to Beam to adjust its API for a vendor - otherwise all features unsupported by one runner should be made optional, right?
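>>>>>>>>>>>>>> (Illustration of that note in plain JVM code - nothing Beam-specific:)

>>>>>>>>>>>>>> Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>>>>>>>>>>>>>>     // release pooled connections, stop started machines, etc.
>>>>>>>>>>>>>>     // (runs on normal exit and SIGTERM, not on SIGKILL)
>>>>>>>>>>>>>> }));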
>>>>>>>>>>>>>> Not all state is about the network, even in distributed systems, so it is key to have an explicit and defined lifecycle.

>>>>>>>>>>>>>> Kenn