Add iter_gather, which is similar to asyncio.gather, but takes an
iterator of futures as input and supports the max_jobs and max_load
parameters. For bug 653946, it will be used to asynchronously gather
the results of the portdbapi.async_fetch_map calls that are required
to generate a Manifest, while using the max_jobs parameter to limit
the number of concurrent async_aux_get calls.

Bug: https://bugs.gentoo.org/653946
---
 pym/portage/util/futures/iter_completed.py | 68 ++++++++++++++++++++++++++++++
 1 file changed, 68 insertions(+)

diff --git a/pym/portage/util/futures/iter_completed.py b/pym/portage/util/futures/iter_completed.py
index 5ad075305..4e52a499f 100644
--- a/pym/portage/util/futures/iter_completed.py
+++ b/pym/portage/util/futures/iter_completed.py
@@ -112,3 +112,71 @@ def async_iter_completed(futures, max_jobs=None, max_load=None, loop=None):
                # cleanup in case of interruption by SIGINT, etc
                scheduler.cancel()
                scheduler.wait()
+
+
def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
	"""
	This is similar to asyncio.gather, but takes an iterator of
	futures as input, and includes support for max_jobs and max_load
	parameters.

	The input futures are fed to async_iter_completed, which limits how
	many of them are processed concurrently. Note that the returned
	future's result is the list of input futures themselves, not their
	results, so callers must inspect each one for a result or exception.

	If the returned future is cancelled, the pending wait for the next
	batch of completed futures is cancelled as well. Exceptions from any
	futures in a batch that finishes after cancellation are retrieved
	here, so that the event loop does not report them as never retrieved.

	@param futures: iterator of asyncio.Future (or compatible)
	@type futures: iterator
	@param max_jobs: max number of futures to process concurrently (default
		is multiprocessing.cpu_count())
	@type max_jobs: int
	@param max_load: max load allowed when scheduling a new future,
		otherwise schedule no more than 1 future at a time (default
		is multiprocessing.cpu_count())
	@type max_load: int or float
	@param loop: event loop
	@type loop: EventLoop
	@return: a Future resulting in a list of done input futures, in the
		same order that they were yielded from the input iterator
	@rtype: asyncio.Future (or compatible)
	"""
	loop = loop or global_event_loop()
	loop = getattr(loop, '_asyncio_wrapper', loop)
	result = loop.create_future()
	# Input futures in the order the input iterator yielded them; this
	# list becomes the result of the returned future.
	futures_list = []

	def future_generator():
		# Record each future as it is pulled from the input, so the
		# original ordering is preserved regardless of completion order.
		for future in futures:
			futures_list.append(future)
			yield future

	completed_iter = async_iter_completed(
		future_generator(),
		max_jobs=max_jobs,
		max_load=max_load,
		loop=loop,
	)

	def wait_for_next_batch():
		"""
		Pull the next batch future from completed_iter and arrange for
		handle_result to run when it is done, or resolve result with
		futures_list once completed_iter is exhausted.
		"""
		try:
			handle_result.current_task = next(completed_iter)
		except StopIteration:
			handle_result.current_task = None
			result.set_result(futures_list)
		else:
			handle_result.current_task.add_done_callback(handle_result)

	def handle_result(future_done_set):
		"""Done-callback for each batch future yielded by completed_iter."""
		if result.cancelled():
			# Nobody will look at this batch after cancellation. Retrieve
			# exceptions anyway; otherwise asyncio logs "exception was
			# never retrieved" for each failed future when it is collected.
			# future_done_set is cancelled when cancel_callback below
			# cancelled it, in which case it has no result to inspect.
			# NOTE(review): assumes its result is the collection of input
			# futures that finished in this batch (per async_iter_completed).
			if not future_done_set.cancelled():
				for future in future_done_set.result():
					if not future.cancelled():
						future.exception()
			return

		wait_for_next_batch()

	wait_for_next_batch()

	def cancel_callback(gathered):
		# Propagate cancellation of the returned future to the batch
		# future currently being waited on.
		if (gathered.cancelled() and
			handle_result.current_task is not None and
			not handle_result.current_task.done()):
			handle_result.current_task.cancel()

	result.add_done_callback(cancel_callback)

	return result
-- 
2.13.6


Reply via email to