[ 
https://issues.apache.org/jira/browse/BEAM-5497?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Beam JIRA Bot updated BEAM-5497:
--------------------------------
    Labels: stale-P2  (was: )

> Provide support for Gevent
> --------------------------
>
>                 Key: BEAM-5497
>                 URL: https://issues.apache.org/jira/browse/BEAM-5497
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-py-harness
>            Reporter: Rakesh Kumar
>            Priority: P2
>              Labels: stale-P2
>
> [Gevent|http://www.gevent.org/] is used mainly to make network calls in 
> parallel. We use gevent in one of our transformation methods to call 
> internal services. Before using gevent, we also patch it as described 
> [here|https://github.com/grpc/grpc/issues/4629#issuecomment-376962677]. 
> The transformation method makes multiple network calls in parallel. Here 
> is the code snippet:
> {code}
> # /__init__.py
> import gevent.monkey
> gevent.monkey.patch_all()
>
> # /transform.py
> from gevent import Greenlet
> from gevent import joinall
>
> def filter_out_invalid_users(events):
>     key, user_id_data_pairs = events
>     user_ids = [user_id for user_id, data in user_id_data_pairs]
>     jobs = []
>     id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
>     for id_chunk in id_chunks:
>         # _call_users_service makes the network call.
>         jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))
>     # Here we increase the timeout based on the number of greenlets we are
>     # running, to account for yielding among greenlets.
>     join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
>     joinall(jobs, timeout=join_timeout)
>     successful_jobs = [job for job in jobs if job.successful()]
>     valid_user_ids = []
>     for job in successful_jobs:
>         network_response = job.get()
>         valid_user_ids.append(network_response.user_id)
>         yield valid_user_ids
>
> def _call_users_service(user_ids):
>     # make network call and return response
>     ..
>     ..
>     return network_response
> {code}
> This allows pipelines to start and partially run. However, the tasks produce a 
> stream of gevent exceptions and make no progress:
> {code}
> Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
> Traceback (most recent call last):
>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>   File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
>   File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
>   File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
> greenlet.error: cannot switch to a different thread
>  {code}
> An alternative approach is to use the multiprocessing module, as shown 
> [here|https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117]
> and 
> [here|https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313].
> Gevent is lightweight and well suited to parallelizing I/O- and network-bound 
> jobs; it uses resources efficiently and scales well under heavy load.
>  
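
For comparison, here is a minimal sketch of the thread-pool alternative mentioned in the issue, using the standard library's multiprocessing.pool.ThreadPool. It is not the reporter's code or Beam's implementation. The helper chunk_list, the POOL_SIZE and BATCH_SIZE values, and the stubbed _call_users_service (which simply treats even IDs as valid) are hypothetical stand-ins for illustration. Unlike the gevent version, this sketch does no timeout handling.

```python
from multiprocessing.pool import ThreadPool

BATCH_SIZE = 3  # hypothetical chunk size
POOL_SIZE = 4   # hypothetical upper bound on worker threads


def chunk_list(items, size):
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def _call_users_service(user_ids):
    """Stand-in for the real network call (assumption: even IDs are valid)."""
    return [user_id for user_id in user_ids if user_id % 2 == 0]


def filter_out_invalid_users(events):
    """Return the valid user IDs, checking chunks of IDs in parallel threads."""
    key, user_id_data_pairs = events
    user_ids = [user_id for user_id, _ in user_id_data_pairs]
    chunks = chunk_list(user_ids, BATCH_SIZE)
    if not chunks:
        return []
    pool = ThreadPool(min(POOL_SIZE, len(chunks)))
    try:
        # map() blocks until every chunk is processed and preserves input order.
        responses = pool.map(_call_users_service, chunks)
    finally:
        pool.close()
        pool.join()
    return [user_id for response in responses for user_id in response]
```

Because the worker threads are ordinary OS threads, this approach needs no monkey-patching, at the cost of one thread per concurrent call rather than one greenlet.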



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
