Yeah, I think it could. But it is probably more readable not to overload the term, and certainly a bit simpler in implementation. So perhaps @AsyncElement to make it very clear.
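For illustration, a rough sketch of what a DoFn using such a hypothetical @AsyncElement parameter could look like (neither the annotation nor its completion semantics exist in Beam today, and OutputReceiver / callRemoteService are just stand-ins here):

new DoFn<InputT, OutputT>() {
  @ProcessElement
  public void process(@AsyncElement CompletionStage<InputT> element, OutputReceiver<OutputT> out) {
    // Chain asynchronous work onto the element instead of blocking the bundle thread.
    element.thenApply(in -> callRemoteService(in))
           .thenAccept(out::output);
  }
}

The runner would presumably only consider the element complete once the registered chain finishes.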
Kenn

On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:

> Kenn, can NewDoFn distinguish at generation time the difference between:
>
> public void process(@Element CompletionStage<InputT> element, ...) {
>
> and
>
> public void process(@Element Input element, ...) {
>
> If not, then we would probably need separate annotations....
>
> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com> wrote:
>
>> Nice! I agree that providing a CompletionStage for chaining is much
>> better than an ExecutorService, and very clear.
>>
>> It is very feasible to add support that looks like
>>
>> new DoFn<InputT, OutputT>() {
>>   @ProcessElement
>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>     element.thenApply(...)
>>   }
>> }
>>
>> If we had this available, I think users could even experiment with this
>> often, as it might help even where it isn't obvious.
>>
>> My main hesitation is that a big part of Beam is giving a basic/imperative
>> style of programming a DoFn that executes in a very smart
>> functional/parallel way. Full future-oriented programming is not explored
>> much outside of JavaScript (and maybe Haskell) and requires greater
>> discipline in programming in a functional manner - if you are mutating
>> stuff in your callback you are going to have bugs, and then when you add
>> concurrency control you are going to have bad performance and deadlocks.
>> So I definitely wouldn't make it the default or want to spend all our
>> support effort on teaching advanced programming techniques.
>>
>> Kenn
>>
>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>
>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> Have you considered drafting in detail what you think this API might
>>>> look like?
>>>
>>> Yes, but it comes after the "enhancements" - for my use cases - and
>>> "bugs" list, so I haven't started to work on it much.
>>>
>>>> If it's a radically different API, it might be more appropriate as an
>>>> alternative parallel Beam API rather than a replacement for the current
>>>> API (there is also one such fluent API in the works).
>>>
>>> What I plan is to draft it on top of Beam (so the "useless" case I spoke
>>> about before) and then propose to implement it ~natively and make it the
>>> main API for another major version.
>>>
>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>
>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>
>>>>>> This is another version (maybe a better, Java 8 idiomatic one?) of
>>>>>> what Kenn suggested.
>>>>>>
>>>>>> Note that with NewDoFn this need not be incompatible (so it might not
>>>>>> require waiting till Beam 3.0). We can recognize new parameters to
>>>>>> processElement and populate them as needed.
>>>>>
>>>>> This is right; however, in my head it was a one-way movement, to force
>>>>> the design to be reactive rather than faking a reactive API on top of a
>>>>> synchronous, non-reactive impl - which I fear is what would happen today
>>>>> if both were supported.
>>>>>
>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Yes. For the DoFn, for instance, instead of processContext.element()
>>>>>>> giving you a <T>, you would get a CompletionStage<T>, and output would
>>>>>>> take one as well.
>>>>>>>
>>>>>>> This way you register an execution chain. Mixed with streams you get
>>>>>>> a big data Java 8/9/10 API which enables any connectivity in a
>>>>>>> well-performing manner ;).
>>>>>>>
>>>>>>> On 10 Mar 2018 at 13:56, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>
>>>>>>>> So you mean the user should have a way of registering asynchronous
>>>>>>>> activity with a callback (the callback must be registered with Beam,
>>>>>>>> because Beam needs to know not to mark the element as done until all
>>>>>>>> associated callbacks have completed). I think that's basically what
>>>>>>>> Kenn was suggesting, unless I'm missing something.
>>>>>>>>
>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Yes, callback based. Beam today is synchronous, and until
>>>>>>>>> bundles+combines are reactive friendly, Beam will be synchronous
>>>>>>>>> whatever the other parts do. Becoming reactive would make it possible
>>>>>>>>> to manage the threading issues properly and to get better scalability
>>>>>>>>> on the overall execution when remote IO is involved.
>>>>>>>>>
>>>>>>>>> However, it requires breaking the source and SDF design to use
>>>>>>>>> CompletionStage - or an equivalent - to chain the processing properly
>>>>>>>>> and in a unified fashion.
>>>>>>>>>
>>>>>>>>> On 9 Mar 2018 at 23:48, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>>>> Beam is already reactive. Are you referring to a specific way of
>>>>>>>>> writing the code?
>>>>>>>>>
>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> @Kenn: why not prefer to make Beam reactive? It would allow
>>>>>>>>>>> scaling far more without having to heavily synchronize
>>>>>>>>>>> multithreading. Elegant and efficient :). Beam 3?
>>>>>>>>>>>
>>>>>>>>>>> On 9 Mar 2018 at 22:49, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which is
>>>>>>>>>>>> that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>> ExecutorService parameter that you can use as you wish.
>>>>>>>>>>>>
>>>>>>>>>>>> new DoFn<>() {
>>>>>>>>>>>>   @ProcessElement
>>>>>>>>>>>>   public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>>>     ... launch some futures, put them in instance vars ...
>>>>>>>>>>>>   }
>>>>>>>>>>>>
>>>>>>>>>>>>   @FinishBundle
>>>>>>>>>>>>   public void finish(...) {
>>>>>>>>>>>>     ... block on futures, output results if appropriate ...
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge
>>>>>>>>>>>> of what is going on in a computation to, for example, share a
>>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user
>>>>>>>>>>>> code to properly manage how many things were going on
>>>>>>>>>>>> concurrently. And mostly the runner should own parallelizing to
>>>>>>>>>>>> max out cores, and what user code needs is asynchrony hooks that
>>>>>>>>>>>> can interact with that. However, this feature is not thoroughly
>>>>>>>>>>>> considered. TBD how much the harness itself manages blocking on
>>>>>>>>>>>> outstanding requests versus it being your responsibility in
>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>
>>>>>>>>>>>> I haven't explored rolling your own here, if you are willing to
>>>>>>>>>>>> do the knob tuning to get the threading acceptable for your
>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Our team has a pipeline that makes external network calls.
>>>>>>>>>>>>> These pipelines are currently super slow, and the hypothesis is
>>>>>>>>>>>>> that they are slow because we are not using threading for our
>>>>>>>>>>>>> network calls. The GitHub pull request below provides some
>>>>>>>>>>>>> discussion around this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>
>>>>>>>>>>>>> In Beam 1.0, there was IntraBundleParallelization, which helped
>>>>>>>>>>>>> with this. However, this was removed because it didn't comply
>>>>>>>>>>>>> with a few Beam paradigms.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>
>>>>>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>>>>>> seems bundling the elements into groups of size X prior to
>>>>>>>>>>>>> passing them to the DoFn, and managing the threading within the
>>>>>>>>>>>>> function, might work. Thoughts?
>>>>>>>>>>>>> Are these types of jobs even suitable for Beam?
>>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
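As a footnote on the workaround raised in the original question - grouping elements into batches upstream and managing the threading inside the DoFn - here is a rough, self-contained sketch of that pattern. The class name, pool size, and callRemoteService are illustrative assumptions rather than an established Beam recipe, and it presumes an upstream transform already emits List<String> batches:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import org.apache.beam.sdk.transforms.DoFn;

class ParallelCallsFn extends DoFn<List<String>, String> {

  // Transient so the DoFn stays serializable; created per instance in @Setup.
  private transient ExecutorService pool;

  @Setup
  public void setup() {
    // Pool size is a knob to tune against the parallelism the runner already provides.
    pool = Executors.newFixedThreadPool(16);
  }

  @ProcessElement
  public void process(ProcessContext ctx) {
    // Fan the blocking calls for one batch out onto the pool.
    List<CompletableFuture<String>> futures = ctx.element().stream()
        .map(in -> CompletableFuture.supplyAsync(() -> callRemoteService(in), pool))
        .collect(Collectors.toList());

    // Block until the whole batch is done, then emit results; this keeps the
    // bundle from being marked complete before the outstanding calls finish.
    for (CompletableFuture<String> f : futures) {
      ctx.output(f.join());
    }
  }

  @Teardown
  public void teardown() {
    pool.shutdown();
  }

  private String callRemoteService(String input) {
    return input; // placeholder for the real blocking network call
  }
}

Blocking inside processElement keeps the element and bundle accounting correct at the cost of holding the worker thread, which is exactly the trade-off the thread above is trying to remove.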