Ismael, your understanding is appropriate for FinishBundle.

One basic issue with this understanding is that the lifecycle of a DoFn is
much longer than a single bundle (which I think you expressed by adding the
*s). How long the DoFn lives is not defined. In fact, a runner is completely
free to decide that it will _never_ destroy the DoFn, in which case
TearDown is never called simply because the DoFn was never torn down.

Also, as mentioned before, the runner can only call TearDown in cases where
the shutdown is in its control. If the JVM is shut down externally, the
runner has no chance to call TearDown. This means that while TearDown is
appropriate for cleaning up in-process resources (open connections, etc.),
it's not the right answer for cleaning up persistent resources. If you rely
on TearDown to delete VMs or delete files, there will be cases in which
those files or VMs are not deleted.

What we are _not_ saying is that the runner is free to just ignore
TearDown. If the runner is explicitly destroying a DoFn object, it should
call TearDown.
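
To make the contract concrete, here is a minimal sketch in plain Java. It is
not Beam code and not how any particular runner is implemented:
LifecycleSketch, ConnectionFn and runBundles are names made up for
illustration, and the "connection" is just a boolean. It only shows the shape
described above: one instance serves many bundles, and when the runner itself
decides to discard the instance, it calls teardown, even if processing failed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical runner loop (an assumption for illustration, not a real runner). */
class LifecycleSketch {
    /** Runs every bundle on the same instance and returns the recorded lifecycle events. */
    static List<String> runBundles(ConnectionFn fn, List<List<String>> bundles) {
        fn.setup();
        try {
            for (List<String> bundle : bundles) {
                fn.startBundle();
                for (String element : bundle) {
                    fn.processElement(element);
                }
                fn.finishBundle();
            }
        } catch (RuntimeException e) {
            fn.events.add("failed:" + e.getMessage());
        } finally {
            // The runner is explicitly discarding the instance, so it calls teardown.
            // If the JVM is killed externally before this point, nothing here runs.
            fn.teardown();
        }
        return fn.events;
    }

    public static void main(String[] args) {
        List<String> events = runBundles(
            new ConnectionFn(), List.of(List.of("a", "b"), List.of("c")));
        System.out.println(events);
    }
}

/** Toy stand-in for a DoFn with the same lifecycle phases; NOT the Beam API. */
class ConnectionFn {
    final List<String> events = new ArrayList<>();
    private boolean connected; // stands in for an in-process resource such as a connection

    void setup() { connected = true; events.add("setup"); }
    void startBundle() { events.add("startBundle"); }
    void processElement(String element) {
        if (!connected) throw new IllegalStateException("not connected");
        if (element.isEmpty()) throw new IllegalArgumentException("empty element");
        events.add("process:" + element);
    }
    void finishBundle() { events.add("finishBundle"); }
    void teardown() { connected = false; events.add("teardown"); }
}
```

Running main prints [setup, startBundle, process:a, process:b, finishBundle,
startBundle, process:c, finishBundle, teardown]. The finally block is the only
place teardown can come from, which is why it is fine for in-process
resources but cannot be relied on for files or VMs.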

Reuven


On Mon, Feb 19, 2018 at 2:35 PM, Ismaël Mejía <ieme...@gmail.com> wrote:

> I also had a different understanding of the lifecycle of a DoFn.
>
> My understanding of the use case for every method in the DoFn was clear and
> perfectly aligned with Thomas's explanation, but what I understood was that,
> in general terms, ‘@Setup was where I got resources/prepared connections and
> @Teardown where I freed them’, so calling Teardown seemed essential to have a
> complete lifecycle:
> Setup → StartBundle* → ProcessElement* → FinishBundle* → Teardown
>
> The fact that @Teardown might not be called is a new detail for me too, and I
> also find it weird to have a method that may or may not be called as part of
> an API. Why would users implement teardown if it will not be called? In that
> case, a cleaner approach would probably be to get rid of that method
> altogether, no?
>
> But maybe that’s not so easy either. There was another point: a user
> reported an issue with leaking resources using KafkaIO in the Spark runner;
> for reference:
> https://apachebeam.slack.com/archives/C1AAFJYMP/p1510596938000622
>
> At that moment my understanding was that there was something fishy, because
> we should be calling Teardown to correctly close the connections and free the
> resources in case of exceptions on start/process/finish, so I filed a JIRA
> and fixed this by enforcing the call of teardown for the Spark runner and the
> Flink runner:
> https://issues.apache.org/jira/browse/BEAM-3187
> https://issues.apache.org/jira/browse/BEAM-3244
>
> As you can see, not calling this method does have consequences, at least for
> non-containerized runners. Of course a runner that uses containers might not
> care about cleaning up resources this way, but a long-living JVM in a Hadoop
> environment probably won’t have the same luck. So I am not sure that having
> loose semantics there is the right option. I mean, runners could simply
> guarantee that they call teardown, and if teardown takes too long they can
> decide to send a signal or kill the process/container/etc. and go ahead. That
> way at least users would have a motivation to implement the teardown method;
> otherwise it doesn’t make any sense to have it (API-wise).
>
> On Mon, Feb 19, 2018 at 11:30 PM, Eugene Kirpichov <kirpic...@google.com>
> wrote:
> > Romain, would it be fair to say that currently the goal of your
> > participation in this discussion is to identify situations where @Teardown
> > in principle could have been called, but some of the current runners don't
> > make a good enough effort to call it? If yes - as I said before, please, by
> > all means, file bugs of the form "Runner X doesn't call @Teardown in
> > situation Y" if you're aware of any, and feel free to send PRs fixing runner
> > X to reliably call @Teardown in situation Y. I think we all agree that this
> > would be a good improvement.
> >
> > On Mon, Feb 19, 2018 at 2:03 PM Romain Manni-Bucau <rmannibu...@gmail.com>
> > wrote:
> >>
> >>
> >>
> >> On Feb 19, 2018 at 22:56, "Reuven Lax" <re...@google.com> wrote:
> >>
> >>
> >>
> >> On Mon, Feb 19, 2018 at 1:51 PM, Romain Manni-Bucau
> >> <rmannibu...@gmail.com> wrote:
> >>>
> >>>
> >>>
> >>> On Feb 19, 2018 at 21:28, "Reuven Lax" <re...@google.com> wrote:
> >>>
> >>> How do you call teardown? There are cases in which the Java code gets no
> >>> indication that the restart is happening (e.g. cases where the machine
> >>> itself is taken down).
> >>>
> >>>
> >>> This is a bug; zero-downtime maintenance is very doable in 2018 ;).
> >>> Crashes are bugs, and kill -9 to shut down is a bug too. In other cases you
> >>> can call shutdown with a hook, worst case.
> >>
> >>
> >> What you say here is simply not true.
> >>
> >> There are many scenarios in which workers shut down with no opportunity for
> >> any sort of shutdown hook. Sometimes the entire machine gets shut down, and
> >> not even the OS will have much of a chance to do anything. At scale this
> >> will happen with some regularity, and a distributed system that assumes this
> >> will not happen is a poor distributed system.
> >>
> >>
> >> This is part of the infra, and there is no reason for the machine to be shut
> >> down without first shutting down what runs on it, unless there is a bug in
> >> the software or setup. I understand you may not do it everywhere, but there
> >> is no blocker to doing it. That means you can shut down the machines and
> >> guarantee teardown is called.
> >>
> >> My point is simply that it is doable and Beam SDK core can assume the setup
> >> is done well. If there is a best-effort downside due to that - with the
> >> meaning you defined - it is an implementation bug or a user installation
> >> issue.
> >>
> >> Technically all is true.
> >>
> >> What can prevent teardown is a hardware failure or the like. This is fine
> >> and doesn't need to be in the doc since it is life in IT and obvious, or it
> >> must be very explicit to avoid the current ambiguity.
> >>
> >>
> >>>
> >>>
> >>>
> >>>
> >>> On Mon, Feb 19, 2018, 12:24 PM Romain Manni-Bucau <rmannibu...@gmail.com>
> >>> wrote:
> >>>>
> >>>> Restarting doesn't mean you don't call teardown. Barring a bug, there is
> >>>> no technical reason for that to happen.
> >>>>
> >>>> On Feb 19, 2018 at 21:14, "Reuven Lax" <re...@google.com> wrote:
> >>>>>
> >>>>> Workers restarting is not a bug; it's standard and often expected.
> >>>>>
> >>>>> On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau
> >>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>
> >>>>>> Nothing. As mentioned, it is a bug, so recovery is a bug-recovery
> >>>>>> procedure.
> >>>>>>
> >>>>>> On Feb 19, 2018 at 19:42, "Eugene Kirpichov" <kirpic...@google.com>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> So what would you like to happen if there is a crash? The DoFn
> >>>>>>> instance no longer exists because the JVM it ran on no longer exists.
> >>>>>>> What should Teardown be called on?
> >>>>>>>
> >>>>>>>
> >>>>>>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau
> >>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>
> >>>>>>>> This is what i want and not 999999 teardowns for 1000000 setups
> >>>>>>>> until there is an unexpected crash (= a bug).
> >>>>>>>>
> >>>>>>>> On Feb 19, 2018 at 18:57, "Reuven Lax" <re...@google.com> wrote:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau
> >>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau
> >>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Reuven: in practice they are created in pools of 256, but it leads
> >>>>>>>>>>>> to the same pattern; the teardown is just an "if (iCreatedThem)
> >>>>>>>>>>>> releaseThem();"
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> How do you control "256"? Even if you have a pool of 256 workers,
> >>>>>>>>>>> nothing in Beam guarantees how many threads and DoFns are created
> >>>>>>>>>>> per worker. In theory the runner might decide to create 1000
> >>>>>>>>>>> threads on each worker.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Nope, it was the other way around. In this case, on AWS you can get
> >>>>>>>>>> 256 instances at once but not 512 (which will be 2x256). So when you
> >>>>>>>>>> compute the distribution, you allocate to some fn the role of owning
> >>>>>>>>>> the instance lookup and release.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I still don't understand. Let's be more precise. If you write the
> >>>>>>>>> following code:
> >>>>>>>>>
> >>>>>>>>>    pCollection.apply(ParDo.of(new MyDoFn()));
> >>>>>>>>>
> >>>>>>>>> There is no way to control how many instances of MyDoFn are
> >>>>>>>>> created. The runner might decide to create a million instances of
> >>>>>>>>> this class across your worker pool, which means that you will get a
> >>>>>>>>> million Setup and Teardown calls.
> >>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Anyway, this was just an example of an external resource you must
> >>>>>>>>>> release. The real topic is that Beam should define ASAP a guaranteed
> >>>>>>>>>> generic lifecycle to let users embrace its programming model.
> >>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> @Eugene:
> >>>>>>>>>>>> 1. wait logic is about passing the value which is not always
> >>>>>>>>>>>> possible (like 15% of cases from my raw estimate)
> >>>>>>>>>>>> 2. sdf: i'll try to detail why i mention SDF more here
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Concretely beam exposes a portable API (included in the SDK
> >>>>>>>>>>>> core). This API defines a *container* API and therefore
> implies bean
> >>>>>>>>>>>> lifecycles. I'll not detail them all but just use the sources
> and dofn (not
> >>>>>>>>>>>> sdf) to illustrate the idea I'm trying to develop.
> >>>>>>>>>>>>
> >>>>>>>>>>>> A. Source
> >>>>>>>>>>>>
> >>>>>>>>>>>> A source computes a partition plan with 2 primitives:
> >>>>>>>>>>>> estimateSize and split. As an user you can expect both to be
> called on the
> >>>>>>>>>>>> same bean instance to avoid to pay the same connection
> cost(s) twice.
> >>>>>>>>>>>> Concretely:
> >>>>>>>>>>>>
> >>>>>>>>>>>> connect()
> >>>>>>>>>>>> try {
> >>>>>>>>>>>>   estimateSize()
> >>>>>>>>>>>>   split()
> >>>>>>>>>>>> } finally {
> >>>>>>>>>>>>   disconnect()
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> this is not guaranteed by the API so you must do:
> >>>>>>>>>>>>
> >>>>>>>>>>>> connect()
> >>>>>>>>>>>> try {
> >>>>>>>>>>>>   estimateSize()
> >>>>>>>>>>>> } finally {
> >>>>>>>>>>>>   disconnect()
> >>>>>>>>>>>> }
> >>>>>>>>>>>> connect()
> >>>>>>>>>>>> try {
> >>>>>>>>>>>>   split()
> >>>>>>>>>>>> } finally {
> >>>>>>>>>>>>   disconnect()
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>> + a workaround with an internal estimate size since this
> >>>>>>>>>>>> primitive is often called in split but you dont want to
> connect twice in the
> >>>>>>>>>>>> second phase.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Why do you need that? Simply because you want to define an API to
> >>>>>>>>>>>> implement sources which initializes the source bean and destroys it.
> >>>>>>>>>>>> I insist it is a very, very basic concern for such an API. However,
> >>>>>>>>>>>> Beam doesn't embrace it and doesn't assume it, so building any API on
> >>>>>>>>>>>> top of Beam is very painful today, and direct Beam users hit the
> >>>>>>>>>>>> exact same issues - check how IOs are implemented, with static
> >>>>>>>>>>>> utilities which create volatile connections, preventing reuse of an
> >>>>>>>>>>>> existing connection in a single method
> >>>>>>>>>>>> (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
> >>>>>>>>>>>>
> >>>>>>>>>>>> Same logic applies to the reader which is then created.
> >>>>>>>>>>>>
> >>>>>>>>>>>> B. DoFn & SDF
> >>>>>>>>>>>>
> >>>>>>>>>>>> As a fn dev you expect the same from the beam runtime: init();
> >>>>>>>>>>>> try { while (...) process(); } finally { destroy(); } and
> that it is
> >>>>>>>>>>>> executed on the exact same instance to be able to be stateful
> at that level
> >>>>>>>>>>>> for expensive connections/operations/flow state handling.
> >>>>>>>>>>>>
> >>>>>>>>>>>> As you mentionned with the million example, this sequence
> should
> >>>>>>>>>>>> happen for each single instance so 1M times for your example.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Now why did I mention SDF several times? Because SDF is a
> >>>>>>>>>>>> generalisation of both cases (source and dofn). Therefore it
> creates way
> >>>>>>>>>>>> more instances and requires to have a way more
> strict/explicit definition of
> >>>>>>>>>>>> the exact lifecycle and which instance does what. Since beam
> handles the
> >>>>>>>>>>>> full lifecycle of the bean instances it must provide
> init/destroy hooks
> >>>>>>>>>>>> (setup/teardown) which can be stateful.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Take the JDBC example which was mentioned earlier. Today, because of
> >>>>>>>>>>>> the teardown issue, it uses bundles. Since bundle size is not
> >>>>>>>>>>>> defined - and will not be with SDF - it must use a pool to be able
> >>>>>>>>>>>> to reuse a connection instance and keep performance acceptable. Now
> >>>>>>>>>>>> with SDF and the increase in splits, how do you handle the pool
> >>>>>>>>>>>> size? Generally in batch you use a single connection per thread to
> >>>>>>>>>>>> avoid consuming all database connections. With a pool you have 2
> >>>>>>>>>>>> choices: 1. use a pool of 1, 2. use a slightly bigger pool, but
> >>>>>>>>>>>> multiplied by the number of beans you will likely double or triple
> >>>>>>>>>>>> the connection count and make the execution fail with "no more
> >>>>>>>>>>>> connection available". If you picked 1 (pool of 1), then you still
> >>>>>>>>>>>> need a reliable teardown per pool instance (close() generally) to
> >>>>>>>>>>>> ensure you release the pool and don't leak the connection
> >>>>>>>>>>>> information in the JVM. In all cases you come back to the
> >>>>>>>>>>>> init()/destroy() lifecycle even if you fake getting connections
> >>>>>>>>>>>> with bundles.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Just to make it obvious: SDF mentions are just cause SDF imply
> >>>>>>>>>>>> all the current issues with the loose definition of the bean
> lifecycles at
> >>>>>>>>>>>> an exponential level, nothing else.
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> Romain Manni-Bucau
> >>>>>>>>>>>>
> >>>>>>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov
> >>>>>>>>>>>> <kirpic...@google.com>:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The kind of whole-transform lifecycle you're mentioning can
> be
> >>>>>>>>>>>>> accomplished using the Wait transform as I suggested in the
> thread above,
> >>>>>>>>>>>>> and I believe it should become the canonical way to do that.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> (Would like to reiterate one more time, as the main author of
> >>>>>>>>>>>>> most design documents related to SDF and of its
> implementation in the Java
> >>>>>>>>>>>>> direct and dataflow runner that SDF is fully unrelated to
> the topic of
> >>>>>>>>>>>>> cleanup - I'm very confused as to why it keeps coming up)
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau
> >>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I kind of agree except transforms lack a lifecycle too. My
> >>>>>>>>>>>>>> understanding is that sdf could be a way to unify it and
> clean the api.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Otherwise how to normalize - single api -  lifecycle of
> >>>>>>>>>>>>>> transforms?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Feb 18, 2018 at 21:32, "Ben Chambers" <bchamb...@apache.org>
> >>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Are you sure that focusing on the cleanup of specific
> DoFn's
> >>>>>>>>>>>>>>> is appropriate? Many cases where cleanup is necessary, it
> is around an
> >>>>>>>>>>>>>>> entire composite PTransform. I think there have been
> discussions/proposals
> >>>>>>>>>>>>>>> around a more methodical "cleanup" option, but those
> haven't been
> >>>>>>>>>>>>>>> implemented, to the best of my knowledge.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> For instance, consider the steps of a FileIO:
> >>>>>>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files
> >>>>>>>>>>>>>>> 2. When all temporary files are complete, attempt to do a
> >>>>>>>>>>>>>>> bulk copy to put them in the final destination.
> >>>>>>>>>>>>>>> 3. Cleanup all the temporary files.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (This is often desirable because it minimizes the chance of
> >>>>>>>>>>>>>>> seeing partial/incomplete results in the final
> destination).
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In the above, you'd want step 1 to execute on many workers,
> >>>>>>>>>>>>>>> likely using a ParDo (say N different workers).
> >>>>>>>>>>>>>>> The move step should only happen once, so on one worker.
> This
> >>>>>>>>>>>>>>> means it will be a different DoFn, likely with some stuff
> done to ensure it
> >>>>>>>>>>>>>>> runs on one worker.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In such a case, cleanup / @TearDown of the DoFn is not
> >>>>>>>>>>>>>>> enough. We need an API for a PTransform to schedule some
> cleanup work for
> >>>>>>>>>>>>>>> when the transform is "done". In batch this is relatively
> straightforward,
> >>>>>>>>>>>>>>> but doesn't exist. This is the source of some problems,
> such as BigQuery
> >>>>>>>>>>>>>>> sink leaving files around that have failed to import into
> BigQuery.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> In streaming this is less straightforward -- do you want to
> >>>>>>>>>>>>>>> wait until the end of the pipeline? Or do you want to wait
> until the end of
> >>>>>>>>>>>>>>> the window? In practice, you just want to wait until you
> know nobody will
> >>>>>>>>>>>>>>> need the resource anymore.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> This led to some discussions around a "cleanup" API, where
> >>>>>>>>>>>>>>> you could have a transform that output resource objects.
> Each resource
> >>>>>>>>>>>>>>> object would have logic for cleaning it up. And there
> would be something
> >>>>>>>>>>>>>>> that indicated what parts of the pipeline needed that
> resource, and what
> >>>>>>>>>>>>>>> kind of temporal lifetime those objects had. As soon as
> that part of the
> >>>>>>>>>>>>>>> pipeline had advanced far enough that it would no longer
> need the resources,
> >>>>>>>>>>>>>>> they would get cleaned up. This can be done at pipeline
> shutdown, or
> >>>>>>>>>>>>>>> incrementally during a streaming pipeline, etc.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Would something like this be a better fit for your use
> case?
> >>>>>>>>>>>>>>> If not, why is handling teardown within a single DoFn
> sufficient?
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau
> >>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall
> >>>>>>>>>>>>>>>> execution. Each instance - one fn, so likely in a thread of a
> >>>>>>>>>>>>>>>> worker - has its lifecycle. As a caricature: "new" and garbage
> >>>>>>>>>>>>>>>> collection.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> In practice, new is often an unsafe allocate
> >>>>>>>>>>>>>>>> (deserialization), but it doesn't matter here.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> What I want is for any "new" to be followed by a setup before
> >>>>>>>>>>>>>>>> any process or startBundle, and for Beam to call teardown the
> >>>>>>>>>>>>>>>> last time it holds the instance, after the last finishBundle and
> >>>>>>>>>>>>>>>> before it is GC-ed.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> It is as simple as that.
> >>>>>>>>>>>>>>>> This way there is no need to combine fns in a way that makes a fn
> >>>>>>>>>>>>>>>> not self-contained in order to implement basic transforms.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Feb 18, 2018 at 20:07, "Reuven Lax" <re...@google.com>
> >>>>>>>>>>>>>>>> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau
> >>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Feb 18, 2018 at 19:28, "Ben Chambers"
> >>>>>>>>>>>>>>>>>> <bchamb...@apache.org> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather
> >>>>>>>>>>>>>>>>>> than focusing on the semantics of the existing methods -- which
> >>>>>>>>>>>>>>>>>> have been noted to meet many existing use cases -- it would be
> >>>>>>>>>>>>>>>>>> helpful to focus more on the reason you are looking for
> >>>>>>>>>>>>>>>>>> something with different semantics.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are
> trying
> >>>>>>>>>>>>>>>>>> to do):
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 1. Clean-up some external, global resource, that was
> >>>>>>>>>>>>>>>>>> initialized once during the startup of the pipeline. If
> this is the case,
> >>>>>>>>>>>>>>>>>> how are you ensuring it was really only initialized
> once (and not once per
> >>>>>>>>>>>>>>>>>> worker, per thread, per instance, etc.)? How do you
> know when the pipeline
> >>>>>>>>>>>>>>>>>> should release it? If the answer is "when it reaches
> step X", then what
> >>>>>>>>>>>>>>>>>> about a streaming pipeline?
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> When the dofn is no more needed logically ie when the
> >>>>>>>>>>>>>>>>>> batch is done or stream is stopped (manually or by a
> jvm shutdown)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I'm really not following what this means.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and
> each
> >>>>>>>>>>>>>>>>> worker is running 1000 threads (each running a copy of
> the same DoFn). How
> >>>>>>>>>>>>>>>>> many cleanups do you want (do you want 1000 * 1000 = 1M
> cleanups) and when
> >>>>>>>>>>>>>>>>> do you want it called? When the entire pipeline is shut
> down? When an
> >>>>>>>>>>>>>>>>> individual worker is about to shut down (which may be
> temporary - may be
> >>>>>>>>>>>>>>>>> about to start back up)? Something else?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 2. Finalize some resources that are used within some
> >>>>>>>>>>>>>>>>>> region of the pipeline. While, the DoFn lifecycle
> methods are not a good fit
> >>>>>>>>>>>>>>>>>> for this (they are focused on managing resources within
> the DoFn), you could
> >>>>>>>>>>>>>>>>>> model this on how FileIO finalizes the files that it
> produced. For instance:
> >>>>>>>>>>>>>>>>>>    a) ParDo generates "resource IDs" (or some token that
> >>>>>>>>>>>>>>>>>> stores information about resources)
> >>>>>>>>>>>>>>>>>>    b) "Require Deterministic Input" (to prevent retries
> >>>>>>>>>>>>>>>>>> from changing resource IDs)
> >>>>>>>>>>>>>>>>>>    c) ParDo that initializes the resources
> >>>>>>>>>>>>>>>>>>    d) Pipeline segments that use the resources, and
> >>>>>>>>>>>>>>>>>> eventually output the fact they're done
> >>>>>>>>>>>>>>>>>>    e) "Require Deterministic Input"
> >>>>>>>>>>>>>>>>>>    f) ParDo that frees the resources
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> By making the use of the resource part of the data it is
> >>>>>>>>>>>>>>>>>> possible to "checkpoint" which resources may be in use
> or have been finished
> >>>>>>>>>>>>>>>>>> by using the require deterministic input. This is
> important to ensuring
> >>>>>>>>>>>>>>>>>> everything is actually cleaned up.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I need that, but generic and not case by case, to
> >>>>>>>>>>>>>>>>>> industrialize some API on top of Beam.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is
> >>>>>>>>>>>>>>>>>> this case, could you elaborate on what you are trying
> to accomplish? That
> >>>>>>>>>>>>>>>>>> would help me understand both the problems with
> existing options and
> >>>>>>>>>>>>>>>>>> possibly what could be done to help.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> I understand there are workarounds for almost all cases, but
> >>>>>>>>>>>>>>>>>> that means each transform is different in its lifecycle
> >>>>>>>>>>>>>>>>>> handling. I dislike that a lot at scale and as a user, since
> >>>>>>>>>>>>>>>>>> you can't put any unified practice on top of Beam. It also
> >>>>>>>>>>>>>>>>>> makes Beam very hard to integrate or to use to build
> >>>>>>>>>>>>>>>>>> higher-level libraries or software.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> This is why i tried to not start the workaround
> >>>>>>>>>>>>>>>>>> discussions and just stay at API level.
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> -- Ben
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau
> >>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov
> >>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> "Machine state" is overly low-level because many of
> the
> >>>>>>>>>>>>>>>>>>>> possible reasons can happen on a perfectly fine
> machine.
> >>>>>>>>>>>>>>>>>>>> If you'd like to rephrase it to "it will be called
> >>>>>>>>>>>>>>>>>>>> except in various situations where it's logically
> impossible or impractical
> >>>>>>>>>>>>>>>>>>>> to guarantee that it's called", that's fine. Or you
> can list some of the
> >>>>>>>>>>>>>>>>>>>> examples above.
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> Sounds ok to me
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> The main point for the user is, you *will* see
> >>>>>>>>>>>>>>>>>>>> non-preventable situations where it couldn't be
> called - it's not just
> >>>>>>>>>>>>>>>>>>>> intergalactic crashes - so if the logic is very
> important (e.g. cleaning up
> >>>>>>>>>>>>>>>>>>>> a large amount of temporary files, shutting down a
> large number of VMs you
> >>>>>>>>>>>>>>>>>>>> started etc), you have to express it using one of the
> other methods that
> >>>>>>>>>>>>>>>>>>>> have stricter guarantees (which obviously come at a
> cost, e.g. no
> >>>>>>>>>>>>>>>>>>>> pass-by-reference).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> FinishBundle has the exact same guarantee sadly, so I am not
> >>>>>>>>>>>>>>>>>>> sure which other method you are speaking about. Concretely, if
> >>>>>>>>>>>>>>>>>>> you make it really unreliable - this is what best effort
> >>>>>>>>>>>>>>>>>>> sounds like to me - then users can't use it to clean anything,
> >>>>>>>>>>>>>>>>>>> but if you make it "can happen but it is unexpected and means
> >>>>>>>>>>>>>>>>>>> something happened" then it is fine to have a manual - or auto
> >>>>>>>>>>>>>>>>>>> if fancy - recovery procedure. This is where it makes all the
> >>>>>>>>>>>>>>>>>>> difference and impacts the developers and ops (all users,
> >>>>>>>>>>>>>>>>>>> basically).
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau
> >>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Agree Eugene except that "best effort" means that. It
> >>>>>>>>>>>>>>>>>>>>> is also often used to say "at will" and this is what
> triggered this thread.
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents
> >>>>>>>>>>>>>>>>>>>>> it" but "best effort" is too open and can be very
> badly and wrongly
> >>>>>>>>>>>>>>>>>>>>> perceived by users (like I did).
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov
> >>>>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call it:
> >>>>>>>>>>>>>>>>>>>>>> in the example situation you have (intergalactic
> crash), and in a number of
> >>>>>>>>>>>>>>>>>>>>>> more common cases: eg in case the worker container
> has crashed (eg user code
> >>>>>>>>>>>>>>>>>>>>>> in a different thread called a C library over JNI
> and it segfaulted), JVM
> >>>>>>>>>>>>>>>>>>>>>> bug, crash due to user code OOM, in case the worker
> has lost network
> >>>>>>>>>>>>>>>>>>>>>> connectivity (then it may be called but it won't be
> able to do anything
> >>>>>>>>>>>>>>>>>>>>>> useful), in case this is running on a preemptible
> VM and it was preempted by
> >>>>>>>>>>>>>>>>>>>>>> the underlying cluster manager without notice or if
> the worker was too busy
> >>>>>>>>>>>>>>>>>>>>>> with other stuff (eg calling other Teardown
> functions) until the preemption
> >>>>>>>>>>>>>>>>>>>>>> timeout elapsed, in case the underlying hardware
> simply failed (which
> >>>>>>>>>>>>>>>>>>>>>> happens quite often at scale), and in many other
> conditions.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe
> >>>>>>>>>>>>>>>>>>>>>> such behavior. Please feel free to file bugs for
> cases where you observed a
> >>>>>>>>>>>>>>>>>>>>>> runner not call Teardown in a situation where it
> was possible to call it but
> >>>>>>>>>>>>>>>>>>>>>> the runner made insufficient effort.
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau
> >>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov
> >>>>>>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau
> >>>>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Feb 18, 2018 at 00:23, "Kenneth Knowles"
> >>>>>>>>>>>>>>>>>>>>>>>>> <k...@google.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain
> Manni-Bucau
> >>>>>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need
> (e.g.
> >>>>>>>>>>>>>>>>>>>>>>>>>> "I'm trying to write an IO for system $x and it
> requires the following
> >>>>>>>>>>>>>>>>>>>>>>>>>> initialization and the following cleanup logic
> and the following processing
> >>>>>>>>>>>>>>>>>>>>>>>>>> in between") I'll be better able to help you.
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring a
> >>>>>>>>>>>>>>>>>>>>>>>>>> connection. Using bundles is a perf killer
> since size is not controlled.
> >>>>>>>>>>>>>>>>>>>>>>>>>> Using teardown doesnt allow you to release the
> connection since it is a best
> >>>>>>>>>>>>>>>>>>>>>>>>>> effort thing. Not releasing the connection
> makes you pay a lot - aws ;) - or
> >>>>>>>>>>>>>>>>>>>>>>>>>> prevents you to launch other processings -
> concurrent limit.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If
> >>>>>>>>>>>>>>>>>>>>>>>>> things die so badly that @Teardown is not called
> >>>>>>>>>>>>>>>>>>>>>>>>> then nothing else can be called to close the
> >>>>>>>>>>>>>>>>>>>>>>>>> connection either. What AWS service are you
> >>>>>>>>>>>>>>>>>>>>>>>>> thinking of that stays open for a long time when
> >>>>>>>>>>>>>>>>>>>>>>>>> everything at the other end has died?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless, but
> >>>>>>>>>>>>>>>>>>>>>>>>> some (proprietary) protocols require closing
> >>>>>>>>>>>>>>>>>>>>>>>>> exchanges that are more than just "I'm leaving".
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some
> >>>>>>>>>>>>>>>>>>>>>>>>> services (machines) on the fly at pipeline
> >>>>>>>>>>>>>>>>>>>>>>>>> startup and shutting them down at the end. If
> >>>>>>>>>>>>>>>>>>>>>>>>> teardown is not called you leak machines and
> >>>>>>>>>>>>>>>>>>>>>>>>> money. You can say it can be done another
> >>>>>>>>>>>>>>>>>>>>>>>>> way... as the full pipeline ;).
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't
> >>>>>>>>>>>>>>>>>>>>>>>>> handle the lifecycle of its components, it can't
> >>>>>>>>>>>>>>>>>>>>>>>>> be used at scale for generic pipelines and is
> >>>>>>>>>>>>>>>>>>>>>>>>> bound to some particular IO.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> What prevents enforcing teardown, ignoring the
> >>>>>>>>>>>>>>>>>>>>>>>>> interstellar crash case which can't be handled
> >>>>>>>>>>>>>>>>>>>>>>>>> by any human system? Nothing technically. Why do
> >>>>>>>>>>>>>>>>>>>>>>>>> you push to not handle it? Is it due to some
> >>>>>>>>>>>>>>>>>>>>>>>>> legacy code on Dataflow or something else?
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented
> >>>>>>>>>>>>>>>>>>>>>>>> this way (best-effort). So I'm not sure what kind
> >>>>>>>>>>>>>>>>>>>>>>>> of change you're asking for.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is not
> >>>>>>>>>>>>>>>>>>>>>>> called then it is a bug and we are done :).
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Also, what does it mean for the users? The direct
> >>>>>>>>>>>>>>>>>>>>>>>>> runner does it, so if a user uses the RI in
> >>>>>>>>>>>>>>>>>>>>>>>>> tests, will he get a different behavior in prod?
> >>>>>>>>>>>>>>>>>>>>>>>>> Also, don't forget the user doesn't know what
> >>>>>>>>>>>>>>>>>>>>>>>>> the IOs he composes use, so this has such an
> >>>>>>>>>>>>>>>>>>>>>>>>> impact on the whole product that it must be
> >>>>>>>>>>>>>>>>>>>>>>>>> handled IMHO.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in
> >>>>>>>>>>>>>>>>>>>>>>>>> the big data world, but that is not a reason to
> >>>>>>>>>>>>>>>>>>>>>>>>> ignore what people did for years and do it wrong
> >>>>>>>>>>>>>>>>>>>>>>>>> before doing it right ;).
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent
> >>>>>>>>>>>>>>>>>>>>>>>>> guaranteeing, under normal IT conditions, the
> >>>>>>>>>>>>>>>>>>>>>>>>> execution of teardown. Then we see if we can
> >>>>>>>>>>>>>>>>>>>>>>>>> handle it, and only if there is a technical
> >>>>>>>>>>>>>>>>>>>>>>>>> reason we can't do we make it
> >>>>>>>>>>>>>>>>>>>>>>>>> experimental/unsupported in the API. I know
> >>>>>>>>>>>>>>>>>>>>>>>>> Spark and Flink can; any unknown blocker for
> >>>>>>>>>>>>>>>>>>>>>>>>> other runners?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through
> >>>>>>>>>>>>>>>>>>>>>>>>> Java shutdown hooks, otherwise your environment
> >>>>>>>>>>>>>>>>>>>>>>>>> (the software enclosing Beam) is fully unhandled
> >>>>>>>>>>>>>>>>>>>>>>>>> and your overall system is uncontrolled. The
> >>>>>>>>>>>>>>>>>>>>>>>>> only case where this is not true is when the
> >>>>>>>>>>>>>>>>>>>>>>>>> software is always owned by a vendor and never
> >>>>>>>>>>>>>>>>>>>>>>>>> installed in a customer environment. In this
> >>>>>>>>>>>>>>>>>>>>>>>>> case it is up to the vendor to handle the Beam
> >>>>>>>>>>>>>>>>>>>>>>>>> API, not up to Beam to adjust its API for a
> >>>>>>>>>>>>>>>>>>>>>>>>> vendor; otherwise all features unsupported by
> >>>>>>>>>>>>>>>>>>>>>>>>> one runner should be made optional, right?
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Not all state is about the network, even in
> >>>>>>>>>>>>>>>>>>>>>>>>> distributed systems, so it is key to have an
> >>>>>>>>>>>>>>>>>>>>>>>>> explicit and defined lifecycle.
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>> Kenn
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>
> >>
> >>
> >
>
