commit: a9e8ebaa6979ccf0bb385e457d695bedc7b65bf5 Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Tue Apr 17 09:51:36 2018 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Tue Apr 17 17:44:58 2018 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=a9e8ebaa
Add async_iter_completed for asyncio migration (bug 591760) This serves as a wrapper around portage's internal TaskScheduler class, allowing TaskScheduler API consumers to be migrated to use asyncio interfaces. Bug: https://bugs.gentoo.org/591760 .../tests/util/futures/test_iter_completed.py | 37 ++++++++++++- pym/portage/util/futures/iter_completed.py | 61 +++++++++++++++++++--- 2 files changed, 91 insertions(+), 7 deletions(-) diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py index 9c23aefb1..1344523c6 100644 --- a/pym/portage/tests/util/futures/test_iter_completed.py +++ b/pym/portage/tests/util/futures/test_iter_completed.py @@ -5,7 +5,11 @@ import time from portage.tests import TestCase from portage.util._async.ForkProcess import ForkProcess from portage.util._eventloop.global_event_loop import global_event_loop -from portage.util.futures.iter_completed import iter_completed +from portage.util.futures import asyncio +from portage.util.futures.iter_completed import ( + iter_completed, + async_iter_completed, +) class SleepProcess(ForkProcess): @@ -48,3 +52,34 @@ class IterCompletedTestCase(TestCase): for seconds, future in zip(expected_order, iter_completed(future_generator(), max_jobs=True, max_load=None, loop=loop)): self.assertEqual(seconds, future.result()) + + def testAsyncCancel(self): + + loop = global_event_loop()._asyncio_wrapper + input_futures = set() + future_count = 3 + + def future_generator(): + for i in range(future_count): + future = loop.create_future() + loop.call_soon(lambda future: None if future.done() + else future.set_result(None), future) + input_futures.add(future) + yield future + + for future_done_set in async_iter_completed(future_generator(), + max_jobs=True, max_load=None, loop=loop): + future_done_set.cancel() + break + + # With max_jobs=True, async_iter_completed should have executed + # the generator until it raised StopIteration. + self.assertEqual(future_count, len(input_futures)) + + loop.run_until_complete(asyncio.wait(input_futures, loop=loop)) + + # The futures may have results or they may have been cancelled + # by TaskScheduler, and behavior varies depending on the python + # interpreter. + for future in input_futures: + future.cancelled() or future.result() diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py index 8d324de84..5ad075305 100644 --- a/pym/portage/util/futures/iter_completed.py +++ b/pym/portage/util/futures/iter_completed.py @@ -1,6 +1,7 @@ # Copyright 2018 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 +import functools import multiprocessing from portage.util._async.AsyncTaskFuture import AsyncTaskFuture @@ -31,6 +32,38 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None): """ loop = loop or global_event_loop() loop = getattr(loop, '_asyncio_wrapper', loop) + + for future_done_set in async_iter_completed(futures, + max_jobs=max_jobs, max_load=max_load, loop=loop): + for future in loop.run_until_complete(future_done_set): + yield future + + +def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None): + """ + An asynchronous version of iter_completed. This yields futures, which + when done, result in a set of input futures that are done. This serves + as a wrapper around portage's internal TaskScheduler class, using + standard asyncio interfaces. + + @param futures: iterator of asyncio.Future (or compatible) + @type futures: iterator + @param max_jobs: max number of futures to process concurrently (default + is multiprocessing.cpu_count()) + @type max_jobs: int + @param max_load: max load allowed when scheduling a new future, + otherwise schedule no more than 1 future at a time (default + is multiprocessing.cpu_count()) + @type max_load: int or float + @param loop: event loop + @type loop: EventLoop + @return: iterator of futures, which when done, result in a set of + input futures that are done + @rtype: iterator + """ + loop = loop or global_event_loop() + loop = getattr(loop, '_asyncio_wrapper', loop) + max_jobs = max_jobs or multiprocessing.cpu_count() max_load = max_load or multiprocessing.cpu_count() @@ -46,19 +79,35 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None): max_load=max_load, event_loop=loop._loop) + def done_callback(future_done_set, wait_result): + """Propagate results from wait_result to future_done_set.""" + if future_done_set.cancelled(): + return + done, pending = wait_result.result() + for future in done: + del future_map[id(future)] + future_done_set.set_result(done) + + def cancel_callback(wait_result, future_done_set): + """Cancel wait_result if future_done_set has been cancelled.""" + if future_done_set.cancelled() and not wait_result.done(): + wait_result.cancel() + try: scheduler.start() # scheduler should ensure that future_map is non-empty until # task_generator is exhausted while future_map: - done, pending = loop.run_until_complete( + wait_result = asyncio.ensure_future( asyncio.wait(list(future_map.values()), - return_when=asyncio.FIRST_COMPLETED, loop=loop)) - for future in done: - del future_map[id(future)] - yield future - + return_when=asyncio.FIRST_COMPLETED, loop=loop), loop=loop) + future_done_set = loop.create_future() + future_done_set.add_done_callback( + functools.partial(cancel_callback, wait_result)) + wait_result.add_done_callback( + functools.partial(done_callback, future_done_set)) + yield future_done_set finally: # cleanup in case of interruption by SIGINT, etc scheduler.cancel()