commit: ece99145a48157b60212d511c8053f8f6c0532a9
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Apr 16 16:27:39 2018 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Apr 16 16:42:58 2018 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=ece99145
Use asyncio.wait from standard library when available
pym/portage/tests/util/futures/test_retry.py | 46 +++++++++++-----------
.../futures/{_asyncio.py => _asyncio/__init__.py} | 11 ++++++
.../util/futures/{wait.py => _asyncio/tasks.py} | 17 ++++++--
pym/portage/util/futures/iter_completed.py | 5 ++-
4 files changed, 51 insertions(+), 28 deletions(-)
diff --git a/pym/portage/tests/util/futures/test_retry.py
b/pym/portage/tests/util/futures/test_retry.py
index 7641e4e92..409f50971 100644
--- a/pym/portage/tests/util/futures/test_retry.py
+++ b/pym/portage/tests/util/futures/test_retry.py
@@ -11,9 +11,9 @@ except ImportError:
from portage.tests import TestCase
from portage.util._eventloop.global_event_loop import global_event_loop
from portage.util.backoff import RandomExponentialBackoff
+from portage.util.futures import asyncio
from portage.util.futures.futures import TimeoutError
from portage.util.futures.retry import retry
-from portage.util.futures.wait import wait
from portage.util.monotonic import monotonic
@@ -57,7 +57,7 @@ class HangForever(object):
class RetryTestCase(TestCase):
def testSucceedLater(self):
- loop = global_event_loop()
+ loop = global_event_loop()._asyncio_wrapper
func = SucceedLater(1)
func_coroutine = functools.partial(loop.run_in_executor, None,
func)
decorator = retry(try_max=9999,
@@ -67,51 +67,51 @@ class RetryTestCase(TestCase):
self.assertEqual(result, 'success')
def testSucceedNever(self):
- loop = global_event_loop()
+ loop = global_event_loop()._asyncio_wrapper
func = SucceedNever()
func_coroutine = functools.partial(loop.run_in_executor, None,
func)
decorator = retry(try_max=4, try_timeout=None,
delay_func=RandomExponentialBackoff(multiplier=0.1,
base=2))
decorated_func = decorator(func_coroutine)
- done, pending =
loop.run_until_complete(wait([decorated_func()]))
+ done, pending =
loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
self.assertEqual(len(done), 1)
- self.assertTrue(isinstance(done[0].exception().__cause__,
SucceedNeverException))
+ self.assertTrue(isinstance(done.pop().exception().__cause__,
SucceedNeverException))
def testSucceedNeverReraise(self):
- loop = global_event_loop()
+ loop = global_event_loop()._asyncio_wrapper
func = SucceedNever()
func_coroutine = functools.partial(loop.run_in_executor, None,
func)
decorator = retry(reraise=True, try_max=4, try_timeout=None,
delay_func=RandomExponentialBackoff(multiplier=0.1,
base=2))
decorated_func = decorator(func_coroutine)
- done, pending =
loop.run_until_complete(wait([decorated_func()]))
+ done, pending =
loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
self.assertEqual(len(done), 1)
- self.assertTrue(isinstance(done[0].exception(),
SucceedNeverException))
+ self.assertTrue(isinstance(done.pop().exception(),
SucceedNeverException))
def testHangForever(self):
- loop = global_event_loop()
+ loop = global_event_loop()._asyncio_wrapper
func = HangForever()
func_coroutine = functools.partial(loop.run_in_executor, None,
func)
decorator = retry(try_max=2, try_timeout=0.1,
delay_func=RandomExponentialBackoff(multiplier=0.1,
base=2))
decorated_func = decorator(func_coroutine)
- done, pending =
loop.run_until_complete(wait([decorated_func()]))
+ done, pending =
loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
self.assertEqual(len(done), 1)
- self.assertTrue(isinstance(done[0].exception().__cause__,
TimeoutError))
+ self.assertTrue(isinstance(done.pop().exception().__cause__,
TimeoutError))
def testHangForeverReraise(self):
- loop = global_event_loop()
+ loop = global_event_loop()._asyncio_wrapper
func = HangForever()
func_coroutine = functools.partial(loop.run_in_executor, None,
func)
decorator = retry(reraise=True, try_max=2, try_timeout=0.1,
delay_func=RandomExponentialBackoff(multiplier=0.1,
base=2))
decorated_func = decorator(func_coroutine)
- done, pending =
loop.run_until_complete(wait([decorated_func()]))
+ done, pending =
loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
self.assertEqual(len(done), 1)
- self.assertTrue(isinstance(done[0].exception(), TimeoutError))
+ self.assertTrue(isinstance(done.pop().exception(),
TimeoutError))
def testCancelRetry(self):
- loop = global_event_loop()
+ loop = global_event_loop()._asyncio_wrapper
func = SucceedNever()
func_coroutine = functools.partial(loop.run_in_executor, None,
func)
decorator = retry(try_timeout=0.1,
@@ -119,29 +119,29 @@ class RetryTestCase(TestCase):
decorated_func = decorator(func_coroutine)
future = decorated_func()
loop.call_later(0.3, future.cancel)
- done, pending = loop.run_until_complete(wait([future]))
+ done, pending = loop.run_until_complete(asyncio.wait([future],
loop=loop))
self.assertEqual(len(done), 1)
- self.assertTrue(done[0].cancelled())
+ self.assertTrue(done.pop().cancelled())
def testOverallTimeoutWithException(self):
- loop = global_event_loop()
+ loop = global_event_loop()._asyncio_wrapper
func = SucceedNever()
func_coroutine = functools.partial(loop.run_in_executor, None,
func)
decorator = retry(try_timeout=0.1, overall_timeout=0.3,
delay_func=RandomExponentialBackoff(multiplier=0.1,
base=2))
decorated_func = decorator(func_coroutine)
- done, pending =
loop.run_until_complete(wait([decorated_func()]))
+ done, pending =
loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
self.assertEqual(len(done), 1)
- self.assertTrue(isinstance(done[0].exception().__cause__,
SucceedNeverException))
+ self.assertTrue(isinstance(done.pop().exception().__cause__,
SucceedNeverException))
def testOverallTimeoutWithTimeoutError(self):
- loop = global_event_loop()
+ loop = global_event_loop()._asyncio_wrapper
# results in TimeoutError because it hangs forever
func = HangForever()
func_coroutine = functools.partial(loop.run_in_executor, None,
func)
decorator = retry(try_timeout=0.1, overall_timeout=0.3,
delay_func=RandomExponentialBackoff(multiplier=0.1,
base=2))
decorated_func = decorator(func_coroutine)
- done, pending =
loop.run_until_complete(wait([decorated_func()]))
+ done, pending =
loop.run_until_complete(asyncio.wait([decorated_func()], loop=loop))
self.assertEqual(len(done), 1)
- self.assertTrue(isinstance(done[0].exception().__cause__,
TimeoutError))
+ self.assertTrue(isinstance(done.pop().exception().__cause__,
TimeoutError))
diff --git a/pym/portage/util/futures/_asyncio.py
b/pym/portage/util/futures/_asyncio/__init__.py
similarity index 94%
rename from pym/portage/util/futures/_asyncio.py
rename to pym/portage/util/futures/_asyncio/__init__.py
index 0f84f14b7..eca4ea284 100644
--- a/pym/portage/util/futures/_asyncio.py
+++ b/pym/portage/util/futures/_asyncio/__init__.py
@@ -2,6 +2,9 @@
# Distributed under the terms of the GNU General Public License v2
__all__ = (
+ 'ALL_COMPLETED',
+ 'FIRST_COMPLETED',
+ 'FIRST_EXCEPTION',
'ensure_future',
'get_child_watcher',
'get_event_loop',
@@ -10,6 +13,7 @@ __all__ = (
'set_event_loop_policy',
'sleep',
'Task',
+ 'wait',
)
try:
@@ -22,6 +26,13 @@ portage.proxy.lazyimport.lazyimport(globals(),
'portage.util.futures.unix_events:DefaultEventLoopPolicy',
)
from portage.util.futures.futures import Future
+from portage.util.futures._asyncio.tasks import (
+ ALL_COMPLETED,
+ FIRST_COMPLETED,
+ FIRST_EXCEPTION,
+ wait,
+)
+
_lock = threading.Lock()
_policy = None
diff --git a/pym/portage/util/futures/wait.py
b/pym/portage/util/futures/_asyncio/tasks.py
similarity index 89%
rename from pym/portage/util/futures/wait.py
rename to pym/portage/util/futures/_asyncio/tasks.py
index bd85bb053..392a58e64 100644
--- a/pym/portage/util/futures/wait.py
+++ b/pym/portage/util/futures/_asyncio/tasks.py
@@ -1,6 +1,13 @@
# Copyright 2018 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
+___all___ = (
+ 'ALL_COMPLETED',
+ 'FIRST_COMPLETED',
+ 'FIRST_EXCEPTION',
+ 'wait',
+)
+
try:
from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
except ImportError:
@@ -8,7 +15,10 @@ except ImportError:
FIRST_COMPLETED ='FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
-from portage.util._eventloop.global_event_loop import global_event_loop
+
+from portage.util._eventloop.global_event_loop import (
+ global_event_loop as _global_event_loop,
+)
def wait(futures, loop=None, timeout=None, return_when=ALL_COMPLETED):
@@ -30,7 +40,8 @@ def wait(futures, loop=None, timeout=None,
return_when=ALL_COMPLETED):
@return: tuple of (done, pending).
@rtype: asyncio.Future (or compatible)
"""
- loop = loop or global_event_loop()
+ loop = loop or _global_event_loop()
+ loop = getattr(loop, '_asyncio_wrapper', loop)
result_future = loop.create_future()
_Waiter(futures, timeout, return_when, result_future, loop)
return result_future
@@ -89,4 +100,4 @@ class _Waiter(object):
else:
pending.append(future)
future.remove_done_callback(self._done_callback)
- self._result_future.set_result((done, pending))
+ self._result_future.set_result((set(done), set(pending)))
diff --git a/pym/portage/util/futures/iter_completed.py
b/pym/portage/util/futures/iter_completed.py
index 583a20f3b..8d324de84 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -6,7 +6,7 @@ import multiprocessing
from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
from portage.util._async.TaskScheduler import TaskScheduler
from portage.util._eventloop.global_event_loop import global_event_loop
-from portage.util.futures.wait import wait, FIRST_COMPLETED
+from portage.util.futures import asyncio
def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
@@ -53,7 +53,8 @@ def iter_completed(futures, max_jobs=None, max_load=None,
loop=None):
# task_generator is exhausted
while future_map:
done, pending = loop.run_until_complete(
- wait(list(future_map.values()),
return_when=FIRST_COMPLETED))
+ asyncio.wait(list(future_map.values()),
+ return_when=asyncio.FIRST_COMPLETED, loop=loop))
for future in done:
del future_map[id(future)]
yield future