commit: 5a5ed99cb5a6e8913df2e9ca29b4b4d5c179c20f
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat May 5 23:04:10 2018 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun May 6 00:35:44 2018 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=5a5ed99c
RetryTestCase: support ThreadPoolExecutor (bug 654390) In order to support the default asyncio event loop's ThreadPoolExecutor, use a threading.Event instance to support cancellation of tasks. Bug: https://bugs.gentoo.org/654390 pym/portage/tests/util/futures/test_retry.py | 96 +++++++++++++++++++++------- 1 file changed, 74 insertions(+), 22 deletions(-) diff --git a/pym/portage/tests/util/futures/test_retry.py b/pym/portage/tests/util/futures/test_retry.py index cdca7d294..781eac9a1 100644 --- a/pym/portage/tests/util/futures/test_retry.py +++ b/pym/portage/tests/util/futures/test_retry.py @@ -1,8 +1,6 @@ # Copyright 2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 -import functools - try: import threading except ImportError: @@ -28,10 +26,17 @@ class SucceedLater(object): self._succeed_time = monotonic() + duration def __call__(self): + loop = global_event_loop() + result = loop.create_future() remaining = self._succeed_time - monotonic() if remaining > 0: - raise SucceedLaterException('time until success: {} seconds'.format(remaining)) - return 'success' + loop.call_soon_threadsafe(lambda: None if result.done() else + result.set_exception(SucceedLaterException( + 'time until success: {} seconds'.format(remaining)))) + else: + loop.call_soon_threadsafe(lambda: None if result.done() else + result.set_result('success')) + return result class SucceedNeverException(Exception): @@ -43,7 +48,11 @@ class SucceedNever(object): A callable object that never succeeds. """ def __call__(self): - raise SucceedNeverException('expected failure') + loop = global_event_loop() + result = loop.create_future() + loop.call_soon_threadsafe(lambda: None if result.done() else + result.set_exception(SucceedNeverException('expected failure'))) + return result class HangForever(object): @@ -51,14 +60,21 @@ class HangForever(object): A callable object that sleeps forever. 
""" def __call__(self): - threading.Event().wait() + return global_event_loop().create_future() class RetryTestCase(TestCase): + + def _wrap_coroutine_func(self, coroutine_func): + """ + Derived classes may override this method in order to implement + alternative forms of execution. + """ + return coroutine_func + def testSucceedLater(self): loop = global_event_loop() - func = SucceedLater(1) - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(SucceedLater(1)) decorator = retry(try_max=9999, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -67,8 +83,7 @@ class RetryTestCase(TestCase): def testSucceedNever(self): loop = global_event_loop() - func = SucceedNever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(SucceedNever()) decorator = retry(try_max=4, try_timeout=None, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -78,8 +93,7 @@ class RetryTestCase(TestCase): def testSucceedNeverReraise(self): loop = global_event_loop() - func = SucceedNever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(SucceedNever()) decorator = retry(reraise=True, try_max=4, try_timeout=None, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -89,8 +103,7 @@ class RetryTestCase(TestCase): def testHangForever(self): loop = global_event_loop() - func = HangForever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(HangForever()) decorator = retry(try_max=2, try_timeout=0.1, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -100,8 +113,7 @@ class RetryTestCase(TestCase): def 
testHangForeverReraise(self): loop = global_event_loop() - func = HangForever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(HangForever()) decorator = retry(reraise=True, try_max=2, try_timeout=0.1, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -111,8 +123,7 @@ class RetryTestCase(TestCase): def testCancelRetry(self): loop = global_event_loop() - func = SucceedNever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(SucceedNever()) decorator = retry(try_timeout=0.1, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -124,8 +135,7 @@ class RetryTestCase(TestCase): def testOverallTimeoutWithException(self): loop = global_event_loop() - func = SucceedNever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(SucceedNever()) decorator = retry(try_timeout=0.1, overall_timeout=0.3, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) @@ -136,11 +146,53 @@ class RetryTestCase(TestCase): def testOverallTimeoutWithTimeoutError(self): loop = global_event_loop() # results in TimeoutError because it hangs forever - func = HangForever() - func_coroutine = functools.partial(loop.run_in_executor, None, func) + func_coroutine = self._wrap_coroutine_func(HangForever()) decorator = retry(try_timeout=0.1, overall_timeout=0.3, delay_func=RandomExponentialBackoff(multiplier=0.1, base=2)) decorated_func = decorator(func_coroutine) done, pending = loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop)) self.assertEqual(len(done), 1) self.assertTrue(isinstance(done.pop().exception().__cause__, asyncio.TimeoutError)) + + +class RetryExecutorTestCase(RetryTestCase): + """ + Wrap each coroutine function with 
AbstractEventLoop.run_in_executor, + in order to test the event loop's default executor. The executor + may use either a thread or a subprocess, and either case is + automatically detected and handled. + """ + def _wrap_coroutine_func(self, coroutine_func): + parent_loop = global_event_loop() + + # Since ThreadPoolExecutor does not propagate cancellation of a + # parent_future to the underlying coroutine, use kill_switch to + # propagate task cancellation to wrapper, so that HangForever's + # thread returns when retry eventually cancels parent_future. + def wrapper(kill_switch): + loop = global_event_loop() + if loop is parent_loop: + # thread in main process + result = coroutine_func() + event = threading.Event() + loop.call_soon_threadsafe(result.add_done_callback, + lambda result: event.set()) + loop.call_soon_threadsafe(kill_switch.add_done_callback, + lambda kill_switch: event.set()) + event.wait() + return result.result() + else: + # child process + return loop.run_until_complete(coroutine_func()) + + def execute_wrapper(): + kill_switch = parent_loop.create_future() + parent_future = asyncio.ensure_future( + parent_loop.run_in_executor(None, wrapper, kill_switch), + loop=parent_loop) + parent_future.add_done_callback( + lambda parent_future: None if kill_switch.done() + else kill_switch.set_result(None)) + return parent_future + + return execute_wrapper