Hmm, thinking out loud, but couldn't CompletionStage be extended to replace ProcessContext, since it represents the element and the output at the same time?
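As a rough illustration of that idea only: the AsyncDoFn interface and the process() signature below are purely hypothetical, nothing like them exists in the Beam SDK today; they just show what "element and output are the same chained abstraction" could mean.

import java.util.concurrent.CompletionStage;

// Hypothetical: the runner hands in a stage for the element and chains the
// downstream transforms onto the stage returned here.
public interface AsyncDoFn<InputT, OutputT> {
  CompletionStage<OutputT> process(CompletionStage<InputT> element);
}

// Example user code: a non-blocking enrichment chained onto the element.
class EnrichFn implements AsyncDoFn<String, String> {
  @Override
  public CompletionStage<String> process(CompletionStage<String> element) {
    return element.thenApply(String::toUpperCase); // stand-in for an async call
  }
}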
On 11 March 2018 at 00:57, "Kenneth Knowles" <k...@google.com> wrote:

Yea, I think it could. But it is probably more readable not to overload the term, plus it is certainly a bit simpler in implementation. So perhaps @AsyncElement to make it very clear.

Kenn

On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:

Kenn, can NewDoFn distinguish at generation time the difference between:

public void process(@Element CompletionStage<InputT> element, ...) {

and

public void process(@Element InputT element, ...) {

If not, then we would probably need separate annotations...

On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com> wrote:

Nice! I agree that providing a CompletionStage for chaining is much better than an ExecutorService, and very clear.

It is very feasible to add support that looks like:

new DoFn<InputT, OutputT>() {
  @ProcessElement
  public void process(@Element CompletionStage<InputT> element, ...) {
    element.thenApply(...)
  }
}

If we had this available, I think users could experiment with it often, as it might help even where it isn't obvious.

My main hesitation is that a big part of Beam is giving a basic/imperative style of programming a DoFn that executes in a very smart functional/parallel way. Full future-oriented programming is not explored much outside of JavaScript (and maybe Haskell) and requires greater discipline in programming in a functional manner - if you are mutating stuff in your callbacks you are going to have bugs, and then when you add concurrency control you are going to have bad performance and deadlocks. So I definitely wouldn't make it the default or want to spend all our support effort on teaching advanced programming techniques.

Kenn

On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:

> Have you considered drafting in detail what you think this API might look like?

Yes, but it comes after the "enhancements" - for my use cases - and "bugs" lists, so I haven't started working on it much.

> If it's a radically different API, it might be more appropriate as an alternative parallel Beam API rather than a replacement for the current API (there is also one such fluent API in the works).

What I plan is to draft it on top of Beam (so the "useless" case I spoke about before) and then propose to implement it ~natively and move it to being the main API in another major version.

On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:

> This is another version (maybe a better, Java 8 idiomatic one?) of what Kenn suggested.
>
> Note that with NewDoFn this need not be incompatible (so it might not require waiting till Beam 3.0). We can recognize new parameters to processElement and populate them as needed.

This is right, however in my head it was a one-way movement, to force the design to be reactive rather than faking a reactive API on top of a synchronous, non-reactive implementation - which is what I fear would happen today with both supported.
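To make the signature question above concrete, here is a side-by-side sketch of the two shapes. @AsyncElement is only the name floated above, not an existing Beam annotation, and the runner behaviour described in the comments is assumed rather than implemented.

import java.util.concurrent.CompletionStage;
import org.apache.beam.sdk.transforms.DoFn;

class SignatureSketch {
  // Today's synchronous shape: the element is handed in directly.
  static final DoFn<String, String> syncFn =
      new DoFn<String, String>() {
        @ProcessElement
        public void process(@Element String element, OutputReceiver<String> out) {
          out.output(element.toUpperCase());
        }
      };

  // Hypothetical asynchronous shape: a distinct @AsyncElement annotation keeps signature
  // analysis unambiguous, the element arrives as a CompletionStage, and the runner would
  // only mark the element done once the registered chain completes.
  static final DoFn<String, String> asyncFn =
      new DoFn<String, String>() {
        @ProcessElement
        public void process(@AsyncElement CompletionStage<String> element,
                            OutputReceiver<String> out) {
          element.thenAccept(value -> out.output(value.toUpperCase()));
        }
      };
}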
On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

Yes, for the DoFn for instance: instead of processContext.element() giving you a <T>, you get a CompletionStage<T>, and the output takes one as well. This way you register an execution chain. Mixed with streams, you get a big data Java 8/9/10 API which enables any connectivity in a well performing manner ;).

On 10 March 2018 at 13:56, "Reuven Lax" <re...@google.com> wrote:

So you mean the user should have a way of registering asynchronous activity with a callback (the callback must be registered with Beam, because Beam needs to know not to mark the element as done until all associated callbacks have completed). I think that's basically what Kenn was suggesting, unless I'm missing something.

On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

Yes, callback based. Beam today is synchronous, and until bundles + combines are reactive friendly, Beam will be synchronous whatever the other parts do. Becoming reactive would make it possible to manage the threading issues properly and to get better scalability of the overall execution when remote IO is involved.

However, it requires breaking the source and SDF designs to use CompletionStage - or an equivalent - to chain the processing properly and in a unified fashion.

On 9 March 2018 at 23:48, "Reuven Lax" <re...@google.com> wrote:

If you're talking about reactive programming, at a certain level Beam is already reactive. Are you referring to a specific way of writing the code?

On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:

What do you mean by reactive?

On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

@Kenn: why not prefer making Beam reactive? It would allow scaling far further without having to synchronize multithreading by hand. Elegant and efficient :). Beam 3?

On 9 March 2018 at 22:49, "Kenneth Knowles" <k...@google.com> wrote:

I will start with the "exciting futuristic" answer, which is that we envision the new DoFn to be able to provide an automatic ExecutorService parameter that you can use as you wish.

new DoFn<>() {
  @ProcessElement
  public void process(ProcessContext ctx, ExecutorService executorService) {
    ... launch some futures, put them in instance vars ...
  }

  @FinishBundle
  public void finish(...) {
    ... block on futures, output results if appropriate ...
  }
}

This way, the Java SDK harness can use its overarching knowledge of what is going on in a computation to, for example, share a thread pool between different bits. This was one reason to delete IntraBundleParallelization - it didn't allow the runner and user code to properly manage how many things were going on concurrently. And mostly the runner should own parallelizing to max out cores, while what user code needs is asynchrony hooks that can interact with that. However, this feature is not thoroughly considered. TBD how much the harness itself manages blocking on outstanding requests versus it being your responsibility in FinishBundle, etc.

I haven't explored rolling your own here, if you are willing to do the knob tuning to get the threading acceptable for your particular use case. Perhaps someone else can weigh in.

Kenn
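For anyone who wants to roll that pattern by hand today, a sketch of the same shape with a self-managed pool, since the injected ExecutorService parameter above does not exist yet. AsyncCallFn, callService() and the pool size are placeholders to adapt, not an established recipe; the key points are launching futures in @ProcessElement and blocking before the bundle completes.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

class AsyncCallFn extends DoFn<String, String> {
  private static class Result {
    final String value;
    final Instant timestamp;
    final BoundedWindow window;
    Result(String value, Instant timestamp, BoundedWindow window) {
      this.value = value;
      this.timestamp = timestamp;
      this.window = window;
    }
  }

  private transient ExecutorService pool;
  private transient List<CompletableFuture<Void>> pending;
  private transient List<Result> results;

  @Setup
  public void setUp() {
    pool = Executors.newFixedThreadPool(16); // concurrency knob to tune per use case
  }

  @StartBundle
  public void startBundle() {
    pending = new ArrayList<>();
    results = new ArrayList<>();
  }

  @ProcessElement
  public void process(ProcessContext c, BoundedWindow window) {
    String element = c.element();
    Instant ts = c.timestamp();
    // Launch the blocking call on the pool; keep the timestamp/window so the result
    // can be emitted with them when the bundle finishes.
    pending.add(
        CompletableFuture.runAsync(
            () -> {
              String response = callService(element); // placeholder for the real call
              synchronized (results) {
                results.add(new Result(response, ts, window));
              }
            },
            pool));
  }

  @FinishBundle
  public void finishBundle(FinishBundleContext ctx) {
    // Block until every outstanding call completes, then emit from the bundle thread.
    CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
    for (Result r : results) {
      ctx.output(r.value, r.timestamp, r.window);
    }
  }

  @Teardown
  public void tearDown() {
    pool.shutdown();
  }

  private String callService(String request) {
    return request; // stand-in for the blocking network call
  }
}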
On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <josh.fe...@bounceexchange.com> wrote:

Hello all:

Our team has a pipeline that makes external network calls. These pipelines are currently super slow, and the hypothesis is that they are slow because we are not threading our network calls. The github issue below provides some discussion around this:

https://github.com/apache/beam/pull/957

In Beam 1.0, there was IntraBundleParallelization, which helped with this. However, this was removed because it didn't comply with a few Beam paradigms.

Questions going forward:

What is advised for jobs that make blocking network calls? It seems bundling the elements into groups of size X prior to passing them to the DoFn, and managing the threading within the function, might work (a rough sketch follows below). Thoughts?
Are these types of jobs even suitable for Beam?
Are there any plans to develop features that help with this?

Thanks
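On the "groups of size X" idea, one way it could look with what already exists is to batch upstream with GroupIntoBatches and fan the blocking calls out across the batch inside a single @ProcessElement call. The shard count, batch size and callService() below are placeholders, and for unbounded inputs the windowing/triggering around GroupIntoBatches needs separate thought.

import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

class BatchedCalls {
  static PCollection<String> expand(PCollection<String> input) {
    return input
        // GroupIntoBatches needs a keyed input; random shard keys just spread the load.
        .apply(WithKeys.<Integer, String>of(element -> ThreadLocalRandom.current().nextInt(10))
            .withKeyType(TypeDescriptors.integers()))
        .apply(GroupIntoBatches.<Integer, String>ofSize(50))
        .apply(ParDo.of(
            new DoFn<KV<Integer, Iterable<String>>, String>() {
              @ProcessElement
              public void process(ProcessContext c) {
                // Run the calls for the whole batch in parallel, block until all finish,
                // then emit from the processing thread (keeps output single-threaded).
                List<String> responses =
                    StreamSupport.stream(c.element().getValue().spliterator(), true)
                        .map(BatchedCalls::callService)
                        .collect(Collectors.toList());
                responses.forEach(c::output);
              }
            }));
  }

  private static String callService(String request) {
    return request; // stand-in for the blocking network call
  }
}

In practice a dedicated executor would usually behave better than the parallel stream's common ForkJoinPool for blocking IO; the parallel stream just keeps the sketch short, and the batch size trades latency against per-call overhead.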