The iter_completed function is similar to asyncio.as_completed, but
it takes an iterator of futures as input and supports max_jobs and
max_load parameters that limit how many futures are processed
concurrently. Both max_jobs and max_load default to
multiprocessing.cpu_count().

Example usage for async_aux_get:

  from __future__ import print_function

  import portage
  from portage.util.futures.iter_completed import iter_completed

  portdb = portage.portdb
  # Maps id(future) -> cpv, so that each completed future can be
  # matched back to the package it was created for.
  future_cpv = {}

  def future_generator():
    for cpv in portdb.cp_list('sys-apps/portage'):
      future = portdb.async_aux_get(cpv, portage.auxdbkeys)
      future_cpv[id(future)] = cpv
      yield future

  # iter_completed yields only futures that are already done.
  for future in iter_completed(future_generator()):
    cpv = future_cpv.pop(id(future))
    try:
      result = future.result()
    except KeyError as e:
      # aux_get failed
      print('error:', cpv, e)
    else:
      print(cpv, result)

See: https://docs.python.org/3/library/asyncio-task.html#asyncio.as_completed
Bug: https://bugs.gentoo.org/648790
---
 .../tests/util/futures/test_iter_completed.py      | 50 ++++++++++++
 pym/portage/util/_async/FuturePollTask.py          | 27 ++++++
 pym/portage/util/futures/iter_completed.py         | 63 ++++++++++++++
 pym/portage/util/futures/wait.py                   | 95 ++++++++++++++++++++++
 4 files changed, 235 insertions(+)
 create mode 100644 pym/portage/tests/util/futures/test_iter_completed.py
 create mode 100644 pym/portage/util/_async/FuturePollTask.py
 create mode 100644 pym/portage/util/futures/iter_completed.py
 create mode 100644 pym/portage/util/futures/wait.py

