Thank you for the example; it helps.

I still do not know what is wrong with gevent. Would you consider using the
multiprocessing package instead? We already use it to accomplish something
similar in file-based sinks, and there is already a utility function that
wraps it in a way similar to your example [1].

[1]
https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117
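Something along these lines might work. It is a rough sketch, assuming the standard library's `multiprocessing.pool.ThreadPool` (threads rather than processes, since the work is network-bound) in place of gevent. It does not call the helper in [1] directly. `chunk_list`, `call_users_service`, and the values of `BATCH_SIZE` and `GREENLET_TIMEOUT` are hypothetical stand-ins for the names in your snippet.

```python
import time
from multiprocessing.pool import ThreadPool

# Hypothetical values; the original snippet does not show them.
BATCH_SIZE = 2
GREENLET_TIMEOUT = 5.0  # seconds


def chunk_list(items, size):
    """Split items into consecutive chunks of at most `size` elements."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def call_users_service(user_ids):
    """Hypothetical stand-in for the real network call.

    Pretends that even user ids are valid and fails for any chunk with id 13.
    """
    if 13 in user_ids:
        raise RuntimeError('simulated service failure')
    return [uid for uid in user_ids if uid % 2 == 0]


def filter_out_invalid_users(events, pool_size=4):
    _key, user_id_data_pairs = events
    user_ids = [user_id for user_id, _ in user_id_data_pairs]
    chunks = chunk_list(user_ids, BATCH_SIZE)
    valid_user_ids = []
    if not chunks:
        yield valid_user_ids
        return

    # Same overall time budget as join_timeout in the gevent version.
    join_timeout = GREENLET_TIMEOUT + len(chunks) * GREENLET_TIMEOUT * 0.1
    deadline = time.monotonic() + join_timeout

    # Leaving the with-block calls pool.terminate(); a thread still stuck in a
    # call is not killed, it just finishes in the background.
    with ThreadPool(min(pool_size, len(chunks))) as pool:
        pending = [pool.apply_async(call_users_service, (chunk,))
                   for chunk in chunks]
        for result in pending:
            remaining = max(0.0, deadline - time.monotonic())
            try:
                valid_user_ids.extend(result.get(timeout=remaining))
            except Exception:
                # Timed out or raised: skip this chunk, like the
                # job.successful() filter in the gevent version.
                continue
    yield valid_user_ids
```

Collecting each `AsyncResult` separately, instead of using `pool.map`, lets a failed or slow chunk be dropped rather than failing the whole element, which matches how your example keeps only the successful greenlets.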

Ahmet


On Wed, Sep 19, 2018 at 10:25 PM, Rakesh Kumar wrote:

>
> Gevent <http://www.gevent.org/> is basically used to make parallel
> network calls. We are using gevent in one of the transformation methods to
> call internal services; that method makes multiple network calls in
> parallel. Here is the code snippet:
> # __init__.py
> # Patch the standard library as early as possible, before other imports.
> import gevent.monkey
> gevent.monkey.patch_all()
>
> # transform.py
> from gevent import Greenlet
> from gevent import joinall
>
> # utils, BATCH_SIZE and GREENLET_TIMEOUT are defined elsewhere.
>
> def filter_out_invalid_users(events):
>     key, user_id_data_pairs = events
>     user_ids = [user_id for user_id, data in user_id_data_pairs]
>
>     jobs = []
>     id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
>     for id_chunk in id_chunks:
>         # _call_users_service makes the network call for one chunk.
>         jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))
>
>     # Increase the timeout with the number of greenlets we are running,
>     # to account for yielding among greenlets.
>     join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
>     joinall(jobs, timeout=join_timeout)
>
>     # Keep only the greenlets that finished without raising.
>     successful_jobs = [job for job in jobs if job.successful()]
>     valid_user_ids = []
>     for job in successful_jobs:
>         network_response = job.get()
>         valid_user_ids.append(network_response.user_id)
>     yield valid_user_ids
>
> def _call_users_service(user_ids):
>     # make network call and return response
>     ...
>     return network_response
>
> On Tue, Sep 18, 2018 at 7:07 PM Ahmet Altay wrote:
>
>> I am also not familiar with gevent. Could you explain what you are trying
>> to do and how you plan to use gevent?
>>
>> On Tue, Sep 18, 2018 at 9:38 AM, Lukasz Cwik wrote:
>>
>>> I don't think anyone has tried what you're doing. The code that you're
>>> working with is very new.
>>>
>>> On Mon, Sep 17, 2018 at 5:02 PM Micah Wylde wrote:
>>>
>>>> Hi all,
>>>>
>>>> We're using the Python SDK with the portable Flink runner and running
>>>> into some problems integrating gevent. We're patching the gRPC runtime for
>>>> gevent as described in [0], which allows pipelines to start and partially
>>>> run. However, the tasks produce a stream of gevent exceptions:
>>>>
>>>> Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
>>>> Traceback (most recent call last):
>>>>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>>>>   File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
>>>>   File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
>>>>   File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>   File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>   File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
>>>> greenlet.error: cannot switch to a different thread
>>>>
>>>> and do not make any progress.
>>>>
>>>> Has anybody else successfully used the portable Python SDK with gevent?
>>>> Or is there a recommended alternative for doing async I/O in Python
>>>> pipelines?
>>>>
>>>> [0] https://github.com/grpc/grpc/issues/4629#issuecomment-376962677
>>>>
>>>> Micah
>>>>
>>>
>> --
> Rakesh Kumar
> Software Engineer
> 510-761-1364 |
>
> <https://www.lyft.com/>
>
