Handle the case where the loop has not been explicitly closed before
exit. In this case, run_exitfuncs previously tried to call coroutine
functions as though they were normal functions, which obviously would
not behave correctly.

Solve this problem by storing the coroutine functions in a separate
_coroutine_exithandlers list that belongs exclusively to the
run_coroutine_exitfuncs function, so that it is safe to close the
loop and call run_coroutine_exitfuncs from inside a run_exitfuncs
hook. A _thread_weakrefs_atexit hook already exists that will close
weakly referenced loops. The _thread_weakrefs_atexit hook is now
fixed to release its lock when it closes a loop, since the same
lock may need to be re-acquired when run_coroutine_exitfuncs runs.

The included test case demonstrates that run_exitfuncs will run
via an atexit hook and correctly terminate the socks5 proxy in a
standalone program using the portage API (like eclean).

Due to a deadlock that will occur if an _exit_function atexit hook
from the multiprocessing module executes before run_exitfuncs, a
portage.process._atexit_register_run_exitfuncs() function needs to
be called in order to re-order the hooks after the first process
has been started via the multiprocessing module. The natural place
to call this is in the ForkProcess class, using a global variable
to trigger the call just once.

Fixes: c3ebdbb42e72 ("elog/mod_custom: Spawn processes in background")
Bug: https://bugs.gentoo.org/937384
Closes: https://github.com/gentoo/portage/pull/1366
Signed-off-by: Zac Medico <zmed...@gentoo.org>
---
 lib/portage/process.py                        | 37 +++++++++--
 lib/portage/tests/__init__.py                 | 19 ++++++
 lib/portage/tests/util/test_socks5.py         | 66 +++++++++++++++++--
 lib/portage/util/_async/ForkProcess.py        |  8 +++
 lib/portage/util/futures/_asyncio/__init__.py | 24 ++++---
 5 files changed, 133 insertions(+), 21 deletions(-)

diff --git a/lib/portage/process.py b/lib/portage/process.py
index 6e4e0d7162..23e2507b53 100644
--- a/lib/portage/process.py
+++ b/lib/portage/process.py
@@ -3,6 +3,7 @@
 # Distributed under the terms of the GNU General Public License v2
 
 
+import asyncio as _asyncio
 import atexit
 import errno
 import fcntl
@@ -193,6 +194,7 @@ def spawn_fakeroot(mycommand, fakeroot_state=None, opt_name=None, **keywords):
 
 
 _exithandlers = []
+_coroutine_exithandlers = []
 
 
 def atexit_register(func, *args, **kargs):
@@ -200,7 +202,12 @@ def atexit_register(func, *args, **kargs):
     what is registered.  For example, when portage restarts itself via
     os.execv, the atexit module does not work so we have to do it
     manually by calling the run_exitfuncs() function in this module."""
-    _exithandlers.append((func, args, kargs))
+    # The internal asyncio wrapper module would trigger a circular import
+    # if used here.
+    if _asyncio.iscoroutinefunction(func):
+        _coroutine_exithandlers.append((func, args, kargs))
+    else:
+        _exithandlers.append((func, args, kargs))
 
 
 def run_exitfuncs():
@@ -232,12 +239,16 @@ async def run_coroutine_exitfuncs():
     This is the same as run_exitfuncs but it uses asyncio.iscoroutinefunction
     to check which functions to run. It is called by the AsyncioEventLoop
     _close_main method just before the loop is closed.
+
+    If the loop is explicitly closed before exit, then that will cause
+    run_coroutine_exitfuncs to run before run_exitfuncs. Otherwise, a
+    run_exitfuncs hook will close it, causing run_coroutine_exitfuncs to be
+    called via run_exitfuncs.
     """
     tasks = []
-    for index, (func, targs, kargs) in reversed(list(enumerate(_exithandlers))):
-        if asyncio.iscoroutinefunction(func):
-            del _exithandlers[index]
-            tasks.append(asyncio.ensure_future(func(*targs, **kargs)))
+    while _coroutine_exithandlers:
+        func, targs, kargs = _coroutine_exithandlers.pop()
+        tasks.append(asyncio.ensure_future(func(*targs, **kargs)))
     tracebacks = []
     exc_info = None
     for task in tasks:
