Kenn, can NewDoFn distinguish at generation time the difference between:

    public void process(@Element CompletionStage<InputT> element, ...) {

and

    public void process(@Element InputT element, ...) {

If not, then we would probably need separate annotations....
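
A minimal sketch of how the two signatures could be told apart with plain Java reflection. This is not Beam's actual signature analysis; `ElementParamInspector`, `AsyncFn`, and `SyncFn` are hypothetical names used only for illustration, and the `@Element` annotation is left out so the example needs nothing beyond the JDK. The declared parameter type is still visible after erasure, so the raw type alone separates the two cases:

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.concurrent.CompletionStage;

// Hypothetical helper (not part of Beam): inspects the first parameter of a
// method named "process" and reports whether it is a CompletionStage.
final class ElementParamInspector {

  static boolean takesCompletionStage(Class<?> fnClass) {
    for (Method m : fnClass.getDeclaredMethods()) {
      if (!m.getName().equals("process") || m.getParameterCount() == 0) {
        continue;
      }
      Type first = m.getGenericParameterTypes()[0];
      // CompletionStage<T> is reported as a ParameterizedType; a raw
      // CompletionStage or a plain element type is reported as a Class.
      Type raw = first instanceof ParameterizedType
          ? ((ParameterizedType) first).getRawType()
          : first;
      return raw == CompletionStage.class;
    }
    throw new IllegalArgumentException("no process(...) method on " + fnClass);
  }
}

// Stand-in classes for the two signatures above (assumptions for the demo).
class AsyncFn {
  public void process(CompletionStage<String> element) {}
}

class SyncFn {
  public void process(String element) {}
}
```

Under these assumptions, `ElementParamInspector.takesCompletionStage(AsyncFn.class)` is true and the same call on `SyncFn.class` is false, which suggests a single annotation could cover both forms.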





On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com> wrote:

> Nice! I agree that providing a CompletionStage for chaining is much better
> than an ExecutorService, and very clear.
>
> It is very feasible to add support that looks like
>
>   new DoFn<InputT, OutputT>() {
>     @ProcessElement
>     public void process(@Element CompletionStage<InputT> element, ...) {
>       element.thenApply(...)
>     }
>   }
>
> If we had this available, I think users could experiment with it freely,
> since it might help even where the benefit isn't obvious.
>
> My main hesitation is that a big part of Beam is giving a basic/imperative
> style of programming a DoFn that executes in a very smart
> functional/parallel way. Full future-oriented programming is not explored
> much outside of JavaScript (and maybe Haskell) and requires greater
> discipline in programming in a functional manner: if you are mutating
> stuff in your callback you are going to have bugs, and then when you add
> concurrency control you are going to have bad performance and deadlocks. So
> I definitely wouldn't make it the default or want to spend all our support
> effort on teaching advanced programming techniques.
>
> Kenn
>
> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rmannibu...@gmail.com>
> wrote:
>
>>
>>
>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>
>>> Have you considered drafting in detail what you think this API might
>>> look like?
>>>
>>
>>
>> Yes, but it comes after the "enhancements" (for my use cases) and "bugs"
>> lists, so I haven't started working on it much.
>>
>>
>>>
>>> If it's a radically different API, it might be more appropriate as an
>>> alternative parallel Beam API rather than a replacement for the current API
>>> (there is also one such fluent API in the works).
>>>
>>
>> What I plan is to draft it on top of Beam (the "useless" case I spoke
>> about before), then propose implementing it ~natively and making it the
>> main API in a later major version.
>>
>>
>>
>>
>>>
>>>
>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>> rmannibu...@gmail.com> wrote:
>>>
>>>>
>>>>
>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>
>>>>> This is another version (maybe a better, Java 8 idiomatic one?) of
>>>>> what Kenn suggested.
>>>>>
>>>>> Note that with NewDoFn this need not be incompatible (so it might not
>>>>> require waiting until Beam 3.0). We can recognize new parameters to
>>>>> processElement and populate them as needed.
>>>>>
>>>>
>>>> That is right. However, in my head it was a one-way move, to force the
>>>> design to be reactive rather than fake a reactive API over a synchronous,
>>>> non-reactive implementation, which I fear is what would happen today if
>>>> both were supported.
>>>>
>>>>
>>>>>
>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>> rmannibu...@gmail.com> wrote:
>>>>>
>>>>>> Yes. For the DoFn, for instance, instead of having
>>>>>> ProcessContext.element() return a T, you get a CompletionStage<T>, and
>>>>>> output gets one as well.
>>>>>>
>>>>>> This way you register an execution chain. Mixed with streams, you get
>>>>>> a big data Java 8/9/10 API which enables any connectivity in a
>>>>>> well-performing manner ;).
>>>>>>
>>>>>> On 10 March 2018 13:56, "Reuven Lax" <re...@google.com> wrote:
>>>>>>
>>>>>>> So you mean the user should have a way of registering asynchronous
>>>>>>> activity with a callback (the callback must be registered with Beam,
>>>>>>> because Beam needs to know not to mark the element as done until all
>>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>>> was suggesting, unless I'm missing something.
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Yes, callback based. Beam today is synchronous, and until
>>>>>>>> bundles and combines are reactive friendly, Beam will stay synchronous
>>>>>>>> whatever the other parts do. Becoming reactive would make it possible
>>>>>>>> to manage the threading issues properly and to scale the overall
>>>>>>>> execution better when remote I/O is involved.
>>>>>>>>
>>>>>>>> However, it requires breaking the source and SDF design to use
>>>>>>>> CompletionStage (or an equivalent) to chain the processing properly
>>>>>>>> and in a unified fashion.
>>>>>>>>
>>>>>>>> On 9 March 2018 23:48, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>
>>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>>> Beam is already reactive. Are you referring to a specific way of
>>>>>>>> writing the code?
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> What do you mean by reactive?
>>>>>>>>>
>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> @Kenn: why not prefer to make Beam reactive? It would allow scaling
>>>>>>>>>> much further without heavy multithreading synchronization. Elegant
>>>>>>>>>> and efficient :). Beam 3?
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 9 March 2018 22:49, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I will start with the "exciting futuristic" answer, which is
>>>>>>>>>>> that we envision the new DoFn being able to provide an automatic
>>>>>>>>>>> ExecutorService parameter that you can use as you wish.
>>>>>>>>>>>
>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>>>>>>       }
>>>>>>>>>>>
>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>>>>>>       }
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge
>>>>>>>>>>> of what is going on in a computation to, for example, share a thread
>>>>>>>>>>> pool between different bits. This was one reason to delete
>>>>>>>>>>> IntraBundleParallelization: it didn't allow the runner and user code
>>>>>>>>>>> to properly manage how many things were going on concurrently. Mostly,
>>>>>>>>>>> the runner should own parallelizing to max out cores, and what user
>>>>>>>>>>> code needs is asynchrony hooks that can interact with that. However,
>>>>>>>>>>> this feature is not thoroughly considered. It is TBD how much the
>>>>>>>>>>> harness itself manages blocking on outstanding requests versus it
>>>>>>>>>>> being your responsibility in FinishBundle, etc.
>>>>>>>>>>>
>>>>>>>>>>> I haven't explored rolling your own here, if you are willing to
>>>>>>>>>>> do the knob tuning to get the threading acceptable for your
>>>>>>>>>>> particular use case. Perhaps someone else can weigh in.
>>>>>>>>>>>
>>>>>>>>>>> Kenn
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>> josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>
>>>>>>>>>>>> Our team has pipelines that make external network calls. These
>>>>>>>>>>>> pipelines are currently super slow, and the hypothesis is that they
>>>>>>>>>>>> are slow because we are not threading our network calls. The
>>>>>>>>>>>> GitHub issue below provides some discussion around this:
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>
>>>>>>>>>>>> In Beam 1.0, there was IntraBundleParallelization, which helped
>>>>>>>>>>>> with this. However, this was removed because it didn't comply with
>>>>>>>>>>>> a few Beam paradigms.
>>>>>>>>>>>>
>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>
>>>>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>>>>> seems bundling the elements into groups of size X prior to passing
>>>>>>>>>>>> them to the DoFn, and managing the threading within the function,
>>>>>>>>>>>> might work. Thoughts?
>>>>>>>>>>>> Are these types of jobs even suitable for Beam?
>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>
>>>>
>>
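
The "launch futures in process, block on them in finish" shape quoted above can be tried out in plain Java without any Beam types. The sketch below is only an illustration under stated assumptions: `BundleAsyncCaller` is a hypothetical class, `process` and `finish` stand in for the `@ProcessElement` and `@FinishBundle` methods, and the executor is supplied by the caller rather than injected by the SDK harness as the thread envisions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;

// Hypothetical stand-in for a DoFn that makes blocking calls; not a Beam class.
final class BundleAsyncCaller<I, O> {
  private final ExecutorService executor;
  private final Function<I, O> blockingCall;
  // Futures launched for the current bundle, in arrival order.
  private final List<CompletableFuture<O>> pending = new ArrayList<>();

  BundleAsyncCaller(ExecutorService executor, Function<I, O> blockingCall) {
    this.executor = executor;
    this.blockingCall = blockingCall;
  }

  // Counterpart of @ProcessElement: start the call and return immediately.
  void process(I element) {
    pending.add(
        CompletableFuture.supplyAsync(() -> blockingCall.apply(element), executor));
  }

  // Counterpart of @FinishBundle: wait for every outstanding call and
  // hand back the results in the order the elements were processed.
  List<O> finish() {
    List<O> results = new ArrayList<>();
    for (CompletableFuture<O> future : pending) {
      results.add(future.join()); // rethrows a failed call as CompletionException
    }
    pending.clear();
    return results;
  }
}
```

With a fixed thread pool of size N, at most N calls are in flight at once, which is the knob the original question about "groups of size X" is after. Emitting the results from a real `@FinishBundle` method would also have to carry timestamps and windows, which this sketch ignores.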
