also lgtm.

On Sat, Mar 31, 2018 at 10:46 PM, Zac Medico <zmed...@gentoo.org> wrote:

> This decorator will be useful for retrying asynchronous
> operations, such as gpg key refresh (bug 649276). The
> API is inspired by tenacity, but is simpler. Only
> asynchronous functions (like @asyncio.coroutine functions)
> are supported. In order to retry a synchronous function,
> first convert it to an asynchronous function as follows:
>
>     asynchronous_func = functools.partial(
>         loop.run_in_executor, None, synchronous_func)
>
> Bug: https://bugs.gentoo.org/649276
> See: https://github.com/jd/tenacity
> ---
>  pym/portage/tests/util/futures/test_retry.py | 147 ++++++++++++++++++++++
>  pym/portage/util/futures/futures.py          |   6 +
>  pym/portage/util/futures/retry.py            | 178
> +++++++++++++++++++++++++++
>  3 files changed, 331 insertions(+)
>  create mode 100644 pym/portage/tests/util/futures/test_retry.py
>  create mode 100644 pym/portage/util/futures/retry.py
>
> diff --git a/pym/portage/tests/util/futures/test_retry.py
> b/pym/portage/tests/util/futures/test_retry.py
> new file mode 100644
> index 000000000..7641e4e92
> --- /dev/null
> +++ b/pym/portage/tests/util/futures/test_retry.py
> @@ -0,0 +1,147 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import functools
> +
> +try:
> +       import threading
> +except ImportError:
> +       import dummy_threading as threading
> +
> +from portage.tests import TestCase
> +from portage.util._eventloop.global_event_loop import global_event_loop
> +from portage.util.backoff import RandomExponentialBackoff
> +from portage.util.futures.futures import TimeoutError
> +from portage.util.futures.retry import retry
> +from portage.util.futures.wait import wait
> +from portage.util.monotonic import monotonic
> +
> +
> +class SucceedLaterException(Exception):
> +       pass
> +
> +
> +class SucceedLater(object):
> +       """
> +       A callable object that succeeds some duration of time has passed.
> +       """
> +       def __init__(self, duration):
> +               self._succeed_time = monotonic() + duration
> +
> +       def __call__(self):
> +               remaining = self._succeed_time - monotonic()
> +               if remaining > 0:
> +                       raise SucceedLaterException('time until success:
> {} seconds'.format(remaining))
> +               return 'success'
> +
> +
> +class SucceedNeverException(Exception):
> +       pass
> +
> +
> +class SucceedNever(object):
> +       """
> +       A callable object that never succeeds.
> +       """
> +       def __call__(self):
> +               raise SucceedNeverException('expected failure')
> +
> +
> +class HangForever(object):
> +       """
> +       A callable object that sleeps forever.
> +       """
> +       def __call__(self):
> +               threading.Event().wait()
> +
> +
> +class RetryTestCase(TestCase):
> +       def testSucceedLater(self):
> +               loop = global_event_loop()
> +               func = SucceedLater(1)
> +               func_coroutine = functools.partial(loop.run_in_executor,
> None, func)
> +               decorator = retry(try_max=9999,
> +                       delay_func=RandomExponentialBackoff(multiplier=0.1,
> base=2))
> +               decorated_func = decorator(func_coroutine)
> +               result = loop.run_until_complete(decorated_func())
> +               self.assertEqual(result, 'success')
> +
> +       def testSucceedNever(self):
> +               loop = global_event_loop()
> +               func = SucceedNever()
> +               func_coroutine = functools.partial(loop.run_in_executor,
> None, func)
> +               decorator = retry(try_max=4, try_timeout=None,
> +                       delay_func=RandomExponentialBackoff(multiplier=0.1,
> base=2))
> +               decorated_func = decorator(func_coroutine)
> +               done, pending = loop.run_until_complete(wait([
> decorated_func()]))
> +               self.assertEqual(len(done), 1)
> +               self.assertTrue(isinstance(done[0].exception().__cause__,
> SucceedNeverException))
> +
> +       def testSucceedNeverReraise(self):
> +               loop = global_event_loop()
> +               func = SucceedNever()
> +               func_coroutine = functools.partial(loop.run_in_executor,
> None, func)
> +               decorator = retry(reraise=True, try_max=4,
> try_timeout=None,
> +                       delay_func=RandomExponentialBackoff(multiplier=0.1,
> base=2))
> +               decorated_func = decorator(func_coroutine)
> +               done, pending = loop.run_until_complete(wait([
> decorated_func()]))
> +               self.assertEqual(len(done), 1)
> +               self.assertTrue(isinstance(done[0].exception(),
> SucceedNeverException))
> +
> +       def testHangForever(self):
> +               loop = global_event_loop()
> +               func = HangForever()
> +               func_coroutine = functools.partial(loop.run_in_executor,
> None, func)
> +               decorator = retry(try_max=2, try_timeout=0.1,
> +                       delay_func=RandomExponentialBackoff(multiplier=0.1,
> base=2))
> +               decorated_func = decorator(func_coroutine)
> +               done, pending = loop.run_until_complete(wait([
> decorated_func()]))
> +               self.assertEqual(len(done), 1)
> +               self.assertTrue(isinstance(done[0].exception().__cause__,
> TimeoutError))
> +
> +       def testHangForeverReraise(self):
> +               loop = global_event_loop()
> +               func = HangForever()
> +               func_coroutine = functools.partial(loop.run_in_executor,
> None, func)
> +               decorator = retry(reraise=True, try_max=2, try_timeout=0.1,
> +                       delay_func=RandomExponentialBackoff(multiplier=0.1,
> base=2))
> +               decorated_func = decorator(func_coroutine)
> +               done, pending = loop.run_until_complete(wait([
> decorated_func()]))
> +               self.assertEqual(len(done), 1)
> +               self.assertTrue(isinstance(done[0].exception(),
> TimeoutError))
> +
> +       def testCancelRetry(self):
> +               loop = global_event_loop()
> +               func = SucceedNever()
> +               func_coroutine = functools.partial(loop.run_in_executor,
> None, func)
> +               decorator = retry(try_timeout=0.1,
> +                       delay_func=RandomExponentialBackoff(multiplier=0.1,
> base=2))
> +               decorated_func = decorator(func_coroutine)
> +               future = decorated_func()
> +               loop.call_later(0.3, future.cancel)
> +               done, pending = loop.run_until_complete(wait([future]))
> +               self.assertEqual(len(done), 1)
> +               self.assertTrue(done[0].cancelled())
> +
> +       def testOverallTimeoutWithException(self):
> +               loop = global_event_loop()
> +               func = SucceedNever()
> +               func_coroutine = functools.partial(loop.run_in_executor,
> None, func)
> +               decorator = retry(try_timeout=0.1, overall_timeout=0.3,
> +                       delay_func=RandomExponentialBackoff(multiplier=0.1,
> base=2))
> +               decorated_func = decorator(func_coroutine)
> +               done, pending = loop.run_until_complete(wait([
> decorated_func()]))
> +               self.assertEqual(len(done), 1)
> +               self.assertTrue(isinstance(done[0].exception().__cause__,
> SucceedNeverException))
> +
> +       def testOverallTimeoutWithTimeoutError(self):
> +               loop = global_event_loop()
> +               # results in TimeoutError because it hangs forever
> +               func = HangForever()
> +               func_coroutine = functools.partial(loop.run_in_executor,
> None, func)
> +               decorator = retry(try_timeout=0.1, overall_timeout=0.3,
> +                       delay_func=RandomExponentialBackoff(multiplier=0.1,
> base=2))
> +               decorated_func = decorator(func_coroutine)
> +               done, pending = loop.run_until_complete(wait([
> decorated_func()]))
> +               self.assertEqual(len(done), 1)
> +               self.assertTrue(isinstance(done[0].exception().__cause__,
> TimeoutError))
> diff --git a/pym/portage/util/futures/futures.py
> b/pym/portage/util/futures/futures.py
> index dcf593c01..cd56a27eb 100644
> --- a/pym/portage/util/futures/futures.py
> +++ b/pym/portage/util/futures/futures.py
> @@ -11,6 +11,7 @@ __all__ = (
>         'CancelledError',
>         'Future',
>         'InvalidStateError',
> +       'TimeoutError',
>  )
>
>  try:
> @@ -18,6 +19,7 @@ try:
>                 CancelledError,
>                 Future,
>                 InvalidStateError,
> +               TimeoutError,
>         )
>  except ImportError:
>
> @@ -30,6 +32,10 @@ except ImportError:
>                 def __init__(self):
>                         Error.__init__(self, "cancelled")
>
> +       class TimeoutError(Error):
> +               def __init__(self):
> +                       Error.__init__(self, "timed out")
> +
>         class InvalidStateError(Error):
>                 pass
>
> diff --git a/pym/portage/util/futures/retry.py b/pym/portage/util/futures/
> retry.py
> new file mode 100644
> index 000000000..2caf1bbac
> --- /dev/null
> +++ b/pym/portage/util/futures/retry.py
> @@ -0,0 +1,178 @@
> +# Copyright 2018 Gentoo Foundation
> +# Distributed under the terms of the GNU General Public License v2
> +
> +import functools
> +
> +from portage.exception import PortageException
> +from portage.util._eventloop.global_event_loop import global_event_loop
> +from portage.util.futures.futures import TimeoutError
> +
> +
> +class RetryError(PortageException):
> +       """Raised when retry fails."""
> +       def __init__(self):
> +               PortageException.__init__(self, "retry error")
> +
> +
> +def retry(try_max=None, try_timeout=None, overall_timeout=None,
> +       delay_func=None, reraise=False, loop=None):
> +       """
> +       Create and return a retry decorator. The decorator is intended to
> +       operate only on a coroutine function.
> +
> +       @param try_max: maximum number of tries
> +       @type try_max: int or None
> +       @param try_timeout: number of seconds to wait for a try to succeed
> +               before cancelling it, which is only effective if func
> returns
> +               tasks that support cancellation
> +       @type try_timeout: float or None
> +       @param overall_timeout: number of seconds to wait for retires to
> +               succeed before aborting, which is only effective if func
> returns
> +               tasks that support cancellation
> +       @type overall_timeout: float or None
> +       @param delay_func: function that takes an int argument
> corresponding
> +               to the number of previous tries and returns a number of
> seconds
> +               to wait before the next try
> +       @type delay_func: callable
> +       @param reraise: Reraise the last exception, instead of RetryError
> +       @type reraise: bool
> +       @param loop: event loop
> +       @type loop: EventLoop
> +       @return: func decorated with retry support
> +       @rtype: callable
> +       """
> +       return functools.partial(_retry_wrapper, loop, try_max,
> try_timeout,
> +               overall_timeout, delay_func, reraise)
> +
> +
> +def _retry_wrapper(loop, try_max, try_timeout, overall_timeout,
> delay_func,
> +       reraise, func):
> +       """
> +       Create and return a decorated function.
> +       """
> +       return functools.partial(_retry, loop, try_max, try_timeout,
> +               overall_timeout, delay_func, reraise, func)
> +
> +
> +def _retry(loop, try_max, try_timeout, overall_timeout, delay_func,
> +       reraise, func, *args, **kwargs):
> +       """
> +       Retry coroutine, used to implement retry decorator.
> +
> +       @return: func return value
> +       @rtype: asyncio.Future (or compatible)
> +       """
> +       loop = loop or global_event_loop()
> +       future = loop.create_future()
> +       _Retry(future, loop, try_max, try_timeout, overall_timeout,
> delay_func,
> +               reraise, functools.partial(func, *args, **kwargs))
> +       return future
> +
> +
> +class _Retry(object):
> +       def __init__(self, future, loop, try_max, try_timeout,
> overall_timeout,
> +               delay_func, reraise, func):
> +               self._future = future
> +               self._loop = loop
> +               self._try_max = try_max
> +               self._try_timeout = try_timeout
> +               self._delay_func = delay_func
> +               self._reraise = reraise
> +               self._func = func
> +
> +               self._try_timeout_handle = None
> +               self._overall_timeout_handle = None
> +               self._overall_timeout_expired = None
> +               self._tries = 0
> +               self._current_task = None
> +               self._previous_result = None
> +
> +               future.add_done_callback(self._cancel_callback)
> +               if overall_timeout is not None:
> +                       self._overall_timeout_handle = loop.call_later(
> +                               overall_timeout, self._overall_timeout_
> callback)
> +               self._begin_try()
> +
> +       def _cancel_callback(self, future):
> +               if future.cancelled() and self._current_task is not None:
> +                       self._current_task.cancel()
> +
> +       def _try_timeout_callback(self):
> +               self._try_timeout_handle = None
> +               self._current_task.cancel()
> +
> +       def _overall_timeout_callback(self):
> +               self._overall_timeout_handle = None
> +               self._overall_timeout_expired = True
> +               self._current_task.cancel()
> +               self._retry_error()
> +
> +       def _begin_try(self):
> +               self._tries += 1
> +               self._current_task = self._func()
> +               self._current_task.add_done_callback(self._try_done)
> +               if self._try_timeout is not None:
> +                       self._try_timeout_handle = self._loop.call_later(
> +                               self._try_timeout,
> self._try_timeout_callback)
> +
> +       def _try_done(self, future):
> +               self._current_task = None
> +
> +               if self._try_timeout_handle is not None:
> +                       self._try_timeout_handle.cancel()
> +                       self._try_timeout_handle = None
> +
> +               if not future.cancelled():
> +                       # consume exception, so that the event loop
> +                       # exception handler does not report it
> +                       future.exception()
> +
> +               if self._overall_timeout_expired:
> +                       return
> +
> +               try:
> +                       if self._future.cancelled():
> +                               return
> +
> +                       self._previous_result = future
> +                       if not (future.cancelled() or future.exception()
> is not None):
> +                               # success
> +                               self._future.set_result(future.result())
> +                               return
> +               finally:
> +                       if self._future.done() and
> self._overall_timeout_handle is not None:
> +                               self._overall_timeout_handle.cancel()
> +                               self._overall_timeout_handle = None
> +
> +               if self._try_max is not None and self._tries >=
> self._try_max:
> +                       self._retry_error()
> +                       return
> +
> +               if self._delay_func is not None:
> +                       delay = self._delay_func(self._tries)
> +                       self._current_task = self._loop.call_later(delay,
> self._delay_done)
> +                       return
> +
> +               self._begin_try()
> +
> +       def _delay_done(self):
> +               self._current_task = None
> +
> +               if self._future.cancelled() or
> self._overall_timeout_expired:
> +                       return
> +
> +               self._begin_try()
> +
> +       def _retry_error(self):
> +               if self._previous_result is None or self._previous_result.
> cancelled():
> +                       cause = TimeoutError()
> +               else:
> +                       cause = self._previous_result.exception()
> +
> +               if self._reraise:
> +                       e = cause
> +               else:
> +                       e = RetryError()
> +                       e.__cause__ = cause
> +
> +               self._future.set_exception(e)
> --
> 2.13.6
>
>
>

Reply via email to