Restarting doesn't mean you don't call teardown. Barring a bug, there is no
technical reason for teardown to be skipped.
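
To make the lifecycle under discussion concrete, here is a minimal sketch in
plain Java, not the Beam API. Fn, RecordingFn, runInstance and demo are
hypothetical names invented for illustration, and nothing here reflects how
any runner is actually implemented. It only shows that try/finally gives one
teardown per setup as long as the JVM itself is still alive, including when
process() throws.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    /** Hypothetical stand-in for a DoFn-like function; not a Beam type. */
    interface Fn {
        void setup();
        void process(String element);
        void teardown();
    }

    /** Records every lifecycle call so the ordering can be inspected. */
    static final class RecordingFn implements Fn {
        final List<String> calls = new ArrayList<>();

        @Override
        public void setup() {
            calls.add("setup");
        }

        @Override
        public void process(String element) {
            // An empty element simulates a failure in user code.
            if (element.isEmpty()) {
                throw new IllegalArgumentException("empty element");
            }
            calls.add("process:" + element);
        }

        @Override
        public void teardown() {
            calls.add("teardown");
        }
    }

    /** setup, then process for each element, then teardown on the same instance. */
    public static void runInstance(Fn fn, List<String> elements) {
        fn.setup();
        try {
            for (String element : elements) {
                fn.process(element);
            }
        } finally {
            // Runs on normal completion and when process() throws.
            fn.teardown();
        }
    }

    /** Runs one instance and returns the recorded call sequence. */
    public static List<String> demo(List<String> elements) {
        RecordingFn fn = new RecordingFn();
        try {
            runInstance(fn, elements);
        } catch (IllegalArgumentException e) {
            fn.calls.add("failed");
        }
        return fn.calls;
    }

    public static void main(String[] args) {
        System.out.println(demo(List.of("a", "b")));
        System.out.println(demo(List.of("a", "")));
    }
}
```

In this sketch the failing run still records a teardown before the failure is
reported. It says nothing about a JVM that is killed outright, which is the
case raised in the quoted replies.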

On Feb 19, 2018, 21:14, "Reuven Lax" <re...@google.com> wrote:

> Workers restarting is not a bug; it's standard and often expected.
>
> On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> Nothing. As mentioned, it is a bug, so the recovery is a bug-recovery
>> procedure.
>>
>> On Feb 19, 2018, 19:42, "Eugene Kirpichov" <kirpic...@google.com>
>> wrote:
>>
>>> So what would you like to happen if there is a crash? The DoFn instance
>>> no longer exists because the JVM it ran on no longer exists. What should
>>> Teardown be called on?
>>>
>>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>>> wrote:
>>>
>>>> This is what I want, and not 999999 teardowns for 1000000 setups, unless
>>>> there is an unexpected crash (= a bug).
>>>>
>>>> On Feb 19, 2018, 18:57, "Reuven Lax" <re...@google.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <
>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> @Reuven: in practice it is created by pools of 256, but that leads to
>>>>>>>> the same pattern: the teardown is just "if (iCreatedThem) releaseThem();"
>>>>>>>>
>>>>>>>
>>>>>>> How do you control "256?" Even if you have a pool of 256 workers,
>>>>>>> nothing in Beam guarantees how many threads and DoFns are created per
>>>>>>> worker. In theory the runner might decide to create 1000 threads on each
>>>>>>> worker.
>>>>>>>
>>>>>>
>>>>>> No, it was the other way around: in this case, on AWS you can get 256
>>>>>> instances at once but not 512 (which would be 2x256). So when you
>>>>>> compute the distribution, you give some fn the role of owning the
>>>>>> instance lookup and release.
>>>>>>
>>>>>
>>>>> I still don't understand. Let's be more precise. If you write the
>>>>> following code:
>>>>>
>>>>>    pCollection.apply(ParDo.of(new MyDoFn()));
>>>>>
>>>>> There is no way to control how many instances of MyDoFn are created.
>>>>> The runner might decide to create a million instances of this class
>>>>> across your worker pool, which means that you will get a million Setup
>>>>> and Teardown calls.
>>>>>
>>>>>
>>>>>> Anyway, this was just an example of an external resource you must
>>>>>> release. The real topic is that Beam should define, as soon as possible,
>>>>>> a guaranteed generic lifecycle so that users can embrace its programming
>>>>>> model.
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> @Eugene:
>>>>>>>> 1. The Wait logic is about passing the value, which is not always
>>>>>>>> possible (in about 15% of cases, from my rough estimate)
>>>>>>>> 2. SDF: I'll try to detail here why I keep mentioning SDF
>>>>>>>>
>>>>>>>>
>>>>>>>> Concretely, Beam exposes a portable API (included in the SDK core).
>>>>>>>> This API defines a *container* API and therefore implies bean
>>>>>>>> lifecycles. I won't detail them all, but will just use the sources and
>>>>>>>> DoFn (not SDF) to illustrate the idea I'm trying to develop.
>>>>>>>>
>>>>>>>> A. Source
>>>>>>>>
>>>>>>>> A source computes a partition plan with 2 primitives: estimateSize
>>>>>>>> and split. As a user, you would expect both to be called on the same
>>>>>>>> bean instance, to avoid paying the same connection cost(s) twice.
>>>>>>>> Concretely:
>>>>>>>>
>>>>>>>> connect()
>>>>>>>> try {
>>>>>>>>   estimateSize()
>>>>>>>>   split()
>>>>>>>> } finally {
>>>>>>>>   disconnect()
>>>>>>>> }
>>>>>>>>
>>>>>>>> This is not guaranteed by the API, so you must do:
>>>>>>>>
>>>>>>>> connect()
>>>>>>>> try {
>>>>>>>>   estimateSize()
>>>>>>>> } finally {
>>>>>>>>   disconnect()
>>>>>>>> }
>>>>>>>> connect()
>>>>>>>> try {
>>>>>>>>   split()
>>>>>>>> } finally {
>>>>>>>>   disconnect()
>>>>>>>> }
>>>>>>>>
>>>>>>>> plus a workaround with an internal size estimate, since this primitive
>>>>>>>> is often called in split but you don't want to connect twice in the
>>>>>>>> second phase.
>>>>>>>>
>>>>>>>> Why do you need that? Simply because you want to define an API to
>>>>>>>> implement sources which initializes the source bean and destroys it.
>>>>>>>> I insist it is a very, very basic concern for such an API. However,
>>>>>>>> Beam doesn't embrace it and doesn't assume it, so building any API on
>>>>>>>> top of Beam is very painful today, and direct Beam users hit the exact
>>>>>>>> same issues - check how the IOs are implemented, with static utilities
>>>>>>>> which create short-lived connections and prevent reusing an existing
>>>>>>>> connection within a single method
>>>>>>>> (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
>>>>>>>>
>>>>>>>> Same logic applies to the reader which is then created.
>>>>>>>>
>>>>>>>> B. DoFn & SDF
>>>>>>>>
>>>>>>>> As a fn dev you expect the same from the Beam runtime: init(); try
>>>>>>>> { while (...) process(); } finally { destroy(); }, executed on the
>>>>>>>> exact same instance, so that you can be stateful at that level for
>>>>>>>> expensive connections/operations/flow state handling.
>>>>>>>>
>>>>>>>> As you mentioned with the million example, this sequence should
>>>>>>>> happen for each single instance, so 1M times in your example.
>>>>>>>>
>>>>>>>> Now why did I mention SDF several times? Because SDF is a
>>>>>>>> generalisation of both cases (source and DoFn). Therefore it creates
>>>>>>>> way more instances and requires a much stricter, more explicit
>>>>>>>> definition of the exact lifecycle and of which instance does what.
>>>>>>>> Since Beam handles the full lifecycle of the bean instances, it must
>>>>>>>> provide init/destroy hooks (setup/teardown) which can be stateful.
>>>>>>>>
>>>>>>>> Take the JDBC example which was mentioned earlier. Today, because of
>>>>>>>> the teardown issue, it uses bundles. Since bundle size is not defined
>>>>>>>> - and will not be with SDF - it must use a pool to be able to reuse a
>>>>>>>> connection instance and keep performance acceptable. Now, with SDF and
>>>>>>>> the increase in splits, how do you handle the pool size? Generally in
>>>>>>>> batch you use a single connection per thread to avoid consuming all
>>>>>>>> database connections. With a pool you have 2 choices: 1. use a pool of
>>>>>>>> 1, 2. use a slightly bigger pool, but multiplied by the number of
>>>>>>>> beans you will likely double or triple the connection count and make
>>>>>>>> the execution fail with "no more connection available". If you picked
>>>>>>>> option 1 (a pool of 1), then you still need a reliable teardown per
>>>>>>>> pool instance (generally close()) to ensure you release the pool and
>>>>>>>> don't leak the connection information in the JVM. In all cases you
>>>>>>>> come back to the init()/destroy() lifecycle, even if you fake it by
>>>>>>>> getting connections per bundle.
>>>>>>>>
>>>>>>>> Just to make it obvious: I mention SDF only because SDF implies all
>>>>>>>> the current issues with the loose definition of the bean lifecycles,
>>>>>>>> at an exponential level, nothing else.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Romain Manni-Bucau
>>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>>
>>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>
>>>>>>>>> The kind of whole-transform lifecycle you're mentioning can be
>>>>>>>>> accomplished using the Wait transform as I suggested in the thread
>>>>>>>>> above, and I believe it should become the canonical way to do that.
>>>>>>>>>
>>>>>>>>> (Would like to reiterate one more time, as the main author of most
>>>>>>>>> design documents related to SDF and of its implementation in the Java
>>>>>>>>> direct and dataflow runner that SDF is fully unrelated to the topic of
>>>>>>>>> cleanup - I'm very confused as to why it keeps coming up)
>>>>>>>>>
>>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <
>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> I kind of agree, except that transforms lack a lifecycle too. My
>>>>>>>>>> understanding is that SDF could be a way to unify it and clean up
>>>>>>>>>> the API.
>>>>>>>>>>
>>>>>>>>>> Otherwise, how do we normalize the lifecycle of transforms under a
>>>>>>>>>> single API?
>>>>>>>>>>
>>>>>>>>>> On Feb 18, 2018, 21:32, "Ben Chambers" <bchamb...@apache.org>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Are you sure that focusing on the cleanup of specific DoFns is
>>>>>>>>>>> appropriate? In many cases where cleanup is necessary, it is around
>>>>>>>>>>> an entire composite PTransform. I think there have been
>>>>>>>>>>> discussions/proposals around a more methodical "cleanup" option,
>>>>>>>>>>> but those haven't been implemented, to the best of my knowledge.
>>>>>>>>>>>
>>>>>>>>>>> For instance, consider the steps of a FileIO:
>>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files
>>>>>>>>>>> 2. When all temporary files are complete, attempt to do a bulk
>>>>>>>>>>> copy to put them in the final destination.
>>>>>>>>>>> 3. Cleanup all the temporary files.
>>>>>>>>>>>
>>>>>>>>>>> (This is often desirable because it minimizes the chance of
>>>>>>>>>>> seeing partial/incomplete results in the final destination).
>>>>>>>>>>>
>>>>>>>>>>> In the above, you'd want step 1 to execute on many workers,
>>>>>>>>>>> likely using a ParDo (say N different workers).
>>>>>>>>>>> The move step should only happen once, so on one worker. This
>>>>>>>>>>> means it will be a different DoFn, likely with some stuff done to
>>>>>>>>>>> ensure it runs on one worker.
>>>>>>>>>>>
>>>>>>>>>>> In such a case, cleanup / @Teardown of the DoFn is not enough.
>>>>>>>>>>> We need an API for a PTransform to schedule some cleanup work for
>>>>>>>>>>> when the transform is "done". In batch this is relatively
>>>>>>>>>>> straightforward, but doesn't exist. This is the source of some
>>>>>>>>>>> problems, such as the BigQuery sink leaving files around that have
>>>>>>>>>>> failed to import into BigQuery.
>>>>>>>>>>>
>>>>>>>>>>> In streaming this is less straightforward -- do you want to wait
>>>>>>>>>>> until the end of the pipeline? Or do you want to wait until the end
>>>>>>>>>>> of the window? In practice, you just want to wait until you know
>>>>>>>>>>> nobody will need the resource anymore.
>>>>>>>>>>>
>>>>>>>>>>> This led to some discussions around a "cleanup" API, where you
>>>>>>>>>>> could have a transform that output resource objects. Each resource
>>>>>>>>>>> object would have logic for cleaning it up. And there would be
>>>>>>>>>>> something that indicated what parts of the pipeline needed that
>>>>>>>>>>> resource, and what kind of temporal lifetime those objects had. As
>>>>>>>>>>> soon as that part of the pipeline had advanced far enough that it
>>>>>>>>>>> would no longer need the resources, they would get cleaned up. This
>>>>>>>>>>> can be done at pipeline shutdown, or incrementally during a
>>>>>>>>>>> streaming pipeline, etc.
>>>>>>>>>>>
>>>>>>>>>>> Would something like this be a better fit for your use case? If
>>>>>>>>>>> not, why is handling teardown within a single DoFn sufficient?
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <
>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall
>>>>>>>>>>>> execution. Each instance - one fn, so likely in a thread of a
>>>>>>>>>>>> worker - has its lifecycle. As a caricature: "new" and garbage
>>>>>>>>>>>> collection.
>>>>>>>>>>>>
>>>>>>>>>>>> In practice, "new" is often an unsafe allocate (deserialization),
>>>>>>>>>>>> but it doesn't matter here.
>>>>>>>>>>>>
>>>>>>>>>>>> What I want is for any "new" to be followed by a setup before any
>>>>>>>>>>>> process or startBundle, and for Beam to call teardown the last
>>>>>>>>>>>> time it holds the instance, after the last finishBundle and
>>>>>>>>>>>> before it is GC-ed.
>>>>>>>>>>>>
>>>>>>>>>>>> It is as simple as that.
>>>>>>>>>>>> This way there is no need to combine fns in a way that makes a fn
>>>>>>>>>>>> not self-contained just to implement basic transforms.
>>>>>>>>>>>>
>>>>>>>>>>>> On Feb 18, 2018, 20:07, "Reuven Lax" <re...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Feb 18, 2018, 19:28, "Ben Chambers" <bchamb...@apache.org>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather than
>>>>>>>>>>>>>> focusing on the semantics of the existing methods -- which have
>>>>>>>>>>>>>> been noted to meet many existing use cases -- it would be
>>>>>>>>>>>>>> helpful to focus more on the reason you are looking for
>>>>>>>>>>>>>> something with different semantics.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying to
>>>>>>>>>>>>>> do):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 1. Clean-up some external, global resource, that was
>>>>>>>>>>>>>> initialized once during the startup of the pipeline. If this is
>>>>>>>>>>>>>> the case, how are you ensuring it was really only initialized
>>>>>>>>>>>>>> once (and not once per worker, per thread, per instance, etc.)?
>>>>>>>>>>>>>> How do you know when the pipeline should release it? If the
>>>>>>>>>>>>>> answer is "when it reaches step X", then what about a streaming
>>>>>>>>>>>>>> pipeline?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> When the DoFn is logically no longer needed, i.e. when the
>>>>>>>>>>>>>> batch is done or the stream is stopped (manually or by a JVM
>>>>>>>>>>>>>> shutdown).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm really not following what this means.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each
>>>>>>>>>>>>> worker is running 1000 threads (each running a copy of the same
>>>>>>>>>>>>> DoFn). How many cleanups do you want (do you want 1000 * 1000 =
>>>>>>>>>>>>> 1M cleanups) and when do you want it called? When the entire
>>>>>>>>>>>>> pipeline is shut down? When an individual worker is about to
>>>>>>>>>>>>> shut down (which may be temporary - may be about to start back
>>>>>>>>>>>>> up)? Something else?
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2. Finalize some resources that are used within some region
>>>>>>>>>>>>>> of the pipeline. While the DoFn lifecycle methods are not a
>>>>>>>>>>>>>> good fit for this (they are focused on managing resources
>>>>>>>>>>>>>> within the DoFn), you could model this on how FileIO finalizes
>>>>>>>>>>>>>> the files that it produced. For instance:
>>>>>>>>>>>>>>    a) ParDo generates "resource IDs" (or some token that
>>>>>>>>>>>>>> stores information about resources)
>>>>>>>>>>>>>>    b) "Require Deterministic Input" (to prevent retries from
>>>>>>>>>>>>>> changing resource IDs)
>>>>>>>>>>>>>>    c) ParDo that initializes the resources
>>>>>>>>>>>>>>    d) Pipeline segments that use the resources, and
>>>>>>>>>>>>>> eventually output the fact they're done
>>>>>>>>>>>>>>    e) "Require Deterministic Input"
>>>>>>>>>>>>>>    f) ParDo that frees the resources
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> By making the use of the resource part of the data it is
>>>>>>>>>>>>>> possible to "checkpoint" which resources may be in use or have
>>>>>>>>>>>>>> been finished by using the require deterministic input. This is
>>>>>>>>>>>>>> important for ensuring everything is actually cleaned up.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I need that, but generic and not case by case, to industrialize
>>>>>>>>>>>>>> an API on top of Beam.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is this
>>>>>>>>>>>>>> case, could you elaborate on what you are trying to accomplish?
>>>>>>>>>>>>>> That would help me understand both the problems with existing
>>>>>>>>>>>>>> options and possibly what could be done to help.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I understand there are workarounds for almost all cases, but it
>>>>>>>>>>>>>> means each transform is different in its lifecycle handling. I
>>>>>>>>>>>>>> dislike that a lot, at scale and as a user, since you can't put
>>>>>>>>>>>>>> any unified practice on top of Beam; it also makes Beam very
>>>>>>>>>>>>>> hard to integrate or to use to build higher-level libraries or
>>>>>>>>>>>>>> software.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is why I tried not to start the workaround discussions
>>>>>>>>>>>>>> and to just stay at the API level.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> -- Ben
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> "Machine state" is overly low-level because many of the
>>>>>>>>>>>>>>>> possible reasons can happen on a perfectly fine machine.
>>>>>>>>>>>>>>>> If you'd like to rephrase it to "it will be called except
>>>>>>>>>>>>>>>> in various situations where it's logically impossible or
>>>>>>>>>>>>>>>> impractical to guarantee that it's called", that's fine. Or
>>>>>>>>>>>>>>>> you can list some of the examples above.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Sounds ok to me
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The main point for the user is, you *will* see
>>>>>>>>>>>>>>>> non-preventable situations where it couldn't be called -
>>>>>>>>>>>>>>>> it's not just intergalactic crashes - so if the logic is
>>>>>>>>>>>>>>>> very important (e.g. cleaning up a large amount of
>>>>>>>>>>>>>>>> temporary files, shutting down a large number of VMs you
>>>>>>>>>>>>>>>> started etc), you have to express it using one of the other
>>>>>>>>>>>>>>>> methods that have stricter guarantees (which obviously come
>>>>>>>>>>>>>>>> at a cost, e.g. no pass-by-reference).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> FinishBundle has the exact same guarantee, sadly, so I'm not
>>>>>>>>>>>>>>> sure which other method you are speaking about. Concretely,
>>>>>>>>>>>>>>> if you make it really unreliable - this is what "best effort"
>>>>>>>>>>>>>>> sounds like to me - then users can't use it to clean
>>>>>>>>>>>>>>> anything, but if you make it "can fail to happen, but that is
>>>>>>>>>>>>>>> unexpected and means something went wrong", then it is fine
>>>>>>>>>>>>>>> to have a manual - or automatic, if fancy - recovery
>>>>>>>>>>>>>>> procedure. This is where it makes all the difference and
>>>>>>>>>>>>>>> impacts developers and ops (all users, basically).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I agree, Eugene, except that "best effort" means just
>>>>>>>>>>>>>>>>> that. It is also often used to mean "at will", and this
>>>>>>>>>>>>>>>>> is what triggered this thread.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents it",
>>>>>>>>>>>>>>>>> but "best effort" is too open and can be very badly and
>>>>>>>>>>>>>>>>> wrongly perceived by users (as I did).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it: in
>>>>>>>>>>>>>>>>>> the example situation you have (intergalactic crash),
>>>>>>>>>>>>>>>>>> and in a number of more common cases: e.g. in case the
>>>>>>>>>>>>>>>>>> worker container has crashed (e.g. user code in a
>>>>>>>>>>>>>>>>>> different thread called a C library over JNI and it
>>>>>>>>>>>>>>>>>> segfaulted), JVM bug, crash due to user code OOM, in
>>>>>>>>>>>>>>>>>> case the worker has lost network connectivity (then it
>>>>>>>>>>>>>>>>>> may be called but it won't be able to do anything
>>>>>>>>>>>>>>>>>> useful), in case this is running on a preemptible VM and
>>>>>>>>>>>>>>>>>> it was preempted by the underlying cluster manager
>>>>>>>>>>>>>>>>>> without notice or if the worker was too busy with other
>>>>>>>>>>>>>>>>>> stuff (e.g. calling other Teardown functions) until the
>>>>>>>>>>>>>>>>>> preemption timeout elapsed, in case the underlying
>>>>>>>>>>>>>>>>>> hardware simply failed (which happens quite often at
>>>>>>>>>>>>>>>>>> scale), and in many other conditions.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such
>>>>>>>>>>>>>>>>>> behavior. Please feel free to file bugs for cases where
>>>>>>>>>>>>>>>>>> you observed a runner not call Teardown in a situation
>>>>>>>>>>>>>>>>>> where it was possible to call it but the runner made
>>>>>>>>>>>>>>>>>> insufficient effort.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Feb 18, 2018, 00:23, "Kenneth Knowles" <
>>>>>>>>>>>>>>>>>>>>> k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g.
>>>>>>>>>>>>>>>>>>>>>> "I'm trying to write an IO for system $x and it
>>>>>>>>>>>>>>>>>>>>>> requires the following initialization and the
>>>>>>>>>>>>>>>>>>>>>> following cleanup logic and the following processing
>>>>>>>>>>>>>>>>>>>>>> in between") I'll be better able to help you.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a
>>>>>>>>>>>>>>>>>>>>>> connection. Using bundles is a perf killer since
>>>>>>>>>>>>>>>>>>>>>> their size is not controlled. Using teardown doesn't
>>>>>>>>>>>>>>>>>>>>>> allow you to release the connection since it is a
>>>>>>>>>>>>>>>>>>>>>> best-effort thing. Not releasing the connection
>>>>>>>>>>>>>>>>>>>>>> makes you pay a lot - AWS ;) - or prevents you from
>>>>>>>>>>>>>>>>>>>>>> launching other processing - concurrency limit.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things
>>>>>>>>>>>>>>>>>>>>> die so badly that @Teardown is not called then
>>>>>>>>>>>>>>>>>>>>> nothing else can be called to close the connection
>>>>>>>>>>>>>>>>>>>>> either. What AWS service are you thinking of that
>>>>>>>>>>>>>>>>>>>>> stays open for a long time when everything at the
>>>>>>>>>>>>>>>>>>>>> other end has died?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless, but
>>>>>>>>>>>>>>>>>>>>> some (proprietary) protocols require closing
>>>>>>>>>>>>>>>>>>>>> exchanges which are more than just "I'm leaving".
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some services -
>>>>>>>>>>>>>>>>>>>>> machines - on the fly at pipeline startup and
>>>>>>>>>>>>>>>>>>>>> shutting them down at the end. If teardown is not
>>>>>>>>>>>>>>>>>>>>> called, you leak machines and money. You can say it
>>>>>>>>>>>>>>>>>>>>> can be done another way... as can the full pipeline
>>>>>>>>>>>>>>>>>>>>> ;).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle
>>>>>>>>>>>>>>>>>>>>> its components' lifecycle, it can't be used at scale
>>>>>>>>>>>>>>>>>>>>> for generic pipelines, only when bound to some
>>>>>>>>>>>>>>>>>>>>> particular IO.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> What prevents enforcing teardown - ignoring the
>>>>>>>>>>>>>>>>>>>>> interstellar crash case, which can't be handled by
>>>>>>>>>>>>>>>>>>>>> any human system? Nothing, technically. Why do you
>>>>>>>>>>>>>>>>>>>>> push to not handle it? Is it due to some legacy code
>>>>>>>>>>>>>>>>>>>>> on Dataflow or something else?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this
>>>>>>>>>>>>>>>>>>>> way (best-effort). So I'm not sure what kind of change
>>>>>>>>>>>>>>>>>>>> you're asking for.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not
>>>>>>>>>>>>>>>>>>> called then it is a bug and we are done :).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Also, what does it mean for the users? The direct
>>>>>>>>>>>>>>>>>>>>> runner does it, so if a user uses the RI in tests,
>>>>>>>>>>>>>>>>>>>>> will they get a different behavior in prod? Also,
>>>>>>>>>>>>>>>>>>>>> don't forget the user doesn't know what the IOs they
>>>>>>>>>>>>>>>>>>>>> compose use, so this has such an impact on the whole
>>>>>>>>>>>>>>>>>>>>> product that it must be handled IMHO.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in the
>>>>>>>>>>>>>>>>>>>>> big data world, but that is not a reason to ignore
>>>>>>>>>>>>>>>>>>>>> what people have done for years and to do it wrong
>>>>>>>>>>>>>>>>>>>>> before doing it right ;).
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent guaranteeing
>>>>>>>>>>>>>>>>>>>>> - under normal IT conditions - the execution of
>>>>>>>>>>>>>>>>>>>>> teardown. Then we see if we can handle it, and only
>>>>>>>>>>>>>>>>>>>>> if there is a technical reason we can't do we make it
>>>>>>>>>>>>>>>>>>>>> experimental/unsupported in the API. I know Spark and
>>>>>>>>>>>>>>>>>>>>> Flink can; any unknown blocker for other runners?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through Java
>>>>>>>>>>>>>>>>>>>>> shutdown hooks, otherwise your environment (the
>>>>>>>>>>>>>>>>>>>>> software enclosing Beam) is fully unhandled and your
>>>>>>>>>>>>>>>>>>>>> overall system is uncontrolled. The only case where
>>>>>>>>>>>>>>>>>>>>> this is not true is when the software is always owned
>>>>>>>>>>>>>>>>>>>>> by a vendor and never installed in a customer
>>>>>>>>>>>>>>>>>>>>> environment. In this case it is up to the vendor to
>>>>>>>>>>>>>>>>>>>>> handle the Beam API, not up to Beam to adjust its API
>>>>>>>>>>>>>>>>>>>>> for a vendor - otherwise all features unsupported by
>>>>>>>>>>>>>>>>>>>>> one runner should be made optional, right?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Not all state is about the network, even in
>>>>>>>>>>>>>>>>>>>>> distributed systems, so it is key to have an explicit
>>>>>>>>>>>>>>>>>>>>> and defined lifecycle.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
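
For reference, the Source point quoted above (estimateSize and split sharing
one connection instead of each paying for its own) can be sketched in plain
Java as follows. Connection, Stats and the two plan... methods are
hypothetical names, the size of 100 is made up, and this is not the Beam
Source API; the sketch only counts how many connections each call pattern
opens.

```java
import java.util.ArrayList;
import java.util.List;

public class SourceConnectionSketch {

    /** Counts how many hypothetical connections were opened and closed. */
    static final class Stats {
        int opened;
        int closed;
    }

    /** Hypothetical stand-in for an expensive external connection. */
    static final class Connection implements AutoCloseable {
        private final Stats stats;

        Connection(Stats stats) {
            this.stats = stats;
            stats.opened++;
        }

        long estimateSize() {
            return 100L; // made-up size, for illustration only
        }

        /** Cuts [0, size) into the requested number of contiguous ranges. */
        List<String> split(int parts) {
            long size = estimateSize();
            long step = size / parts;
            List<String> ranges = new ArrayList<>();
            for (int i = 0; i < parts; i++) {
                long start = i * step;
                long end = (i == parts - 1) ? size : start + step;
                ranges.add(start + "-" + end);
            }
            return ranges;
        }

        @Override
        public void close() {
            stats.closed++;
        }
    }

    /** One connection serves both primitives; returns connections opened. */
    public static int planWithSharedConnection(int parts) {
        Stats stats = new Stats();
        try (Connection connection = new Connection(stats)) {
            connection.estimateSize();
            connection.split(parts);
        }
        return stats.opened;
    }

    /** Each primitive opens its own connection; returns connections opened. */
    public static int planWithSeparateConnections(int parts) {
        Stats stats = new Stats();
        try (Connection connection = new Connection(stats)) {
            connection.estimateSize();
        }
        try (Connection connection = new Connection(stats)) {
            connection.split(parts);
        }
        return stats.opened;
    }

    public static void main(String[] args) {
        System.out.println("shared:   " + planWithSharedConnection(4));
        System.out.println("separate: " + planWithSeparateConnections(4));
    }
}
```

The shared variant only works if both primitives are guaranteed to run on the
same instance, which is exactly the guarantee being asked for in the thread.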
