Ken, can NewDoFn distinguish at generation time the difference between:

    public void process(@Element CompletionStage<InputT> element, ...) {

and

    public void process(@Element InputT element, ...) {

If not, then we would probably need separate annotations.

On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com> wrote:

> Nice! I agree that providing a CompletionStage for chaining is much better
> than an ExecutorService, and very clear.
>
> It is very feasible to add support that looks like
>
>   new DoFn<InputT, OutputT>() {
>     @ProcessElement
>     public void process(@Element CompletionStage<InputT> element, ...) {
>       element.thenApply(...)
>     }
>   }
>
> If we had this available, I think users could even experiment with this
> often as it might help even where it isn't obvious.
>
> My main hesitation is that a big part of Beam is giving a basic/imperative
> style of programming a DoFn that executes in a very smart
> functional/parallel way. Full future-oriented programming is not explored
> much outside of JavaScript (and maybe Haskell) and requires greater
> discipline in programming in a functional manner - if you are mutating
> stuff in your callback you are going to have bugs, and then when you add
> concurrency control you are going to have bad performance and deadlocks. So
> I definitely wouldn't make it the default or want to spend all our support
> effort on teaching advanced programming technique.
>
> Kenn
>
> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>
>>> Have you considered drafting in detail what you think this API might
>>> look like?
>>
>> Yes, but it comes after the "enhancements" - for my use cases - and "bugs"
>> lists, so I haven't started to work on it much.
>>
>>> If it's a radically different API, it might be more appropriate as an
>>> alternative parallel Beam API rather than a replacement for the current
>>> API (there is also one such fluent API in the works).
>>
>> What I plan is to draft it on top of Beam (so the "useless" case I spoke
>> about before) and then propose to implement it ~natively and make it the
>> main API for another major version.
>>
>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>
>>>>> This is another version (maybe a better, Java 8 idiomatic one?) of
>>>>> what Kenn suggested.
>>>>>
>>>>> Note that with NewDoFn this need not be incompatible (so might not
>>>>> require waiting till Beam 3.0). We can recognize new parameters to
>>>>> processElement and populate as needed.
>>>>
>>>> This is right; however, in my head it was a one-way movement to
>>>> enforce the design to be reactive and not fake a reactive API with a
>>>> sync, non-reactive impl, which is what would be done today if both
>>>> were supported, I fear.
>>>>
>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> Yes, for the DoFn for instance, instead of having
>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output
>>>>>> gets it as well.
>>>>>>
>>>>>> This way you register an execution chain. Mixed with streams you get
>>>>>> a big data Java 8/9/10 API which enables any connectivity in a
>>>>>> well-performing manner ;).
>>>>>>
>>>>>> On 10 March 2018 at 13:56, "Reuven Lax" <re...@google.com> wrote:
>>>>>>
>>>>>>> So you mean the user should have a way of registering asynchronous
>>>>>>> activity with a callback (the callback must be registered with Beam,
>>>>>>> because Beam needs to know not to mark the element as done until all
>>>>>>> associated callbacks have completed). I think that's basically what
>>>>>>> Kenn was suggesting, unless I'm missing something.
>>>>>>>
>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Yes, callback based.
>>>>>>>> Beam today is synchronous and until
>>>>>>>> bundles+combines are reactive friendly, Beam will be synchronous
>>>>>>>> whatever other parts do. Becoming reactive will make it possible to
>>>>>>>> manage the threading issues properly and to have better scalability
>>>>>>>> on the overall execution when remote IO is involved.
>>>>>>>>
>>>>>>>> However, it requires breaking the source and SDF design to use
>>>>>>>> CompletionStage - or equivalent - to chain the processing properly
>>>>>>>> and in a unified fashion.
>>>>>>>>
>>>>>>>> On 9 March 2018 at 23:48, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>
>>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>>> Beam is already reactive. Are you referring to a specific way of
>>>>>>>> writing the code?
>>>>>>>>
>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> What do you mean by reactive?
>>>>>>>>>
>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> @Kenn: why not prefer to make Beam reactive? It would allow us to
>>>>>>>>>> scale way more without having to heavily synchronize
>>>>>>>>>> multithreading. Elegant and efficient :). Beam 3?
>>>>>>>>>>
>>>>>>>>>> On 9 March 2018 at 22:49, "Kenneth Knowles" <k...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> I will start with the "exciting futuristic" answer, which is
>>>>>>>>>>> that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>> ExecutorService parameter that you can use as you wish.
>>>>>>>>>>>
>>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>>>     @ProcessElement
>>>>>>>>>>>     public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>>       ... launch some futures, put them in instance vars ...
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     @FinishBundle
>>>>>>>>>>>     public void finish(...) {
>>>>>>>>>>>       ... block on futures, output results if appropriate ...
>>>>>>>>>>>     }
>>>>>>>>>>>   }
>>>>>>>>>>>
>>>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge
>>>>>>>>>>> of what is going on in a computation to, for example, share a
>>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user
>>>>>>>>>>> code to properly manage how many things were going on
>>>>>>>>>>> concurrently. And mostly the runner should own parallelizing to
>>>>>>>>>>> max out cores, and what user code needs is asynchrony hooks that
>>>>>>>>>>> can interact with that. However, this feature is not thoroughly
>>>>>>>>>>> considered. TBD how much the harness itself manages blocking on
>>>>>>>>>>> outstanding requests versus it being your responsibility in
>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>
>>>>>>>>>>> I haven't explored rolling your own here, if you are willing to
>>>>>>>>>>> do the knob tuning to get the threading acceptable for your
>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>> josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>
>>>>>>>>>>>> Our team has a pipeline that makes external network calls. These
>>>>>>>>>>>> pipelines are currently super slow, and the hypothesis is that
>>>>>>>>>>>> they are slow because we are not threading for our network
>>>>>>>>>>>> calls. The GitHub issue below provides some discussion around
>>>>>>>>>>>> this:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>
>>>>>>>>>>>> In Beam 1.0, there was IntraBundleParallelization, which helped
>>>>>>>>>>>> with this. However, this was removed because it didn't comply
>>>>>>>>>>>> with a few Beam paradigms.
>>>>>>>>>>>>
>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>
>>>>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>>>>> seems bundling the elements into groups of size X prior to
>>>>>>>>>>>> passing to the DoFn, and managing the threading within the
>>>>>>>>>>>> function might work. Thoughts?
>>>>>>>>>>>> Are these types of jobs even suitable for Beam?
>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
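
For reference, the "launch futures while processing, block on them when the bundle finishes" pattern discussed in this thread can be tried without any Beam support. What follows is a minimal, Beam-free Java sketch under stated assumptions: AsyncBundleSketch, processElement and finishBundle are hypothetical names that only mimic the @ProcessElement / @FinishBundle lifecycle, the blocking network call is stood in for by a plain Function, and the thread pool is owned by the caller rather than by a runner or SDK harness. It is not how Beam implements, or plans to implement, this feature.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical stand-in for a DoFn; none of these names are Beam APIs. */
final class AsyncBundleSketch<InputT, OutputT> {
  private final ExecutorService executor;
  private final Function<InputT, OutputT> blockingCall;
  // Futures started for the current "bundle" and not yet collected.
  private final List<CompletableFuture<OutputT>> pending = new ArrayList<>();

  AsyncBundleSketch(ExecutorService executor, Function<InputT, OutputT> blockingCall) {
    this.executor = executor;
    this.blockingCall = blockingCall;
  }

  /** Analogue of @ProcessElement: start the call on the pool and return at once. */
  void processElement(InputT element) {
    pending.add(CompletableFuture.supplyAsync(() -> blockingCall.apply(element), executor));
  }

  /** Analogue of @FinishBundle: wait for every started call, then hand back the results. */
  List<OutputT> finishBundle() {
    List<OutputT> results = new ArrayList<>();
    for (CompletableFuture<OutputT> future : pending) {
      results.add(future.join()); // blocks until this call has completed
    }
    pending.clear();
    return results;
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      // String::length stands in for a slow remote lookup.
      AsyncBundleSketch<String, Integer> fn = new AsyncBundleSketch<>(pool, String::length);
      for (String element : Arrays.asList("a", "bb", "ccc")) {
        fn.processElement(element);
      }
      System.out.println(fn.finishBundle());
    } finally {
      pool.shutdown();
    }
  }
}
```

Results come back in submission order because finishBundle joins the futures in the order they were added. The pool size is the knob to tune in such a hand-rolled version; the concern raised above still applies, namely that a pool the runner does not know about cannot be balanced against the runner's own parallelism.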