It is expected that SDKs will have all their cores fully utilized by
processing bundles in parallel and not by performing intrabundle
parallelization. This allows for DoFns to be chained together via regular
method calls because the overhead to pass a single element through all the
DoFn's should be as minimal as possible

What is the overhead of using a completion stage vs using a regular method
call?

On Sun, Mar 11, 2018 at 10:18 PM, Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

>
>
> Le 12 mars 2018 00:19, "Reuven Lax" <re...@google.com> a écrit :
>
>
>
>
> On Sun, Mar 11, 2018 at 7:40 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>> makes me think beam should maybe do 2 internals changes before moving
>> forward on (s)df API changes:
>>
>> 1. define a beam singleton per JVM (classloader hierarchy actually but
>> you get the idea I think) which can be used to store things locally for
>> reuse - see 2 for an example or metrics pusher work Etienne does could
>> benefit from it too
>>
>
> I think we do need something like this, but it needs to be a bit more than
> a singleton per JVM. For one thing it needs to be at least per pipeline
> within a JVM. You might run multiple tests in a single JVM, and it should
> also be possible to run those tests in parallel without the static state
> interfering with each other. I also think the state needs to be addressable
> per step (i.e. a ParDo can look up its static state without caring about
> static state belonging to another ParDo).
>
>
> Agree but you can register in a singleton the pipeline (as "ref" not as
> instance) and therefore hit the same need. +1 to have scopes (singleton,
> pipeline, thread) but it still requires a single singleton to handle
> serialization ;).
>
>
>
>
> 2. define a SPI to load (s)dofn parameter provider instead of having an
>> ArgProvider which provides everything which is supported. This way you can
>> use any kind of parameter and the parameterproviders can use 1. to handle
>> their own state. First impl of the parameterprovider SPI would be a) state
>> b) timer c) reactive handlers and potentially user parameter providers
>> (service like which can be singleton in the scope of a "JVM" thanks to 1).
>>
>>
>> Romain Manni-Bucau
>> @rmannibucau <https://twitter.com/rmannibucau> |  Blog
>> <https://rmannibucau.metawerx.net/> | Old Blog
>> <http://rmannibucau.wordpress.com> | Github
>> <https://github.com/rmannibucau> | LinkedIn
>> <https://www.linkedin.com/in/rmannibucau> | Book
>> <https://www.packtpub.com/application-development/java-ee-8-high-performance>
>>
>> 2018-03-11 15:32 GMT+01:00 Reuven Lax <re...@google.com>:
>>
>>> Yep. Introduce OutputEmitter, and Process context no longer has much use.
>>>
>>> On Sun, Mar 11, 2018, 11:19 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>>> wrote:
>>>
>>>> Which is still a key feature for sdf but agree it can be dropped for an
>>>> outputemitter pattern and the dofn moved to a plain parameters injection
>>>> based pattern. Both (which completionstage) stays compatible :).
>>>>
>>>> Le 11 mars 2018 13:12, "Reuven Lax" <re...@google.com> a écrit :
>>>>
>>>>> I think process context should go away completely. At that point it
>>>>> has little use except for a way to send output downstream.
>>>>>
>>>>> On Sun, Mar 11, 2018, 6:07 AM Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> Hmm, thinking out loud but completionstage should/could be extended
>>>>>> to replace processcontext since it represents element and output at the
>>>>>> same time no?
>>>>>>
>>>>>> Le 11 mars 2018 00:57, "Kenneth Knowles" <k...@google.com> a écrit :
>>>>>>
>>>>>>> Yea, I think it could. But it is probably more readable to not
>>>>>>> overload the term, plus certainly a bit simpler in implementation. So
>>>>>>> perhaps @AsyncElement to make it very clear.
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Ken, can NewDoFn distinguish at generation time the difference
>>>>>>>> between:
>>>>>>>>
>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>> ...) {
>>>>>>>>
>>>>>>>> and
>>>>>>>>
>>>>>>>>     public void process(@Element Input element, ...) {
>>>>>>>>
>>>>>>>> If not, then we would probably need separate annotations....
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Nice! I agree that providing a CompletionStage for chaining is
>>>>>>>>> much better than an ExecutorService, and very clear.
>>>>>>>>>
>>>>>>>>> It is very feasible to add support that looks like
>>>>>>>>>
>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>>>>>>     @ProcessElement
>>>>>>>>>     public void process(@Element CompletionStage<InputT> element,
>>>>>>>>> ...) {
>>>>>>>>>       element.thenApply(...)
>>>>>>>>>     }
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>> If we had this available, I think users could even experiment with
>>>>>>>>> this often as it might help even where it isn't obvious.
>>>>>>>>>
>>>>>>>>> My main hesitation is that big part of Beam is giving a
>>>>>>>>> basic/imperative style of programming a DoFn that executes in a very 
>>>>>>>>> smart
>>>>>>>>> functional/parallel way. Full future-oriented programming is not
>>>>>>>>> explored much outside of Javascript (and maybe Haskell) and requires
>>>>>>>>> greater discipline in programming in a functional manner - if you are
>>>>>>>>> mutating stuff in your callback you are going to have bugs, and then 
>>>>>>>>> when
>>>>>>>>> you add concurrency control you are going to have bad performance and
>>>>>>>>> deadlocks. So I definitely wouldn't make it the default or want to 
>>>>>>>>> spend
>>>>>>>>> all our support effort on teaching advanced programming technique.
>>>>>>>>>
>>>>>>>>> Kenn
>>>>>>>>>
>>>>>>>>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <
>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>
>>>>>>>>>>> Have you considered drafting in detail what you think this API
>>>>>>>>>>> might look like?
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Yes, but it is after the "enhancements" - for my use cases - and
>>>>>>>>>> "bugs" list so didn't started to work on it much.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> If it's a radically different API, it might be more appropriate
>>>>>>>>>>> as an alternative parallel Beam API rather than a replacement for 
>>>>>>>>>>> the
>>>>>>>>>>> current API (there is also one such fluent API in the works).
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> What I plan is to draft it on top of beam (so the "useless" case
>>>>>>>>>> I spoke about before) and then propose to impl it ~natively and move 
>>>>>>>>>> it as
>>>>>>>>>> main API for another major.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> This is another version (maybe a better, Java 8 idiomatic
>>>>>>>>>>>>> one?) of what Kenn suggested.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Note that with NewDoFn this need not be incompatible (so might
>>>>>>>>>>>>> not require waiting till Beam 3.0). We can recognize new 
>>>>>>>>>>>>> parameters to
>>>>>>>>>>>>> processElement and populate add needed.
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This is right however in my head it was a single way movemenent
>>>>>>>>>>>> to enforce the design to be reactive and not fake a reactive API 
>>>>>>>>>>>> with a
>>>>>>>>>>>> sync and not reactive impl which is what would be done today with 
>>>>>>>>>>>> both
>>>>>>>>>>>> support I fear.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, for the dofn for instance, instead of having
>>>>>>>>>>>>>> processcontext.element()=<T> you get a CompletionStage<T> and 
>>>>>>>>>>>>>> output gets
>>>>>>>>>>>>>> it as well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This way you register an execution chain. Mixed with streams
>>>>>>>>>>>>>> you get a big data java 8/9/10 API which enabkes any 
>>>>>>>>>>>>>> connectivity in a wel
>>>>>>>>>>>>>> performing manner ;).
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So you mean the user should have a way of registering
>>>>>>>>>>>>>>> asynchronous activity with a callback (the callback must be 
>>>>>>>>>>>>>>> registered with
>>>>>>>>>>>>>>> Beam, because Beam needs to know not to mark the element as 
>>>>>>>>>>>>>>> done until all
>>>>>>>>>>>>>>> associated callbacks have completed). I think that's basically 
>>>>>>>>>>>>>>> what Kenn
>>>>>>>>>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>>>>>>>>>>> bundles+combines are reactive friendly, beam will be 
>>>>>>>>>>>>>>>> synchronous whatever
>>>>>>>>>>>>>>>> other parts do. Becoming reactive will enable to manage the 
>>>>>>>>>>>>>>>> threading
>>>>>>>>>>>>>>>> issues properly and to have better scalability on the overall 
>>>>>>>>>>>>>>>> execution
>>>>>>>>>>>>>>>> when remote IO are involved.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> However it requires to break source, sdf design to use
>>>>>>>>>>>>>>>> completionstage - or equivalent - to chain the processing 
>>>>>>>>>>>>>>>> properly and in
>>>>>>>>>>>>>>>> an unified fashion.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a
>>>>>>>>>>>>>>>> écrit :
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> If you're talking about reactive programming, at a certain
>>>>>>>>>>>>>>>> level beam is already reactive. Are you referring to a 
>>>>>>>>>>>>>>>> specific way of
>>>>>>>>>>>>>>>> writing the code?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would
>>>>>>>>>>>>>>>>>> alow to scale way more without having to hardly synchronize 
>>>>>>>>>>>>>>>>>> multithreading.
>>>>>>>>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <k...@google.com>
>>>>>>>>>>>>>>>>>> a écrit :
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I will start with the "exciting futuristic" answer,
>>>>>>>>>>>>>>>>>>> which is that we envision the new DoFn to be able to 
>>>>>>>>>>>>>>>>>>> provide an automatic
>>>>>>>>>>>>>>>>>>> ExecutorService parameters that you can use as you wish.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>>>>>>>>       public void process(ProcessContext ctx,
>>>>>>>>>>>>>>>>>>> ExecutorService executorService) {
>>>>>>>>>>>>>>>>>>>           ... launch some futures, put them in instance
>>>>>>>>>>>>>>>>>>> vars ...
>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>>>>>>>>          ... block on futures, output results if
>>>>>>>>>>>>>>>>>>> appropriate ...
>>>>>>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This way, the Java SDK harness can use its overarching
>>>>>>>>>>>>>>>>>>> knowledge of what is going on in a computation to, for 
>>>>>>>>>>>>>>>>>>> example, share a
>>>>>>>>>>>>>>>>>>> thread pool between different bits. This was one reason to 
>>>>>>>>>>>>>>>>>>> delete
>>>>>>>>>>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and 
>>>>>>>>>>>>>>>>>>> user code to
>>>>>>>>>>>>>>>>>>> properly manage how many things were going on concurrently. 
>>>>>>>>>>>>>>>>>>> And mostly the
>>>>>>>>>>>>>>>>>>> runner should own parallelizing to max out cores and what 
>>>>>>>>>>>>>>>>>>> user code needs
>>>>>>>>>>>>>>>>>>> is asynchrony hooks that can interact with that. However, 
>>>>>>>>>>>>>>>>>>> this feature is
>>>>>>>>>>>>>>>>>>> not thoroughly considered. TBD how much the harness itself 
>>>>>>>>>>>>>>>>>>> manages blocking
>>>>>>>>>>>>>>>>>>> on outstanding requests versus it being your responsibility 
>>>>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I haven't explored rolling your own here, if you are
>>>>>>>>>>>>>>>>>>> willing to do the knob tuning to get the threading 
>>>>>>>>>>>>>>>>>>> acceptable for your
>>>>>>>>>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Kenn
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>>>>>>>>> josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Our team has a pipeline that make external network
>>>>>>>>>>>>>>>>>>>> calls. These pipelines are currently super slow, and the 
>>>>>>>>>>>>>>>>>>>> hypothesis is that
>>>>>>>>>>>>>>>>>>>> they are slow because we are not threading for our network 
>>>>>>>>>>>>>>>>>>>> calls. The
>>>>>>>>>>>>>>>>>>>> github issue below provides some discussion around this:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization,
>>>>>>>>>>>>>>>>>>>> which helped with this. However, this was removed because 
>>>>>>>>>>>>>>>>>>>> it didn't comply
>>>>>>>>>>>>>>>>>>>> with a few BEAM paradigms.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> What is advised for jobs that make blocking network
>>>>>>>>>>>>>>>>>>>> calls? It seems bundling the elements into groups of size 
>>>>>>>>>>>>>>>>>>>> X prior to
>>>>>>>>>>>>>>>>>>>> passing to the DoFn, and managing the threading within the 
>>>>>>>>>>>>>>>>>>>> function might
>>>>>>>>>>>>>>>>>>>> work. thoughts?
>>>>>>>>>>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>>>>>>>>>>> Are there any plans to develop features that help with
>>>>>>>>>>>>>>>>>>>> this?
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>
>

Reply via email to