commit:     edba848a013e1797faeacc1911a5e6571fca3ca7
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Jul  5 04:44:42 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Jul 11 07:40:59 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=edba848a

Add python2 compatible coroutine support (bug 660426)

For readability, it's desirable to make asynchronous code use
coroutines to avoid callbacks when possible. For python2 compatibility,
generators that yield Futures can be used to implement coroutines.

Add a compat_coroutine module which provides a @coroutine decorator
and a coroutine_return function that can be used to return a value
from a generator. The decorated function returns a Future which is
done when the generator is exhausted. Usage is very similar to asyncio
coroutine usage in python3.4 (see unit tests).

Bug: https://bugs.gentoo.org/660426

 .../tests/util/futures/test_compat_coroutine.py    | 159 +++++++++++++++++++++
 pym/portage/util/futures/compat_coroutine.py       | 112 +++++++++++++++
 2 files changed, 271 insertions(+)

diff --git a/pym/portage/tests/util/futures/test_compat_coroutine.py b/pym/portage/tests/util/futures/test_compat_coroutine.py
new file mode 100644
index 000000000..cbc070869
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_compat_coroutine.py
@@ -0,0 +1,159 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import (
+       coroutine,
+       coroutine_return,
+)
+from portage.tests import TestCase
+
+
+class CompatCoroutineTestCase(TestCase):
+       """
+       Tests for the coroutine decorator and the coroutine_return function
+       from portage.util.futures.compat_coroutine.
+       """
+
+       def test_returning_coroutine(self):
+               # The value passed to coroutine_return is the Future's result.
+               @coroutine
+               def returning_coroutine():
+                       yield asyncio.sleep(0)
+                       coroutine_return('success')
+
+               self.assertEqual('success',
+                       asyncio.get_event_loop().run_until_complete(returning_coroutine()))
+
+       def test_raising_coroutine(self):
+               # An exception raised inside the generator is raised again by
+               # run_until_complete.
+
+               class TestException(Exception):
+                       pass
+
+               @coroutine
+               def raising_coroutine():
+                       yield asyncio.sleep(0)
+                       raise TestException('exception')
+
+               self.assertRaises(TestException,
+                       asyncio.get_event_loop().run_until_complete, raising_coroutine())
+
+       def test_catching_coroutine(self):
+               # An exception set on a yielded Future is raised at the yield
+               # expression, where the generator is able to catch it.
+
+               class TestException(Exception):
+                       pass
+
+               @coroutine
+               def catching_coroutine(loop=None):
+                       loop = asyncio._wrap_loop(loop)
+                       future = loop.create_future()
+                       loop.call_soon(future.set_exception, TestException('exception'))
+                       try:
+                               yield future
+                       except TestException:
+                               self.assertTrue(True)
+                       else:
+                               self.assertTrue(False)
+                       coroutine_return('success')
+
+               loop = asyncio.get_event_loop()
+               self.assertEqual('success',
+                       loop.run_until_complete(catching_coroutine(loop=loop)))
+
+       def test_cancelled_coroutine(self):
+               # Cancelling the Future returned by the decorated function makes
+               # run_until_complete raise CancelledError, even though the
+               # generator itself would never finish.
+
+               @coroutine
+               def cancelled_coroutine(loop=None):
+                       loop = asyncio._wrap_loop(loop)
+                       while True:
+                               yield loop.create_future()
+
+               loop = asyncio.get_event_loop()
+               future = cancelled_coroutine(loop=loop)
+               loop.call_soon(future.cancel)
+
+               self.assertRaises(asyncio.CancelledError,
+                       loop.run_until_complete, future)
+
+       def test_cancelled_future(self):
+               # Cancelling a Future that the generator is waiting on surfaces
+               # as CancelledError when the generator does not catch it.
+
+               @coroutine
+               def cancelled_future_coroutine(loop=None):
+                       loop = asyncio._wrap_loop(loop)
+                       while True:
+                               future = loop.create_future()
+                               loop.call_soon(future.cancel)
+                               yield future
+
+               loop = asyncio.get_event_loop()
+               self.assertRaises(asyncio.CancelledError,
+                       loop.run_until_complete, cancelled_future_coroutine(loop=loop))
+
+       def test_yield_expression_result(self):
+               # The result of each yielded Future becomes the value of the
+               # corresponding yield expression.
+               @coroutine
+               def yield_expression_coroutine():
+                       for i in range(3):
+                               x = yield asyncio.sleep(0, result=i)
+                               self.assertEqual(x, i)
+
+               asyncio.get_event_loop().run_until_complete(yield_expression_coroutine())
+
+       def test_method_coroutine(self):
+               # The decorator also works on methods: a writer and a reader
+               # coroutine exchange values through the coroutine methods of a
+               # shared Cubby instance.
+
+               class Cubby(object):
+                       # Holds at most one value. write() waits while a value is
+                       # present and read() waits while the cubby is empty.
+
+                       _empty = object()
+
+                       def __init__(self, loop):
+                               self._loop = loop
+                               self._value = self._empty
+                               self._waiters = []
+
+                       def _notify(self):
+                               # Wake every waiter registered so far; waiters that
+                               # were cancelled in the meantime are skipped.
+                               waiters = self._waiters
+                               self._waiters = []
+                               for waiter in waiters:
+                                       waiter.cancelled() or waiter.set_result(None)
+
+                       def _wait(self):
+                               waiter = self._loop.create_future()
+                               self._waiters.append(waiter)
+                               return waiter
+
+                       @coroutine
+                       def read(self):
+                               while self._value is self._empty:
+                                       yield self._wait()
+
+                               value = self._value
+                               self._value = self._empty
+                               self._notify()
+                               coroutine_return(value)
+
+                       @coroutine
+                       def write(self, value):
+                               while self._value is not self._empty:
+                                       yield self._wait()
+
+                               self._value = value
+                               self._notify()
+
+               @coroutine
+               def writer_coroutine(cubby, values, sentinel):
+                       for value in values:
+                               yield cubby.write(value)
+                       yield cubby.write(sentinel)
+
+               @coroutine
+               def reader_coroutine(cubby, sentinel):
+                       results = []
+                       while True:
+                               result = yield cubby.read()
+                               if result == sentinel:
+                                       break
+                               results.append(result)
+                       coroutine_return(results)
+
+               loop = asyncio.get_event_loop()
+               cubby = Cubby(loop)
+               values = list(range(3))
+               writer = asyncio.ensure_future(writer_coroutine(cubby, values, None), loop=loop)
+               reader = asyncio.ensure_future(reader_coroutine(cubby, None), loop=loop)
+               loop.run_until_complete(asyncio.wait([writer, reader]))
+
+               self.assertEqual(reader.result(), values)

