Execute pkg_pretend phases in a coroutine while parallel-fetch
is running concurrently. When it's time to execute the pkg_pretend
phase for a remote binary package, use the new
Scheduler._get_prefetcher method to get a running prefetcher if one
is available, and otherwise start a new fetcher.

Since pkg_pretend phases now run inside the --keep-going retry
loop, --keep-going can now recover from pkg_pretend failures,
which fixes bug 404157.

Bug: https://bugs.gentoo.org/404157
Bug: https://bugs.gentoo.org/710432
Signed-off-by: Zac Medico <zmed...@gentoo.org>
---
[PATCH v2] Record failed packages (via the new _record_pkg_failure
method) so that pkg_pretend failures interact correctly with
emerge --keep-going, which fixes bug 404157.

 lib/_emerge/Scheduler.py | 142 +++++++++++++++++++++++++++------------
 1 file changed, 99 insertions(+), 43 deletions(-)

diff --git a/lib/_emerge/Scheduler.py b/lib/_emerge/Scheduler.py
index a69421288..465f928a0 100644
--- a/lib/_emerge/Scheduler.py
+++ b/lib/_emerge/Scheduler.py
@@ -25,6 +25,7 @@ from portage._sets import SETPREFIX
 from portage._sets.base import InternalPackageSet
 from portage.util import ensure_dirs, writemsg, writemsg_level
 from portage.util.futures import asyncio
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
 from portage.util.SlotObject import SlotObject
 from portage.util._async.SchedulerInterface import SchedulerInterface
 from portage.package.ebuild.digestcheck import digestcheck
@@ -766,7 +767,8 @@ class Scheduler(PollScheduler):
 
                return prefetcher
 
-       def _run_pkg_pretend(self):
+       @coroutine
+       def _run_pkg_pretend(self, loop=None):
                """
                Since pkg_pretend output may be important, this method sends all
                output directly to stdout (regardless of options like --quiet or
@@ -774,7 +776,7 @@ class Scheduler(PollScheduler):
                """
 
                failures = 0
-               sched_iface = self._sched_iface
+               sched_iface = loop = asyncio._wrap_loop(loop or 
self._sched_iface)
 
                for x in self._mergelist:
                        if not isinstance(x, Package):
@@ -789,18 +791,28 @@ class Scheduler(PollScheduler):
                        if "pretend" not in x.defined_phases:
                                continue
 
-                       out_str =">>> Running pre-merge checks for " + 
colorize("INFORM", x.cpv) + "\n"
-                       portage.util.writemsg_stdout(out_str, noiselevel=-1)
+                       out_str = "Running pre-merge checks for " + 
colorize("INFORM", x.cpv)
+                       self._status_msg(out_str)
 
                        root_config = x.root_config
-                       settings = self.pkgsettings[root_config.root]
+                       settings = self._allocate_config(root_config.root)
                        settings.setcpv(x)
+                       if not x.built:
+                               # Get required SRC_URI metadata (it's not 
cached in x.metadata
+                               # because some packages have an extremely large 
SRC_URI value).
+                               portdb = root_config.trees["porttree"].dbapi
+                               (settings.configdict["pkg"]["SRC_URI"],) = 
yield portdb.async_aux_get(
+                                       x.cpv, ["SRC_URI"], myrepo=x.repo, 
loop=loop
+                               )
 
                        # setcpv/package.env allows for per-package 
PORTAGE_TMPDIR so we
                        # have to validate it for each package
                        rval = _check_temp_dir(settings)
                        if rval != os.EX_OK:
-                               return rval
+                               failures += 1
+                               self._record_pkg_failure(x, settings, FAILURE)
+                               self._deallocate_config(settings)
+                               continue
 
                        build_dir_path = os.path.join(
                                os.path.realpath(settings["PORTAGE_TMPDIR"]),
@@ -809,7 +821,7 @@ class Scheduler(PollScheduler):
                        settings["PORTAGE_BUILDDIR"] = build_dir_path
                        build_dir = EbuildBuildDir(scheduler=sched_iface,
                                settings=settings)
-                       sched_iface.run_until_complete(build_dir.async_lock())
+                       yield build_dir.async_lock()
                        current_task = None
 
                        try:
@@ -835,7 +847,7 @@ class Scheduler(PollScheduler):
                                                phase='clean', 
scheduler=sched_iface, settings=settings)
                                        current_task = clean_phase
                                        clean_phase.start()
