Do you have data that supports this? Note that in practice, for something like passing an element between DoFns, the constant factor hidden in O(1) actually matters. Decreasing SDK harness overhead is a good thing, though.
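For anyone who wants to gather such data, here is a rough sketch of the comparison in question, using only the JDK. It is not Beam code: the class name, the two step functions and the timing helper are all made up for illustration, and a hand-rolled timing loop like this is only indicative. A proper harness such as JMH would be needed before drawing conclusions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.IntUnaryOperator;

// Hypothetical sketch: contrasts a direct call chain with a CompletionStage chain.
final class ChainOverheadSketch {
    // Two trivial per-element steps standing in for chained DoFns (assumption).
    static final Function<Integer, Integer> STEP_A = x -> x + 1;
    static final Function<Integer, Integer> STEP_B = x -> x * 2;

    // Regular method calls: the element is passed straight through both steps.
    static int direct(int element) {
        return STEP_B.apply(STEP_A.apply(element));
    }

    // Same steps chained on an already-completed stage; each thenApply
    // allocates a new stage object, which is the wrapper overhead discussed.
    static int viaStage(int element) {
        return CompletableFuture.completedFuture(element)
                .thenApply(STEP_A)
                .thenApply(STEP_B)
                .join();
    }

    // Naive wall-clock timing; results depend heavily on JIT warm-up.
    static long timeNanos(IntUnaryOperator op, int iterations) {
        long sink = 0;
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            sink += op.applyAsInt(i);
        }
        long elapsed = System.nanoTime() - start;
        if (sink == Long.MIN_VALUE) System.out.println(sink); // keeps the loop from being optimized away
        return elapsed;
    }

    public static void main(String[] args) {
        int n = 5_000_000;
        timeNanos(ChainOverheadSketch::direct, n);   // warm-up
        timeNanos(ChainOverheadSketch::viaStage, n); // warm-up
        System.out.println("direct   ns/element: " + (double) timeNanos(ChainOverheadSketch::direct, n) / n);
        System.out.println("viaStage ns/element: " + (double) timeNanos(ChainOverheadSketch::viaStage, n) / n);
    }
}
```

Both paths compute the same value per element; only the cost of getting there differs, which is the constant factor at issue.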
On Mon, Mar 12, 2018 at 10:14 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

> By itself, just the overhead of instantiating a wrapper (so nothing,
> given the recent JVM GC improvements made for Stream/Optional usage).
> Beyond that, if you use chaining you pay a light but still O(1)
> overhead, which you may want to skip for synchronous code but which
> lets you run much faster for IO/async code by using the CPU properly
> once you tune your slaves/workers. So I'm tempted to summarize it as
> "has an overhead that allows you not to run slower". It doesn't prevent
> Beam from still exposing a synchronous API that is collapsed at
> evaluation time into a single fn, which would give you the best of both
> worlds.
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> | Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:
>
>> It is expected that SDKs will have all their cores fully utilized by
>> processing bundles in parallel and not by performing intrabundle
>> parallelization. This allows for DoFns to be chained together via
>> regular method calls, because the overhead to pass a single element
>> through all the DoFns should be as minimal as possible.
>>
>> What is the overhead of using a completion stage vs using a regular
>> method call?
>>
>> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau
>> <rmannibu...@gmail.com> wrote:
>>
>>> On 12 March 2018 00:19, "Reuven Lax" <re...@google.com> wrote:
>>>
>>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau
>>> <rmannibu...@gmail.com> wrote:
>>>
>>>> Makes me think Beam should maybe make 2 internal changes before
>>>> moving forward on (S)DF API changes:
>>>>
>>>> 1. Define a Beam singleton per JVM (per classloader hierarchy
>>>> actually, but you get the idea I think) which can be used to store
>>>> things locally for reuse. See 2 for an example; the metrics pusher
>>>> work Etienne is doing could benefit from it too.
>>>>
>>>
>>> I think we do need something like this, but it needs to be a bit more
>>> than a singleton per JVM. For one thing it needs to be at least per
>>> pipeline within a JVM. You might run multiple tests in a single JVM,
>>> and it should also be possible to run those tests in parallel without
>>> the static state interfering with each other. I also think the state
>>> needs to be addressable per step (i.e. a ParDo can look up its static
>>> state without caring about static state belonging to another ParDo).
>>>
>>> Agreed, but you can register the pipeline in a singleton (as a "ref",
>>> not as an instance) and therefore cover the same need. +1 to having
>>> scopes (singleton, pipeline, thread), but it still requires a single
>>> singleton to handle serialization ;).
>>>
>>>> 2. Define an SPI to load (S)DoFn parameter providers instead of
>>>> having an ArgProvider which provides everything that is supported.
>>>> This way you can use any kind of parameter, and the parameter
>>>> providers can use 1. to handle their own state. The first
>>>> implementations of the parameter provider SPI would be a) state,
>>>> b) timer, c) reactive handlers, and potentially user parameter
>>>> providers (service-like, which can be singletons in the scope of a
>>>> "JVM" thanks to 1).
>>>>
>>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>
>>>>> Yep. Introduce OutputEmitter, and ProcessContext no longer has much
>>>>> use.
>>>>>
>>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau
>>>>> <rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> Which is still a key feature for SDF, but I agree it can be dropped
>>>>>> in favor of an OutputEmitter pattern, with the DoFn moved to a
>>>>>> plain parameter-injection pattern. Both (with CompletionStage)
>>>>>> stay compatible :).
>>>>>>
>>>>>> On 11 March 2018 13:12, "Reuven Lax" <re...@google.com> wrote:
>>>>>>
>>>>>>> I think ProcessContext should go away completely. At that point it
>>>>>>> has little use except as a way to send output downstream.
>>>>>>>
>>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau
>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hmm, thinking out loud, but CompletionStage should/could be
>>>>>>>> extended to replace ProcessContext, since it represents element
>>>>>>>> and output at the same time, no?
>>>>>>>>
>>>>>>>> On 11 March 2018 00:57, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>>>>> overload the term, plus certainly a bit simpler in
>>>>>>>>> implementation. So perhaps @AsyncElement to make it very clear.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Kenn, can NewDoFn distinguish at generation time the difference
>>>>>>>>>> between:
>>>>>>>>>>
>>>>>>>>>> public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>
>>>>>>>>>> and
>>>>>>>>>>
>>>>>>>>>> public void process(@Element InputT element, ...) {
>>>>>>>>>>
>>>>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is
>>>>>>>>>>> much better than an ExecutorService, and very clear.
>>>>>>>>>>>
>>>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>>>
>>>>>>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>>     element.thenApply(...)
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> If we had this available, I think users could even experiment
>>>>>>>>>>> with this often, as it might help even where it isn't obvious.
>>>>>>>>>>>
>>>>>>>>>>> My main hesitation is that a big part of Beam is giving a
>>>>>>>>>>> basic/imperative style of programming a DoFn that executes in a
>>>>>>>>>>> very smart functional/parallel way. Full future-oriented
>>>>>>>>>>> programming is not explored much outside of Javascript (and
>>>>>>>>>>> maybe Haskell) and requires greater discipline in programming in
>>>>>>>>>>> a functional manner - if you are mutating stuff in your callback
>>>>>>>>>>> you are going to have bugs, and then when you add concurrency
>>>>>>>>>>> control you are going to have bad performance and deadlocks. So
>>>>>>>>>>> I definitely wouldn't make it the default or want to spend all
>>>>>>>>>>> our support effort on teaching advanced programming technique.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau
>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>>>>> might look like?
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, but it comes after the "enhancements" - for my use cases -
>>>>>>>>>>>> and "bugs" lists, so I haven't started to work on it much.
>>>>>>>>>>>>
>>>>>>>>>>>>> If it's a radically different API, it might be more
>>>>>>>>>>>>> appropriate as an alternative parallel Beam API rather than a
>>>>>>>>>>>>> replacement for the current API (there is also one such fluent
>>>>>>>>>>>>> API in the works).
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> What I plan is to draft it on top of Beam (so the "useless"
>>>>>>>>>>>> case I spoke about before) and then propose to implement it
>>>>>>>>>>>> ~natively and make it the main API for another major version.
>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau
>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic
>>>>>>>>>>>>>>> one?) of what Kenn suggested.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so
>>>>>>>>>>>>>>> might not require waiting till Beam 3.0). We can recognize
>>>>>>>>>>>>>>> new parameters to processElement and populate as needed.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> That's right; however, in my head it was a one-way move, to
>>>>>>>>>>>>>> force the design to be reactive rather than fake a reactive
>>>>>>>>>>>>>> API on top of a synchronous, non-reactive implementation,
>>>>>>>>>>>>>> which I fear is what would be done today if both were
>>>>>>>>>>>>>> supported.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau
>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, for the DoFn for instance, instead of having
>>>>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T>,
>>>>>>>>>>>>>>>> and output gets it as well.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with
>>>>>>>>>>>>>>>> streams you get a big data Java 8/9/10 API which enables
>>>>>>>>>>>>>>>> any connectivity in a well-performing manner ;).
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On 10 March 2018 13:56, "Reuven Lax" <re...@google.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must
>>>>>>>>>>>>>>>>> be registered with Beam, because Beam needs to know not
>>>>>>>>>>>>>>>>> to mark the element as done until all associated
>>>>>>>>>>>>>>>>> callbacks have completed). I think that's basically what
>>>>>>>>>>>>>>>>> Kenn was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau
>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous, and until
>>>>>>>>>>>>>>>>>> bundles+combines are reactive-friendly, Beam will be
>>>>>>>>>>>>>>>>>> synchronous whatever other parts do. Becoming reactive
>>>>>>>>>>>>>>>>>> will make it possible to manage the threading issues
>>>>>>>>>>>>>>>>>> properly and to get better scalability on the overall
>>>>>>>>>>>>>>>>>> execution when remote IO is involved.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> However, it requires breaking the source and SDF designs
>>>>>>>>>>>>>>>>>> to use CompletionStage - or equivalent - to chain the
>>>>>>>>>>>>>>>>>> processing properly and in a unified fashion.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On 9 March 2018 23:48, "Reuven Lax" <re...@google.com>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a
>>>>>>>>>>>>>>>>>> certain level Beam is already reactive. Are you referring
>>>>>>>>>>>>>>>>>> to a specific way of writing the code?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax
>>>>>>>>>>>>>>>>>> <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau
>>>>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> @Kenn: why not make Beam reactive instead? It would
>>>>>>>>>>>>>>>>>>>> allow scaling much further without heavy multithreading
>>>>>>>>>>>>>>>>>>>> synchronization. Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On 9 March 2018 22:49, "Kenneth Knowles" <k...@google.com>
>>>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer,
>>>>>>>>>>>>>>>>>>>>> which is that we envision the new DoFn to be able to
>>>>>>>>>>>>>>>>>>>>> provide an automatic ExecutorService parameter that
>>>>>>>>>>>>>>>>>>>>> you can use as you wish.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>>>>>>>>>>   public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>>>     ... launch some futures, put them in instance vars ...
>>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>   @FinishBundle
>>>>>>>>>>>>>>>>>>>>>   public void finish(...) {
>>>>>>>>>>>>>>>>>>>>>     ... block on futures, output results if appropriate ...
>>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for
>>>>>>>>>>>>>>>>>>>>> example, share a thread pool between different bits.
>>>>>>>>>>>>>>>>>>>>> This was one reason to delete
>>>>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner
>>>>>>>>>>>>>>>>>>>>> and user code to properly manage how many things were
>>>>>>>>>>>>>>>>>>>>> going on concurrently. And mostly the runner should own
>>>>>>>>>>>>>>>>>>>>> parallelizing to max out cores, and what user code
>>>>>>>>>>>>>>>>>>>>> needs is asynchrony hooks that can interact with that.
>>>>>>>>>>>>>>>>>>>>> However, this feature is not thoroughly considered. TBD
>>>>>>>>>>>>>>>>>>>>> how much the harness itself manages blocking on
>>>>>>>>>>>>>>>>>>>>> outstanding requests versus it being your
>>>>>>>>>>>>>>>>>>>>> responsibility in FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading
>>>>>>>>>>>>>>>>>>>>> acceptable for your particular use case. Perhaps
>>>>>>>>>>>>>>>>>>>>> someone else can weigh in.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge
>>>>>>>>>>>>>>>>>>>>> <josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Our team has pipelines that make external network
>>>>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and
>>>>>>>>>>>>>>>>>>>>>> the hypothesis is that they are slow because we are
>>>>>>>>>>>>>>>>>>>>>> not threading for our network calls. The github issue
>>>>>>>>>>>>>>>>>>>>>> below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization,
>>>>>>>>>>>>>>>>>>>>>> which helped with this. However, this was removed
>>>>>>>>>>>>>>>>>>>>>> because it didn't comply with a few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of
>>>>>>>>>>>>>>>>>>>>>> size X prior to passing to the DoFn, and managing the
>>>>>>>>>>>>>>>>>>>>>> threading within the function, might work. Thoughts?
>>>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help
>>>>>>>>>>>>>>>>>>>>>> with this?
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Thanks