Nice! I agree that providing a CompletionStage for chaining is much better
than an ExecutorService, and very clear.

It is very feasible to add support that looks like

  new DoFn<InputT, OutputT>() {
    @ProcessElement
    public void process(@Element CompletionStage<InputT> element, ...) {
      element.thenApply(...)
    }
  }

If we had this available, I think users could even experiment with this
often as it might help even where it isn't obvious.

My main hesitation is that big part of Beam is giving a basic/imperative
style of programming a DoFn that executes in a very smart
functional/parallel way. Full future-oriented programming is not explored
much outside of Javascript (and maybe Haskell) and requires greater
discipline in programming in a functional manner - if you are mutating
stuff in your callback you are going to have bugs, and then when you add
concurrency control you are going to have bad performance and deadlocks. So
I definitely wouldn't make it the default or want to spend all our support
effort on teaching advanced programming technique.

Kenn

On Sat, Mar 10, 2018 at 9:31 AM Romain Manni-Bucau <rmannibu...@gmail.com>
wrote:

>
>
> 2018-03-10 17:30 GMT+01:00 Reuven Lax <re...@google.com>:
>
>> Have you considered drafting in detail what you think this API might look
>> like?
>>
>
>
> Yes, but it is after the "enhancements" - for my use cases - and "bugs"
> list so didn't started to work on it much.
>
>
>>
>> If it's a radically different API, it might be more appropriate as an
>> alternative parallel Beam API rather than a replacement for the current API
>> (there is also one such fluent API in the works).
>>
>
> What I plan is to draft it on top of beam (so the "useless" case I spoke
> about before) and then propose to impl it ~natively and move it as main API
> for another major.
>
>
>
>
>>
>>
>> On Sat, Mar 10, 2018 at 7:23 AM Romain Manni-Bucau <rmannibu...@gmail.com>
>> wrote:
>>
>>>
>>>
>>> 2018-03-10 16:19 GMT+01:00 Reuven Lax <re...@google.com>:
>>>
>>>> This is another version (maybe a better, Java 8 idiomatic one?) of what
>>>> Kenn suggested.
>>>>
>>>> Note that with NewDoFn this need not be incompatible (so might not
>>>> require waiting till Beam 3.0). We can recognize new parameters to
>>>> processElement and populate add needed.
>>>>
>>>
>>> This is right however in my head it was a single way movemenent to
>>> enforce the design to be reactive and not fake a reactive API with a sync
>>> and not reactive impl which is what would be done today with both support I
>>> fear.
>>>
>>>
>>>>
>>>> On Sat, Mar 10, 2018, 12:13 PM Romain Manni-Bucau <
>>>> rmannibu...@gmail.com> wrote:
>>>>
>>>>> Yes, for the dofn for instance, instead of having
>>>>> processcontext.element()=<T> you get a CompletionStage<T> and output gets
>>>>> it as well.
>>>>>
>>>>> This way you register an execution chain. Mixed with streams you get a
>>>>> big data java 8/9/10 API which enabkes any connectivity in a wel 
>>>>> performing
>>>>> manner ;).
>>>>>
>>>>> Le 10 mars 2018 13:56, "Reuven Lax" <re...@google.com> a écrit :
>>>>>
>>>>>> So you mean the user should have a way of registering asynchronous
>>>>>> activity with a callback (the callback must be registered with Beam,
>>>>>> because Beam needs to know not to mark the element as done until all
>>>>>> associated callbacks have completed). I think that's basically what Kenn
>>>>>> was suggesting, unless I'm missing something.
>>>>>>
>>>>>>
>>>>>> On Fri, Mar 9, 2018 at 11:07 PM Romain Manni-Bucau <
>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>
>>>>>>> Yes, callback based. Beam today is synchronous and until
>>>>>>> bundles+combines are reactive friendly, beam will be synchronous 
>>>>>>> whatever
>>>>>>> other parts do. Becoming reactive will enable to manage the threading
>>>>>>> issues properly and to have better scalability on the overall execution
>>>>>>> when remote IO are involved.
>>>>>>>
>>>>>>> However it requires to break source, sdf design to use
>>>>>>> completionstage - or equivalent - to chain the processing properly and 
>>>>>>> in
>>>>>>> an unified fashion.
>>>>>>>
>>>>>>> Le 9 mars 2018 23:48, "Reuven Lax" <re...@google.com> a écrit :
>>>>>>>
>>>>>>> If you're talking about reactive programming, at a certain level
>>>>>>> beam is already reactive. Are you referring to a specific way of writing
>>>>>>> the code?
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 9, 2018 at 1:59 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> What do you mean by reactive?
>>>>>>>>
>>>>>>>> On Fri, Mar 9, 2018, 6:58 PM Romain Manni-Bucau <
>>>>>>>> rmannibu...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> @Kenn: why not preferring to make beam reactive? Would alow to
>>>>>>>>> scale way more without having to hardly synchronize multithreading. 
>>>>>>>>> Elegant
>>>>>>>>> and efficient :). Beam 3?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Le 9 mars 2018 22:49, "Kenneth Knowles" <k...@google.com> a écrit :
>>>>>>>>>
>>>>>>>>>> I will start with the "exciting futuristic" answer, which is that
>>>>>>>>>> we envision the new DoFn to be able to provide an automatic 
>>>>>>>>>> ExecutorService
>>>>>>>>>> parameters that you can use as you wish.
>>>>>>>>>>
>>>>>>>>>>     new DoFn<>() {
>>>>>>>>>>       @ProcessElement
>>>>>>>>>>       public void process(ProcessContext ctx, ExecutorService
>>>>>>>>>> executorService) {
>>>>>>>>>>           ... launch some futures, put them in instance vars ...
>>>>>>>>>>       }
>>>>>>>>>>
>>>>>>>>>>       @FinishBundle
>>>>>>>>>>       public void finish(...) {
>>>>>>>>>>          ... block on futures, output results if appropriate ...
>>>>>>>>>>       }
>>>>>>>>>>     }
>>>>>>>>>>
>>>>>>>>>> This way, the Java SDK harness can use its overarching knowledge
>>>>>>>>>> of what is going on in a computation to, for example, share a thread 
>>>>>>>>>> pool
>>>>>>>>>> between different bits. This was one reason to delete
>>>>>>>>>> IntraBundleParallelization - it didn't allow the runner and user 
>>>>>>>>>> code to
>>>>>>>>>> properly manage how many things were going on concurrently. And 
>>>>>>>>>> mostly the
>>>>>>>>>> runner should own parallelizing to max out cores and what user code 
>>>>>>>>>> needs
>>>>>>>>>> is asynchrony hooks that can interact with that. However, this 
>>>>>>>>>> feature is
>>>>>>>>>> not thoroughly considered. TBD how much the harness itself manages 
>>>>>>>>>> blocking
>>>>>>>>>> on outstanding requests versus it being your responsibility in
>>>>>>>>>> FinishBundle, etc.
>>>>>>>>>>
>>>>>>>>>> I haven't explored rolling your own here, if you are willing to
>>>>>>>>>> do the knob tuning to get the threading acceptable for your 
>>>>>>>>>> particular use
>>>>>>>>>> case. Perhaps someone else can weigh in.
>>>>>>>>>>
>>>>>>>>>> Kenn
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 9, 2018 at 1:38 PM Josh Ferge <
>>>>>>>>>> josh.fe...@bounceexchange.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello all:
>>>>>>>>>>>
>>>>>>>>>>> Our team has a pipeline that make external network calls. These
>>>>>>>>>>> pipelines are currently super slow, and the hypothesis is that they 
>>>>>>>>>>> are
>>>>>>>>>>> slow because we are not threading for our network calls. The github 
>>>>>>>>>>> issue
>>>>>>>>>>> below provides some discussion around this:
>>>>>>>>>>>
>>>>>>>>>>> https://github.com/apache/beam/pull/957
>>>>>>>>>>>
>>>>>>>>>>> In beam 1.0, there was IntraBundleParallelization, which helped
>>>>>>>>>>> with this. However, this was removed because it didn't comply with 
>>>>>>>>>>> a few
>>>>>>>>>>> BEAM paradigms.
>>>>>>>>>>>
>>>>>>>>>>> Questions going forward:
>>>>>>>>>>>
>>>>>>>>>>> What is advised for jobs that make blocking network calls? It
>>>>>>>>>>> seems bundling the elements into groups of size X prior to passing 
>>>>>>>>>>> to the
>>>>>>>>>>> DoFn, and managing the threading within the function might work. 
>>>>>>>>>>> thoughts?
>>>>>>>>>>> Are these types of jobs even suitable for beam?
>>>>>>>>>>> Are there any plans to develop features that help with this?
>>>>>>>>>>>
>>>>>>>>>>> Thanks
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>
>>>
>

Reply via email to