No, nothing beyond that, but I can try to gather some figures and compare them to the Beam DoFn overhead, which should be at the same level or a bit higher here since it is never unwrapped, whereas a CompletableFuture is a raw code chain without Beam in the middle.
On 12 March 2018 18:18, "Lukasz Cwik" <lc...@google.com> wrote:
> Do you have data that supports this?
>
> Note that in reality, for something like passing an element between DoFns, the constant in O(1) actually matters. Decreasing SDK harness overhead is a good thing though.
>
> On Mon, Mar 12, 2018 at 10:14 AM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>
>> By itself, it is just the overhead of instantiating a wrapper (so next to nothing, given the recent JVM GC improvements made for Stream/Optional usage). Beyond that, if you use chaining you get a light overhead, still O(1), which you may want to skip for synchronous code, but which lets you run much faster for IO/async code by using the CPU properly once you tune your workers. So I'm tempted to summarize it as "an overhead that keeps you from running slower". It doesn't prevent Beam from still exposing a synchronous API, collapsed at evaluation time into a single function, which gives you the best of both worlds.
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> | Blog <https://rmannibucau.metawerx.net/> | Old Blog <http://rmannibucau.wordpress.com> | Github <https://github.com/rmannibucau> | LinkedIn <https://www.linkedin.com/in/rmannibucau> | Book <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>
>> 2018-03-12 18:08 GMT+01:00 Lukasz Cwik <lc...@google.com>:
>>
>>> It is expected that SDKs will have all their cores fully utilized by processing bundles in parallel, not by performing intra-bundle parallelization. This allows DoFns to be chained together via regular method calls, because the overhead of passing a single element through all the DoFns should be as small as possible.
>>>
>>> What is the overhead of using a completion stage vs. using a regular method call?
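To make the overhead question concrete, the following plain-JDK sketch contrasts the two shapes being compared. It uses no Beam classes; ChainDemo and its function names are hypothetical. One shape fuses the stages into a single synchronous function, roughly what Romain means by collapsing a synchronous API at evaluation time. The other chains the same stages on a CompletionStage. It measures nothing. It only shows where the extra per-element allocation comes from.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical stages standing in for two chained DoFn bodies (not Beam code).
final class ChainDemo {
    private ChainDemo() {}

    static final Function<String, String> TRIM = String::trim;
    static final Function<String, Integer> LENGTH = String::length;

    // Synchronous shape: the stages are fused into one function up front, so each
    // element costs plain method calls and no per-element wrapper object.
    static final Function<String, Integer> FUSED = TRIM.andThen(LENGTH);

    static int processSync(String element) {
        return FUSED.apply(element);
    }

    // Chained shape: the element is wrapped in a future and every thenApply
    // allocates another CompletableFuture; this is the constant per-element cost.
    static CompletionStage<Integer> processAsync(String element) {
        return CompletableFuture.completedFuture(element)
                .thenApply(TRIM)
                .thenApply(LENGTH);
    }
}
```

Because the future handed to thenApply is already completed, both stages here run synchronously on the calling thread. The chained form only pays off when a stage actually waits on IO, which is the trade-off being argued in the messages above.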
>>>
>>> On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>
>>>> On 12 March 2018 00:19, "Reuven Lax" <re...@google.com> wrote:
>>>>
>>>> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>
>>>>> This makes me think Beam should maybe make two internal changes before moving forward on (S)DF API changes:
>>>>>
>>>>> 1. Define a Beam singleton per JVM (per classloader hierarchy actually, but you get the idea I think) which can be used to store things locally for reuse. See 2 for an example; the metrics pusher work Etienne is doing could benefit from it too.
>>>>
>>>> I think we do need something like this, but it needs to be a bit more than a singleton per JVM. For one thing, it needs to be at least per pipeline within a JVM. You might run multiple tests in a single JVM, and it should also be possible to run those tests in parallel without their static state interfering with each other. I also think the state needs to be addressable per step (i.e. a ParDo can look up its static state without caring about static state belonging to another ParDo).
>>>>
>>>> Agreed, but you can register the pipeline in the singleton (as a "ref", not as an instance) and therefore meet the same need. +1 to having scopes (singleton, pipeline, thread), but it still requires a single singleton to handle serialization ;).
>>>>
>>>>> 2. Define an SPI to load (S)DoFn parameter providers instead of having an ArgProvider which provides everything that is supported. This way you can use any kind of parameter, and the parameter providers can use 1. to handle their own state. The first implementations of the parameter provider SPI would be a) state, b) timers, c) reactive handlers, and potentially user parameter providers (service-like, which can be singletons in the scope of a "JVM" thanks to 1).
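Here is a minimal sketch of the scoping Reuven asks for, with state isolated per pipeline and per step, kept inside the single JVM-wide singleton Romain proposes. ScopedRegistry, its methods, and the string identifiers used for pipelines and steps are hypothetical. This is not a Beam API, and serializing the pipeline "ref" is left out.

```java
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Supplier;

// Hypothetical JVM-wide registry (one instance per classloader that loads it); not Beam code.
final class ScopedRegistry {
    static final ScopedRegistry INSTANCE = new ScopedRegistry();

    // Scope key: (pipeline id, step name), so parallel pipelines/tests and
    // different ParDos of one pipeline never see each other's state.
    private static final class Key {
        final String pipelineId;
        final String stepName;

        Key(String pipelineId, String stepName) {
            this.pipelineId = pipelineId;
            this.stepName = stepName;
        }

        @Override public boolean equals(Object o) {
            if (!(o instanceof Key)) return false;
            Key k = (Key) o;
            return pipelineId.equals(k.pipelineId) && stepName.equals(k.stepName);
        }

        @Override public int hashCode() {
            return Objects.hash(pipelineId, stepName);
        }
    }

    private final ConcurrentMap<Key, Object> state = new ConcurrentHashMap<>();

    private ScopedRegistry() {}

    // Returns the state of this pipeline/step, creating it atomically on first use.
    @SuppressWarnings("unchecked")
    <T> T getOrCreate(String pipelineId, String stepName, Supplier<T> factory) {
        return (T) state.computeIfAbsent(new Key(pipelineId, stepName), k -> factory.get());
    }

    // Drops every entry of one pipeline, e.g. when that pipeline (or test) ends.
    void release(String pipelineId) {
        state.keySet().removeIf(k -> k.pipelineId.equals(pipelineId));
    }
}
```

Keying by (pipeline, step) is what would let two tests running in parallel in one JVM, or two ParDos in one pipeline, share the singleton without sharing state. The sketch assumes something calls release() when a pipeline finishes.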
>>>>>
>>>>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>
>>>>>> Yep. Introduce OutputEmitter, and ProcessContext no longer has much use.
>>>>>>
>>>>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Which is still a key feature for SDF, but I agree it can be dropped in favor of an OutputEmitter pattern, with the DoFn moved to a plain parameter-injection-based pattern. Both (with CompletionStage) stay compatible :).
>>>>>>>
>>>>>>> On 11 March 2018 13:12, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>
>>>>>>>> I think ProcessContext should go away completely. At that point it has little use except as a way to send output downstream.
>>>>>>>>
>>>>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hmm, thinking out loud, but CompletionStage should/could be extended to replace ProcessContext, since it represents the element and the output at the same time, no?
>>>>>>>>>
>>>>>>>>> On 11 March 2018 00:57, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yea, I think it could. But it is probably more readable not to overload the term, plus certainly a bit simpler to implement. So perhaps @AsyncElement, to make it very clear.
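The parameter-injection style discussed here can be sketched with a small reflective dispatcher. It uses an OutputEmitter instead of ProcessContext, and a separate @AsyncElement annotation rather than overloading @Element. Element, AsyncElement, OutputEmitter and Invoker are hypothetical stand-ins defined in the sketch, not Beam's DoFn machinery or annotations. Each branch of the parameter loop is the kind of thing a pluggable parameter provider, as in Romain's SPI idea, could own.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the proposed API; none of these are Beam classes.
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER) @interface Element {}
@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.PARAMETER) @interface AsyncElement {}

interface OutputEmitter<T> {
    void emit(T value);
}

final class Invoker {
    private Invoker() {}

    // Calls fn's "process" method, resolving each parameter from its declaration:
    // @AsyncElement -> a completed future of the element, @Element -> the element,
    // OutputEmitter -> a collector for outputs. No ProcessContext is involved.
    static <O> List<O> invoke(Object fn, Object element) {
        List<O> outputs = new ArrayList<>();
        OutputEmitter<O> emitter = outputs::add;
        Method process = findProcess(fn.getClass());
        Parameter[] params = process.getParameters();
        Object[] args = new Object[params.length];
        for (int i = 0; i < params.length; i++) {
            Parameter p = params[i];
            if (p.isAnnotationPresent(AsyncElement.class)) {
                // The annotation, not the declared type, selects the async form.
                args[i] = CompletableFuture.completedFuture(element);
            } else if (p.isAnnotationPresent(Element.class)) {
                args[i] = element;
            } else if (p.getType() == OutputEmitter.class) {
                args[i] = emitter;
            } else {
                throw new IllegalArgumentException("Unsupported parameter: " + p);
            }
        }
        try {
            process.setAccessible(true);
            process.invoke(fn, args);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException("process() failed", e);
        }
        return outputs;
    }

    private static Method findProcess(Class<?> type) {
        for (Method m : type.getDeclaredMethods()) {
            if (m.getName().equals("process")) {
                return m;
            }
        }
        throw new IllegalArgumentException("No process method on " + type);
    }
}
```

With an explicit annotation, the dispatcher never has to guess from the parameter type. That matters if an element's own type is a CompletionStage, and it is one reading of why Kenn prefers a separate annotation. This toy collects outputs only until process() returns. That is safe here solely because the future it passes in is already complete, so callbacks chained on it run immediately.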
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Ken, can NewDoFn distinguish at generation time between:
>>>>>>>>>>>
>>>>>>>>>>> public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>>
>>>>>>>>>>> and
>>>>>>>>>>>
>>>>>>>>>>> public void process(@Element InputT element, ...) {
>>>>>>>>>>>
>>>>>>>>>>> If not, then we would probably need separate annotations...
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is much better than an ExecutorService, and very clear.
>>>>>>>>>>>>
>>>>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>>>>
>>>>>>>>>>>> new DoFn<InputT, OutputT>() {
>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>>>>>     element.thenApply(...);
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> If we had this available, I think users could even experiment with it often, as it might help even where it isn't obvious.
>>>>>>>>>>>>
>>>>>>>>>>>> My main hesitation is that a big part of Beam is giving a basic/imperative style of programming a DoFn that executes in a very smart functional/parallel way. Full future-oriented programming is not explored much outside of JavaScript (and maybe Haskell) and requires greater discipline in programming in a functional manner: if you are mutating stuff in your callback you are going to have bugs, and then when you add concurrency control you are going to have bad performance and deadlocks. So I definitely wouldn't make it the default or want to spend all our support effort on teaching advanced programming techniques.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Have you considered drafting in detail what you think this API might look like?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, but it comes after the "enhancements" (for my use cases) and "bugs" lists, so I haven't started working on it much.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> If it's a radically different API, it might be more appropriate as an alternative parallel Beam API rather than a replacement for the current API (there is also one such fluent API in the works).
>>>>>>>>>>>>>
>>>>>>>>>>>>> What I plan is to draft it on top of Beam (so the "useless" case I spoke about before), then propose to implement it ~natively and make it the main API in another major version.
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?) of what Kenn suggested.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so it might not require waiting until Beam 3.0). We can recognize new parameters to processElement and populate them as needed.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This is right; however, in my head it was a one-way move to force the design to be reactive, rather than faking a reactive API on top of a synchronous, non-reactive implementation, which I fear is what would happen today if both were supported.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Yes. For the DoFn, for instance, instead of having processContext.element() = <T> you get a CompletionStage<T>, and output gets one as well.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams, you get a big data Java 8/9/10 API which enables any connectivity in a well-performing manner ;).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 10 March 2018 13:56, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So you mean the user should have a way of registering asynchronous activity with a callback (the callback must be registered with Beam, because Beam needs to know not to mark the element as done until all associated callbacks have completed). I think that's basically what Kenn was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous, and until bundles and combines are reactive-friendly, Beam will be synchronous whatever the other parts do. Becoming reactive would make it possible to manage threading issues properly and to get better scalability of the overall execution when remote IO is involved.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> However, it requires breaking the source and SDF designs to use CompletionStage (or an equivalent) to chain the processing properly and in a unified fashion.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On 9 March 2018 23:48, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain level Beam is already reactive. Are you referring to a specific way of writing the code?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> @Kenn: why not make Beam reactive instead? It would allow scaling much further without having to heavily synchronize multithreading. Elegant and efficient :). Beam 3?
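Reuven's requirement is that Beam must not mark an element as done until its registered callbacks have completed. That can be sketched as a tracker with which user code registers its asynchronous work. PendingWork is a hypothetical name, not a Beam class. The sketch assumes the harness calls awaitAll() before committing the bundle, similar to the blocking-in-FinishBundle step in Kenn's ExecutorService sketch.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical tracker a harness could hand to user code (not a Beam API):
// work registered here keeps the bundle from being reported as done.
final class PendingWork {
    private final List<CompletableFuture<?>> pending = new CopyOnWriteArrayList<>();

    // Register the LAST stage of each chain (including the callback that emits
    // output), so waiting on it also covers every callback before it.
    <T> CompletionStage<T> register(CompletionStage<T> work) {
        pending.add(work.toCompletableFuture());
        return work;
    }

    // Called before committing the bundle: blocks until every registered stage is
    // done; if any failed, join() throws a CompletionException wrapping a failure.
    void awaitAll() {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
        pending.clear();
    }

    // Number of registered stages that have not completed yet.
    int outstanding() {
        int n = 0;
        for (CompletableFuture<?> f : pending) {
            if (!f.isDone()) n++;
        }
        return n;
    }
}
```

Registering the final stage, rather than only the initial request, is the point of the design. It is what makes "element done" mean "callbacks done" and not just "request sent".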
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> On 9 March 2018 22:49, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which is that we envision the new DoFn being able to provide an automatic ExecutorService parameter that you can use as you wish.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> new DoFn<>() {
>>>>>>>>>>>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>>>>>>>>>>>   public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>>>>     ... launch some futures, put them in instance vars ...
>>>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>   @FinishBundle
>>>>>>>>>>>>>>>>>>>>>>   public void finish(...) {
>>>>>>>>>>>>>>>>>>>>>>     ... block on futures, output results if appropriate ...
>>>>>>>>>>>>>>>>>>>>>>   }
>>>>>>>>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge of what is going on in a computation to, for example, share a thread pool between different bits. This was one reason to delete IntraBundleParallelization: it didn't allow the runner and user code to properly manage how many things were going on concurrently. And mostly the runner should own parallelizing to max out cores, and what user code needs is asynchrony hooks that can interact with that. However, this feature has not been thoroughly considered. TBD how much the harness itself manages blocking on outstanding requests versus it being your responsibility in FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are willing to do the knob tuning to get the threading acceptable for your particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that makes external network calls. These pipelines are currently super slow, and the hypothesis is that they are slow because we are not threading our network calls. The GitHub pull request below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> In Beam 1.0, there was IntraBundleParallelization, which helped with this. However, it was removed because it didn't comply with a few Beam paradigms.
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network calls? It seems bundling the elements into groups of size X prior to passing them to the DoFn, and managing the threading within the function, might work. Thoughts?
>>>>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for Beam?
>>>>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>>> Thanks
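Josh's workaround is to group elements into batches of size X and manage the threading inside the DoFn. It could look roughly like the plain-Java helper below. BatchCaller, partition and callAll are hypothetical names, and the blocking network call is passed in as a function. The sketch ignores retries, timeouts, and how a real pipeline would form the batches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

// Hypothetical helper, not a Beam API: fan out one batch of blocking calls on a
// bounded pool, then wait for the whole batch before emitting its results.
final class BatchCaller {
    private BatchCaller() {}

    // Splits the input into consecutive groups of at most `size` elements.
    static <T> List<List<T>> partition(List<T> input, int size) {
        if (size <= 0) throw new IllegalArgumentException("size must be positive");
        List<List<T>> groups = new ArrayList<>();
        for (int i = 0; i < input.size(); i += size) {
            groups.add(new ArrayList<>(input.subList(i, Math.min(i + size, input.size()))));
        }
        return groups;
    }

    // Runs blockingCall for every element of the batch concurrently on `pool`
    // and returns the results in input order. A failed call surfaces from
    // join() as a CompletionException.
    static <I, O> List<O> callAll(List<I> batch, Function<I, O> blockingCall, ExecutorService pool) {
        List<CompletableFuture<O>> futures = new ArrayList<>(batch.size());
        for (I element : batch) {
            futures.add(CompletableFuture.supplyAsync(() -> blockingCall.apply(element), pool));
        }
        List<O> results = new ArrayList<>(batch.size());
        for (CompletableFuture<O> f : futures) {
            results.add(f.join());
        }
        return results;
    }
}
```

Sharing one bounded pool per worker, rather than creating one per batch, keeps total concurrency under control, which is the concern Kenn raises about IntraBundleParallelization. Results are joined in input order, so the output order matches the input order.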