The new iter_gather function is similar to asyncio.gather, but takes an iterator of futures as input, and includes support for max_jobs and max_load parameters. For bug 653946, it will be used to asynchronously gather the results of the portdbapi.async_fetch_map calls that are required to generate a Manifest, while using the max_jobs parameter to limit the number of concurrent async_aux_get calls.
# Bug: https://bugs.gentoo.org/653946
# Added to pym/portage/util/futures/iter_completed.py, directly after
# async_iter_completed().


def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
	"""
	Gather an iterator of futures, in the spirit of asyncio.gather,
	while honoring the max_jobs and max_load scheduling limits.

	The input iterator is consumed through async_iter_completed, and
	every future it yields is recorded in the order it was yielded. Once
	async_iter_completed is exhausted, the returned future is resolved
	with that recorded list. If the returned future is cancelled while a
	task obtained from async_iter_completed is still pending, that
	pending task is cancelled as well.

	@param futures: iterator of asyncio.Future (or compatible)
	@type futures: iterator
	@param max_jobs: max number of futures to process concurrently (default
		is multiprocessing.cpu_count())
	@type max_jobs: int
	@param max_load: max load allowed when scheduling a new future,
		otherwise schedule no more than 1 future at a time (default
		is multiprocessing.cpu_count())
	@type max_load: int or float
	@param loop: event loop
	@type loop: EventLoop
	@return: a Future resulting in a list of done input futures, in the
		same order that they were yielded from the input iterator
	@rtype: asyncio.Future (or compatible)
	"""
	loop = loop or global_event_loop()
	# Prefer the wrapper object when the loop exposes one.
	loop = getattr(loop, '_asyncio_wrapper', loop)
	result = loop.create_future()
	futures_list = []

	def recording_iter():
		# Remember each future, in input order, as it is handed off.
		for fut in futures:
			futures_list.append(fut)
			yield fut

	completed_iter = async_iter_completed(
		recording_iter(),
		max_jobs=max_jobs,
		max_load=max_load,
		loop=loop,
	)

	# Mutable holder so the nested functions can update the task that is
	# currently being waited on without rebinding a closure variable.
	state = {'current_task': None}

	def advance():
		# Pull the next task from async_iter_completed and chain onto
		# it, or publish the recorded futures once there are no more.
		try:
			task = next(completed_iter)
		except StopIteration:
			result.set_result(futures_list)
			return
		state['current_task'] = task
		task.add_done_callback(handle_result)

	def handle_result(future_done_set):
		# Stop pulling tasks once the caller has cancelled the result.
		if not result.cancelled():
			advance()

	# Kick off the chain with the first task.
	advance()

	def cancel_callback(result_future):
		# Propagate cancellation of the result to the in-flight task.
		if not result_future.cancelled():
			return
		task = state['current_task']
		if task is not None and not task.done():
			task.cancel()

	result.add_done_callback(cancel_callback)

	return result