-                                       clean_phase.wait()
+                                       yield clean_phase.async_wait()
 
                                if x.built:
                                        tree = "bintree"
@@ -845,13 +857,19 @@ class Scheduler(PollScheduler):
                                        # Display fetch on stdout, so that it's 
always clear what
                                        # is consuming time here.
                                        if bintree.isremote(x.cpv):
-                                               fetcher = BinpkgFetcher(pkg=x,
-                                                       scheduler=sched_iface)
-                                               fetcher.start()
-                                               if fetcher.wait() != os.EX_OK:
+                                               fetcher = 
self._get_prefetcher(x)
+                                               if fetcher is None:
+                                                       fetcher = 
BinpkgFetcher(pkg=x, scheduler=loop)
+                                                       fetcher.start()
+                                                       # We only set the 
fetched value when fetcher
+                                                       # is a BinpkgFetcher, 
since BinpkgPrefetcher
+                                                       # handles fetch, 
verification, and the
+                                                       # bintree.inject call 
which moves the file.
+                                                       fetched = 
fetcher.pkg_path
+                                               if (yield fetcher.async_wait()) 
!= os.EX_OK:
                                                        failures += 1
+                                                       
self._record_pkg_failure(x, settings, fetcher.returncode)
                                                        continue
-                                               fetched = fetcher.pkg_path
 
                                        if fetched is False:
                                                filename = 
bintree.getname(x.cpv)
@@ -861,8 +879,9 @@ class Scheduler(PollScheduler):
                                                scheduler=sched_iface, 
_pkg_path=filename)
                                        current_task = verifier
                                        verifier.start()
-                                       if verifier.wait() != os.EX_OK:
+                                       if (yield verifier.async_wait()) != 
os.EX_OK:
                                                failures += 1
+                                               self._record_pkg_failure(x, 
settings, verifier.returncode)
                                                continue
 
                                        if fetched:
@@ -870,8 +889,7 @@ class Scheduler(PollScheduler):
 
                                        infloc = os.path.join(build_dir_path, 
"build-info")
                                        ensure_dirs(infloc)
-                                       self._sched_iface.run_until_complete(
-                                               
bintree.dbapi.unpack_metadata(settings, infloc, loop=self._sched_iface))
+                                       yield 
bintree.dbapi.unpack_metadata(settings, infloc, loop=loop)
                                        ebuild_path = os.path.join(infloc, x.pf 
+ ".ebuild")
                                        
settings.configdict["pkg"]["EMERGE_FROM"] = "binary"
                                        
settings.configdict["pkg"]["MERGE_TYPE"] = "binary"
@@ -905,28 +923,45 @@ class Scheduler(PollScheduler):
 
                                current_task = pretend_phase
                                pretend_phase.start()
-                               ret = pretend_phase.wait()
+                               ret = yield pretend_phase.async_wait()
                                if ret != os.EX_OK:
                                        failures += 1
+                                       self._record_pkg_failure(x, settings, 
ret)
                                portage.elog.elog_process(x.cpv, settings)
                        finally:
 
                                if current_task is not None:
                                        if current_task.isAlive():
                                                current_task.cancel()
-                                               current_task.wait()
                                        if current_task.returncode == os.EX_OK:
                                                clean_phase = 
EbuildPhase(background=False,
                                                        phase='clean', 
scheduler=sched_iface,
                                                        settings=settings)
                                                clean_phase.start()
-                                               clean_phase.wait()
+                                               yield clean_phase.async_wait()
 
-                               
sched_iface.run_until_complete(build_dir.async_unlock())
+                               yield build_dir.async_unlock()
+                               self._deallocate_config(settings)
 
                if failures:
