Nothing more than that, but I can try to gather some figures and compare
them to the Beam DoFn overhead, which should be at the same level or a bit
higher here since it is never unwrapped, whereas a CompletableFuture is a
raw code chain without Beam in the middle.
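
A rough way to gather such figures could look like the sketch below. It is
only an illustration under assumptions: the class name, the three trivial
steps and the iteration count are hypothetical, no Beam code is involved, and
a naive System.nanoTime() loop is not a rigorous benchmark (a harness such as
JMH would be more trustworthy). It just compares plain method chaining with
the same steps chained on an already completed CompletableFuture.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;

public class ChainOverheadSketch {
    // Three trivial steps standing in for chained DoFns (hypothetical).
    static final Function<Integer, Integer> STEP1 = x -> x + 1;
    static final Function<Integer, Integer> STEP2 = x -> x * 2;
    static final Function<Integer, Integer> STEP3 = x -> x - 3;

    // Plain method-call chaining.
    public static int direct(int input) {
        return STEP3.apply(STEP2.apply(STEP1.apply(input)));
    }

    // Same steps chained on an already completed stage, so join() never blocks.
    public static int viaStage(int input) {
        return CompletableFuture.completedFuture(input)
                .thenApply(STEP1)
                .thenApply(STEP2)
                .thenApply(STEP3)
                .join();
    }

    // Naive timing loop; returns average nanoseconds per element.
    public static double nanosPerElement(IntUnaryOperator op, int iterations) {
        long sink = 0;
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            sink += op.applyAsInt(i);
        }
        long elapsed = System.nanoTime() - start;
        if (sink == Long.MIN_VALUE) {
            System.out.println(sink); // keeps the loop result observable
        }
        return (double) elapsed / iterations;
    }

    public static void main(String[] args) {
        int n = 1_000_000; // arbitrary iteration count
        // Warm-up runs so the JIT has a chance to compile both paths.
        nanosPerElement(ChainOverheadSketch::direct, n);
        nanosPerElement(ChainOverheadSketch::viaStage, n);
        System.out.println("direct   ns/element: "
                + nanosPerElement(ChainOverheadSketch::direct, n));
        System.out.println("viaStage ns/element: "
                + nanosPerElement(ChainOverheadSketch::viaStage, n));
    }
}
```

Both paths compute the same value, so any difference in the printed numbers
is the wrapper and chaining cost, within the limits of such a naive loop.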

On 12 March 2018 at 18:18, "Lukasz Cwik" <lc...@google.com> wrote:

> Do you have data that supports this?
>
> Note that in reality for something like passing an element between DoFns,
> the constant in O(1) actually matters. Decreasing SDK harness overhead is a
> good thing though.
>
> On Mon, Mar 12, 2018 at 10:14 AM, Romain Manni-Bucau <
> rmannibu...@gmail.com> wrote:
>
>> By itself, it is just the overhead of instantiating a wrapper (so nothing
>> much given the recent JVM GC improvements done for the Stream/Optional
>> usages). Then, if you use the chaining, you have a light overhead, still
>> O(1), which you may want to skip for synchronous code but which lets you
>> run much faster with IO/async code by using the CPU properly once you
>> tune your workers. So I am tempted to summarize it as "has an overhead
>> that lets you not run slower". It doesn't prevent Beam from still exposing
>> a synchronous API that is collapsed at evaluation time into a single fn,
>> which would give you the best of both worlds.
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>> <https://rmannibucau.metawerx.net/> | Old Blog
>> <http://rmannibucau.wordpress.com> | Github
>> <https://github.com/rmannibucau> | LinkedIn
>> <https://www.linkedin.com/in/rmannibucau> | Book
>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>
>> 2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:
>>
>>> It is expected that SDKs will have all their cores fully utilized by
>>> processing bundles in parallel and not by performing intrabundle
>>> parallelization. This allows for DoFns to be chained together via regular
>>> method calls because the overhead to pass a single element through all the
>>> DoFns should be as minimal as possible.
>>>
>>> What is the overhead of using a completion stage vs using a regular
>>> method call?
>>>
>>> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> On 12 March 2018 at 00:19, "Reuven Lax" <re...@google.com> wrote:
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <
>>>> rmannibu...@gmail.com> wrote:
>>>>
>>>>> This makes me think Beam should maybe make two internal changes before
>>>>> moving forward on (S)DF API changes:
>>>>>
>>>>> 1. Define a Beam singleton per JVM (per classloader hierarchy actually,
>>>>> but you get the idea I think) which can be used to store things locally
>>>>> for reuse. See 2 for an example; the metrics pusher work Etienne is doing
>>>>> could benefit from it too.
>>>>>
>>>>
>>>> I think we do need something like this, but it needs to be a bit more
>>>> than a singleton per JVM. For one thing it needs to be at least per
>>>> pipeline within a JVM. You might run multiple tests in a single JVM, and it
>>>> should also be possible to run those tests in parallel without the static
>>>> state interfering with each other. I also think the state needs to be
>>>> addressable per step (i.e. a ParDo can look up its static state without
>>>> caring about static state belonging to another ParDo).
>>>>
>>>>
>>>> Agreed, but you can register the pipeline in a singleton (as a "ref", not
>>>> as an instance) and therefore hit the same need. +1 to having scopes
>>>> (singleton, pipeline, thread), but it still requires a single singleton to
>>>> handle serialization ;).
>>>>
>>>>
>>>>
>>>>
>>>>> 2. Define an SPI to load (S)DoFn parameter providers instead of having an
>>>>> ArgProvider which provides everything that is supported. This way you can
>>>>> use any kind of parameter, and the parameter providers can use 1. to handle
>>>>> their own state. The first implementations of the parameter provider SPI
>>>>> would be a) state, b) timer, c) reactive handlers, and potentially user
>>>>> parameter providers (service-like, which can be singletons in the scope of
>>>>> a "JVM" thanks to 1).
>>>>>
>>>>>
>>>>>
>>>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>
>>>>>> Yep. Introduce OutputEmitter, and Process context no longer has much
>>>>>> use.
>>>>>>
>>>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Which is still a key feature for SDF, but I agree it can be dropped in
>>>>>>> favor of an OutputEmitter pattern, with the DoFn moved to a plain
>>>>>>> parameter-injection-based pattern. Both (with CompletionStage) stay
>>>>>>> compatible :).
>>>>>>>
>>>>>>> On 11 March 2018 at 13:12, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>
>>>>>>>> I think process context should go away completely. At that point it
>>>>>>>> has little use except for a way to send output downstream.
>>>>>>>>
>>>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <
>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hmm, thinking out loud, but CompletionStage should/could be
>>>>>>>>> extended to replace ProcessContext, since it represents the element
>>>>>>>>> and the output at the same time, no?
>>>>>>>>>
>>>>>>>>> On 11 March 2018 at 00:57, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Kenn, can NewDoFn distinguish at generation time the difference
>>>>>>>>>>> between:
>>>>>>>>>>>
>>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>>
>>>>>>>>>>> and
>>>>>>>>>>>
>>>>>>>>>>>     public void process(@Element InputT element, ...) {
>>>>>>>>>>>
>>>>>>>>>>> If not, then we would probably need separate annotations...
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is
>>>>>>>>>>>> much better than an ExecutorService, and very clear.
>>>>>>>>>>>>
>>>>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>>>>
>>>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>>>       element.thenApply(...)
>>>>>>>>>>>>     }
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>> If we had this available, I think users could even experiment
>>>>>>>>>>>> with this often as it might help even where it isn't obvious.
>>>>>>>>>>>>
>>>>>>>>>>>> My main hesitation is that a big part of Beam is giving a
>>>>>>>>>>>> basic/imperative style of programming a DoFn that executes in a
>>>>>>>>>>>> very smart functional/parallel way. Full future-oriented
>>>>>>>>>>>> programming is not explored much outside of JavaScript (and maybe
>>>>>>>>>>>> Haskell) and requires greater discipline in programming in a
>>>>>>>>>>>> functional manner: if you are mutating stuff in your callback you
>>>>>>>>>>>> are going to have bugs, and then when you add concurrency control
>>>>>>>>>>>> you are going to have bad performance and deadlocks. So I
>>>>>>>>>>>> definitely wouldn't make it the default or want to spend all our
>>>>>>>>>>>> support effort on teaching advanced programming techniques.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Have you considered drafting in detail what you think this
>>>>>>>>>>>>>> API might look like?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, but it comes after the "enhancements" (for my use cases) and
>>>>>>>>>>>>> "bugs" lists, so I haven't started to work on it much.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> If it's a radically different API, it might be more
>>>>>>>>>>>>>> appropriate as an alternative parallel Beam API rather than a
>>>>>>>>>>>>>> replacement for the current API (there is also one such fluent
>>>>>>>>>>>>>> API in the works).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> What I plan is to draft it on top of Beam (so the "useless"
>>>>>>>>>>>>> case I spoke about before) and then propose to implement it
>>>>>>>>>>>>> ~natively and make it the main API for another major version.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic
>>>>>>>>>>>>>>>> one?) of what Kenn suggested.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so
>>>>>>>>>>>>>>>> might not require waiting till Beam 3.0). We can recognize new
>>>>>>>>>>>>>>>> parameters to processElement and populate as needed.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is right; however, in my head it was a one-way movement to
>>>>>>>>>>>>>>> enforce the design to be reactive and not fake a reactive API
>>>>>>>>>>>>>>> with a sync, non-reactive implementation, which is what I fear
>>>>>>>>>>>>>>> would be done today if both were supported.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yes, for the DoFn for instance, instead of having
>>>>>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T>, and
>>>>>>>>>>>>>>>>> output gets it as well.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with
>>>>>>>>>>>>>>>>> streams, you get a big data Java 8/9/10 API which enables any
>>>>>>>>>>>>>>>>> connectivity in a well-performing manner ;).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 10 March 2018 at 13:56, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be
>>>>>>>>>>>>>>>>>> registered with Beam, because Beam needs to know not to mark
>>>>>>>>>>>>>>>>>> the element as done until all associated callbacks have
>>>>>>>>>>>>>>>>>> completed). I think that's basically what Kenn was
>>>>>>>>>>>>>>>>>> suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous, and until
>>>>>>>>>>>>>>>>>>> bundles and combines are reactive-friendly, Beam will be
>>>>>>>>>>>>>>>>>>> synchronous whatever the other parts do. Becoming reactive
>>>>>>>>>>>>>>>>>>> will make it possible to manage the threading issues
>>>>>>>>>>>>>>>>>>> properly and to get better scalability of the overall
>>>>>>>>>>>>>>>>>>> execution when remote IO is involved.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> However, it requires breaking the source and SDF designs to use
>>>>>>>>>>>>>>>>>>> CompletionStage, or an equivalent, to chain the processing
>>>>>>>>>>>>>>>>>>> properly and in a unified fashion.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 9 March 2018 at 23:48, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a
>>>>>>>>>>>>>>>>>>> certain level beam is already reactive. Are you referring
>>>>>>>>>>>>>>>>>>> to a specific way of writing the code?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>> re...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> @Kenn: why not prefer making Beam reactive? It would
>>>>>>>>>>>>>>>>>>>>> allow scaling much further without having to synchronize
>>>>>>>>>>>>>>>>>>>>> multithreading the hard way. Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On 9 March 2018 at 22:49, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer,
>>>>>>>>>>>>>>>>>>>>>> which is that we envision the new DoFn being able to
>>>>>>>>>>>>>>>>>>>>>> provide an automatic ExecutorService parameter that you
>>>>>>>>>>>>>>>>>>>>>> can use as you wish.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its
>>>>>>>>>>>>>>>>>>>>>> overarching knowledge of what is going on in a
>>>>>>>>>>>>>>>>>>>>>> computation to, for example, share a thread pool between
>>>>>>>>>>>>>>>>>>>>>> different bits. This was one reason to delete
>>>>>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner
>>>>>>>>>>>>>>>>>>>>>> and user code to properly manage how many things were
>>>>>>>>>>>>>>>>>>>>>> going on concurrently. And mostly the runner should own
>>>>>>>>>>>>>>>>>>>>>> parallelizing to max out cores and what user code needs
>>>>>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that.
>>>>>>>>>>>>>>>>>>>>>> However, this feature is not thoroughly considered. TBD
>>>>>>>>>>>>>>>>>>>>>> how much the harness itself manages blocking on
>>>>>>>>>>>>>>>>>>>>>> outstanding requests versus it being your
>>>>>>>>>>>>>>>>>>>>>> responsibility in FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading
>>>>>>>>>>>>>>>>>>>>>> acceptable for your particular use case. Perhaps someone
>>>>>>>>>>>>>>>>>>>>>> else can weigh in.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>>>>>> josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Our team has pipelines that make external network
>>>>>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and
>>>>>>>>>>>>>>>>>>>>>>> the hypothesis is that they are slow because we are not
>>>>>>>>>>>>>>>>>>>>>>> threading for our network calls. The GitHub issue below
>>>>>>>>>>>>>>>>>>>>>>> provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> In Beam 1.0, there was IntraBundleParallelization,
>>>>>>>>>>>>>>>>>>>>>>> which helped with this. However, this was removed
>>>>>>>>>>>>>>>>>>>>>>> because it didn't comply with a few Beam paradigms.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of
>>>>>>>>>>>>>>>>>>>>>>> size X prior to passing to the DoFn, and managing the
>>>>>>>>>>>>>>>>>>>>>>> threading within the function might work. Thoughts?
>>>>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for Beam?
>>>>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help
>>>>>>>>>>>>>>>>>>>>>>> with this?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>
>>>>
>>>
>>
>
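
For reference, the pattern discussed earlier in the thread (launch futures
while processing elements, then block on them when the bundle finishes) can
be illustrated in plain Java. This is only a sketch under assumptions: it
uses no Beam classes, the class and method names are hypothetical, process()
and finish() merely stand in for @ProcessElement and @FinishBundle, and
slowLookup() fakes a blocking network call with a short sleep.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncBundleSketch {
    private final ExecutorService executor;
    // Futures launched for the current bundle, kept in an instance variable.
    private final List<CompletableFuture<String>> pending = new ArrayList<>();

    public AsyncBundleSketch(ExecutorService executor) {
        this.executor = executor;
    }

    // Hypothetical stand-in for a blocking network call.
    static String slowLookup(String element) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return element.toUpperCase();
    }

    // Stand-in for @ProcessElement: start the call and return immediately.
    public void process(String element) {
        pending.add(CompletableFuture.supplyAsync(() -> slowLookup(element), executor));
    }

    // Stand-in for @FinishBundle: block on every outstanding future, in input order.
    public List<String> finish() {
        List<String> outputs = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            outputs.add(future.join());
        }
        pending.clear();
        return outputs;
    }

    // Runs one "bundle" through the sketch with a small fixed thread pool.
    public static List<String> runBundle(List<String> elements) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // arbitrary size
        try {
            AsyncBundleSketch fn = new AsyncBundleSketch(pool);
            for (String element : elements) {
                fn.process(element);
            }
            return fn.finish();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runBundle(Arrays.asList("a", "b", "c")));
    }
}
```

The calls overlap on the pool instead of running one after another, while
outputs are still collected in input order because finish() joins the futures
in the order they were launched. In a real runner the pool size and who owns
the blocking would be exactly the open questions raised above.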