diff --git a/pym/portage/util/futures/compat_coroutine.py b/pym/portage/util/futures/compat_coroutine.py
new file mode 100644
index 000000000..17400b74d
--- /dev/null
+++ b/pym/portage/util/futures/compat_coroutine.py
@@ -0,0 +1,112 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage.util.futures import asyncio
+import functools
+
+
+def coroutine(generator_func):
+       """
+       Decorate a generator function so that it behaves as a coroutine
+       function. To wait for a Future, the generator yields it, and the
+       Future's result is delivered as the value of that yield-expression
+       via the PEP 342 generator send() method.
+
+       Calling the decorated function returns a Future which is done when
+       the generator is exhausted. The generator can return a value via
+       the coroutine_return function.
+
+       @param generator_func: A generator function that yields Futures, and
+               will receive the result of each Future as the result of the
+               corresponding yield-expression.
+       @type generator_func: function
+       @rtype: function
+       @return: A function which calls the given generator function and
+               returns a Future that is done when the generator is exhausted.
+       """
+       # functools.partial is not suitable here: it does not implement the
+       # descriptor protocol, so it does not work for decoration of
+       # methods. This problem is solved by defining a wrapper function.
+       def start_generator(*args, **kwargs):
+               return _generator_future(generator_func, *args, **kwargs)
+       return functools.wraps(generator_func)(start_generator)
+
+
+def coroutine_return(result=None):
+       """
+       Terminate the current coroutine and set the result of the associated
+       Future. This is done by raising a _CoroutineReturnValue exception,
+       so this function never returns normally.
+
+       @param result: result of the current coroutine's Future
+       @type result: object
+       """
+       return_value = _CoroutineReturnValue(result)
+       raise return_value
+
+
+def _generator_future(generator_func, *args, **kwargs):
+       """
+       Call generator_func with the given arguments, and return a Future
+       that is done when the resulting generator is exhausted. If a
+       keyword argument named 'loop' is given, then it is used instead of
+       the default event loop. The 'loop' keyword is not removed from
+       kwargs, so generator_func receives it as well.
+       """
+       event_loop = asyncio._wrap_loop(kwargs.get('loop'))
+       future = event_loop.create_future()
+       generator = generator_func(*args, **kwargs)
+       # The task schedules itself on the event loop when constructed, so
+       # no reference to it needs to be kept here.
+       _GeneratorTask(generator, future, loop=event_loop)
+       return future
+
+
+class _CoroutineReturnValue(Exception):
+       """
+       Exception raised by coroutine_return in order to carry a return
+       value out of a generator.
+
+       @ivar result: the value passed to coroutine_return
+       """
+       def __init__(self, result):
+               # Keep the value on the instance so that whoever catches the
+               # exception can read it back.
+               self.result = result
+
+
+class _GeneratorTask(object):
+       """
+       Asynchronously executes the generator to completion, waiting for
+       the result of each Future that it yields, and sending the result
+       to the generator.
+
+       @param generator: a generator that yields Futures
+       @param result: the Future which receives the final result or
+               exception of the generator
+       @param loop: the event loop used to schedule execution
+       """
+       def __init__(self, generator, result, loop):
+               self._generator = generator
+               self._result = result
+               self._loop = loop
+               result.add_done_callback(self._cancel_callback)
+               loop.call_soon(self._next)
+
+       def _cancel_callback(self, result):
+               """
+               Close the generator if the result Future has been cancelled.
+               """
+               if result.cancelled():
+                       self._generator.close()
+
+       def _next(self, previous=None):
+               """
+               Resume the generator with the outcome of the previous Future
+               (or start it if previous is None), then either wait for the
+               next Future that it yields or finish the result Future.
+
+               @param previous: the completed Future that the generator was
+                       waiting for, or None for the initial call
+               """
+               if self._result.cancelled():
+                       if previous is not None:
+                               # Consume exceptions, in order to avoid triggering
+                               # the event loop's exception handler.
+                               previous.cancelled() or previous.exception()
+                       return
+               try:
+                       # Both send() and throw() resume the generator and return
+                       # the next value that it yields. The return value of
+                       # throw() must therefore be used directly: calling next()
+                       # afterwards would skip a Future yielded by a generator
+                       # which caught the exception, without ever waiting for it.
+                       if previous is None:
+                               future = next(self._generator)
+                       elif previous.cancelled():
+                               future = self._generator.throw(asyncio.CancelledError())
+                       elif previous.exception() is None:
+                               future = self._generator.send(previous.result())
+                       else:
+                               future = self._generator.throw(previous.exception())
+
+               except _CoroutineReturnValue as e:
+                       if not self._result.cancelled():
+                               self._result.set_result(e.result)
+               except StopIteration:
+                       if not self._result.cancelled():
+                               self._result.set_result(None)
+               except Exception as e:
+                       if not self._result.cancelled():
+                               self._result.set_exception(e)
+               else:
+                       future = asyncio.ensure_future(future, loop=self._loop)
+                       future.add_done_callback(self._next)

Reply via email to