diff --git a/pym/portage/tests/util/futures/test_iter_completed.py b/pym/portage/tests/util/futures/test_iter_completed.py
new file mode 100644
index 000000000..6607d871c
--- /dev/null
+++ b/pym/portage/tests/util/futures/test_iter_completed.py
@@ -0,0 +1,63 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import time
+from portage.tests import TestCase
+from portage.util._async.ForkProcess import ForkProcess
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures.iter_completed import iter_completed
+
+
+class SleepProcess(ForkProcess):
+       """
+       ForkProcess whose _run sleeps for self.seconds. When the process
+       exits, the result of self.future is set to self.seconds.
+       """
+       __slots__ = ('future', 'seconds')
+       def _start(self):
+               self.addExitListener(self._future_done)
+               ForkProcess._start(self)
+
+       def _future_done(self, task):
+               self.future.set_result(self.seconds)
+
+       def _run(self):
+               time.sleep(self.seconds)
+
+
+class IterCompletedTestCase(TestCase):
+
+       def testIterCompleted(self):
+
+               # Mark this as todo, since we don't want to fail if heavy system
+               # load causes the tasks to finish in an unexpected order.
+               self.todo = True
+
+               loop = global_event_loop()
+               tasks = [
+                       SleepProcess(seconds=0.200),
+                       SleepProcess(seconds=0.100),
+                       SleepProcess(seconds=0.001),
+               ]
+
+               # The shortest sleep is expected to complete first.
+               expected_order = sorted(task.seconds for task in tasks)
+
+               def future_generator():
+                       for task in tasks:
+                               task.future = loop.create_future()
+                               task.scheduler = loop
+                               task.start()
+                               yield task.future
+
+               # Allow every task to run concurrently. With fewer job slots
+               # than tasks (the max_jobs default is cpu_count()), a later
+               # task would not start until an earlier one finished, so the
+               # completion order could not match expected_order.
+               actual_order = [future.result() for future in iter_completed(
+                       future_generator(), max_jobs=len(tasks), max_load=None,
+                       loop=loop)]
+
+               # Compare the whole sequence, so that a missing or extra
+               # future is detected (zip would silently truncate).
+               self.assertEqual(expected_order, actual_order)
diff --git a/pym/portage/util/_async/FuturePollTask.py b/pym/portage/util/_async/FuturePollTask.py
new file mode 100644
index 000000000..6b7cdf7d5
--- /dev/null
+++ b/pym/portage/util/_async/FuturePollTask.py
@@ -0,0 +1,39 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+import signal
+
+from _emerge.AbstractPollTask import AbstractPollTask
+
+
+class FuturePollTask(AbstractPollTask):
+       """
+       Wraps a Future in an AsynchronousTask, which is useful for
+       scheduling with TaskScheduler. The task's returncode is os.EX_OK
+       if the future completes with a result, 1 if it completes with an
+       exception, and -signal.SIGINT if it is cancelled.
+       """
+       __slots__ = ('future',)
+       def _start(self):
+               self.future.add_done_callback(self._done_callback)
+
+       def _cancel(self):
+               # Propagate cancellation of this task to the wrapped future.
+               # Otherwise a cancelled task would still not exit until the
+               # future completed on its own. Once the future is cancelled,
+               # _done_callback sets this task's returncode.
+               # NOTE(review): relies on AsynchronousTask.cancel() invoking
+               # this _cancel() hook; confirm against AsynchronousTask.
+               if not self.future.done():
+                       self.future.cancel()
+
+       def _done_callback(self, future):
+               if future.cancelled():
+                       self.cancelled = True
+                       self.returncode = -signal.SIGINT
+               elif future.exception() is None:
+                       self.returncode = os.EX_OK
+               else:
+                       self.returncode = 1
+               self.wait()
diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
new file mode 100644
index 000000000..0540cc986
--- /dev/null
+++ b/pym/portage/util/futures/iter_completed.py
@@ -0,0 +1,70 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import multiprocessing
+
+from portage.util._async.FuturePollTask import FuturePollTask
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util.futures.wait import wait, FIRST_COMPLETED
+
+
+def iter_completed(futures, max_jobs=None, max_load=None, loop=None):
+       """
+       This is similar to asyncio.as_completed, but takes an iterator of
+       futures as input, and includes support for max_jobs and max_load
+       parameters.
+
+       Futures are pulled from the input iterator lazily, only as the
+       scheduler has capacity for them. If the caller stops iterating
+       early, closing this generator cancels the scheduler and waits
+       for it to finish.
+
+       @param futures: iterator of asyncio.Future (or compatible)
+       @type futures: iterator
+       @param max_jobs: max number of futures to process concurrently (default
+               is multiprocessing.cpu_count())
+       @type max_jobs: int
+       @param max_load: max load allowed when scheduling a new future,
+               otherwise schedule no more than 1 future at a time (default
+               is multiprocessing.cpu_count())
+       @type max_load: int or float
+       @param loop: event loop
+       @type loop: EventLoop
+       @return: iterator of futures that are done
+       @rtype: iterator
+       """
+       loop = loop or global_event_loop()
+       max_jobs = max_jobs or multiprocessing.cpu_count()
+       max_load = max_load or multiprocessing.cpu_count()
+
+       # Maps id(future) -> future for futures pulled from the input
+       # iterator that have not been yielded to the caller yet.
+       future_map = {}
+       def task_generator():
+               for future in futures:
+                       future_map[id(future)] = future
+                       yield FuturePollTask(future=future)
+
+       scheduler = TaskScheduler(
+               task_generator(),
+               max_jobs=max_jobs,
+               max_load=max_load,
+               event_loop=loop)
+
+       try:
+               scheduler.start()
+
+               # scheduler should ensure that future_map is non-empty until
+               # task_generator is exhausted
+               while future_map:
+                       done, _pending = loop.run_until_complete(
+                               wait(*future_map.values(), return_when=FIRST_COMPLETED))
+                       for future in done:
+                               del future_map[id(future)]
+                               yield future
+
+       finally:
+               # cleanup in case of interruption by SIGINT, etc
+               scheduler.cancel()
+               scheduler.wait()
diff --git a/pym/portage/util/futures/wait.py b/pym/portage/util/futures/wait.py
new file mode 100644
index 000000000..a65a25ac4
--- /dev/null
+++ b/pym/portage/util/futures/wait.py
@@ -0,0 +1,123 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+try:
+       from asyncio import ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION
+except ImportError:
+       ALL_COMPLETED = 'ALL_COMPLETED'
+       FIRST_COMPLETED = 'FIRST_COMPLETED'
+       FIRST_EXCEPTION = 'FIRST_EXCEPTION'
+
+from portage.util._eventloop.global_event_loop import global_event_loop
+
+
+# Use **kwargs since python2.7 does not allow arguments with defaults
+# to follow *futures.
+def wait(*futures, **kwargs):
+       """
+       Use portage's internal EventLoop to emulate asyncio.wait:
+       https://docs.python.org/3/library/asyncio-task.html#asyncio.wait
+
+       @param futures: one or more futures to wait for (a future that is
+               given more than once is only waited for once)
+       @type futures: asyncio.Future (or compatible)
+       @param timeout: number of seconds to wait (wait indefinitely if
+               not specified)
+       @type timeout: int or float
+       @param return_when: indicates when this function should return, must
+               be one of the constants ALL_COMPLETED, FIRST_COMPLETED, or
+               FIRST_EXCEPTION (default is ALL_COMPLETED)
+       @type return_when: object
+       @param loop: event loop
+       @type loop: EventLoop
+       @return: a future whose result is a tuple of (done, pending), where
+               done and pending are lists of the given futures
+       @rtype: asyncio.Future (or compatible)
+       @raise TypeError: if no futures are given, or if an unexpected
+               keyword argument is given
+       @raise ValueError: if return_when is not one of the supported
+               constants
+       """
+       if not futures:
+               raise TypeError("wait() missing 1 required positional argument: 'future'")
+       loop = kwargs.pop('loop', None)
+       timeout = kwargs.pop('timeout', None)
+       return_when = kwargs.pop('return_when', ALL_COMPLETED)
+       if kwargs:
+               raise TypeError("wait() got an unexpected keyword argument '{}'".format(
+                       next(iter(kwargs))))
+       if return_when not in (ALL_COMPLETED, FIRST_COMPLETED, FIRST_EXCEPTION):
+               raise ValueError('Invalid return_when value: {}'.format(return_when))
+       loop = loop or global_event_loop()
+       result_future = loop.create_future()
+       _Waiter(futures, timeout, return_when, result_future, loop)
+       return result_future
+
+
+class _Waiter(object):
+       """
+       Implements wait() by registering a done callback on each future.
+       Once the return_when condition is met or the timeout expires, the
+       result of result_future is set to a tuple of (done, pending).
+       """
+       def __init__(self, futures, timeout, return_when, result_future, loop):
+               # Wait for each distinct future only once. Otherwise a
+               # duplicate would make len(self._futures) exceed the number
+               # of distinct ids recorded below, and the ALL_COMPLETED
+               # condition could never be met.
+               self._futures = []
+               future_ids = set()
+               for future in futures:
+                       if id(future) not in future_ids:
+                               future_ids.add(id(future))
+                               self._futures.append(future)
+               # ids of done futures, split into those that did not raise
+               # (including cancelled ones) and those that raised
+               self._completed = set()
+               self._exceptions = set()
+               self._return_when = return_when
+               self._result_future = result_future
+               self._loop = loop
+               self._ready = False
+               self._timeout = None
+               for future in self._futures:
+                       future.add_done_callback(self._done_callback)
+               if timeout is not None:
+                       self._timeout = loop.call_later(timeout, self._timeout_callback)
+
+       def _timeout_callback(self):
+               if not self._ready:
+                       self._ready = True
+                       self._ready_callback()
+
+       def _done_callback(self, future):
+               if future.cancelled() or future.exception() is None:
+                       self._completed.add(id(future))
+               else:
+                       self._exceptions.add(id(future))
+               if not self._ready and (
+                       # any done future satisfies FIRST_COMPLETED, including
+                       # one that raised an exception (as in asyncio.wait)
+                       self._return_when == FIRST_COMPLETED or
+                       (self._return_when == FIRST_EXCEPTION and self._exceptions) or
+                       (len(self._futures) == len(self._completed) + len(self._exceptions))):
+                       self._ready = True
+                       # use call_soon in case multiple callbacks complete in quick succession
+                       self._loop.call_soon(self._ready_callback)
+
+       def _ready_callback(self):
+               if self._timeout is not None:
+                       self._timeout.cancel()
+                       self._timeout = None
+               done = []
+               pending = []
+               done_ids = self._completed.union(self._exceptions)
+               for future in self._futures:
+                       if id(future) in done_ids:
+                               done.append(future)
+                       else:
+                               pending.append(future)
+                               future.remove_done_callback(self._done_callback)
+               # the caller may have cancelled result_future meanwhile
+               if not self._result_future.done():
+                       self._result_future.set_result((done, pending))
-- 
2.13.6


Reply via email to