Replace the os.fork call in ForkProcess._spawn with multiprocessing.Process, in order to leverage any pre-fork and post-fork interpreter housekeeping that it provides, promoting a healthy state for the forked interpreter.
Since multiprocessing.Process closes sys.__stdin__, fix relevant code to use the portage._get_stdin() compatibility function. In case there's a legitimate need to inherit stdin for things like PROPERTIES=interactive support, create a temporary duplicate of fd_pipes[0] when appropriate, and restore sys.stdin and sys.__stdin__ in the subprocess. Bug: https://bugs.gentoo.org/730192 Signed-off-by: Zac Medico <zmed...@gentoo.org> --- [PATCH v2] * Use sentinel for all python versions * Add _proc_join coroutine for non-blocking join lib/portage/process.py | 4 +- lib/portage/sync/controller.py | 4 +- lib/portage/util/_async/ForkProcess.py | 146 +++++++++++++++++++------ 3 files changed, 119 insertions(+), 35 deletions(-) diff --git a/lib/portage/process.py b/lib/portage/process.py index 6af668db4..b7316c89d 100644 --- a/lib/portage/process.py +++ b/lib/portage/process.py @@ -1,5 +1,5 @@ # portage.py -- core Portage functionality -# Copyright 1998-2019 Gentoo Authors +# Copyright 1998-2020 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 @@ -107,7 +107,7 @@ def sanitize_fds(): if _set_inheritable is not None: whitelist = frozenset([ - sys.__stdin__.fileno(), + portage._get_stdin().fileno(), sys.__stdout__.fileno(), sys.__stderr__.fileno(), ]) diff --git a/lib/portage/sync/controller.py b/lib/portage/sync/controller.py index c4c72e73a..43bb5d520 100644 --- a/lib/portage/sync/controller.py +++ b/lib/portage/sync/controller.py @@ -1,4 +1,4 @@ -# Copyright 2014-2019 Gentoo Authors +# Copyright 2014-2020 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 from __future__ import print_function @@ -231,7 +231,7 @@ class SyncManager(object): # Redirect command stderr to stdout, in order to prevent # spurious cron job emails (bug 566132). 
spawn_kwargs["fd_pipes"] = { - 0: sys.__stdin__.fileno(), + 0: portage._get_stdin().fileno(), 1: sys.__stdout__.fileno(), 2: sys.__stdout__.fileno() } diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py index d84e93833..eb01a6232 100644 --- a/lib/portage/util/_async/ForkProcess.py +++ b/lib/portage/util/_async/ForkProcess.py @@ -1,37 +1,123 @@ -# Copyright 2012-2013 Gentoo Foundation +# Copyright 2012-2020 Gentoo Authors # Distributed under the terms of the GNU General Public License v2 +import fcntl +import functools +import multiprocessing import signal import sys -import traceback import portage from portage import os +from portage.util.futures import asyncio +from portage.util.futures.compat_coroutine import coroutine from _emerge.SpawnProcess import SpawnProcess class ForkProcess(SpawnProcess): - __slots__ = () + __slots__ = ('_proc', '_proc_join_task') + + # Number of seconds between poll attempts for process exit status + # (after the sentinel has become ready). + _proc_join_interval = 0.1 def _spawn(self, args, fd_pipes=None, **kwargs): """ - Fork a subprocess, apply local settings, and call fetch(). + Override SpawnProcess._spawn to fork a subprocess that calls + self._run(). This uses multiprocessing.Process in order to leverage + any pre-fork and post-fork interpreter housekeeping that it provides, + promoting a healthy state for the forked interpreter. """ - - parent_pid = os.getpid() - pid = None + # Since multiprocessing.Process closes sys.__stdin__, create a + # temporary duplicate of fd_pipes[0] so that sys.__stdin__ can + # be restored in the subprocess, in case this is needed for + # things like PROPERTIES=interactive support. 
+ stdin_dup = None try: - pid = os.fork() + stdin_fd = fd_pipes.get(0) + if stdin_fd is not None and stdin_fd == portage._get_stdin().fileno(): + stdin_dup = os.dup(stdin_fd) + fcntl.fcntl(stdin_dup, fcntl.F_SETFD, + fcntl.fcntl(stdin_fd, fcntl.F_GETFD)) + fd_pipes[0] = stdin_dup + self._proc = multiprocessing.Process(target=self._bootstrap, args=(fd_pipes,)) + self._proc.start() + finally: + if stdin_dup is not None: + os.close(stdin_dup) + + self._proc_join_task = asyncio.ensure_future( + self._proc_join(self._proc)) + self._proc_join_task.add_done_callback( + functools.partial(self._proc_join_done, self._proc)) + + return [self._proc.pid] + + def _cancel(self): + if self._proc is None: + super(ForkProcess, self)._cancel() + else: + self._proc.terminate() + + def _async_wait(self): + if self._proc_join_task is None: + super(ForkProcess, self)._async_wait() - if pid != 0: - if not isinstance(pid, int): - raise AssertionError( - "fork returned non-integer: %s" % (repr(pid),)) - return [pid] + def _async_waitpid(self): + if self._proc_join_task is None: + super(ForkProcess, self)._async_waitpid() - rval = 1 + @coroutine + def _proc_join(self, proc): + sentinel_reader = self.scheduler.create_future() + self.scheduler.add_reader(proc.sentinel, + lambda: sentinel_reader.done() or sentinel_reader.set_result(None)) + try: + yield sentinel_reader + finally: + # If multiprocessing.Process supports the close method, then + # access to proc.sentinel will raise ValueError if the + # sentinel has been closed. In this case it's not safe to call + # remove_reader, since the file descriptor may have been closed + # and then reallocated to a concurrent coroutine. When the + # close method is not supported, proc.sentinel remains open + # until proc's finalizer is called. try: + self.scheduler.remove_reader(proc.sentinel) + except ValueError: + pass + + # Now that proc.sentinel is ready, poll until process exit + # status has become available. 
+ while True: + proc.join(0) + if proc.exitcode is not None: + break + yield asyncio.sleep(self._proc_join_interval) + + def _proc_join_done(self, proc, future): + future.cancelled() or future.result() + self._was_cancelled() + if self.returncode is None: + self.returncode = proc.exitcode + self._proc = None + if hasattr(proc, 'close'): + proc.close() + self._proc_join_task = None + self._async_wait() + + def _unregister(self): + super(ForkProcess, self)._unregister() + if self._proc is not None: + if self._proc.is_alive(): + self._proc.terminate() + self._proc = None + if self._proc_join_task is not None: + self._proc_join_task.cancel() + self._proc_join_task = None + + def _bootstrap(self, fd_pipes): # Use default signal handlers in order to avoid problems # killing subprocesses as reported in bug #353239. signal.signal(signal.SIGINT, signal.SIG_DFL) @@ -52,24 +138,22 @@ class ForkProcess(SpawnProcess): # (see _setup_pipes docstring). portage.process._setup_pipes(fd_pipes, close_fds=False) - rval = self._run() - except SystemExit: - raise - except: - traceback.print_exc() - # os._exit() skips stderr flush! - sys.stderr.flush() - finally: - os._exit(rval) + # Since multiprocessing.Process closes sys.__stdin__ and + # makes sys.stdin refer to os.devnull, restore it when + # appropriate. + if 0 in fd_pipes: + # It's possible that sys.stdin.fileno() is already 0, + # and in that case the above _setup_pipes call will + # have already updated its identity via dup2. Otherwise, + # perform the dup2 call now, and also copy the file + # descriptor flags. + if sys.stdin.fileno() != 0: + os.dup2(0, sys.stdin.fileno()) + fcntl.fcntl(sys.stdin.fileno(), fcntl.F_SETFD, + fcntl.fcntl(0, fcntl.F_GETFD)) + sys.__stdin__ = sys.stdin - finally: - if pid == 0 or (pid is None and os.getpid() != parent_pid): - # Call os._exit() from a finally block in order - # to suppress any finally blocks from earlier - # in the call stack (see bug #345289). 
This - # finally block has to be setup before the fork - # in order to avoid a race condition. - os._exit(1) + sys.exit(self._run()) def _run(self): raise NotImplementedError(self) -- 2.25.3