Since construction of FetchTask instances requires results from aux_get calls that would trigger event loop recursion when executed synchronously, add a _fetch_tasks_future function to construct FetchTask instances asynchronously and return a Future. Use an _EbuildFetchTasks class to wait for the FetchTask instances to become available, and then execute them.
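For illustration, the overall shape of the pattern is roughly as
follows (a minimal sketch using plain asyncio; build_tasks_future,
fake_aux_get and fake_fetch_map are hypothetical stand-ins for
_fetch_tasks_future and portage's async_aux_get/async_fetch_map, not
actual portage APIs):

    import asyncio

    def build_tasks_future():
        # Return a Future immediately; a callback populates it once
        # the asynchronous metadata lookups complete, so nothing here
        # blocks or re-enters the event loop with synchronous calls.
        loop = asyncio.get_running_loop()
        result = loop.create_future()

        def metadata_done(gathered):
            if result.cancelled():
                return
            restrict, uri_map = gathered.result()
            # Construct the "fetch tasks" from the gathered metadata.
            result.set_result(
                ["fetch %s (RESTRICT=%s)" % (uri_map, restrict)])

        gathered = asyncio.gather(fake_aux_get(), fake_fetch_map())
        gathered.add_done_callback(metadata_done)
        return result

    async def fake_aux_get():
        return "mirror"

    async def fake_fetch_map():
        return "distfile.tar.gz"

    async def main():
        # Analogous to _EbuildFetchTasks: wait for the tasks to
        # become available, then execute them.
        for task in await build_tasks_future():
            print(task)

    asyncio.run(main())

As in the sketch, the result future is completed from a done-callback
rather than by blocking, so FetchTask construction never recurses
into the event loop.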
Bug: https://bugs.gentoo.org/654038
---
 pym/portage/_emirrordist/FetchIterator.py | 311 ++++++++++++++++++++----------
 1 file changed, 206 insertions(+), 105 deletions(-)

diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py
index 38419799d..bd3b98cd0 100644
--- a/pym/portage/_emirrordist/FetchIterator.py
+++ b/pym/portage/_emirrordist/FetchIterator.py
@@ -7,14 +7,18 @@ from portage import os
 from portage.checksum import (_apply_hash_filter,
 	_filter_unaccelarated_hashes, _hash_filter)
 from portage.dep import use_reduce
-from portage.exception import PortageException
+from portage.exception import PortageException, PortageKeyError
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
+from portage.util._async.TaskScheduler import TaskScheduler
+from portage.util.futures.iter_completed import iter_gather
 from .FetchTask import FetchTask
+from _emerge.CompositeTask import CompositeTask
+
 
 class FetchIterator(object):
 
 	def __init__(self, config):
 		self._config = config
-		self._log_failure = config.log_failure
 		self._terminated = threading.Event()
 
 	def terminate(self):
@@ -41,9 +45,6 @@ class FetchIterator(object):
 
 		portdb = self._config.portdb
 		get_repo_for_location = portdb.repositories.get_repo_for_location
-		file_owners = self._config.file_owners
-		file_failures = self._config.file_failures
-		restrict_mirror_exemptions = self._config.restrict_mirror_exemptions
 
 		hash_filter = _hash_filter(
 			portdb.settings.get("PORTAGE_CHECKSUM_FILTER", ""))
@@ -59,110 +60,210 @@ class FetchIterator(object):
 
 				# Reset state so the Manifest is pulled once
 				# for this cp / tree combination.
-				digests = None
 				repo_config = get_repo_for_location(tree)
+				digests_future = portdb._event_loop.create_future()
 
 				for cpv in portdb.cp_list(cp, mytree=tree):
 
 					if self._terminated.is_set():
 						return
 
-					try:
-						restrict, = portdb.aux_get(cpv, ("RESTRICT",),
-							mytree=tree)
-					except (KeyError, PortageException) as e:
-						self._log_failure("%s\t\taux_get exception %s" %
-							(cpv, e))
-						continue
-
-					# Here we use matchnone=True to ignore conditional parts
-					# of RESTRICT since they don't apply unconditionally.
-					# Assume such conditionals only apply on the client side.
-					try:
-						restrict = frozenset(use_reduce(restrict,
-							flat=True, matchnone=True))
-					except PortageException as e:
-						self._log_failure("%s\t\tuse_reduce exception %s" %
-							(cpv, e))
-						continue
-
-					if "fetch" in restrict:
-						continue
-
-					try:
-						uri_map = portdb.getFetchMap(cpv)
-					except PortageException as e:
-						self._log_failure("%s\t\tgetFetchMap exception %s" %
-							(cpv, e))
-						continue
-
-					if not uri_map:
-						continue
-
-					if "mirror" in restrict:
-						skip = False
-						if restrict_mirror_exemptions is not None:
-							new_uri_map = {}
-							for filename, uri_tuple in uri_map.items():
-								for uri in uri_tuple:
-									if uri[:9] == "mirror://":
-										i = uri.find("/", 9)
-										if i != -1 and uri[9:i].strip("/") in \
-											restrict_mirror_exemptions:
-											new_uri_map[filename] = uri_tuple
-											break
-							if new_uri_map:
-								uri_map = new_uri_map
-							else:
-								skip = True
-						else:
-							skip = True
-
-						if skip:
-							continue
-
-					# Parse Manifest for this cp if we haven't yet.
-					if digests is None:
-						try:
-							digests = repo_config.load_manifest(
-								os.path.join(repo_config.location, cp)
-								).getTypeDigests("DIST")
-						except (EnvironmentError, PortageException) as e:
-							for filename in uri_map:
-								self._log_failure(
-									"%s\t%s\tManifest exception %s" %
-									(cpv, filename, e))
-								file_failures[filename] = cpv
-							continue
-
-					if not digests:
-						for filename in uri_map:
-							self._log_failure("%s\t%s\tdigest entry missing" %
-								(cpv, filename))
-							file_failures[filename] = cpv
-						continue
-
-					for filename, uri_tuple in uri_map.items():
-						file_digests = digests.get(filename)
-						if file_digests is None:
-							self._log_failure("%s\t%s\tdigest entry missing" %
-								(cpv, filename))
-							file_failures[filename] = cpv
-							continue
-						if filename in file_owners:
-							continue
-						file_owners[filename] = cpv
-
-						file_digests = \
-							_filter_unaccelarated_hashes(file_digests)
-						if hash_filter is not None:
-							file_digests = _apply_hash_filter(
-								file_digests, hash_filter)
-
-						yield FetchTask(cpv=cpv,
-							background=True,
-							digests=file_digests,
-							distfile=filename,
-							restrict=restrict,
-							uri_tuple=uri_tuple,
-							config=self._config)
+					yield _EbuildFetchTasks(
+						fetch_tasks_future=_fetch_tasks_future(
+							self._config,
+							hash_filter,
+							repo_config,
+							digests_future,
+							cpv)
+						)
+
+
+class _EbuildFetchTasks(CompositeTask):
+	"""
+	This executes asynchronously constructed FetchTask instances for
+	each of the files referenced by an ebuild.
+	"""
+	__slots__ = ('fetch_tasks_future',)
+	def _start(self):
+		self._start_task(AsyncTaskFuture(future=self.fetch_tasks_future),
+			self._start_fetch_tasks)
+
+	def _start_fetch_tasks(self, task):
+		if self._default_exit(task) != os.EX_OK:
+			self._async_wait()
+			return
+
+		self._start_task(
+			TaskScheduler(
+				iter(self.fetch_tasks_future.result()),
+				max_jobs=1,
+				event_loop=self.scheduler),
+			self._default_final_exit)
+
+
+def _fetch_tasks_future(config, hash_filter, repo_config, digests_future, cpv):
+	"""
+	Asynchronously construct FetchTask instances for each of the files
+	referenced by an ebuild.
+
+	@param config: emirrordist config
+	@type config: portage._emirrordist.Config.Config
+	@param hash_filter: PORTAGE_CHECKSUM_FILTER settings
+	@type hash_filter: portage.checksum._hash_filter
+	@param repo_config: repository configuration
+	@type repo_config: RepoConfig
+	@param digests_future: future that contains cached distfiles digests
+		for the current cp if available
+	@type digests_future: asyncio.Future
+	@param cpv: current ebuild cpv
+	@type cpv: portage.versions._pkg_str
+
+	@return: A future that results in a list containing FetchTask
+		instances for each of the files referenced by an ebuild.
+	@rtype: asyncio.Future (or compatible)
+	"""
+
+	loop = config.portdb._event_loop
+	result = loop.create_future()
+	fetch_tasks = []
+
+	def aux_get_done(gather_result):
+		if result.cancelled():
+			return
+
+		aux_get_result, fetch_map_result = gather_result.result()
+		try:
+			restrict, = aux_get_result.result()
+		except (PortageKeyError, PortageException) as e:
+			config.log_failure("%s\t\taux_get exception %s" %
+				(cpv, e))
+			result.set_result(fetch_tasks)
+			return
+
+		# Here we use matchnone=True to ignore conditional parts
+		# of RESTRICT since they don't apply unconditionally.
+		# Assume such conditionals only apply on the client side.
+		try:
+			restrict = frozenset(use_reduce(restrict,
+				flat=True, matchnone=True))
+		except PortageException as e:
+			config.log_failure("%s\t\tuse_reduce exception %s" %
+				(cpv, e))
+			result.set_result(fetch_tasks)
+			return
+
+		if "fetch" in restrict:
+			result.set_result(fetch_tasks)
+			return
+
+		try:
+			uri_map = fetch_map_result.result()
+		except PortageException as e:
+			config.log_failure("%s\t\tgetFetchMap exception %s" %
+				(cpv, e))
+			result.set_result(fetch_tasks)
+			return
+
+		if not uri_map:
+			result.set_result(fetch_tasks)
+			return
+
+		if "mirror" in restrict:
+			skip = False
+			if config.restrict_mirror_exemptions is not None:
+				new_uri_map = {}
+				for filename, uri_tuple in uri_map.items():
+					for uri in uri_tuple:
+						if uri[:9] == "mirror://":
+							i = uri.find("/", 9)
+							if i != -1 and uri[9:i].strip("/") in \
+								config.restrict_mirror_exemptions:
+								new_uri_map[filename] = uri_tuple
+								break
+				if new_uri_map:
+					uri_map = new_uri_map
+				else:
+					skip = True
+			else:
+				skip = True
+
+			if skip:
+				result.set_result(fetch_tasks)
+				return
+
+		# Parse Manifest for this cp if we haven't yet.
+		if not digests_future.done():
+			try:
+				digests = repo_config.load_manifest(
+					os.path.join(repo_config.location, cpv.cp)).\
+					getTypeDigests("DIST")
+			except (EnvironmentError, PortageException) as e:
+				digests_future.set_exception(e)
+				for filename in uri_map:
+					config.log_failure(
+						"%s\t%s\tManifest exception %s" %
+						(cpv, filename, e))
+					config.file_failures[filename] = cpv
+				result.set_result(fetch_tasks)
+				return
+			else:
+				digests_future.set_result(digests)
+
+		digests = digests_future.result()
+		if not digests:
+			for filename in uri_map:
+				config.log_failure("%s\t%s\tdigest entry missing" %
+					(cpv, filename))
+				config.file_failures[filename] = cpv
+			result.set_result(fetch_tasks)
+			return
+
+		for filename, uri_tuple in uri_map.items():
+			file_digests = digests.get(filename)
+			if file_digests is None:
+				config.log_failure("%s\t%s\tdigest entry missing" %
+					(cpv, filename))
+				config.file_failures[filename] = cpv
+				continue
+			if filename in config.file_owners:
+				continue
+			config.file_owners[filename] = cpv
+
+			file_digests = \
+				_filter_unaccelarated_hashes(file_digests)
+			if hash_filter is not None:
+				file_digests = _apply_hash_filter(
+					file_digests, hash_filter)
+
+			fetch_tasks.append(FetchTask(
+				cpv=cpv,
+				background=True,
+				digests=file_digests,
+				distfile=filename,
+				restrict=restrict,
+				uri_tuple=uri_tuple,
+				config=config))
+
+		result.set_result(fetch_tasks)
+
+	def future_generator():
+		yield config.portdb.async_aux_get(cpv, ("RESTRICT",),
+			myrepo=repo_config.name, loop=loop)
+		yield config.portdb.async_fetch_map(cpv,
+			mytree=repo_config.location, loop=loop)

+	# Use iter_gather(max_jobs=1) to limit the number of processes per
+	# _EbuildFetchTasks instance, and also to avoid spawning two bash
+	# processes for the same cpv simultaneously (the second one can
+	# use metadata cached by the first one).
+	gather_result = iter_gather(
+		future_generator(),
+		max_jobs=1,
+		loop=loop,
+	)
+	gather_result.add_done_callback(aux_get_done)
+	result.add_done_callback(lambda result:
+		gather_result.cancel() if result.cancelled() and
+		not gather_result.done() else None)
+
+	return result
-- 
2.13.6