That's a good point that this "IO" time should be tracked differently.
For a single level, a wrapper/utility that correctly and completely (and transparently) implements the "naive" bit I sketched above under the hood may be sufficient, implementable purely in user space, and quite useful.

On Thu, Jan 24, 2019 at 7:38 PM Scott Wegner <sc...@apache.org> wrote:
>
> Makes sense to me. We should make it easier to write DoFns in this pattern that has emerged as common among I/O connectors.
>
> Enabling asynchronous task chaining across a fusion tree is more complicated, but not necessary for this scenario.
>
> On Thu, Jan 24, 2019 at 10:13 AM Steve Niemitz <sniem...@apache.org> wrote:
>>
>> It's also important to note that in many (most?) IO frameworks (gRPC, Finagle, etc.), asynchronous IO is typically completely non-blocking, so there generally won't be a large number of threads waiting for IO to complete. (Netty uses a small pool of threads for its event loop group, for example.)
>>
>> But in general I agree with Reuven: runners should not count threads in use in other thread pools for IO for the purpose of autoscaling (or most kinds of accounting).
>>
>> On Thu, Jan 24, 2019 at 12:54 PM Reuven Lax <re...@google.com> wrote:
>>>
>>> As Steve said, the main rationale for this is so that asynchronous IOs (or, in general, asynchronous remote calls) can be made. To some degree this addresses Scott's concern: the asynchronous threads should be, for the most part, simply waiting for IOs to complete; the reason to do the waiting asynchronously is so that the main threadpool does not become blocked, causing the pipeline to become IO bound. A runner like Dataflow should not be tracking these threads for the purpose of autoscaling, as adding more workers will (usually) not cause these calls to complete any faster.
>>>
>>> Reuven
>>>
>>> On Thu, Jan 24, 2019 at 7:28 AM Steve Niemitz <sniem...@apache.org> wrote:
>>>>
>>>> I think I agree with a lot of what you said here; I'm just going to restate my initial use case to try to make it clearer as well.
>>>>
>>>> From my usage of Beam, I feel like the big benefit of async DoFns would be to allow batched IO to be implemented more simply inside a DoFn. Even in the Beam SDK itself, there are a lot of IOs that batch up IO operations in ProcessElement and wait for them to complete in FinishBundle ([1][2], etc.). From my experience, things like error handling and emitting outputs as the result of an asynchronous operation completing (in the correct window, with the correct timestamp, etc.) get pretty tricky, and it would be great for the SDK to provide support natively for it.
>>>>
>>>> It's also probably good to point out that really only DoFns that do IO should be asynchronous; normal CPU-bound DoFns have no reason to be asynchronous.
>>>>
>>>> A really good example of this is an IO I had written recently for Bigtable. It takes an input PCollection of ByteStrings representing row keys and returns a PCollection of the row data from Bigtable. Naively this could be implemented by simply blocking on the Bigtable read inside the ParDo, but this would limit throughput substantially (even assuming an average read latency of 1 ms, that's still only 1,000 QPS per instance of the ParDo). My implementation batches many reads together (as they arrive at the DoFn), executes them once the batch is big enough (or some time passes), and then emits them once the batch read completes. Emitting them in the correct window and handling errors gets tricky, so this is certainly something I'd love the framework itself to handle.
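The core of the batching pattern described above — buffer keys in ProcessElement, kick off an async batch read when the buffer fills, force completion in FinishBundle — can be sketched framework-free with java.util.concurrent. The class and method names below are hypothetical, not Beam or Bigtable API; a real DoFn would also need to re-associate windows/timestamps and surface per-element errors, which is exactly the tricky part being discussed:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical, framework-free sketch of the "batch in ProcessElement,
// join in FinishBundle" pattern.
class AsyncBatcher<K, V> {
    private final int maxBatchSize;
    private final Function<List<K>, CompletableFuture<Map<K, V>>> batchRead;
    private final Consumer<Map.Entry<K, V>> emit; // stand-in for OutputReceiver
    private final List<K> pendingKeys = new ArrayList<>();
    private final List<CompletableFuture<Map<K, V>>> inFlight = new ArrayList<>();

    AsyncBatcher(int maxBatchSize,
                 Function<List<K>, CompletableFuture<Map<K, V>>> batchRead,
                 Consumer<Map.Entry<K, V>> emit) {
        this.maxBatchSize = maxBatchSize;
        this.batchRead = batchRead;
        this.emit = emit;
    }

    // @ProcessElement analogue: buffer the key; start an async batch read
    // once the buffer is full (a real version would also flush on a timer).
    void add(K key) {
        pendingKeys.add(key);
        if (pendingKeys.size() >= maxBatchSize) {
            flush();
        }
    }

    private void flush() {
        if (pendingKeys.isEmpty()) {
            return;
        }
        inFlight.add(batchRead.apply(new ArrayList<>(pendingKeys)));
        pendingKeys.clear();
    }

    // @FinishBundle analogue: join every outstanding read and emit results
    // on the calling thread, so the receiver never runs on an IO callback
    // thread (the "wrong thread" hazard mentioned later in the thread).
    void flushAndJoin() {
        flush();
        for (CompletableFuture<Map<K, V>> read : inFlight) {
            read.join().entrySet().forEach(emit);
        }
        inFlight.clear();
    }
}
```

Deferring emission to flushAndJoin keeps output single-threaded at the cost of buffering results for the bundle; emitting eagerly as each batch completes would need the queue-based handoff discussed below.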
>>>>
>>>> I also don't see a big benefit to making a DoFn receive a future: if all a user is ever supposed to do is attach a continuation to it, that could just as easily be done by the runner itself, basically just invoking the entire ParDo as a continuation on the future (which then assumes the runner is even representing these tasks as futures internally).
>>>>
>>>> Making the DoFn itself actually return a future could be an option. Even if the language itself doesn't support something like `await`, you could still implement it yourself in the DoFn; however, it seems like it'd be a strange contrast to the non-async version, which returns void.
>>>>
>>>> [1] https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java#L720
>>>> [2] https://github.com/apache/beam/blob/428dbaf2465292e9e53affd094e4b38cc1d754e5/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L1080
>>>>
>>>> On Thu, Jan 24, 2019 at 8:43 AM Robert Bradshaw <rober...@google.com> wrote:
>>>>>
>>>>> If I understand correctly, the end goal is to process input elements of a DoFn asynchronously. Were I to do this naively, I would implement DoFns that simply take and receive [Serializable?]CompletionStages as element types, followed by a DoFn that adds a callback to emit on completion (possibly via a queue to avoid being-on-the-wrong-thread issues) and whose finalize forces all completions. This would, of course, interact poorly with processing-time tracking, fusion breaks, watermark tracking, counter attribution, window propagation, etc., so it is desirable to make it part of the system itself.
>>>>>
>>>>> Taking an OutputReceiver<CompletionStage<OutputT>> seems like a decent API.
>>>>> The invoking of the downstream process could be chained onto this, with all the implicit tracking and tracing set up correctly. Taking a CompletionStage as input means a DoFn would not have to create its output CompletionStage ex nihilo, and it possibly allows for better chaining (depending on the asynchronous APIs used).
>>>>>
>>>>> Even better might be to simply let the invocation of all DoFn.process() methods be asynchronous, but as Java doesn't offer an await primitive to relinquish control in the middle of a function body, this might be hard.
>>>>>
>>>>> I think for correctness, completion would have to be forced at the end of each bundle. If your bundles are large enough, this may not be that big of a deal. In this case you could also start executing subsequent bundles while waiting for prior ones to complete.
>>>>>
>>>>> On Wed, Jan 23, 2019 at 11:58 PM Bharath Kumara Subramanian <codin.mart...@gmail.com> wrote:
>>>>> >>
>>>>> >> I'd love to see something like this as well. Also +1 to process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). I don't know if there's much benefit to passing a future in, since the framework itself could hook up the process function to complete when the future completes.
>>>>> >
>>>>> > One benefit we get by wrapping the input with CompletionStage is to mandate[1] users to chain their processing logic to the input future, thereby ensuring asynchrony for the most part. However, it is still possible for users to go out of their way and write blocking code.
>>>>> >
>>>>> > Although, I am not sure how counterintuitive it is for the runners to wrap the input element into a future before passing it to the user code.
>>>>> >
>>>>> > Bharath
>>>>> >
>>>>> > [1] The CompletionStage interface does not define methods for initially creating, forcibly completing normally or exceptionally, probing completion status or results, or awaiting completion of a stage. Implementations of CompletionStage may provide means of achieving such effects, as appropriate.
>>>>> >
>>>>> > On Wed, Jan 23, 2019 at 11:31 AM Kenneth Knowles <k...@apache.org> wrote:
>>>>> >>
>>>>> >> I think your concerns are valid, but I want to clarify what "first-class async APIs" means. Does "first class" mean that it is a well-encapsulated abstraction? Or does it mean that the user can more or less do whatever they want? These are opposite but, to me, both valid meanings of "first class".
>>>>> >>
>>>>> >> I would not want to encourage users to do explicit multi-threaded programming or control parallelism. Part of the point of Beam is to gain big-data parallelism without explicit multithreading. I see asynchronous chaining of futures (or their best approximation in your language of choice) as a highly disciplined way of doing asynchronous, dependency-driven computation that is nonetheless conceptually, and readably, straight-line code. Threads are neither required nor the only way to execute this code. In fact you might often want to execute without threading for a reference implementation, to provide canonically correct results. APIs that leak lower-level details of threads are asking for trouble.
>>>>> >>
>>>>> >> One of our other ideas was to provide a dynamic parameter of type ExecutorService. The SDK harness (pre-portability: the runner) would control and observe parallelism while the user could simply register tasks. Providing a future/promise API is even more disciplined.
>>>>> >>
>>>>> >> Kenn
>>>>> >>
>>>>> >> On Wed, Jan 23, 2019 at 10:35 AM Scott Wegner <sc...@apache.org> wrote:
>>>>> >>>
>>>>> >>> A related question is how to make execution observable such that a runner can make proper scaling decisions. Runners decide how to schedule bundles within and across multiple worker instances, and can use information about execution to make dynamic scaling decisions. First-class async APIs seem like they would encourage DoFn authors to implement their own parallelization, rather than deferring to the runner, which should be more capable of providing the right level of parallelism.
>>>>> >>>
>>>>> >>> In the Dataflow worker harness, we attribute execution time to PTransform steps by sampling execution time on the execution thread and attributing it to the currently invoked method. This approach is fairly simple and possible because we assume that execution happens within the thread controlled by the runner. Some DoFns already implement their own async logic and break this assumption; I would expect more to do so if we build async into the DoFn APIs.
>>>>> >>>
>>>>> >>> So: this isn't an argument against async APIs, but rather: does this break execution observability, and are there other lightweight mechanisms for attributing execution time of async work?
>>>>> >>>
>>>>> >>> On Tue, Jan 22, 2019 at 7:08 PM Kenneth Knowles <k...@google.com> wrote:
>>>>> >>>>
>>>>> >>>> When executed over the portable APIs, it will be primarily the Java SDK harness that makes all of these decisions. If we wanted runners to have some insight into it, we would have to add it to the Beam model protos. I don't have any suggestions there, so I would leave it out of this discussion until there are good ideas.
>>>>> >>>> We could learn a lot by trying it out just in the SDK harness.
>>>>> >>>>
>>>>> >>>> Kenn
>>>>> >>>>
>>>>> >>>> On Tue, Jan 22, 2019 at 6:12 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>> >>>>>
>>>>> >>>>> I don't have a strong opinion on the resolution of the futures with regard to the @FinishBundle invocation. Leaving it unspecified does give runners more room to implement it with their own support.
>>>>> >>>>>
>>>>> >>>>> Optimization is also another great point. Fusion seems pretty complex to me too if we need to find a way to chain the resulting future into the next transform; maybe leave the async transform as a standalone stage initially?
>>>>> >>>>>
>>>>> >>>>> Btw, I was counting the number of replies before we hit portability. Seems after four replies fusion finally showed up :).
>>>>> >>>>>
>>>>> >>>>> Thanks,
>>>>> >>>>> Xinyu
>>>>> >>>>>
>>>>> >>>>> On Tue, Jan 22, 2019 at 5:42 PM Kenneth Knowles <k...@google.com> wrote:
>>>>> >>>>>>
>>>>> >>>>>> On Tue, Jan 22, 2019, 17:23 Reuven Lax <re...@google.com> wrote:
>>>>> >>>>>>>
>>>>> >>>>>>> On Tue, Jan 22, 2019 at 5:08 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>> >>>>>>>>
>>>>> >>>>>>>> @Steve: it's good to see that this is going to be useful in your use cases as well. Thanks for sharing the code from Scio! I can see in your implementation that waiting for the future completion is part of @FinishBundle. We are thinking of taking advantage of the underlying runner's async support so the user-level code won't need to implement this logic; e.g.,
>>>>> >>>>>>>> Samza has an AsyncStreamTask API that provides a callback to invoke after future completion [1], and Flink also has an AsyncFunction API [2], which provides a ResultFuture similar to the API we discussed.
>>>>> >>>>>>>
>>>>> >>>>>>> Can this be done correctly? What I mean is: if the process dies, can you guarantee that no data is lost? Beam currently guarantees this for FinishBundle, but if you use an arbitrary async framework this might not be true.
>>>>> >>>>>>
>>>>> >>>>>> What a Beam runner guarantees is that *if* the bundle is committed, *then* FinishBundle has run. So it seems just as easy to say *if* a bundle is committed, *then* every async result has been resolved.
>>>>> >>>>>>
>>>>> >>>>>> If the process dies, the two cases should be naturally analogous.
>>>>> >>>>>>
>>>>> >>>>>> But it raises the question of whether they should be resolved prior to FinishBundle, after it, or unspecified. I lean toward unspecified.
>>>>> >>>>>>
>>>>> >>>>>> That's for a single ParDo. Where this could get complex is optimizing fused stages for greater asynchrony.
>>>>> >>>>>>
>>>>> >>>>>> Kenn
>>>>> >>>>>>
>>>>> >>>>>>>>
>>>>> >>>>>>>> A simple use case for this is to execute a Runnable asynchronously in the user's own executor.
>>>>> >>>>>>>> The following code illustrates Kenn's option #2, with a very simple single-thread pool as the executor:
>>>>> >>>>>>>>
>>>>> >>>>>>>> new DoFn<InputT, OutputT>() {
>>>>> >>>>>>>>   @ProcessElement
>>>>> >>>>>>>>   public void process(
>>>>> >>>>>>>>       @Element InputT element,
>>>>> >>>>>>>>       @Output OutputReceiver<CompletionStage<OutputT>> outputReceiver) {
>>>>> >>>>>>>>     CompletableFuture<OutputT> future =
>>>>> >>>>>>>>         CompletableFuture.supplyAsync(
>>>>> >>>>>>>>             () -> someOutput,
>>>>> >>>>>>>>             Executors.newSingleThreadExecutor());
>>>>> >>>>>>>>     outputReceiver.output(future);
>>>>> >>>>>>>>   }
>>>>> >>>>>>>> }
>>>>> >>>>>>>>
>>>>> >>>>>>>> The neat thing about this API is that the user can choose their own async framework, and we only expect the output to be a CompletionStage.
>>>>> >>>>>>>>
>>>>> >>>>>>>> For the implementation of bundling, can we compose a CompletableFuture from each element in the bundle, e.g. CompletableFuture.allOf(...), and then invoke @FinishBundle when this future is complete? Seems this might work.
>>>>> >>>>>>>>
>>>>> >>>>>>>> Thanks,
>>>>> >>>>>>>> Xinyu
>>>>> >>>>>>>>
>>>>> >>>>>>>> [1] https://samza.apache.org/learn/documentation/1.0.0/api/javadocs/org/apache/samza/task/AsyncStreamTask.html
>>>>> >>>>>>>> [2] https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/asyncio.html
>>>>> >>>>>>>>
>>>>> >>>>>>>> On Tue, Jan 22, 2019 at 2:45 PM Steve Niemitz <sniem...@apache.org> wrote:
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> I'd love to see something like this as well. Also +1 to process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>).
>>>>> >>>>>>>>> I don't know if there's much benefit to passing a future in, since the framework itself could hook up the process function to complete when the future completes.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> I feel like I've spent a bunch of time writing very similar "kick off a future in ProcessElement, join it in FinishBundle" code, and looking around Beam itself, a lot of built-in transforms do it as well. Scio provides a few AsyncDoFn implementations [1], but it'd be great to see this as a first-class concept in Beam itself. Doing error handling, concurrency, etc. correctly can be tricky.
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> [1] https://github.com/spotify/scio/blob/master/scio-core/src/main/java/com/spotify/scio/transforms/BaseAsyncDoFn.java
>>>>> >>>>>>>>>
>>>>> >>>>>>>>> On Tue, Jan 22, 2019 at 5:39 PM Kenneth Knowles <k...@google.com> wrote:
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> If the input is a CompletionStage<InputT>, then the output should also be a CompletionStage<OutputT>, since all you should do is async chaining. We could enforce this by giving the DoFn an OutputReceiver<CompletionStage<OutputT>>.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Another possibility that might be even more robust against poor future use could be process(@Element InputT element, @Output OutputReceiver<CompletionStage<OutputT>>). In this way, the process method itself will be async chained, rather than counting on the user to do the right thing.
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> We should see how these look in real use cases. The way that processing is split between @ProcessElement and @FinishBundle might complicate things.
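Kenn's second option boils down to future composition: the harness invokes downstream processing as a continuation of the upstream stage's future, so no thread blocks between fused stages. A toy, framework-free sketch of that chaining (AsyncChain and fuse are hypothetical names, not Beam API):

```java
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

// Hypothetical sketch of how a harness could chain two fused async stages:
// the second stage runs as a continuation of the first stage's future.
class AsyncChain {
    static <A, B, C> CompletionStage<C> fuse(
            A input,
            Function<A, CompletionStage<B>> stage1,
            Function<B, CompletionStage<C>> stage2) {
        // thenCompose is the "async chaining" the thread keeps referring to:
        // stage2 is scheduled when stage1's result arrives, with no blocking.
        return stage1.apply(input).thenCompose(stage2);
    }
}
```

In a real runner the same composition point is where implicit bookkeeping (counters, windows, tracing) would be attached around each stage.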
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> Kenn
>>>>> >>>>>>>>>>
>>>>> >>>>>>>>>> On Tue, Jan 22, 2019 at 12:44 PM Xinyu Liu <xinyuliu...@gmail.com> wrote:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> Hi, guys,
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> As more users try out Beam running on the SamzaRunner, we've gotten a lot of asks for an asynchronous processing API. There are a few reasons for these asks:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> - The users here are experienced in asynchronous programming. With async frameworks such as Netty and ParSeq, and libs like the async Jersey client, they are able to make remote calls efficiently, and the libraries help manage the execution threads underneath. Async remote calls are very common in most of our streaming applications today.
>>>>> >>>>>>>>>>> - Many jobs are running on a multi-tenancy cluster. Async processing helps with lower resource usage and faster computation (less context switching).
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> I asked about async support in a previous email thread. The following API was mentioned in the reply:
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> new DoFn<InputT, OutputT>() {
>>>>> >>>>>>>>>>>   @ProcessElement
>>>>> >>>>>>>>>>>   public void process(@Element CompletionStage<InputT> element, ...) {
>>>>> >>>>>>>>>>>     element.thenApply(...)
>>>>> >>>>>>>>>>>   }
>>>>> >>>>>>>>>>> }
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> We are wondering whether there are any discussions on this API and related docs. It is awesome that you guys have already considered having DoFn process asynchronously. Out of curiosity, this API seems to create a CompletionStage out of the input element (probably using the framework's executor) and then allow the user to chain on it.
>>>>> >>>>>>>>>>> To us, it seems more convenient if the DoFn outputs a CompletionStage<OutputT>, or passes in a CompletionStage<OutputT> to invoke upon completion.
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> We would like to discuss the async API further, and hopefully we will have great support for it in Beam. Really appreciate the feedback!
>>>>> >>>>>>>>>>>
>>>>> >>>>>>>>>>> Thanks,
>>>>> >>>>>>>>>>> Xinyu
>>>>> >>>
>>>>> >>>
>>>>> >>> --
>>>>> >>> Got feedback? tinyurl.com/swegner-feedback
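The single-level user-space wrapper floated at the top of this thread — async results completing on arbitrary threads, a queue carrying them back, and a finish step forcing all completions, as in Robert's quoted "naive" sketch — might reduce to something like the following outside of any framework. All names here are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Consumer;

// Hypothetical sketch of the "callback plus queue, finalize forces
// completions" idea: results complete on IO threads but are only emitted
// from drain(), i.e. from the bundle thread.
class CompletionDrain<T> {
    private final Queue<T> completed = new ConcurrentLinkedQueue<>();
    private final List<CompletableFuture<Void>> pending = new ArrayList<>();

    // @ProcessElement analogue: register an async result; its value is
    // queued by a continuation whenever it completes.
    void register(CompletionStage<T> stage) {
        pending.add(stage.thenAccept(completed::add).toCompletableFuture());
    }

    // @FinishBundle analogue: force every completion, then emit from the
    // calling thread so the receiver never sees a foreign thread.
    void drain(Consumer<T> receiver) {
        CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        pending.clear();
        T value;
        while ((value = completed.poll()) != null) {
            receiver.accept(value);
        }
    }
}
```

As the thread notes, a real wrapper would still interact poorly with watermarks, windows, and counter attribution — which is why runner/SDK support is the better long-term home for this.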