In order to avoid event loop recursion, pass fetchlist_dict to ManifestTask as a Future.
Bug: https://bugs.gentoo.org/653946
---
 .../ebuild/_parallel_manifest/ManifestScheduler.py | 82 ++++++++++++++++++----
 .../ebuild/_parallel_manifest/ManifestTask.py      | 22 ++++++
 pym/portage/tests/dbapi/test_portdb_cache.py       |  1 +
 3 files changed, 91 insertions(+), 14 deletions(-)

diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
index 38ac4825e..07794522e 100644
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py
@@ -4,9 +4,9 @@
 import portage
 from portage import os
 from portage.dep import _repo_separator
-from portage.exception import InvalidDependString
 from portage.localization import _
 from portage.util._async.AsyncScheduler import AsyncScheduler
+from portage.util.futures.iter_completed import iter_gather
 from .ManifestTask import ManifestTask
 
 class ManifestScheduler(AsyncScheduler):
@@ -63,21 +63,14 @@ class ManifestScheduler(AsyncScheduler):
 				cpv_list = portdb.cp_list(cp, mytree=[repo_config.location])
 				if not cpv_list:
 					continue
-				fetchlist_dict = {}
-				try:
-					for cpv in cpv_list:
-						fetchlist_dict[cpv] = \
-							list(portdb.getFetchMap(cpv, mytree=mytree))
-				except InvalidDependString as e:
-					portage.writemsg(
-						_("!!! %s%s%s: SRC_URI: %s\n") %
-						(cp, _repo_separator, repo_config.name, e),
-						noiselevel=-1)
-					self._error_count += 1
-					continue
 
+				# Use _future_fetchlist(max_jobs=1), since we
+				# spawn concurrent ManifestTask instances.
 				yield ManifestTask(cp=cp, distdir=distdir,
-					fetchlist_dict=fetchlist_dict, repo_config=repo_config,
+					fetchlist_dict=_future_fetchlist(
+						self._event_loop, portdb, repo_config, cp, cpv_list,
+						max_jobs=1),
+					repo_config=repo_config,
 					gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars,
 					force_sign_key=self._force_sign_key)
 
@@ -91,3 +84,64 @@ class ManifestScheduler(AsyncScheduler):
 					noiselevel=-1)
 
 		AsyncScheduler._task_exit(self, task)
+
+
+def _future_fetchlist(loop, portdb, repo_config, cp, cpv_list,
+	max_jobs=None, max_load=None):
+	"""
+	Asynchronous form of FetchlistDict, with max_jobs and max_load
+	parameters in order to control async_aux_get concurrency.
+
+	@param loop: event loop
+	@type loop: EventLoop
+	@param portdb: portdbapi instance
+	@type portdb: portdbapi
+	@param repo_config: repository configuration for a Manifest
+	@type repo_config: RepoConfig
+	@param cp: cp for a Manifest
+	@type cp: str
+	@param cpv_list: list of ebuild cpv values for a Manifest
+	@type cpv_list: list
+	@param max_jobs: max number of futures to process concurrently (default
+		is multiprocessing.cpu_count())
+	@type max_jobs: int
+	@param max_load: max load allowed when scheduling a new future,
+		otherwise schedule no more than 1 future at a time (default
+		is multiprocessing.cpu_count())
+	@type max_load: int or float
+	@return: a Future resulting in a Mapping compatible with FetchlistDict
+	@rtype: asyncio.Future (or compatible)
+	"""
+	loop = getattr(loop, '_asyncio_wrapper', loop)
+	result = loop.create_future()
+
+	def gather_done(gather_result):
+		if result.cancelled():
+			return
+
+		e = None
+		for future in gather_result.result():
+			if (future.done() and future.exception() is not None):
+				# Retrieve exceptions from all futures in order to
+				# avoid triggering the event loop's error handler.
+				e = future.exception()
+
+		if e is None:
+			result.set_result(dict((k, list(v.result()))
+				for k, v in zip(cpv_list, gather_result.result())))
+		else:
+			result.set_exception(e)
+
+	gather_result = iter_gather(
+		(portdb.async_fetch_map(cpv, mytree=repo_config.location, loop=loop)
+			for cpv in cpv_list),
+		max_jobs=max_jobs,
+		max_load=max_load,
+		loop=loop,
+	)
+
+	gather_result.add_done_callback(gather_done)
+	result.add_done_callback(lambda result:
+		gather_result.cancel() if result.cancelled() else None)
+
+	return result
diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
index 0ee2b910d..6f5fe5b16 100644
--- a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
+++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py
@@ -8,8 +8,12 @@ import subprocess
 from portage import os
 from portage import _unicode_encode, _encodings
 from portage.const import MANIFEST2_IDENTIFIERS
+from portage.dep import _repo_separator
+from portage.exception import InvalidDependString
+from portage.localization import _
 from portage.util import (atomic_ofstream, grablines,
 	shlex_split, varexpand, writemsg)
+from portage.util._async.AsyncTaskFuture import AsyncTaskFuture
 from portage.util._async.PipeLogger import PipeLogger
 from portage.util._async.PopenProcess import PopenProcess
 from _emerge.CompositeTask import CompositeTask
@@ -29,6 +33,24 @@ class ManifestTask(CompositeTask):
 	def _start(self):
 		self._manifest_path = os.path.join(self.repo_config.location,
 			self.cp, "Manifest")
+
+		self._start_task(
+			AsyncTaskFuture(future=self.fetchlist_dict),
+			self._start_with_fetchlist)
+
+	def _start_with_fetchlist(self, fetchlist_task):
+		if self._default_exit(fetchlist_task) != os.EX_OK:
+			if not self.fetchlist_dict.cancelled():
+				try:
+					self.fetchlist_dict.result()
+				except InvalidDependString as e:
+					writemsg(
+						_("!!! %s%s%s: SRC_URI: %s\n") %
+						(self.cp, _repo_separator, self.repo_config.name, e),
+						noiselevel=-1)
+			self._async_wait()
+			return
+		self.fetchlist_dict = self.fetchlist_dict.result()
 		manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir,
 			fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config,
 			scheduler=self.scheduler)
diff --git a/pym/portage/tests/dbapi/test_portdb_cache.py b/pym/portage/tests/dbapi/test_portdb_cache.py
index bd934460a..1f139b256 100644
--- a/pym/portage/tests/dbapi/test_portdb_cache.py
+++ b/pym/portage/tests/dbapi/test_portdb_cache.py
@@ -38,6 +38,7 @@ class PortdbCacheTestCase(TestCase):
 		portage_python = portage._python_interpreter
 		egencache_cmd = (portage_python, "-b", "-Wd",
 			os.path.join(self.bindir, "egencache"),
+			"--update-manifests", "--sign-manifests=n",
 			"--repo", "test_repo",
 			"--repositories-configuration",
 			settings.repositories.config_string())
 		python_cmd = (portage_python, "-b", "-Wd", "-c")
-- 
2.13.6