Workers restarting is not a bug; it's standard and often expected.
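
To make the restart case concrete, here is a minimal, self-contained sketch in plain Java. It does not use the Beam API: `TrackedFn`, `LifecycleSketch` and `run` are hypothetical names invented for illustration, and the "crash" is only simulated by dropping an instance without calling its teardown hook.

```java
import java.util.ArrayList;
import java.util.List;

class LifecycleSketch {
    /** Hypothetical stand-in for a DoFn-like object; not a Beam class. */
    static class TrackedFn {
        static int setups = 0;
        static int teardowns = 0;

        void setup() { setups++; }
        void process(String element) { /* per-element work would go here */ }
        void teardown() { teardowns++; }
    }

    /**
     * Simulates `workers` workers that each set up one TrackedFn.
     * Workers with an index below `crashed` lose their process abruptly,
     * so nothing is left to call teardown() on for them.
     * Returns the number of setups never matched by a teardown.
     */
    static int run(int workers, int crashed) {
        TrackedFn.setups = 0;
        TrackedFn.teardowns = 0;
        List<TrackedFn> alive = new ArrayList<>();
        for (int i = 0; i < workers; i++) {
            TrackedFn fn = new TrackedFn();
            fn.setup();
            fn.process("element-" + i);
            if (i >= crashed) {
                alive.add(fn); // this worker survives until an orderly shutdown
            }
            // otherwise: simulated crash, the instance is gone with its process
        }
        for (TrackedFn fn : alive) {
            fn.teardown(); // orderly shutdown: the hook can run
        }
        return TrackedFn.setups - TrackedFn.teardowns;
    }

    public static void main(String[] args) {
        System.out.println("unmatched setups: " + run(1000, 1));
    }
}
```

With 1000 workers and a single simulated crash, one setup is left without a teardown, which is a scaled-down version of the 999999-teardowns-for-1000000-setups situation discussed in the quoted thread.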

On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> Nothing. As mentioned, it is a bug, so recovery is a bug-recovery
> procedure.
>
> On Feb 19, 2018, 19:42, "Eugene Kirpichov" <kirpic...@google.com>
> wrote:
>
>> So what would you like to happen if there is a crash? The DoFn instance
>> no longer exists because the JVM it ran on no longer exists. What should
>> Teardown be called on?
>>
>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>
>>> This is what I want, and not 999999 teardowns for 1000000 setups, unless
>>> there is an unexpected crash (= a bug).
>>>
>>> On Feb 19, 2018, 18:57, "Reuven Lax" <re...@google.com> wrote:
>>>
>>>>
>>>>
>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <
>>>> rmannibu...@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> @Reuven: in practice they are created in pools of 256, but it leads to
>>>>>>> the same pattern; the teardown is just an
>>>>>>> "if (iCreatedThem) releaseThem();"
>>>>>>>
>>>>>>
>>>>>> How do you control "256?" Even if you have a pool of 256 workers,
>>>>>> nothing in Beam guarantees how many threads and DoFns are created per
>>>>>> worker. In theory the runner might decide to create 1000 threads on each
>>>>>> worker.
>>>>>>
>>>>>
>>>>> No, it was the other way around: in this case, on AWS you can get 256
>>>>> instances at once but not 512 (which will be 2x256). So when you compute
>>>>> the distribution, you assign to some fn the role of owning the instance
>>>>> lookup and release.
>>>>>
>>>>
>>>> I still don't understand. Let's be more precise. If you write the
>>>> following code:
>>>>
>>>>    pCollection.apply(ParDo.of(new MyDoFn()));
>>>>
>>>> There is no way to control how many instances of MyDoFn are created.
>>>> The runner might decide to create a million instances of this class across
>>>> your worker pool, which means that you will get a million Setup and
>>>> Teardown calls.
>>>>
>>>>
>>>>> Anyway, this was just an example of an external resource you must
>>>>> release. The real topic is that Beam should define, as soon as possible,
>>>>> a guaranteed generic lifecycle so that users can embrace its programming
>>>>> model.
>>>>>
>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>> @Eugene:
>>>>>>> 1. The Wait logic is about passing the value, which is not always
>>>>>>> possible (in about 15% of cases, by my rough estimate)
>>>>>>> 2. SDF: I'll try to explain in more detail here why I mention SDF
>>>>>>>
>>>>>>>
>>>>>>> Concretely, Beam exposes a portable API (included in the SDK core).
>>>>>>> This API defines a *container* API and therefore implies bean
>>>>>>> lifecycles. I won't detail them all; I'll just use the sources and DoFn
>>>>>>> (not SDF) to illustrate the idea I'm trying to develop.
>>>>>>>
>>>>>>> A. Source
>>>>>>>
>>>>>>> A source computes a partition plan with 2 primitives: estimateSize
>>>>>>> and split. As a user, you can expect both to be called on the same bean
>>>>>>> instance to avoid paying the same connection cost(s) twice. Concretely:
>>>>>>>
>>>>>>> connect()
>>>>>>> try {
>>>>>>>   estimateSize()
>>>>>>>   split()
>>>>>>> } finally {
>>>>>>>   disconnect()
>>>>>>> }
>>>>>>>
>>>>>>> This is not guaranteed by the API, so you must do:
>>>>>>>
>>>>>>> connect()
>>>>>>> try {
>>>>>>>   estimateSize()
>>>>>>> } finally {
>>>>>>>   disconnect()
>>>>>>> }
>>>>>>> connect()
>>>>>>> try {
>>>>>>>   split()
>>>>>>> } finally {
>>>>>>>   disconnect()
>>>>>>> }
>>>>>>>
>>>>>>> plus a workaround with an internal size estimate, since this primitive
>>>>>>> is often called in split but you don't want to connect twice in the
>>>>>>> second phase.
>>>>>>>
>>>>>>> Why do you need that? Simply because you want to define an API to
>>>>>>> implement sources which initializes the source bean and destroys it.
>>>>>>> I insist that it is a very basic concern for such an API. However, Beam
>>>>>>> doesn't embrace it and doesn't assume it, so building any API on top of
>>>>>>> Beam is very painful today, and direct Beam users hit the exact same
>>>>>>> issues. Check how the IOs are implemented: the static utilities create
>>>>>>> short-lived connections, which prevents reusing an existing connection
>>>>>>> within a single method (
>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862
>>>>>>> ).
>>>>>>>
>>>>>>> Same logic applies to the reader which is then created.
>>>>>>>
>>>>>>> B. DoFn & SDF
>>>>>>>
>>>>>>> As a fn dev you expect the same from the Beam runtime: init(); try {
>>>>>>> while (...) process(); } finally { destroy(); } and that it is executed
>>>>>>> on the exact same instance, so you can be stateful at that level for
>>>>>>> expensive connections/operations/flow state handling.
>>>>>>>
>>>>>>> As you mentioned with the million example, this sequence should
>>>>>>> happen for each single instance so 1M times for your example.
>>>>>>>
>>>>>>> Now why did I mention SDF several times? Because SDF is a
>>>>>>> generalisation of both cases (source and DoFn). Therefore it creates
>>>>>>> many more instances and requires a much stricter, more explicit
>>>>>>> definition of the exact lifecycle and of which instance does what.
>>>>>>> Since Beam handles the full lifecycle of the bean instances, it must
>>>>>>> provide init/destroy hooks (setup/teardown) which can be stateful.
>>>>>>>
>>>>>>> Take the JDBC example which was mentioned earlier. Today, because of
>>>>>>> the teardown issue, it uses bundles. Since bundle size is not defined
>>>>>>> (and will not be with SDF), it must use a pool to be able to reuse a
>>>>>>> connection instance and get acceptable performance. Now, with SDF and
>>>>>>> the increase in splits, how do you handle the pool size? Generally in
>>>>>>> batch you use a single connection per thread to avoid consuming all
>>>>>>> database connections. With a pool you have 2 choices: 1. use a pool of
>>>>>>> 1, or 2. use a slightly larger pool, but multiplied by the number of
>>>>>>> beans you will likely double or triple the connection count and make
>>>>>>> the execution fail with "no more connection available". If you picked
>>>>>>> 1 (a pool of 1), then you still need a reliable teardown per pool
>>>>>>> instance (generally close()) to ensure you release the pool and don't
>>>>>>> leak the connection information in the JVM. In all cases you come back
>>>>>>> to the init()/destroy() lifecycle, even if you fake it by getting
>>>>>>> connections per bundle.
>>>>>>>
>>>>>>> Just to make it obvious: I mention SDF only because SDF amplifies all
>>>>>>> the current issues with the loose definition of the bean lifecycles to
>>>>>>> an exponential level, nothing else.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Romain Manni-Bucau
>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>
>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>
>>>>>>>> The kind of whole-transform lifecycle you're mentioning can be
>>>>>>>> accomplished using the Wait transform as I suggested in the thread
>>>>>>>> above, and I believe it should become the canonical way to do that.
>>>>>>>>
>>>>>>>> (I would like to reiterate one more time, as the main author of most
>>>>>>>> design documents related to SDF and of its implementation in the Java
>>>>>>>> direct and Dataflow runners, that SDF is fully unrelated to the topic
>>>>>>>> of cleanup - I'm very confused as to why it keeps coming up.)
>>>>>>>>
>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <
>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I kind of agree, except that transforms lack a lifecycle too. My
>>>>>>>>> understanding is that SDF could be a way to unify it and clean up
>>>>>>>>> the API.
>>>>>>>>>
>>>>>>>>> Otherwise, how do we normalize the lifecycle of transforms under a
>>>>>>>>> single API?
>>>>>>>>>
>>>>>>>>> On Feb 18, 2018, 21:32, "Ben Chambers" <bchamb...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Are you sure that focusing on the cleanup of specific DoFns is
>>>>>>>>>> appropriate? In many cases where cleanup is necessary, it is around
>>>>>>>>>> an entire composite PTransform. I think there have been
>>>>>>>>>> discussions/proposals around a more methodical "cleanup" option,
>>>>>>>>>> but those haven't been implemented, to the best of my knowledge.
>>>>>>>>>>
>>>>>>>>>> For instance, consider the steps of a FileIO:
>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files
>>>>>>>>>> 2. When all temporary files are complete, attempt to do a bulk
>>>>>>>>>> copy to put them in the final destination.
>>>>>>>>>> 3. Cleanup all the temporary files.
>>>>>>>>>>
>>>>>>>>>> (This is often desirable because it minimizes the chance of
>>>>>>>>>> seeing partial/incomplete results in the final destination).
>>>>>>>>>>
>>>>>>>>>> In the above, you'd want step 1 to execute on many workers,
>>>>>>>>>> likely using a ParDo (say N different workers).
>>>>>>>>>> The move step should only happen once, so on one worker. This
>>>>>>>>>> means it will be a different DoFn, likely with some stuff done to
>>>>>>>>>> ensure it runs on one worker.
>>>>>>>>>>
>>>>>>>>>> In such a case, cleanup / @Teardown of the DoFn is not enough. We
>>>>>>>>>> need an API for a PTransform to schedule some cleanup work for when
>>>>>>>>>> the transform is "done". In batch this is relatively
>>>>>>>>>> straightforward, but doesn't exist. This is the source of some
>>>>>>>>>> problems, such as the BigQuery sink leaving files around that have
>>>>>>>>>> failed to import into BigQuery.
>>>>>>>>>>
>>>>>>>>>> In streaming this is less straightforward -- do you want to wait
>>>>>>>>>> until the end of the pipeline? Or do you want to wait until the end
>>>>>>>>>> of the window? In practice, you just want to wait until you know
>>>>>>>>>> nobody will need the resource anymore.
>>>>>>>>>>
>>>>>>>>>> This led to some discussions around a "cleanup" API, where you
>>>>>>>>>> could have a transform that output resource objects. Each resource
>>>>>>>>>> object would have logic for cleaning it up. And there would be
>>>>>>>>>> something that indicated what parts of the pipeline needed that
>>>>>>>>>> resource, and what kind of temporal lifetime those objects had. As
>>>>>>>>>> soon as that part of the pipeline had advanced far enough that it
>>>>>>>>>> would no longer need the resources, they would get cleaned up. This
>>>>>>>>>> can be done at pipeline shutdown, or incrementally during a
>>>>>>>>>> streaming pipeline, etc.
>>>>>>>>>>
>>>>>>>>>> Would something like this be a better fit for your use case? If
>>>>>>>>>> not, why is handling teardown within a single DoFn sufficient?
>>>>>>>>>>
>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <
>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall
>>>>>>>>>>> execution. Each instance - one fn, so likely in a thread of a
>>>>>>>>>>> worker - has its lifecycle. As a caricature: "new" and garbage
>>>>>>>>>>> collection.
>>>>>>>>>>>
>>>>>>>>>>> In practice, "new" is often an unsafe allocate (deserialization),
>>>>>>>>>>> but it doesn't matter here.
>>>>>>>>>>>
>>>>>>>>>>> What I want is for any "new" to be followed by a setup before any
>>>>>>>>>>> process or startBundle, and for Beam to call teardown the last
>>>>>>>>>>> time it has the instance, after the last finishBundle and before
>>>>>>>>>>> the instance is gc-ed.
>>>>>>>>>>>
>>>>>>>>>>> It is as simple as that.
>>>>>>>>>>> This way there is no need to combine fns in a way that makes a fn
>>>>>>>>>>> not self-contained just to implement basic transforms.
>>>>>>>>>>>
>>>>>>>>>>> On Feb 18, 2018, 20:07, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Feb 18, 2018, 19:28, "Ben Chambers" <bchamb...@apache.org>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather than
>>>>>>>>>>>>> focusing on the semantics of the existing methods -- which have
>>>>>>>>>>>>> been noted to meet many existing use cases -- it would be
>>>>>>>>>>>>> helpful to focus more on the reason you are looking for
>>>>>>>>>>>>> something with different semantics.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying to
>>>>>>>>>>>>> do):
>>>>>>>>>>>>>
>>>>>>>>>>>>> 1. Clean up some external, global resource that was
>>>>>>>>>>>>> initialized once during the startup of the pipeline. If this is
>>>>>>>>>>>>> the case, how are you ensuring it was really only initialized
>>>>>>>>>>>>> once (and not once per worker, per thread, per instance, etc.)?
>>>>>>>>>>>>> How do you know when the pipeline should release it? If the
>>>>>>>>>>>>> answer is "when it reaches step X", then what about a streaming
>>>>>>>>>>>>> pipeline?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> When the DoFn is logically no longer needed, i.e. when the batch
>>>>>>>>>>>>> is done or the stream is stopped (manually or by a JVM shutdown).
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> I'm really not following what this means.
>>>>>>>>>>>>
>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each
>>>>>>>>>>>> worker is running 1000 threads (each running a copy of the same
>>>>>>>>>>>> DoFn). How many cleanups do you want (do you want 1000 * 1000 =
>>>>>>>>>>>> 1M cleanups) and when do you want it called? When the entire
>>>>>>>>>>>> pipeline is shut down? When an individual worker is about to
>>>>>>>>>>>> shut down (which may be temporary - may be about to start back
>>>>>>>>>>>> up)? Something else?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2. Finalize some resources that are used within some region of
>>>>>>>>>>>>> the pipeline. While the DoFn lifecycle methods are not a good
>>>>>>>>>>>>> fit for this (they are focused on managing resources within the
>>>>>>>>>>>>> DoFn), you could model this on how FileIO finalizes the files
>>>>>>>>>>>>> that it produced. For instance:
>>>>>>>>>>>>>    a) ParDo generates "resource IDs" (or some token that
>>>>>>>>>>>>> stores information about resources)
>>>>>>>>>>>>>    b) "Require Deterministic Input" (to prevent retries from
>>>>>>>>>>>>> changing resource IDs)
>>>>>>>>>>>>>    c) ParDo that initializes the resources
>>>>>>>>>>>>>    d) Pipeline segments that use the resources, and eventually
>>>>>>>>>>>>> output the fact they're done
>>>>>>>>>>>>>    e) "Require Deterministic Input"
>>>>>>>>>>>>>    f) ParDo that frees the resources
>>>>>>>>>>>>>
>>>>>>>>>>>>> By making the use of the resource part of the data it is
>>>>>>>>>>>>> possible to "checkpoint" which resources may be in use or have
>>>>>>>>>>>>> been finished by using the require deterministic input. This is
>>>>>>>>>>>>> important for ensuring everything is actually cleaned up.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I need that, but generic and not case by case, to industrialize
>>>>>>>>>>>>> some API on top of Beam.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is this
>>>>>>>>>>>>> case, could you elaborate on what you are trying to accomplish?
>>>>>>>>>>>>> That would help me understand both the problems with existing
>>>>>>>>>>>>> options and possibly what could be done to help.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I understand there are workarounds for almost all cases, but it
>>>>>>>>>>>>> means each transform is different in its lifecycle handling. I
>>>>>>>>>>>>> dislike that a lot at scale and as a user, since you can't put
>>>>>>>>>>>>> any unified practice on top of Beam. It also makes Beam very
>>>>>>>>>>>>> hard to integrate or to use to build higher-level libraries or
>>>>>>>>>>>>> software.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is why I tried not to start the workaround discussions
>>>>>>>>>>>>> and just stay at the API level.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> -- Ben
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <
>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> "Machine state" is overly low-level because many of the
>>>>>>>>>>>>>>> possible reasons can happen on a perfectly fine machine.
>>>>>>>>>>>>>>> If you'd like to rephrase it to "it will be called except in
>>>>>>>>>>>>>>> various situations where it's logically impossible or
>>>>>>>>>>>>>>> impractical to guarantee that it's called", that's fine. Or
>>>>>>>>>>>>>>> you can list some of the examples above.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Sounds ok to me
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> The main point for the user is, you *will* see
>>>>>>>>>>>>>>> non-preventable situations where it couldn't be called - it's
>>>>>>>>>>>>>>> not just intergalactic crashes - so if the logic is very
>>>>>>>>>>>>>>> important (e.g. cleaning up a large amount of temporary
>>>>>>>>>>>>>>> files, shutting down a large number of VMs you started etc),
>>>>>>>>>>>>>>> you have to express it using one of the other methods that
>>>>>>>>>>>>>>> have stricter guarantees (which obviously come at a cost,
>>>>>>>>>>>>>>> e.g. no pass-by-reference).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> FinishBundle has the exact same guarantee, sadly, so I am not
>>>>>>>>>>>>>> sure which other method you are speaking about. Concretely, if
>>>>>>>>>>>>>> you make it really unreliable - this is what "best effort"
>>>>>>>>>>>>>> sounds like to me - then users can't use it to clean anything,
>>>>>>>>>>>>>> but if you make it "can happen but it is unexpected and means
>>>>>>>>>>>>>> something happened" then it is fine to have a manual - or auto
>>>>>>>>>>>>>> if fancy - recovery procedure. This is where it makes all the
>>>>>>>>>>>>>> difference and impacts the developers and ops (all users,
>>>>>>>>>>>>>> basically).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Agreed, Eugene, except that "best effort" means that. It is
>>>>>>>>>>>>>>>> also often used to say "at will", and this is what triggered
>>>>>>>>>>>>>>>> this thread.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents it"
>>>>>>>>>>>>>>>> but "best effort" is too open and can be very badly and
>>>>>>>>>>>>>>>> wrongly perceived by users (like I did).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it: in
>>>>>>>>>>>>>>>>> the example situation you have (intergalactic crash), and
>>>>>>>>>>>>>>>>> in a number of more common cases: e.g. in case the worker
>>>>>>>>>>>>>>>>> container has crashed (e.g. user code in a different thread
>>>>>>>>>>>>>>>>> called a C library over JNI and it segfaulted), JVM bug,
>>>>>>>>>>>>>>>>> crash due to user code OOM, in case the worker has lost
>>>>>>>>>>>>>>>>> network connectivity (then it may be called but it won't be
>>>>>>>>>>>>>>>>> able to do anything useful), in case this is running on a
>>>>>>>>>>>>>>>>> preemptible VM and it was preempted by the underlying
>>>>>>>>>>>>>>>>> cluster manager without notice or if the worker was too
>>>>>>>>>>>>>>>>> busy with other stuff (e.g. calling other Teardown
>>>>>>>>>>>>>>>>> functions) until the preemption timeout elapsed, in case
>>>>>>>>>>>>>>>>> the underlying hardware simply failed (which happens quite
>>>>>>>>>>>>>>>>> often at scale), and in many other conditions.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such
>>>>>>>>>>>>>>>>> behavior. Please feel free to file bugs for cases where you
>>>>>>>>>>>>>>>>> observed a runner not call Teardown in a situation where it
>>>>>>>>>>>>>>>>> was possible to call it but the runner made insufficient
>>>>>>>>>>>>>>>>> effort.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Feb 18, 2018, 00:23, "Kenneth Knowles" <
>>>>>>>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm
>>>>>>>>>>>>>>>>>>>>> trying to write an IO for system $x and it requires the
>>>>>>>>>>>>>>>>>>>>> following initialization and the following cleanup
>>>>>>>>>>>>>>>>>>>>> logic and the following processing in between") I'll be
>>>>>>>>>>>>>>>>>>>>> better able to help you.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a
>>>>>>>>>>>>>>>>>>>>> connection. Using bundles is a perf killer since their
>>>>>>>>>>>>>>>>>>>>> size is not controlled. Using teardown doesn't allow
>>>>>>>>>>>>>>>>>>>>> you to release the connection since it is a best-effort
>>>>>>>>>>>>>>>>>>>>> thing. Not releasing the connection makes you pay a lot
>>>>>>>>>>>>>>>>>>>>> - AWS ;) - or prevents you from launching other
>>>>>>>>>>>>>>>>>>>>> processing - concurrency limit.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things
>>>>>>>>>>>>>>>>>>>> die so badly that @Teardown is not called then nothing
>>>>>>>>>>>>>>>>>>>> else can be called to close the connection either. What
>>>>>>>>>>>>>>>>>>>> AWS service are you thinking of that stays open for a
>>>>>>>>>>>>>>>>>>>> long time when everything at the other end has died?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless, but some
>>>>>>>>>>>>>>>>>>>> (proprietary) protocols require closing exchanges which
>>>>>>>>>>>>>>>>>>>> are more than just "I'm leaving".
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some services -
>>>>>>>>>>>>>>>>>>>> machines - on the fly at pipeline startup and closing
>>>>>>>>>>>>>>>>>>>> them at the end. If teardown is not called you leak
>>>>>>>>>>>>>>>>>>>> machines and money. You can say it can be done another
>>>>>>>>>>>>>>>>>>>> way... as can the full pipeline ;).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its
>>>>>>>>>>>>>>>>>>>> components' lifecycle, it can't be used at scale for
>>>>>>>>>>>>>>>>>>>> generic pipelines and is bound to some particular IOs.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What prevents enforcing teardown - ignoring the
>>>>>>>>>>>>>>>>>>>> interstellar crash case, which can't be handled by any
>>>>>>>>>>>>>>>>>>>> human system? Nothing, technically. Why do you push not
>>>>>>>>>>>>>>>>>>>> to handle it? Is it due to some legacy code on Dataflow
>>>>>>>>>>>>>>>>>>>> or something else?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this
>>>>>>>>>>>>>>>>>>> way (best-effort). So I'm not sure what kind of change
>>>>>>>>>>>>>>>>>>> you're asking for.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not called
>>>>>>>>>>>>>>>>>> then it is a bug and we are done :).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Also, what does it mean for the users? The direct runner
>>>>>>>>>>>>>>>>>>>> does it, so if a user uses the RI in tests, will he get
>>>>>>>>>>>>>>>>>>>> a different behavior in prod? Also, don't forget the
>>>>>>>>>>>>>>>>>>>> user doesn't know what the IOs he composes use, so this
>>>>>>>>>>>>>>>>>>>> has such an impact on the whole product that it must be
>>>>>>>>>>>>>>>>>>>> handled, IMHO.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in the big
>>>>>>>>>>>>>>>>>>>> data world, but that is not a reason to ignore what
>>>>>>>>>>>>>>>>>>>> people did for years and do it wrong before doing it
>>>>>>>>>>>>>>>>>>>> right ;).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent guaranteeing -
>>>>>>>>>>>>>>>>>>>> under normal IT conditions - the execution of teardown.
>>>>>>>>>>>>>>>>>>>> Then we see if we can handle it, and only if there is a
>>>>>>>>>>>>>>>>>>>> technical reason we can't do we make it
>>>>>>>>>>>>>>>>>>>> experimental/unsupported in the API. I know Spark and
>>>>>>>>>>>>>>>>>>>> Flink can; any unknown blocker for other runners?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through Java
>>>>>>>>>>>>>>>>>>>> shutdown hooks, otherwise your environment (the software
>>>>>>>>>>>>>>>>>>>> enclosing Beam) is fully unhandled and your overall
>>>>>>>>>>>>>>>>>>>> system is uncontrolled. The only case where this is not
>>>>>>>>>>>>>>>>>>>> true is when the software is always owned by a vendor
>>>>>>>>>>>>>>>>>>>> and never installed on a customer environment. In this
>>>>>>>>>>>>>>>>>>>> case it is up to the vendor to handle the Beam API, and
>>>>>>>>>>>>>>>>>>>> not up to Beam to adjust its API for a vendor -
>>>>>>>>>>>>>>>>>>>> otherwise all features unsupported by one runner should
>>>>>>>>>>>>>>>>>>>> be made optional, right?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Not all state is about the network, even in distributed
>>>>>>>>>>>>>>>>>>>> systems, so it is key to have an explicit and defined
>>>>>>>>>>>>>>>>>>>> lifecycle.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
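
For readers following the Source discussion quoted above (connect / estimateSize / split), the two shapes can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `SourcePlanSketch`, `Connection`, `planWithSharedConnection` and `planWithSeparateConnections` are hypothetical names, not Beam classes or methods, and the size and split arithmetic are placeholders.

```java
class SourcePlanSketch {
    // Counts how many connections were opened, to compare the two shapes.
    static int connections = 0;

    /** Hypothetical connection to an external system (not a Beam type). */
    static class Connection implements AutoCloseable {
        Connection() { connections++; }

        long estimateSize() { return 1_000L; } // placeholder size

        /** Number of bundles needed to cover `size`, rounded up. */
        int split(long size, long desiredBundleSize) {
            return (int) ((size + desiredBundleSize - 1) / desiredBundleSize);
        }

        @Override
        public void close() { /* a real connection would release resources here */ }
    }

    /** Both primitives share one connection: connect, estimateSize, split, disconnect. */
    static int planWithSharedConnection(long desiredBundleSize) {
        try (Connection c = new Connection()) {
            return c.split(c.estimateSize(), desiredBundleSize);
        }
    }

    /** Without a same-instance guarantee: one connection per primitive. */
    static int planWithSeparateConnections(long desiredBundleSize) {
        long size;
        try (Connection c = new Connection()) {
            size = c.estimateSize();
        }
        try (Connection c = new Connection()) {
            return c.split(size, desiredBundleSize);
        }
    }

    public static void main(String[] args) {
        int shared = planWithSharedConnection(300);
        System.out.println("shared: bundles=" + shared + " connections=" + connections);
        connections = 0;
        int separate = planWithSeparateConnections(300);
        System.out.println("separate: bundles=" + separate + " connections=" + connections);
    }
}
```

Both variants produce the same plan; the only difference in the sketch is the number of connections opened, which is the cost the quoted message is concerned with.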
