Nice! I agree that providing a CompletionStage for chaining is much better than an ExecutorService, and very clear.
It is very feasible to add support that looks like

    new DoFn<InputT, OutputT>() {
      @ProcessElement
      public void process(@Element CompletionStage<InputT> element, ...) {
        element.thenApply(...)
      }
    }

If we had this available, I think users could even experiment with it often,
as it might help even where it isn't obvious.

My main hesitation is that a big part of Beam is giving a basic/imperative
style of programming a DoFn that executes in a very smart functional/parallel
way. Full future-oriented programming is not explored much outside of
JavaScript (and maybe Haskell) and requires greater discipline in programming
in a functional manner - if you are mutating stuff in your callback you are
going to have bugs, and then when you add concurrency control you are going to
have bad performance and deadlocks. So I definitely wouldn't make it the
default or want to spend all our support effort on teaching an advanced
programming technique.

Kenn

On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>
>> Have you considered drafting in detail what you think this API might look
>> like?
>
> Yes, but it comes after the "enhancements" - for my use cases - and "bugs"
> lists, so I haven't started working on it much.
>
>> If it's a radically different API, it might be more appropriate as an
>> alternative parallel Beam API rather than a replacement for the current API
>> (there is also one such fluent API in the works).
>
> What I plan is to draft it on top of Beam (so the "useless" case I spoke
> about before) and then propose to implement it ~natively and make it the
> main API for another major version.
>
>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>
>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> This is another version (maybe a better, Java 8 idiomatic one?) of what
>>>> Kenn suggested.
>>>>
>>>> Note that with NewDoFn this need not be incompatible (so it might not
>>>> require waiting until Beam 3.0). We can recognize new parameters to
>>>> processElement and populate as needed.
>>>
>>> This is right; however, in my head it was a one-way move to enforce that
>>> the design be reactive, rather than faking a reactive API on top of a
>>> synchronous, non-reactive implementation, which I fear is what would be
>>> done today if both were supported.
>>>
>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <rmannibu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Yes, for the DoFn for instance, instead of having
>>>>> processcontext.element()=<T> you get a CompletionStage<T>, and output
>>>>> gets it as well.
>>>>>
>>>>> This way you register an execution chain. Mixed with streams you get a
>>>>> big data Java 8/9/10 API which enables any connectivity in a
>>>>> well-performing manner ;).
>>>>>
>>>>> On 10 March 2018 13:56, "Reuven Lax" <re...@google.com> wrote:
>>>>>
>>>>>> So you mean the user should have a way of registering asynchronous
>>>>>> activity with a callback (the callback must be registered with Beam,
>>>>>> because Beam needs to know not to mark the element as done until all
>>>>>> associated callbacks have completed). I think that's basically what
>>>>>> Kenn was suggesting, unless I'm missing something.
>>>>>>
>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau
>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Yes, callback based. Beam today is synchronous, and until
>>>>>>> bundles+combines are reactive-friendly, Beam will be synchronous
>>>>>>> whatever the other parts do. Becoming reactive will make it possible
>>>>>>> to manage the threading issues properly and to get better scalability
>>>>>>> of the overall execution when remote IO is involved.
>>>>>>>
>>>>>>> However, it requires breaking the source and SDF design to use
>>>>>>> CompletionStage - or an equivalent - to chain the processing properly
>>>>>>> and in a unified fashion.
>>>>>>>
>>>>>>> On 9 March 2018 23:48, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>
>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>> Beam is already reactive. Are you referring to a specific way of
>>>>>>> writing the code?
>>>>>>>
>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> What do you mean by reactive?
>>>>>>>>
>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau
>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> @Kenn: why not prefer to make Beam reactive? It would allow scaling
>>>>>>>>> much further without having to heavily synchronize multithreading.
>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>
>>>>>>>>> On 9 March 2018 22:49, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> I will start with the "exciting futuristic" answer, which is that
>>>>>>>>>> we envision the new DoFn to be able to provide an automatic
>>>>>>>>>> ExecutorService parameter that you can use as you wish.
>>>>>>>>>>
>>>>>>>>>> new DoFn<>() {
>>>>>>>>>>   @ProcessElement
>>>>>>>>>>   public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>     ... launch some futures, put them in instance vars ...
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   @FinishBundle
>>>>>>>>>>   public void finish(...) {
>>>>>>>>>>     ... block on futures, output results if appropriate ...
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge
>>>>>>>>>> of what is going on in a computation to, for example, share a
>>>>>>>>>> thread pool between different bits. This was one reason to delete
>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user
>>>>>>>>>> code to properly manage how many things were going on concurrently.
>>>>>>>>>> And mostly the runner should own parallelizing to max out cores,
>>>>>>>>>> and what user code needs is asynchrony hooks that can interact
>>>>>>>>>> with that. However, this feature is not thoroughly considered. TBD
>>>>>>>>>> how much the harness itself manages blocking on outstanding
>>>>>>>>>> requests versus it being your responsibility in FinishBundle, etc.
>>>>>>>>>>
>>>>>>>>>> I haven't explored rolling your own here, if you are willing to
>>>>>>>>>> do the knob tuning to get the threading acceptable for your
>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge
>>>>>>>>>> <josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all:
>>>>>>>>>>>
>>>>>>>>>>> Our team has pipelines that make external network calls. These
>>>>>>>>>>> pipelines are currently super slow, and the hypothesis is that
>>>>>>>>>>> they are slow because we are not threading our network calls.
>>>>>>>>>>> The GitHub issue below provides some discussion around this:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>
>>>>>>>>>>> In Beam 1.0, there was IntraBundleParallelization, which helped
>>>>>>>>>>> with this. However, this was removed because it didn't comply
>>>>>>>>>>> with a few Beam paradigms.
>>>>>>>>>>>
>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>
>>>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>>>> seems bundling the elements into groups of size X prior to
>>>>>>>>>>> passing to the DoFn, and managing the threading within the
>>>>>>>>>>> function, might work. Thoughts?
>>>>>>>>>>>
>>>>>>>>>>> Are these types of jobs even suitable for Beam?
>>>>>>>>>>>
>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
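
For reference, the pattern discussed in this thread (start asynchronous work
per element, chain a pure transformation with thenApply, and block only once
at the end of the bundle) can be sketched in plain Java with no Beam
dependency. This is only an illustration under stated assumptions, not Beam
API: AsyncBundleSketch, slowLookup and processBundle are hypothetical names,
the sleep stands in for a blocking network call, and the single processBundle
method merely plays the role that @ProcessElement plus @FinishBundle would
play in a DoFn.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, Beam-free illustration of "launch futures per element,
// block at the end of the bundle".
class AsyncBundleSketch {

    // Hypothetical stand-in for a blocking network call.
    static String slowLookup(String element) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return element.toUpperCase();
    }

    // The executor is passed in, mirroring the idea that the harness
    // (not user code) owns the thread pool.
    static List<String> processBundle(List<String> bundle, ExecutorService executor) {
        // "Process element" phase: start the call and chain a callback that
        // returns a new value instead of mutating shared state.
        List<CompletableFuture<String>> pending = new ArrayList<>();
        for (String element : bundle) {
            pending.add(CompletableFuture
                    .supplyAsync(() -> slowLookup(element), executor)
                    .thenApply(result -> element + "=" + result));
        }

        // "Finish bundle" phase: block until every future is done, emitting
        // results in input order.
        List<String> outputs = new ArrayList<>();
        for (CompletableFuture<String> future : pending) {
            outputs.add(future.join());
        }
        return outputs;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println(processBundle(Arrays.asList("a", "b", "c"), executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

Because the thenApply callback only returns a new value, no locking is needed
around the results, which avoids the callback-mutation problems raised above.
The pool size of 4 is an arbitrary choice for the sketch.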