Hello, thanks Eugene for improving the documentation so we can close
this thread.

Reuven, I understood the semantics of the methods. What surprised me was that I
read the new documentation as saying a runner could simply skip calling
@Teardown. We have already dealt with the consequences of skipping it when
there is an exception in the element methods
(startBundle/processElement/finishBundle): we can leak resources by not calling
teardown, as the Spark runner user reported in the link I sent.

So, considering that a runner should make its best effort to call that method,
I promoted some of the methods of ParDoLifecycleTest to ValidatesRunner to
ensure that runners call teardown after exceptions, and I filed BEAM-3245 so
that the DataflowRunner does its best to respect the lifecycle when it can.
(Note that I auto-assigned this JIRA, but it is up to you guys to reassign it
to the person who can work on it.)
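
To make the expected behaviour concrete, here is a minimal sketch of "teardown
is still called after an exception in an element method". It is plain Java and
does not use Beam's API: LifecycleFn, RecordingFn, runBundle and callsFor are
hypothetical names made up for this illustration, and the toy runner discards
the instance after a single bundle, which a real runner is not required to do.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TeardownAfterExceptionSketch {

    /** Hypothetical stand-in for the DoFn lifecycle methods; NOT Beam's API. */
    interface LifecycleFn {
        void setup();
        void startBundle();
        void processElement(String element);
        void finishBundle();
        void teardown();
    }

    /** Records every lifecycle call and fails on the element "boom". */
    static class RecordingFn implements LifecycleFn {
        final List<String> calls = new ArrayList<>();

        public void setup() { calls.add("setup"); }
        public void startBundle() { calls.add("startBundle"); }
        public void processElement(String element) {
            calls.add("processElement");
            if ("boom".equals(element)) {
                throw new IllegalStateException("simulated user-code failure");
            }
        }
        public void finishBundle() { calls.add("finishBundle"); }
        public void teardown() { calls.add("teardown"); }
    }

    /**
     * Toy runner loop (an assumption, not how any real runner is written):
     * once setup has succeeded, teardown runs in a finally block, so it is
     * called even when startBundle/processElement/finishBundle throws.
     */
    static void runBundle(LifecycleFn fn, List<String> bundle) {
        fn.setup();
        try {
            fn.startBundle();
            for (String element : bundle) {
                fn.processElement(element);
            }
            fn.finishBundle();
        } finally {
            fn.teardown();
        }
    }

    /** Runs one bundle and returns the recorded calls, swallowing the simulated failure. */
    static List<String> callsFor(List<String> bundle) {
        RecordingFn fn = new RecordingFn();
        try {
            runBundle(fn, bundle);
        } catch (IllegalStateException expected) {
            // The failure still propagates to the caller; we only inspect the call order.
        }
        return fn.calls;
    }

    public static void main(String[] args) {
        // prints [setup, startBundle, processElement, processElement, teardown]
        System.out.println(callsFor(Arrays.asList("a", "boom", "c")));
    }
}
```

Note that this only covers shutdowns the runner controls. If the JVM is killed
externally, no finally block runs, which is the case Reuven describes below.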


On Wed, Feb 21, 2018 at 7:26 AM, Reuven Lax <re...@google.com> wrote:
> To close the loop here:
>
> Romain, I think your actual concern was that the Javadoc made it sound like
> a runner could simply decide not to call Teardown. If so, then I agree with
> you - the Javadoc was misleading (and it appears to have confused Ismael as
> well). If a runner destroys a DoFn, it _must_ call TearDown before it calls
> Setup on a new DoFn.
>
> If so, then most of the back and forth on this thread had little to do with
> your actual concern. However it did take almost three days of discussion
> before Eugene understood what your real concern was, leading to the side
> discussions.
>
> Reuven
>
> On Mon, Feb 19, 2018 at 6:08 PM, Reuven Lax <re...@google.com> wrote:
>>
>> +1 This PR clarifies the semantics quite a bit.
>>
>> On Mon, Feb 19, 2018 at 3:24 PM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>>
>>> I've sent out a PR editing the Javadoc
>>> https://github.com/apache/beam/pull/4711 . Hopefully, that should be
>>> sufficient.
>>>
>>> On Mon, Feb 19, 2018 at 3:20 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>> Ismael, your understanding is appropriate for FinishBundle.
>>>>
>>>> One basic issue with this understanding is that the lifecycle of a DoFn
>>>> is much longer than a single bundle (which I think you expressed by adding
>>>> the *s). How long the DoFn lives is not defined. In fact a runner is
>>>> completely free to decide that it will _never_ destroy the DoFn, in which
>>>> case TearDown is never called simply because the DoFn was never torn down.
>>>>
>>>> Also, as mentioned before, the runner can only call TearDown in cases
>>>> where the shutdown is in its control. If the JVM is shut down externally,
>>>> the runner has no chance to call TearDown. This means that while TearDown is
>>>> appropriate for cleaning up in-process resources (open connections, etc.),
>>>> it's not the right answer for cleaning up persistent resources. If you rely
>>>> on TearDown to delete VMs or delete files, there will be cases in which
>>>> those files or VMs are not deleted.
>>>>
>>>> What we are _not_ saying is that the runner is free to just ignore
>>>> TearDown. If the runner is explicitly destroying a DoFn object, it should
>>>> call TearDown.
>>>>
>>>> Reuven
>>>>
>>>>
>>>> On Mon, Feb 19, 2018 at 2:35 PM, Ismaël Mejía <ieme...@gmail.com> wrote:
>>>>>
>>>>> I also had a different understanding of the lifecycle of a DoFn.
>>>>>
>>>>> My understanding of the use case for every method in the DoFn was clear and
>>>>> perfectly aligned with Thomas's explanation, but what I understood was that,
>>>>> in general terms, ‘@Setup is where I get resources/prepare connections and
>>>>> @Teardown is where I free them’, so calling Teardown seemed essential to have
>>>>> a complete lifecycle:
>>>>> Setup → StartBundle* → ProcessElement* → FinishBundle* → Teardown
>>>>>
>>>>> The fact that @Teardown might not be called is a new detail for me too, and I
>>>>> also find it weird to have a method that may or may not be called as part of
>>>>> an API. Why would users implement teardown if it will not be called? In that
>>>>> case a cleaner approach would probably be to get rid of that method
>>>>> altogether, no?
>>>>>
>>>>> But maybe that’s not so easy either. There was another point: a user
>>>>> reported an issue with leaking resources using KafkaIO in the Spark runner;
>>>>> for reference:
>>>>> https://apachebeam.slack.com/archives/C1AAFJYMP/p1510596938000622
>>>>>
>>>>> At that moment my understanding was that there was something fishy, because
>>>>> we should be calling Teardown to close the connections correctly and free the
>>>>> resources in case of exceptions on start/process/finish, so I filed a JIRA
>>>>> and fixed this by enforcing the call of teardown for the Spark runner and the
>>>>> Flink runner:
>>>>> https://issues.apache.org/jira/browse/BEAM-3187
>>>>> https://issues.apache.org/jira/browse/BEAM-3244
>>>>>
>>>>> As you can see, not calling this method does have consequences, at least for
>>>>> non-containerized runners. Of course a runner that uses containers might not
>>>>> care about cleaning up resources this way, but a long-lived JVM in a Hadoop
>>>>> environment probably won’t have the same luck. So I am not sure that having
>>>>> loose semantics there is the right option. I mean, runners could simply
>>>>> guarantee that they call teardown, and if teardown takes too long they can
>>>>> decide to send a signal or kill the process/container/etc. and go ahead. That
>>>>> way at least users would have a motivation to implement the teardown method;
>>>>> otherwise it doesn’t make any sense to have it (API-wise).
>>>>>
>>>>> On Mon, Feb 19, 2018 at 11:30 PM, Eugene Kirpichov
>>>>> <kirpic...@google.com> wrote:
>>>>> > Romain, would it be fair to say that currently the goal of your
>>>>> > participation in this discussion is to identify situations where @Teardown
>>>>> > in principle could have been called, but some of the current runners don't
>>>>> > make a good enough effort to call it? If yes - as I said before, please, by
>>>>> > all means, file bugs of the form "Runner X doesn't call @Teardown in
>>>>> > situation Y" if you're aware of any, and feel free to send PRs fixing runner
>>>>> > X to reliably call @Teardown in situation Y. I think we all agree that this
>>>>> > would be a good improvement.
>>>>> >
>>>>> > On Mon, Feb 19, 2018 at 2:03 PM Romain Manni-Bucau
>>>>> > <rmannibu...@gmail.com>
>>>>> > wrote:
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> On Feb 19, 2018 at 22:56, "Reuven Lax" <re...@google.com> wrote:
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> On Mon, Feb 19, 2018 at 1:51 PM, Romain Manni-Bucau
>>>>> >> <rmannibu...@gmail.com> wrote:
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> On Feb 19, 2018 at 21:28, "Reuven Lax" <re...@google.com> wrote:
>>>>> >>>
>>>>> >>> How do you call teardown? There are cases in which the Java code
>>>>> >>> gets no
>>>>> >>> indication that the restart is happening (e.g. cases where the
>>>>> >>> machine
>>>>> >>> itself is taken down)
>>>>> >>>
>>>>> >>>
>>>>> >>> This is a bug; zero-downtime maintenance is very doable in 2018 ;).
>>>>> >>> Crashes are bugs, and kill -9 to shut down is a bug too. In other cases,
>>>>> >>> you can call shutdown from a hook in the worst case.
>>>>> >>
>>>>> >>
>>>>> >> What you say here is simply not true.
>>>>> >>
>>>>> >> There are many scenarios in which workers shut down with no opportunity
>>>>> >> for any sort of shutdown hook. Sometimes the entire machine gets shut
>>>>> >> down, and not even the OS will have much of a chance to do anything. At
>>>>> >> scale this will happen with some regularity, and a distributed system that
>>>>> >> assumes this will not happen is a poor distributed system.
>>>>> >>
>>>>> >>
>>>>> >> This is part of the infra, and there is no reason for the machine to be
>>>>> >> shut down without first shutting down what runs on it, unless there is a
>>>>> >> bug in the software or setup. I hear that you may not do it everywhere, but
>>>>> >> nothing blocks you from doing it. That means you can shut down the machines
>>>>> >> and guarantee teardown is called.
>>>>> >>
>>>>> >> My point is simply that it is doable and the Beam SDK core can assume the
>>>>> >> setup is done well. If there is a best-effort downside due to that - with
>>>>> >> the meaning you defined - it is an implementation bug or a user
>>>>> >> installation issue.
>>>>> >>
>>>>> >> Technically all is true.
>>>>> >>
>>>>> >> What can prevent teardown is a hardware failure or the like. This is fine
>>>>> >> and either doesn't need to be in the doc, since it is life in IT and
>>>>> >> obvious, or must be very explicit to avoid the current ambiguity.
>>>>> >>
>>>>> >>
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> On Mon, Feb 19, 2018, 12:24 PM Romain Manni-Bucau
>>>>> >>> <rmannibu...@gmail.com>
>>>>> >>> wrote:
>>>>> >>>>
>>>>> >>>> Restarting doesn't mean you don't call teardown. Barring a bug, there is
>>>>> >>>> no technical reason for that to happen.
>>>>> >>>>
>>>>> >>>> On Feb 19, 2018 at 21:14, "Reuven Lax" <re...@google.com> wrote:
>>>>> >>>>>
>>>>> >>>>> Workers restarting is not a bug; it's standard and often expected.
>>>>> >>>>>
>>>>> >>>>> On Mon, Feb 19, 2018, 12:03 PM Romain Manni-Bucau
>>>>> >>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>
>>>>> >>>>>> Nothing. As mentioned, it is a bug, so recovery is a bug recovery
>>>>> >>>>>> (procedure).
>>>>> >>>>>>
>>>>> >>>>>> On Feb 19, 2018 at 19:42, "Eugene Kirpichov"
>>>>> >>>>>> <kirpic...@google.com> wrote:
>>>>> >>>>>>>
>>>>> >>>>>>> So what would you like to happen if there is a crash? The DoFn
>>>>> >>>>>>> instance no longer exists because the JVM it ran on no longer
>>>>> >>>>>>> exists. What
>>>>> >>>>>>> should Teardown be called on?
>>>>> >>>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>> On Mon, Feb 19, 2018, 10:20 AM Romain Manni-Bucau
>>>>> >>>>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>
>>>>> >>>>>>>> This is what I want, and not 999999 teardowns for 1000000 setups,
>>>>> >>>>>>>> until there is an unexpected crash (= a bug).
>>>>> >>>>>>>>
>>>>> >>>>>>>> On Feb 19, 2018 at 18:57, "Reuven Lax" <re...@google.com> wrote:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> On Mon, Feb 19, 2018 at 7:11 AM, Romain Manni-Bucau
>>>>> >>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> 2018-02-19 15:57 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> On Mon, Feb 19, 2018 at 12:35 AM, Romain Manni-Bucau
>>>>> >>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> @Reuven: in practice it is created in pools of 256, but that
>>>>> >>>>>>>>>>>> leads to the same pattern; the teardown is just an "if
>>>>> >>>>>>>>>>>> (iCreatedThem) releaseThem();"
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> How do you control "256?" Even if you have a pool of 256
>>>>> >>>>>>>>>>> workers,
>>>>> >>>>>>>>>>> nothing in Beam guarantees how many threads and DoFns are
>>>>> >>>>>>>>>>> created per
>>>>> >>>>>>>>>>> worker. In theory the runner might decide to create 1000
>>>>> >>>>>>>>>>> threads on each
>>>>> >>>>>>>>>>> worker.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Nope, it was the other way around: in this case, on AWS, you can
>>>>> >>>>>>>>>> get 256 instances at once but not 512 (which will be 2x256). So
>>>>> >>>>>>>>>> when you compute the distribution, you assign to some fn the role
>>>>> >>>>>>>>>> of owning the instance lookup and release.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> I still don't understand. Let's be more precise. If you write
>>>>> >>>>>>>>> the
>>>>> >>>>>>>>> following code:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>    pCollection.apply(ParDo.of(new MyDoFn()));
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> There is no way to control how many instances of MyDoFn are
>>>>> >>>>>>>>> created. The runner might decide to create a million instances of
>>>>> >>>>>>>>> this class across your worker pool, which means that you will get a
>>>>> >>>>>>>>> million Setup and Teardown calls.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Anyway, this was just an example of an external resource you must
>>>>> >>>>>>>>>> release. The real topic is that Beam should define ASAP a guaranteed
>>>>> >>>>>>>>>> generic lifecycle to let users embrace its programming model.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> @Eugene:
>>>>> >>>>>>>>>>>> 1. wait logic is about passing the value, which is not always
>>>>> >>>>>>>>>>>> possible (like 15% of cases from my rough estimate)
>>>>> >>>>>>>>>>>> 2. sdf: i'll try to detail why i mention SDF more here
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Concretely beam exposes a portable API (included in the
>>>>> >>>>>>>>>>>> SDK
>>>>> >>>>>>>>>>>> core). This API defines a *container* API and therefore
>>>>> >>>>>>>>>>>> implies bean
>>>>> >>>>>>>>>>>> lifecycles. I'll not detail them all but just use the
>>>>> >>>>>>>>>>>> sources and dofn (not
>>>>> >>>>>>>>>>>> sdf) to illustrate the idea I'm trying to develop.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> A. Source
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> A source computes a partition plan with 2 primitives:
>>>>> >>>>>>>>>>>> estimateSize and split. As a user you can expect both to be called
>>>>> >>>>>>>>>>>> on the same bean instance to avoid paying the same connection
>>>>> >>>>>>>>>>>> cost(s) twice. Concretely:
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> connect()
>>>>> >>>>>>>>>>>> try {
>>>>> >>>>>>>>>>>>   estimateSize()
>>>>> >>>>>>>>>>>>   split()
>>>>> >>>>>>>>>>>> } finally {
>>>>> >>>>>>>>>>>>   disconnect()
>>>>> >>>>>>>>>>>> }
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> this is not guaranteed by the API so you must do:
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> connect()
>>>>> >>>>>>>>>>>> try {
>>>>> >>>>>>>>>>>>   estimateSize()
>>>>> >>>>>>>>>>>> } finally {
>>>>> >>>>>>>>>>>>   disconnect()
>>>>> >>>>>>>>>>>> }
>>>>> >>>>>>>>>>>> connect()
>>>>> >>>>>>>>>>>> try {
>>>>> >>>>>>>>>>>>   split()
>>>>> >>>>>>>>>>>> } finally {
>>>>> >>>>>>>>>>>>   disconnect()
>>>>> >>>>>>>>>>>> }
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> + a workaround with an internal estimate size, since this
>>>>> >>>>>>>>>>>> primitive is often called in split but you don't want to connect
>>>>> >>>>>>>>>>>> twice in the second phase.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Why do you need that? Simply because you want to define an API to
>>>>> >>>>>>>>>>>> implement sources which initializes the source bean and destroys
>>>>> >>>>>>>>>>>> it. I insist it is a very, very basic concern for such an API.
>>>>> >>>>>>>>>>>> However, Beam doesn't embrace it and doesn't assume it, so
>>>>> >>>>>>>>>>>> building any API on top of Beam is very painful today, and direct
>>>>> >>>>>>>>>>>> Beam users hit the exact same issues - check how IOs are
>>>>> >>>>>>>>>>>> implemented, with static utilities which create volatile
>>>>> >>>>>>>>>>>> connections, preventing the reuse of an existing connection in a
>>>>> >>>>>>>>>>>> single method
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> (https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L862).
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Same logic applies to the reader which is then created.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> B. DoFn & SDF
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> As a fn dev you expect the same from the beam runtime: init();
>>>>> >>>>>>>>>>>> try { while (...) process(); } finally { destroy(); } and that it
>>>>> >>>>>>>>>>>> is executed on the exact same instance, to be able to be stateful
>>>>> >>>>>>>>>>>> at that level for expensive connections/operations/flow state
>>>>> >>>>>>>>>>>> handling.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> As you mentioned with the million example, this sequence should
>>>>> >>>>>>>>>>>> happen for each single instance, so 1M times for your example.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Now why did I mention SDF several times? Because SDF is a
>>>>> >>>>>>>>>>>> generalisation of both cases (source and dofn). Therefore it
>>>>> >>>>>>>>>>>> creates way more instances and requires a much more
>>>>> >>>>>>>>>>>> strict/explicit definition of the exact lifecycle and of which
>>>>> >>>>>>>>>>>> instance does what. Since Beam handles the full lifecycle of the
>>>>> >>>>>>>>>>>> bean instances, it must provide init/destroy hooks
>>>>> >>>>>>>>>>>> (setup/teardown) which can be stateful.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Take the JDBC example which was mentioned earlier. Today, because
>>>>> >>>>>>>>>>>> of the teardown issue, it uses bundles. Since bundle size is not
>>>>> >>>>>>>>>>>> defined - and will not be with SDF - it must use a pool to be able
>>>>> >>>>>>>>>>>> to reuse a connection instance and get correct performance. Now,
>>>>> >>>>>>>>>>>> with SDF and the increase in splits, how do you handle the pool
>>>>> >>>>>>>>>>>> size? Generally in batch you use a single connection per thread
>>>>> >>>>>>>>>>>> to avoid consuming all database connections. With a pool you have
>>>>> >>>>>>>>>>>> 2 choices: 1. use a pool of 1, or 2. use a slightly larger pool,
>>>>> >>>>>>>>>>>> but multiplied by the number of beans you will likely x2 or x3
>>>>> >>>>>>>>>>>> the connection count and make the execution fail with "no more
>>>>> >>>>>>>>>>>> connection available". If you picked 1 (a pool of 1), then you
>>>>> >>>>>>>>>>>> still need a reliable teardown per pool instance (close()
>>>>> >>>>>>>>>>>> generally) to ensure you release the pool and don't leak the
>>>>> >>>>>>>>>>>> connection information in the JVM. In all cases you come back to
>>>>> >>>>>>>>>>>> the init()/destroy() lifecycle, even if you fake it by getting
>>>>> >>>>>>>>>>>> connections per bundle.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Just to make it obvious: the SDF mentions are just because SDF
>>>>> >>>>>>>>>>>> implies all the current issues with the loose definition of the
>>>>> >>>>>>>>>>>> bean lifecycles at an exponential level, nothing else.
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> Romain Manni-Bucau
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>> 2018-02-18 22:32 GMT+01:00 Eugene Kirpichov
>>>>> >>>>>>>>>>>> <kirpic...@google.com>:
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> The kind of whole-transform lifecycle you're mentioning
>>>>> >>>>>>>>>>>>> can be
>>>>> >>>>>>>>>>>>> accomplished using the Wait transform as I suggested in
>>>>> >>>>>>>>>>>>> the thread above,
>>>>> >>>>>>>>>>>>> and I believe it should become the canonical way to do
>>>>> >>>>>>>>>>>>> that.
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> (I would like to reiterate one more time, as the main author of
>>>>> >>>>>>>>>>>>> most design documents related to SDF and of its implementation
>>>>> >>>>>>>>>>>>> in the Java direct and dataflow runners, that SDF is fully
>>>>> >>>>>>>>>>>>> unrelated to the topic of cleanup - I'm very confused as to why
>>>>> >>>>>>>>>>>>> it keeps coming up)
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>> On Sun, Feb 18, 2018, 1:15 PM Romain Manni-Bucau
>>>>> >>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> I kind of agree, except transforms lack a lifecycle too. My
>>>>> >>>>>>>>>>>>>> understanding is that SDF could be a way to unify it and clean up
>>>>> >>>>>>>>>>>>>> the API.
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> Otherwise, how do we normalize - with a single API - the lifecycle
>>>>> >>>>>>>>>>>>>> of transforms?
>>>>> >>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>> On Feb 18, 2018 at 21:32, "Ben Chambers"
>>>>> >>>>>>>>>>>>>> <bchamb...@apache.org> wrote:
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> Are you sure that focusing on the cleanup of specific DoFns is
>>>>> >>>>>>>>>>>>>>> appropriate? In many cases where cleanup is necessary, it is
>>>>> >>>>>>>>>>>>>>> around an entire composite PTransform. I think there have been
>>>>> >>>>>>>>>>>>>>> discussions/proposals around a more methodical "cleanup" option,
>>>>> >>>>>>>>>>>>>>> but those haven't been implemented, to the best of my knowledge.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> For instance, consider the steps of a FileIO:
>>>>> >>>>>>>>>>>>>>> 1. Write to a bunch (N shards) of temporary files
>>>>> >>>>>>>>>>>>>>> 2. When all temporary files are complete, attempt to do a
>>>>> >>>>>>>>>>>>>>> bulk copy to put them in the final destination.
>>>>> >>>>>>>>>>>>>>> 3. Cleanup all the temporary files.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> (This is often desirable because it minimizes the
>>>>> >>>>>>>>>>>>>>> chance of
>>>>> >>>>>>>>>>>>>>> seeing partial/incomplete results in the final
>>>>> >>>>>>>>>>>>>>> destination).
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> In the above, you'd want step 1 to execute on many
>>>>> >>>>>>>>>>>>>>> workers,
>>>>> >>>>>>>>>>>>>>> likely using a ParDo (say N different workers).
>>>>> >>>>>>>>>>>>>>> The move step should only happen once, so on one
>>>>> >>>>>>>>>>>>>>> worker. This
>>>>> >>>>>>>>>>>>>>> means it will be a different DoFn, likely with some
>>>>> >>>>>>>>>>>>>>> stuff done to ensure it
>>>>> >>>>>>>>>>>>>>> runs on one worker.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> In such a case, cleanup / @TearDown of the DoFn is not
>>>>> >>>>>>>>>>>>>>> enough. We need an API for a PTransform to schedule
>>>>> >>>>>>>>>>>>>>> some cleanup work for
>>>>> >>>>>>>>>>>>>>> when the transform is "done". In batch this is
>>>>> >>>>>>>>>>>>>>> relatively straightforward,
>>>>> >>>>>>>>>>>>>>> but doesn't exist. This is the source of some problems,
>>>>> >>>>>>>>>>>>>>> such as BigQuery
>>>>> >>>>>>>>>>>>>>> sink leaving files around that have failed to import
>>>>> >>>>>>>>>>>>>>> into BigQuery.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> In streaming this is less straightforward -- do you
>>>>> >>>>>>>>>>>>>>> want to
>>>>> >>>>>>>>>>>>>>> wait until the end of the pipeline? Or do you want to
>>>>> >>>>>>>>>>>>>>> wait until the end of
>>>>> >>>>>>>>>>>>>>> the window? In practice, you just want to wait until
>>>>> >>>>>>>>>>>>>>> you know nobody will
>>>>> >>>>>>>>>>>>>>> need the resource anymore.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> This led to some discussions around a "cleanup" API,
>>>>> >>>>>>>>>>>>>>> where
>>>>> >>>>>>>>>>>>>>> you could have a transform that output resource
>>>>> >>>>>>>>>>>>>>> objects. Each resource
>>>>> >>>>>>>>>>>>>>> object would have logic for cleaning it up. And there
>>>>> >>>>>>>>>>>>>>> would be something
>>>>> >>>>>>>>>>>>>>> that indicated what parts of the pipeline needed that
>>>>> >>>>>>>>>>>>>>> resource, and what
>>>>> >>>>>>>>>>>>>>> kind of temporal lifetime those objects had. As soon as
>>>>> >>>>>>>>>>>>>>> that part of the
>>>>> >>>>>>>>>>>>>>> pipeline had advanced far enough that it would no
>>>>> >>>>>>>>>>>>>>> longer need the resources,
>>>>> >>>>>>>>>>>>>>> they would get cleaned up. This can be done at pipeline
>>>>> >>>>>>>>>>>>>>> shutdown, or
>>>>> >>>>>>>>>>>>>>> incrementally during a streaming pipeline, etc.
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> Would something like this be a better fit for your use
>>>>> >>>>>>>>>>>>>>> case?
>>>>> >>>>>>>>>>>>>>> If not, why is handling teardown within a single DoFn
>>>>> >>>>>>>>>>>>>>> sufficient?
>>>>> >>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau
>>>>> >>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> Yes, 1M. Let me try to explain by simplifying the overall
>>>>> >>>>>>>>>>>>>>>> execution. Each instance - one fn, so likely in a thread of a
>>>>> >>>>>>>>>>>>>>>> worker - has its lifecycle. As a caricature: "new" and garbage
>>>>> >>>>>>>>>>>>>>>> collection.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> In practice, new is often an unsafe allocate (deserialization),
>>>>> >>>>>>>>>>>>>>>> but it doesn't matter here.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> What I want is for any "new" to be followed by a setup before
>>>>> >>>>>>>>>>>>>>>> any process or startBundle, and for Beam to call teardown the
>>>>> >>>>>>>>>>>>>>>> last time it has the instance, after the last finishBundle and
>>>>> >>>>>>>>>>>>>>>> before it is gc-ed.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> It is as simple as that.
>>>>> >>>>>>>>>>>>>>>> This way there is no need to combine fns in a way that makes a
>>>>> >>>>>>>>>>>>>>>> fn not self-contained in order to implement basic transforms.
>>>>> >>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>> On Feb 18, 2018 at 20:07, "Reuven Lax"
>>>>> >>>>>>>>>>>>>>>> <re...@google.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau
>>>>> >>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> On Feb 18, 2018 at 19:28, "Ben Chambers"
>>>>> >>>>>>>>>>>>>>>>>> <bchamb...@apache.org> wrote:
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> It feels like this thread may be a bit off-track. Rather
>>>>> >>>>>>>>>>>>>>>>>> than focusing on the semantics of the existing methods --
>>>>> >>>>>>>>>>>>>>>>>> which have been noted to meet many existing use cases -- it
>>>>> >>>>>>>>>>>>>>>>>> would be helpful to focus more on the reason you are looking
>>>>> >>>>>>>>>>>>>>>>>> for something with different semantics.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> Some possibilities (I'm not sure which one you are
>>>>> >>>>>>>>>>>>>>>>>> trying
>>>>> >>>>>>>>>>>>>>>>>> to do):
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> 1. Clean-up some external, global resource, that was
>>>>> >>>>>>>>>>>>>>>>>> initialized once during the startup of the pipeline.
>>>>> >>>>>>>>>>>>>>>>>> If this is the case,
>>>>> >>>>>>>>>>>>>>>>>> how are you ensuring it was really only initialized
>>>>> >>>>>>>>>>>>>>>>>> once (and not once per
>>>>> >>>>>>>>>>>>>>>>>> worker, per thread, per instance, etc.)? How do you
>>>>> >>>>>>>>>>>>>>>>>> know when the pipeline
>>>>> >>>>>>>>>>>>>>>>>> should release it? If the answer is "when it reaches
>>>>> >>>>>>>>>>>>>>>>>> step X", then what
>>>>> >>>>>>>>>>>>>>>>>> about a streaming pipeline?
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> When the dofn is logically no longer needed, i.e. when the
>>>>> >>>>>>>>>>>>>>>>>> batch is done or the stream is stopped (manually or by a JVM
>>>>> >>>>>>>>>>>>>>>>>> shutdown)
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> I'm really not following what this means.
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>> Let's say that a pipeline is running 1000 workers,
>>>>> >>>>>>>>>>>>>>>>> and each
>>>>> >>>>>>>>>>>>>>>>> worker is running 1000 threads (each running a copy
>>>>> >>>>>>>>>>>>>>>>> of the same DoFn). How
>>>>> >>>>>>>>>>>>>>>>> many cleanups do you want (do you want 1000 * 1000 =
>>>>> >>>>>>>>>>>>>>>>> 1M cleanups) and when
>>>>> >>>>>>>>>>>>>>>>> do you want it called? When the entire pipeline is
>>>>> >>>>>>>>>>>>>>>>> shut down? When an
>>>>> >>>>>>>>>>>>>>>>> individual worker is about to shut down (which may be
>>>>> >>>>>>>>>>>>>>>>> temporary - may be
>>>>> >>>>>>>>>>>>>>>>> about to start back up)? Something else?
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> 2. Finalize some resources that are used within some
>>>>> >>>>>>>>>>>>>>>>>> region of the pipeline. While the DoFn lifecycle methods
>>>>> >>>>>>>>>>>>>>>>>> are not a good fit for this (they are focused on managing
>>>>> >>>>>>>>>>>>>>>>>> resources within the DoFn), you could model this on how
>>>>> >>>>>>>>>>>>>>>>>> FileIO finalizes the files that it produced. For instance:
>>>>> >>>>>>>>>>>>>>>>>>    a) ParDo generates "resource IDs" (or some token that
>>>>> >>>>>>>>>>>>>>>>>>       stores information about resources)
>>>>> >>>>>>>>>>>>>>>>>>    b) "Require Deterministic Input" (to prevent retries
>>>>> >>>>>>>>>>>>>>>>>>       from changing resource IDs)
>>>>> >>>>>>>>>>>>>>>>>>    c) ParDo that initializes the resources
>>>>> >>>>>>>>>>>>>>>>>>    d) Pipeline segments that use the resources, and
>>>>> >>>>>>>>>>>>>>>>>> eventually output the fact they're done
>>>>> >>>>>>>>>>>>>>>>>>    e) "Require Deterministic Input"
>>>>> >>>>>>>>>>>>>>>>>>    f) ParDo that frees the resources
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> By making the use of the resource part of the data
>>>>> >>>>>>>>>>>>>>>>>> it is
>>>>> >>>>>>>>>>>>>>>>>> possible to "checkpoint" which resources may be in
>>>>> >>>>>>>>>>>>>>>>>> use or have been finished
>>>>> >>>>>>>>>>>>>>>>>> by using the require deterministic input. This is
>>>>> >>>>>>>>>>>>>>>>>> important to ensuring
>>>>> >>>>>>>>>>>>>>>>>> everything is actually cleaned up.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> I need that, but generic and not case by case, to
>>>>> >>>>>>>>>>>>>>>>>> industrialize some API on top of Beam.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> 3. Some other use case that I may be missing? If it
>>>>> >>>>>>>>>>>>>>>>>> is
>>>>> >>>>>>>>>>>>>>>>>> this case, could you elaborate on what you are
>>>>> >>>>>>>>>>>>>>>>>> trying to accomplish? That
>>>>> >>>>>>>>>>>>>>>>>> would help me understand both the problems with
>>>>> >>>>>>>>>>>>>>>>>> existing options and
>>>>> >>>>>>>>>>>>>>>>>> possibly what could be done to help.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> I understand there are workarounds for almost all cases, but
>>>>> >>>>>>>>>>>>>>>>>> that means each transform is different in its lifecycle
>>>>> >>>>>>>>>>>>>>>>>> handling. I dislike that a lot at scale and as a user, since
>>>>> >>>>>>>>>>>>>>>>>> you can't put any unified practice on top of Beam; it also
>>>>> >>>>>>>>>>>>>>>>>> makes Beam very hard to integrate or to use to build
>>>>> >>>>>>>>>>>>>>>>>> higher-level libraries or software.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> This is why I tried not to start the workaround discussions
>>>>> >>>>>>>>>>>>>>>>>> and to just stay at the API level.
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> -- Ben
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau
>>>>> >>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov
>>>>> >>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> "Machine state" is overly low-level because many
>>>>> >>>>>>>>>>>>>>>>>>>> of the
>>>>> >>>>>>>>>>>>>>>>>>>> possible reasons can happen on a perfectly fine
>>>>> >>>>>>>>>>>>>>>>>>>> machine.
>>>>> >>>>>>>>>>>>>>>>>>>> If you'd like to rephrase it to "it will be called
>>>>> >>>>>>>>>>>>>>>>>>>> except in various situations where it's logically
>>>>> >>>>>>>>>>>>>>>>>>>> impossible or impractical
>>>>> >>>>>>>>>>>>>>>>>>>> to guarantee that it's called", that's fine. Or
>>>>> >>>>>>>>>>>>>>>>>>>> you can list some of the
>>>>> >>>>>>>>>>>>>>>>>>>> examples above.
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>> Sounds ok to me
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> The main point for the user is, you *will* see
>>>>> >>>>>>>>>>>>>>>>>>>> non-preventable situations where it couldn't be
>>>>> >>>>>>>>>>>>>>>>>>>> called - it's not just
>>>>> >>>>>>>>>>>>>>>>>>>> intergalactic crashes - so if the logic is very
>>>>> >>>>>>>>>>>>>>>>>>>> important (e.g. cleaning up
>>>>> >>>>>>>>>>>>>>>>>>>> a large amount of temporary files, shutting down a
>>>>> >>>>>>>>>>>>>>>>>>>> large number of VMs you
>>>>> >>>>>>>>>>>>>>>>>>>> started etc), you have to express it using one of
>>>>> >>>>>>>>>>>>>>>>>>>> the other methods that
>>>>> >>>>>>>>>>>>>>>>>>>> have stricter guarantees (which obviously come at
>>>>> >>>>>>>>>>>>>>>>>>>> a cost, e.g. no
>>>>> >>>>>>>>>>>>>>>>>>>> pass-by-reference).
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>> FinishBundle has the exact same guarantee, sadly, so
>>>>> >>>>>>>>>>>>>>>>>>> I'm not
>>>>> >>>>>>>>>>>>>>>>>>> sure which other method you are speaking about.
>>>>> >>>>>>>>>>>>>>>>>>> Concretely, if you make it really
>>>>> >>>>>>>>>>>>>>>>>>> unreliable - this is what "best effort" sounds like
>>>>> >>>>>>>>>>>>>>>>>>> to me - then users can't use it
>>>>> >>>>>>>>>>>>>>>>>>> to clean anything, but if you make it "can happen
>>>>> >>>>>>>>>>>>>>>>>>> but it is unexpected and
>>>>> >>>>>>>>>>>>>>>>>>> means something happened" then it is fine to have a
>>>>> >>>>>>>>>>>>>>>>>>> manual - or auto if fancy
>>>>> >>>>>>>>>>>>>>>>>>> - recovery procedure. This is where it makes all
>>>>> >>>>>>>>>>>>>>>>>>> the difference and impacts
>>>>> >>>>>>>>>>>>>>>>>>> the developers and ops (all users, basically).
>>>>> >>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau
>>>>> >>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> Agree Eugene except that "best effort" means
>>>>> >>>>>>>>>>>>>>>>>>>>> that. It
>>>>> >>>>>>>>>>>>>>>>>>>>> is also often used to say "at will" and this is
>>>>> >>>>>>>>>>>>>>>>>>>>> what triggered this thread.
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> I'm fine using "except if the machine state
>>>>> >>>>>>>>>>>>>>>>>>>>> prevents
>>>>> >>>>>>>>>>>>>>>>>>>>> it" but "best effort" is too open and can be very
>>>>> >>>>>>>>>>>>>>>>>>>>> badly and wrongly
>>>>> >>>>>>>>>>>>>>>>>>>>> perceived by users (like I did).
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> Romain Manni-Bucau
>>>>> >>>>>>>>>>>>>>>>>>>>> @rmannibucau |  Blog | Old Blog | Github |
>>>>> >>>>>>>>>>>>>>>>>>>>> LinkedIn |
>>>>> >>>>>>>>>>>>>>>>>>>>> Book
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov
>>>>> >>>>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> It will not be called if it's impossible to call
>>>>> >>>>>>>>>>>>>>>>>>>>>> it:
>>>>> >>>>>>>>>>>>>>>>>>>>>> in the example situation you have (intergalactic
>>>>> >>>>>>>>>>>>>>>>>>>>>> crash), and in a number of
>>>>> >>>>>>>>>>>>>>>>>>>>>> more common cases: eg in case the worker
>>>>> >>>>>>>>>>>>>>>>>>>>>> container has crashed (eg user code
>>>>> >>>>>>>>>>>>>>>>>>>>>> in a different thread called a C library over
>>>>> >>>>>>>>>>>>>>>>>>>>>> JNI and it segfaulted), JVM
>>>>> >>>>>>>>>>>>>>>>>>>>>> bug, crash due to user code OOM, in case the
>>>>> >>>>>>>>>>>>>>>>>>>>>> worker has lost network
>>>>> >>>>>>>>>>>>>>>>>>>>>> connectivity (then it may be called but it won't
>>>>> >>>>>>>>>>>>>>>>>>>>>> be able to do anything
>>>>> >>>>>>>>>>>>>>>>>>>>>> useful), in case this is running on a
>>>>> >>>>>>>>>>>>>>>>>>>>>> preemptible VM and it was preempted by
>>>>> >>>>>>>>>>>>>>>>>>>>>> the underlying cluster manager without notice or
>>>>> >>>>>>>>>>>>>>>>>>>>>> if the worker was too busy
>>>>> >>>>>>>>>>>>>>>>>>>>>> with other stuff (eg calling other Teardown
>>>>> >>>>>>>>>>>>>>>>>>>>>> functions) until the preemption
>>>>> >>>>>>>>>>>>>>>>>>>>>> timeout elapsed, in case the underlying hardware
>>>>> >>>>>>>>>>>>>>>>>>>>>> simply failed (which
>>>>> >>>>>>>>>>>>>>>>>>>>>> happens quite often at scale), and in many other
>>>>> >>>>>>>>>>>>>>>>>>>>>> conditions.
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> "Best effort" is the commonly used term to
>>>>> >>>>>>>>>>>>>>>>>>>>>> describe
>>>>> >>>>>>>>>>>>>>>>>>>>>> such behavior. Please feel free to file bugs for
>>>>> >>>>>>>>>>>>>>>>>>>>>> cases where you observed a
>>>>> >>>>>>>>>>>>>>>>>>>>>> runner not call Teardown in a situation where it
>>>>> >>>>>>>>>>>>>>>>>>>>>> was possible to call it but
>>>>> >>>>>>>>>>>>>>>>>>>>>> the runner made insufficient effort.
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau
>>>>> >>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov
>>>>> >>>>>>>>>>>>>>>>>>>>>>> <kirpic...@google.com>:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Manni-Bucau
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On Feb 18, 2018 at 00:23, "Kenneth Knowles"
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> <k...@google.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Manni-Bucau
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> If you give an example of a high-level need
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> (e.g.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> "I'm trying to write an IO for system $x and
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> it requires the following
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> initialization and the following cleanup
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> logic and the following processing
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> in between") I'll be better able to help
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> you.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Take a simple example of a transform
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> requiring a
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> connection. Using bundles is a perf killer
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> since bundle size is not controlled.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> Using teardown doesn't allow you to release
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> the connection since it is a best
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> effort thing. Not releasing the connection
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> makes you pay a lot - AWS ;) - or
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> prevents you from launching other processing -
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>> concurrency limit.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> For this example @Teardown is an exact fit.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> If
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> things die so badly that @Teardown is not
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> called then nothing else can be
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> called to close the connection either. What
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> AWS service are you thinking of
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> that stays open for a long time when
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> everything at the other end has died?
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> You assume connections are kind of stateless,
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> but
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> some (proprietary) protocols require some
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> closing exchanges which are not
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> only "I'm leaving".
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> For AWS I was thinking about starting some
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> services
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> - machines - on the fly at pipeline startup
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> and closing them at the end.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> If teardown is not called you leak machines
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> and money. You can say it can be
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> done another way... as can the full pipeline ;).
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I don't want to be picky, but if Beam can't
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> handle its
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> components' lifecycle it can't be used at scale
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> for generic pipelines and is
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> bound to some particular IO.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> What prevents enforcing teardown -
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> ignoring
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> the interstellar crash case, which can't be
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> handled by any human system?
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Nothing technically. Why do you push to not
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> handle it? Is it due to some
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> legacy code in Dataflow or something else?
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> Teardown *is* already documented and
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> implemented
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> this way (best-effort). So I'm not sure what
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> kind of change you're asking
>>>>> >>>>>>>>>>>>>>>>>>>>>>>> for.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>> Remove "best effort" from the Javadoc. If it is
>>>>> >>>>>>>>>>>>>>>>>>>>>>> not
>>>>> >>>>>>>>>>>>>>>>>>>>>>> called then it is a bug and we are done :).
>>>>> >>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Also, what does it mean for the users? The direct
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> runner
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> does it, so if a user uses the RI in tests, he
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> will get a different behavior
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> in prod? Also, don't forget the user doesn't
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> know what the IOs he composes use,
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> so this impacts the whole product so much
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> that it must be handled IMHO.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I understand the portability culture is new
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> in the big
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> data world, but that is not a reason to ignore
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> what people did for years and do
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> it wrong before doing it right ;).
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> My proposal is to list what can prevent us from
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> guaranteeing - in normal IT conditions - the
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> execution of teardown. Then we
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> see if we can handle it, and only if there is
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> a technical reason we can't do we
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> make it experimental/unsupported in the API.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> I know Spark and Flink can; any
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> unknown blocker for other runners?
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Technical note: even a kill should go through
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Java
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> shutdown hooks, otherwise your environment
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> (the software enclosing Beam) is fully
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> unhandled and your overall system is
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> uncontrolled. The only case where this is not
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> true is when the software is always owned by
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> a vendor and never installed in a
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> customer environment. In this case it is up
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> to the vendor to handle the Beam
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> API and not to Beam to adjust its API for a
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> vendor - otherwise all
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> features unsupported by one runner should be
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> made optional, right?
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Not all state is about the network, even in
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> distributed
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> systems, so it is key to have an explicit
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> and defined lifecycle.
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>>>>>>
>>>>> >>>>>>>>>>>>
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>
>>>>> >>>
>>>>> >>
>>>>> >>
>>>>> >
>>>>
>>>>
>>
>