@@ -255,7 +266,21 @@ async def run_coroutine_exitfuncs():
         raise exc_info[1].with_traceback(exc_info[2])
 
 
-atexit.register(run_exitfuncs)
+def _atexit_register_run_exitfuncs():
+    """
+    Register the run_exitfuncs atexit hook. If this hook is not called
+    before the multiprocessing module's _exit_function, then there will
+    be a deadlock. In order to prevent the deadlock, this function must
+    be called in order to re-order the hooks after the first process has
+    been started via the multiprocessing module. The natural place to
+    call this is in the ForkProcess class, though it should also be
+    called once before, in case the ForkProcess class is never called.
+    """
+    atexit.unregister(run_exitfuncs)
+    atexit.register(run_exitfuncs)
+
+
+_atexit_register_run_exitfuncs()
 
 # It used to be necessary for API consumers to remove pids from spawned_pids,
 # since otherwise it would accumulate a pids endlessly. Now, spawned_pids is
diff --git a/lib/portage/tests/__init__.py b/lib/portage/tests/__init__.py
index 23dd366d89..79373dfbb0 100644
--- a/lib/portage/tests/__init__.py
+++ b/lib/portage/tests/__init__.py
@@ -16,6 +16,7 @@ from portage import os
 from portage.util import no_color
 from portage import _encodings
 from portage import _unicode_decode
+from portage.const import PORTAGE_PYM_PATH
 from portage.output import colorize
 from portage.proxy.objectproxy import ObjectProxy
 
@@ -65,6 +66,24 @@ def cnf_sbindir():
     return os.path.join(portage.const.EPREFIX or "/", "usr", "sbin")
 
 
