By itself just the overhead of instantiating a wrapper (so nothing with the
recent JVM GC improvement done for the stream/optional usages). After if
you use the chaining you have a light overhead but still o(1) you can
desire to skip when doing sync code but which will enable you to run way
faster doing IO/async code by optimizing the CPU usage properly when you
tune your slaves/workers. So tempted to summarize it as "has an overhead
allowing to not run slower". It doesn't prevent beam to still expose a
synchronous API collapse at evaluation time in a single fn which will give
you the best of both worlds.


Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> |  Blog
<https://rmannibucau.metawerx.net/> | Old Blog
<http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> |
LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book
<https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:

> It is expected that SDKs will have all their cores fully utilized by
> processing bundles in parallel and not by performing intrabundle
> parallelization. This allows for DoFns to be chained together via regular
> method calls because the overhead to pass a single element through all the
> DoFn's should be as minimal as possible
>
> What is the overhead of using a completion stage vs using a regular method
> call?
>
> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <
> rmannibu...@gmail.com> wrote:
>
>>
>>
>> Le 12 mars 2018 00:19, "Reuven Lax" <re...@google.com> a écrit :
>>
>>
>>
>>
>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>
>>> makes me think beam should maybe do 2 internals changes before moving
>>> forward on (s)df API changes:
>>>
>>> 1. define a beam singleton per JVM (classloader hierarchy actually but
>>> you get the idea I think) which can be used to store things locally for
>>> reuse - see 2 for an example or metrics pusher work Etienne does could
>>> benefit from it too
>>>
>>
>> I think we do need something like this, but it needs to be a bit more
>> than a singleton per JVM. For one thing it needs to be at least per
>> pipeline within a JVM. You might run multiple tests in a single JVM, and it
>> should also be possible to run those tests in parallel without the static
>> state interfering with each other. I also think the state needs to be
>> addressable per step (i.e. a ParDo can look up its static state without
>> caring about static state belonging to another ParDo).
>>
>>
>> Agree but you can register in a singleton the pipeline (as "ref" not as
>> instance) and therefore hit the same need. +1 to have scopes (singleton,
>> pipeline, thread) but it still requires a single singleton to handle
>> serialization ;).
>>
>>
>>
>>
>> 2. define a SPI to load (s)dofn parameter provider instead of having an
>>> ArgProvider which provides everything which is supported. This way you can
>>> use any kind of parameter and the parameterproviders can use 1. to handle
>>> their own state. First impl of the parameterprovider SPI would be a) state
>>> b) timer c) reactive handlers and potentially user parameter providers
>>> (service like which can be singleton in the scope of a "JVM" thanks to 1).
>>>
>>>
>>> Romain Manni-Bucau
>>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>>> <https://rmannibucau.metawerx.net/> | Old Blog
>>> <http://rmannibucau.wordpress.com> | Github
>>> <https://github.com/rmannibucau> | LinkedIn
>>> <https://www.linkedin.com/in/rmannibucau> | Book
>>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>>
>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> Yep. Introduce OutputEmitter, and Process context no longer has much
>>>> use.
>>>>
>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <
>>>> rmannibu...@gmail.com> wrote:
>>>>
>>>>> Which is still a key feature for sdf but agree it can be dropped for
>>>>> an outputemitter pattern and the dofn moved to a plain parameters 
>>>>> injection
>>>>> based pattern. Both (which completionstage) stays compatible :).
>>>>>
>>>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>>>>
>>>>>> I think process context should go away completely. At that point it
>>>>>> has little use except for a way to send output downstream.
>>>>>>
>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hmm, thinking out loud but completionstage should/could be extended
>>>>>>> to replace processcontext since it represents element and output at the
>>>>>>> same time no?
>>>>>>>
>>>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <k...@google.com> a écrit :
>>>>>>>
>>>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>>>>> between:
>>>>>>>>>
>>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>>> ...) {
>>>>>>>>>
>>>>>>>>> and
>>>>>>>>>
>>>>>>>>>     public void process(@Element Input element, ...) {
>>>>>>>>>
>>>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is
>>>>>>>>>> much better than an ExecutorService, and very clear.
>>>>>>>>>>
>>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>>
>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>>     @ProcessElement
>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>>>> ...) {
>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>     }
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>> If we had this available, I think users could even experiment
>>>>>>>>>> with this often as it might help even where it isn't obvious.
>>>>>>>>>>
>>>>>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>>>>>> basic/imperative style of programming a DoFn that executes in a very 
>>>>>>>>>> smart
>>>>>>>>>> functional/parallel way. Full future-oriented programming is not
>>>>>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>>>> mutating stuff in your callback you are going to have bugs, and then 
>>>>>>>>>> when
>>>>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to 
>>>>>>>>>> spend
>>>>>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>>>> might look like?
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> If it's a radically different API, it might be more appropriate
>>>>>>>>>>>> as an alternative parallel Beam API rather than a replacement for 
>>>>>>>>>>>> the
>>>>>>>>>>>> current API (there is also one such fluent API in the works).
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> What I plan is to draft it on top of beam (so the "useless" case
>>>>>>>>>>> I spoke about before) and then propose to impl it ~natively and 
>>>>>>>>>>> move it as
>>>>>>>>>>> main API for another major.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic
>>>>>>>>>>>>>> one?) of what Kenn suggested.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so
>>>>>>>>>>>>>> might not require waiting till Beam 3.0). We can recognize new 
>>>>>>>>>>>>>> parameters
>>>>>>>>>>>>>> to processElement and populate add needed.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is right however in my head it was a single way
>>>>>>>>>>>>> movemenent to enforce the design to be reactive and not fake a 
>>>>>>>>>>>>> reactive API
>>>>>>>>>>>>> with a sync and not reactive impl which is what would be done 
>>>>>>>>>>>>> today with
>>>>>>>>>>>>> both support I fear.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and 
>>>>>>>>>>>>>>> output gets
>>>>>>>>>>>>>>> it as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams
>>>>>>>>>>>>>>> you get a big data java 8/9/10 API which enabkes any 
>>>>>>>>>>>>>>> connectivity in a wel
>>>>>>>>>>>>>>> performing manner ;).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be 
>>>>>>>>>>>>>>>> registered with
>>>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as 
>>>>>>>>>>>>>>>> done until all
>>>>>>>>>>>>>>>> associated callbacks have completed). I think that's basically 
>>>>>>>>>>>>>>>> what Kenn
>>>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be 
>>>>>>>>>>>>>>>>> synchronous whatever
>>>>>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the 
>>>>>>>>>>>>>>>>> threading
>>>>>>>>>>>>>>>>> issues properly and to have better scalability on the overall 
>>>>>>>>>>>>>>>>> execution
>>>>>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing 
>>>>>>>>>>>>>>>>> properly and in
>>>>>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>>>>>> level beam is already reactive. Are you referring to a 
>>>>>>>>>>>>>>>>> specific way of
>>>>>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <
>>>>>>>>>>>>>>>>> re...@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would
>>>>>>>>>>>>>>>>>>> alow to scale way more without having to hardly synchronize 
>>>>>>>>>>>>>>>>>>> multithreading.
>>>>>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <k...@google.com>
>>>>>>>>>>>>>>>>>>> a écrit :
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer,
>>>>>>>>>>>>>>>>>>>> which is that we envision the new DoFn to be able to 
>>>>>>>>>>>>>>>>>>>> provide an automatic
>>>>>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance
>>>>>>>>>>>>>>>>>>>> vars ...
>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for 
>>>>>>>>>>>>>>>>>>>> example, share a
>>>>>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to 
>>>>>>>>>>>>>>>>>>>> delete
>>>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner 
>>>>>>>>>>>>>>>>>>>> and user code to
>>>>>>>>>>>>>>>>>>>> properly manage how many things were going on 
>>>>>>>>>>>>>>>>>>>> concurrently. And mostly the
>>>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what 
>>>>>>>>>>>>>>>>>>>> user code needs
>>>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, 
>>>>>>>>>>>>>>>>>>>> this feature is
>>>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself 
>>>>>>>>>>>>>>>>>>>> manages blocking
>>>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your 
>>>>>>>>>>>>>>>>>>>> responsibility in
>>>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading 
>>>>>>>>>>>>>>>>>>>> acceptable for your
>>>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>>>> josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network
>>>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the 
>>>>>>>>>>>>>>>>>>>>> hypothesis is that
>>>>>>>>>>>>>>>>>>>>> they are slow because we are not threading for our 
>>>>>>>>>>>>>>>>>>>>> network calls. The
>>>>>>>>>>>>>>>>>>>>> github issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization,
>>>>>>>>>>>>>>>>>>>>> which helped with this. However, this was removed because 
>>>>>>>>>>>>>>>>>>>>> it didn't comply
>>>>>>>>>>>>>>>>>>>>> with a few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size 
>>>>>>>>>>>>>>>>>>>>> X prior to
>>>>>>>>>>>>>>>>>>>>> passing to the DoFn, and managing the threading within 
>>>>>>>>>>>>>>>>>>>>> the function might
>>>>>>>>>>>>>>>>>>>>> work. thoughts?
>>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>
>>>
>>
>

Reply via email to