commit:     1aa9a71731f3ab05e10190493956c70998abf12a
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Fri Mar 16 19:37:54 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Apr  2 16:53:24 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=1aa9a717

Add retry decorator (API inspired by tenacity)

This decorator will be useful for retrying asynchronous
operations, such as gpg key refresh (bug 649276). The
API is inspired by tenacity, but is simpler. Only
asynchronous functions (like @asyncio.coroutine functions)
are supported. In order to retry a synchronous function,
first convert it to an asynchronous function as follows:

    asynchronous_func = functools.partial(
        loop.run_in_executor, None, synchronous_func)

Bug: https://bugs.gentoo.org/649276
See: https://github.com/jd/tenacity
Reviewed-by: Alec Warner <antarus <AT> gentoo.org>

 pym/portage/tests/util/futures/test_retry.py | 147 +++++++++++++++++++++
 pym/portage/util/futures/futures.py          |   6 +
 pym/portage/util/futures/retry.py            | 183 +++++++++++++++++++++++++++
 3 files changed, 336 insertions(+)

diff --git a/pym/portage/tests/util/futures/test_retry.py 
b/pym/portage/tests/util/futures/test_retry.py
new file mode 100644
index 000000000..7641e4e92
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_retry.py
@@ -0,0 +1,147 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import functools
+
+try:
+       import threading
+except ImportError:
+       import dummy_threading as threading
+
+from portage.tests import TestCase
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.backoff import RandomExponentialBackoff
+from portage.util.futures.futures import TimeoutError
+from portage.util.futures.retry import retry
+from portage.util.futures.wait import wait
+from portage.util.monotonic import monotonic
+
+
+class SucceedLaterException(Exception):
+       pass
+
+
+class SucceedLater(object):
+       """
+       A callable object that succeeds some duration of time has passed.
+       """
+       def __init__(self, duration):
+               self._succeed_time = monotonic() + duration
+
+       def __call__(self):
+               remaining = self._succeed_time - monotonic()
+               if remaining > 0:
+                       raise SucceedLaterException('time until success: {} 
seconds'.format(remaining))
+               return 'success'
+
+
+class SucceedNeverException(Exception):
+       pass
+
+
+class SucceedNever(object):
+       """
+       A callable object that never succeeds.
+       """
+       def __call__(self):
+               raise SucceedNeverException('expected failure')
+
+
+class HangForever(object):
+       """
+       A callable object that sleeps forever.
+       """
+       def __call__(self):
+               threading.Event().wait()
+
+
+class RetryTestCase(TestCase):
+       def testSucceedLater(self):
+               loop = global_event_loop()
+               func = SucceedLater(1)
+               func_coroutine = functools.partial(loop.run_in_executor, None, 
func)
+               decorator = retry(try_max=9999,
+                       delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
+               decorated_func = decorator(func_coroutine)
+               result = loop.run_until_complete(decorated_func())
+               self.assertEqual(result, 'success')
+
+       def testSucceedNever(self):
+               loop = global_event_loop()
+               func = SucceedNever()
+               func_coroutine = functools.partial(loop.run_in_executor, None, 
func)
+               decorator = retry(try_max=4, try_timeout=None,
+                       delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
+               decorated_func = decorator(func_coroutine)
+               done, pending = 
loop.run_until_complete(wait([decorated_func()]))
+               self.assertEqual(len(done), 1)
+               self.assertTrue(isinstance(done[0].exception().__cause__, 
SucceedNeverException))
+
+       def testSucceedNeverReraise(self):
+               loop = global_event_loop()
+               func = SucceedNever()
+               func_coroutine = functools.partial(loop.run_in_executor, None, 
func)
+               decorator = retry(reraise=True, try_max=4, try_timeout=None,
+                       delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
+               decorated_func = decorator(func_coroutine)
+               done, pending = 
loop.run_until_complete(wait([decorated_func()]))
+               self.assertEqual(len(done), 1)
+               self.assertTrue(isinstance(done[0].exception(), 
SucceedNeverException))
+
+       def testHangForever(self):
+               loop = global_event_loop()
+               func = HangForever()
+               func_coroutine = functools.partial(loop.run_in_executor, None, 
func)
+               decorator = retry(try_max=2, try_timeout=0.1,
+                       delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
+               decorated_func = decorator(func_coroutine)
+               done, pending = 
loop.run_until_complete(wait([decorated_func()]))
+               self.assertEqual(len(done), 1)
+               self.assertTrue(isinstance(done[0].exception().__cause__, 
TimeoutError))
+
+       def testHangForeverReraise(self):
+               loop = global_event_loop()
+               func = HangForever()
+               func_coroutine = functools.partial(loop.run_in_executor, None, 
func)
+               decorator = retry(reraise=True, try_max=2, try_timeout=0.1,
+                       delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
+               decorated_func = decorator(func_coroutine)
+               done, pending = 
loop.run_until_complete(wait([decorated_func()]))
+               self.assertEqual(len(done), 1)
+               self.assertTrue(isinstance(done[0].exception(), TimeoutError))
+
+       def testCancelRetry(self):
+               loop = global_event_loop()
+               func = SucceedNever()
+               func_coroutine = functools.partial(loop.run_in_executor, None, 
func)
+               decorator = retry(try_timeout=0.1,
+                       delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
+               decorated_func = decorator(func_coroutine)
+               future = decorated_func()
+               loop.call_later(0.3, future.cancel)
+               done, pending = loop.run_until_complete(wait([future]))
+               self.assertEqual(len(done), 1)
+               self.assertTrue(done[0].cancelled())
+
+       def testOverallTimeoutWithException(self):
+               loop = global_event_loop()
+               func = SucceedNever()
+               func_coroutine = functools.partial(loop.run_in_executor, None, 
func)
+               decorator = retry(try_timeout=0.1, overall_timeout=0.3,
+                       delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
+               decorated_func = decorator(func_coroutine)
+               done, pending = 
loop.run_until_complete(wait([decorated_func()]))
+               self.assertEqual(len(done), 1)
+               self.assertTrue(isinstance(done[0].exception().__cause__, 
SucceedNeverException))
+
+       def testOverallTimeoutWithTimeoutError(self):
+               loop = global_event_loop()
+               # results in TimeoutError because it hangs forever
+               func = HangForever()
+               func_coroutine = functools.partial(loop.run_in_executor, None, 
func)
+               decorator = retry(try_timeout=0.1, overall_timeout=0.3,
+                       delay_func=RandomExponentialBackoff(multiplier=0.1, 
base=2))
+               decorated_func = decorator(func_coroutine)
+               done, pending = 
loop.run_until_complete(wait([decorated_func()]))
+               self.assertEqual(len(done), 1)
+               self.assertTrue(isinstance(done[0].exception().__cause__, 
TimeoutError))

diff --git a/pym/portage/util/futures/futures.py 
b/pym/portage/util/futures/futures.py
index dcf593c01..cd56a27eb 100644
--- a/pym/portage/util/futures/futures.py
+++ b/pym/portage/util/futures/futures.py
@@ -11,6 +11,7 @@ __all__ = (
        'CancelledError',
        'Future',
        'InvalidStateError',
+       'TimeoutError',
 )
 
 try:
@@ -18,6 +19,7 @@ try:
                CancelledError,
                Future,
                InvalidStateError,
+               TimeoutError,
        )
 except ImportError:
 
@@ -30,6 +32,10 @@ except ImportError:
                def __init__(self):
                        Error.__init__(self, "cancelled")
 
+       class TimeoutError(Error):
+               def __init__(self):
+                       Error.__init__(self, "timed out")
+
        class InvalidStateError(Error):
                pass
 

diff --git a/pym/portage/util/futures/retry.py 
b/pym/portage/util/futures/retry.py
new file mode 100644
index 000000000..902ed163a
--- /dev/null
+++ b/pym/portage/util/futures/retry.py
@@ -0,0 +1,183 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+       'RetryError',
+       'retry',
+)
+
+import functools
+
+from portage.exception import PortageException
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures.futures import TimeoutError
+
+
+class RetryError(PortageException):
+       """Raised when retry fails."""
+       def __init__(self):
+               PortageException.__init__(self, "retry error")
+
+
+def retry(try_max=None, try_timeout=None, overall_timeout=None,
+       delay_func=None, reraise=False, loop=None):
+       """
+       Create and return a retry decorator. The decorator is intended to
+       operate only on a coroutine function.
+
+       @param try_max: maximum number of tries
+       @type try_max: int or None
+       @param try_timeout: number of seconds to wait for a try to succeed
+               before cancelling it, which is only effective if func returns
+               tasks that support cancellation
+       @type try_timeout: float or None
+       @param overall_timeout: number of seconds to wait for retires to
+               succeed before aborting, which is only effective if func returns
+               tasks that support cancellation
+       @type overall_timeout: float or None
+       @param delay_func: function that takes an int argument corresponding
+               to the number of previous tries and returns a number of seconds
+               to wait before the next try
+       @type delay_func: callable
+       @param reraise: Reraise the last exception, instead of RetryError
+       @type reraise: bool
+       @param loop: event loop
+       @type loop: EventLoop
+       @return: func decorated with retry support
+       @rtype: callable
+       """
+       return functools.partial(_retry_wrapper, loop, try_max, try_timeout,
+               overall_timeout, delay_func, reraise)
+
+
+def _retry_wrapper(loop, try_max, try_timeout, overall_timeout, delay_func,
+       reraise, func):
+       """
+       Create and return a decorated function.
+       """
+       return functools.partial(_retry, loop, try_max, try_timeout,
+               overall_timeout, delay_func, reraise, func)
+
+
+def _retry(loop, try_max, try_timeout, overall_timeout, delay_func,
+       reraise, func, *args, **kwargs):
+       """
+       Retry coroutine, used to implement retry decorator.
+
+       @return: func return value
+       @rtype: asyncio.Future (or compatible)
+       """
+       loop = loop or global_event_loop()
+       future = loop.create_future()
+       _Retry(future, loop, try_max, try_timeout, overall_timeout, delay_func,
+               reraise, functools.partial(func, *args, **kwargs))
+       return future
+
+
+class _Retry(object):
+       def __init__(self, future, loop, try_max, try_timeout, overall_timeout,
+               delay_func, reraise, func):
+               self._future = future
+               self._loop = loop
+               self._try_max = try_max
+               self._try_timeout = try_timeout
+               self._delay_func = delay_func
+               self._reraise = reraise
+               self._func = func
+
+               self._try_timeout_handle = None
+               self._overall_timeout_handle = None
+               self._overall_timeout_expired = None
+               self._tries = 0
+               self._current_task = None
+               self._previous_result = None
+
+               future.add_done_callback(self._cancel_callback)
+               if overall_timeout is not None:
+                       self._overall_timeout_handle = loop.call_later(
+                               overall_timeout, self._overall_timeout_callback)
+               self._begin_try()
+
+       def _cancel_callback(self, future):
+               if future.cancelled() and self._current_task is not None:
+                       self._current_task.cancel()
+
+       def _try_timeout_callback(self):
+               self._try_timeout_handle = None
+               self._current_task.cancel()
+
+       def _overall_timeout_callback(self):
+               self._overall_timeout_handle = None
+               self._overall_timeout_expired = True
+               self._current_task.cancel()
+               self._retry_error()
+
+       def _begin_try(self):
+               self._tries += 1
+               self._current_task = self._func()
+               self._current_task.add_done_callback(self._try_done)
+               if self._try_timeout is not None:
+                       self._try_timeout_handle = self._loop.call_later(
+                               self._try_timeout, self._try_timeout_callback)
+
+       def _try_done(self, future):
+               self._current_task = None
+
+               if self._try_timeout_handle is not None:
+                       self._try_timeout_handle.cancel()
+                       self._try_timeout_handle = None
+
+               if not future.cancelled():
+                       # consume exception, so that the event loop
+                       # exception handler does not report it
+                       future.exception()
+
+               if self._overall_timeout_expired:
+                       return
+
+               try:
+                       if self._future.cancelled():
+                               return
+
+                       self._previous_result = future
+                       if not (future.cancelled() or future.exception() is not 
None):
+                               # success
+                               self._future.set_result(future.result())
+                               return
+               finally:
+                       if self._future.done() and self._overall_timeout_handle 
is not None:
+                               self._overall_timeout_handle.cancel()
+                               self._overall_timeout_handle = None
+
+               if self._try_max is not None and self._tries >= self._try_max:
+                       self._retry_error()
+                       return
+
+               if self._delay_func is not None:
+                       delay = self._delay_func(self._tries)
+                       self._current_task = self._loop.call_later(delay, 
self._delay_done)
+                       return
+
+               self._begin_try()
+
+       def _delay_done(self):
+               self._current_task = None
+
+               if self._future.cancelled() or self._overall_timeout_expired:
+                       return
+
+               self._begin_try()
+
+       def _retry_error(self):
+               if self._previous_result is None or 
self._previous_result.cancelled():
+                       cause = TimeoutError()
+               else:
+                       cause = self._previous_result.exception()
+
+               if self._reraise:
+                       e = cause
+               else:
+                       e = RetryError()
+                       e.__cause__ = cause
+
+               self._future.set_exception(e)

Reply via email to