@Reuven: in practise it is created by pool of 256 but leads to the same
pattern, the teardown is just a "if (iCreatedThem) releaseThem();"
1. wait logic is about passing the value which is not always possible (like
15% of cases from my raw estimate)
2. sdf: i'll try to detail why i mention SDF more here

Concretely beam exposes a portable API (included in the SDK core). This API
defines a *container* API and therefore implies bean lifecycles. I'll not
detail them all but just use the sources and dofn (not sdf) to illustrate
the idea I'm trying to develop.

A. Source

A source computes a partition plan with 2 primitives: estimateSize and
split. As an user you can expect both to be called on the same bean
instance to avoid to pay the same connection cost(s) twice. Concretely:

try {
} finally {

this is not guaranteed by the API so you must do:

try {
} finally {
try {
} finally {

+ a workaround with an internal estimate size since this primitive is often
called in split but you dont want to connect twice in the second phase.

Why do you need that? Simply cause you want to define an API to implement
sources which initializes the source bean and destroys it.
I insists it is a very very basic concern for such API. However beam
doesn't embraces it and doesn't assume it so building any API on top of
beam is very hurtful today and for direct beam users you hit the exact same
issues - check how IO are implemented, the static utilities which create
volatile connections preventing to reuse existing connection in a single
method (

Same logic applies to the reader which is then created.

B. DoFn & SDF

As a fn dev you expect the same from the beam runtime: init(); try { while
(...) process(); } finally { destroy(); } and that it is executed on the
exact same instance to be able to be stateful at that level for expensive
connections/operations/flow state handling.

As you mentionned with the million example, this sequence should happen for
each single instance so 1M times for your example.

Now why did I mention SDF several times? Because SDF is a generalisation of
both cases (source and dofn). Therefore it creates way more instances and
requires to have a way more strict/explicit definition of the exact
lifecycle and which instance does what. Since beam handles the full
lifecycle of the bean instances it must provide init/destroy hooks
(setup/teardown) which can be stateful.

If you take the JDBC example which was mentionned earlier. Today, because
of the teardown issue it uses bundles. Since bundles size is not defined -
and will not with SDF, it must use a pool to be able to reuse a connection
instance to not correct performances. Now with the SDF and the split
increase, how do you handle the pool size? Generally in batch you use a
single connection per thread to avoid to consume all database connections.
With a pool you have 2 choices: 1. use a pool of 1, 2. use a pool a bit
higher but multiplied by the number of beans you will likely x2 or 3 the
connection count and make the execution fail with "no more connection
available". I you picked 1 (pool of #1), then you still have to have a
reliable teardown by pool instance (close() generally) to ensure you
release the pool and don't leak the connection information in the JVM. In
all case you come back to the init()/destroy() lifecycle even if you fake
to get connections with bundles.

Just to make it obvious: SDF mentions are just cause SDF imply all the
current issues with the loose definition of the bean lifecycles at an
exponential level, nothing else.

Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book

2018-02-18 22:32 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:

> The kind of whole-transform lifecycle you're mentioning can be
> accomplished using the Wait transform as I suggested in the thread above,
> and I believe it should become the canonical way to do that.
> (Would like to reiterate one more time, as the main author of most design
> documents related to SDF and of its implementation in the Java direct and
> dataflow runner that SDF is fully unrelated to the topic of cleanup - I'm
> very confused as to why it keeps coming up)
> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>> I kind of agree except transforms lack a lifecycle too. My understanding
>> is that sdf could be a way to unify it and clean the api.
>> Otherwise how to normalize - single api -  lifecycle of transforms?
>> Le 18 févr. 2018 21:32, "Ben Chambers" <bchamb...@apache.org> a écrit :
>>> Are you sure that focusing on the cleanup of specific DoFn's is
>>> appropriate? Many cases where cleanup is necessary, it is around an entire
>>> composite PTransform. I think there have been discussions/proposals around
>>> a more methodical "cleanup" option, but those haven't been implemented, to
>>> the best of my knowledge.
>>> For instance, consider the steps of a FileIO:
>>> 1. Write to a bunch (N shards) of temporary files
>>> 2. When all temporary files are complete, attempt to do a bulk copy to
>>> put them in the final destination.
>>> 3. Cleanup all the temporary files.
>>> (This is often desirable because it minimizes the chance of seeing
>>> partial/incomplete results in the final destination).
>>> In the above, you'd want step 1 to execute on many workers, likely using
>>> a ParDo (say N different workers).
>>> The move step should only happen once, so on one worker. This means it
>>> will be a different DoFn, likely with some stuff done to ensure it runs on
>>> one worker.
>>> In such a case, cleanup / @TearDown of the DoFn is not enough. We need
>>> an API for a PTransform to schedule some cleanup work for when the
>>> transform is "done". In batch this is relatively straightforward, but
>>> doesn't exist. This is the source of some problems, such as BigQuery sink
>>> leaving files around that have failed to import into BigQuery.
>>> In streaming this is less straightforward -- do you want to wait until
>>> the end of the pipeline? Or do you want to wait until the end of the
>>> window? In practice, you just want to wait until you know nobody will need
>>> the resource anymore.
>>> This led to some discussions around a "cleanup" API, where you could
>>> have a transform that output resource objects. Each resource object would
>>> have logic for cleaning it up. And there would be something that indicated
>>> what parts of the pipeline needed that resource, and what kind of temporal
>>> lifetime those objects had. As soon as that part of the pipeline had
>>> advanced far enough that it would no longer need the resources, they would
>>> get cleaned up. This can be done at pipeline shutdown, or incrementally
>>> during a streaming pipeline, etc.
>>> Would something like this be a better fit for your use case? If not, why
>>> is handling teardown within a single DoFn sufficient?
>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>> Yes 1M. Lets try to explain you simplifying the overall execution. Each
>>>> instance - one fn so likely in a thread of a worker - has its lifecycle.
>>>> Caricaturally: "new" and garbage collection.
>>>> In practise, new is often an unsafe allocate (deserialization) but it
>>>> doesnt matter here.
>>>> What i want is any "new" to have a following setup before any process
>>>> or stattbundle and the last time beam has the instance before it is gc-ed
>>>> and after last finishbundle it calls teardown.
>>>> It is as simple as it.
>>>> This way no need to comibe fn in a way making a fn not self contained
>>>> to implement basic transforms.
>>>> Le 18 févr. 2018 20:07, "Reuven Lax" <re...@google.com> a écrit :
>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>> Le 18 févr. 2018 19:28, "Ben Chambers" <bchamb...@apache.org> a
>>>>>> écrit :
>>>>>> It feels like his thread may be a bit off-track. Rather than focusing
>>>>>> on the semantics of the existing methods -- which have been noted to be
>>>>>> meet many existing use cases -- it would be helpful to focus on more on 
>>>>>> the
>>>>>> reason you are looking for something with different semantics.
>>>>>> Some possibilities (I'm not sure which one you are trying to do):
>>>>>> 1. Clean-up some external, global resource, that was initialized once
>>>>>> during the startup of the pipeline. If this is the case, how are you
>>>>>> ensuring it was really only initialized once (and not once per worker, 
>>>>>> per
>>>>>> thread, per instance, etc.)? How do you know when the pipeline should
>>>>>> release it? If the answer is "when it reaches step X", then what about a
>>>>>> streaming pipeline?
>>>>>> When the dofn is no more needed logically ie when the batch is done
>>>>>> or stream is stopped (manually or by a jvm shutdown)
>>>>> I'm really not following what this means.
>>>>> Let's say that a pipeline is running 1000 workers, and each worker is
>>>>> running 1000 threads (each running a copy of the same DoFn). How many
>>>>> cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do
>>>>> you want it called? When the entire pipeline is shut down? When an
>>>>> individual worker is about to shut down (which may be temporary - may be
>>>>> about to start back up)? Something else?
>>>>>> 2. Finalize some resources that are used within some region of the
>>>>>> pipeline. While, the DoFn lifecycle methods are not a good fit for this
>>>>>> (they are focused on managing resources within the DoFn), you could model
>>>>>> this on how FileIO finalizes the files that it produced. For instance:
>>>>>>    a) ParDo generates "resource IDs" (or some token that stores
>>>>>> information about resources)
>>>>>>    b) "Require Deterministic Input" (to prevent retries from changing
>>>>>> resource IDs)
>>>>>>    c) ParDo that initializes the resources
>>>>>>    d) Pipeline segments that use the resources, and eventually output
>>>>>> the fact they're done
>>>>>>    e) "Require Deterministic Input"
>>>>>>    f) ParDo that frees the resources
>>>>>> By making the use of the resource part of the data it is possible to
>>>>>> "checkpoint" which resources may be in use or have been finished by using
>>>>>> the require deterministic input. This is important to ensuring everything
>>>>>> is actually cleaned up.
>>>>>> I nees that but generic and not case by case to industrialize some
>>>>>> api on top of beam.
>>>>>> 3. Some other use case that I may be missing? If it is this case,
>>>>>> could you elaborate on what you are trying to accomplish? That would help
>>>>>> me understand both the problems with existing options and possibly what
>>>>>> could be done to help.
>>>>>> I understand there are sorkaround for almost all cases but means each
>>>>>> transform is different in its lifecycle handling  except i dislike it a 
>>>>>> lot
>>>>>> at a scale and as a user since you cant put any unified practise on top 
>>>>>> of
>>>>>> beam, it also makes beam very hard to integrate or to use to build higher
>>>>>> level libraries or softwares.
>>>>>> This is why i tried to not start the workaround discussions and just
>>>>>> stay at API level.
>>>>>> -- Ben
>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>> "Machine state" is overly low-level because many of the possible
>>>>>>>> reasons can happen on a perfectly fine machine.
>>>>>>>> If you'd like to rephrase it to "it will be called except in
>>>>>>>> various situations where it's logically impossible or impractical to
>>>>>>>> guarantee that it's called", that's fine. Or you can list some of the
>>>>>>>> examples above.
>>>>>>> Sounds ok to me
>>>>>>>> The main point for the user is, you *will* see non-preventable
>>>>>>>> situations where it couldn't be called - it's not just intergalactic
>>>>>>>> crashes - so if the logic is very important (e.g. cleaning up a large
>>>>>>>> amount of temporary files, shutting down a large number of VMs you 
>>>>>>>> started
>>>>>>>> etc), you have to express it using one of the other methods that have
>>>>>>>> stricter guarantees (which obviously come at a cost, e.g. no
>>>>>>>> pass-by-reference).
>>>>>>> FinishBundle has the exact same guarantee sadly so not which which
>>>>>>> other method you speak about. Concretely if you make it really 
>>>>>>> unreliable -
>>>>>>> this is what best effort sounds to me - then users can use it to clean
>>>>>>> anything but if you make it "can happen but it is unexpected and means
>>>>>>> something happent" then it is fine to have a manual - or auto if fancy -
>>>>>>> recovery procedure. This is where it makes all the difference and 
>>>>>>> impacts
>>>>>>> the developpers, ops (all users basically).
>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>> Agree Eugene except that "best effort" means that. It is also
>>>>>>>>> often used to say "at will" and this is what triggered this thread.
>>>>>>>>> I'm fine using "except if the machine state prevents it" but "best
>>>>>>>>> effort" is too open and can be very badly and wrongly perceived by 
>>>>>>>>> users
>>>>>>>>> (like I did).
>>>>>>>>> Romain Manni-Bucau
>>>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>
>>>>>>>>> :
>>>>>>>>>> It will not be called if it's impossible to call it: in the
>>>>>>>>>> example situation you have (intergalactic crash), and in a number of 
>>>>>>>>>> more
>>>>>>>>>> common cases: eg in case the worker container has crashed (eg user 
>>>>>>>>>> code in
>>>>>>>>>> a different thread called a C library over JNI and it segfaulted), 
>>>>>>>>>> JVM bug,
>>>>>>>>>> crash due to user code OOM, in case the worker has lost network
>>>>>>>>>> connectivity (then it may be called but it won't be able to do 
>>>>>>>>>> anything
>>>>>>>>>> useful), in case this is running on a preemptible VM and it was 
>>>>>>>>>> preempted
>>>>>>>>>> by the underlying cluster manager without notice or if the worker 
>>>>>>>>>> was too
>>>>>>>>>> busy with other stuff (eg calling other Teardown functions) until the
>>>>>>>>>> preemption timeout elapsed, in case the underlying hardware simply 
>>>>>>>>>> failed
>>>>>>>>>> (which happens quite often at scale), and in many other conditions.
>>>>>>>>>> "Best effort" is the commonly used term to describe such
>>>>>>>>>> behavior. Please feel free to file bugs for cases where you observed 
>>>>>>>>>> a
>>>>>>>>>> runner not call Teardown in a situation where it was possible to 
>>>>>>>>>> call it
>>>>>>>>>> but the runner made insufficient effort.
>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <
>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <
>>>>>>>>>>> kirpic...@google.com>:
>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>> Le 18 févr. 2018 00:23, "Kenneth Knowles" <k...@google.com> a
>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying
>>>>>>>>>>>>>> to write an IO for system $x and it requires the following 
>>>>>>>>>>>>>> initialization
>>>>>>>>>>>>>> and the following cleanup logic and the following processing in 
>>>>>>>>>>>>>> between")
>>>>>>>>>>>>>> I'll be better able to help you.
>>>>>>>>>>>>>> Take a simple example of a transform requiring a connection.
>>>>>>>>>>>>>> Using bundles is a perf killer since size is not controlled. 
>>>>>>>>>>>>>> Using teardown
>>>>>>>>>>>>>> doesnt allow you to release the connection since it is a best 
>>>>>>>>>>>>>> effort thing.
>>>>>>>>>>>>>> Not releasing the connection makes you pay a lot - aws ;) - or 
>>>>>>>>>>>>>> prevents you
>>>>>>>>>>>>>> to launch other processings - concurrent limit.
>>>>>>>>>>>>> For this example @Teardown is an exact fit. If things die so
>>>>>>>>>>>>> badly that @Teardown is not called then nothing else can be 
>>>>>>>>>>>>> called to close
>>>>>>>>>>>>> the connection either. What AWS service are you thinking of that 
>>>>>>>>>>>>> stays open
>>>>>>>>>>>>> for a long time when everything at the other end has died?
>>>>>>>>>>>>> You assume connections are kind of stateless but some
>>>>>>>>>>>>> (proprietary) protocols requires some closing exchanges which are 
>>>>>>>>>>>>> not only
>>>>>>>>>>>>> "im leaving".
>>>>>>>>>>>>> For aws i was thinking about starting some services - machines
>>>>>>>>>>>>> - on the fly in a pipeline startup and closing them at the end. 
>>>>>>>>>>>>> If teardown
>>>>>>>>>>>>> is not called you leak machines and money. You can say it can be 
>>>>>>>>>>>>> done
>>>>>>>>>>>>> another way...as the full pipeline ;).
>>>>>>>>>>>>> I dont want to be picky but if beam cant handle its components
>>>>>>>>>>>>> lifecycle it can be used at scale for generic pipelines and if 
>>>>>>>>>>>>> bound to
>>>>>>>>>>>>> some particular IO.
>>>>>>>>>>>>> What does prevent to enforce teardown - ignoring the
>>>>>>>>>>>>> interstellar crash case which cant be handled by any human 
>>>>>>>>>>>>> system? Nothing
>>>>>>>>>>>>> technically. Why do you push to not handle it? Is it due to some 
>>>>>>>>>>>>> legacy
>>>>>>>>>>>>> code on dataflow or something else?
>>>>>>>>>>>> Teardown *is* already documented and implemented this way
>>>>>>>>>>>> (best-effort). So I'm not sure what kind of change you're asking 
>>>>>>>>>>>> for.
>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not call then it
>>>>>>>>>>> is a bug and we are done :).
>>>>>>>>>>>>> Also what does it mean for the users? Direct runner does it so
>>>>>>>>>>>>> if a user udes the RI in test, he will get a different behavior 
>>>>>>>>>>>>> in prod?
>>>>>>>>>>>>> Also dont forget the user doesnt know what the IOs he composes 
>>>>>>>>>>>>> use so this
>>>>>>>>>>>>> is so impacting for the whole product than he must be handled 
>>>>>>>>>>>>> IMHO.
>>>>>>>>>>>>> I understand the portability culture is new in big data world
>>>>>>>>>>>>> but it is not a reason to ignore what people did for years and do 
>>>>>>>>>>>>> it wrong
>>>>>>>>>>>>> before doing right ;).
>>>>>>>>>>>>> My proposal is to list what can prevent to guarantee - in the
>>>>>>>>>>>>> normal IT conditions - the execution of teardown. Then we see if 
>>>>>>>>>>>>> we can
>>>>>>>>>>>>> handle it and only if there is a technical reason we cant we make 
>>>>>>>>>>>>> it
>>>>>>>>>>>>> experimental/unsupported in the api. I know spark and flink can, 
>>>>>>>>>>>>> any
>>>>>>>>>>>>> unknown blocker for other runners?
>>>>>>>>>>>>> Technical note: even a kill should go through java shutdown
>>>>>>>>>>>>> hooks otherwise your environment (beam enclosing software) is 
>>>>>>>>>>>>> fully
>>>>>>>>>>>>> unhandled and your overall system is uncontrolled. Only case 
>>>>>>>>>>>>> where it is
>>>>>>>>>>>>> not true is when the software is always owned by a vendor and 
>>>>>>>>>>>>> never
>>>>>>>>>>>>> installed on customer environment. In this case it belongd to the 
>>>>>>>>>>>>> vendor to
>>>>>>>>>>>>> handle beam API and not to beam to adjust its API for a vendor - 
>>>>>>>>>>>>> otherwise
>>>>>>>>>>>>> all unsupported features by one runner should be made optional 
>>>>>>>>>>>>> right?
>>>>>>>>>>>>> All state is not about network, even in distributed systems so
>>>>>>>>>>>>> this is key to have an explicit and defined lifecycle.
>>>>>>>>>>>>> Kenn

Reply via email to