Thank you, Rakesh! Let us know how it goes if you get a chance to try
multiprocessing.

On Tue, Sep 25, 2018 at 9:19 AM, Rakesh Kumar <[email protected]> wrote:

> Hi Ahmet,
>
> I filed the JIRA ticket https://issues.apache.org/jira/browse/BEAM-5497.
> Let me know if you need anything else from us.
>
> Thank you,
> Rakesh
>
> On Mon, Sep 24, 2018 at 5:08 PM Ahmet Altay <[email protected]> wrote:
>
>> On Sun, Sep 23, 2018 at 9:21 PM, Rakesh Kumar <[email protected]>
>> wrote:
>>
>>> Thanks, Ahmet, for providing the reference code. I will give it a try.
>>>
>>> I also read through the code, and it looks like you are using
>>> multiprocessing to parallelize compute-bound jobs. We wanted to use gevent
>>> because it is lightweight and good for parallelizing IO/network-bound jobs.
>>>
>>
>> We are using this code for IO-bound operations. For example, in [1] it is
>> used to make calls to GCS in parallel on batches of files.
>>
>> [1] https://github.com/apache/beam/blob/7bd73a51b670755bbb19e1291003722d5d16bdc5/sdks/python/apache_beam/io/filebasedsink.py#L313
>>
>>
>>> I would also recommend adding gevent support in the future, because it
>>> uses resources efficiently and scales well under heavy load.
>>>
>>
>> Do you mind filing a JIRA for the gevent issue so that we can keep track
>> of it?
>>
>>
>>>
>>>
>>>
>>> On Fri, Sep 21, 2018 at 5:58 PM Ahmet Altay <[email protected]> wrote:
>>>
>>>> Thank you for the example; it helps.
>>>>
>>>> I still do not know what is wrong with gevent. Would you consider using
>>>> the multiprocessing package? We are already using it to accomplish
>>>> something similar in file-based sinks, and there is already a utility
>>>> function that wraps it in a way similar to your example [1].
>>>>
>>>> [1] https://github.com/apache/beam/blob/59c85b44d156bb7b4462d80fcb5591f860235708/sdks/python/apache_beam/internal/util.py#L117
>>>>
>>>> Ahmet
>>>>
>>>>
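The multiprocessing suggestion above can be sketched with the standard library's `multiprocessing.pool.ThreadPool`, which offers the same interface as `multiprocessing.Pool` but runs tasks on threads. This is a minimal sketch, not the Beam utility linked in [1]; `lookup_user` and `run_in_threads` are hypothetical names, and the sleep stands in for a real network call.

```python
import time
from multiprocessing.pool import ThreadPool


def lookup_user(user_id):
    """Hypothetical stand-in for a blocking network call, such as an RPC."""
    time.sleep(0.05)  # simulate network latency
    return {"user_id": user_id, "valid": user_id % 2 == 0}


def run_in_threads(fn, inputs, pool_size=8):
    """Apply fn to each element of inputs on a pool of OS threads.

    Threads suit I/O-bound work: CPython releases the GIL while a thread
    is blocked waiting, so the calls overlap.
    """
    if not inputs:
        return []  # ThreadPool(0) would raise ValueError
    pool = ThreadPool(min(pool_size, len(inputs)))
    try:
        # map() blocks until every call returns and preserves input order.
        return pool.map(fn, inputs)
    finally:
        pool.terminate()


results = run_in_threads(lookup_user, list(range(10)))
print([r["user_id"] for r in results if r["valid"]])  # → [0, 2, 4, 6, 8]
```

A thread pool rather than a process pool fits here because the work is I/O bound; worker processes would add pickling and start-up cost without speeding up time spent waiting on the network.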
>>>> On Wed, Sep 19, 2018 at 10:25 PM, Rakesh Kumar <[email protected]>
>>>> wrote:
>>>>
>>>>>
>>>>> Gevent <http://www.gevent.org/> is used to make parallel network
>>>>> calls. We use gevent in one of our transformation methods to call
>>>>> internal services; the method makes multiple network calls in
>>>>> parallel. Here is the code snippet:
>>>>> # __init__.py
>>>>> import gevent.monkey
>>>>> gevent.monkey.patch_all()
>>>>>
>>>>> # transform.py
>>>>> from gevent import Greenlet, joinall
>>>>>
>>>>> # BATCH_SIZE, GREENLET_TIMEOUT and utils are defined elsewhere (not shown).
>>>>>
>>>>> def filter_out_invalid_users(events):
>>>>>     key, user_id_data_pairs = events
>>>>>     user_ids = [user_id for user_id, data in user_id_data_pairs]
>>>>>
>>>>>     jobs = []
>>>>>     id_chunks = utils.chunk_list_evenly(user_ids, BATCH_SIZE)
>>>>>     for id_chunk in id_chunks:
>>>>>         # _call_users_service makes the network call.
>>>>>         jobs.append(Greenlet.spawn(_call_users_service, list(id_chunk)))
>>>>>
>>>>>     # Increase the timeout based on the number of greenlets we are
>>>>>     # running, to account for yielding among greenlets.
>>>>>     join_timeout = GREENLET_TIMEOUT + len(jobs) * GREENLET_TIMEOUT * 0.1
>>>>>     joinall(jobs, timeout=join_timeout)
>>>>>
>>>>>     successful_jobs = [job for job in jobs if job.successful()]
>>>>>     valid_user_ids = []
>>>>>     for job in successful_jobs:
>>>>>         network_response = job.get()
>>>>>         valid_user_ids.append(network_response.user_id)
>>>>>     yield valid_user_ids
>>>>>
>>>>>
>>>>> def _call_users_service(user_ids):
>>>>>     # Make the network call and return the response.
>>>>>     ...
>>>>>     return network_response
>>>>>
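For comparison, the same batched lookup can be sketched with OS threads from the standard library's `concurrent.futures` (Python 3) instead of greenlets, so no monkey-patching is involved. This is a minimal sketch, not a drop-in replacement: the values of `BATCH_SIZE`, `GREENLET_TIMEOUT` and `MAX_WORKERS`, the `chunk_ids` helper (standing in for `utils.chunk_list_evenly`), and `call_users_service` are all hypothetical, and it assumes the service returns the valid ids of each batch as a list.

```python
import concurrent.futures
import time

# Hypothetical values; the real ones are defined elsewhere in the pipeline.
BATCH_SIZE = 3
GREENLET_TIMEOUT = 2.0  # seconds
MAX_WORKERS = 16


def chunk_ids(items, size):
    """Hypothetical stand-in for utils.chunk_list_evenly: fixed-size batches."""
    return [items[i:i + size] for i in range(0, len(items), size)]


def call_users_service(user_ids):
    """Hypothetical blocking call that returns the valid ids in the batch."""
    time.sleep(0.05)  # simulate network latency
    return [uid for uid in user_ids if uid % 2 == 0]


def filter_out_invalid_users(events):
    key, user_id_data_pairs = events
    user_ids = [user_id for user_id, _ in user_id_data_pairs]
    chunks = chunk_ids(user_ids, BATCH_SIZE)
    if not chunks:
        yield []
        return

    # Same timeout formula as the gevent version.
    timeout = GREENLET_TIMEOUT + len(chunks) * GREENLET_TIMEOUT * 0.1
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=min(MAX_WORKERS, len(chunks)))
    try:
        futures = [executor.submit(call_users_service, c) for c in chunks]
        done, _ = concurrent.futures.wait(futures, timeout=timeout)
    finally:
        # Return without waiting for batches that are still running.
        executor.shutdown(wait=False)

    valid_user_ids = []
    for future in futures:
        # Keep batches that finished in time without raising, like
        # job.successful() in the gevent version.
        if future in done and future.exception() is None:
            valid_user_ids.extend(future.result())
    yield valid_user_ids
```

For example, `list(filter_out_invalid_users(("k", [(1, "a"), (2, "b"), (4, "c")])))` returns `[[2, 4]]`. One trade-off versus greenlets: Python cannot kill a thread, so a call that overruns the timeout keeps running in the background until it returns.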
>>>>> On Tue, Sep 18, 2018 at 7:07 PM Ahmet Altay <[email protected]> wrote:
>>>>>
>>>>>> I am also not familiar with gevent. Could you explain what you are
>>>>>> trying to do and how you plan to use gevent?
>>>>>>
>>>>>> On Tue, Sep 18, 2018 at 9:38 AM, Lukasz Cwik <[email protected]>
>>>>>> wrote:
>>>>>>
>>>>>>> I don't think anyone has tried what you're doing. The code that you're
>>>>>>> working with is very new.
>>>>>>>
>>>>>>> On Mon, Sep 17, 2018 at 5:02 PM Micah Wylde <[email protected]> wrote:
>>>>>>>
>>>>>>>> Hi all,
>>>>>>>>
>>>>>>>> We're using the Python SDK with the portable Flink runner and
>>>>>>>> running into some problems integrating gevent. We're patching the gRPC
>>>>>>>> runtime for gevent as described in [0], which allows pipelines to start
>>>>>>>> and partially run. However, the tasks produce a stream of gevent exceptions:
>>>>>>>>
>>>>>>>> Exception greenlet.error: error('cannot switch to a different thread',) in 'grpc._cython.cygrpc.run_loop' ignored
>>>>>>>> Traceback (most recent call last):
>>>>>>>>   File "src/gevent/event.py", line 240, in gevent._event.Event.wait
>>>>>>>>   File "src/gevent/event.py", line 140, in gevent._event._AbstractLinkable._wait
>>>>>>>>   File "src/gevent/event.py", line 117, in gevent._event._AbstractLinkable._wait_core
>>>>>>>>   File "src/gevent/event.py", line 119, in gevent._event._AbstractLinkable._wait_core
>>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 59, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>>   File "src/gevent/_greenlet_primitives.py", line 63, in gevent.__greenlet_primitives.SwitchOutGreenletWithLoop.switch
>>>>>>>>   File "src/gevent/__greenlet_primitives.pxd", line 35, in gevent.__greenlet_primitives._greenlet_switch
>>>>>>>> greenlet.error: cannot switch to a different thread
>>>>>>>>
>>>>>>>> and do not make any progress.
>>>>>>>>
>>>>>>>> Has anybody else successfully used the portable Python SDK with
>>>>>>>> gevent? Or is there a recommended alternative for doing async IO in
>>>>>>>> Python pipelines?
>>>>>>>>
>>>>>>>> [0] https://github.com/grpc/grpc/issues/4629#issuecomment-376962677
>>>>>>>>
>>>>>>>> Micah
>>>>>>>>
>>>>>>>
>>>>>> --
>>>>> Rakesh Kumar
>>>>> Software Engineer
>>>>> 510-761-1364
>>>>>
>>>>> <https://www.lyft.com/>
>>>>>
>>>>
>>>
>
