commit:     9f96ce5105e7bd2580ae9acc34d6ebad914dae47
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sat Feb  3 19:36:05 2024 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sat Feb  3 19:55:19 2024 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=9f96ce51

ForkProcess: Use duplicate fd_pipes in _send_fd_pipes thread

To let callers manage the lifecycle of the fd_pipes file
descriptors, give _send_fd_pipes its own duplicates, which it
closes once it has finished sending them.

This fixes bug 916601 more cleanly, allowing commit
3b1234ba69a31709cd5aec1ae070901e3a28bb7c to be reverted.

Bug: https://bugs.gentoo.org/916601
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/util/_async/ForkProcess.py | 55 +++++++++++++++++++++++++++++-----
 1 file changed, 48 insertions(+), 7 deletions(-)

diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index 711bd2a7ba..3acbe34fc6 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -24,6 +24,8 @@ class ForkProcess(SpawnProcess):
         "kwargs",
         "target",
         "_child_connection",
+        # Duplicate file descriptors for use by _send_fd_pipes background 
thread.
+        "_fd_pipes",
     )
 
     _file_names = ("connection", "slave_fd")
@@ -53,7 +55,13 @@ class ForkProcess(SpawnProcess):
                 duplex=self._HAVE_SEND_HANDLE
             )
 
-        self._proc = self._spawn(self.args, fd_pipes=self.fd_pipes)
+        # Handle fd_pipes in _main instead, since file descriptors are
+        # not inherited with the multiprocessing "spawn" start method.
+        # Pass fd_pipes=None to spawn here so that it doesn't leave
+        # a closed stdin duplicate in fd_pipes (that would trigger
+        # "Bad file descriptor" error if we tried to send it via
+        # send_handle).
+        self._proc = self._spawn(self.args, fd_pipes=None)
 
         self._registered = True
 
@@ -74,6 +82,25 @@ class ForkProcess(SpawnProcess):
                 self.fd_pipes[1] = slave_fd
                 self.fd_pipes[2] = slave_fd
                 self._files = self._files_dict(connection=connection, 
slave_fd=slave_fd)
+
+                # Create duplicate file descriptors in self._fd_pipes
+                # so that the caller is free to manage the lifecycle
+                # of the original fd_pipes.
+                self._fd_pipes = {}
+                fd_map = {}
+                for dest, src in list(self.fd_pipes.items()):
+                    if src not in fd_map:
+                        src_new = fd_map[src] = os.dup(src)
+                        old_fdflags = fcntl.fcntl(src, fcntl.F_GETFD)
+                        fcntl.fcntl(src_new, fcntl.F_SETFD, old_fdflags)
+                        os.set_inheritable(
+                            src_new, not bool(old_fdflags & fcntl.FD_CLOEXEC)
+                        )
+                    self._fd_pipes[dest] = fd_map[src]
+
+                asyncio.ensure_future(
+                    self._proc.wait(), self.scheduler
+                ).add_done_callback(self._close_fd_pipes)
             else:
                 master_fd = connection
 
@@ -81,6 +108,19 @@ class ForkProcess(SpawnProcess):
                 master_fd, log_file_path=self.logfile, stdout_fd=stdout_fd
             )
 
+    def _close_fd_pipes(self, future):
+        """
+        Clean up the self._fd_pipes duplicates if _send_fd_pipes has not
+        already closed them (for example, because it was cancelled).
+        """
+        # future.result() re-raises (CancelledError if cancelled), which
+        # would skip the cleanup below, but that should not happen.
+        future.result()
+        if self._fd_pipes is not None:
+            for fd in set(self._fd_pipes.values()):
+                os.close(fd)
+            self._fd_pipes = None
+
     @property
     def _fd_pipes_send_handle(self):
         """Returns True if we have a connection to implement fd_pipes via 
send_handle."""
@@ -95,9 +135,9 @@ class ForkProcess(SpawnProcess):
         Communicate with _bootstrap to send fd_pipes via send_handle.
         This performs blocking IO, intended for invocation via run_in_executor.
         """
-        fd_list = list(set(self.fd_pipes.values()))
+        fd_list = list(set(self._fd_pipes.values()))
         self._files.connection.send(
-            (self.fd_pipes, fd_list),
+            (self._fd_pipes, fd_list),
         )
         for fd in fd_list:
             multiprocessing.reduction.send_handle(
@@ -106,6 +146,11 @@ class ForkProcess(SpawnProcess):
                 self.pid,
             )
 
+        # self._fd_pipes contains duplicates that must be closed.
+        for fd in fd_list:
+            os.close(fd)
+        self._fd_pipes = None
+
     async def _main(self, build_logger, pipe_logger, loop=None):
         try:
             if self._fd_pipes_send_handle:
@@ -167,10 +212,6 @@ class ForkProcess(SpawnProcess):
                 )
                 fd_pipes[0] = stdin_dup
 
-            if self._fd_pipes_send_handle:
-                # Handle fd_pipes in _main instead.
-                fd_pipes = None
-
             proc = multiprocessing.Process(
                 target=self._bootstrap,
                 args=(

Reply via email to