That's a good point that this "IO" time should be tracked differently.

For a single level, a wrapper/utility that correctly and completely
(and transparently) implements the "naive" bit I sketched above under
the hood may be sufficient and implementable purely in user-space, and
quite useful.

On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner <sc...@apache.org> wrote:
>
> Makes sense to me. We should make it easier to write DoFn's in this pattern 
> that has emerged as common among I/O connectors.
>
> Enabling asynchronous task chaining across a fusion tree is more complicated 
> but not necessary for this scenario.
>
> On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <sniem...@apache.org> wrote:
>>
>> It's also important to note that in many (most?) IO frameworks (gRPC, 
>> finagle, etc), asynchronous IO is typically completely non-blocking, so 
>> there generally won't be a large number of threads waiting for IO to 
>> complete.  (netty uses a small pool of threads for the Event Loop Group for 
>> example).
>>
>> But in general I agree with Reuven, runners should not count threads in use 
>> in other thread pools for IO for the purpose of autoscaling (or most kinds 
>> of accounting).
>>
>> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <re...@google.com> wrote:
>>>
>>> As Steve said, the main rationale for this is so that asynchronous IOs (or 
>>> in general, asynchronous remote calls) call be made. To some degree this 
>>> addresses Scott's concern: the asynchronous threads should be, for the most 
>>> part, simply waiting for IOs to complete; the reason to do the waiting 
>>> asynchronously is so that the main threadpool does not become blocked, 
>>> causing the pipeline to become IO bound. A runner like Dataflow should not 
>>> be tracking these threads for the purpose of autoscaling, as adding more 
>>> workers will (usually) not cause these calls to complete any faster.
>>>
>>> Reuven
>>>
>>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>>
>>>> I think I agree with a lot of what you said here, I'm just going to 
>>>> restate my initial use-case to try to make it more clear as well.
>>>>
>>>> From my usage of beam, I feel like the big benefit of async DoFns would be 
>>>> to allow batched IO to be implemented more simply inside a DoFn.  Even in 
>>>> the Beam SDK itself, there are a lot of IOs that batch up IO operations in 
>>>> ProcessElement and wait for them to complete in FinishBundle ([1][2], 
>>>> etc).  From my experience, things like error handling, emitting outputs as 
>>>> the result of an asynchronous operation completing (in the correct window, 
>>>> with the correct timestamp, etc) get pretty tricky, and it would be great 
>>>> for the SDK to provide support natively for it.
>>>>
>>>> It's also probably good to point out that really only DoFns that do IO 
>>>> should be asynchronous, normal CPU bound DoFns have no reason to be 
>>>> asynchronous.
>>>>
>>>> A really good example of this is an IO I had written recently for 
>>>> Bigtable, it takes an input PCollection of ByteStrings representing row 
>>>> keys, and returns a PCollection of the row data from bigtable.  Naively 
>>>> this could be implemented by simply blocking on the Bigtable read inside 
>>>> the ParDo, however this would limit throughput substantially (even 
>>>> assuming an avg read latency is 1ms, thats still only 1000 QPS / instance 
>>>> of the ParDo).  My implementation batches many reads together (as they 
>>>> arrive at the DoFn), executes them once the batch is big enough (or some 
>>>> time passes), and then emits them once the batch read completes.  Emitting 
>>>> them in the correct window and handling errors gets tricky, so this is 
>>>> certainly something I'd love the framework itself to handle.
>>>>
>>>> I also don't see a big benefit of making a DoFn receive a future, if all a 
>>>> user is ever supposed to do is attach a continuation to it, that could 
>>>> just as easily be done by the runner itself, basically just invoking the 
>>>> entire ParDo as a continuation on the future (which then assumes the 
>>>> runner is even representing these tasks as futures internally).
>>>>
>>>> Making the DoFn itself actually return a future could be an option, even 
>>>> if the language itself doesn't support something like `await`, you could 
>>>> still implement it yourself in the DoFn, however, it seems like it'd be a 
>>>> strange contrast to the non-async version, which returns void.
>>>>
>>>> [1] 
>>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>>>> [2] 
>>>> https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>>>>
>>>>
>>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <rober...@google.com> 
>>>> wrote:
>>>>>
>>>>> If I understand correctly, the end goal is to process input elements
>>>>> of a DoFn asynchronously. Were I to do this naively, I would implement
>>>>> DoFns that simply take and receive [Serializable?]CompletionStages as
>>>>> element types, followed by a DoFn that adds a callback to emit on
>>>>> completion (possibly via a queue to avoid being-on-the-wrong-thread
>>>>> issues) and whose finalize forces all completions. This would, of
>>>>> course, interact poorly with processing time tracking, fusion breaks,
>>>>> watermark tracking, counter attribution, window propagation, etc. so
>>>>> it is desirable to make it part of the system itself.
>>>>>
>>>>> Taking a OutputReceiver<CompletionStage<OutputT>> seems like a decent
>>>>> API. The invoking of the downstream process could be chained onto
>>>>> this, with all the implicit tracking and tracing set up correctly.
>>>>> Taking a CompletionStage as input means a DoFn would not have to
>>>>> create its output CompletionStage ex nihilo and possibly allow for
>>>>> better chaining (depending on the asynchronous APIs used).
>>>>>
>>>>> Even better might be to simply let the invocation of all
>>>>> DoFn.process() methods be asynchronous, but as Java doesn't offer an
>>>>> await primitive to relinquish control in the middle of a function body
>>>>> this might be hard.
>>>>>
>>>>> I think for correctness, completion would have to be forced at the end
>>>>> of each bundle. If your bundles are large enough, this may not be that
>>>>> big of a deal. In this case you could also start executing subsequent
>>>>> bundles while waiting for prior ones to complete.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian
>>>>> <codin.mart...@gmail.com> wrote:
>>>>> >>
>>>>> >> I'd love to see something like this as well.  Also +1 to 
>>>>> >> process(@Element InputT element, @Output 
>>>>> >> OutputReceiver<CompletionStage<OutputT>>). I don't know if there's 
>>>>> >> much benefit to passing a future in, since the framework itself could 
>>>>> >> hook up the process function to complete when the future completes.
>>>>> >
>>>>> >
>>>>> > One benefit we get by wrapping the input with CompletionStage is to 
>>>>> > mandate[1] users to chain their processing logic to the input future; 
>>>>> > thereby, ensuring asynchrony for the most part. However, it is still 
>>>>> > possible for users to go out of their way and write blocking code.
>>>>> >
>>>>> > Although, I am not sure how counter intuitive it is for the runners to 
>>>>> > wrap the input element into a future before passing it to the user code.
>>>>> >
>>>>> > Bharath
>>>>> >
>>>>> > [1] CompletionStage interface does not define methods for initially 
>>>>> > creating, forcibly completing normally or exceptionally, probing 
>>>>> > completion status or results, or awaiting completion of a stage. 
>>>>> > Implementations of CompletionStage may provide means of achieving such 
>>>>> > effects, as appropriate
>>>>> >
>>>>> >
>>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <k...@apache.org> 
>>>>> > wrote:
>>>>> >>
>>>>> >> I think your concerns are valid but i want to clarify about "first 
>>>>> >> class async APIs". Does "first class" mean that it is a 
>>>>> >> well-encapsulated abstraction? or does it mean that the user can more 
>>>>> >> or less do whatever they want? These are opposite but both valid 
>>>>> >> meanings for "first class", to me.
>>>>> >>
>>>>> >> I would not want to encourage users to do explicit multi-threaded 
>>>>> >> programming or control parallelism. Part of the point of Beam is to 
>>>>> >> gain big data parallelism without explicit multithreading. I see 
>>>>> >> asynchronous chaining of futures (or their best-approximation in your 
>>>>> >> language of choice) as a highly disciplined way of doing asynchronous 
>>>>> >> dependency-driven computation that is nonetheless conceptually, and 
>>>>> >> readably, straight-line code. Threads are not required nor the only 
>>>>> >> way to execute this code. In fact you might often want to execute 
>>>>> >> without threading for a reference implementation to provide 
>>>>> >> canonically correct results. APIs that leak lower-level details of 
>>>>> >> threads are asking for trouble.
>>>>> >>
>>>>> >> One of our other ideas was to provide a dynamic parameter of type 
>>>>> >> ExecutorService. The SDK harness (pre-portability: the runner) would 
>>>>> >> control and observe parallelism while the user could simply register 
>>>>> >> tasks. Providing a future/promise API is even more disciplined.
>>>>> >>
>>>>> >> Kenn
>>>>> >>
>>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org> wrote:
>>>>> >>>
>>>>> >>> A related question is how to make execution observable such that a 
>>>>> >>> runner can make proper scaling decisions. Runners decide how to 
>>>>> >>> schedule bundles within and across multiple worker instances, and can 
>>>>> >>> use information about execution to make dynamic scaling decisions. 
>>>>> >>> First-class async APIs seem like they would encourage DoFn authors to 
>>>>> >>> implement their own parallelization, rather than deferring to the 
>>>>> >>> runner that should be more capable of providing the right level of 
>>>>> >>> parallelism.
>>>>> >>>
>>>>> >>> In the Dataflow worker harness, we estimate execution time to 
>>>>> >>> PTransform steps by sampling execution time on the execution thread 
>>>>> >>> and attributing it to the currently invoked method. This approach is 
>>>>> >>> fairly simple and possible because we assume that execution happens 
>>>>> >>> within the thread controlled by the runner. Some DoFn's already 
>>>>> >>> implement their own async logic and break this assumption; I would 
>>>>> >>> expect more if we make async built into the DoFn APIs.
>>>>> >>>
>>>>> >>> So: this isn't an argument against async APIs, but rather: does this 
>>>>> >>> break execution observability, and are there other lightweight 
>>>>> >>> mechanisms for attributing execution time of async work?
>>>>> >>>
>>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <k...@google.com> 
>>>>> >>> wrote:
>>>>> >>>>
>>>>> >>>> When executed over the portable APIs, it will be primarily the Java 
>>>>> >>>> SDK harness that makes all of these decisions. If we wanted runners 
>>>>> >>>> to have some insight into it we would have to add it to the Beam 
>>>>> >>>> model protos. I don't have any suggestions there, so I would leave 
>>>>> >>>> it out of this discussion until there's good ideas. We could learn a 
>>>>> >>>> lot by trying it out just in the SDK harness.
>>>>> >>>>
>>>>> >>>> Kenn
>>>>> >>>>
>>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xinyuliu...@gmail.com> 
>>>>> >>>> wrote:
>>>>> >>>>>
>>>>> >>>>> I don't have a strong opinion on the resolution of the futures 
>>>>> >>>>> regarding to @FinishBundle invocation. Leaving it to be unspecified 
>>>>> >>>>> does give runners more room to implement it with their own support.
>>>>> >>>>>
>>>>> >>>>> Optimization is also another great point. Fuse seems pretty complex 
>>>>> >>>>> to me too if we need to find a way to chain the resulting future 
>>>>> >>>>> into the next transform, or leave the async transform as a 
>>>>> >>>>> standalone stage initially?
>>>>> >>>>>
>>>>> >>>>> Btw, I was counting the number of replies before we hit the 
>>>>> >>>>> portability. Seems after 4 replies fuse finally showed up :).
>>>>> >>>>>
>>>>> >>>>> Thanks,
>>>>> >>>>> Xinyu
>>>>> >>>>>
>>>>> >>>>>
>>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <k...@google.com> 
>>>>> >>>>> wrote:
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <re...@google.com wrote:
>>>>> >>>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xinyuliu...@gmail.com> 
>>>>> >>>>>>> wrote:
>>>>> >>>>>>>>
>>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful in your 
>>>>> >>>>>>>> use cases as well. Thanks for sharing the code from Scio! I can 
>>>>> >>>>>>>> see in your implementation that waiting for the future 
>>>>> >>>>>>>> completion is part of the @FinishBundle. We are thinking of 
>>>>> >>>>>>>> taking advantage of the underlying runner async support so the 
>>>>> >>>>>>>> user-level code won't need to implement this logic, e.g. Samza 
>>>>> >>>>>>>> has an AsyncSteamTask api that provides a callback to invoke 
>>>>> >>>>>>>> after future completion[1], and Flink also has AsyncFunction api 
>>>>> >>>>>>>> [2] which provides a ResultFuture similar to the API we 
>>>>> >>>>>>>> discussed.
>>>>> >>>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>> Can this be done correctly? What I mean is that if the process 
>>>>> >>>>>>> dies, can you guarantee that no data is lost? Beam currently 
>>>>> >>>>>>> guarantees this for FinishBundle, but if you use an arbitrary 
>>>>> >>>>>>> async framework this might not be true.
>>>>> >>>>>>
>>>>> >>>>>>
>>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is 
>>>>> >>>>>> committed, *then* finishbundle has run. So it seems just as easy 
>>>>> >>>>>> to say *if* a bundle is committed, *then* every async result has 
>>>>> >>>>>> been resolved.
>>>>> >>>>>>
>>>>> >>>>>> If the process dies the two cases should be naturally analogous.
>>>>> >>>>>>
>>>>> >>>>>> But it raises the question of whether they should be resolved 
>>>>> >>>>>> prior to finishbundle, after, or unspecified. I lean toward 
>>>>> >>>>>> unspecified.
>>>>> >>>>>>
>>>>> >>>>>> That's for a single ParDo. Where this could get complex is 
>>>>> >>>>>> optimizing fused stages for greater asynchrony.
>>>>> >>>>>>
>>>>> >>>>>> Kenn
>>>>> >>>>>>
>>>>> >>>>>>>
>>>>> >>>>>>>>
>>>>> >>>>>>>> A simple use case for this is to execute a Runnable 
>>>>> >>>>>>>> asynchronously in user's own executor. The following code 
>>>>> >>>>>>>> illustrates Kenn's option #2, with a very simple single-thread 
>>>>> >>>>>>>> pool being the executor:
>>>>> >>>>>>>>
>>>>> >>>>>>>> new DoFn<InputT, OutputT>() {
>>>>> >>>>>>>>   @ProcessElement
>>>>> >>>>>>>>   public void process(@Element InputT element, @Output 
>>>>> >>>>>>>> OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>>> >>>>>>>>     CompletableFuture<OutputT> future = 
>>>>> >>>>>>>> CompletableFuture.supplyAsync(
>>>>> >>>>>>>>         () -> someOutput,
>>>>> >>>>>>>>         Executors.newSingleThreadExecutor());
>>>>> >>>>>>>>     outputReceiver.output(future);
>>>>> >>>>>>>>   }
>>>>> >>>>>>>> }
>>>>> >>>>>>>>
>>>>> >>>>>>>> The neat thing about this API is that the user can choose their 
>>>>> >>>>>>>> own async framework and we only expect the output to be a 
>>>>> >>>>>>>> CompletionStage.
>>>>> >>>>>>>>
>>>>> >>>>>>>>
>>>>> >>>>>>>> For the implementation of bundling, can we compose a 
>>>>> >>>>>>>> CompletableFuture from each element in the bundle, e.g. 
>>>>> >>>>>>>> CompletableFuture.allOf(...), and then invoke @FinishBundle when 
>>>>> >>>>>>>> this future is complete? Seems this might work.
>>>>> >>>>>>>>
>>>>> >>>>>>>> Thanks,
>>>>> >>>>>>>> Xinyu
>>>>> >>>>>>>>
>>>>> >>>>>>>>
>>>>> >>>>>>>> [1] 
>>>>> >>>>>>>> https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>> >>>>>>>> [2] 
>>>>> >>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>>> >>>>>>>>
>>>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz 
>>>>> >>>>>>>> <sniem...@apache.org> wrote:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> I'd love to see something like this as well.  Also +1 to 
>>>>> >>>>>>>>> process(@Element InputT element, @Output 
>>>>> >>>>>>>>> OutputReceiver<CompletionStage<OutputT>>).  I don't know if 
>>>>> >>>>>>>>> there's much benefit to passing a future in, since the 
>>>>> >>>>>>>>> framework itself could hook up the process function to complete 
>>>>> >>>>>>>>> when the future completes.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> I feel like I've spent a bunch of time writing very similar 
>>>>> >>>>>>>>> "kick off a future in ProcessElement, join it in FinishBundle" 
>>>>> >>>>>>>>> code, and looking around beam itself a lot of built-in 
>>>>> >>>>>>>>> transforms do it as well.  Scio provides a few AsyncDoFn 
>>>>> >>>>>>>>> implementations [1] but it'd be great to see this as a 
>>>>> >>>>>>>>> first-class concept in beam itself.  Doing error handling, 
>>>>> >>>>>>>>> concurrency, etc correctly can be tricky.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> [1] 
>>>>> >>>>>>>>> https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles 
>>>>> >>>>>>>>> <k...@google.com> wrote:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> If the input is a CompletionStage<InputT> then the output 
>>>>> >>>>>>>>>> should also be a CompletionStage<OutputT>, since all you 
>>>>> >>>>>>>>>> should do is async chaining. We could enforce this by giving 
>>>>> >>>>>>>>>> the DoFn an OutputReceiver(CompletionStage<OutputT>).
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Another possibility that might be even more robust against 
>>>>> >>>>>>>>>> poor future use could be process(@Element InputT element, 
>>>>> >>>>>>>>>> @Output OutputReceiver<CompletionStage<OutputT>>). In this 
>>>>> >>>>>>>>>> way, the process method itself will be async chained, rather 
>>>>> >>>>>>>>>> than counting on the user to do the right thing.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> We should see how these look in real use cases. The way that 
>>>>> >>>>>>>>>> processing is split between @ProcessElement and @FinishBundle 
>>>>> >>>>>>>>>> might complicate things.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Kenn
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu 
>>>>> >>>>>>>>>> <xinyuliu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> Hi, guys,
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner, we got 
>>>>> >>>>>>>>>>> a lot of asks for an asynchronous processing API. There are a 
>>>>> >>>>>>>>>>> few reasons for these asks:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> The users here are experienced in asynchronous programming. 
>>>>> >>>>>>>>>>> With async frameworks such as Netty and ParSeq and libs like 
>>>>> >>>>>>>>>>> async jersey client, they are able to make remote calls 
>>>>> >>>>>>>>>>> efficiently and the libraries help manage the execution 
>>>>> >>>>>>>>>>> threads underneath. Async remote calls are very common in 
>>>>> >>>>>>>>>>> most of our streaming applications today.
>>>>> >>>>>>>>>>> Many jobs are running on a multi-tenancy cluster. Async 
>>>>> >>>>>>>>>>> processing helps for less resource usage and fast computation 
>>>>> >>>>>>>>>>> (less context switch).
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> I asked about the async support in a previous email thread. 
>>>>> >>>>>>>>>>> The following API was mentioned in the reply:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>>   new DoFn<InputT, OutputT>() {
>>>>> >>>>>>>>>>>     @ProcessElement
>>>>> >>>>>>>>>>>     public void process(@Element CompletionStage<InputT> 
>>>>> >>>>>>>>>>> element, ...) {
>>>>> >>>>>>>>>>>       element.thenApply(...)
>>>>> >>>>>>>>>>>     }
>>>>> >>>>>>>>>>>   }
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> We are wondering whether there are any discussions on this 
>>>>> >>>>>>>>>>> API and related docs. It is awesome that you guys already 
>>>>> >>>>>>>>>>> considered having DoFn to process asynchronously. Out of 
>>>>> >>>>>>>>>>> curiosity, this API seems to create a CompletionState out of 
>>>>> >>>>>>>>>>> the input element (probably using framework's executor) and 
>>>>> >>>>>>>>>>> then allow user to chain on it. To us, it seems more 
>>>>> >>>>>>>>>>> convenient if the DoFn output a CompletionStage<OutputT> or 
>>>>> >>>>>>>>>>> pass in a CompletionStage<OutputT> to invoke upon completion.
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> We would like to discuss further on the async API and 
>>>>> >>>>>>>>>>> hopefully we will have a great support in Beam. Really 
>>>>> >>>>>>>>>>> appreciate the feedback!
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> Thanks,
>>>>> >>>>>>>>>>> Xinyu
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> --
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>>
>>>>> >>> Got feedback? tinyurl.com/swegner-feedback
>
>
>
> --
>
>
>
>
> Got feedback? tinyurl.com/swegner-feedback

Reply via email to