+def get_pythonpath():
+    """
+    Prefix current PYTHONPATH with PORTAGE_PYM_PATH, and normalize.
+    """
+    pythonpath = os.environ.get("PYTHONPATH")
+    if pythonpath is not None and not pythonpath.strip():
+        pythonpath = None
+    if pythonpath is not None and pythonpath.split(":")[0] == PORTAGE_PYM_PATH:
+        pass
+    else:
+        if pythonpath is None:
+            pythonpath = ""
+        else:
+            pythonpath = ":" + pythonpath
+        pythonpath = PORTAGE_PYM_PATH + pythonpath
+    return pythonpath
+
+
 class TestCase(unittest.TestCase):
     """
     We need a way to mark a unit test as "ok to fail"
diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py
index e7bc2d699f..35f919d970 100644
--- a/lib/portage/tests/util/test_socks5.py
+++ b/lib/portage/tests/util/test_socks5.py
@@ -3,14 +3,16 @@
 
 import asyncio
 import functools
+import os
 import shutil
 import socket
 import struct
+import subprocess
 import tempfile
 import time
 
 import portage
-from portage.tests import TestCase
+from portage.tests import TestCase, get_pythonpath
 from portage.util import socks5
 from portage.util.futures.executor.fork import ForkExecutor
 from portage.util._eventloop.global_event_loop import global_event_loop
@@ -199,10 +201,10 @@ class Socks5ServerTestCase(TestCase):
         path = "/index.html"
         proxy = None
         tempdir = tempfile.mkdtemp()
-        previous_exithandlers = portage.process._exithandlers
+        previous_exithandlers = portage.process._coroutine_exithandlers
 
         try:
-            portage.process._exithandlers = []
+            portage.process._coroutine_exithandlers = []
             with AsyncHTTPServer(host, {path: content}, loop) as server:
                 settings = {
                     "PORTAGE_TMPDIR": tempdir,
@@ -225,11 +227,11 @@ class Socks5ServerTestCase(TestCase):
         finally:
             try:
                 # Also run_coroutine_exitfuncs to test atexit hook cleanup.
-                self.assertNotEqual(portage.process._exithandlers, [])
+                self.assertNotEqual(portage.process._coroutine_exithandlers, [])
                 await portage.process.run_coroutine_exitfuncs()
-                self.assertEqual(portage.process._exithandlers, [])
+                self.assertEqual(portage.process._coroutine_exithandlers, [])
             finally:
-                portage.process._exithandlers = previous_exithandlers
+                portage.process._coroutine_exithandlers = previous_exithandlers
                 shutil.rmtree(tempdir)
 
 
@@ -269,3 +271,55 @@ class Socks5ServerLoopCloseTestCase(TestCase):
             shutil.rmtree(tempdir)
 
         return not socks5.proxy.is_running()
+
+
+class Socks5ServerAtExitTestCase(TestCase):
+    """
+    For bug 937384, test that the socks5 proxy is automatically
+    terminated by portage.process.run_exitfuncs(), using a subprocess
+    for isolation.
+
+    Note that if the subprocess is created via fork then it will be
+    vulnerable to python issue 83856 which is only fixed in python3.13,
+    so this test uses python -c to ensure that atexit hooks will work.
+    """
+
+    def testSocks5ServerAtExit(self):
+        tempdir = tempfile.mkdtemp()
+        try:
+            env = os.environ.copy()
+            env["PYTHONPATH"] = get_pythonpath()
+            output = subprocess.check_output(
+                [
+                    portage._python_interpreter,
+                    "-c",
+                    """
+import sys
+
+from portage.const import PORTAGE_BIN_PATH
+from portage.util import socks5
+from portage.util._eventloop.global_event_loop import global_event_loop
+
+tempdir = sys.argv[0]
+loop = global_event_loop()
+
+settings = {
+    "PORTAGE_TMPDIR": tempdir,
+    "PORTAGE_BIN_PATH": PORTAGE_BIN_PATH,
+}
+
+socks5.get_socks5_proxy(settings)
+loop.run_until_complete(socks5.proxy.ready())
+print(socks5.proxy._proc.pid, flush=True)
+""",
+                    tempdir,
+                ],
+                env=env,
+            )
+
+            pid = int(output.strip())
+
+            with self.assertRaises(ProcessLookupError):
+                os.kill(pid, 0)
+        finally:
+            shutil.rmtree(tempdir)
diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py
index e6cfdefb88..946978b301 100644
--- a/lib/portage/util/_async/ForkProcess.py
+++ b/lib/portage/util/_async/ForkProcess.py
@@ -15,6 +15,8 @@ from portage.cache.mappings import slot_dict_class
 from portage.util.futures import asyncio
 from _emerge.SpawnProcess import SpawnProcess
 
+_registered_run_exitfuncs = None
+
 
 class ForkProcess(SpawnProcess):
     # NOTE: This class overrides the meaning of the SpawnProcess 'args'
@@ -206,6 +208,8 @@ class ForkProcess(SpawnProcess):
         promoting a healthy state for the forked interpreter.
         """
 
+        global _registered_run_exitfuncs
+
         if self.__class__._run is ForkProcess._run:
             # target replaces the deprecated self._run method
             target = self.target
@@ -252,6 +256,10 @@ class ForkProcess(SpawnProcess):
             if stdin_dup is not None:
                 os.close(stdin_dup)
 
+        if _registered_run_exitfuncs != portage.getpid():
+            portage.process._atexit_register_run_exitfuncs()
+            _registered_run_exitfuncs = portage.getpid()
+
         return portage.process.MultiprocessingProcess(proc)
 
     def _cancel(self):
diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py
index e377a9cdd1..8942bcb67e 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -346,15 +346,21 @@ def _get_running_loop():
 
 
 def _thread_weakrefs_atexit():
-    with _thread_weakrefs.lock:
-        if _thread_weakrefs.pid == portage.getpid():
-            while True:
-                try:
-                    thread_key, loop = _thread_weakrefs.loops.popitem()
-                except KeyError:
-                    break
-                else:
-                    loop.close()
+    while True:
+        loop = None
+        with _thread_weakrefs.lock:
+            if _thread_weakrefs.pid != portage.getpid():
+                return
+
+            try:
+                thread_key, loop = _thread_weakrefs.loops.popitem()
+            except KeyError:
+                return
+
+        # Release the lock while closing the loop, since it may call
+        # run_coroutine_exitfuncs internally.
+        if loop is not None:
+            loop.close()
 
 
 _thread_weakrefs = types.SimpleNamespace(
-- 
2.44.2


Reply via email to