I've sent out a PR editing the Javadoc
https://github.com/apache/beam/pull/4711 . Hopefully, that should be
sufficient.

On Mon, Feb 19, 2018 at 3:20 PM Reuven Lax <re...@google.com> wrote:

> Ismael, your understanding is appropriate for FinishBundle.
>
> One basic issue with this understanding, is that the lifecycle of a DoFn
> is much longer than a single bundle (which I think you expressed by adding
> the *s). How long the DoFn lives is not defined. In fact a runner is
> completely free to decide that it will _never_ destroy the DoFn, in which
> case TearDown is never called simply because the DoFn was never torn down.
>
> Also, as mentioned before, the runner can only call TearDown in cases
> where the shutdown is in its control. If the JVM is shut down externally,
> the runner has no chance to call TearDown. This means that while TearDown
> is appropriate for cleaning up in-process resources (open connections,
> etc.), it's not the right answer for cleaning up persistent resources. If
> you rely on TearDown to delete VMs or delete files, there will be cases in
> which those files of VMs are not deleted.
>
> What we are _not_ saying is that the runner is free to just ignore
> TearDown. If the runner is explicitly destroying a DoFn object, it should
> call TearDown.
>
> Reuven
>
>
> On Mon, Feb 19, 2018 at 2:35 PM, Ismaël Mejía <ieme...@gmail.com> wrote:
>
>> I also had a different understanding of the lifecycle of a DoFn.
>>
>> My understanding of the use case for every method in the DoFn was clear
>> and
>> perfectly aligned with Thomas explanation, but what I understood was that
>> in a
>> general terms ‘@Setup was where I got resources/prepare connections and
>> @Teardown where I free them’, so calling Teardown seemed essential to
>> have a
>> complete lifecycle:
>> Setup → StartBundle* → ProcessElement* → FinishBundle* → Teardown
>>
>> The fact that @Teardown could not be called is a new detail for me too,
>> and I
>> also find weird to have a method that may or not be called as part of an
>> API,
>> why would users implement teardown if it will not be called? In that case
>> probably a cleaner approach would be to get rid of that method
>> altogether, no?
>>
>> But well maybe that’s not so easy too, there was another point: Some user
>> reported an issue with leaking resources using KafkaIO in the Spark
>> runner, for
>> ref.
>> https://apachebeam.slack.com/archives/C1AAFJYMP/p1510596938000622
>>
>> In that moment my understanding was that there was something fishy
>> because we
>> should be calling Teardown to close correctly the connections and free the
>> resources in case of exceptions on start/process/finish, so I filled a
>> JIRA and
>> fixed this by enforcing the call of teardown for the Spark runner and the
>> Flink
>> runner:
>> https://issues.apache.org/jira/browse/BEAM-3187
>> https://issues.apache.org/jira/browse/BEAM-3244
>>
>> As you can see not calling this method does have consequences at least for
>> non-containerized runners. Of course a runner that uses containers could
>> not
>> care about cleaning the resources this way, but a long living JVM in a
>> Hadoop
>> environment probably won’t have the same luck. So I am not sure that
>> having a
>> loose semantic there is the right option, I mean, runners could simply
>> guarantee
>> that they call teardown and if teardown takes too long they can decide to
>> send a
>> signal or kill the process/container/etc and go ahead, that way at least
>> users
>> would have a motivation to implement the teardown method, otherwise it
>> doesn’t
>> make any sense to have it (API wise).
>>
>> On Mon, Feb 19, 2018 at 11:30 PM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>> > Romain, would it be fair to say that currently the goal of your
>> > participation in this discussion is to identify situations where
>> @Teardown
>> > in principle could have been called, but some of the current runners
>> don't
>> > make a good enough effort to call it? If yes - as I said before,
>> please, by
>> > all means, file bugs of the form "Runner X doesn't call @Teardown in
>> > situation Y" if you're aware of any, and feel free to send PRs fixing
>> runner
>> > X to reliably call @Teardown in situation Y. I think we all agree that
>> this
>> > would be a good improvement.
>> >
>> > On Mon, Feb 19, 2018 at 2:03 PM Romain Manni-Bucau <
>> rmannibu...@gmail.com>
>> > wrote:
>> >>
>> >>
>> >>
>> >> Le 19 févr. 2018 22:56, "Reuven Lax" <re...@google.com> a écrit :
>> >>
>> >>
>> >>
>> >> On Mon, Feb 19, 2018 at 1:51 PM, Romain Manni-Bucau
>> >> <rmannibu...@gmail.com> wrote:
>> >>>
>> >>>
>> >>>
>> >>> Le 19 févr. 2018 21:28, "Reuven Lax" <re...@google.com> a écrit :
>> >>>
>> >>> How do you call teardown? There are cases in which the Java code gets
>> no
>> >>> indication that the restart is happening (e.g. cases where the machine
>> >>> itself is taken down)
>> >>>
>> >>>
>> >>> This is a bug, 0 downtime maintenance is very doable in 2018 ;).
>> Crashes
>> >>> are bugs, kill -9 to shutdown is a bug too. Other cases let call
>> shutdown
>> >>> with a hook worse case.
>> >>
>> >>
>> >> What you say here is simply not true.
>> >>
>> >> There are many scenarios in which workers shutdown with no opportunity
>> for
>> >> any sort of shutdown hook. Sometimes the entire machine gets shutdown,
>> and
>> >> not even the OS will have much of a chance to do anything. At scale
>> this
>> >> will happen with some regularity, and a distributed system that
>> assumes this
>> >> will not happen is a poor distributed system.
>> >>
>> >>
>> >> This is part of the infra and there is no reason the machine is
>> shutdown
>> >> without shutting down what runs on it before except if it is a bug in
>> the
>> >> software or setup. I can hear you maybe dont do it everywhere but
>> there is
>> >> no blocker to do it. Means you can shutdown the machines and guarantee
>> >> teardown is called.
>> >>
>> >> Where i go is simply that it is doable and beam sdk core can assume
>> setup
>> >> is well done. If there is a best effort downside due to that - with the
>> >> meaning you defined - it is an impl bug or a user installation issue.
>> >>
>> >> Technically all is true.
>> >>
>> >> What can prevent teardown is a hardware failure or so. This is fine and
>> >> doesnt need to be in doc since it is life in IT and obvious or must be
>> very
>> >> explicit to avoid current ambiguity.
>> >>
>> >>
>> >>>
>> >>>
>> >>>
>> >>>
>> >>> On Mon, Feb 19, 2018, 12:24 PM Romain Manni-Bucau <
>> rmannibu...@gmail.com>
>> >>> wrote:
>> >>>>
>> >>>> Restarting doesnt mean you dont call teardown. Except a bug there is
>> no
>> >>>> reason - technically - it happens, no reason.
>> >>>>
>> >>>> Le 19 févr. 2018 21:14, "Reuven Lax" <re...@google.com> a écrit :
>> >>>>>
>> >>>>> Workers restarting is not a bug, it's standard often expected.
>> >>>>>
>> >>>>> On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau
>> >>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>
>> >>>>>> Nothing, as mentionned it is a bug so recovery is a bug recovery
>> >>>>>> (procedure)
>> >>>>>>
>> >>>>>> Le 19 févr. 2018 19:42, "Eugene Kirpichov" <kirpic...@google.com>
>> a
>> >>>>>> écrit :
>> >>>>>>>
>> >>>>>>> So what would you like to happen if there is a crash? The DoFn
>> >>>>>>> instance no longer exists because the JVM it ran on no longer
>> exists. What
>> >>>>>>> should Teardown be called on?
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau
>> >>>>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>>>
>> >>>>>>>> This is what i want and not 999999 teardowns for 1000000 setups
>> >>>>>>>> until there is an unexpected crash (= a bug).
>> >>>>>>>>
>> >>>>>>>> Le 19 févr. 2018 18:57, "Reuven Lax" <re...@google.com> a écrit
>> :
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau
>> >>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau
>> >>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> @Reuven: in practise it is created by pool of 256 but leads
>> to
>> >>>>>>>>>>>> the same pattern, the teardown is just a "if (iCreatedThem)
>> releaseThem();"
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>> How do you control "256?" Even if you have a pool of 256
>> workers,
>> >>>>>>>>>>> nothing in Beam guarantees how many threads and DoFns are
>> created per
>> >>>>>>>>>>> worker. In theory the runner might decide to create 1000
>> threads on each
>> >>>>>>>>>>> worker.
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> Nop was the other way around, in this case on AWS you can get
>> 256
>> >>>>>>>>>> instances at once but not 512 (which will be 2x256). So when
>> you compute the
>> >>>>>>>>>> distribution you allocate to some fn the role to own the
>> instance lookup and
>> >>>>>>>>>> releasing.
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> I still don't understand. Let's be more precise. If you write
>> the
>> >>>>>>>>> following code:
>> >>>>>>>>>
>> >>>>>>>>>    pCollection.apply(ParDo.of(new MyDoFn()));
>> >>>>>>>>>
>> >>>>>>>>> There is no way to control how many instances of MyDoFn are
>> >>>>>>>>> created. The runner might decided to create a million instances
>> of this
>> >>>>>>>>> class across your worker pool, which means that you will get a
>> million Setup
>> >>>>>>>>> and Teardown calls.
>> >>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> Anyway this was just an example of an external resource you
>> must
>> >>>>>>>>>> release. Real topic is that beam should define asap a
>> guaranteed generic
>> >>>>>>>>>> lifecycle to let user embrace its programming model.
>> >>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> @Eugene:
>> >>>>>>>>>>>> 1. wait logic is about passing the value which is not always
>> >>>>>>>>>>>> possible (like 15% of cases from my raw estimate)
>> >>>>>>>>>>>> 2. sdf: i'll try to detail why i mention SDF more here
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Concretely beam exposes a portable API (included in the SDK
>> >>>>>>>>>>>> core). This API defines a *container* API and therefore
>> implies bean
>> >>>>>>>>>>>> lifecycles. I'll not detail them all but just use the
>> sources and dofn (not
>> >>>>>>>>>>>> sdf) to illustrate the idea I'm trying to develop.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> A. Source
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> A source computes a partition plan with 2 primitives:
>> >>>>>>>>>>>> estimateSize and split. As an user you can expect both to be
>> called on the
>> >>>>>>>>>>>> same bean instance to avoid to pay the same connection
>> cost(s) twice.
>> >>>>>>>>>>>> Concretely:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> connect()
>> >>>>>>>>>>>> try {
>> >>>>>>>>>>>>   estimateSize()
>> >>>>>>>>>>>>   split()
>> >>>>>>>>>>>> } finally {
>> >>>>>>>>>>>>   disconnect()
>> >>>>>>>>>>>> }
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> this is not guaranteed by the API so you must do:
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> connect()
>> >>>>>>>>>>>> try {
>> >>>>>>>>>>>>   estimateSize()
>> >>>>>>>>>>>> } finally {
>> >>>>>>>>>>>>   disconnect()
>> >>>>>>>>>>>> }
>> >>>>>>>>>>>> connect()
>> >>>>>>>>>>>> try {
>> >>>>>>>>>>>>   split()
>> >>>>>>>>>>>> } finally {
>> >>>>>>>>>>>>   disconnect()
>> >>>>>>>>>>>> }
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> + a workaround with an internal estimate size since this
>> >>>>>>>>>>>> primitive is often called in split but you dont want to
>> connect twice in the
>> >>>>>>>>>>>> second phase.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Why do you need that? Simply cause you want to define an API
>> to
>> >>>>>>>>>>>> implement sources which initializes the source bean and
>> destroys it.
>> >>>>>>>>>>>> I insists it is a very very basic concern for such API.
>> However
>> >>>>>>>>>>>> beam doesn't embraces it and doesn't assume it so building
>> any API on top of
>> >>>>>>>>>>>> beam is very hurtful today and for direct beam users you hit
>> the exact same
>> >>>>>>>>>>>> issues - check how IO are implemented, the static utilities
>> which create
>> >>>>>>>>>>>> volatile connections preventing to reuse existing connection
>> in a single
>> >>>>>>>>>>>> method
>> >>>>>>>>>>>> (
>> https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862
>> ).
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Same logic applies to the reader which is then created.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> B. DoFn & SDF
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> As a fn dev you expect the same from the beam runtime:
>> init();
>> >>>>>>>>>>>> try { while (...) process(); } finally { destroy(); } and
>> that it is
>> >>>>>>>>>>>> executed on the exact same instance to be able to be
>> stateful at that level
>> >>>>>>>>>>>> for expensive connections/operations/flow state handling.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> As you mentionned with the million example, this sequence
>> should
>> >>>>>>>>>>>> happen for each single instance so 1M times for your example.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Now why did I mention SDF several times? Because SDF is a
>> >>>>>>>>>>>> generalisation of both cases (source and dofn). Therefore it
>> creates way
>> >>>>>>>>>>>> more instances and requires to have a way more
>> strict/explicit definition of
>> >>>>>>>>>>>> the exact lifecycle and which instance does what. Since beam
>> handles the
>> >>>>>>>>>>>> full lifecycle of the bean instances it must provide
>> init/destroy hooks
>> >>>>>>>>>>>> (setup/teardown) which can be stateful.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> If you take the JDBC example which was mentionned earlier.
>> >>>>>>>>>>>> Today, because of the teardown issue it uses bundles. Since
>> bundles size is
>> >>>>>>>>>>>> not defined - and will not with SDF, it must use a pool to
>> be able to reuse
>> >>>>>>>>>>>> a connection instance to not correct performances. Now with
>> the SDF and the
>> >>>>>>>>>>>> split increase, how do you handle the pool size? Generally
>> in batch you use
>> >>>>>>>>>>>> a single connection per thread to avoid to consume all
>> database connections.
>> >>>>>>>>>>>> With a pool you have 2 choices: 1. use a pool of 1, 2. use a
>> pool a bit
>> >>>>>>>>>>>> higher but multiplied by the number of beans you will likely
>> x2 or 3 the
>> >>>>>>>>>>>> connection count and make the execution fail with "no more
>> connection
>> >>>>>>>>>>>> available". I you picked 1 (pool of #1), then you still have
>> to have a
>> >>>>>>>>>>>> reliable teardown by pool instance (close() generally) to
>> ensure you release
>> >>>>>>>>>>>> the pool and don't leak the connection information in the
>> JVM. In all case
>> >>>>>>>>>>>> you come back to the init()/destroy() lifecycle even if you
>> fake to get
>> >>>>>>>>>>>> connections with bundles.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Just to make it obvious: SDF mentions are just cause SDF
>> imply
>> >>>>>>>>>>>> all the current issues with the loose definition of the bean
>> lifecycles at
>> >>>>>>>>>>>> an exponential level, nothing else.
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> Romain Manni-Bucau
>> >>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn | Book
>> >>>>>>>>>>>>
>> >>>>>>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov
>> >>>>>>>>>>>> <kirpic...@google.com>:
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> The kind of whole-transform lifecycle you're mentioning can
>> be
>> >>>>>>>>>>>>> accomplished using the Wait transform as I suggested in the
>> thread above,
>> >>>>>>>>>>>>> and I believe it should become the canonical way to do that.
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> (Would like to reiterate one more time, as the main author
>> of
>> >>>>>>>>>>>>> most design documents related to SDF and of its
>> implementation in the Java
>> >>>>>>>>>>>>> direct and dataflow runner that SDF is fully unrelated to
>> the topic of
>> >>>>>>>>>>>>> cleanup - I'm very confused as to why it keeps coming up)
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>>
>> >>>>>>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau
>> >>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> I kind of agree except transforms lack a lifecycle too. My
>> >>>>>>>>>>>>>> understanding is that sdf could be a way to unify it and
>> clean the api.
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Otherwise how to normalize - single api -  lifecycle of
>> >>>>>>>>>>>>>> transforms?
>> >>>>>>>>>>>>>>
>> >>>>>>>>>>>>>> Le 18 févr. 2018 21:32, "Ben Chambers" <
>> bchamb...@apache.org>
>> >>>>>>>>>>>>>> a écrit :
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Are you sure that focusing on the cleanup of specific
>> DoFn's
>> >>>>>>>>>>>>>>> is appropriate? Many cases where cleanup is necessary, it
>> is around an
>> >>>>>>>>>>>>>>> entire composite PTransform. I think there have been
>> discussions/proposals
>> >>>>>>>>>>>>>>> around a more methodical "cleanup" option, but those
>> haven't been
>> >>>>>>>>>>>>>>> implemented, to the best of my knowledge.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> For instance, consider the steps of a FileIO:
>> >>>>>>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files
>> >>>>>>>>>>>>>>> 2. When all temporary files are complete, attempt to do a
>> >>>>>>>>>>>>>>> bulk copy to put them in the final destination.
>> >>>>>>>>>>>>>>> 3. Cleanup all the temporary files.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> (This is often desirable because it minimizes the chance
>> of
>> >>>>>>>>>>>>>>> seeing partial/incomplete results in the final
>> destination).
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> In the above, you'd want step 1 to execute on many
>> workers,
>> >>>>>>>>>>>>>>> likely using a ParDo (say N different workers).
>> >>>>>>>>>>>>>>> The move step should only happen once, so on one worker.
>> This
>> >>>>>>>>>>>>>>> means it will be a different DoFn, likely with some stuff
>> done to ensure it
>> >>>>>>>>>>>>>>> runs on one worker.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> In such a case, cleanup / @TearDown of the DoFn is not
>> >>>>>>>>>>>>>>> enough. We need an API for a PTransform to schedule some
>> cleanup work for
>> >>>>>>>>>>>>>>> when the transform is "done". In batch this is relatively
>> straightforward,
>> >>>>>>>>>>>>>>> but doesn't exist. This is the source of some problems,
>> such as BigQuery
>> >>>>>>>>>>>>>>> sink leaving files around that have failed to import into
>> BigQuery.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> In streaming this is less straightforward -- do you want
>> to
>> >>>>>>>>>>>>>>> wait until the end of the pipeline? Or do you want to
>> wait until the end of
>> >>>>>>>>>>>>>>> the window? In practice, you just want to wait until you
>> know nobody will
>> >>>>>>>>>>>>>>> need the resource anymore.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> This led to some discussions around a "cleanup" API, where
>> >>>>>>>>>>>>>>> you could have a transform that output resource objects.
>> Each resource
>> >>>>>>>>>>>>>>> object would have logic for cleaning it up. And there
>> would be something
>> >>>>>>>>>>>>>>> that indicated what parts of the pipeline needed that
>> resource, and what
>> >>>>>>>>>>>>>>> kind of temporal lifetime those objects had. As soon as
>> that part of the
>> >>>>>>>>>>>>>>> pipeline had advanced far enough that it would no longer
>> need the resources,
>> >>>>>>>>>>>>>>> they would get cleaned up. This can be done at pipeline
>> shutdown, or
>> >>>>>>>>>>>>>>> incrementally during a streaming pipeline, etc.
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> Would something like this be a better fit for your use
>> case?
>> >>>>>>>>>>>>>>> If not, why is handling teardown within a single DoFn
>> sufficient?
>> >>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau
>> >>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Yes 1M. Lets try to explain you simplifying the overall
>> >>>>>>>>>>>>>>>> execution. Each instance - one fn so likely in a thread
>> of a worker - has
>> >>>>>>>>>>>>>>>> its lifecycle. Caricaturally: "new" and garbage
>> collection.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> In practise, new is often an unsafe allocate
>> >>>>>>>>>>>>>>>> (deserialization) but it doesnt matter here.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> What i want is any "new" to have a following setup before
>> >>>>>>>>>>>>>>>> any process or stattbundle and the last time beam has
>> the instance before it
>> >>>>>>>>>>>>>>>> is gc-ed and after last finishbundle it calls teardown.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> It is as simple as it.
>> >>>>>>>>>>>>>>>> This way no need to comibe fn in a way making a fn not
>> self
>> >>>>>>>>>>>>>>>> contained to implement basic transforms.
>> >>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>> Le 18 févr. 2018 20:07, "Reuven Lax" <re...@google.com>
>> a
>> >>>>>>>>>>>>>>>> écrit :
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau
>> >>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Le 18 févr. 2018 19:28, "Ben Chambers"
>> >>>>>>>>>>>>>>>>>> <bchamb...@apache.org> a écrit :
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> It feels like his thread may be a bit off-track. Rather
>> >>>>>>>>>>>>>>>>>> than focusing on the semantics of the existing methods
>> -- which have been
>> >>>>>>>>>>>>>>>>>> noted to be meet many existing use cases -- it would
>> be helpful to focus on
>> >>>>>>>>>>>>>>>>>> more on the reason you are looking for something with
>> different semantics.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are
>> trying
>> >>>>>>>>>>>>>>>>>> to do):
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> 1. Clean-up some external, global resource, that was
>> >>>>>>>>>>>>>>>>>> initialized once during the startup of the pipeline.
>> If this is the case,
>> >>>>>>>>>>>>>>>>>> how are you ensuring it was really only initialized
>> once (and not once per
>> >>>>>>>>>>>>>>>>>> worker, per thread, per instance, etc.)? How do you
>> know when the pipeline
>> >>>>>>>>>>>>>>>>>> should release it? If the answer is "when it reaches
>> step X", then what
>> >>>>>>>>>>>>>>>>>> about a streaming pipeline?
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> When the dofn is no more needed logically ie when the
>> >>>>>>>>>>>>>>>>>> batch is done or stream is stopped (manually or by a
>> jvm shutdown)
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> I'm really not following what this means.
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers, and
>> each
>> >>>>>>>>>>>>>>>>> worker is running 1000 threads (each running a copy of
>> the same DoFn). How
>> >>>>>>>>>>>>>>>>> many cleanups do you want (do you want 1000 * 1000 = 1M
>> cleanups) and when
>> >>>>>>>>>>>>>>>>> do you want it called? When the entire pipeline is shut
>> down? When an
>> >>>>>>>>>>>>>>>>> individual worker is about to shut down (which may be
>> temporary - may be
>> >>>>>>>>>>>>>>>>> about to start back up)? Something else?
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> 2. Finalize some resources that are used within some
>> >>>>>>>>>>>>>>>>>> region of the pipeline. While, the DoFn lifecycle
>> methods are not a good fit
>> >>>>>>>>>>>>>>>>>> for this (they are focused on managing resources
>> within the DoFn), you could
>> >>>>>>>>>>>>>>>>>> model this on how FileIO finalizes the files that it
>> produced. For instance:
>> >>>>>>>>>>>>>>>>>>    a) ParDo generates "resource IDs" (or some token
>> that
>> >>>>>>>>>>>>>>>>>> stores information about resources)
>> >>>>>>>>>>>>>>>>>>    b) "Require Deterministic Input" (to prevent retries
>> >>>>>>>>>>>>>>>>>> from changing resource IDs)
>> >>>>>>>>>>>>>>>>>>    c) ParDo that initializes the resources
>> >>>>>>>>>>>>>>>>>>    d) Pipeline segments that use the resources, and
>> >>>>>>>>>>>>>>>>>> eventually output the fact they're done
>> >>>>>>>>>>>>>>>>>>    e) "Require Deterministic Input"
>> >>>>>>>>>>>>>>>>>>    f) ParDo that frees the resources
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> By making the use of the resource part of the data it
>> is
>> >>>>>>>>>>>>>>>>>> possible to "checkpoint" which resources may be in use
>> or have been finished
>> >>>>>>>>>>>>>>>>>> by using the require deterministic input. This is
>> important to ensuring
>> >>>>>>>>>>>>>>>>>> everything is actually cleaned up.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> I nees that but generic and not case by case to
>> >>>>>>>>>>>>>>>>>> industrialize some api on top of beam.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it is
>> >>>>>>>>>>>>>>>>>> this case, could you elaborate on what you are trying
>> to accomplish? That
>> >>>>>>>>>>>>>>>>>> would help me understand both the problems with
>> existing options and
>> >>>>>>>>>>>>>>>>>> possibly what could be done to help.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> I understand there are sorkaround for almost all cases
>> but
>> >>>>>>>>>>>>>>>>>> means each transform is different in its lifecycle
>> handling  except i
>> >>>>>>>>>>>>>>>>>> dislike it a lot at a scale and as a user since you
>> cant put any unified
>> >>>>>>>>>>>>>>>>>> practise on top of beam, it also makes beam very hard
>> to integrate or to use
>> >>>>>>>>>>>>>>>>>> to build higher level libraries or softwares.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> This is why i tried to not start the workaround
>> >>>>>>>>>>>>>>>>>> discussions and just stay at API level.
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> -- Ben
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau
>> >>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov
>> >>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> "Machine state" is overly low-level because many of
>> the
>> >>>>>>>>>>>>>>>>>>>> possible reasons can happen on a perfectly fine
>> machine.
>> >>>>>>>>>>>>>>>>>>>> If you'd like to rephrase it to "it will be called
>> >>>>>>>>>>>>>>>>>>>> except in various situations where it's logically
>> impossible or impractical
>> >>>>>>>>>>>>>>>>>>>> to guarantee that it's called", that's fine. Or you
>> can list some of the
>> >>>>>>>>>>>>>>>>>>>> examples above.
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> Sounds ok to me
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> The main point for the user is, you *will* see
>> >>>>>>>>>>>>>>>>>>>> non-preventable situations where it couldn't be
>> called - it's not just
>> >>>>>>>>>>>>>>>>>>>> intergalactic crashes - so if the logic is very
>> important (e.g. cleaning up
>> >>>>>>>>>>>>>>>>>>>> a large amount of temporary files, shutting down a
>> large number of VMs you
>> >>>>>>>>>>>>>>>>>>>> started etc), you have to express it using one of
>> the other methods that
>> >>>>>>>>>>>>>>>>>>>> have stricter guarantees (which obviously come at a
>> cost, e.g. no
>> >>>>>>>>>>>>>>>>>>>> pass-by-reference).
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>> FinishBundle has the exact same guarantee sadly so not
>> >>>>>>>>>>>>>>>>>>> which which other method you speak about. Concretely
>> if you make it really
>> >>>>>>>>>>>>>>>>>>> unreliable - this is what best effort sounds to me -
>> then users can use it
>> >>>>>>>>>>>>>>>>>>> to clean anything but if you make it "can happen but
>> it is unexpected and
>> >>>>>>>>>>>>>>>>>>> means something happent" then it is fine to have a
>> manual - or auto if fancy
>> >>>>>>>>>>>>>>>>>>> - recovery procedure. This is where it makes all the
>> difference and impacts
>> >>>>>>>>>>>>>>>>>>> the developpers, ops (all users basically).
>> >>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau
>> >>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Agree Eugene except that "best effort" means that.
>> It
>> >>>>>>>>>>>>>>>>>>>>> is also often used to say "at will" and this is
>> what triggered this thread.
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> I'm fine using "except if the machine state prevents
>> >>>>>>>>>>>>>>>>>>>>> it" but "best effort" is too open and can be very
>> badly and wrongly
>> >>>>>>>>>>>>>>>>>>>>> perceived by users (like I did).
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau
>> >>>>>>>>>>>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github | LinkedIn
>> |
>> >>>>>>>>>>>>>>>>>>>>> Book
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov
>> >>>>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call
>> it:
>> >>>>>>>>>>>>>>>>>>>>>> in the example situation you have (intergalactic
>> crash), and in a number of
>> >>>>>>>>>>>>>>>>>>>>>> more common cases: eg in case the worker container
>> has crashed (eg user code
>> >>>>>>>>>>>>>>>>>>>>>> in a different thread called a C library over JNI
>> and it segfaulted), JVM
>> >>>>>>>>>>>>>>>>>>>>>> bug, crash due to user code OOM, in case the
>> worker has lost network
>> >>>>>>>>>>>>>>>>>>>>>> connectivity (then it may be called but it won't
>> be able to do anything
>> >>>>>>>>>>>>>>>>>>>>>> useful), in case this is running on a preemptible
>> VM and it was preempted by
>> >>>>>>>>>>>>>>>>>>>>>> the underlying cluster manager without notice or
>> if the worker was too busy
>> >>>>>>>>>>>>>>>>>>>>>> with other stuff (eg calling other Teardown
>> functions) until the preemption
>> >>>>>>>>>>>>>>>>>>>>>> timeout elapsed, in case the underlying hardware
>> simply failed (which
>> >>>>>>>>>>>>>>>>>>>>>> happens quite often at scale), and in many other
>> conditions.
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to describe
>> >>>>>>>>>>>>>>>>>>>>>> such behavior. Please feel free to file bugs for
>> cases where you observed a
>> >>>>>>>>>>>>>>>>>>>>>> runner not call Teardown in a situation where it
>> was possible to call it but
>> >>>>>>>>>>>>>>>>>>>>>> the runner made insufficient effort.
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau
>> >>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov
>> >>>>>>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau
>> >>>>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> Le 18 févr. 2018 00:23, "Kenneth Knowles"
>> >>>>>>>>>>>>>>>>>>>>>>>>> <k...@google.com> a écrit :
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain
>> Manni-Bucau
>> >>>>>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need
>> (e.g.
>> >>>>>>>>>>>>>>>>>>>>>>>>>> "I'm trying to write an IO for system $x and
>> it requires the following
>> >>>>>>>>>>>>>>>>>>>>>>>>>> initialization and the following cleanup logic
>> and the following processing
>> >>>>>>>>>>>>>>>>>>>>>>>>>> in between") I'll be better able to help you.
>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform requiring
>> a
>> >>>>>>>>>>>>>>>>>>>>>>>>>> connection. Using bundles is a perf killer
>> since size is not controlled.
>> >>>>>>>>>>>>>>>>>>>>>>>>>> Using teardown doesnt allow you to release the
>> connection since it is a best
>> >>>>>>>>>>>>>>>>>>>>>>>>>> effort thing. Not releasing the connection
>> makes you pay a lot - aws ;) - or
>> >>>>>>>>>>>>>>>>>>>>>>>>>> prevents you to launch other processings -
>> concurrent limit.
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit. If
>> >>>>>>>>>>>>>>>>>>>>>>>>> things die so badly that @Teardown is not
>> called then nothing else can be
>> >>>>>>>>>>>>>>>>>>>>>>>>> called to close the connection either. What AWS
>> service are you thinking of
>> >>>>>>>>>>>>>>>>>>>>>>>>> that stays open for a long time when everything
>> at the other end has died?
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless but
>> >>>>>>>>>>>>>>>>>>>>>>>>> some (proprietary) protocols requires some
>> closing exchanges which are not
>> >>>>>>>>>>>>>>>>>>>>>>>>> only "im leaving".
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> For aws i was thinking about starting some
>> services
>> >>>>>>>>>>>>>>>>>>>>>>>>> - machines - on the fly in a pipeline startup
>> and closing them at the end.
>> >>>>>>>>>>>>>>>>>>>>>>>>> If teardown is not called you leak machines and
>> money. You can say it can be
>> >>>>>>>>>>>>>>>>>>>>>>>>> done another way...as the full pipeline ;).
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> I dont want to be picky but if beam cant handle
>> its
>> >>>>>>>>>>>>>>>>>>>>>>>>> components lifecycle it can be used at scale
>> for generic pipelines and if
>> >>>>>>>>>>>>>>>>>>>>>>>>> bound to some particular IO.
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> What does prevent to enforce teardown - ignoring
>> >>>>>>>>>>>>>>>>>>>>>>>>> the interstellar crash case which cant be
>> handled by any human system?
>> >>>>>>>>>>>>>>>>>>>>>>>>> Nothing technically. Why do you push to not
>> handle it? Is it due to some
>> >>>>>>>>>>>>>>>>>>>>>>>>> legacy code on dataflow or something else?
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and implemented
>> >>>>>>>>>>>>>>>>>>>>>>>> this way (best-effort). So I'm not sure what
>> kind of change you're asking
>> >>>>>>>>>>>>>>>>>>>>>>>> for.
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>> Remove "best effort" from the javadoc. If it is
>> not
>> >>>>>>>>>>>>>>>>>>>>>>> call then it is a bug and we are done :).
>> >>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> Also what does it mean for the users? Direct
>> runner
>> >>>>>>>>>>>>>>>>>>>>>>>>> does it so if a user udes the RI in test, he
>> will get a different behavior
>> >>>>>>>>>>>>>>>>>>>>>>>>> in prod? Also dont forget the user doesnt know
>> what the IOs he composes use
>> >>>>>>>>>>>>>>>>>>>>>>>>> so this is so impacting for the whole product
>> than he must be handled IMHO.
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new in
>> big
>> >>>>>>>>>>>>>>>>>>>>>>>>> data world but it is not a reason to ignore
>> what people did for years and do
>> >>>>>>>>>>>>>>>>>>>>>>>>> it wrong before doing right ;).
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent to
>> >>>>>>>>>>>>>>>>>>>>>>>>> guarantee - in the normal IT conditions - the
>> execution of teardown. Then we
>> >>>>>>>>>>>>>>>>>>>>>>>>> see if we can handle it and only if there is a
>> technical reason we cant we
>> >>>>>>>>>>>>>>>>>>>>>>>>> make it experimental/unsupported in the api. I
>> know spark and flink can, any
>> >>>>>>>>>>>>>>>>>>>>>>>>> unknown blocker for other runners?
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through
>> java
>> >>>>>>>>>>>>>>>>>>>>>>>>> shutdown hooks otherwise your environment (beam
>> enclosing software) is fully
>> >>>>>>>>>>>>>>>>>>>>>>>>> unhandled and your overall system is
>> uncontrolled. Only case where it is not
>> >>>>>>>>>>>>>>>>>>>>>>>>> true is when the software is always owned by a
>> vendor and never installed on
>> >>>>>>>>>>>>>>>>>>>>>>>>> customer environment. In this case it belongd
>> to the vendor to handle beam
>> >>>>>>>>>>>>>>>>>>>>>>>>> API and not to beam to adjust its API for a
>> vendor - otherwise all
>> >>>>>>>>>>>>>>>>>>>>>>>>> unsupported features by one runner should be
>> made optional right?
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> All state is not about network, even in
>> distributed
>> >>>>>>>>>>>>>>>>>>>>>>>>> systems so this is key to have an explicit and
>> defined lifecycle.
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>> Kenn
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>>>>>>
>> >>>>>>>>>>>>
>> >>>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>
>> >>
>> >>
>> >
>>
>
>

Reply via email to