commit: be61882996099322bb3a1e82e71f475b4141ad40 Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Tue Apr 24 23:28:08 2018 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Fri Apr 27 21:33:00 2018 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=be618829
# Added for bug 653946 (https://bugs.gentoo.org/653946): intended to
# asynchronously gather the results of the portdbapi.async_fetch_map
# calls that are required to generate a Manifest, using max_jobs to
# limit the number of concurrent async_aux_get calls.
def iter_gather(futures, max_jobs=None, max_load=None, loop=None):
    """
    Gather an iterator of futures into one future, much like
    asyncio.gather, except that the input is an iterator and the
    max_jobs and max_load parameters bound how many input futures are
    processed at once.

    Cancelling the returned future also cancels the done-set future
    currently pending from async_iter_completed, if any.

    @param futures: iterator of asyncio.Future (or compatible)
    @type futures: iterator
    @param max_jobs: max number of futures to process concurrently (default
        is multiprocessing.cpu_count())
    @type max_jobs: int
    @param max_load: max load allowed when scheduling a new future,
        otherwise schedule no more than 1 future at a time (default
        is multiprocessing.cpu_count())
    @type max_load: int or float
    @param loop: event loop
    @type loop: EventLoop
    @return: a Future resulting in a list of the input futures themselves
        (not their results), in the same order that they were yielded
        from the input iterator; callers read results/exceptions from them
    @rtype: asyncio.Future (or compatible)
    """
    loop = loop or global_event_loop()
    # Use the loop's _asyncio_wrapper attribute when it has one
    # (presumably an asyncio-compatible facade -- confirm in event loop code).
    loop = getattr(loop, '_asyncio_wrapper', loop)
    gathered = loop.create_future()
    # Every input future, in the order it was pulled from the input
    # iterator; this list is what `gathered` resolves with.
    collected = []

    def tee_futures():
        # Record each input future as it is consumed, then pass it on.
        for fut in futures:
            collected.append(fut)
            yield fut

    done_sets = async_iter_completed(
        tee_futures(),
        max_jobs=max_jobs,
        max_load=max_load,
        loop=loop,
    )

    # One-element cell (a closure-friendly alternative to nonlocal) that
    # holds the future most recently obtained from done_sets, or None if
    # none has been obtained yet.
    waiting_on = [None]

    def schedule_next():
        # Wait on the next future from done_sets, or resolve `gathered`
        # once done_sets is exhausted. On StopIteration the cell keeps its
        # previous value, because next() raises before the assignment.
        try:
            waiting_on[0] = next(done_sets)
        except StopIteration:
            gathered.set_result(collected)
        else:
            waiting_on[0].add_done_callback(on_done_set)

    def on_done_set(done_set):
        if not gathered.cancelled():
            schedule_next()
            return
        if done_set.cancelled():
            return
        # All exceptions must be consumed from done_set, in order to
        # avoid triggering the event loop's exception handler.
        for fut in done_set.result():
            if not fut.cancelled():
                fut.exception()

    # Request the first done-set synchronously; any exception other than
    # StopIteration propagates to the caller of iter_gather.
    schedule_next()

    def on_gathered_done(fut):
        # Propagate cancellation of the returned future to the done-set
        # future that is still pending, if there is one.
        current = waiting_on[0]
        if fut.cancelled() and current is not None and not current.done():
            current.cancel()

    gathered.add_done_callback(on_gathered_done)

    return gathered
