Execute pkg_pretend phases in a coroutine while parallel-fetch runs concurrently. When it is time to execute the pkg_pretend phase for a remote binary package, use the new Scheduler._get_prefetcher method to reuse a running prefetcher if one is available; otherwise, start a new fetcher.
Since pkg_pretend phases now run inside of the --keep-going retry loop, --keep-going is now able to recover from pkg_pretend failures, which fixes bug 404157. Bug: https://bugs.gentoo.org/404157 Bug: https://bugs.gentoo.org/710432 Signed-off-by: Zac Medico <zmed...@gentoo.org> --- [PATCH v2] records failed packages for correct interaction with emerge --keep-going, which fixes bug 404157 lib/_emerge/Scheduler.py | 142 +++++++++++++++++++++++++++------------ 1 file changed, 99 insertions(+), 43 deletions(-) diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py index a69421288..465f928a0 100644 --- a/lib/_emerge/Scheduler.py +++ b/lib/_emerge/Scheduler.py @@ -25,6 +25,7 @@ from portage._sets import SETPREFIX from portage._sets.base import InternalPackageSet from portage.util import ensure_dirs, writemsg, writemsg_level from portage.util.futures import asyncio +from portage.util.futures.compat_coroutine import coroutine, coroutine_return from portage.util.SlotObject import SlotObject from portage.util._async.SchedulerInterface import SchedulerInterface from portage.package.ebuild.digestcheck import digestcheck @@ -766,7 +767,8 @@ class Scheduler(PollScheduler): return prefetcher - def _run_pkg_pretend(self): + @coroutine + def _run_pkg_pretend(self, loop=None): """ Since pkg_pretend output may be important, this method sends all output directly to stdout (regardless of options like --quiet or @@ -774,7 +776,7 @@ class Scheduler(PollScheduler): """ failures = 0 - sched_iface = self._sched_iface + sched_iface = loop = asyncio._wrap_loop(loop or self._sched_iface) for x in self._mergelist: if not isinstance(x, Package): @@ -789,18 +791,28 @@ class Scheduler(PollScheduler): if "pretend" not in x.defined_phases: continue - out_str =">>> Running pre-merge checks for " + colorize("INFORM", x.cpv) + "\n" - portage.util.writemsg_stdout(out_str, noiselevel=-1) + out_str = "Running pre-merge checks for " + colorize("INFORM", x.cpv) + self._status_msg(out_str) 
root_config = x.root_config - settings = self.pkgsettings[root_config.root] + settings = self._allocate_config(root_config.root) settings.setcpv(x) + if not x.built: + # Get required SRC_URI metadata (it's not cached in x.metadata + # because some packages have an extremely large SRC_URI value). + portdb = root_config.trees["porttree"].dbapi + (settings.configdict["pkg"]["SRC_URI"],) = yield portdb.async_aux_get( + x.cpv, ["SRC_URI"], myrepo=x.repo, loop=loop + ) # setcpv/package.env allows for per-package PORTAGE_TMPDIR so we # have to validate it for each package rval = _check_temp_dir(settings) if rval != os.EX_OK: - return rval + failures += 1 + self._record_pkg_failure(x, settings, FAILURE) + self._deallocate_config(settings) + continue build_dir_path = os.path.join( os.path.realpath(settings["PORTAGE_TMPDIR"]), @@ -809,7 +821,7 @@ class Scheduler(PollScheduler): settings["PORTAGE_BUILDDIR"] = build_dir_path build_dir = EbuildBuildDir(scheduler=sched_iface, settings=settings) - sched_iface.run_until_complete(build_dir.async_lock()) + yield build_dir.async_lock() current_task = None try: @@ -835,7 +847,7 @@ class Scheduler(PollScheduler): phase='clean', scheduler=sched_iface, settings=settings) current_task = clean_phase clean_phase.start() - clean_phase.wait() + yield clean_phase.async_wait() if x.built: tree = "bintree" @@ -845,13 +857,19 @@ class Scheduler(PollScheduler): # Display fetch on stdout, so that it's always clear what # is consuming time here. if bintree.isremote(x.cpv): - fetcher = BinpkgFetcher(pkg=x, - scheduler=sched_iface) - fetcher.start() - if fetcher.wait() != os.EX_OK: + fetcher = self._get_prefetcher(x) + if fetcher is None: + fetcher = BinpkgFetcher(pkg=x, scheduler=loop) + fetcher.start() + # We only set the fetched value when fetcher + # is a BinpkgFetcher, since BinpkgPrefetcher + # handles fetch, verification, and the + # bintree.inject call which moves the file. 
+ fetched = fetcher.pkg_path + if (yield fetcher.async_wait()) != os.EX_OK: failures += 1 + self._record_pkg_failure(x, settings, fetcher.returncode) continue - fetched = fetcher.pkg_path if fetched is False: filename = bintree.getname(x.cpv) @@ -861,8 +879,9 @@ class Scheduler(PollScheduler): scheduler=sched_iface, _pkg_path=filename) current_task = verifier verifier.start() - if verifier.wait() != os.EX_OK: + if (yield verifier.async_wait()) != os.EX_OK: failures += 1 + self._record_pkg_failure(x, settings, verifier.returncode) continue if fetched: @@ -870,8 +889,7 @@ class Scheduler(PollScheduler): infloc = os.path.join(build_dir_path, "build-info") ensure_dirs(infloc) - self._sched_iface.run_until_complete( - bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface)) + yield bintree.dbapi.unpack_metadata(settings, infloc, loop=loop) ebuild_path = os.path.join(infloc, x.pf + ".ebuild") settings.configdict["pkg"]["EMERGE_FROM"] = "binary" settings.configdict["pkg"]["MERGE_TYPE"] = "binary" @@ -905,28 +923,45 @@ class Scheduler(PollScheduler): current_task = pretend_phase pretend_phase.start() - ret = pretend_phase.wait() + ret = yield pretend_phase.async_wait() if ret != os.EX_OK: failures += 1 + self._record_pkg_failure(x, settings, ret) portage.elog.elog_process(x.cpv, settings) finally: if current_task is not None: if current_task.isAlive(): current_task.cancel() - current_task.wait() if current_task.returncode == os.EX_OK: clean_phase = EbuildPhase(background=False, phase='clean', scheduler=sched_iface, settings=settings) clean_phase.start() - clean_phase.wait() + yield clean_phase.async_wait() - sched_iface.run_until_complete(build_dir.async_unlock()) + yield build_dir.async_unlock() + self._deallocate_config(settings) if failures: - return FAILURE - return os.EX_OK + return coroutine_return(FAILURE) + coroutine_return(os.EX_OK) + + def _record_pkg_failure(self, pkg, settings, ret): + """Record a package failure. 
This eliminates the package + from the --keep-going merge list, and immediately calls + _failed_pkg_msg if we have not been terminated.""" + self._failed_pkgs.append( + self._failed_pkg( + build_dir=settings.get("PORTAGE_BUILDDIR"), + build_log=settings.get("PORTAGE_LOG_FILE"), + pkg=pkg, + returncode=ret, + ) + ) + if not self._terminated_tasks: + self._failed_pkg_msg(self._failed_pkgs[-1], "emerge", "for") + self._status_display.failed = len(self._failed_pkgs) def merge(self): if "--resume" in self.myopts: @@ -988,11 +1023,6 @@ class Scheduler(PollScheduler): if rval != os.EX_OK and not keep_going: return rval - if not fetchonly: - rval = self._run_pkg_pretend() - if rval != os.EX_OK: - return rval - while True: received_signal = [] @@ -1389,8 +1419,6 @@ class Scheduler(PollScheduler): if self._opts_no_background.intersection(self.myopts): self._set_max_jobs(1) - self._add_prefetchers() - self._add_packages() failed_pkgs = self._failed_pkgs portage.locks._quiet = self._background portage.elog.add_listener(self._elog_listener) @@ -1406,6 +1434,30 @@ class Scheduler(PollScheduler): rval = os.EX_OK try: + self._add_prefetchers() + if not self._build_opts.fetchonly: + # Run pkg_pretend concurrently with parallel-fetch, and be careful + # to respond appropriately to termination, so that we don't start + # any new tasks after we've been terminated. Temporarily make the + # status display quiet so that its output is not interleaved with + # pkg_pretend output. 
+ status_quiet = self._status_display.quiet + self._status_display.quiet = True + try: + rval = self._sched_iface.run_until_complete( + self._run_pkg_pretend(loop=self._sched_iface) + ) + except asyncio.CancelledError: + self.terminate() + finally: + self._status_display.quiet = status_quiet + self._termination_check() + if self._terminated_tasks: + rval = 128 + signal.SIGINT + if rval != os.EX_OK: + return rval + + self._add_packages() self._main_loop() finally: self._main_loop_cleanup() @@ -1742,6 +1794,23 @@ class Scheduler(PollScheduler): return bool(state_change) + def _get_prefetcher(self, pkg): + try: + prefetcher = self._prefetchers.pop(pkg, None) + except KeyError: + # KeyError observed with PyPy 1.8, despite None given as default. + # Note that PyPy 1.8 has the same WeakValueDictionary code as + # CPython 2.7, so it may be possible for CPython to raise KeyError + # here as well. + prefetcher = None + if prefetcher is not None and not prefetcher.isAlive(): + try: + self._task_queues.fetch._task_queue.remove(prefetcher) + except ValueError: + pass + prefetcher = None + return prefetcher + def _task(self, pkg): pkg_to_replace = None @@ -1758,20 +1827,7 @@ class Scheduler(PollScheduler): "installed", pkg.root_config, installed=True, operation="uninstall") - try: - prefetcher = self._prefetchers.pop(pkg, None) - except KeyError: - # KeyError observed with PyPy 1.8, despite None given as default. - # Note that PyPy 1.8 has the same WeakValueDictionary code as - # CPython 2.7, so it may be possible for CPython to raise KeyError - # here as well. - prefetcher = None - if prefetcher is not None and not prefetcher.isAlive(): - try: - self._task_queues.fetch._task_queue.remove(prefetcher) - except ValueError: - pass - prefetcher = None + prefetcher = self._get_prefetcher(pkg) task = MergeListItem(args_set=self._args_set, background=self._background, binpkg_opts=self._binpkg_opts, -- 2.25.3