On Mon, May 10, 2021 at 4:28 PM Ahmet Altay <[email protected]> wrote:
> > > On Mon, May 10, 2021 at 8:01 AM Stephan Hoyer <[email protected]> wrote: > >> Hi Beam devs, >> >> I've been exploring recently how to optimize IO bound steps for my Python >> Beam pipelines, and have come up with a solution that I think might make >> sense to upstream into Beam's Python SDK. >> >> It appears that Beam runners (at least the Cloud Dataflow runner) >> typically use only a single thread per Python process. >> > > I thought the default was not 1 but something else (12?). Maybe that > changed. > > >> The number of threads per worker can be adjusted with flags, but only for >> the entire pipeline. This behavior makes sense *in general* under the >> worst-case assumption that user-code in Python is CPU bound and requires >> the GIL. >> >> However, multiple threads can be quite helpful in many cases, e.g., >> 1. CPU bound tasks that release the GIL. This is typically the case when >> using libraries for numerical computing, such as NumPy and pandas. >> 2. IO bound tasks that can be run asynchronously, e.g., reading/writing >> files or RPCs. This is the use-case for which not using threads can be most >> problematic, e.g., in a recent dataflow pipeline reading/writing lots of >> relatively small files (~1-10 MB) to cloud storage with the default number >> of threads per worker, I found that I was only using ~20% of available CPU. >> >> Because the optimal number of threads for Python code can be quite >> heterogeneous, I would like to be able to indicate that particular steps of >> my Beam pipelines should be executed using more threads. This would be >> particularly valuable for writing libraries of custom IO transforms, which >> should still conservatively assume that *other* steps in user provided >> pipelines may be CPU bound. >> >> The solution I've come up with is to use beam.BatchElements with a ParDo >> function that executes tasks in separate threads (via >> concurrent.futures.ThreadPool). I've used this to make high-level wrappers >> like beam.Map, beam.MapTuple, etc that execute with multiple threads. This >> seems to work pretty well for my use-cases. I can put these in my own >> library, of course, but perhaps these would make sense upstream into Beam's >> Python SDK itself? >> > > I believe a related idea (async pardo) was discussed and some work was > done earlier (https://issues.apache.org/jira/browse/BEAM-6550). AFAIK > Flink also has a similar concept ( > https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/operators/asyncio/) > as well. > > Perhaps you can share a bit more details about your proposal along with > your code and people could provide more feedback on that. > > Yes, async ParDo does look like the same thing! This sounds like exactly the same use-case -- a ParDo for IO that can be asynchronously executed. In fact, Python has async IO, too. In my particular case threads are slightly more convenient (the libraries I'm using do not natively support async), but that is really a minor detail. If we had a AsyncParDo for Python, I could make that work very easily. In the worst case, I could use a separate thread inside each async call. In any case, see here for my implementation of a ThreadMap ptransform: https://github.com/google/xarray-beam/blob/0.0.1/xarray_beam/_src/threadmap.py Let me know if you think this might be of interest upstream in Beam. I agree that in the long term this makes sense to be implemented in runners, though I guess that might be more challenging to implement. >> One alternative would be supporting this sort of concurrency control >> inside Beam runners. In principle, I imagine runners could tune thread-pool >> size for each stage automatically, e.g., based on CPU usage. To be honest, >> I'm a little surprised this doesn't happen already, but I'm sure there are >> good reasons why not. >> > > Runner support would be the ideal solution. Because runners could decide > on the most optimal pool size based on the real time information. > Supporting and using annotations would provide helpful hints for the > runners. At least the latter part is in progres IIRC. > > >> >> Let me know what you think! >> >> Cheers, >> Stephan >> >>
