[
https://issues.apache.org/jira/browse/BEAM-5497?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17137579#comment-17137579
]
Beam JIRA Bot commented on BEAM-5497:
-------------------------------------
This issue was marked "stale-P2" and has not received a public comment in 14
days. It is now automatically moved to P3. If you are still affected by it, you
can comment and move it back to P2.
> Provide support for Gevent
> --------------------------
>
> Key: BEAM-5497
> URL: https://issues.apache.org/jira/browse/BEAM-5497
> Project: Beam
> Issue Type: Improvement
> Components: sdk-py-harness
> Reporter: Rakesh Kumar
> Priority: P3
>
> [Gevent|http://www.gevent.org/] is commonly used to make network calls in
> parallel. We use gevent in one of our transformation methods to call
> internal services. Before using gevent, we also apply the patching described
> [here|https://github.com/grpc/grpc/issues/4629#issuecomment-376962677].
> The transformation method makes multiple network calls in parallel. Here
> is the code snippet:
> {code}
> # /__init__.py
> import gevent.monkey
> gevent.monkey.patch_all()
>
> # /transform.py
> from gevent import Greenlet
> from gevent import joinall
>
> def filter_out_invalid_users(events):
>     key, user_id_data_pairs = events
>     user_ids = [user_id for user_id, data in user_id_data_pairs]
>     jobs = []
>     id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
>     for id_chunk in id_chunks:
>         # _call_users_service makes the network call.
>         jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))
>     # Increase the timeout based on the number of greenlets we are running,
>     # to account for yielding among greenlets.
>     join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
>     joinall(jobs, timeout=join_timeout)
>     successful_jobs = [job for job in jobs if job.successful()]
>     valid_user_ids = []
>     for job in successful_jobs:
>         network_response = job.get()
>         valid_user_ids.append(network_response.user_id)
>     yield valid_user_ids
>
> def _call_users_service(user_ids):
>     # make network call and return response
>     ..
>     ..
>     return network_response
> {code}
> This allows pipelines to start and partially run. However, the tasks produce a
> stream of gevent exceptions and do not make any progress:
> {code}
>
> Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
> Traceback (most recent call last):
>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>   File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
>   File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
>   File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>   File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
> greenlet.error: cannot switch to a different thread
> {code}
> The alternative approach is to use the multiprocessing module, as shown
> [here|https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117]
> and
> [here|https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313].
> Gevent is lightweight and well suited to parallelizing I/O- and network-bound
> jobs; it uses resources efficiently and scales well under heavy load.
>
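For comparison, the same chunk-and-fan-out pattern can be sketched with a plain thread pool from the standard library, in the spirit of the alternative the issue mentions. This is only an illustrative sketch, not code from Beam or from the reporter: chunk_list, the stub _call_users_service (which treats even IDs as valid), BATCH_SIZE, CALL_TIMEOUT and max_workers are all assumptions made for the example, and it has not been verified inside the Beam SDK harness.

```python
from concurrent.futures import ThreadPoolExecutor, wait

BATCH_SIZE = 2      # hypothetical chunk size
CALL_TIMEOUT = 5.0  # hypothetical overall timeout, in seconds


def chunk_list(items, size):
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def _call_users_service(user_ids):
    """Stand-in for the real network call: even IDs are treated as valid."""
    return [uid for uid in user_ids if uid % 2 == 0]


def filter_out_invalid_users(events, max_workers=8):
    """Fan the user IDs out to worker threads and collect the valid ones."""
    _key, user_id_data_pairs = events
    user_ids = [user_id for user_id, _data in user_id_data_pairs]

    pool = ThreadPoolExecutor(max_workers=max_workers)
    try:
        futures = [
            pool.submit(_call_users_service, chunk)
            for chunk in chunk_list(user_ids, BATCH_SIZE)
        ]
        # Block until every call finishes or the timeout expires.
        done, _pending = wait(futures, timeout=CALL_TIMEOUT)
    finally:
        # Do not block on calls that are still running after the timeout.
        pool.shutdown(wait=False)

    valid_user_ids = []
    for future in futures:  # keep submission order
        # Skip calls that timed out or raised, like job.successful() above.
        if future in done and future.exception() is None:
            valid_user_ids.extend(future.result())
    return valid_user_ids
```

Threads are heavier than greenlets, but they do not depend on switching greenlets across threads, which is the operation the traceback above reports as failing.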