also lgtm. On Sat, Mar 31, 2018 at 10:46 PM, Zac Medico <zmed...@gentoo.org> wrote:
> This decorator will be useful for retrying asynchronous > operations, such as gpg key refresh (bug 649276). The > API is inspired by tenacity, but is simpler. Only > asynchronous functions (like @asyncio.coroutine functions) > are supported. In order to retry a synchronous function, > first convert it to an asynchronous function as follows: > > asynchronous_func = functools.partial( > loop.run_in_executor, None, synchronous_func) > > Bug: https://bugs.gentoo.org/649276 > See: https://github.com/jd/tenacity > --- > pym/portage/tests/util/futures/test_retry.py | 147 ++++++++++++++++++++++ > pym/portage/util/futures/futures.py | 6 + > pym/portage/util/futures/retry.py | 178 > +++++++++++++++++++++++++++ > 3 files changed, 331 insertions(+) > create mode 100644 pym/portage/tests/util/futures/test_retry.py > create mode 100644 pym/portage/util/futures/retry.py > > diff --git a/pym/portage/tests/util/futures/test_retry.py > b/pym/portage/tests/util/futures/test_retry.py > new file mode 100644 > index 000000000..7641e4e92 > --- /dev/null > +++ b/pym/portage/tests/util/futures/test_retry.py > @@ -0,0 +1,147 @@ > +# Copyright 2018 Gentoo Foundation > +# Distributed under the terms of the GNU General Public License v2 > + > +import functools > + > +try: > + import threading > +except ImportError: > + import dummy_threading as threading > + > +from portage.tests import TestCase > +from portage.util._eventloop.global_event_loop import global_event_loop > +from portage.util.backoff import RandomExponentialBackoff > +from portage.util.futures.futures import TimeoutError > +from portage.util.futures.retry import retry > +from portage.util.futures.wait import wait > +from portage.util.monotonic import monotonic > + > + > +class SucceedLaterException(Exception): > + pass > + > + > +class SucceedLater(object): > + """ > + A callable object that succeeds some duration of time has passed. > + """ > + def __init__(self, duration): > + self._succeed_time = monotonic() + duration > + > + def __call__(self): > + remaining = self._succeed_time - monotonic() > + if remaining > 0: > + raise SucceedLaterException('time until success: > {} seconds'.format(remaining)) > + return 'success' > + > + > +class SucceedNeverException(Exception): > + pass > + > + > +class SucceedNever(object): > + """ > + A callable object that never succeeds. > + """ > + def __call__(self): > + raise SucceedNeverException('expected failure') > + > + > +class HangForever(object): > + """ > + A callable object that sleeps forever. > + """ > + def __call__(self): > + threading.Event().wait() > + > + > +class RetryTestCase(TestCase): > + def testSucceedLater(self): > + loop = global_event_loop() > + func = SucceedLater(1) > + func_coroutine = functools.partial(loop.run_in_executor, > None, func) > + decorator = retry(try_max=9999, > + delay_func=RandomExponentialBackoff(multiplier=0.1, > base=2)) > + decorated_func = decorator(func_coroutine) > + result = loop.run_until_complete(decorated_func()) > + self.assertEqual(result, 'success') > + > + def testSucceedNever(self): > + loop = global_event_loop() > + func = SucceedNever() > + func_coroutine = functools.partial(loop.run_in_executor, > None, func) > + decorator = retry(try_max=4, try_timeout=None, > + delay_func=RandomExponentialBackoff(multiplier=0.1, > base=2)) > + decorated_func = decorator(func_coroutine) > + done, pending = loop.run_until_complete(wait([ > decorated_func()])) > + self.assertEqual(len(done), 1) > + self.assertTrue(isinstance(done[0].exception().__cause__, > SucceedNeverException)) > + > + def testSucceedNeverReraise(self): > + loop = global_event_loop() > + func = SucceedNever() > + func_coroutine = functools.partial(loop.run_in_executor, > None, func) > + decorator = retry(reraise=True, try_max=4, > try_timeout=None, > + delay_func=RandomExponentialBackoff(multiplier=0.1, > base=2)) > + decorated_func = decorator(func_coroutine) > + done, pending = loop.run_until_complete(wait([ > decorated_func()])) > + self.assertEqual(len(done), 1) > + self.assertTrue(isinstance(done[0].exception(), > SucceedNeverException)) > + > + def testHangForever(self): > + loop = global_event_loop() > + func = HangForever() > + func_coroutine = functools.partial(loop.run_in_executor, > None, func) > + decorator = retry(try_max=2, try_timeout=0.1, > + delay_func=RandomExponentialBackoff(multiplier=0.1, > base=2)) > + decorated_func = decorator(func_coroutine) > + done, pending = loop.run_until_complete(wait([ > decorated_func()])) > + self.assertEqual(len(done), 1) > + self.assertTrue(isinstance(done[0].exception().__cause__, > TimeoutError)) > + > + def testHangForeverReraise(self): > + loop = global_event_loop() > + func = HangForever() > + func_coroutine = functools.partial(loop.run_in_executor, > None, func) > + decorator = retry(reraise=True, try_max=2, try_timeout=0.1, > + delay_func=RandomExponentialBackoff(multiplier=0.1, > base=2)) > + decorated_func = decorator(func_coroutine) > + done, pending = loop.run_until_complete(wait([ > decorated_func()])) > + self.assertEqual(len(done), 1) > + self.assertTrue(isinstance(done[0].exception(), > TimeoutError)) > + > + def testCancelRetry(self): > + loop = global_event_loop() > + func = SucceedNever() > + func_coroutine = functools.partial(loop.run_in_executor, > None, func) > + decorator = retry(try_timeout=0.1, > + delay_func=RandomExponentialBackoff(multiplier=0.1, > base=2)) > + decorated_func = decorator(func_coroutine) > + future = decorated_func() > + loop.call_later(0.3, future.cancel) > + done, pending = loop.run_until_complete(wait([future])) > + self.assertEqual(len(done), 1) > + self.assertTrue(done[0].cancelled()) > + > + def testOverallTimeoutWithException(self): > + loop = global_event_loop() > + func = SucceedNever() > + func_coroutine = functools.partial(loop.run_in_executor, > None, func) > + decorator = retry(try_timeout=0.1, overall_timeout=0.3, > + delay_func=RandomExponentialBackoff(multiplier=0.1, > base=2)) > + decorated_func = decorator(func_coroutine) > + done, pending = loop.run_until_complete(wait([ > decorated_func()])) > + self.assertEqual(len(done), 1) > + self.assertTrue(isinstance(done[0].exception().__cause__, > SucceedNeverException)) > + > + def testOverallTimeoutWithTimeoutError(self): > + loop = global_event_loop() > + # results in TimeoutError because it hangs forever > + func = HangForever() > + func_coroutine = functools.partial(loop.run_in_executor, > None, func) > + decorator = retry(try_timeout=0.1, overall_timeout=0.3, > + delay_func=RandomExponentialBackoff(multiplier=0.1, > base=2)) > + decorated_func = decorator(func_coroutine) > + done, pending = loop.run_until_complete(wait([ > decorated_func()])) > + self.assertEqual(len(done), 1) > + self.assertTrue(isinstance(done[0].exception().__cause__, > TimeoutError)) > diff --git a/pym/portage/util/futures/futures.py > b/pym/portage/util/futures/futures.py > index dcf593c01..cd56a27eb 100644 > --- a/pym/portage/util/futures/futures.py > +++ b/pym/portage/util/futures/futures.py > @@ -11,6 +11,7 @@ __all__ = ( > 'CancelledError', > 'Future', > 'InvalidStateError', > + 'TimeoutError', > ) > > try: > @@ -18,6 +19,7 @@ try: > CancelledError, > Future, > InvalidStateError, > + TimeoutError, > ) > except ImportError: > > @@ -30,6 +32,10 @@ except ImportError: > def __init__(self): > Error.__init__(self, "cancelled") > > + class TimeoutError(Error): > + def __init__(self): > + Error.__init__(self, "timed out") > + > class InvalidStateError(Error): > pass > > diff --git a/pym/portage/util/futures/retry.py b/pym/portage/util/futures/ > retry.py > new file mode 100644 > index 000000000..2caf1bbac > --- /dev/null > +++ b/pym/portage/util/futures/retry.py > @@ -0,0 +1,178 @@ > +# Copyright 2018 Gentoo Foundation > +# Distributed under the terms of the GNU General Public License v2 > + > +import functools > + > +from portage.exception import PortageException > +from portage.util._eventloop.global_event_loop import global_event_loop > +from portage.util.futures.futures import TimeoutError > + > + > +class RetryError(PortageException): > + """Raised when retry fails.""" > + def __init__(self): > + PortageException.__init__(self, "retry error") > + > + > +def retry(try_max=None, try_timeout=None, overall_timeout=None, > + delay_func=None, reraise=False, loop=None): > + """ > + Create and return a retry decorator. The decorator is intended to > + operate only on a coroutine function. > + > + @param try_max: maximum number of tries > + @type try_max: int or None > + @param try_timeout: number of seconds to wait for a try to succeed > + before cancelling it, which is only effective if func > returns > + tasks that support cancellation > + @type try_timeout: float or None > + @param overall_timeout: number of seconds to wait for retires to > + succeed before aborting, which is only effective if func > returns > + tasks that support cancellation > + @type overall_timeout: float or None > + @param delay_func: function that takes an int argument > corresponding > + to the number of previous tries and returns a number of > seconds > + to wait before the next try > + @type delay_func: callable > + @param reraise: Reraise the last exception, instead of RetryError > + @type reraise: bool > + @param loop: event loop > + @type loop: EventLoop > + @return: func decorated with retry support > + @rtype: callable > + """ > + return functools.partial(_retry_wrapper, loop, try_max, > try_timeout, > + overall_timeout, delay_func, reraise) > + > + > +def _retry_wrapper(loop, try_max, try_timeout, overall_timeout, > delay_func, > + reraise, func): > + """ > + Create and return a decorated function. > + """ > + return functools.partial(_retry, loop, try_max, try_timeout, > + overall_timeout, delay_func, reraise, func) > + > + > +def _retry(loop, try_max, try_timeout, overall_timeout, delay_func, > + reraise, func, *args, **kwargs): > + """ > + Retry coroutine, used to implement retry decorator. > + > + @return: func return value > + @rtype: asyncio.Future (or compatible) > + """ > + loop = loop or global_event_loop() > + future = loop.create_future() > + _Retry(future, loop, try_max, try_timeout, overall_timeout, > delay_func, > + reraise, functools.partial(func, *args, **kwargs)) > + return future > + > + > +class _Retry(object): > + def __init__(self, future, loop, try_max, try_timeout, > overall_timeout, > + delay_func, reraise, func): > + self._future = future > + self._loop = loop > + self._try_max = try_max > + self._try_timeout = try_timeout > + self._delay_func = delay_func > + self._reraise = reraise > + self._func = func > + > + self._try_timeout_handle = None > + self._overall_timeout_handle = None > + self._overall_timeout_expired = None > + self._tries = 0 > + self._current_task = None > + self._previous_result = None > + > + future.add_done_callback(self._cancel_callback) > + if overall_timeout is not None: > + self._overall_timeout_handle = loop.call_later( > + overall_timeout, self._overall_timeout_ > callback) > + self._begin_try() > + > + def _cancel_callback(self, future): > + if future.cancelled() and self._current_task is not None: > + self._current_task.cancel() > + > + def _try_timeout_callback(self): > + self._try_timeout_handle = None > + self._current_task.cancel() > + > + def _overall_timeout_callback(self): > + self._overall_timeout_handle = None > + self._overall_timeout_expired = True > + self._current_task.cancel() > + self._retry_error() > + > + def _begin_try(self): > + self._tries += 1 > + self._current_task = self._func() > + self._current_task.add_done_callback(self._try_done) > + if self._try_timeout is not None: > + self._try_timeout_handle = self._loop.call_later( > + self._try_timeout, > self._try_timeout_callback) > + > + def _try_done(self, future): > + self._current_task = None > + > + if self._try_timeout_handle is not None: > + self._try_timeout_handle.cancel() > + self._try_timeout_handle = None > + > + if not future.cancelled(): > + # consume exception, so that the event loop > + # exception handler does not report it > + future.exception() > + > + if self._overall_timeout_expired: > + return > + > + try: > + if self._future.cancelled(): > + return > + > + self._previous_result = future > + if not (future.cancelled() or future.exception() > is not None): > + # success > + self._future.set_result(future.result()) > + return > + finally: > + if self._future.done() and > self._overall_timeout_handle is not None: > + self._overall_timeout_handle.cancel() > + self._overall_timeout_handle = None > + > + if self._try_max is not None and self._tries >= > self._try_max: > + self._retry_error() > + return > + > + if self._delay_func is not None: > + delay = self._delay_func(self._tries) > + self._current_task = self._loop.call_later(delay, > self._delay_done) > + return > + > + self._begin_try() > + > + def _delay_done(self): > + self._current_task = None > + > + if self._future.cancelled() or > self._overall_timeout_expired: > + return > + > + self._begin_try() > + > + def _retry_error(self): > + if self._previous_result is None or self._previous_result. > cancelled(): > + cause = TimeoutError() > + else: > + cause = self._previous_result.exception() > + > + if self._reraise: > + e = cause > + else: > + e = RetryError() > + e.__cause__ = cause > + > + self._future.set_exception(e) > -- > 2.13.6 > > >