I kind of agree, except that transforms lack a lifecycle too. My understanding
is that SDF could be a way to unify it and clean up the API.

Otherwise, how do we normalize - with a single API - the lifecycle of transforms?

On Feb 18, 2018 21:32, "Ben Chambers" <bchamb...@apache.org> wrote:

> Are you sure that focusing on the cleanup of specific DoFns is
> appropriate? In many cases where cleanup is necessary, it is needed around an
> entire composite PTransform. I think there have been discussions/proposals
> around a more methodical "cleanup" option, but those haven't been
> implemented, to the best of my knowledge.
>
> For instance, consider the steps of a FileIO:
> 1. Write to a bunch (N shards) of temporary files
> 2. When all temporary files are complete, attempt to do a bulk copy to put
> them in the final destination.
> 3. Clean up all the temporary files.
>
> (This is often desirable because it minimizes the chance of seeing
> partial/incomplete results in the final destination).
>
> In the above, you'd want step 1 to execute on many workers, likely using a
> ParDo (say N different workers).
> The move step should only happen once, so on one worker. This means it
> will be a different DoFn, likely with some stuff done to ensure it runs on
> one worker.
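>
> As a rough sketch of that shape (illustrative only -- the Fn classes and the
> "records" input below are placeholders, not the actual FileIO internals):
>
>   // Step 1: N workers write temporary files and emit their paths.
>   PCollection<String> tempPaths =
>       records.apply("WriteTempFiles", ParDo.of(new WriteTempFileFn()));
>
>   // Step 2: collapse all paths onto a single worker (constant key +
>   // GroupByKey), then bulk-copy them to the final destination.
>   PCollection<String> finalPaths = tempPaths
>       .apply(WithKeys.of("all"))
>       .apply(GroupByKey.<String, String>create())
>       .apply("CopyToFinal", ParDo.of(new CopyToFinalFn()));
>
>   // Step 3: delete the temporary files -- the step that needs a reliable
>   // "cleanup" hook.
>   finalPaths.apply("DeleteTempFiles", ParDo.of(new DeleteTempFn()));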
>
> In such a case, cleanup / @Teardown of the DoFn is not enough. We need an
> API for a PTransform to schedule some cleanup work for when the transform
> is "done". In batch this is relatively straightforward, but doesn't exist.
> This is the source of some problems, such as BigQuery sink leaving files
> around that have failed to import into BigQuery.
>
> In streaming this is less straightforward -- do you want to wait until the
> end of the pipeline? Or do you want to wait until the end of the window? In
> practice, you just want to wait until you know nobody will need the
> resource anymore.
>
> This led to some discussions around a "cleanup" API, where you could have
> a transform that output resource objects. Each resource object would have
> logic for cleaning it up. And there would be something that indicated what
> parts of the pipeline needed that resource, and what kind of temporal
> lifetime those objects had. As soon as that part of the pipeline had
> advanced far enough that it would no longer need the resources, they would
> get cleaned up. This can be done at pipeline shutdown, or incrementally
> during a streaming pipeline, etc.
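>
> Purely as an illustration of the idea (no such API exists in Beam today; the
> names below are hypothetical):
>
>   // A resource object that carries its own cleanup logic.
>   interface CleanupResource extends Serializable {
>     void cleanup() throws Exception;
>   }
>
>   // A transform would output the resources it created, and something would
>   // declare which pipeline segment still needs them; once that segment has
>   // advanced far enough (end of window, end of pipeline, ...), the runner
>   // invokes cleanup() on each resource.
>   PCollection<CleanupResource> resources =
>       p.apply("CreateTempDirs", new CreateTempDirsTransform());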
>
> Would something like this be a better fit for your use case? If not, why
> is handling teardown within a single DoFn sufficient?
>
> On Sun, Feb 18, 2018 at 11:53 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> Yes, 1M. Let me try to explain, simplifying the overall execution. Each
>> instance - one fn, so likely in a thread of a worker - has its own lifecycle.
>> To caricature: "new" and garbage collection.
>>
>> In practice, "new" is often an unsafe allocation (via deserialization), but
>> that doesn't matter here.
>>
>> What I want is for any "new" to be followed by a setup before any
>> processElement or startBundle, and, the last time Beam holds the instance
>> before it is GC-ed and after the last finishBundle, for it to call teardown.
>>
>> It is as simple as that.
>> This way there is no need to combine fns in a way that makes a fn not
>> self-contained just to implement basic transforms.
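>>
>> Concretely, as a minimal sketch with the standard Java DoFn annotations (the
>> Connection type is just a placeholder, not a Beam class):
>>
>>   class MyFn extends DoFn<String, String> {
>>     private transient Connection connection;
>>
>>     @Setup
>>     public void setup() {      // after "new"/deserialization, before any bundle
>>       connection = Connection.open();
>>     }
>>
>>     @ProcessElement
>>     public void processElement(ProcessContext c) {
>>       c.output(connection.send(c.element()));
>>     }
>>
>>     @Teardown
>>     public void teardown() {   // the call whose guarantee is being discussed:
>>       connection.close();      // after the last finishBundle, before the
>>     }                          // instance is dropped
>>   }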
>>
>> On Feb 18, 2018 20:07, "Reuven Lax" <re...@google.com> wrote:
>>
>>>
>>>
>>> On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> On Feb 18, 2018 19:28, "Ben Chambers" <bchamb...@apache.org> wrote:
>>>>
>>>> It feels like this thread may be a bit off-track. Rather than focusing
>>>> on the semantics of the existing methods -- which have been noted to meet
>>>> many existing use cases -- it would be helpful to focus more on the reason
>>>> you are looking for something with different semantics.
>>>>
>>>> Some possibilities (I'm not sure which one you are trying to do):
>>>>
>>>> 1. Clean-up some external, global resource, that was initialized once
>>>> during the startup of the pipeline. If this is the case, how are you
>>>> ensuring it was really only initialized once (and not once per worker, per
>>>> thread, per instance, etc.)? How do you know when the pipeline should
>>>> release it? If the answer is "when it reaches step X", then what about a
>>>> streaming pipeline?
>>>>
>>>>
>>>> When the DoFn is no longer needed logically, i.e. when the batch is done
>>>> or the stream is stopped (manually or by a JVM shutdown).
>>>>
>>>
>>> I'm really not following what this means.
>>>
>>> Let's say that a pipeline is running 1000 workers, and each worker is
>>> running 1000 threads (each running a copy of the same DoFn). How many
>>> cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do
>>> you want it called? When the entire pipeline is shut down? When an
>>> individual worker is about to shut down (which may be temporary - may be
>>> about to start back up)? Something else?
>>>
>>>
>>>
>>>>
>>>>
>>>>
>>>> 2. Finalize some resources that are used within some region of the
>>>> pipeline. While, the DoFn lifecycle methods are not a good fit for this
>>>> (they are focused on managing resources within the DoFn), you could model
>>>> this on how FileIO finalizes the files that it produced. For instance:
>>>>    a) ParDo generates "resource IDs" (or some token that stores
>>>> information about resources)
>>>>    b) "Require Deterministic Input" (to prevent retries from changing
>>>> resource IDs)
>>>>    c) ParDo that initializes the resources
>>>>    d) Pipeline segments that use the resources, and eventually output
>>>> the fact they're done
>>>>    e) "Require Deterministic Input"
>>>>    f) ParDo that frees the resources
>>>>
>>>> By making the use of the resource part of the data, it is possible to
>>>> "checkpoint" which resources may be in use or have been finished, by using
>>>> the "require deterministic input" steps. This is important for ensuring
>>>> everything is actually cleaned up.
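>>>>
>>>> As a sketch of that shape (the Fn/transform names are placeholders, and
>>>> Reshuffle stands in for the "require deterministic input" steps):
>>>>
>>>>   // (a) generate resource IDs
>>>>   PCollection<String> ids =
>>>>       input.apply("GenerateResourceIds", ParDo.of(new GenerateIdsFn()));
>>>>   // (b) require deterministic input, (c) initialize the resources
>>>>   PCollection<String> created = ids
>>>>       .apply(Reshuffle.viaRandomKey())
>>>>       .apply("InitializeResources", ParDo.of(new InitResourceFn()));
>>>>   // (d) pipeline segment that uses the resources and reports when done
>>>>   PCollection<String> done =
>>>>       created.apply("UseResources", new UseResourcesTransform());
>>>>   // (e) require deterministic input again, (f) free the resources
>>>>   done.apply(Reshuffle.viaRandomKey())
>>>>       .apply("FreeResources", ParDo.of(new FreeResourceFn()));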
>>>>
>>>>
>>>> I need that, but generic and not case by case, in order to industrialize
>>>> some API on top of Beam.
>>>>
>>>>
>>>>
>>>> 3. Some other use case that I may be missing? If it is this case, could
>>>> you elaborate on what you are trying to accomplish? That would help me
>>>> understand both the problems with existing options and possibly what could
>>>> be done to help.
>>>>
>>>>
>>>> I understand there are workarounds for almost all cases, but that means
>>>> each transform is different in its lifecycle handling. I dislike that a lot
>>>> at scale and as a user, since you can't put any unified practice on top of
>>>> Beam; it also makes Beam very hard to integrate, or to use to build
>>>> higher-level libraries or software.
>>>>
>>>> This is why I tried not to start the workaround discussion and just stay
>>>> at the API level.
>>>>
>>>>
>>>>
>>>> -- Ben
>>>>
>>>>
>>>> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <
>>>> rmannibu...@gmail.com> wrote:
>>>>
>>>>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>
>>>>>> "Machine state" is overly low-level because many of the possible
>>>>>> reasons can happen on a perfectly fine machine.
>>>>>> If you'd like to rephrase it to "it will be called except in various
>>>>>> situations where it's logically impossible or impractical to guarantee 
>>>>>> that
>>>>>> it's called", that's fine. Or you can list some of the examples above.
>>>>>>
>>>>>
>>>>> Sounds ok to me
>>>>>
>>>>>
>>>>>>
>>>>>> The main point for the user is, you *will* see non-preventable
>>>>>> situations where it couldn't be called - it's not just intergalactic
>>>>>> crashes - so if the logic is very important (e.g. cleaning up a large
>>>>>> amount of temporary files, shutting down a large number of VMs you 
>>>>>> started
>>>>>> etc), you have to express it using one of the other methods that have
>>>>>> stricter guarantees (which obviously come at a cost, e.g. no
>>>>>> pass-by-reference).
>>>>>>
>>>>>
>>>>> FinishBundle has the exact same guarantee, sadly, so I'm not sure which
>>>>> other method you're speaking about. Concretely, if you make it really
>>>>> unreliable - which is what "best effort" sounds like to me - then users
>>>>> can't use it to clean up anything; but if you make it "it can fail to be
>>>>> called, but that is unexpected and means something went wrong", then it is
>>>>> fine to have a manual - or automatic if fancy - recovery procedure. This
>>>>> is where it makes all the difference and impacts the developers and ops
>>>>> (all users, basically).
>>>>>
>>>>>
>>>>>>
>>>>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Agreed, Eugene, except that while "best effort" can mean that, it is
>>>>>>> also often used to mean "at will", and that is what triggered this
>>>>>>> thread.
>>>>>>>
>>>>>>> I'm fine using "except if the machine state prevents it", but "best
>>>>>>> effort" is too open and can be very badly and wrongly perceived by users
>>>>>>> (as it was by me).
>>>>>>>
>>>>>>>
>>>>>>> Romain Manni-Bucau
>>>>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>>>>> <http://rmannibucau.wordpress.com> | Github
>>>>>>> <https://github.com/rmannibucau> | LinkedIn
>>>>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>>>>
>>>>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>>
>>>>>>>> It will not be called if it's impossible to call it: in the example
>>>>>>>> situation you have (intergalactic crash), and in a number of more 
>>>>>>>> common
>>>>>>>> cases: eg in case the worker container has crashed (eg user code in a
>>>>>>>> different thread called a C library over JNI and it segfaulted), JVM 
>>>>>>>> bug,
>>>>>>>> crash due to user code OOM, in case the worker has lost network
>>>>>>>> connectivity (then it may be called but it won't be able to do anything
>>>>>>>> useful), in case this is running on a preemptible VM and it was 
>>>>>>>> preempted
>>>>>>>> by the underlying cluster manager without notice or if the worker was 
>>>>>>>> too
>>>>>>>> busy with other stuff (eg calling other Teardown functions) until the
>>>>>>>> preemption timeout elapsed, in case the underlying hardware simply 
>>>>>>>> failed
>>>>>>>> (which happens quite often at scale), and in many other conditions.
>>>>>>>>
>>>>>>>> "Best effort" is the commonly used term to describe such behavior.
>>>>>>>> Please feel free to file bugs for cases where you observed a runner not
>>>>>>>> call Teardown in a situation where it was possible to call it but the
>>>>>>>> runner made insufficient effort.
>>>>>>>>
>>>>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <
>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>
>>>>>>>>> :
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Feb 18, 2018 00:23, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying
>>>>>>>>>>>> to write an IO for system $x and it requires the following 
>>>>>>>>>>>> initialization
>>>>>>>>>>>> and the following cleanup logic and the following processing in 
>>>>>>>>>>>> between")
>>>>>>>>>>>> I'll be better able to help you.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Take a simple example of a transform requiring a connection.
>>>>>>>>>>>> Using bundles is a perf killer since their size is not controlled.
>>>>>>>>>>>> Using teardown doesn't allow you to release the connection, since
>>>>>>>>>>>> it is a best-effort thing. Not releasing the connection makes you
>>>>>>>>>>>> pay a lot - AWS ;) - or prevents you from launching other
>>>>>>>>>>>> processing - concurrency limits.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> For this example @Teardown is an exact fit. If things die so
>>>>>>>>>>> badly that @Teardown is not called then nothing else can be called 
>>>>>>>>>>> to close
>>>>>>>>>>> the connection either. What AWS service are you thinking of that 
>>>>>>>>>>> stays open
>>>>>>>>>>> for a long time when everything at the other end has died?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> You assume connections are kind of stateless, but some
>>>>>>>>>>> (proprietary) protocols require closing exchanges which are more
>>>>>>>>>>> than just "I'm leaving".
>>>>>>>>>>>
>>>>>>>>>>> For AWS I was thinking about starting some services - machines -
>>>>>>>>>>> on the fly at pipeline startup and shutting them down at the end.
>>>>>>>>>>> If teardown is not called you leak machines and money. You can say
>>>>>>>>>>> it can be done another way... as can the full pipeline ;).
>>>>>>>>>>>
>>>>>>>>>>> I don't want to be picky, but if Beam can't handle its components'
>>>>>>>>>>> lifecycle, it can't be used at scale for generic pipelines and ends
>>>>>>>>>>> up bound to some particular IOs.
>>>>>>>>>>>
>>>>>>>>>>> What prevents us from enforcing teardown - ignoring the
>>>>>>>>>>> interstellar-crash case, which can't be handled by any human
>>>>>>>>>>> system? Nothing, technically. Why do you push to not handle it? Is
>>>>>>>>>>> it due to some legacy code in Dataflow or something else?
>>>>>>>>>>>
>>>>>>>>>> Teardown *is* already documented and implemented this way
>>>>>>>>>> (best-effort). So I'm not sure what kind of change you're asking for.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Remove "best effort" from the javadoc. If it is not called then it
>>>>>>>>> is a bug and we are done :).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> Also, what does it mean for users? The direct runner does it, so
>>>>>>>>>>> if a user uses the RI in tests, will he get a different behavior in
>>>>>>>>>>> prod? Also, don't forget that the user doesn't know what the IOs he
>>>>>>>>>>> composes use, so this is so impactful for the whole product that it
>>>>>>>>>>> must be handled IMHO.
>>>>>>>>>>>
>>>>>>>>>>> I understand the portability culture is new in the big data world,
>>>>>>>>>>> but that is not a reason to ignore what people have done for years
>>>>>>>>>>> and to do it wrong before doing it right ;).
>>>>>>>>>>>
>>>>>>>>>>> My proposal is to list what can prevent guaranteeing - under
>>>>>>>>>>> normal IT conditions - the execution of teardown. Then we see if we
>>>>>>>>>>> can handle it, and only if there is a technical reason we can't do
>>>>>>>>>>> we make it experimental/unsupported in the API. I know Spark and
>>>>>>>>>>> Flink can; any blocker for other runners?
>>>>>>>>>>>
>>>>>>>>>>> Technical note: even a kill should go through Java shutdown hooks,
>>>>>>>>>>> otherwise your environment (the software enclosing Beam) is fully
>>>>>>>>>>> unhandled and your overall system is uncontrolled. The only case
>>>>>>>>>>> where this is not true is when the software is always owned by a
>>>>>>>>>>> vendor and never installed in a customer environment. In that case
>>>>>>>>>>> it belongs to the vendor to handle the Beam API, and not to Beam to
>>>>>>>>>>> adjust its API for a vendor - otherwise all features unsupported by
>>>>>>>>>>> one runner should be made optional, right?
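>>>>>>>>>>>
>>>>>>>>>>> (For reference, the plain-JVM behavior I mean - a regular kill,
>>>>>>>>>>> i.e. SIGTERM, runs registered shutdown hooks before the JVM exits:
>>>>>>>>>>>
>>>>>>>>>>>   Runtime.getRuntime().addShutdownHook(new Thread(() -> {
>>>>>>>>>>>     // release connections, delete temp files, stop started machines...
>>>>>>>>>>>   }));
>>>>>>>>>>>
>>>>>>>>>>> whereas "kill -9" and hard crashes skip these hooks.)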
>>>>>>>>>>>
>>>>>>>>>>> Not all state is about the network, even in distributed systems,
>>>>>>>>>>> so it is key to have an explicit and well-defined lifecycle.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>
>>>>
>>>
