Romain, would it be fair to say that currently the goal of your
participation in this discussion is to identify situations where @Teardown
in principle could have been called, but some of the current runners don't
make a good enough effort to call it? If yes - as I said before, please, by
all means, file bugs of the form "Runner X doesn't call @Teardown in
situation Y" if you're aware of any, and feel free to send PRs fixing
runner X to reliably call @Teardown in situation Y. I think we all agree
that this would be a good improvement.

On Mon, Feb 19, 2018 at 2:03 PM Romain Manni-Bucau <[email protected]>
wrote:

>
>
> Le 19 févr. 2018 22:56, "Reuven Lax" <[email protected]> a écrit :
>
>
>
> On Mon, Feb 19, 2018 at 1:51 PM, Romain Manni-Bucau <[email protected]
> > wrote:
>
>>
>>
>> Le 19 févr. 2018 21:28, "Reuven Lax" <[email protected]> a écrit :
>>
>> How do you call teardown? There are cases in which the Java code gets no
>> indication that the restart is happening (e.g. cases where the machine
>> itself is taken down)
>>
>>
>> This is a bug, 0 downtime maintenance is very doable in 2018 ;). Crashes
>> are bugs, kill -9 to shutdown is a bug too. Other cases let call shutdown
>> with a hook worse case.
>>
>
> What you say here is simply not true.
>
> There are many scenarios in which workers shutdown with no opportunity for
> any sort of shutdown hook. Sometimes the entire machine gets shutdown, and
> not even the OS will have much of a chance to do anything. At scale this
> will happen with some regularity, and a distributed system that assumes
> this will not happen is a poor distributed system.
>
>
> This is part of the infra and there is no reason the machine is shutdown
> without shutting down what runs on it before except if it is a bug in the
> software or setup. I can hear you maybe dont do it everywhere but there is
> no blocker to do it. Means you can shutdown the machines and guarantee
> teardown is called.
>
> Where i go is simply that it is doable and beam sdk core can assume setup
> is well done. If there is a best effort downside due to that - with the
> meaning you defined - it is an impl bug or a user installation issue.
>
> Technically all is true.
>
> What can prevent teardown is a hardware failure or so. This is fine and
> doesnt need to be in doc since it is life in IT and obvious or must be very
> explicit to avoid current ambiguity.
>
>
>
>>
>>
>>
>> On Mon, Feb 19, 2018, 12:24 PM Romain Manni-Bucau <[email protected]>
>> wrote:
>>
>>> Restarting doesnt mean you dont call teardown. Except a bug there is no
>>> reason - technically - it happens, no reason.
>>>
>>> Le 19 févr. 2018 21:14, "Reuven Lax" <[email protected]> a écrit :
>>>
>>>> Workers restarting is not a bug, it's standard often expected.
>>>>
>>>> On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau <
>>>> [email protected]> wrote:
>>>>
>>>>> Nothing, as mentionned it is a bug so recovery is a bug recovery
>>>>> (procedure)
>>>>>
>>>>> Le 19 févr. 2018 19:42, "Eugene Kirpichov" <[email protected]> a
>>>>> écrit :
>>>>>
>>>>>> So what would you like to happen if there is a crash? The DoFn
>>>>>> instance no longer exists because the JVM it ran on no longer exists. 
>>>>>> What
>>>>>> should Teardown be called on?
>>>>>>
>>>>>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <
>>>>>> [email protected]> wrote:
>>>>>>
>>>>>>> This is what i want and not 999999 teardowns for 1000000 setups
>>>>>>> until there is an unexpected crash (= a bug).
>>>>>>>
>>>>>>> Le 19 févr. 2018 18:57, "Reuven Lax" <[email protected]> a écrit :
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <
>>>>>>>> [email protected]> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <[email protected]>:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <
>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> @Reuven: in practise it is created by pool of 256 but leads to
>>>>>>>>>>> the same pattern, the teardown is just a "if (iCreatedThem) 
>>>>>>>>>>> releaseThem();"
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> How do you control "256?" Even if you have a pool of 256 workers,
>>>>>>>>>> nothing in Beam guarantees how many threads and DoFns are created per
>>>>>>>>>> worker. In theory the runner might decide to create 1000 threads on 
>>>>>>>>>> each
>>>>>>>>>> worker.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Nop was the other way around, in this case on AWS you can get 256
>>>>>>>>> instances at once but not 512 (which will be 2x256). So when you 
>>>>>>>>> compute
>>>>>>>>> the distribution you allocate to some fn the role to own the instance
>>>>>>>>> lookup and releasing.
>>>>>>>>>
>>>>>>>>
>>>>>>>> I still don't understand. Let's be more precise. If you write the
>>>>>>>> following code:
>>>>>>>>
>>>>>>>>    pCollection.apply(ParDo.of(new MyDoFn()));
>>>>>>>>
>>>>>>>> There is no way to control how many instances of MyDoFn are
>>>>>>>> created. The runner might decided to create a million instances of this
>>>>>>>> class across your worker pool, which means that you will get a million
>>>>>>>> Setup and Teardown calls.
>>>>>>>>
>>>>>>>>
>>>>>>>>> Anyway this was just an example of an external resource you must
>>>>>>>>> release. Real topic is that beam should define asap a guaranteed 
>>>>>>>>> generic
>>>>>>>>> lifecycle to let user embrace its programming model.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> @Eugene:
>>>>>>>>>>> 1. wait logic is about passing the value which is not always
>>>>>>>>>>> possible (like 15% of cases from my raw estimate)
>>>>>>>>>>> 2. sdf: i'll try to detail why i mention SDF more here
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Concretely beam exposes a portable API (included in the SDK
>>>>>>>>>>> core). This API defines a *container* API and therefore implies bean
>>>>>>>>>>> lifecycles. I'll not detail them all but just use the sources and 
>>>>>>>>>>> dofn (not
>>>>>>>>>>> sdf) to illustrate the idea I'm trying to develop.
>>>>>>>>>>>
>>>>>>>>>>> A. Source
>>>>>>>>>>>
>>>>>>>>>>> A source computes a partition plan with 2 primitives:
>>>>>>>>>>> estimateSize and split. As an user you can expect both to be called 
>>>>>>>>>>> on the
>>>>>>>>>>> same bean instance to avoid to pay the same connection cost(s) 
>>>>>>>>>>> twice.
>>>>>>>>>>> Concretely:
>>>>>>>>>>>
>>>>>>>>>>> connect()
>>>>>>>>>>> try {
>>>>>>>>>>>   estimateSize()
>>>>>>>>>>>   split()
>>>>>>>>>>> } finally {
>>>>>>>>>>>   disconnect()
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> this is not guaranteed by the API so you must do:
>>>>>>>>>>>
>>>>>>>>>>> connect()
>>>>>>>>>>> try {
>>>>>>>>>>>   estimateSize()
>>>>>>>>>>> } finally {
>>>>>>>>>>>   disconnect()
>>>>>>>>>>> }
>>>>>>>>>>> connect()
>>>>>>>>>>> try {
>>>>>>>>>>>   split()
>>>>>>>>>>> } finally {
>>>>>>>>>>>   disconnect()
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> + a workaround with an internal estimate size since this
>>>>>>>>>>> primitive is often called in split but you dont want to connect 
>>>>>>>>>>> twice in
>>>>>>>>>>> the second phase.
>>>>>>>>>>>
>>>>>>>>>>> Why do you need that? Simply cause you want to define an API to
>>>>>>>>>>> implement sources which initializes the source bean and destroys it.
>>>>>>>>>>> I insists it is a very very basic concern for such API. However
>>>>>>>>>>> beam doesn't embraces it and doesn't assume it so building any API 
>>>>>>>>>>> on top
>>>>>>>>>>> of beam is very hurtful today and for direct beam users you hit the 
>>>>>>>>>>> exact
>>>>>>>>>>> same issues - check how IO are implemented, the static utilities 
>>>>>>>>>>> which
>>>>>>>>>>> create volatile connections preventing to reuse existing connection 
>>>>>>>>>>> in a
>>>>>>>>>>> single method (
>>>>>>>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862
>>>>>>>>>>> ).
>>>>>>>>>>>
>>>>>>>>>>> Same logic applies to the reader which is then created.
>>>>>>>>>>>
>>>>>>>>>>> B. DoFn & SDF
>>>>>>>>>>>
>>>>>>>>>>> As a fn dev you expect the same from the beam runtime: init();
>>>>>>>>>>> try { while (...) process(); } finally { destroy(); } and that it is
>>>>>>>>>>> executed on the exact same instance to be able to be stateful at 
>>>>>>>>>>> that level
>>>>>>>>>>> for expensive connections/operations/flow state handling.
>>>>>>>>>>>
>>>>>>>>>>> As you mentionned with the million example, this sequence should
>>>>>>>>>>> happen for each single instance so 1M times for your example.
>>>>>>>>>>>
>>>>>>>>>>> Now why did I mention SDF several times? Because SDF is a
>>>>>>>>>>> generalisation of both cases (source and dofn). Therefore it 
>>>>>>>>>>> creates way
>>>>>>>>>>> more instances and requires to have a way more strict/explicit 
>>>>>>>>>>> definition
>>>>>>>>>>> of the exact lifecycle and which instance does what. Since beam 
>>>>>>>>>>> handles the
>>>>>>>>>>> full lifecycle of the bean instances it must provide init/destroy 
>>>>>>>>>>> hooks
>>>>>>>>>>> (setup/teardown) which can be stateful.
>>>>>>>>>>>
>>>>>>>>>>> If you take the JDBC example which was mentionned earlier.
>>>>>>>>>>> Today, because of the teardown issue it uses bundles. Since bundles 
>>>>>>>>>>> size is
>>>>>>>>>>> not defined - and will not with SDF, it must use a pool to be able 
>>>>>>>>>>> to reuse
>>>>>>>>>>> a connection instance to not correct performances. Now with the SDF 
>>>>>>>>>>> and the
>>>>>>>>>>> split increase, how do you handle the pool size? Generally in batch 
>>>>>>>>>>> you use
>>>>>>>>>>> a single connection per thread to avoid to consume all database
>>>>>>>>>>> connections. With a pool you have 2 choices: 1. use a pool of 1, 2. 
>>>>>>>>>>> use a
>>>>>>>>>>> pool a bit higher but multiplied by the number of beans you will 
>>>>>>>>>>> likely x2
>>>>>>>>>>> or 3 the connection count and make the execution fail with "no more
>>>>>>>>>>> connection available". I you picked 1 (pool of #1), then you still 
>>>>>>>>>>> have to
>>>>>>>>>>> have a reliable teardown by pool instance (close() generally) to 
>>>>>>>>>>> ensure you
>>>>>>>>>>> release the pool and don't leak the connection information in the 
>>>>>>>>>>> JVM. In
>>>>>>>>>>> all case you come back to the init()/destroy() lifecycle even if 
>>>>>>>>>>> you fake
>>>>>>>>>>> to get connections with bundles.
>>>>>>>>>>>
>>>>>>>>>>> Just to make it obvious: SDF mentions are just cause SDF imply
>>>>>>>>>>> all the current issues with the loose definition of the bean 
>>>>>>>>>>> lifecycles at
>>>>>>>>>>> an exponential level, nothing else.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>>>>>
>>>>>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>> [email protected]>:
>>>>>>>>>>>
>>>>>>>>>>>> The kind of whole-transform lifecycle you're mentioning can be
>>>>>>>>>>>> accomplished using the Wait transform as I suggested in the thread 
>>>>>>>>>>>> above,
>>>>>>>>>>>> and I believe it should become the canonical way to do that.
>>>>>>>>>>>>
>>>>>>>>>>>> (Would like to reiterate one more time, as the main author of
>>>>>>>>>>>> most design documents related to SDF and of its implementation in 
>>>>>>>>>>>> the Java
>>>>>>>>>>>> direct and dataflow runner that SDF is fully unrelated to the 
>>>>>>>>>>>> topic of
>>>>>>>>>>>> cleanup - I'm very confused as to why it keeps coming up)
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <
>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I kind of agree except transforms lack a lifecycle too. My
>>>>>>>>>>>>> understanding is that sdf could be a way to unify it and clean 
>>>>>>>>>>>>> the api.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Otherwise how to normalize - single api -  lifecycle of
>>>>>>>>>>>>> transforms?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Le 18 févr. 2018 21:32, "Ben Chambers" <[email protected]>
>>>>>>>>>>>>> a écrit :
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Are you sure that focusing on the cleanup of specific DoFn's
>>>>>>>>>>>>>> is appropriate? Many cases where cleanup is necessary, it is 
>>>>>>>>>>>>>> around an
>>>>>>>>>>>>>> entire composite PTransform. I think there have been 
>>>>>>>>>>>>>> discussions/proposals
>>>>>>>>>>>>>> around a more methodical "cleanup" option, but those haven't been
>>>>>>>>>>>>>> implemented, to the best of my knowledge.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> For instance, consider the steps of a FileIO:
>>>>>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files
>>>>>>>>>>>>>> 2. When all temporary files are complete, attempt to do a
>>>>>>>>>>>>>> bulk copy to put them in the final destination.
>>>>>>>>>>>>>> 3. Cleanup all the temporary files.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> (This is often desirable because it minimizes the chance of
>>>>>>>>>>>>>> seeing partial/incomplete results in the final destination).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In the above, you'd want step 1 to execute on many workers,
>>>>>>>>>>>>>> likely using a ParDo (say N different workers).
>>>>>>>>>>>>>> The move step should only happen once, so on one worker. This
>>>>>>>>>>>>>> means it will be a different DoFn, likely with some stuff done 
>>>>>>>>>>>>>> to ensure it
>>>>>>>>>>>>>> runs on one worker.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In such a case, cleanup / @TearDown of the DoFn is not
>>>>>>>>>>>>>> enough. We need an API for a PTransform to schedule some cleanup 
>>>>>>>>>>>>>> work for
>>>>>>>>>>>>>> when the transform is "done". In batch this is relatively 
>>>>>>>>>>>>>> straightforward,
>>>>>>>>>>>>>> but doesn't exist. This is the source of some problems, such as 
>>>>>>>>>>>>>> BigQuery
>>>>>>>>>>>>>> sink leaving files around that have failed to import into 
>>>>>>>>>>>>>> BigQuery.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> In streaming this is less straightforward -- do you want to
>>>>>>>>>>>>>> wait until the end of the pipeline? Or do you want to wait until 
>>>>>>>>>>>>>> the end of
>>>>>>>>>>>>>> the window? In practice, you just want to wait until you know 
>>>>>>>>>>>>>> nobody will
>>>>>>>>>>>>>> need the resource anymore.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This led to some discussions around a "cleanup" API, where
>>>>>>>>>>>>>> you could have a transform that output resource objects. Each 
>>>>>>>>>>>>>> resource
>>>>>>>>>>>>>> object would have logic for cleaning it up. And there would be 
>>>>>>>>>>>>>> something
>>>>>>>>>>>>>> that indicated what parts of the pipeline needed that resource, 
>>>>>>>>>>>>>> and what
>>>>>>>>>>>>>> kind of temporal lifetime those objects had. As soon as that 
>>>>>>>>>>>>>> part of the
>>>>>>>>>>>>>> pipeline had advanced far enough that it would no longer need the
>>>>>>>>>>>>>> resources, they would get cleaned up. This can be done at 
>>>>>>>>>>>>>> pipeline
>>>>>>>>>>>>>> shutdown, or incrementally during a streaming pipeline, etc.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Would something like this be a better fit for your use case?
>>>>>>>>>>>>>> If not, why is handling teardown within a single DoFn sufficient?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <
>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes 1M. Lets try to explain you simplifying the overall
>>>>>>>>>>>>>>> execution. Each instance - one fn so likely in a thread of a 
>>>>>>>>>>>>>>> worker - has
>>>>>>>>>>>>>>> its lifecycle. Caricaturally: "new" and garbage collection.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> In practise, new is often an unsafe allocate
>>>>>>>>>>>>>>> (deserialization) but it doesnt matter here.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What i want is any "new" to have a following setup before
>>>>>>>>>>>>>>> any process or stattbundle and the last time beam has the 
>>>>>>>>>>>>>>> instance before
>>>>>>>>>>>>>>> it is gc-ed and after last finishbundle it calls teardown.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It is as simple as it.
>>>>>>>>>>>>>>> This way no need to comibe fn in a way making a fn not self
>>>>>>>>>>>>>>> contained to implement basic transforms.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Le 18 févr. 2018 20:07, "Reuven Lax" <[email protected]> a
>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Le 18 févr. 2018 19:28, "Ben Chambers" <
>>>>>>>>>>>>>>>>> [email protected]> a écrit :
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> It feels like his thread may be a bit off-track. Rather
>>>>>>>>>>>>>>>>> than focusing on the semantics of the existing methods -- 
>>>>>>>>>>>>>>>>> which have been
>>>>>>>>>>>>>>>>> noted to be meet many existing use cases -- it would be 
>>>>>>>>>>>>>>>>> helpful to focus on
>>>>>>>>>>>>>>>>> more on the reason you are looking for something with 
>>>>>>>>>>>>>>>>> different semantics.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying
>>>>>>>>>>>>>>>>> to do):
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 1. Clean-up some external, global resource, that was
>>>>>>>>>>>>>>>>> initialized once during the startup of the pipeline. If this 
>>>>>>>>>>>>>>>>> is the case,
>>>>>>>>>>>>>>>>> how are you ensuring it was really only initialized once (and 
>>>>>>>>>>>>>>>>> not once per
>>>>>>>>>>>>>>>>> worker, per thread, per instance, etc.)? How do you know when 
>>>>>>>>>>>>>>>>> the pipeline
>>>>>>>>>>>>>>>>> should release it? If the answer is "when it reaches step X", 
>>>>>>>>>>>>>>>>> then what
>>>>>>>>>>>>>>>>> about a streaming pipeline?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> When the dofn is no more needed logically ie when the
>>>>>>>>>>>>>>>>> batch is done or stream is stopped (manually or by a jvm 
>>>>>>>>>>>>>>>>> shutdown)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'm really not following what this means.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each
>>>>>>>>>>>>>>>> worker is running 1000 threads (each running a copy of the 
>>>>>>>>>>>>>>>> same DoFn). How
>>>>>>>>>>>>>>>> many cleanups do you want (do you want 1000 * 1000 = 1M 
>>>>>>>>>>>>>>>> cleanups) and when
>>>>>>>>>>>>>>>> do you want it called? When the entire pipeline is shut down? 
>>>>>>>>>>>>>>>> When an
>>>>>>>>>>>>>>>> individual worker is about to shut down (which may be 
>>>>>>>>>>>>>>>> temporary - may be
>>>>>>>>>>>>>>>> about to start back up)? Something else?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2. Finalize some resources that are used within some
>>>>>>>>>>>>>>>>> region of the pipeline. While, the DoFn lifecycle methods are 
>>>>>>>>>>>>>>>>> not a good
>>>>>>>>>>>>>>>>> fit for this (they are focused on managing resources within 
>>>>>>>>>>>>>>>>> the DoFn), you
>>>>>>>>>>>>>>>>> could model this on how FileIO finalizes the files that it 
>>>>>>>>>>>>>>>>> produced. For
>>>>>>>>>>>>>>>>> instance:
>>>>>>>>>>>>>>>>>    a) ParDo generates "resource IDs" (or some token that
>>>>>>>>>>>>>>>>> stores information about resources)
>>>>>>>>>>>>>>>>>    b) "Require Deterministic Input" (to prevent retries
>>>>>>>>>>>>>>>>> from changing resource IDs)
>>>>>>>>>>>>>>>>>    c) ParDo that initializes the resources
>>>>>>>>>>>>>>>>>    d) Pipeline segments that use the resources, and
>>>>>>>>>>>>>>>>> eventually output the fact they're done
>>>>>>>>>>>>>>>>>    e) "Require Deterministic Input"
>>>>>>>>>>>>>>>>>    f) ParDo that frees the resources
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> By making the use of the resource part of the data it is
>>>>>>>>>>>>>>>>> possible to "checkpoint" which resources may be in use or 
>>>>>>>>>>>>>>>>> have been
>>>>>>>>>>>>>>>>> finished by using the require deterministic input. This is 
>>>>>>>>>>>>>>>>> important to
>>>>>>>>>>>>>>>>> ensuring everything is actually cleaned up.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I nees that but generic and not case by case to
>>>>>>>>>>>>>>>>> industrialize some api on top of beam.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is
>>>>>>>>>>>>>>>>> this case, could you elaborate on what you are trying to 
>>>>>>>>>>>>>>>>> accomplish? That
>>>>>>>>>>>>>>>>> would help me understand both the problems with existing 
>>>>>>>>>>>>>>>>> options and
>>>>>>>>>>>>>>>>> possibly what could be done to help.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I understand there are sorkaround for almost all cases but
>>>>>>>>>>>>>>>>> means each transform is different in its lifecycle handling  
>>>>>>>>>>>>>>>>> except i
>>>>>>>>>>>>>>>>> dislike it a lot at a scale and as a user since you cant put 
>>>>>>>>>>>>>>>>> any unified
>>>>>>>>>>>>>>>>> practise on top of beam, it also makes beam very hard to 
>>>>>>>>>>>>>>>>> integrate or to
>>>>>>>>>>>>>>>>> use to build higher level libraries or softwares.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This is why i tried to not start the workaround
>>>>>>>>>>>>>>>>> discussions and just stay at API level.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> -- Ben
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>>>>> [email protected]>:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> "Machine state" is overly low-level because many of the
>>>>>>>>>>>>>>>>>>> possible reasons can happen on a perfectly fine machine.
>>>>>>>>>>>>>>>>>>> If you'd like to rephrase it to "it will be called
>>>>>>>>>>>>>>>>>>> except in various situations where it's logically 
>>>>>>>>>>>>>>>>>>> impossible or impractical
>>>>>>>>>>>>>>>>>>> to guarantee that it's called", that's fine. Or you can 
>>>>>>>>>>>>>>>>>>> list some of the
>>>>>>>>>>>>>>>>>>> examples above.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Sounds ok to me
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The main point for the user is, you *will* see
>>>>>>>>>>>>>>>>>>> non-preventable situations where it couldn't be called - 
>>>>>>>>>>>>>>>>>>> it's not just
>>>>>>>>>>>>>>>>>>> intergalactic crashes - so if the logic is very important 
>>>>>>>>>>>>>>>>>>> (e.g. cleaning up
>>>>>>>>>>>>>>>>>>> a large amount of temporary files, shutting down a large 
>>>>>>>>>>>>>>>>>>> number of VMs you
>>>>>>>>>>>>>>>>>>> started etc), you have to express it using one of the other 
>>>>>>>>>>>>>>>>>>> methods that
>>>>>>>>>>>>>>>>>>> have stricter guarantees (which obviously come at a cost, 
>>>>>>>>>>>>>>>>>>> e.g. no
>>>>>>>>>>>>>>>>>>> pass-by-reference).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> FinishBundle has the exact same guarantee sadly so not
>>>>>>>>>>>>>>>>>> which which other method you speak about. Concretely if you 
>>>>>>>>>>>>>>>>>> make it really
>>>>>>>>>>>>>>>>>> unreliable - this is what best effort sounds to me - then 
>>>>>>>>>>>>>>>>>> users can use it
>>>>>>>>>>>>>>>>>> to clean anything but if you make it "can happen but it is 
>>>>>>>>>>>>>>>>>> unexpected and
>>>>>>>>>>>>>>>>>> means something happent" then it is fine to have a manual - 
>>>>>>>>>>>>>>>>>> or auto if
>>>>>>>>>>>>>>>>>> fancy - recovery procedure. This is where it makes all the 
>>>>>>>>>>>>>>>>>> difference and
>>>>>>>>>>>>>>>>>> impacts the developpers, ops (all users basically).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Agree Eugene except that "best effort" means that. It
>>>>>>>>>>>>>>>>>>>> is also often used to say "at will" and this is what 
>>>>>>>>>>>>>>>>>>>> triggered this thread.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents
>>>>>>>>>>>>>>>>>>>> it" but "best effort" is too open and can be very badly 
>>>>>>>>>>>>>>>>>>>> and wrongly
>>>>>>>>>>>>>>>>>>>> perceived by users (like I did).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>>>>>>>>>>>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>>>>>>>>>>>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>>>>>>>>>>>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>>>>>>>>>>>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>>>>>>>>>>>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>>>>>>>>>>>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>> [email protected]>:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it:
>>>>>>>>>>>>>>>>>>>>> in the example situation you have (intergalactic crash), 
>>>>>>>>>>>>>>>>>>>>> and in a number of
>>>>>>>>>>>>>>>>>>>>> more common cases: eg in case the worker container has 
>>>>>>>>>>>>>>>>>>>>> crashed (eg user
>>>>>>>>>>>>>>>>>>>>> code in a different thread called a C library over JNI 
>>>>>>>>>>>>>>>>>>>>> and it segfaulted),
>>>>>>>>>>>>>>>>>>>>> JVM bug, crash due to user code OOM, in case the worker 
>>>>>>>>>>>>>>>>>>>>> has lost network
>>>>>>>>>>>>>>>>>>>>> connectivity (then it may be called but it won't be able 
>>>>>>>>>>>>>>>>>>>>> to do anything
>>>>>>>>>>>>>>>>>>>>> useful), in case this is running on a preemptible VM and 
>>>>>>>>>>>>>>>>>>>>> it was preempted
>>>>>>>>>>>>>>>>>>>>> by the underlying cluster manager without notice or if 
>>>>>>>>>>>>>>>>>>>>> the worker was too
>>>>>>>>>>>>>>>>>>>>> busy with other stuff (eg calling other Teardown 
>>>>>>>>>>>>>>>>>>>>> functions) until the
>>>>>>>>>>>>>>>>>>>>> preemption timeout elapsed, in case the underlying 
>>>>>>>>>>>>>>>>>>>>> hardware simply failed
>>>>>>>>>>>>>>>>>>>>> (which happens quite often at scale), and in many other 
>>>>>>>>>>>>>>>>>>>>> conditions.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe
>>>>>>>>>>>>>>>>>>>>> such behavior. Please feel free to file bugs for cases 
>>>>>>>>>>>>>>>>>>>>> where you observed a
>>>>>>>>>>>>>>>>>>>>> runner not call Teardown in a situation where it was 
>>>>>>>>>>>>>>>>>>>>> possible to call it
>>>>>>>>>>>>>>>>>>>>> but the runner made insufficient effort.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>>>>>>>>> [email protected]>:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>>>>>> [email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Le 18 févr. 2018 00:23, "Kenneth Knowles" <
>>>>>>>>>>>>>>>>>>>>>>>> [email protected]> a écrit :
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau
>>>>>>>>>>>>>>>>>>>>>>>> <[email protected]> wrote:
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g.
>>>>>>>>>>>>>>>>>>>>>>>>> "I'm trying to write an IO for system $x and it 
>>>>>>>>>>>>>>>>>>>>>>>>> requires the following
>>>>>>>>>>>>>>>>>>>>>>>>> initialization and the following cleanup logic and 
>>>>>>>>>>>>>>>>>>>>>>>>> the following processing
>>>>>>>>>>>>>>>>>>>>>>>>> in between") I'll be better able to help you.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a
>>>>>>>>>>>>>>>>>>>>>>>>> connection. Using bundles is a perf killer since size 
>>>>>>>>>>>>>>>>>>>>>>>>> is not controlled.
>>>>>>>>>>>>>>>>>>>>>>>>> Using teardown doesnt allow you to release the 
>>>>>>>>>>>>>>>>>>>>>>>>> connection since it is a
>>>>>>>>>>>>>>>>>>>>>>>>> best effort thing. Not releasing the connection makes 
>>>>>>>>>>>>>>>>>>>>>>>>> you pay a lot - aws
>>>>>>>>>>>>>>>>>>>>>>>>> ;) - or prevents you to launch other processings - 
>>>>>>>>>>>>>>>>>>>>>>>>> concurrent limit.
>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If
>>>>>>>>>>>>>>>>>>>>>>>> things die so badly that @Teardown is not called then 
>>>>>>>>>>>>>>>>>>>>>>>> nothing else can be
>>>>>>>>>>>>>>>>>>>>>>>> called to close the connection either. What AWS 
>>>>>>>>>>>>>>>>>>>>>>>> service are you thinking of
>>>>>>>>>>>>>>>>>>>>>>>> that stays open for a long time when everything at the 
>>>>>>>>>>>>>>>>>>>>>>>> other end has died?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless but
>>>>>>>>>>>>>>>>>>>>>>>> some (proprietary) protocols requires some closing 
>>>>>>>>>>>>>>>>>>>>>>>> exchanges which are not
>>>>>>>>>>>>>>>>>>>>>>>> only "im leaving".
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> For aws i was thinking about starting some services
>>>>>>>>>>>>>>>>>>>>>>>> - machines - on the fly in a pipeline startup and 
>>>>>>>>>>>>>>>>>>>>>>>> closing them at the end.
>>>>>>>>>>>>>>>>>>>>>>>> If teardown is not called you leak machines and money. 
>>>>>>>>>>>>>>>>>>>>>>>> You can say it can
>>>>>>>>>>>>>>>>>>>>>>>> be done another way...as the full pipeline ;).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I dont want to be picky but if beam cant handle its
>>>>>>>>>>>>>>>>>>>>>>>> components lifecycle it can be used at scale for 
>>>>>>>>>>>>>>>>>>>>>>>> generic pipelines and if
>>>>>>>>>>>>>>>>>>>>>>>> bound to some particular IO.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> What does prevent to enforce teardown - ignoring
>>>>>>>>>>>>>>>>>>>>>>>> the interstellar crash case which cant be handled by 
>>>>>>>>>>>>>>>>>>>>>>>> any human system?
>>>>>>>>>>>>>>>>>>>>>>>> Nothing technically. Why do you push to not handle it? 
>>>>>>>>>>>>>>>>>>>>>>>> Is it due to some
>>>>>>>>>>>>>>>>>>>>>>>> legacy code on dataflow or something else?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented
>>>>>>>>>>>>>>>>>>>>>>> this way (best-effort). So I'm not sure what kind of 
>>>>>>>>>>>>>>>>>>>>>>> change you're asking
>>>>>>>>>>>>>>>>>>>>>>> for.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not
>>>>>>>>>>>>>>>>>>>>>> call then it is a bug and we are done :).
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Also what does it mean for the users? Direct runner
>>>>>>>>>>>>>>>>>>>>>>>> does it so if a user udes the RI in test, he will get 
>>>>>>>>>>>>>>>>>>>>>>>> a different behavior
>>>>>>>>>>>>>>>>>>>>>>>> in prod? Also dont forget the user doesnt know what 
>>>>>>>>>>>>>>>>>>>>>>>> the IOs he composes use
>>>>>>>>>>>>>>>>>>>>>>>> so this is so impacting for the whole product than he 
>>>>>>>>>>>>>>>>>>>>>>>> must be handled IMHO.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in big
>>>>>>>>>>>>>>>>>>>>>>>> data world but it is not a reason to ignore what 
>>>>>>>>>>>>>>>>>>>>>>>> people did for years and
>>>>>>>>>>>>>>>>>>>>>>>> do it wrong before doing right ;).
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent to
>>>>>>>>>>>>>>>>>>>>>>>> guarantee - in the normal IT conditions - the 
>>>>>>>>>>>>>>>>>>>>>>>> execution of teardown. Then
>>>>>>>>>>>>>>>>>>>>>>>> we see if we can handle it and only if there is a 
>>>>>>>>>>>>>>>>>>>>>>>> technical reason we cant
>>>>>>>>>>>>>>>>>>>>>>>> we make it experimental/unsupported in the api. I know 
>>>>>>>>>>>>>>>>>>>>>>>> spark and flink can,
>>>>>>>>>>>>>>>>>>>>>>>> any unknown blocker for other runners?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through java
>>>>>>>>>>>>>>>>>>>>>>>> shutdown hooks otherwise your environment (beam 
>>>>>>>>>>>>>>>>>>>>>>>> enclosing software) is
>>>>>>>>>>>>>>>>>>>>>>>> fully unhandled and your overall system is 
>>>>>>>>>>>>>>>>>>>>>>>> uncontrolled. Only case where it
>>>>>>>>>>>>>>>>>>>>>>>> is not true is when the software is always owned by a 
>>>>>>>>>>>>>>>>>>>>>>>> vendor and never
>>>>>>>>>>>>>>>>>>>>>>>> installed on customer environment. In this case it 
>>>>>>>>>>>>>>>>>>>>>>>> belongd to the vendor to
>>>>>>>>>>>>>>>>>>>>>>>> handle beam API and not to beam to adjust its API for 
>>>>>>>>>>>>>>>>>>>>>>>> a vendor - otherwise
>>>>>>>>>>>>>>>>>>>>>>>> all unsupported features by one runner should be made 
>>>>>>>>>>>>>>>>>>>>>>>> optional right?
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> All state is not about network, even in distributed
>>>>>>>>>>>>>>>>>>>>>>>> systems so this is key to have an explicit and defined 
>>>>>>>>>>>>>>>>>>>>>>>> lifecycle.
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>
>
>

Reply via email to