Do you have data that supports this?

Note that in practice, for something like passing an element between DoFns,
the constant hidden in the O(1) actually matters. Decreasing SDK harness
overhead is a good thing though.
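
For what it's worth, a rough way to gather such numbers would be a
micro-benchmark that compares a plain call chain against a CompletionStage
chain per element. A minimal JMH sketch (not an existing Beam benchmark; the
class and step names are made up, and it only measures the wrapper/chaining
cost, not scheduling behavior under real IO):

  import java.util.concurrent.CompletableFuture;
  import java.util.concurrent.TimeUnit;
  import org.openjdk.jmh.annotations.Benchmark;
  import org.openjdk.jmh.annotations.BenchmarkMode;
  import org.openjdk.jmh.annotations.Mode;
  import org.openjdk.jmh.annotations.OutputTimeUnit;
  import org.openjdk.jmh.annotations.Scope;
  import org.openjdk.jmh.annotations.State;

  @State(Scope.Thread)
  @BenchmarkMode(Mode.AverageTime)
  @OutputTimeUnit(TimeUnit.NANOSECONDS)
  public class ChainingOverheadBenchmark {
    private int element = 42;

    // Baseline: the "fused" style, one plain method call per chained DoFn.
    @Benchmark
    public int directCall() {
      return stepTwo(stepOne(element));
    }

    // Candidate: wrap the element in a completed future and chain the same steps.
    @Benchmark
    public int completionStageChain() {
      return CompletableFuture.completedFuture(element)
          .thenApply(this::stepOne)
          .thenApply(this::stepTwo)
          .join();
    }

    private int stepOne(int x) { return x + 1; }

    private int stepTwo(int x) { return x * 2; }
  }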

On Mon, Mar 12, 2018 at 10:14 AM, Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

> By itself, it is just the overhead of instantiating a wrapper (so close to
> nothing with the recent JVM GC improvements done for stream/Optional usage).
> If you then use the chaining, you get a light but still O(1) overhead which
> you can choose to skip when writing synchronous code, but which will let you
> run way faster for IO/async code by using the CPU properly once you tune
> your slaves/workers. So I am tempted to summarize it as "has an overhead
> that allows you to not run slower". It doesn't prevent Beam from still
> exposing a synchronous API collapsed at evaluation time into a single fn,
> which would give you the best of both worlds.
>
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:
>
>> It is expected that SDKs will have all their cores fully utilized by
>> processing bundles in parallel, not by performing intra-bundle
>> parallelization. This allows DoFns to be chained together via regular
>> method calls, because the overhead of passing a single element through all
>> the DoFns should be as minimal as possible.
>>
>> What is the overhead of using a completion stage vs using a regular
>> method call?
>>
>> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <
>> rmannibu...@gmail.com> wrote:
>>
>>>
>>>
>>> On Mar 12, 2018 at 00:19, "Reuven Lax" <re...@google.com> wrote:
>>>
>>>
>>>
>>>
>>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>> This makes me think Beam should maybe do 2 internal changes before moving
>>>> forward on (S)DF API changes:
>>>>
>>>> 1. define a Beam singleton per JVM (per classloader hierarchy actually,
>>>> but you get the idea) which can be used to store things locally for
>>>> reuse - see 2 for an example; the metrics pusher work Etienne is doing
>>>> could benefit from it too
>>>>
>>>
>>> I think we do need something like this, but it needs to be a bit more
>>> than a singleton per JVM. For one thing it needs to be at least per
>>> pipeline within a JVM. You might run multiple tests in a single JVM, and it
>>> should also be possible to run those tests in parallel without their
>>> static state interfering with each other. I also think the state needs to be
>>> addressable per step (i.e. a ParDo can look up its static state without
>>> caring about static state belonging to another ParDo).
>>>
>>>
>>> Agreed, but you can register the pipeline in a singleton (as a "ref", not
>>> as an instance) and thereby cover the same need. +1 to having scopes
>>> (singleton, pipeline, thread), but it still requires a single root
>>> singleton to handle serialization ;).
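>>>
>>> To make the idea concrete, here is a minimal sketch of what such a scoped
>>> registry could look like (everything here - the class name, the scope
>>> enum, the composite key - is hypothetical, not an existing Beam API):
>>>
>>>   import java.util.concurrent.ConcurrentHashMap;
>>>   import java.util.concurrent.ConcurrentMap;
>>>   import java.util.function.Supplier;
>>>
>>>   // Hypothetical per-JVM registry, addressable per pipeline and per step,
>>>   // so parallel tests in one JVM don't share state by accident.
>>>   public final class BeamRegistry {
>>>     public enum Scope { JVM, PIPELINE, STEP }
>>>
>>>     private static final ConcurrentMap<String, Object> ENTRIES =
>>>         new ConcurrentHashMap<>();
>>>
>>>     private BeamRegistry() {}
>>>
>>>     @SuppressWarnings("unchecked")
>>>     public static <T> T get(
>>>         Scope scope, String pipelineId, String stepName,
>>>         String key, Supplier<T> factory) {
>>>       // The key embeds the scope, so JVM/pipeline/step entries never collide.
>>>       String compositeKey = scope + "|" + pipelineId + "|" + stepName + "|" + key;
>>>       return (T) ENTRIES.computeIfAbsent(compositeKey, k -> factory.get());
>>>     }
>>>   }
>>>
>>> A pipeline-scoped entry would simply leave stepName empty; the point is
>>> only that one well-known root object owns the map and the serialization
>>> hooks.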
>>>
>>>
>>>
>>>
>>>> 2. define an SPI to load (S)DoFn parameter providers instead of having an
>>>> ArgProvider which provides everything that is supported. This way you can
>>>> use any kind of parameter, and the parameter providers can use 1. to
>>>> handle their own state. A first impl of the parameter provider SPI would
>>>> cover a) state, b) timers, c) reactive handlers, and potentially user
>>>> parameter providers (service-like, which can be singletons in the scope
>>>> of a "JVM" thanks to 1).
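>>>>
>>>> As an illustration, a sketch of what such an SPI contract could look
>>>> like, discovered through the standard java.util.ServiceLoader (the
>>>> interface and method names are made up for the example):
>>>>
>>>>   import java.lang.reflect.Parameter;
>>>>
>>>>   // Hypothetical SPI: one provider per kind of @ProcessElement parameter
>>>>   // (state, timers, reactive handlers, user services, ...).
>>>>   public interface DoFnParameterProvider {
>>>>     // Does this provider know how to produce a value for this parameter?
>>>>     boolean canProvide(Parameter parameter);
>>>>
>>>>     // Produce the value to inject for the current element/bundle.
>>>>     Object provide(Parameter parameter);
>>>>   }
>>>>
>>>>   // The harness would discover implementations from the classpath:
>>>>   //   ServiceLoader<DoFnParameterProvider> providers =
>>>>   //       ServiceLoader.load(DoFnParameterProvider.class);
>>>>   // and ask each one, in order, whether it can satisfy a given parameter.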
>>>>
>>>>
>>>> Romain Manni-Bucau
>>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>>> <http://rmannibucau.wordpress.com> | Github
>>>> <https://github.com/rmannibucau> | LinkedIn
>>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>>
>>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>
>>>>> Yep. Introduce OutputEmitter, and ProcessContext no longer has much
>>>>> use.
>>>>>
>>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> Which is still a key feature for SDF, but I agree it can be dropped in
>>>>>> favor of an OutputEmitter pattern, with the DoFn moved to a plain
>>>>>> parameter-injection-based pattern. Both stay compatible with
>>>>>> CompletionStage :).
>>>>>>
>>>>>> On Mar 11, 2018 at 13:12, "Reuven Lax" <re...@google.com> wrote:
>>>>>>
>>>>>>> I think process context should go away completely. At that point it
>>>>>>> has little use except for a way to send output downstream.
>>>>>>>
>>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <
>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hmm, thinking out loud, but CompletionStage should/could be extended
>>>>>>>> to replace ProcessContext since it represents the element and the
>>>>>>>> output at the same time, no?
>>>>>>>>
>>>>>>>> On Mar 11, 2018 at 00:57, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>>>>> perhaps @AsyncElement to make it very clear.
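>>>>>>>>>
>>>>>>>>> For example, the hypothetical annotation could read like this (a
>>>>>>>>> sketch only - @AsyncElement does not exist in the SDK, and
>>>>>>>>> OutputReceiver stands in for whatever output hook the DoFn ends up
>>>>>>>>> getting):
>>>>>>>>>
>>>>>>>>>   new DoFn<String, Integer>() {
>>>>>>>>>     @ProcessElement
>>>>>>>>>     public void process(@AsyncElement CompletionStage<String> element,
>>>>>>>>>                         OutputReceiver<Integer> out) {
>>>>>>>>>       // The async contract is explicit in the signature: the stage
>>>>>>>>>       // completes with the element, and the chain produces the output.
>>>>>>>>>       element.thenApply(String::length).thenAccept(out::output);
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>> That keeps it visually distinct from a plain @Element InputT
>>>>>>>>> parameter.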
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Kenn, can NewDoFn distinguish at generation time the difference
>>>>>>>>>> between:
>>>>>>>>>>
>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>
>>>>>>>>>> and
>>>>>>>>>>
>>>>>>>>>>     public void process(@Element InputT element, ...) {
>>>>>>>>>>
>>>>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is
>>>>>>>>>>> much better than an ExecutorService, and very clear.
>>>>>>>>>>>
>>>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>>>
>>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>> If we had this available, I think users could even experiment
>>>>>>>>>>> with this often as it might help even where it isn't obvious.
>>>>>>>>>>>
>>>>>>>>>>> My main hesitation is that a big part of Beam is letting you write
>>>>>>>>>>> a DoFn in a basic/imperative style while it executes in a very
>>>>>>>>>>> smart functional/parallel way. Full future-oriented programming is
>>>>>>>>>>> not explored much outside of JavaScript (and maybe Haskell) and
>>>>>>>>>>> requires greater discipline in programming in a functional manner -
>>>>>>>>>>> if you are mutating stuff in your callbacks you are going to have
>>>>>>>>>>> bugs, and then when you add concurrency control you are going to
>>>>>>>>>>> have bad performance and deadlocks. So I definitely wouldn't make
>>>>>>>>>>> it the default or want to spend all our support effort on teaching
>>>>>>>>>>> advanced programming techniques.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>>>>> might look like?
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, but it comes after the "enhancements" - for my use cases -
>>>>>>>>>>>> and "bugs" list, so I haven't started working on it much.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> If it's a radically different API, it might be more
>>>>>>>>>>>>> appropriate as an alternative parallel Beam API rather than a 
>>>>>>>>>>>>> replacement
>>>>>>>>>>>>> for the current API (there is also one such fluent API in the 
>>>>>>>>>>>>> works).
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> What I plan is to draft it on top of Beam (so the "useless" case
>>>>>>>>>>>> I spoke about before) and then propose to implement it ~natively
>>>>>>>>>>>> and move it to be the main API in another major version.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic
>>>>>>>>>>>>>>> one?) of what Kenn suggested.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so it
>>>>>>>>>>>>>>> might not require waiting till Beam 3.0). We can recognize new
>>>>>>>>>>>>>>> parameters to processElement and populate them as needed.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is right; however, in my head it was a one-way movement to
>>>>>>>>>>>>>> enforce a reactive design and not fake a reactive API with a
>>>>>>>>>>>>>> synchronous, non-reactive impl - which is what would be done
>>>>>>>>>>>>>> today with both supported, I fear.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, for the DoFn for instance: instead of
>>>>>>>>>>>>>>>> processContext.element() returning a <T>, you get a
>>>>>>>>>>>>>>>> CompletionStage<T>, and the output takes one as well.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams
>>>>>>>>>>>>>>>> you get a big data Java 8/9/10 API which enables any
>>>>>>>>>>>>>>>> connectivity in a well-performing manner ;).
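>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> A minimal sketch of the kind of chain being described (the
>>>>>>>>>>>>>>>> CompletionStage-valued element and the future-accepting
>>>>>>>>>>>>>>>> emitter are hypothetical - today's DoFn does not expose them,
>>>>>>>>>>>>>>>> and Request/Response/asyncHttpClient are placeholders):
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>>>>>   public void process(CompletionStage<Request> element,
>>>>>>>>>>>>>>>>                       Emitter<Response> output) {
>>>>>>>>>>>>>>>>     element
>>>>>>>>>>>>>>>>         // Non-blocking call returning CompletionStage<Response>.
>>>>>>>>>>>>>>>>         .thenCompose(req -> asyncHttpClient.send(req))
>>>>>>>>>>>>>>>>         // Hand the future result to the emitter; the runner would
>>>>>>>>>>>>>>>>         // mark the element done only once the chain completes.
>>>>>>>>>>>>>>>>         .thenAccept(output::emit);
>>>>>>>>>>>>>>>>   }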
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mar 10, 2018 at 13:56, "Reuven Lax" <re...@google.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be 
>>>>>>>>>>>>>>>>> registered with
>>>>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as 
>>>>>>>>>>>>>>>>> done until all
>>>>>>>>>>>>>>>>> associated callbacks have completed). I think that's 
>>>>>>>>>>>>>>>>> basically what Kenn
>>>>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous, and until
>>>>>>>>>>>>>>>>>> bundles+combines are reactive-friendly, Beam will stay
>>>>>>>>>>>>>>>>>> synchronous whatever the other parts do. Becoming reactive
>>>>>>>>>>>>>>>>>> would make it possible to manage the threading issues
>>>>>>>>>>>>>>>>>> properly and to get better scalability of the overall
>>>>>>>>>>>>>>>>>> execution when remote IOs are involved.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> However it requires breaking the Source and SDF designs to
>>>>>>>>>>>>>>>>>> use CompletionStage - or an equivalent - to chain the
>>>>>>>>>>>>>>>>>> processing properly and in a unified fashion.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Mar 9, 2018 at 23:48, "Reuven Lax" <re...@google.com>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a
>>>>>>>>>>>>>>>>>> certain level beam is already reactive. Are you referring to 
>>>>>>>>>>>>>>>>>> a specific way
>>>>>>>>>>>>>>>>>> of writing the code?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <
>>>>>>>>>>>>>>>>>> re...@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> @Kenn: why not prefer making Beam reactive? It would
>>>>>>>>>>>>>>>>>>>> allow scaling way more without having to heavily
>>>>>>>>>>>>>>>>>>>> synchronize multithreading. Elegant and efficient :).
>>>>>>>>>>>>>>>>>>>> Beam 3?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Mar 9, 2018 at 22:49, "Kenneth Knowles" <k...@google.com>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer,
>>>>>>>>>>>>>>>>>>>>> which is that we envision the new DoFn being able to
>>>>>>>>>>>>>>>>>>>>> provide an automatic ExecutorService parameter that you
>>>>>>>>>>>>>>>>>>>>> can use as you wish.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>>>         ... launch some futures, put them in instance vars ...
>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>>>>         ... block on futures, output results if appropriate ...
>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for 
>>>>>>>>>>>>>>>>>>>>> example, share a
>>>>>>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason 
>>>>>>>>>>>>>>>>>>>>> to delete
>>>>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner 
>>>>>>>>>>>>>>>>>>>>> and user code to
>>>>>>>>>>>>>>>>>>>>> properly manage how many things were going on 
>>>>>>>>>>>>>>>>>>>>> concurrently. And mostly the
>>>>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what 
>>>>>>>>>>>>>>>>>>>>> user code needs
>>>>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, 
>>>>>>>>>>>>>>>>>>>>> this feature is
>>>>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness 
>>>>>>>>>>>>>>>>>>>>> itself manages blocking
>>>>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your 
>>>>>>>>>>>>>>>>>>>>> responsibility in
>>>>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading 
>>>>>>>>>>>>>>>>>>>>> acceptable for your
>>>>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
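>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For what it's worth, one hedged sketch of "rolling your
>>>>>>>>>>>>>>>>>>>>> own" on today's API: a DoFn that owns its executor,
>>>>>>>>>>>>>>>>>>>>> tracks the futures it launched, and drains them in
>>>>>>>>>>>>>>>>>>>>> @FinishBundle. The pool size and externalService are
>>>>>>>>>>>>>>>>>>>>> placeholders, and the usual java.util.concurrent / Beam
>>>>>>>>>>>>>>>>>>>>> SDK imports are omitted:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>   class AsyncCallFn extends DoFn<String, Void> {
>>>>>>>>>>>>>>>>>>>>>     private transient ExecutorService executor;
>>>>>>>>>>>>>>>>>>>>>     private transient List<Future<?>> pending;
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>     @Setup
>>>>>>>>>>>>>>>>>>>>>     public void setup() {
>>>>>>>>>>>>>>>>>>>>>       executor = Executors.newFixedThreadPool(16); // the knob to tune
>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>     @StartBundle
>>>>>>>>>>>>>>>>>>>>>     public void startBundle() {
>>>>>>>>>>>>>>>>>>>>>       pending = new ArrayList<>();
>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>>>>>>>>>>>     public void process(ProcessContext ctx) {
>>>>>>>>>>>>>>>>>>>>>       String payload = ctx.element();
>>>>>>>>>>>>>>>>>>>>>       // Run the blocking call on the pool, not the bundle thread.
>>>>>>>>>>>>>>>>>>>>>       pending.add(executor.submit(() -> externalService.send(payload)));
>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>     @FinishBundle
>>>>>>>>>>>>>>>>>>>>>     public void finishBundle() throws Exception {
>>>>>>>>>>>>>>>>>>>>>       // Block until every outstanding call finished (or failed) so
>>>>>>>>>>>>>>>>>>>>>       // the bundle is not committed while work is still in flight.
>>>>>>>>>>>>>>>>>>>>>       for (Future<?> f : pending) {
>>>>>>>>>>>>>>>>>>>>>         f.get();
>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>     @Teardown
>>>>>>>>>>>>>>>>>>>>>     public void teardown() {
>>>>>>>>>>>>>>>>>>>>>       executor.shutdown();
>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This only covers the write-only case: emitting downstream
>>>>>>>>>>>>>>>>>>>>> output from @FinishBundle needs explicit timestamps and
>>>>>>>>>>>>>>>>>>>>> windows, which the sketch leaves out.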
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>>>>> josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that makes external network
>>>>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the
>>>>>>>>>>>>>>>>>>>>>> hypothesis is that they are slow because we are not
>>>>>>>>>>>>>>>>>>>>>> threading our network calls. The GitHub pull request
>>>>>>>>>>>>>>>>>>>>>> below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> In Beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>>>>>>>>>> helped with this. However, it was removed because it
>>>>>>>>>>>>>>>>>>>>>> didn't comply with a few Beam paradigms.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> - What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>>>>>   calls? It seems bundling the elements into groups of
>>>>>>>>>>>>>>>>>>>>>>   size X prior to passing them to the DoFn, and managing
>>>>>>>>>>>>>>>>>>>>>>   the threading within the function, might work (a
>>>>>>>>>>>>>>>>>>>>>>   sketch of that approach follows this list). Thoughts?
>>>>>>>>>>>>>>>>>>>>>> - Are these types of jobs even suitable for Beam?
>>>>>>>>>>>>>>>>>>>>>> - Are there any plans to develop features that help
>>>>>>>>>>>>>>>>>>>>>>   with this?
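>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Purely as an illustration of that first option, a sketch
>>>>>>>>>>>>>>>>>>>>>> under the assumption that batching happens upstream (some
>>>>>>>>>>>>>>>>>>>>>> GroupIntoBatches-style transform), so each element
>>>>>>>>>>>>>>>>>>>>>> arriving here is already a List<String>; httpClient and
>>>>>>>>>>>>>>>>>>>>>> the pool size are placeholders, imports omitted:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>   class BatchedNetworkCallFn extends DoFn<List<String>, String> {
>>>>>>>>>>>>>>>>>>>>>>     private transient ExecutorService pool;
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>     @Setup
>>>>>>>>>>>>>>>>>>>>>>     public void setup() {
>>>>>>>>>>>>>>>>>>>>>>       pool = Executors.newFixedThreadPool(32); // tuning knob
>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>>>>>>>>>>>>     public void process(ProcessContext ctx) throws Exception {
>>>>>>>>>>>>>>>>>>>>>>       List<Future<String>> calls = new ArrayList<>();
>>>>>>>>>>>>>>>>>>>>>>       for (String url : ctx.element()) {
>>>>>>>>>>>>>>>>>>>>>>         // Each blocking call runs on the pool, in parallel.
>>>>>>>>>>>>>>>>>>>>>>         calls.add(pool.submit(() -> httpClient.fetch(url)));
>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>       // Wait for the whole batch, then emit from the bundle
>>>>>>>>>>>>>>>>>>>>>>       // thread so output() is never called concurrently.
>>>>>>>>>>>>>>>>>>>>>>       for (Future<String> call : calls) {
>>>>>>>>>>>>>>>>>>>>>>         ctx.output(call.get());
>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>     @Teardown
>>>>>>>>>>>>>>>>>>>>>>     public void teardown() {
>>>>>>>>>>>>>>>>>>>>>>       pool.shutdown();
>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>   }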
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>
>>>
>>
>
