By itself, just the overhead of instantiating a wrapper (so nothing, given the recent JVM GC improvements done for stream/optional usage). Then, if you use chaining, you have a light overhead, but it is still O(1); you may want to skip it when writing synchronous code, but it will let you run way faster for IO/async code by using the CPU properly once you tune your slaves/workers. So I'm tempted to summarize it as "it has an overhead that allows you not to run slower". It doesn't prevent Beam from still exposing a synchronous API that collapses at evaluation time into a single fn, which will give you the best of both worlds.
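To make the "wrapper overhead only, still O(1)" claim concrete, here is a minimal plain-JDK sketch (not Beam code): when the stage is already completed, `thenApply` continuations run synchronously on the calling thread, so a chain of "DoFn-like" steps collapses to ordinary method calls plus the wrapper allocations.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class SyncChainDemo {
    // Chains two "DoFn-like" steps on an already-completed stage.
    // Because the stage is complete, each thenApply executes immediately
    // on the calling thread: the only extra cost per element is the
    // CompletionStage wrapper, and the chain stays O(1).
    static int processSynchronously(int element) {
        CompletionStage<Integer> stage = CompletableFuture.completedFuture(element)
            .thenApply(x -> x * 2)   // first "DoFn"
            .thenApply(x -> x + 1);  // second "DoFn"
        return stage.toCompletableFuture().join();
    }

    public static void main(String[] args) {
        System.out.println(processSynchronously(20)); // prints 41
    }
}
```

The same chain, fed a stage completed later by an IO callback, runs asynchronously with no code change - which is the "best of both worlds" evaluation-time collapse described above.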
Romain Manni-Bucau
@rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>

2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:

> It is expected that SDKs will have all their cores fully utilized by
> processing bundles in parallel and not by performing intrabundle
> parallelization. This allows for DoFns to be chained together via regular
> method calls because the overhead to pass a single element through all the
> DoFn's should be as minimal as possible.
>
> What is the overhead of using a completion stage vs using a regular method
> call?
>
> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
>> On Mar 12, 2018 00:19, "Reuven Lax" <re...@google.com> wrote:
>>
>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>
>>> makes me think beam should maybe do 2 internal changes before moving
>>> forward on (s)df API changes:
>>>
>>> 1. define a beam singleton per JVM (classloader hierarchy actually, but
>>> you get the idea I think) which can be used to store things locally for
>>> reuse - see 2 for an example, or the metrics pusher work Etienne does could
>>> benefit from it too
>>
>> I think we do need something like this, but it needs to be a bit more
>> than a singleton per JVM. For one thing it needs to be at least per
>> pipeline within a JVM. You might run multiple tests in a single JVM, and it
>> should also be possible to run those tests in parallel without the static
>> state interfering with each other. I also think the state needs to be
>> addressable per step (i.e. a ParDo can look up its static state without
>> caring about static state belonging to another ParDo).
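The scoping Reuven describes - JVM-wide storage addressed per pipeline and per step, so parallel tests in one JVM and distinct ParDos never collide - could be sketched in plain Java as below. All names here (`ScopedRegistry`, the key layout) are hypothetical illustrations, not Beam APIs.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical sketch: one JVM-wide registry whose entries are keyed by
// (pipelineId, stepName), so a ParDo can look up its own static state
// without seeing state belonging to another ParDo or another pipeline.
public class ScopedRegistry {
    private static final Map<String, Object> STATE = new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    public static <T> T get(String pipelineId, String stepName, Supplier<T> factory) {
        // computeIfAbsent gives atomic create-once semantics even when
        // multiple pipeline threads race on the same (pipeline, step) key.
        return (T) STATE.computeIfAbsent(pipelineId + "/" + stepName, k -> factory.get());
    }
}
```

Richer scopes (thread, bundle) would just add segments to the key; the point is that a single JVM singleton can still back all of them.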
>> Agree, but you can register the pipeline in a singleton (as a "ref", not as
>> an instance) and therefore meet the same need. +1 to have scopes (singleton,
>> pipeline, thread) but it still requires a single singleton to handle
>> serialization ;).
>>
>>> 2. define an SPI to load (s)dofn parameter providers instead of having an
>>> ArgProvider which provides everything which is supported. This way you can
>>> use any kind of parameter, and the parameter providers can use 1. to handle
>>> their own state. A first impl of the parameter provider SPI would be a) state,
>>> b) timers, c) reactive handlers, and potentially user parameter providers
>>> (service-like, which can be singletons in the scope of a "JVM" thanks to 1).
>>>
>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> Yep. Introduce OutputEmitter, and ProcessContext no longer has much
>>>> use.
>>>>
>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>
>>>>> Which is still a key feature for SDF, but agreed it can be dropped for
>>>>> an OutputEmitter pattern and the DoFn moved to a plain parameter-injection
>>>>> based pattern. Both (with CompletionStage) stay compatible :).
>>>>>
>>>>> On Mar 11, 2018 13:12, "Reuven Lax" <re...@google.com> wrote:
>>>>>
>>>>>> I think ProcessContext should go away completely. At that point it
>>>>>> has little use except as a way to send output downstream.
>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Hmm, thinking out loud, but CompletionStage should/could be extended
>>>>>>> to replace ProcessContext since it represents the element and the output at
>>>>>>> the same time, no?
>>>>>>>
>>>>>>> On Mar 11, 2018 00:57, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>
>>>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Kenn, can NewDoFn distinguish at generation time the difference
>>>>>>>>> between:
>>>>>>>>>
>>>>>>>>> public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>
>>>>>>>>> and
>>>>>>>>>
>>>>>>>>> public void process(@Element Input element, ...) {
>>>>>>>>>
>>>>>>>>> If not, then we would probably need separate annotations...
>>>>>>>>>
>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is
>>>>>>>>>> much better than an ExecutorService, and very clear.
>>>>>>>>>>
>>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>>
>>>>>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>>>>>   @ProcessElement
>>>>>>>>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>     element.thenApply(...)
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> If we had this available, I think users could even experiment
>>>>>>>>>> with this often, as it might help even where it isn't obvious.
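The `@Element CompletionStage<InputT>` sketch above can be emulated in plain Java without Beam: the "DoFn" never blocks, it registers continuations on the element stage and returns a stage the harness could await before marking the element done. This is a conceptual stand-in only - `process` here is an ordinary method, not a Beam API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class AsyncElementDemo {
    // Emulates a DoFn body receiving its element as a CompletionStage:
    // register the work as continuations and hand the resulting stage back,
    // instead of blocking inside the method.
    static CompletionStage<String> process(CompletionStage<String> element) {
        return element
            .thenApplyAsync(String::trim)      // stands in for remote/async work
            .thenApply(String::toUpperCase);   // cheap follow-up transform
    }

    public static void main(String[] args) {
        String out = process(CompletableFuture.completedFuture("  beam  "))
            .toCompletableFuture().join();
        System.out.println(out); // prints BEAM
    }
}
```

A harness owning such stages could decide per element whether to await inline (sync path) or collect many in flight (async IO path), which is exactly the flexibility being debated in this thread.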
>>>>>>>>>> My main hesitation is that a big part of Beam is giving a
>>>>>>>>>> basic/imperative style of programming a DoFn that executes in a very smart
>>>>>>>>>> functional/parallel way. Fully future-oriented programming is not
>>>>>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>>>> mutating stuff in your callback you are going to have bugs, and then when
>>>>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to spend
>>>>>>>>>> all our support effort on teaching advanced programming techniques.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>
>>>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>>>> might look like?
>>>>>>>>>>>
>>>>>>>>>>> Yes, but it comes after the "enhancements" - for my use cases - and
>>>>>>>>>>> "bugs" lists, so I didn't start to work on it much.
>>>>>>>>>>>
>>>>>>>>>>>> If it's a radically different API, it might be more appropriate
>>>>>>>>>>>> as an alternative parallel Beam API rather than a replacement for the
>>>>>>>>>>>> current API (there is also one such fluent API in the works).
>>>>>>>>>>>
>>>>>>>>>>> What I plan is to draft it on top of Beam (so the "useless" case
>>>>>>>>>>> I spoke about before) and then propose to implement it ~natively and
>>>>>>>>>>> move it to the main API in another major release.
>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic
>>>>>>>>>>>>>> one?) of what Kenn suggested.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so it
>>>>>>>>>>>>>> might not require waiting till Beam 3.0). We can recognize new parameters
>>>>>>>>>>>>>> to processElement and populate as needed.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This is right, however in my head it was a one-way movement, to
>>>>>>>>>>>>> enforce the design to be reactive rather than fake a reactive API
>>>>>>>>>>>>> with a synchronous, non-reactive impl, which is what I fear would be
>>>>>>>>>>>>> done today with both supported.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, for the DoFn for instance: instead of having
>>>>>>>>>>>>>>> processContext.element() = <T>, you get a CompletionStage<T>, and output
>>>>>>>>>>>>>>> gets it as well.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams,
>>>>>>>>>>>>>>> you get a big data Java 8/9/10 API which enables any
>>>>>>>>>>>>>>> connectivity in a well-performing manner ;).
>>>>>>>>>>>>>>> On Mar 10, 2018 13:56, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be registered with
>>>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as done until all
>>>>>>>>>>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous, and until
>>>>>>>>>>>>>>>>> bundles+combines are reactive-friendly, Beam will be synchronous whatever
>>>>>>>>>>>>>>>>> other parts do. Becoming reactive will enable us to manage the threading
>>>>>>>>>>>>>>>>> issues properly and to have better scalability in the overall execution
>>>>>>>>>>>>>>>>> when remote IO is involved.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> However it requires breaking the source and SDF designs to use
>>>>>>>>>>>>>>>>> CompletionStage - or an equivalent - to chain the processing properly and
>>>>>>>>>>>>>>>>> in a unified fashion.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Mar 9, 2018 23:48, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>>>>>> level Beam is already reactive. Are you referring to a specific way of
>>>>>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> @Kenn: why not prefer to make Beam reactive? It would
>>>>>>>>>>>>>>>>>>> allow scaling way more without having to heavily synchronize
>>>>>>>>>>>>>>>>>>> multithreading. Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Mar 9, 2018 22:49, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer,
>>>>>>>>>>>>>>>>>>>> which is that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>>>>>>>>>> ExecutorService parameter that you can use as you wish:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>>>>>>>>>   public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>>     ... launch some futures, put them in instance vars ...
>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>   @FinishBundle
>>>>>>>>>>>>>>>>>>>>   public void finish(...) {
>>>>>>>>>>>>>>>>>>>>     ... block on futures, output results if appropriate ...
>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for example, share a
>>>>>>>>>>>>>>>>>>>> thread pool between different bits.
>>>>>>>>>>>>>>>>>>>> This was one reason to delete
>>>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user code to
>>>>>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. And mostly the
>>>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores, and what user code needs
>>>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, this feature is
>>>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages blocking
>>>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that makes external network
>>>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the hypothesis is that
>>>>>>>>>>>>>>>>>>>>> they are slow because we are not threading our network calls. The
>>>>>>>>>>>>>>>>>>>>> GitHub issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> In Beam 1.0, there was IntraBundleParallelization,
>>>>>>>>>>>>>>>>>>>>> which helped with this.
>>>>>>>>>>>>>>>>>>>>> However, this was removed because it didn't comply
>>>>>>>>>>>>>>>>>>>>> with a few Beam paradigms.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size X prior to
>>>>>>>>>>>>>>>>>>>>> passing them to the DoFn, and managing the threading within the function,
>>>>>>>>>>>>>>>>>>>>> might work. Thoughts?
>>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for Beam?
>>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Thanks
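The workaround Josh floats - group elements into batches upstream, then fan the blocking calls out on a pool inside the DoFn body - can be sketched in plain Java as follows. `callService` is a stand-in for a real blocking network client, and the method shape is illustrative, not a Beam API.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class BatchedCallsDemo {
    // Pretend blocking network call; in a real pipeline this would be an
    // HTTP/RPC client invocation.
    static String callService(String element) {
        return "resp:" + element;
    }

    // Processes one batch of size X: every element's call runs on the pool,
    // and we block until the whole batch is done before "emitting" results -
    // the same block-before-finishing discipline FinishBundle would need.
    static List<String> processBatch(List<String> batch, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Callable<String>> tasks = batch.stream()
                .map(e -> (Callable<String>) () -> callService(e))
                .collect(Collectors.toList());
            // invokeAll blocks until every call completes and returns the
            // futures in input order, so output order is preserved.
            return pool.invokeAll(tasks).stream()
                .map(f -> {
                    try {
                        return f.get();
                    } catch (Exception ex) {
                        throw new RuntimeException(ex);
                    }
                })
                .collect(Collectors.toList());
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        } finally {
            pool.shutdown();
        }
    }
}
```

Note this is exactly the knob tuning Kenn mentions: the pool size competes with the runner's own bundle-level parallelism, which is why the thread argues the harness, not user code, should ultimately own it.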