So what would you like to happen if there is a crash? The DoFn instance no
longer exists because the JVM it ran on no longer exists. What should
Teardown be called on?

On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> This is what I want, and not 999999 teardowns for 1000000 setups, unless
> there is an unexpected crash (= a bug).
>
> On Feb 19, 2018, 18:57, "Reuven Lax" <re...@google.com> wrote:
>
>>
>>
>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>>
>>>
>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>>
>>>>
>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <
>>>> rmannibu...@gmail.com> wrote:
>>>>
>>>>> @Reuven: in practice they are created in pools of 256, but that leads
>>>>> to the same pattern; the teardown is just
>>>>> "if (iCreatedThem) releaseThem();"
>>>>>
>>>>
>>>> How do you control "256?" Even if you have a pool of 256 workers,
>>>> nothing in Beam guarantees how many threads and DoFns are created per
>>>> worker. In theory the runner might decide to create 1000 threads on each
>>>> worker.
>>>>
>>>
>>> No, it was the other way around: in this case, on AWS you can get 256
>>> instances at once but not 512 (which would be 2x256). So when you compute
>>> the distribution, you assign to some fn the role of owning the instance
>>> lookup and release.
>>>
>>
>> I still don't understand. Let's be more precise. If you write the
>> following code:
>>
>>    pCollection.apply(ParDo.of(new MyDoFn()));
>>
>> There is no way to control how many instances of MyDoFn are created. The
>> runner might decide to create a million instances of this class across
>> your worker pool, which means that you will get a million Setup and
>> Teardown calls.
>>
>>
>>> Anyway, this was just an example of an external resource you must
>>> release. The real topic is that Beam should define, as soon as possible,
>>> a guaranteed generic lifecycle so that users can embrace its programming
>>> model.
>>>
>>>
>>>>
>>>>
>>>>
>>>>> @Eugene:
>>>>> 1. the Wait logic is about passing the value, which is not always
>>>>> possible (something like 15% of cases, by my rough estimate)
>>>>> 2. SDF: I'll try to detail here why I keep mentioning SDF
>>>>>
>>>>>
>>>>> Concretely, Beam exposes a portable API (included in the SDK core).
>>>>> This API defines a *container* API and therefore implies bean lifecycles.
>>>>> I won't detail them all; I'll just use the sources and DoFn (not SDF) to
>>>>> illustrate the idea I'm trying to develop.
>>>>>
>>>>> A. Source
>>>>>
>>>>> A source computes a partition plan with 2 primitives: estimateSize and
>>>>> split. As a user you can expect both to be called on the same bean
>>>>> instance, to avoid paying the same connection cost(s) twice. Concretely:
>>>>>
>>>>> connect()
>>>>> try {
>>>>>   estimateSize()
>>>>>   split()
>>>>> } finally {
>>>>>   disconnect()
>>>>> }
>>>>>
>>>>> this is not guaranteed by the API so you must do:
>>>>>
>>>>> connect()
>>>>> try {
>>>>>   estimateSize()
>>>>> } finally {
>>>>>   disconnect()
>>>>> }
>>>>> connect()
>>>>> try {
>>>>>   split()
>>>>> } finally {
>>>>>   disconnect()
>>>>> }
>>>>>
>>>>> plus a workaround with an internal size estimate, since this primitive
>>>>> is often called in split but you don't want to connect twice in the
>>>>> second phase.
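
A minimal sketch of the connection cost described above, in plain Java
rather than the Beam API. CountingConnection and SourcePlanning are
hypothetical names introduced only for illustration, and the size and
partition arithmetic is made up; only the connect / estimateSize / split /
disconnect shape comes from the message.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an expensive external connection (not a Beam class).
class CountingConnection implements AutoCloseable {
    // Counts how many connections were opened, i.e. how often the cost was paid.
    static final AtomicInteger OPENED = new AtomicInteger();

    CountingConnection() {
        OPENED.incrementAndGet(); // "connect()"
    }

    // Made-up size estimate, for illustration only.
    long estimateSize() {
        return 1_000L;
    }

    // Made-up split: number of partitions for the desired bundle size, at least 1.
    int split(long desiredBundleSize) {
        return (int) Math.max(1L, estimateSize() / desiredBundleSize);
    }

    @Override
    public void close() {
        // "disconnect()"; nothing to release in this sketch.
    }
}

class SourcePlanning {
    // Possible only if both primitives are guaranteed to run on the same instance.
    static int planWithSharedConnection(long desiredBundleSize) {
        try (CountingConnection c = new CountingConnection()) {
            c.estimateSize();
            return c.split(desiredBundleSize);
        }
    }

    // What you must do without that guarantee: connect once per primitive.
    static int planWithSeparateConnections(long desiredBundleSize) {
        try (CountingConnection c = new CountingConnection()) {
            c.estimateSize();
        }
        try (CountingConnection c = new CountingConnection()) {
            return c.split(desiredBundleSize);
        }
    }
}
```

Both methods return the same plan; the second one simply opens twice as
many connections to get there.
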
>>>>>
>>>>> Why do you need that? Simply because you want to define an API to
>>>>> implement sources which initializes the source bean and destroys it.
>>>>> I insist that it is a very basic concern for such an API. However, Beam
>>>>> doesn't embrace it and doesn't assume it, so building any API on top of
>>>>> Beam is very painful today, and direct Beam users hit the exact same
>>>>> issues: check how the IOs are implemented, with static utilities which
>>>>> create volatile connections, preventing reuse of an existing connection
>>>>> within a single method (
>>>>> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862
>>>>> ).
>>>>>
>>>>> Same logic applies to the reader which is then created.
>>>>>
>>>>> B. DoFn & SDF
>>>>>
>>>>> As a fn developer you expect the same from the Beam runtime: init(); try {
>>>>> while (...) process(); } finally { destroy(); } and that it is executed on
>>>>> the exact same instance, so that you can be stateful at that level for
>>>>> expensive connections/operations/flow state handling.
>>>>>
>>>>> As you mentioned with the million example, this sequence should
>>>>> happen for each single instance, so 1M times in your example.
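
A minimal sketch of the init / process / destroy sequence being asked for,
in plain Java. LifecycleFn, CountingFn and LifecycleHarness are hypothetical
names and not the Beam DoFn API; the harness only models a runtime that is
still alive, so a JVM crash would bypass the finally block as well.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical function contract mirroring init(), process(), destroy().
interface LifecycleFn<T> {
    void setup();

    void process(T element);

    void teardown();
}

// Example function that only counts its lifecycle calls.
class CountingFn implements LifecycleFn<String> {
    static final AtomicInteger SETUPS = new AtomicInteger();
    static final AtomicInteger TEARDOWNS = new AtomicInteger();

    @Override
    public void setup() {
        SETUPS.incrementAndGet();
    }

    @Override
    public void process(String element) {
        // Simulated user-code failure on an empty element.
        if (element.isEmpty()) {
            throw new IllegalArgumentException("empty element");
        }
    }

    @Override
    public void teardown() {
        TEARDOWNS.incrementAndGet();
    }
}

class LifecycleHarness {
    // Runs one instance through its whole lifecycle on the calling thread.
    static <T> void run(LifecycleFn<T> fn, List<T> elements) {
        fn.setup();
        try {
            for (T element : elements) {
                fn.process(element);
            }
        } finally {
            // Reached on normal completion and on exceptions thrown by process().
            fn.teardown();
        }
    }
}
```

Calling LifecycleHarness.run once per instance gives exactly one setup and
one teardown per instance, including when process() throws.
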
>>>>>
>>>>> Now why did I mention SDF several times? Because SDF is a
>>>>> generalisation of both cases (source and DoFn). Therefore it creates way
>>>>> more instances and requires a much stricter/more explicit definition
>>>>> of the exact lifecycle and of which instance does what. Since Beam
>>>>> handles the full lifecycle of the bean instances, it must provide
>>>>> init/destroy hooks (setup/teardown) which can be stateful.
>>>>>
>>>>> Take the JDBC example which was mentioned earlier. Today, because of
>>>>> the teardown issue, it uses bundles. Since bundle size is not defined
>>>>> (and will not be with SDF), it must use a pool to be able to reuse a
>>>>> connection instance and get correct performance. Now with SDF and the
>>>>> increase in splits, how do you handle the pool size? Generally in batch
>>>>> you use a single connection per thread to avoid consuming all database
>>>>> connections. With a pool you have 2 choices: 1. use a pool of 1, or
>>>>> 2. use a slightly bigger pool, but multiplied by the number of beans
>>>>> you will likely double or triple the connection count and make the
>>>>> execution fail with "no more connection available". If you pick 1 (a
>>>>> pool of 1), then you still need a reliable teardown per pool instance
>>>>> (generally close()) to ensure you release the pool and don't leak the
>>>>> connection information in the JVM. In all cases you come back to the
>>>>> init()/destroy() lifecycle, even if you fake it by getting connections
>>>>> with bundles.
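
A minimal sketch of the "pool of 1" option, in plain Java.
SingleConnectionPool, Handle and PooledFn are hypothetical names; Handle
stands in for a real connection such as a JDBC one, and the setup /
processBundle / teardown methods only imitate the lifecycle discussed in
this thread, they are not Beam annotations.

```java
// Hypothetical pool holding exactly one connection, owned by one fn instance.
class SingleConnectionPool implements AutoCloseable {

    // Stand-in for a real connection object.
    static final class Handle {
        private volatile boolean open = true;

        boolean isOpen() {
            return open;
        }
    }

    private final Handle handle = new Handle();
    private boolean borrowed;
    private boolean closed;

    synchronized Handle borrow() {
        if (closed) {
            throw new IllegalStateException("pool is closed");
        }
        if (borrowed) {
            throw new IllegalStateException("connection already in use");
        }
        borrowed = true;
        return handle;
    }

    synchronized void giveBack(Handle h) {
        if (h != handle) {
            throw new IllegalArgumentException("handle does not belong to this pool");
        }
        borrowed = false;
    }

    // The step that needs a reliable teardown: release the only connection.
    @Override
    public synchronized void close() {
        closed = true;
        handle.open = false;
    }
}

// Hypothetical fn: bundles borrow and return, only teardown really closes.
class PooledFn {
    private SingleConnectionPool pool;

    void setup() {
        pool = new SingleConnectionPool();
    }

    // Returns the handle it used so callers can observe that it is reused.
    SingleConnectionPool.Handle processBundle() {
        SingleConnectionPool.Handle h = pool.borrow();
        try {
            return h; // a real fn would run its statements on h here
        } finally {
            pool.giveBack(h);
        }
    }

    void teardown() {
        pool.close();
    }
}
```

Every bundle reuses the same handle, so the connection count stays at one
per instance, but the handle is only released if teardown() actually runs.
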
>>>>>
>>>>> Just to make it obvious: I mention SDF only because SDF amplifies all
>>>>> the current issues with the loose definition of the bean lifecycles to
>>>>> an exponential level, nothing else.
>>>>>
>>>>>
>>>>>
>>>>> Romain Manni-Bucau
>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>
>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>
>>>>>> The kind of whole-transform lifecycle you're mentioning can be
>>>>>> accomplished using the Wait transform as I suggested in the thread above,
>>>>>> and I believe it should become the canonical way to do that.
>>>>>>
>>>>>> (Would like to reiterate one more time, as the main author of most
>>>>>> design documents related to SDF and of its implementation in the Java
>>>>>> direct and dataflow runner that SDF is fully unrelated to the topic of
>>>>>> cleanup - I'm very confused as to why it keeps coming up)
>>>>>>
>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> I kind of agree, except that transforms lack a lifecycle too. My
>>>>>>> understanding is that SDF could be a way to unify it and clean up the
>>>>>>> API.
>>>>>>>
>>>>>>> Otherwise, how do we normalize the lifecycle of transforms in a single
>>>>>>> API?
>>>>>>>
>>>>>>> On Feb 18, 2018, 21:32, "Ben Chambers" <bchamb...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Are you sure that focusing on the cleanup of specific DoFn's is
>>>>>>>> appropriate? In many cases where cleanup is necessary, it is around
>>>>>>>> an entire composite PTransform. I think there have been
>>>>>>>> discussions/proposals around a more methodical "cleanup" option, but
>>>>>>>> those haven't been implemented, to the best of my knowledge.
>>>>>>>>
>>>>>>>> For instance, consider the steps of a FileIO:
>>>>>>>> 1. Write to a bunch (N shards) of temporary files
>>>>>>>> 2. When all temporary files are complete, attempt to do a bulk copy
>>>>>>>> to put them in the final destination.
>>>>>>>> 3. Cleanup all the temporary files.
>>>>>>>>
>>>>>>>> (This is often desirable because it minimizes the chance of seeing
>>>>>>>> partial/incomplete results in the final destination).
>>>>>>>>
>>>>>>>> In the above, you'd want step 1 to execute on many workers, likely
>>>>>>>> using a ParDo (say N different workers).
>>>>>>>> The move step should only happen once, so on one worker. This means
>>>>>>>> it will be a different DoFn, likely with some stuff done to ensure it
>>>>>>>> runs on one worker.
>>>>>>>>
>>>>>>>> In such a case, cleanup / @Teardown of the DoFn is not enough. We
>>>>>>>> need an API for a PTransform to schedule some cleanup work for when the
>>>>>>>> transform is "done". In batch this is relatively straightforward, but
>>>>>>>> doesn't exist. This is the source of some problems, such as the
>>>>>>>> BigQuery sink leaving files around that have failed to import into
>>>>>>>> BigQuery.
>>>>>>>>
>>>>>>>> In streaming this is less straightforward -- do you want to wait
>>>>>>>> until the end of the pipeline? Or do you want to wait until the end of
>>>>>>>> the window? In practice, you just want to wait until you know nobody
>>>>>>>> will need the resource anymore.
>>>>>>>>
>>>>>>>> This led to some discussions around a "cleanup" API, where you
>>>>>>>> could have a transform that output resource objects. Each resource
>>>>>>>> object would have logic for cleaning it up. And there would be
>>>>>>>> something that indicated what parts of the pipeline needed that
>>>>>>>> resource, and what kind of temporal lifetime those objects had. As
>>>>>>>> soon as that part of the pipeline had advanced far enough that it
>>>>>>>> would no longer need the resources, they would get cleaned up. This
>>>>>>>> can be done at pipeline shutdown, or incrementally during a
>>>>>>>> streaming pipeline, etc.
>>>>>>>>
>>>>>>>> Would something like this be a better fit for your use case? If
>>>>>>>> not, why is handling teardown within a single DoFn sufficient?
>>>>>>>>
>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <
>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall execution.
>>>>>>>>> Each instance - one fn, so likely in a thread of a worker - has its
>>>>>>>>> lifecycle. Caricaturally: "new" and garbage collection.
>>>>>>>>>
>>>>>>>>> In practice, "new" is often an unsafe allocate (deserialization), but
>>>>>>>>> it doesn't matter here.
>>>>>>>>>
>>>>>>>>> What I want is for any "new" to be followed by a setup before any
>>>>>>>>> process or startBundle, and for Beam to call teardown the last time it
>>>>>>>>> holds the instance, after the last finishBundle and before it is
>>>>>>>>> gc-ed.
>>>>>>>>>
>>>>>>>>> It is as simple as that.
>>>>>>>>> This way there is no need to combine fns in a way that makes a fn not
>>>>>>>>> self-contained just to implement basic transforms.
>>>>>>>>>
>>>>>>>>> On Feb 18, 2018, 20:07, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Feb 18, 2018, 19:28, "Ben Chambers" <bchamb...@apache.org>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather than
>>>>>>>>>>> focusing on the semantics of the existing methods -- which have
>>>>>>>>>>> been noted to meet many existing use cases -- it would be helpful
>>>>>>>>>>> to focus more on the reason you are looking for something with
>>>>>>>>>>> different semantics.
>>>>>>>>>>>
>>>>>>>>>>> Some possibilities (I'm not sure which one you are trying to do):
>>>>>>>>>>>
>>>>>>>>>>> 1. Clean-up some external, global resource, that was initialized
>>>>>>>>>>> once during the startup of the pipeline. If this is the case, how
>>>>>>>>>>> are you ensuring it was really only initialized once (and not once
>>>>>>>>>>> per worker, per thread, per instance, etc.)? How do you know when
>>>>>>>>>>> the pipeline should release it? If the answer is "when it reaches
>>>>>>>>>>> step X", then what about a streaming pipeline?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> When the DoFn is logically no longer needed, i.e. when the batch
>>>>>>>>>>> is done or the stream is stopped (manually or by a JVM shutdown).
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I'm really not following what this means.
>>>>>>>>>>
>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and each
>>>>>>>>>> worker is running 1000 threads (each running a copy of the same
>>>>>>>>>> DoFn). How many cleanups do you want (do you want 1000 * 1000 = 1M
>>>>>>>>>> cleanups) and when do you want it called? When the entire pipeline
>>>>>>>>>> is shut down? When an individual worker is about to shut down
>>>>>>>>>> (which may be temporary - may be about to start back up)?
>>>>>>>>>> Something else?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2. Finalize some resources that are used within some region of
>>>>>>>>>>> the pipeline. While the DoFn lifecycle methods are not a good fit
>>>>>>>>>>> for this (they are focused on managing resources within the DoFn),
>>>>>>>>>>> you could model this on how FileIO finalizes the files that it
>>>>>>>>>>> produced. For instance:
>>>>>>>>>>>    a) ParDo generates "resource IDs" (or some token that stores
>>>>>>>>>>> information about resources)
>>>>>>>>>>>    b) "Require Deterministic Input" (to prevent retries from
>>>>>>>>>>> changing resource IDs)
>>>>>>>>>>>    c) ParDo that initializes the resources
>>>>>>>>>>>    d) Pipeline segments that use the resources, and eventually
>>>>>>>>>>> output the fact they're done
>>>>>>>>>>>    e) "Require Deterministic Input"
>>>>>>>>>>>    f) ParDo that frees the resources
>>>>>>>>>>>
>>>>>>>>>>> By making the use of the resource part of the data it is
>>>>>>>>>>> possible to "checkpoint" which resources may be in use or have been
>>>>>>>>>>> finished by using the require deterministic input. This is
>>>>>>>>>>> important for ensuring everything is actually cleaned up.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I need that, but generic and not case by case, to industrialize
>>>>>>>>>>> some API on top of Beam.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 3. Some other use case that I may be missing? If it is this
>>>>>>>>>>> case, could you elaborate on what you are trying to accomplish?
>>>>>>>>>>> That would help me understand both the problems with existing
>>>>>>>>>>> options and possibly what could be done to help.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> I understand there are workarounds for almost all cases, but that
>>>>>>>>>>> means each transform is different in its lifecycle handling. I
>>>>>>>>>>> dislike it a lot at scale and as a user, since you can't put any
>>>>>>>>>>> unified practice on top of Beam; it also makes Beam very hard to
>>>>>>>>>>> integrate or to use to build higher-level libraries or software.
>>>>>>>>>>>
>>>>>>>>>>> This is why i tried to not start the workaround discussions and
>>>>>>>>>>> just stay at API level.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> -- Ben
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <
>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> "Machine state" is overly low-level because many of the
>>>>>>>>>>>>> possible reasons can happen on a perfectly fine machine.
>>>>>>>>>>>>> If you'd like to rephrase it to "it will be called except in
>>>>>>>>>>>>> various situations where it's logically impossible or
>>>>>>>>>>>>> impractical to guarantee that it's called", that's fine. Or you
>>>>>>>>>>>>> can list some of the examples above.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Sounds ok to me
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> The main point for the user is, you *will* see non-preventable
>>>>>>>>>>>>> situations where it couldn't be called - it's not just
>>>>>>>>>>>>> intergalactic crashes - so if the logic is very important (e.g.
>>>>>>>>>>>>> cleaning up a large amount of temporary files, shutting down a
>>>>>>>>>>>>> large number of VMs you started etc), you have to express it
>>>>>>>>>>>>> using one of the other methods that have stricter guarantees
>>>>>>>>>>>>> (which obviously come at a cost, e.g. no pass-by-reference).
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> FinishBundle has the exact same guarantee, sadly, so I am not sure
>>>>>>>>>>>> which other method you are speaking about. Concretely, if you make
>>>>>>>>>>>> it really unreliable - this is what best effort sounds like to me -
>>>>>>>>>>>> then users cannot use it to clean anything, but if you make it
>>>>>>>>>>>> "can happen, but it is unexpected and means something happened",
>>>>>>>>>>>> then it is fine to have a manual - or auto if fancy - recovery
>>>>>>>>>>>> procedure. This is where it makes all the difference and impacts
>>>>>>>>>>>> the developers and ops (all users basically).
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Agreed, Eugene, except that "best effort" means exactly that. It
>>>>>>>>>>>>>> is also often used to mean "at will", and this is what triggered
>>>>>>>>>>>>>> this thread.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents it" but
>>>>>>>>>>>>>> "best effort" is too open and can be very badly and wrongly
>>>>>>>>>>>>>> perceived by users (like I did).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> It will not be called if it's impossible to call it: in the
>>>>>>>>>>>>>>> example situation you have (intergalactic crash), and in a
>>>>>>>>>>>>>>> number of more common cases: e.g. in case the worker container
>>>>>>>>>>>>>>> has crashed (e.g. user code in a different thread called a C
>>>>>>>>>>>>>>> library over JNI and it segfaulted), JVM bug, crash due to user
>>>>>>>>>>>>>>> code OOM, in case the worker has lost network connectivity
>>>>>>>>>>>>>>> (then it may be called but it won't be able to do anything
>>>>>>>>>>>>>>> useful), in case this is running on a preemptible VM and it was
>>>>>>>>>>>>>>> preempted by the underlying cluster manager without notice or
>>>>>>>>>>>>>>> if the worker was too busy with other stuff (e.g. calling other
>>>>>>>>>>>>>>> Teardown functions) until the preemption timeout elapsed, in
>>>>>>>>>>>>>>> case the underlying hardware simply failed (which happens quite
>>>>>>>>>>>>>>> often at scale), and in many other conditions.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe such
>>>>>>>>>>>>>>> behavior. Please feel free to file bugs for cases where you
>>>>>>>>>>>>>>> observed a runner not call Teardown in a situation where it was
>>>>>>>>>>>>>>> possible to call it but the runner made insufficient effort.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Feb 18, 2018, 00:23, "Kenneth Knowles" <k...@google.com>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm
>>>>>>>>>>>>>>>>>>> trying to write an IO for system $x and it requires the
>>>>>>>>>>>>>>>>>>> following initialization and the following cleanup logic
>>>>>>>>>>>>>>>>>>> and the following processing in between") I'll be better
>>>>>>>>>>>>>>>>>>> able to help you.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a
>>>>>>>>>>>>>>>>>>> connection. Using bundles is a perf killer since size is
>>>>>>>>>>>>>>>>>>> not controlled. Using teardown doesn't allow you to release
>>>>>>>>>>>>>>>>>>> the connection since it is a best effort thing. Not
>>>>>>>>>>>>>>>>>>> releasing the connection makes you pay a lot - AWS ;) - or
>>>>>>>>>>>>>>>>>>> prevents you from launching other processing - concurrent
>>>>>>>>>>>>>>>>>>> limit.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things die
>>>>>>>>>>>>>>>>>> so badly that @Teardown is not called then nothing else can
>>>>>>>>>>>>>>>>>> be called to close the connection either. What AWS service
>>>>>>>>>>>>>>>>>> are you thinking of that stays open for a long time when
>>>>>>>>>>>>>>>>>> everything at the other end has died?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless, but some
>>>>>>>>>>>>>>>>>> (proprietary) protocols require closing exchanges which are
>>>>>>>>>>>>>>>>>> not only "I'm leaving".
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some services -
>>>>>>>>>>>>>>>>>> machines - on the fly at pipeline startup and closing them
>>>>>>>>>>>>>>>>>> at the end. If teardown is not called you leak machines and
>>>>>>>>>>>>>>>>>> money. You can say it can be done another way... as can the
>>>>>>>>>>>>>>>>>> full pipeline ;).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its
>>>>>>>>>>>>>>>>>> components' lifecycle, it can't be used at scale for generic
>>>>>>>>>>>>>>>>>> pipelines and is bound to some particular IOs.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What prevents enforcing teardown - ignoring the
>>>>>>>>>>>>>>>>>> interstellar crash case, which can't be handled by any human
>>>>>>>>>>>>>>>>>> system? Nothing technically. Why do you push to not handle
>>>>>>>>>>>>>>>>>> it? Is it due to some legacy code on Dataflow or something
>>>>>>>>>>>>>>>>>> else?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented this way
>>>>>>>>>>>>>>>>> (best-effort). So I'm not sure what kind of change you're
>>>>>>>>>>>>>>>>> asking for.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not called
>>>>>>>>>>>>>>>> then it is a bug and we are done :).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Also, what does it mean for the users? The direct runner
>>>>>>>>>>>>>>>>>> does it, so if a user uses the RI in tests, will he get a
>>>>>>>>>>>>>>>>>> different behavior in prod? Also, don't forget the user
>>>>>>>>>>>>>>>>>> doesn't know what the IOs he composes use, so this has such
>>>>>>>>>>>>>>>>>> an impact on the whole product that it must be handled IMHO.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I understand the portability culture is new in the big data
>>>>>>>>>>>>>>>>>> world, but that is not a reason to ignore what people did
>>>>>>>>>>>>>>>>>> for years and do it wrong before doing it right ;).
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent us from
>>>>>>>>>>>>>>>>>> guaranteeing - in normal IT conditions - the execution of
>>>>>>>>>>>>>>>>>> teardown. Then we see if we can handle it, and only if there
>>>>>>>>>>>>>>>>>> is a technical reason we can't do we make it
>>>>>>>>>>>>>>>>>> experimental/unsupported in the API. I know Spark and Flink
>>>>>>>>>>>>>>>>>> can; is there any unknown blocker for other runners?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through Java
>>>>>>>>>>>>>>>>>> shutdown hooks, otherwise your environment (the software
>>>>>>>>>>>>>>>>>> enclosing Beam) is fully unhandled and your overall system
>>>>>>>>>>>>>>>>>> is uncontrolled. The only case where this is not true is
>>>>>>>>>>>>>>>>>> when the software is always owned by a vendor and never
>>>>>>>>>>>>>>>>>> installed on a customer environment. In this case it is up
>>>>>>>>>>>>>>>>>> to the vendor to handle the Beam API, and not up to Beam to
>>>>>>>>>>>>>>>>>> adjust its API for a vendor - otherwise all features
>>>>>>>>>>>>>>>>>> unsupported by one runner should be made optional, right?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Not all state is about the network, even in distributed
>>>>>>>>>>>>>>>>>> systems, so it is key to have an explicit and defined
>>>>>>>>>>>>>>>>>> lifecycle.
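
A minimal sketch of the shutdown-hook idea from the technical note above,
in plain Java. TeardownRegistry is a hypothetical helper, not something
Beam or any runner is known to provide. Runtime.addShutdownHook itself is
standard; hooks run on a normal exit and on signals such as SIGTERM, but
not after SIGKILL, Runtime.halt() or a native crash, which matches the
cases listed elsewhere in this thread.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of teardown callbacks for instances that are still live.
class TeardownRegistry {
    private final Set<Runnable> pending = ConcurrentHashMap.newKeySet();
    private final Thread hook = new Thread(this::runAll, "teardown-hook");

    // Ask the JVM to run the pending teardowns when it shuts down in an orderly way.
    void install() {
        Runtime.getRuntime().addShutdownHook(hook);
    }

    // Returns true if the hook was installed and has now been removed.
    boolean uninstall() {
        return Runtime.getRuntime().removeShutdownHook(hook);
    }

    // Called after an instance's setup.
    void register(Runnable teardown) {
        pending.add(teardown);
    }

    // Called when the runtime has already torn the instance down itself.
    void unregister(Runnable teardown) {
        pending.remove(teardown);
    }

    // Runs every pending teardown at most once, continuing past failures.
    void runAll() {
        for (Runnable teardown : pending) {
            if (pending.remove(teardown)) {
                try {
                    teardown.run();
                } catch (RuntimeException e) {
                    // One failing teardown must not block the others.
                }
            }
        }
    }

    int pendingCount() {
        return pending.size();
    }
}
```

This narrows the window in which teardown is skipped to abrupt
terminations; it cannot close that window.
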
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>
>>>>
>>>
>>
