Yea, I think it could. But it is probably more readable to not overload the
term, plus certainly a bit simpler in implementation. So perhaps
@AsyncElement to make it very clear.
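
To make that concrete, a minimal sketch of what a DoFn using such a parameter
might look like (the @AsyncElement annotation is purely hypothetical here, and
calling output from a completion callback would only be safe if the runner were
extended to support exactly this):

    new DoFn<String, String>() {
      @ProcessElement
      public void process(@AsyncElement CompletionStage<String> element, ProcessContext ctx) {
        // Register a chain; the runner would keep the bundle open until the stage completes.
        element.thenApply(String::toUpperCase)
               .thenAccept(ctx::output);
      }
    }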

Kenn

On Sat, Mar 10, 2018 at 1:32 PM Reuven Lax <re...@google.com> wrote:

> Kenn, can NewDoFn distinguish, at generation time, between:
>
>     public void process(@Element CompletionStage<InputT> element, ...) {
>
> and
>
>     public void process(@Element InputT element, ...) {
>
> If not, then we would probably need separate annotations....
>
>
>
>
>
> On Sat, Mar 10, 2018 at 11:09 AM Kenneth Knowles <k...@google.com> wrote:
>
>> Nice! I agree that providing a CompletionStage for chaining is much
>> better than an ExecutorService, and very clear.
>>
>> It is very feasible to add support that looks like
>>
>>   new DoFn<InputT, OutputT>() {
>>     @ProcessElement
>>     public void process(@Element CompletionStage<InputT> element, ...) {
>>       element.thenApply(...)
>>     }
>>   }
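>>
>> Whether the element parameter was declared as a plain InputT or as a
>> CompletionStage<InputT> is visible to reflection-based signature analysis,
>> roughly along these lines (a hypothetical helper for illustration, not
>> actual SDK code):
>>
>>     import java.lang.reflect.Method;
>>     import java.lang.reflect.ParameterizedType;
>>     import java.lang.reflect.Type;
>>     import java.util.concurrent.CompletionStage;
>>
>>     class ElementParameterInspector {
>>       // True if the method's first parameter is declared as CompletionStage<...>.
>>       static boolean takesCompletionStageElement(Method processMethod) {
>>         Type[] params = processMethod.getGenericParameterTypes();
>>         return params.length > 0
>>             && params[0] instanceof ParameterizedType
>>             && ((ParameterizedType) params[0]).getRawType() == CompletionStage.class;
>>       }
>>     }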
>>
>> If we had this available, I think users could experiment with it quite
>> often, as it might help even where the benefit isn't obvious.
>>
>> My main hesitation is that a big part of Beam is letting you write a DoFn in
>> a basic/imperative style while it executes in a very smart functional/parallel
>> way. Full future-oriented programming is not explored much outside of
>> JavaScript (and maybe Haskell) and requires greater discipline to program in a
>> functional manner - if you are mutating stuff in your callbacks you are going
>> to have bugs, and when you then add concurrency control you are going to have
>> bad performance and deadlocks. So I definitely wouldn't make it the default or
>> want to spend all our support effort on teaching advanced programming
>> techniques.
>>
>> Kenn
>>
>> On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>
>>>
>>>
>>> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> Have you considered drafting in detail what you think this API might
>>>> look like?
>>>>
>>>
>>>
>>> Yes, but it comes after the "enhancements" - for my use cases - and "bugs"
>>> list, so I haven't started to work on it much.
>>>
>>>
>>>>
>>>> If it's a radically different API, it might be more appropriate as an
>>>> alternative parallel Beam API rather than a replacement for the current API
>>>> (there is also one such fluent API in the works).
>>>>
>>>
>>> What I plan is to draft it on top of Beam (so the "useless" case I spoke
>>> about before) and then propose to implement it ~natively and promote it to
>>> the main API in another major release.
>>>
>>>
>>>
>>>
>>>>
>>>>
>>>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <
>>>> rmannibu...@gmail.com> wrote:
>>>>
>>>>>
>>>>>
>>>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>>>
>>>>>> This is another version (maybe a better, Java 8 idiomatic one?) of
>>>>>> what Kenn suggested.
>>>>>>
>>>>>> Note that with NewDoFn this need not be incompatible (so might not
>>>>>> require waiting till Beam 3.0). We can recognize new parameters to
>>>>>> processElement and populate them as needed.
>>>>>>
>>>>>
>>>>> This is right; however, in my head it was a one-way move, intended to
>>>>> force the design to be reactive rather than fake a reactive API on top
>>>>> of a synchronous, non-reactive implementation - which is what I fear
>>>>> would happen today if both were supported.
>>>>>
>>>>>
>>>>>>
>>>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Yes. For the DoFn, for instance, instead of ProcessContext.element()
>>>>>>> giving you a <T> you would get a CompletionStage<T>, and the output
>>>>>>> takes one as well.
>>>>>>>
>>>>>>> This way you register an execution chain. Mixed with streams, you get
>>>>>>> a big data Java 8/9/10 API which enables any connectivity in a
>>>>>>> well-performing manner ;).
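>>>>>>>
>>>>>>> A tiny, self-contained illustration of that chaining style in plain
>>>>>>> Java (no Beam API involved; the async lookup just simulates a remote
>>>>>>> call, and the final join stands in for the runner waiting at the end
>>>>>>> of a bundle):
>>>>>>>
>>>>>>>     import java.util.concurrent.CompletableFuture;
>>>>>>>     import java.util.concurrent.CompletionStage;
>>>>>>>     import java.util.function.Consumer;
>>>>>>>
>>>>>>>     public class ChainDemo {
>>>>>>>       // Simulates a non-blocking remote call.
>>>>>>>       static CompletionStage<String> lookupAsync(String key) {
>>>>>>>         return CompletableFuture.supplyAsync(() -> key + "-enriched");
>>>>>>>       }
>>>>>>>
>>>>>>>       // What a "process" step would do: register the chain, never block.
>>>>>>>       static CompletionStage<Void> process(CompletionStage<String> element,
>>>>>>>                                            Consumer<String> output) {
>>>>>>>         return element.thenCompose(ChainDemo::lookupAsync).thenAccept(output);
>>>>>>>       }
>>>>>>>
>>>>>>>       public static void main(String[] args) {
>>>>>>>         process(CompletableFuture.completedFuture("element"), System.out::println)
>>>>>>>             .toCompletableFuture().join();
>>>>>>>       }
>>>>>>>     }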
>>>>>>>
>>>>>>> On Mar 10, 2018 at 13:56, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>
>>>>>>>> So you mean the user should have a way of registering asynchronous
>>>>>>>> activity with a callback (the callback must be registered with Beam,
>>>>>>>> because Beam needs to know not to mark the element as done until all
>>>>>>>> associated callbacks have completed). I think that's basically what
>>>>>>>> Kenn was suggesting, unless I'm missing something.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Yes, callback based. Beam today is synchronous, and until
>>>>>>>>> bundles+combines are reactive friendly, Beam will stay synchronous
>>>>>>>>> whatever the other parts do. Becoming reactive would make it possible
>>>>>>>>> to manage the threading issues properly and to get better scalability
>>>>>>>>> of the overall execution when remote IO is involved.
>>>>>>>>>
>>>>>>>>> However, it requires breaking the source and SDF design to use
>>>>>>>>> CompletionStage - or an equivalent - to chain the processing properly
>>>>>>>>> and in a unified fashion.
>>>>>>>>>
>>>>>>>>> On Mar 9, 2018 at 23:48, "Reuven Lax" <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>>>> Beam is already reactive. Are you referring to a specific way of
>>>>>>>>> writing the code?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> What do you mean by reactive?
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> @Kenn: why not prefer making Beam reactive? It would allow scaling
>>>>>>>>>>> much further without needing heavy multithreading synchronization.
>>>>>>>>>>> Elegant and efficient :). Beam 3?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mar 9, 2018 at 22:49, "Kenneth Knowles" <k...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> I will start with the "exciting futuristic" answer, which is
>>>>>>>>>>>> that we envision the new DoFn to be able to provide an automatic
>>>>>>>>>>>> ExecutorService parameter that you can use as you wish.
>>>>>>>>>>>>
>>>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService executorService) {
>>>>>>>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>>>>>>>       }
>>>>>>>>>>>>
>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>>>>>>>       }
>>>>>>>>>>>>     }
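>>>>>>>>>>>>
>>>>>>>>>>>> To make the shape of that a bit more concrete, here is a rough sketch
>>>>>>>>>>>> filling in those "..." with plain CompletableFuture plumbing. The
>>>>>>>>>>>> ExecutorService parameter is still the hypothetical feature described
>>>>>>>>>>>> above, and callBlockingService() is a stand-in for whatever blocking
>>>>>>>>>>>> network call you need to make:
>>>>>>>>>>>>
>>>>>>>>>>>>     new DoFn<String, String>() {
>>>>>>>>>>>>       // One entry per in-flight element of the current bundle.
>>>>>>>>>>>>       private class Pending {
>>>>>>>>>>>>         final CompletableFuture<String> result;
>>>>>>>>>>>>         final Instant timestamp;
>>>>>>>>>>>>         final BoundedWindow window;
>>>>>>>>>>>>
>>>>>>>>>>>>         Pending(CompletableFuture<String> result, Instant timestamp, BoundedWindow window) {
>>>>>>>>>>>>           this.result = result;
>>>>>>>>>>>>           this.timestamp = timestamp;
>>>>>>>>>>>>           this.window = window;
>>>>>>>>>>>>         }
>>>>>>>>>>>>       }
>>>>>>>>>>>>
>>>>>>>>>>>>       private transient List<Pending> pending;
>>>>>>>>>>>>
>>>>>>>>>>>>       @StartBundle
>>>>>>>>>>>>       public void startBundle() {
>>>>>>>>>>>>         pending = new ArrayList<>();
>>>>>>>>>>>>       }
>>>>>>>>>>>>
>>>>>>>>>>>>       @ProcessElement
>>>>>>>>>>>>       public void process(ProcessContext ctx, BoundedWindow window,
>>>>>>>>>>>>                           ExecutorService executorService) {
>>>>>>>>>>>>         String element = ctx.element();
>>>>>>>>>>>>         // Launch the blocking call on the provided pool; do not wait here.
>>>>>>>>>>>>         pending.add(new Pending(
>>>>>>>>>>>>             CompletableFuture.supplyAsync(() -> callBlockingService(element), executorService),
>>>>>>>>>>>>             ctx.timestamp(),
>>>>>>>>>>>>             window));
>>>>>>>>>>>>       }
>>>>>>>>>>>>
>>>>>>>>>>>>       @FinishBundle
>>>>>>>>>>>>       public void finish(FinishBundleContext ctx) {
>>>>>>>>>>>>         // Block on everything launched for this bundle and emit the results.
>>>>>>>>>>>>         for (Pending p : pending) {
>>>>>>>>>>>>           ctx.output(p.result.join(), p.timestamp, p.window);
>>>>>>>>>>>>         }
>>>>>>>>>>>>         pending = null;
>>>>>>>>>>>>       }
>>>>>>>>>>>>     }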
>>>>>>>>>>>>
>>>>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge of what
>>>>>>>>>>>> is going on in a computation to, for example, share a thread pool between
>>>>>>>>>>>> different bits. This was one reason to delete IntraBundleParallelization -
>>>>>>>>>>>> it didn't allow the runner and user code to properly manage how many things
>>>>>>>>>>>> were going on concurrently. And mostly the runner should own parallelizing
>>>>>>>>>>>> to max out cores and what user code needs is asynchrony hooks that can
>>>>>>>>>>>> interact with that. However, this feature is not thoroughly considered. TBD
>>>>>>>>>>>> how much the harness itself manages blocking on outstanding requests versus
>>>>>>>>>>>> it being your responsibility in FinishBundle, etc.
>>>>>>>>>>>>
>>>>>>>>>>>> I haven't explored rolling your own here, if you are willing to do the
>>>>>>>>>>>> knob tuning to get the threading acceptable for your particular use case.
>>>>>>>>>>>> Perhaps someone else can weigh in.
>>>>>>>>>>>>
>>>>>>>>>>>> Kenn
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>>>> josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hello all:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Our team has a pipeline that makes external network calls.
>>>>>>>>>>>>> These pipelines are currently super slow, and the hypothesis is that they
>>>>>>>>>>>>> are slow because we are not threading for our network calls. The GitHub
>>>>>>>>>>>>> issue below provides some discussion around this:
>>>>>>>>>>>>>
>>>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>>>
>>>>>>>>>>>>> In Beam 1.0, there was IntraBundleParallelization, which helped with
>>>>>>>>>>>>> this. However, it was removed because it didn't comply with a few Beam
>>>>>>>>>>>>> paradigms.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>>>
>>>>>>>>>>>>> What is advised for jobs that make blocking network calls? It seems
>>>>>>>>>>>>> bundling the elements into groups of size X prior to passing them to the
>>>>>>>>>>>>> DoFn, and managing the threading within the function, might work. Thoughts?
>>>>>>>>>>>>> Are these types of jobs even suitable for Beam?
>>>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>
>>>>>
>>>
