On Sun, Feb 18, 2018 at 10:50 AM, Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

>
>
> Le 18 févr. 2018 19:28, "Ben Chambers" <bchamb...@apache.org> a écrit :
>
> It feels like his thread may be a bit off-track. Rather than focusing on
> the semantics of the existing methods -- which have been noted to be meet
> many existing use cases -- it would be helpful to focus on more on the
> reason you are looking for something with different semantics.
>
> Some possibilities (I'm not sure which one you are trying to do):
>
> 1. Clean-up some external, global resource, that was initialized once
> during the startup of the pipeline. If this is the case, how are you
> ensuring it was really only initialized once (and not once per worker, per
> thread, per instance, etc.)? How do you know when the pipeline should
> release it? If the answer is "when it reaches step X", then what about a
> streaming pipeline?
>
>
> When the dofn is no more needed logically ie when the batch is done or
> stream is stopped (manually or by a jvm shutdown)
>

I'm really not following what this means.

Let's say that a pipeline is running 1000 workers, and each worker is
running 1000 threads (each running a copy of the same DoFn). How many
cleanups do you want (do you want 1000 * 1000 = 1M cleanups) and when do
you want it called? When the entire pipeline is shut down? When an
individual worker is about to shut down (which may be temporary - may be
about to start back up)? Something else?



>
>
>
> 2. Finalize some resources that are used within some region of the
> pipeline. While, the DoFn lifecycle methods are not a good fit for this
> (they are focused on managing resources within the DoFn), you could model
> this on how FileIO finalizes the files that it produced. For instance:
>    a) ParDo generates "resource IDs" (or some token that stores
> information about resources)
>    b) "Require Deterministic Input" (to prevent retries from changing
> resource IDs)
>    c) ParDo that initializes the resources
>    d) Pipeline segments that use the resources, and eventually output the
> fact they're done
>    e) "Require Deterministic Input"
>    f) ParDo that frees the resources
>
> By making the use of the resource part of the data it is possible to
> "checkpoint" which resources may be in use or have been finished by using
> the require deterministic input. This is important to ensuring everything
> is actually cleaned up.
>
>
> I nees that but generic and not case by case to industrialize some api on
> top of beam.
>
>
>
> 3. Some other use case that I may be missing? If it is this case, could
> you elaborate on what you are trying to accomplish? That would help me
> understand both the problems with existing options and possibly what could
> be done to help.
>
>
> I understand there are sorkaround for almost all cases but means each
> transform is different in its lifecycle handling  except i dislike it a lot
> at a scale and as a user since you cant put any unified practise on top of
> beam, it also makes beam very hard to integrate or to use to build higher
> level libraries or softwares.
>
> This is why i tried to not start the workaround discussions and just stay
> at API level.
>
>
>
> -- Ben
>
>
> On Sun, Feb 18, 2018 at 9:56 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> 2018-02-18 18:36 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>
>>> "Machine state" is overly low-level because many of the possible reasons
>>> can happen on a perfectly fine machine.
>>> If you'd like to rephrase it to "it will be called except in various
>>> situations where it's logically impossible or impractical to guarantee that
>>> it's called", that's fine. Or you can list some of the examples above.
>>>
>>
>> Sounds ok to me
>>
>>
>>>
>>> The main point for the user is, you *will* see non-preventable
>>> situations where it couldn't be called - it's not just intergalactic
>>> crashes - so if the logic is very important (e.g. cleaning up a large
>>> amount of temporary files, shutting down a large number of VMs you started
>>> etc), you have to express it using one of the other methods that have
>>> stricter guarantees (which obviously come at a cost, e.g. no
>>> pass-by-reference).
>>>
>>
>> FinishBundle has the exact same guarantee sadly so not which which other
>> method you speak about. Concretely if you make it really unreliable - this
>> is what best effort sounds to me - then users can use it to clean anything
>> but if you make it "can happen but it is unexpected and means something
>> happent" then it is fine to have a manual - or auto if fancy - recovery
>> procedure. This is where it makes all the difference and impacts the
>> developpers, ops (all users basically).
>>
>>
>>>
>>> On Sun, Feb 18, 2018 at 9:16 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>> Agree Eugene except that "best effort" means that. It is also often
>>>> used to say "at will" and this is what triggered this thread.
>>>>
>>>> I'm fine using "except if the machine state prevents it" but "best
>>>> effort" is too open and can be very badly and wrongly perceived by users
>>>> (like I did).
>>>>
>>>>
>>>> Romain Manni-Bucau
>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>> <http://rmannibucau.wordpress.com> | Github
>>>> <https://github.com/rmannibucau> | LinkedIn
>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>
>>>> 2018-02-18 18:13 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>
>>>>> It will not be called if it's impossible to call it: in the example
>>>>> situation you have (intergalactic crash), and in a number of more common
>>>>> cases: eg in case the worker container has crashed (eg user code in a
>>>>> different thread called a C library over JNI and it segfaulted), JVM bug,
>>>>> crash due to user code OOM, in case the worker has lost network
>>>>> connectivity (then it may be called but it won't be able to do anything
>>>>> useful), in case this is running on a preemptible VM and it was preempted
>>>>> by the underlying cluster manager without notice or if the worker was too
>>>>> busy with other stuff (eg calling other Teardown functions) until the
>>>>> preemption timeout elapsed, in case the underlying hardware simply failed
>>>>> (which happens quite often at scale), and in many other conditions.
>>>>>
>>>>> "Best effort" is the commonly used term to describe such behavior.
>>>>> Please feel free to file bugs for cases where you observed a runner not
>>>>> call Teardown in a situation where it was possible to call it but the
>>>>> runner made insufficient effort.
>>>>>
>>>>> On Sun, Feb 18, 2018, 9:02 AM Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> 2018-02-18 18:00 GMT+01:00 Eugene Kirpichov <kirpic...@google.com>:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Feb 18, 2018, 2:06 AM Romain Manni-Bucau <
>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Le 18 févr. 2018 00:23, "Kenneth Knowles" <k...@google.com> a
>>>>>>>> écrit :
>>>>>>>>
>>>>>>>> On Sat, Feb 17, 2018 at 3:09 PM, Romain Manni-Bucau <
>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> If you give an example of a high-level need (e.g. "I'm trying to
>>>>>>>>> write an IO for system $x and it requires the following 
>>>>>>>>> initialization and
>>>>>>>>> the following cleanup logic and the following processing in between") 
>>>>>>>>> I'll
>>>>>>>>> be better able to help you.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Take a simple example of a transform requiring a connection. Using
>>>>>>>>> bundles is a perf killer since size is not controlled. Using teardown
>>>>>>>>> doesnt allow you to release the connection since it is a best effort 
>>>>>>>>> thing.
>>>>>>>>> Not releasing the connection makes you pay a lot - aws ;) - or 
>>>>>>>>> prevents you
>>>>>>>>> to launch other processings - concurrent limit.
>>>>>>>>>
>>>>>>>>
>>>>>>>> For this example @Teardown is an exact fit. If things die so badly
>>>>>>>> that @Teardown is not called then nothing else can be called to close 
>>>>>>>> the
>>>>>>>> connection either. What AWS service are you thinking of that stays 
>>>>>>>> open for
>>>>>>>> a long time when everything at the other end has died?
>>>>>>>>
>>>>>>>>
>>>>>>>> You assume connections are kind of stateless but some (proprietary)
>>>>>>>> protocols requires some closing exchanges which are not only "im 
>>>>>>>> leaving".
>>>>>>>>
>>>>>>>> For aws i was thinking about starting some services - machines - on
>>>>>>>> the fly in a pipeline startup and closing them at the end. If teardown 
>>>>>>>> is
>>>>>>>> not called you leak machines and money. You can say it can be done 
>>>>>>>> another
>>>>>>>> way...as the full pipeline ;).
>>>>>>>>
>>>>>>>> I dont want to be picky but if beam cant handle its components
>>>>>>>> lifecycle it can be used at scale for generic pipelines and if bound to
>>>>>>>> some particular IO.
>>>>>>>>
>>>>>>>> What does prevent to enforce teardown - ignoring the interstellar
>>>>>>>> crash case which cant be handled by any human system? Nothing 
>>>>>>>> technically.
>>>>>>>> Why do you push to not handle it? Is it due to some legacy code on 
>>>>>>>> dataflow
>>>>>>>> or something else?
>>>>>>>>
>>>>>>> Teardown *is* already documented and implemented this way
>>>>>>> (best-effort). So I'm not sure what kind of change you're asking for.
>>>>>>>
>>>>>>
>>>>>> Remove "best effort" from the javadoc. If it is not call then it is a
>>>>>> bug and we are done :).
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> Also what does it mean for the users? Direct runner does it so if a
>>>>>>>> user udes the RI in test, he will get a different behavior in prod? 
>>>>>>>> Also
>>>>>>>> dont forget the user doesnt know what the IOs he composes use so this 
>>>>>>>> is so
>>>>>>>> impacting for the whole product than he must be handled IMHO.
>>>>>>>>
>>>>>>>> I understand the portability culture is new in big data world but
>>>>>>>> it is not a reason to ignore what people did for years and do it wrong
>>>>>>>> before doing right ;).
>>>>>>>>
>>>>>>>> My proposal is to list what can prevent to guarantee - in the
>>>>>>>> normal IT conditions - the execution of teardown. Then we see if we can
>>>>>>>> handle it and only if there is a technical reason we cant we make it
>>>>>>>> experimental/unsupported in the api. I know spark and flink can, any
>>>>>>>> unknown blocker for other runners?
>>>>>>>>
>>>>>>>> Technical note: even a kill should go through java shutdown hooks
>>>>>>>> otherwise your environment (beam enclosing software) is fully 
>>>>>>>> unhandled and
>>>>>>>> your overall system is uncontrolled. Only case where it is not true is 
>>>>>>>> when
>>>>>>>> the software is always owned by a vendor and never installed on 
>>>>>>>> customer
>>>>>>>> environment. In this case it belongd to the vendor to handle beam API 
>>>>>>>> and
>>>>>>>> not to beam to adjust its API for a vendor - otherwise all unsupported
>>>>>>>> features by one runner should be made optional right?
>>>>>>>>
>>>>>>>> All state is not about network, even in distributed systems so this
>>>>>>>> is key to have an explicit and defined lifecycle.
>>>>>>>>
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>
>

Reply via email to