-                       return FAILURE
-               return os.EX_OK
+                       return coroutine_return(FAILURE)
+               coroutine_return(os.EX_OK)
+
+       def _record_pkg_failure(self, pkg, settings, ret):
+               """Record a package failure. This eliminates the package
+               from the --keep-going merge list, and immediately calls
+               _failed_pkg_msg if we have not been terminated."""
+               self._failed_pkgs.append(
+                       self._failed_pkg(
+                               build_dir=settings.get("PORTAGE_BUILDDIR"),
+                               build_log=settings.get("PORTAGE_LOG_FILE"),
+                               pkg=pkg,
+                               returncode=ret,
+                       )
+               )
+               if not self._terminated_tasks:
+                       self._failed_pkg_msg(self._failed_pkgs[-1], "emerge", 
"for")
+                       self._status_display.failed = len(self._failed_pkgs)
 
        def merge(self):
                if "--resume" in self.myopts:
@@ -988,11 +1023,6 @@ class Scheduler(PollScheduler):
                if rval != os.EX_OK and not keep_going:
                        return rval
 
-               if not fetchonly:
-                       rval = self._run_pkg_pretend()
-                       if rval != os.EX_OK:
-                               return rval
-
                while True:
 
                        received_signal = []
@@ -1389,8 +1419,6 @@ class Scheduler(PollScheduler):
                if self._opts_no_background.intersection(self.myopts):
                        self._set_max_jobs(1)
 
-               self._add_prefetchers()
-               self._add_packages()
                failed_pkgs = self._failed_pkgs
                portage.locks._quiet = self._background
                portage.elog.add_listener(self._elog_listener)
@@ -1406,6 +1434,30 @@ class Scheduler(PollScheduler):
                rval = os.EX_OK
 
                try:
+                       self._add_prefetchers()
+                       if not self._build_opts.fetchonly:
+                               # Run pkg_pretend concurrently with 
parallel-fetch, and be careful
+                               # to respond appropriately to termination, so 
that we don't start
+                               # any new tasks after we've been terminated. 
Temporarily make the
+                               # status display quiet so that its output is 
not interleaved with
+                               # pkg_pretend output.
+                               status_quiet = self._status_display.quiet
+                               self._status_display.quiet = True
+                               try:
+                                       rval = 
self._sched_iface.run_until_complete(
+                                               
self._run_pkg_pretend(loop=self._sched_iface)
+                                       )
+                               except asyncio.CancelledError:
+                                       self.terminate()
+                               finally:
+                                       self._status_display.quiet = 
status_quiet
+                               self._termination_check()
+                               if self._terminated_tasks:
+                                       rval = 128 + signal.SIGINT
+                               if rval != os.EX_OK:
+                                       return rval
+
+                       self._add_packages()
                        self._main_loop()
                finally:
                        self._main_loop_cleanup()
@@ -1742,6 +1794,23 @@ class Scheduler(PollScheduler):
 
                return bool(state_change)
 
+       def _get_prefetcher(self, pkg):
+               try:
+                       prefetcher = self._prefetchers.pop(pkg, None)
+               except KeyError:
+                       # KeyError observed with PyPy 1.8, despite None given 
as default.
+                       # Note that PyPy 1.8 has the same WeakValueDictionary 
code as
+                       # CPython 2.7, so it may be possible for CPython to 
raise KeyError
+                       # here as well.
+                       prefetcher = None
+               if prefetcher is not None and not prefetcher.isAlive():
+                       try:
+                               
self._task_queues.fetch._task_queue.remove(prefetcher)
+                       except ValueError:
+                               pass
+                       prefetcher = None
+               return prefetcher
+
        def _task(self, pkg):
 
                pkg_to_replace = None
@@ -1758,20 +1827,7 @@ class Scheduler(PollScheduler):
                                        "installed", pkg.root_config, 
installed=True,
                                        operation="uninstall")
 
-               try:
-                       prefetcher = self._prefetchers.pop(pkg, None)
-               except KeyError:
-                       # KeyError observed with PyPy 1.8, despite None given 
as default.
-                       # Note that PyPy 1.8 has the same WeakValueDictionary 
code as
-                       # CPython 2.7, so it may be possible for CPython to 
raise KeyError
-                       # here as well.
-                       prefetcher = None
-               if prefetcher is not None and not prefetcher.isAlive():
-                       try:
-                               
self._task_queues.fetch._task_queue.remove(prefetcher)
-                       except ValueError:
-                               pass
-                       prefetcher = None
+               prefetcher = self._get_prefetcher(pkg)
 
                task = MergeListItem(args_set=self._args_set,
                        background=self._background, 
binpkg_opts=self._binpkg_opts,
-- 
2.25.3


Reply via email to