Thank you, Rakesh! Let us know how it goes if you get a chance to try multiprocessing.
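
For reference, here is a minimal sketch of how the greenlet-based transform quoted further down the thread might look with a thread pool from the standard multiprocessing package instead of gevent. It is only an illustration under stated assumptions: BATCH_SIZE, POOL_SIZE, CALL_TIMEOUT, chunk_list and the body of _call_users_service are hypothetical stand-ins (the real service call and utils.chunk_list_evenly are not shown in the thread), and this is not the Beam utility referenced as [1] in the thread.

```python
from multiprocessing.pool import ThreadPool

# Hypothetical settings; the thread does not give real values.
BATCH_SIZE = 2
POOL_SIZE = 4
CALL_TIMEOUT = 5.0  # seconds to wait for each batch result


def chunk_list(items, size):
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def _call_users_service(user_ids):
    # Stand-in for the real network call (assumption): positive ids are
    # treated as valid so that the sketch runs without a service.
    return [user_id for user_id in user_ids if user_id > 0]


def filter_out_invalid_users(events):
    key, user_id_data_pairs = events
    user_ids = [user_id for user_id, _ in user_id_data_pairs]
    chunks = chunk_list(user_ids, BATCH_SIZE)

    valid_user_ids = []
    if chunks:
        # Threads are sufficient for IO-bound calls and need no monkey patching.
        with ThreadPool(min(POOL_SIZE, len(chunks))) as pool:
            pending = [pool.apply_async(_call_users_service, (chunk,))
                       for chunk in chunks]
            for result in pending:
                try:
                    valid_user_ids.extend(result.get(timeout=CALL_TIMEOUT))
                except Exception:
                    # Skip failed or timed-out batches, similar in spirit to
                    # keeping only the jobs where job.successful() is true.
                    continue
    yield valid_user_ids
```

Calling list(filter_out_invalid_users(("key", [(1, "a"), (-2, "b"), (3, "c")]))) exercises the sketch end to end. Failed batches are dropped silently here only to mirror the original snippet; a real pipeline would probably want to log or retry them.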
On Tue, Sep 25, 2018 at 9:19 AM, Rakesh Kumar <[email protected]> wrote:

> Hi Ahmet,
>
> I filed the jira ticket https://issues.apache.org/jira/browse/BEAM-5497.
> Let me know if you need anything else from us.
>
> Thank you,
> Rakesh
>
> On Mon, Sep 24, 2018 at 5:08 PM Ahmet Altay <[email protected]> wrote:
>
>> On Sun, Sep 23, 2018 at 9:21 PM, Rakesh Kumar <[email protected]>
>> wrote:
>>
>>> Thanks Ahmet for providing the reference code. I will give it a try.
>>>
>>> I also tried to read the code; it feels like you are using
>>> multiprocessing to parallelize runtime jobs. We wanted to use Gevent
>>> because it is lightweight and good for parallelizing IO/network bound jobs.
>>>
>>
>> We are using this code for IO bound operations. For example, in [1] it
>> is used to make calls into GCS in parallel with batches of files.
>>
>> [1] https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313
>>
>>> I would also recommend providing Gevent support in the future because it
>>> can use resources efficiently and it scales well under heavy load.
>>>
>>
>> Do you mind filing a JIRA for the gevent issue so that we can keep track
>> of it?
>>
>>> On Fri, Sep 21, 2018 at 5:58 PM Ahmet Altay <[email protected]> wrote:
>>>
>>>> Thank you for the example, it helps.
>>>>
>>>> I still do not know what is wrong with gevent. Would you consider using
>>>> the multiprocessing package? We are already using that to accomplish
>>>> something similar in file based sinks, and there is already a utility
>>>> function that wraps it, similar to your example [1].
>>>>
>>>> [1] https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117
>>>>
>>>> Ahmet
>>>>
>>>> On Wed, Sep 19, 2018 at 10:25 PM, Rakesh Kumar <[email protected]>
>>>> wrote:
>>>>
>>>>> Gevent <http://www.gevent.org/> is basically used to make parallel
>>>>> network calls. We are using gevent in one of the transformation methods to
>>>>> call internal services. The transformation method makes multiple
>>>>> network calls in parallel. Here is the code snippet:
>>>>>
>>>>> /__init__.py
>>>>> import gevent.monkey
>>>>> gevent.monkey.patch_all()
>>>>>
>>>>> /transform.py
>>>>> from gevent import Greenlet
>>>>> from gevent import joinall
>>>>>
>>>>> def filter_out_invalid_users(events):
>>>>>     key, user_id_data_pairs = events
>>>>>     user_ids = [user_id for user_id, data in user_id_data_pairs]
>>>>>
>>>>>     jobs = []
>>>>>     id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
>>>>>     for id_chunk in id_chunks:
>>>>>         # _call_users_service makes the network call.
>>>>>         jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))
>>>>>
>>>>>     # Here we increase the timeout based on the number of greenlets we
>>>>>     # are running, to account for yielding among greenlets.
>>>>>     join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
>>>>>     joinall(jobs, timeout=join_timeout)
>>>>>
>>>>>     successful_jobs = [job for job in jobs if job.successful()]
>>>>>     valid_user_ids = []
>>>>>     for job in successful_jobs:
>>>>>         network_response = job.get()
>>>>>         valid_user_ids.append(network_response.user_id)
>>>>>     yield valid_user_ids
>>>>>
>>>>> def _call_users_service(user_ids):
>>>>>     # make network call and return response
>>>>>     ..
>>>>>     ..
>>>>>     return network_response
>>>>>
>>>>> On Tue, Sep 18, 2018 at 7:07 PM Ahmet Altay <[email protected]> wrote:
>>>>>
>>>>>> I am also not familiar with gevent. Could you explain what you are
>>>>>> trying to do and how you plan to use gevent?
>>>>>>
>>>>>> On Tue, Sep 18, 2018 at 9:38 AM, Lukasz Cwik <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I don't think anyone has tried what you're doing. The code that you're
>>>>>>> working with is very new.
>>>>>>>
>>>>>>> On Mon, Sep 17, 2018 at 5:02 PM Micah Wylde <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> We're using the Python SDK with the portable Flink runner and
>>>>>>>> running into some problems integrating gevent. We're patching the gRPC
>>>>>>>> runtime for gevent as described in [0], which allows pipelines to start
>>>>>>>> and partially run. However, the tasks produce a stream of gevent
>>>>>>>> exceptions:
>>>>>>>>
>>>>>>>> Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>>>>>>>>   File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
>>>>>>>>   File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
>>>>>>>>   File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
>>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>>   File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
>>>>>>>> greenlet.error: cannot switch to a different thread
>>>>>>>>
>>>>>>>> and do not make any progress.
>>>>>>>>
>>>>>>>> Has anybody else successfully used the portable Python SDK with
>>>>>>>> gevent? Or is there a recommended alternative for doing async IO in
>>>>>>>> Python pipelines?
>>>>>>>>
>>>>>>>> [0] https://github.com/grpc/grpc/issues/4629#issuecomment-376962677
>>>>>>>>
>>>>>>>> Micah
>>>>>>>>
>>>>> --
>>>>> Rakesh Kumar
>>>>> Software Engineer
>>>>> 510-761-1364
>>>>>
>>>>> <https://www.lyft.com/>
