On 12 Mar 2018 00:19, "Reuven Lax" <re...@google.com> wrote:
On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <rmannibu...@gmail.com> wrote:

> makes me think beam should maybe do 2 internals changes before moving
> forward on (s)df API changes:
>
> 1. define a beam singleton per JVM (classloader hierarchy actually but you
> get the idea I think) which can be used to store things locally for reuse -
> see 2 for an example or metrics pusher work Etienne does could benefit from
> it too
>

I think we do need something like this, but it needs to be a bit more than a
singleton per JVM. For one thing it needs to be at least per pipeline within a
JVM. You might run multiple tests in a single JVM, and it should also be
possible to run those tests in parallel without the static state interfering
with each other. I also think the state needs to be addressable per step (i.e.
a ParDo can look up its static state without caring about static state
belonging to another ParDo).

Agree but you can register in a singleton the pipeline (as "ref" not as
instance) and therefore hit the same need. +1 to have scopes (singleton,
pipeline, thread) but it still requires a single singleton to handle
serialization ;).

> 2. define a SPI to load (s)dofn parameter provider instead of having an
> ArgProvider which provides everything which is supported. This way you can
> use any kind of parameter and the parameterproviders can use 1. to handle
> their own state. First impl of the parameterprovider SPI would be a) state
> b) timer c) reactive handlers and potentially user parameter providers
> (service like which can be singleton in the scope of a "JVM" thanks to 1).
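
[Illustration of the scoping idea discussed above. This is a minimal sketch
in plain Java, not a Beam API: the class name ScopedStateRegistry, the
method names, and the use of plain strings as pipeline "refs" and step
names are all assumptions made for the example. It shows one JVM-wide
singleton whose entries are still addressable per pipeline and per step,
so two pipelines (or two ParDos) in the same JVM do not see each other's
state.]

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

/** Hypothetical JVM-wide registry with per-pipeline, per-step scopes. Not part of Beam. */
final class ScopedStateRegistry {
    // The single JVM-wide instance; everything else hangs off it by scope.
    private static final ScopedStateRegistry INSTANCE = new ScopedStateRegistry();

    // pipeline ref -> (step name -> state object)
    private final Map<String, Map<String, Object>> scopes = new ConcurrentHashMap<>();

    private ScopedStateRegistry() {}

    static ScopedStateRegistry get() {
        return INSTANCE;
    }

    /**
     * Returns the state registered for (pipelineRef, stepName), creating it
     * with the factory on first access. The factory should not call back
     * into the registry for the same scope.
     */
    @SuppressWarnings("unchecked")
    <T> T computeIfAbsent(String pipelineRef, String stepName, Supplier<T> factory) {
        Map<String, Object> steps =
            scopes.computeIfAbsent(pipelineRef, p -> new ConcurrentHashMap<>());
        return (T) steps.computeIfAbsent(stepName, s -> factory.get());
    }

    /** Drops all state belonging to one pipeline, e.g. when a test finishes. */
    void clearPipeline(String pipelineRef) {
        scopes.remove(pipelineRef);
    }
}
```

[With this shape, a step would look up its own entry with something like
ScopedStateRegistry.get().computeIfAbsent(pipelineRef, stepName, factory),
and parallel tests would only need distinct pipeline refs. How such a ref
survives serialization is left open here, as it is in the thread.]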
>
> Romain Manni-Bucau
> @rmannibucau <https://twitter.com/rmannibucau> | Blog
> <https://rmannibucau.metawerx.net/> | Old Blog
> <http://rmannibucau.wordpress.com> | Github
> <https://github.com/rmannibucau> | LinkedIn
> <https://www.linkedin.com/in/rmannibucau> | Book
> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>
> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>
>> Yep. Introduce OutputEmitter, and Process context no longer has much use.
>>
>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>
>>> Which is still a key feature for sdf but agree it can be dropped for an
>>> outputemitter pattern and the dofn moved to a plain parameters injection
>>> based pattern. Both (which completionstage) stays compatible :).
>>>
>>> On 11 Mar 2018 13:12, "Reuven Lax" <re...@google.com> wrote:
>>>
>>>> I think process context should go away completely. At that point it has
>>>> little use except for a way to send output downstream.
>>>>
>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hmm, thinking out loud but completionstage should/could be extended to
>>>>> replace processcontext since it represents element and output at the same
>>>>> time no?
>>>>>
>>>>> On 11 Mar 2018 00:57, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>
>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>>> between:
>>>>>>>
>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>
>>>>>>> and
>>>>>>>
>>>>>>>     public void process(@Element InputT element, ...) {
>>>>>>>
>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>
>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is much
>>>>>>>> better than an ExecutorService, and very clear.
>>>>>>>>
>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>
>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>     @ProcessElement
>>>>>>>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>>>>>>>       element.thenApply(...)
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>>
>>>>>>>> If we had this available, I think users could even experiment with
>>>>>>>> this often as it might help even where it isn't obvious.
>>>>>>>>
>>>>>>>> My main hesitation is that a big part of Beam is giving a
>>>>>>>> basic/imperative style of programming a DoFn that executes in a very
>>>>>>>> smart functional/parallel way. Full future-oriented programming is not
>>>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>> mutating stuff in your callback you are going to have bugs, and then
>>>>>>>> when you add concurrency control you are going to have bad performance
>>>>>>>> and deadlocks. So I definitely wouldn't make it the default or want to
>>>>>>>> spend all our support effort on teaching advanced programming technique.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau
>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>
>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>> might look like?
>>>>>>>>>
>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>>>> "bugs" list so didn't start to work on it much.
>>>>>>>>>
>>>>>>>>>> If it's a radically different API, it might be more appropriate
>>>>>>>>>> as an alternative parallel Beam API rather than a replacement for the
>>>>>>>>>> current API (there is also one such fluent API in the works).
>>>>>>>>>
>>>>>>>>> What I plan is to draft it on top of beam (so the "useless" case I
>>>>>>>>> spoke about before) and then propose to impl it ~natively and move it
>>>>>>>>> as main API for another major.
>>>>>>>>>
>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau
>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>
>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic one?)
>>>>>>>>>>>> of what Kenn suggested.
>>>>>>>>>>>>
>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so might
>>>>>>>>>>>> not require waiting till Beam 3.0). We can recognize new
>>>>>>>>>>>> parameters to processElement and populate as needed.
>>>>>>>>>>>
>>>>>>>>>>> This is right however in my head it was a single way movement
>>>>>>>>>>> to enforce the design to be reactive and not fake a reactive API
>>>>>>>>>>> with a sync and not reactive impl which is what would be done today
>>>>>>>>>>> with both support I fear.
>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau
>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and
>>>>>>>>>>>>> output gets it as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams
>>>>>>>>>>>>> you get a big data java 8/9/10 API which enables any connectivity
>>>>>>>>>>>>> in a well performing manner ;).
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 10 Mar 2018 13:56, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be
>>>>>>>>>>>>>> registered with Beam, because Beam needs to know not to mark the
>>>>>>>>>>>>>> element as done until all associated callbacks have completed).
>>>>>>>>>>>>>> I think that's basically what Kenn was suggesting, unless I'm
>>>>>>>>>>>>>> missing something.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau
>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be
>>>>>>>>>>>>>>> synchronous whatever other parts do. Becoming reactive will
>>>>>>>>>>>>>>> enable to manage the threading issues properly and to have
>>>>>>>>>>>>>>> better scalability on the overall execution when remote IO are
>>>>>>>>>>>>>>> involved.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing
>>>>>>>>>>>>>>> properly and in an unified fashion.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On 9 Mar 2018 23:48, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>>>> level beam is already reactive. Are you referring to a specific
>>>>>>>>>>>>>>> way of writing the code?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau
>>>>>>>>>>>>>>>> <rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would
>>>>>>>>>>>>>>>>> allow to scale way more without having to hardly synchronize
>>>>>>>>>>>>>>>>> multithreading. Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On 9 Mar 2018 22:49, "Kenneth Knowles" <k...@google.com>
>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which
>>>>>>>>>>>>>>>>>> is that we envision the new DoFn to be able to provide an
>>>>>>>>>>>>>>>>>> automatic ExecutorService parameter that you can use as you
>>>>>>>>>>>>>>>>>> wish.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>         ... launch some futures, put them in instance vars ...
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>         ... block on futures, output results if appropriate ...
>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for
>>>>>>>>>>>>>>>>>> example, share a thread pool between different bits. This was
>>>>>>>>>>>>>>>>>> one reason to delete IntraBundleParallelization - it didn't
>>>>>>>>>>>>>>>>>> allow the runner and user code to properly manage how many
>>>>>>>>>>>>>>>>>> things were going on concurrently. And mostly the runner
>>>>>>>>>>>>>>>>>> should own parallelizing to max out cores and what user code
>>>>>>>>>>>>>>>>>> needs is asynchrony hooks that can interact with that.
>>>>>>>>>>>>>>>>>> However, this feature is not thoroughly considered. TBD how
>>>>>>>>>>>>>>>>>> much the harness itself manages blocking on outstanding
>>>>>>>>>>>>>>>>>> requests versus it being your responsibility in FinishBundle,
>>>>>>>>>>>>>>>>>> etc.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading acceptable
>>>>>>>>>>>>>>>>>> for your particular use case. Perhaps someone else can weigh
>>>>>>>>>>>>>>>>>> in.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge
>>>>>>>>>>>>>>>>>> <josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Our team has a pipeline that makes external network
>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the
>>>>>>>>>>>>>>>>>>> hypothesis is that they are slow because we are not
>>>>>>>>>>>>>>>>>>> threading for our network calls. The github issue below
>>>>>>>>>>>>>>>>>>> provides some discussion around this:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which
>>>>>>>>>>>>>>>>>>> helped with this. However, this was removed because it
>>>>>>>>>>>>>>>>>>> didn't comply with a few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size X
>>>>>>>>>>>>>>>>>>> prior to passing to the DoFn, and managing the threading
>>>>>>>>>>>>>>>>>>> within the function might work. Thoughts?
>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks
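
[Illustration of the "launch futures per element, block when the bundle
finishes" pattern that the thread discusses. This is a rough sketch in
plain Java with no Beam dependency: AsyncBundleSketch, process and finish
are hypothetical names chosen to mirror the @ProcessElement and
@FinishBundle hooks, and the executor is supplied by the caller because
the thread leaves open who should own the thread pool. It assumes a single
thread drives process() and finish() for a given bundle.]

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical helper: runs a blocking call per element off-thread, then joins at bundle end. */
final class AsyncBundleSketch<I, O> {
    private final Function<I, O> blockingCall;   // e.g. a network request
    private final ExecutorService executor;      // pool owned by the caller
    private final List<CompletableFuture<O>> pending = new ArrayList<>();

    AsyncBundleSketch(Function<I, O> blockingCall, ExecutorService executor) {
        this.blockingCall = blockingCall;
        this.executor = executor;
    }

    /** Would be called from the per-element hook: starts the call, does not wait for it. */
    void process(I element) {
        pending.add(CompletableFuture.supplyAsync(() -> blockingCall.apply(element), executor));
    }

    /**
     * Would be called from the end-of-bundle hook: waits for every
     * outstanding call and returns the results in submission order.
     * join() rethrows a failed call as a CompletionException.
     */
    List<O> finish() {
        List<O> results = new ArrayList<>(pending.size());
        for (CompletableFuture<O> future : pending) {
            results.add(future.join());
        }
        pending.clear();
        return results;
    }
}
```

[The point of the sketch is only that no element is treated as done until
its future has completed, which is the constraint raised in the thread.
Sizing the pool and bounding the number of in-flight calls are the "knob
tuning" mentioned above and are not addressed here.]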