commit:     a9e8ebaa6979ccf0bb385e457d695bedc7b65bf5
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue Apr 17 09:51:36 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue Apr 17 17:44:58 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=a9e8ebaa

Add async_iter_completed for asyncio migration (bug 591760)

This serves as a wrapper around portage's internal TaskScheduler
class, allowing TaskScheduler API consumers to be migrated to use
asyncio interfaces.

Bug: https://bugs.gentoo.org/591760

 .../tests/util/futures/test_iter_completed.py      | 37 ++++++++++++-
 pym/portage/util/futures/iter_completed.py         | 61 +++++++++++++++++++---
 2 files changed, 91 insertions(+), 7 deletions(-)

diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py
index 9c23aefb1..1344523c6 100644
--- a/pym/portage/tests/util/futures/test_iter_completed.py
+++ b/pym/portage/tests/util/futures/test_iter_completed.py
@@ -5,7 +5,11 @@ import time
 from portage.tests import TestCase
 from portage.util._async.ForkProcess import ForkProcess
 from portage.util._eventloop.global_event_loop import global_event_loop
-from portage.util.futures.iter_completed import iter_completed
+from portage.util.futures import asyncio
+from portage.util.futures.iter_completed import (
+       iter_completed,
+       async_iter_completed,
+)
 
 
 class SleepProcess(ForkProcess):
@@ -48,3 +52,34 @@ class IterCompletedTestCase(TestCase):
                for seconds, future in zip(expected_order, 
iter_completed(future_generator(),
                        max_jobs=True, max_load=None, loop=loop)):
                        self.assertEqual(seconds, future.result())
+
+       def testAsyncCancel(self):
+
+               loop = global_event_loop()._asyncio_wrapper
+               input_futures = set()
+               future_count = 3
+
+               def future_generator():
+                       for i in range(future_count):
+                               future = loop.create_future()
+                               loop.call_soon(lambda future: None if 
future.done()
+                                       else future.set_result(None), future)
+                               input_futures.add(future)
+                               yield future
+
+               for future_done_set in async_iter_completed(future_generator(),
+                       max_jobs=True, max_load=None, loop=loop):
+                       future_done_set.cancel()
+                       break
+
+               # With max_jobs=True, async_iter_completed should have executed
+               # the generator until it raised StopIteration.
+               self.assertEqual(future_count, len(input_futures))
+
+               loop.run_until_complete(asyncio.wait(input_futures, loop=loop))
+
+               # The futures may have results or they may have been cancelled
+               # by TaskScheduler, and behavior varies depending on the python
+               # interpreter.
+               for future in input_futures:
+                       future.cancelled() or future.result()

diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
index 8d324de84..5ad075305 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -1,6 +1,7 @@
 # Copyright 2018 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
+import functools
 import multiprocessing
 
 from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
@@ -31,6 +32,38 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
        """
        loop = loop or global_event_loop()
        loop = getattr(loop, '_asyncio_wrapper', loop)
+
+       for future_done_set in async_iter_completed(futures,
+               max_jobs=max_jobs, max_load=max_load, loop=loop):
+               for future in loop.run_until_complete(future_done_set):
+                       yield future
+
+
+def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
+       """
+       An asynchronous version of iter_completed. This yields futures, which
+       when done, result in a set of input futures that are done. This serves
+       as a wrapper around portage's internal TaskScheduler class, using
+       standard asyncio interfaces.
+
+       @param futures: iterator of asyncio.Future (or compatible)
+       @type futures: iterator
+       @param max_jobs: max number of futures to process concurrently (default
+               is multiprocessing.cpu_count())
+       @type max_jobs: int
+       @param max_load: max load allowed when scheduling a new future,
+               otherwise schedule no more than 1 future at a time (default
+               is multiprocessing.cpu_count())
+       @type max_load: int or float
+       @param loop: event loop
+       @type loop: EventLoop
+       @return: iterator of futures, which when done, result in a set of
+               input futures that are done
+       @rtype: iterator
+       """
+       loop = loop or global_event_loop()
+       loop = getattr(loop, '_asyncio_wrapper', loop)
+
        max_jobs = max_jobs or multiprocessing.cpu_count()
        max_load = max_load or multiprocessing.cpu_count()
 
@@ -46,19 +79,35 @@ def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
                max_load=max_load,
                event_loop=loop._loop)
 
+       def done_callback(future_done_set, wait_result):
+               """Propagate results from wait_result to future_done_set."""
+               if future_done_set.cancelled():
+                       return
+               done, pending = wait_result.result()
+               for future in done:
+                       del future_map[id(future)]
+               future_done_set.set_result(done)
+
+       def cancel_callback(wait_result, future_done_set):
+               """Cancel wait_result if future_done_set has been cancelled."""
+               if future_done_set.cancelled() and not wait_result.done():
+                       wait_result.cancel()
+
        try:
                scheduler.start()
 
                # scheduler should ensure that future_map is non-empty until
                # task_generator is exhausted
                while future_map:
-                       done, pending = loop.run_until_complete(
+                       wait_result = asyncio.ensure_future(
                                asyncio.wait(list(future_map.values()),
-                               return_when=asyncio.FIRST_COMPLETED, loop=loop))
-                       for future in done:
-                               del future_map[id(future)]
-                               yield future
-
+                               return_when=asyncio.FIRST_COMPLETED, 
loop=loop), loop=loop)
+                       future_done_set = loop.create_future()
+                       future_done_set.add_done_callback(
+                               functools.partial(cancel_callback, wait_result))
+                       wait_result.add_done_callback(
+                               functools.partial(done_callback, 
future_done_set))
+                       yield future_done_set
        finally:
                # cleanup in case of interruption by SIGINT, etc
                scheduler.cancel()

Reply via email to