On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

> @Reuven: in practice it is created by a pool of 256, but it leads to the same
> pattern: the teardown is just an "if (iCreatedThem) releaseThem();".

How do you control "256"? Even if you have a pool of 256 workers, nothing
in Beam guarantees how many threads and DoFns are created per worker. In
theory the runner might decide to create 1000 threads on each worker.
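Purely as an illustration of the "if (iCreatedThem) releaseThem();" pattern, here is a reference-counted holder shared by all instances in one JVM. FakePool and SharedPoolHolder are hypothetical names, not Beam APIs, and each instance is assumed to call acquire() from its setup hook and release() from its teardown hook. The pool is shared per JVM rather than per instance, so its size no longer scales with the number of threads the runner creates; however, it is only closed if every instance's teardown actually runs, which is the very guarantee debated in this thread.

```java
// Hypothetical stand-in for a connection pool; it only records whether it is open.
class FakePool {
    final int capacity;
    private volatile boolean closed = false;

    FakePool(int capacity) { this.capacity = capacity; }

    void close() { closed = true; }

    boolean isClosed() { return closed; }
}

// Hypothetical JVM-wide holder implementing "if (iCreatedThem) releaseThem()"
// with a reference count: every DoFn-like instance calls acquire() from its
// setup hook and release() from its teardown hook.
final class SharedPoolHolder {
    private static FakePool pool;
    private static int refCount = 0;

    private SharedPoolHolder() {}

    static synchronized FakePool acquire(int capacity) {
        if (pool == null) {
            pool = new FakePool(capacity); // the first instance in this JVM creates the pool
        }
        refCount++;
        return pool;
    }

    static synchronized void release() {
        if (refCount == 0) {
            throw new IllegalStateException("release() called without a matching acquire()");
        }
        refCount--;
        if (refCount == 0) {
            pool.close(); // the last instance to be torn down closes the pool
            pool = null;
        }
    }

    static synchronized int references() { return refCount; }
}
```

If any single teardown is skipped, the count never reaches zero and the pool stays open until the JVM exits.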

> @Eugene:
> 1. The Wait logic is about passing the value along, which is not always
> possible (about 15% of cases, by my rough estimate).
> 2. SDF: I'll try to explain in more detail why I keep mentioning SDF here.
> Concretely, Beam exposes a portable API (included in the SDK core). This
> API defines a *container* API and therefore implies bean lifecycles. I won't
> detail them all; I'll just use sources and DoFns (not SDF) to illustrate
> the idea I'm trying to develop.
> A. Source
> A source computes a partition plan with two primitives: estimateSize and
> split. As a user, you would expect both to be called on the same bean
> instance, to avoid paying the same connection cost(s) twice. Concretely:
> connect()
> try {
>   estimateSize()
>   split()
> } finally {
>   disconnect()
> }
> This is not guaranteed by the API, so you must do:
> connect()
> try {
>   estimateSize()
> } finally {
>   disconnect()
> }
> connect()
> try {
>   split()
> } finally {
>   disconnect()
> }
> plus a workaround with an internal estimateSize, since this primitive is
> often called from split and you don't want to connect twice in the second
> phase.
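A minimal sketch of that workaround, assuming a hypothetical CountingConnection and ExampleSource (in Beam's BoundedSource the corresponding methods are getEstimatedSizeBytes and split, with different signatures). Each public primitive opens and closes its own connection because nothing guarantees both are called on the same instance, and split() computes the size internally on its already-open connection instead of calling the public estimateSize().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical connection that counts how many times one was opened.
class CountingConnection implements AutoCloseable {
    static int opened = 0;

    CountingConnection() { opened++; }

    long countRows() { return 1_000L; } // stand-in for a remote size query

    @Override
    public void close() {}
}

// Hypothetical source: each public primitive manages its own connection.
class ExampleSource {
    long estimateSize() {
        try (CountingConnection c = new CountingConnection()) {
            return estimateSize(c);
        }
    }

    List<long[]> split(long desiredBundleSize) {
        try (CountingConnection c = new CountingConnection()) {
            long total = estimateSize(c); // reuse the open connection, no second connect
            List<long[]> ranges = new ArrayList<>();
            for (long start = 0; start < total; start += desiredBundleSize) {
                ranges.add(new long[] {start, Math.min(start + desiredBundleSize, total)});
            }
            return ranges;
        }
    }

    // Internal variant that works on an already-open connection.
    private long estimateSize(CountingConnection c) {
        return c.countRows();
    }
}
```

Calling estimateSize() and then split() costs two connections; with a guaranteed init/destroy lifecycle on one instance it could be one.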
> Why do you need that? Simply because you want to define an API for
> implementing sources that initializes the source bean and destroys it.
> I insist that this is a very basic concern for such an API. However, Beam
> neither embraces nor assumes it, so building any API on top of Beam is very
> painful today, and direct Beam users hit the exact same issues: check how
> the IOs are implemented, with static utilities that create short-lived
> connections and prevent reusing an existing connection within a single
> method
> (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
> The same logic applies to the reader that is then created.
> B. DoFn & SDF
> As a fn developer, you expect the same from the Beam runtime:
> init(); try { while (...) process(); } finally { destroy(); }
> executed on the exact same instance, so that you can keep state at that
> level for expensive connections, operations, or flow-state handling.
> As you mentioned with the million example, this sequence should happen for
> every single instance, so 1M times in your example.
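The contract described above can be modelled without Beam as a toy harness. LifecycleFn, ToyHarness and RecordingFn are hypothetical names; Beam's real DoFn expresses these hooks with the @Setup, @StartBundle, @ProcessElement, @FinishBundle and @Teardown annotations, and a real runner is free to schedule instances differently. The finally block guarantees teardown when processing throws, but nothing in-process can help if the JVM or machine dies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical lifecycle interface mirroring the hooks discussed in this thread.
interface LifecycleFn<T> {
    void setup();
    void startBundle();
    void process(T element);
    void finishBundle();
    void teardown();
}

// Toy harness (not a Beam runner): one instance, setup once, any number of
// bundles, teardown in a finally block.
final class ToyHarness {
    private ToyHarness() {}

    static <T> void run(LifecycleFn<T> fn, List<List<T>> bundles) {
        fn.setup();
        try {
            for (List<T> bundle : bundles) {
                fn.startBundle();
                for (T element : bundle) {
                    fn.process(element);
                }
                fn.finishBundle();
            }
        } finally {
            // Runs on normal completion and when process() throws,
            // but not if the JVM or the machine dies.
            fn.teardown();
        }
    }
}

// Records every call so that the order can be checked.
class RecordingFn implements LifecycleFn<String> {
    final List<String> calls = new ArrayList<>();

    public void setup() { calls.add("setup"); }
    public void startBundle() { calls.add("startBundle"); }
    public void process(String element) { calls.add("process:" + element); }
    public void finishBundle() { calls.add("finishBundle"); }
    public void teardown() { calls.add("teardown"); }
}
```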
> Now, why did I mention SDF several times? Because SDF is a generalization
> of both cases (source and DoFn). It therefore creates far more instances
> and requires a much stricter, more explicit definition of the exact
> lifecycle and of which instance does what. Since Beam handles the full
> lifecycle of the bean instances, it must provide init/destroy hooks
> (setup/teardown) that can be stateful.
> Take the JDBC example mentioned earlier. Today, because of the teardown
> issue, it uses bundles. Since the bundle size is not defined (and will not
> be with SDF), it must use a pool to reuse a connection instance and keep
> acceptable performance. Now, with SDF and the increased number of splits,
> how do you size the pool? Generally in batch you use a single connection
> per thread to avoid consuming all the database connections. With a pool
> you have two choices: 1. use a pool of 1; 2. use a slightly larger pool,
> but multiplied by the number of beans you will likely double or triple the
> connection count and make the execution fail with "no more connections
> available". If you pick 1 (a pool of one), you still need a reliable
> teardown per pool instance (generally close()) to ensure you release the
> pool and don't leak the connection information in the JVM. In all cases
> you come back to the init()/destroy() lifecycle, even if you pretend to
> get connections per bundle.
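As a rough worked bound, assume (hypothetically) that each DoFn instance owns a private pool of P connections, with W workers and I instances per worker, where I is chosen by the runner rather than by the user:

```latex
C_{\max} = W \cdot I \cdot P
```

With the figures Reuven uses further down the thread (W = I = 1000), P = 1 already gives C_max = 10^6 connections, and P = 3 gives 3 x 10^6, three times the pool-of-one budget. That is the doubling or tripling described above.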
> Just to make it clear: I mention SDF only because SDF exponentially
> amplifies all the current issues caused by the loose definition of bean
> lifecycles, nothing else.
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>> The kind of whole-transform lifecycle you're mentioning can be
>> accomplished using the Wait transform as I suggested in the thread above,
>> and I believe it should become the canonical way to do that.
>> (I'd like to reiterate once more, as the main author of most design
>> documents related to SDF and of its implementation in the Java direct and
>> Dataflow runners, that SDF is entirely unrelated to the topic of cleanup;
>> I'm very confused as to why it keeps coming up.)
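The ordering that the Wait transform provides (Wait.on in the Beam Java SDK), "run this only after that has finished", can be illustrated with a plain-Java analogue using CompletableFuture. This is not Beam code: WaitThenCleanup is a hypothetical name, and in a pipeline the signal would be a PCollection rather than a future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogue, not Beam code: the cleanup step is chained on a signal
// that completes only once every write has completed.
final class WaitThenCleanup {
    private WaitThenCleanup() {}

    static List<String> run(int shards) {
        ConcurrentLinkedQueue<String> log = new ConcurrentLinkedQueue<>();
        ExecutorService workers = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Void>> writes = new ArrayList<>();
            for (int i = 0; i < shards; i++) {
                final int shard = i;
                writes.add(CompletableFuture.runAsync(() -> log.add("write-" + shard), workers));
            }
            // The "signal": completes after all writes, whatever order they finish in.
            // If any write failed, allOf completes exceptionally and cleanup is skipped.
            CompletableFuture<Void> allWritten =
                    CompletableFuture.allOf(writes.toArray(new CompletableFuture<?>[0]));
            allWritten.thenRun(() -> log.add("cleanup")).join();
        } finally {
            workers.shutdown();
        }
        return new ArrayList<>(log);
    }
}
```

The cleanup is data-dependent on the writes rather than tied to any instance lifecycle, which is the trade-off Eugene describes.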
>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>> I kind of agree, except that transforms lack a lifecycle too. My
>>> understanding is that SDF could be a way to unify it and clean up the API.
>>> Otherwise, how do we normalize the lifecycle of transforms behind a single
>>> API?
>>> On 18 Feb 2018 21:32, "Ben Chambers" <bchamb...@apache.org> wrote:
>>>> Are you sure that focusing on the cleanup of specific DoFns is
>>>> appropriate? In many cases where cleanup is necessary, it is around an
>>>> entire composite PTransform. I think there have been discussions and
>>>> proposals around a more methodical "cleanup" option, but those haven't
>>>> been implemented, to the best of my knowledge.
>>>> For instance, consider the steps of a FileIO:
>>>> 1. Write to a bunch (N shards) of temporary files
>>>> 2. When all temporary files are complete, attempt to do a bulk copy to
>>>> put them in the final destination.
>>>> 3. Cleanup all the temporary files.
>>>> (This is often desirable because it minimizes the chance of seeing
>>>> partial/incomplete results in the final destination).
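The three steps above can be sketched in a single JVM with java.nio.file; TempThenFinalize and the shard/part file names are hypothetical. Here a finally block is enough for step 3 because everything runs in one process; in a pipeline, step 1 runs on many workers and there is no equivalent hook for "the whole transform is done", which is the gap Ben points out.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Stream;

// Single-JVM sketch of the FileIO-style steps; names and layout are hypothetical.
final class TempThenFinalize {
    private TempThenFinalize() {}

    static List<Path> write(Path finalDir, List<String> shardContents) throws IOException {
        Path tempDir = Files.createTempDirectory("shards-");
        try {
            // Step 1: write each shard to a temporary file.
            List<Path> temps = new ArrayList<>();
            for (int i = 0; i < shardContents.size(); i++) {
                Path t = tempDir.resolve("shard-" + i + ".tmp");
                Files.write(t, shardContents.get(i).getBytes(StandardCharsets.UTF_8));
                temps.add(t);
            }
            // Step 2: only after all shards exist, move them to the final destination.
            Files.createDirectories(finalDir);
            List<Path> finals = new ArrayList<>();
            for (int i = 0; i < temps.size(); i++) {
                Path f = finalDir.resolve("part-" + i);
                Files.move(temps.get(i), f, StandardCopyOption.REPLACE_EXISTING);
                finals.add(f);
            }
            return finals;
        } finally {
            // Step 3: delete whatever is left of the temporary directory, children first.
            try (Stream<Path> paths = Files.walk(tempDir)) {
                paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
            }
        }
    }
}
```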
>>>> In the above, you'd want step 1 to execute on many workers, likely
>>>> using a ParDo (say N different workers).
>>>> The move step should only happen once, so on one worker. This means it
>>>> will be a different DoFn, likely with some stuff done to ensure it runs on
>>>> one worker.
>>>> In such a case, cleanup / @Teardown of the DoFn is not enough. We need
>>>> an API for a PTransform to schedule some cleanup work for when the
>>>> transform is "done". In batch this would be relatively straightforward,
>>>> but it doesn't exist. This is the source of some problems, such as the
>>>> BigQuery sink leaving files around that have failed to import into BigQuery.
>>>> In streaming this is less straightforward -- do you want to wait until
>>>> the end of the pipeline? Or do you want to wait until the end of the
>>>> window? In practice, you just want to wait until you know nobody will need
>>>> the resource anymore.
>>>> This led to some discussions around a "cleanup" API, where you could
>>>> have a transform that output resource objects. Each resource object would
>>>> have logic for cleaning it up. And there would be something that indicated
>>>> what parts of the pipeline needed that resource, and what kind of temporal
>>>> lifetime those objects had. As soon as that part of the pipeline had
>>>> advanced far enough that it would no longer need the resources, they would
>>>> get cleaned up. This can be done at pipeline shutdown, or incrementally
>>>> during a streaming pipeline, etc.
>>>> Would something like this be a better fit for your use case? If not,
>>>> why is handling teardown within a single DoFn sufficient?
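Since the proposed cleanup API does not exist (as Ben notes), the following is only a toy, single-process model of the idea. CleanableResource and ResourceTracker are hypothetical names, and the "users" count stands for however many parts of the pipeline (or windows) declared that they need the resource.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical: a resource object that carries its own cleanup logic.
interface CleanableResource {
    String id();
    void cleanUp();
}

// Hypothetical tracker: a resource is cleaned up as soon as every declared
// user has reported that it no longer needs it.
final class ResourceTracker {
    private final Map<String, CleanableResource> resources = new HashMap<>();
    private final Map<String, Integer> pendingUsers = new HashMap<>();

    synchronized void register(CleanableResource r, int users) {
        if (users <= 0) {
            throw new IllegalArgumentException("users must be positive");
        }
        resources.put(r.id(), r);
        pendingUsers.put(r.id(), users);
    }

    // Called when one user (a pipeline segment, a window, ...) is done with the resource.
    synchronized void done(String id) {
        Integer left = pendingUsers.get(id);
        if (left == null) {
            throw new IllegalStateException("unknown or already cleaned up: " + id);
        }
        if (left == 1) {
            pendingUsers.remove(id);
            resources.remove(id).cleanUp(); // last user gone: release now, not at shutdown
        } else {
            pendingUsers.put(id, left - 1);
        }
    }

    synchronized boolean isLive(String id) { return resources.containsKey(id); }
}
```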
>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <
>>>> rmannibu...@gmail.com> wrote:
>>>>> Yes, 1M. Let me try to explain by simplifying the overall execution.
>>>>> Each instance (one fn, so likely in a thread of a worker) has its own
>>>>> lifecycle. Caricaturing: "new" and garbage collection.
>>>>> In practice, "new" is often an unsafe allocation (deserialization), but
>>>>> that doesn't matter here.
>>>>> What I want is for every "new" to be followed by a setup call before any
>>>>> process or startBundle call, and for Beam to call teardown the last time
>>>>> it holds the instance, after the last finishBundle and before the
>>>>> instance is GC-ed.
>>>>> It is as simple as that.
>>>>> This way there is no need to combine fns in a way that makes a fn not
>>>>> self-contained just to implement basic transforms.
>>>>> On 18 Feb 2018 20:07, "Reuven Lax" <re...@google.com> wrote:
>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>> On 18 Feb 2018 19:28, "Ben Chambers" <bchamb...@apache.org> wrote:
>>>>>>> It feels like this thread may be a bit off-track. Rather than
>>>>>>> focusing on the semantics of the existing methods -- which have been
>>>>>>> noted to meet many existing use cases -- it would be helpful to focus
>>>>>>> more on the reason you are looking for something with different
>>>>>>> semantics.
>>>>>>> Some possibilities (I'm not sure which one you are trying to do):
>>>>>>> 1. Clean up some external, global resource that was initialized
>>>>>>> once during the startup of the pipeline. If this is the case, how are
>>>>>>> you ensuring it was really only initialized once (and not once per
>>>>>>> worker, per thread, per instance, etc.)? How do you know when the
>>>>>>> pipeline should release it? If the answer is "when it reaches step X",
>>>>>>> then what about a streaming pipeline?
>>>>>>> When the DoFn is no longer logically needed, i.e. when the batch is
>>>>>>> done or the stream is stopped (manually or by a JVM shutdown).
>>>>>> I'm really not following what this means.
>>>>>> Let's say that a pipeline is running 1000 workers, and each worker is
>>>>>> running 1000 threads (each running a copy of the same DoFn). How many
>>>>>> cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do
>>>>>> you want it called? When the entire pipeline is shut down? When an
>>>>>> individual worker is about to shut down (which may be temporary - may be
>>>>>> about to start back up)? Something else?
>>>>>>> 2. Finalize some resources that are used within some region of the
>>>>>>> pipeline. While the DoFn lifecycle methods are not a good fit for this
>>>>>>> (they are focused on managing resources within the DoFn), you could
>>>>>>> model this on how FileIO finalizes the files that it produced. For
>>>>>>> instance:
>>>>>>>    a) ParDo generates "resource IDs" (or some token that stores
>>>>>>> information about resources)
>>>>>>>    b) "Require Deterministic Input" (to prevent retries from
>>>>>>> changing resource IDs)
>>>>>>>    c) ParDo that initializes the resources
>>>>>>>    d) Pipeline segments that use the resources, and eventually
>>>>>>> output the fact they're done
>>>>>>>    e) "Require Deterministic Input"
>>>>>>>    f) ParDo that frees the resources
>>>>>>> By making the use of the resource part of the data, it is possible to
>>>>>>> "checkpoint" which resources may be in use or have been finished by
>>>>>>> using the require deterministic input. This is important to ensuring
>>>>>>> everything is actually cleaned up.
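Steps a) to f) can be sketched in plain Java under simplifying assumptions: ResourcesAsData is hypothetical, a ConcurrentHashMap stands in for the external system, and IDs derived deterministically with UUID.nameUUIDFromBytes stand in for what "Require Deterministic Input" achieves in a pipeline, namely that a retried step sees the same resource IDs. That is what makes idempotent init and free safe to retry.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of steps a) to f): the resource IDs travel as data, and
// every side effect keyed by them is idempotent so that retries are harmless.
final class ResourcesAsData {
    // Stand-in for the external system that actually holds the resources.
    final Map<String, String> external = new ConcurrentHashMap<>();

    // a) + b): IDs derived deterministically from the input, so a retried
    // step regenerates exactly the same IDs.
    static List<String> resourceIds(String jobName, int count) {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            byte[] key = (jobName + "/" + i).getBytes(StandardCharsets.UTF_8);
            ids.add(UUID.nameUUIDFromBytes(key).toString());
        }
        return ids;
    }

    // c): initializing twice with the same ID leaves a single resource.
    void init(String id) {
        external.putIfAbsent(id, "initialized");
    }

    // f): freeing an already-freed ID is a no-op.
    void free(String id) {
        external.remove(id);
    }
}
```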
>>>>>>> I need that, but generic rather than case by case, to industrialize
>>>>>>> some API on top of Beam.
>>>>>>> 3. Some other use case that I may be missing? If it is this case,
>>>>>>> could you elaborate on what you are trying to accomplish? That would
>>>>>>> help me understand both the problems with existing options and
>>>>>>> possibly what could be done to help.
>>>>>>> I understand there are workarounds for almost all cases, but that
>>>>>>> means each transform handles its lifecycle differently. I dislike that
>>>>>>> a lot, at scale and as a user, since you can't put any unified practice
>>>>>>> on top of Beam; it also makes Beam very hard to integrate, or to use to
>>>>>>> build higher-level libraries or software.
>>>>>>> This is why I tried not to start the workaround discussions and to
>>>>>>> stay at the API level.
>>>>>>> -- Ben
>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <
>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>> "Machine state" is overly low-level because many of the possible
>>>>>>>>> reasons can happen on a perfectly fine machine.
>>>>>>>>> If you'd like to rephrase it to "it will be called except in
>>>>>>>>> various situations where it's logically impossible or impractical to
>>>>>>>>> guarantee that it's called", that's fine. Or you can list some of the
>>>>>>>>> examples above.
>>>>>>>> Sounds ok to me
>>>>>>>>> The main point for the user is, you *will* see non-preventable
>>>>>>>>> situations where it couldn't be called - it's not just intergalactic
>>>>>>>>> crashes - so if the logic is very important (e.g. cleaning up a large
>>>>>>>>> amount of temporary files, shutting down a large number of VMs you
>>>>>>>>> started, etc.), you have to express it using one of the other methods
>>>>>>>>> that have stricter guarantees (which obviously come at a cost, e.g. no
>>>>>>>>> pass-by-reference).
>>>>>>>> Sadly, FinishBundle has the exact same guarantee, so I'm not sure
>>>>>>>> which other method you are talking about. Concretely, if you make it
>>>>>>>> really unreliable (which is what "best effort" sounds like to me), then
>>>>>>>> users can't use it to clean up anything; but if you make it "it can
>>>>>>>> happen, but it is unexpected and means something went wrong", then it
>>>>>>>> is fine to have a manual (or automatic, if fancy) recovery procedure.
>>>>>>>> This is where it makes all the difference, and it impacts developers
>>>>>>>> and ops (basically all users).
>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>> I agree, Eugene, except that "best effort" means that. It is also
>>>>>>>>>> often used to mean "at will", and this is what triggered this thread.
>>>>>>>>>> I'm fine with "except if the machine state prevents it", but "best
>>>>>>>>>> effort" is too open and can be badly misperceived by users (as it
>>>>>>>>>> was by me).
>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>>>>> It will not be called if it's impossible to call it: in the
>>>>>>>>>>> example situation you have (intergalactic crash), and in a number
>>>>>>>>>>> of more common cases, e.g. in case the worker container has crashed
>>>>>>>>>>> (e.g. user code in a different thread called a C library over JNI
>>>>>>>>>>> and it segfaulted), a JVM bug, a crash due to a user code OOM, in
>>>>>>>>>>> case the worker has lost network connectivity (then it may be
>>>>>>>>>>> called but it won't be able to do anything useful), in case this is
>>>>>>>>>>> running on a preemptible VM and it was preempted by the underlying
>>>>>>>>>>> cluster manager without notice, or if the worker was too busy with
>>>>>>>>>>> other stuff (e.g. calling other Teardown functions) until the
>>>>>>>>>>> preemption timeout elapsed, in case the underlying hardware simply
>>>>>>>>>>> failed (which happens quite often at scale), and in many other
>>>>>>>>>>> conditions.
>>>>>>>>>>> "Best effort" is the commonly used term to describe such
>>>>>>>>>>> behavior. Please feel free to file bugs for cases where you 
>>>>>>>>>>> observed a
>>>>>>>>>>> runner not call Teardown in a situation where it was possible to 
>>>>>>>>>>> call it
>>>>>>>>>>> but the runner made insufficient effort.
>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <
>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>> On 18 Feb 2018 00:23, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm
>>>>>>>>>>>>>>> trying to write an IO for system $x and it requires the
>>>>>>>>>>>>>>> following initialization and the following cleanup logic and
>>>>>>>>>>>>>>> the following processing in between") I'll be better able to
>>>>>>>>>>>>>>> help you.
>>>>>>>>>>>>>>> Take a simple example of a transform requiring a connection.
>>>>>>>>>>>>>>> Using bundles is a perf killer since their size is not
>>>>>>>>>>>>>>> controlled. Using teardown doesn't allow you to release the
>>>>>>>>>>>>>>> connection since it is a best-effort thing. Not releasing the
>>>>>>>>>>>>>>> connection makes you pay a lot (AWS ;)) or prevents you from
>>>>>>>>>>>>>>> launching other processing (concurrency limits).
>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things die so
>>>>>>>>>>>>>> badly that @Teardown is not called then nothing else can be
>>>>>>>>>>>>>> called to close the connection either. What AWS service are you
>>>>>>>>>>>>>> thinking of that stays open for a long time when everything at
>>>>>>>>>>>>>> the other end has died?
>>>>>>>>>>>>>> You assume connections are kind of stateless, but some
>>>>>>>>>>>>>> (proprietary) protocols require closing exchanges that are not
>>>>>>>>>>>>>> just "I'm leaving".
>>>>>>>>>>>>>> For AWS I was thinking about starting some services (machines)
>>>>>>>>>>>>>> on the fly at pipeline startup and shutting them down at the end.
>>>>>>>>>>>>>> If teardown is not called, you leak machines and money. You can
>>>>>>>>>>>>>> say it can be done another way... as can the whole pipeline ;).
>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its
>>>>>>>>>>>>>> components' lifecycle, it can't be used at scale for generic
>>>>>>>>>>>>>> pipelines, only when bound to some particular IO.
>>>>>>>>>>>>>> What prevents enforcing teardown, ignoring the interstellar
>>>>>>>>>>>>>> crash case, which can't be handled by any human system? Nothing,
>>>>>>>>>>>>>> technically. Why do you push to not handle it? Is it due to some
>>>>>>>>>>>>>> legacy code in Dataflow or something else?
>>>>>>>>>>>>> Teardown *is* already documented and implemented this way
>>>>>>>>>>>>> (best-effort). So I'm not sure what kind of change you're asking
>>>>>>>>>>>>> for.
>>>>>>>>>>>> Remove "best effort" from the Javadoc. If it is not called, then
>>>>>>>>>>>> it is a bug and we are done :).
>>>>>>>>>>>>>> Also, what does it mean for users? The direct runner does it,
>>>>>>>>>>>>>> so if a user uses the RI (reference implementation) in tests,
>>>>>>>>>>>>>> will they get a different behavior in prod? Also, don't forget
>>>>>>>>>>>>>> that users don't know what the IOs they compose use, so this has
>>>>>>>>>>>>>> such an impact on the whole product that it must be handled, IMHO.
>>>>>>>>>>>>>> I understand the portability culture is new in the big data
>>>>>>>>>>>>>> world, but that is no reason to ignore what people have done for
>>>>>>>>>>>>>> years and to do it wrong before doing it right ;).
>>>>>>>>>>>>>> My proposal is to list what can prevent guaranteeing the
>>>>>>>>>>>>>> execution of teardown under normal IT conditions. Then we see if
>>>>>>>>>>>>>> we can handle it, and only if there is a technical reason we
>>>>>>>>>>>>>> can't do we make it experimental/unsupported in the API. I know
>>>>>>>>>>>>>> Spark and Flink can; are there any unknown blockers for other
>>>>>>>>>>>>>> runners?
>>>>>>>>>>>>>> Technical note: even a kill should go through Java shutdown
>>>>>>>>>>>>>> hooks; otherwise your environment (the software enclosing Beam)
>>>>>>>>>>>>>> is fully unhandled and your overall system is uncontrolled. The
>>>>>>>>>>>>>> only case where this is not true is when the software is always
>>>>>>>>>>>>>> owned by a vendor and never installed in a customer environment.
>>>>>>>>>>>>>> In that case it is up to the vendor to handle the Beam API, not
>>>>>>>>>>>>>> up to Beam to adjust its API for a vendor; otherwise all features
>>>>>>>>>>>>>> unsupported by one runner should be made optional, right?
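Routing teardown through a JVM shutdown hook, as suggested here, can be sketched with Runtime.addShutdownHook; TeardownOnShutdown is a hypothetical name. Per the Runtime Javadoc, hooks run on normal exit and on termination requests such as SIGTERM or Ctrl-C, but there is no guarantee they run when the JVM is killed with SIGKILL or aborts, so this narrows rather than removes the failure cases Eugene lists.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch: teardown runs exactly once, either on the normal path
// via close() or from a JVM shutdown hook (not on SIGKILL or a JVM crash).
final class TeardownOnShutdown {
    private final AtomicBoolean tornDown = new AtomicBoolean(false);
    private final Runnable teardown;
    final Thread hook;

    TeardownOnShutdown(Runnable teardown) {
        this.teardown = teardown;
        this.hook = new Thread(this::teardownOnce, "teardown-hook");
        Runtime.getRuntime().addShutdownHook(hook);
    }

    // Idempotent, so the normal path and the hook cannot both run the teardown.
    void teardownOnce() {
        if (tornDown.compareAndSet(false, true)) {
            teardown.run();
        }
    }

    // Normal path: tear down now and unregister the hook.
    // (Must not be called while the JVM is already shutting down.)
    void close() {
        teardownOnce();
        Runtime.getRuntime().removeShutdownHook(hook);
    }
}
```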
>>>>>>>>>>>>>> Not all state is about the network, even in distributed
>>>>>>>>>>>>>> systems, so it is key to have an explicit, well-defined lifecycle.
>>>>>>>>>>>>>> Kenn
