commit: 9f96ce5105e7bd2580ae9acc34d6ebad914dae47
Author: Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Feb 3 19:36:05 2024 +0000
Commit: Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat Feb 3 19:55:19 2024 +0000
URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=9f96ce51
ForkProcess: Use duplicate fd_pipes in _send_fd_pipes thread

In order to allow callers to manage the lifecycle of fd_pipes file descriptors, create duplicates for _send_fd_pipes to close when it has finished sending them. This fixes bug 916601 in a nice way, allowing commit 3b1234ba69a31709cd5aec1ae070901e3a28bb7c to be reverted.

Bug: https://bugs.gentoo.org/916601
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

lib/portage/util/_async/ForkProcess.py | 55 +++++++++++++++++++++++++++++-----
1 file changed, 48 insertions(+), 7 deletions(-)

diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index 711bd2a7ba..3acbe34fc6 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -24,6 +24,8 @@ class ForkProcess(SpawnProcess): "kwargs", "target", "_child_connection", + # Duplicate file descriptors for use by _send_fd_pipes background thread. + "_fd_pipes", ) _file_names = ("connection", "slave_fd") @@ -53,7 +55,13 @@ class ForkProcess(SpawnProcess): duplex=self._HAVE_SEND_HANDLE ) - self._proc = self._spawn(self.args, fd_pipes=self.fd_pipes) + # Handle fd_pipes in _main instead, since file descriptors are + # not inherited with the multiprocessing "spawn" start method. + # Pass fd_pipes=None to spawn here so that it doesn't leave + # a closed stdin duplicate in fd_pipes (that would trigger + # "Bad file descriptor" error if we tried to send it via + # send_handle). + self._proc = self._spawn(self.args, fd_pipes=None) self._registered = True @@ -74,6 +82,25 @@ class ForkProcess(SpawnProcess): self.fd_pipes[1] = slave_fd self.fd_pipes[2] = slave_fd self._files = self._files_dict(connection=connection, slave_fd=slave_fd) + + # Create duplicate file descriptors in self._fd_pipes + # so that the caller is free to manage the lifecycle + # of the original fd_pipes.
+ self._fd_pipes = {} + fd_map = {} + for dest, src in list(self.fd_pipes.items()): + if src not in fd_map: + src_new = fd_map[src] = os.dup(src) + old_fdflags = fcntl.fcntl(src, fcntl.F_GETFD) + fcntl.fcntl(src_new, fcntl.F_SETFD, old_fdflags) + os.set_inheritable( + src_new, not bool(old_fdflags & fcntl.FD_CLOEXEC) + ) + self._fd_pipes[dest] = fd_map[src] + + asyncio.ensure_future( + self._proc.wait(), self.scheduler + ).add_done_callback(self._close_fd_pipes) else: master_fd = connection @@ -81,6 +108,19 @@ class ForkProcess(SpawnProcess): master_fd, log_file_path=self.logfile, stdout_fd=stdout_fd ) + def _close_fd_pipes(self, future): + """ + Cleanup self._fd_pipes if needed, since _send_fd_pipes could + have been cancelled. + """ + # future.result() raises asyncio.CancelledError if + # future.cancelled(), but that should not happen. + future.result() + if self._fd_pipes is not None: + for fd in set(self._fd_pipes.values()): + os.close(fd) + self._fd_pipes = None + @property def _fd_pipes_send_handle(self): """Returns True if we have a connection to implement fd_pipes via send_handle.""" @@ -95,9 +135,9 @@ class ForkProcess(SpawnProcess): Communicate with _bootstrap to send fd_pipes via send_handle. This performs blocking IO, intended for invocation via run_in_executor. """ - fd_list = list(set(self.fd_pipes.values())) + fd_list = list(set(self._fd_pipes.values())) self._files.connection.send( - (self.fd_pipes, fd_list), + (self._fd_pipes, fd_list), ) for fd in fd_list: multiprocessing.reduction.send_handle( @@ -106,6 +146,11 @@ class ForkProcess(SpawnProcess): self.pid, ) + # self._fd_pipes contains duplicates that must be closed. + for fd in fd_list: + os.close(fd) + self._fd_pipes = None + async def _main(self, build_logger, pipe_logger, loop=None): try: if self._fd_pipes_send_handle: @@ -167,10 +212,6 @@ class ForkProcess(SpawnProcess): ) fd_pipes[0] = stdin_dup - if self._fd_pipes_send_handle: - # Handle fd_pipes in _main instead. 
- fd_pipes = None - proc = multiprocessing.Process( target=self._bootstrap, args=(
