Handle the case where the loop has not been explicitly closed before exit. In this case, run_exitfuncs previously tried to call coroutine functions as though they were normal functions, which obviously would not behave correctly.
Solve this problem by storing the coroutine functions in a separate _coroutine_exithandlers list that belongs exclusively to the run_coroutine_exitfuncs function, so that it is safe to close the loop and call run_coroutine_exitfuncs from inside a run_exitfuncs hook. A _thread_weakrefs_atexit hook already exists that will close weakly referenced loops. The _thread_weakrefs_atexit hook is now fixed to release its lock when it closes a loop, since the same lock may need to be re-acquired when run_coroutine_exitfuncs runs. The included test case demonstrates that run_exitfuncs will run via an atexit hook and correctly terminate the socks5 proxy in a standalone program using the portage API (like eclean). Due to a deadlock that will occur if an _exit_function atexit hook from the multiprocessing module executes before run_exitfuncs, a portage.process._atexit_register_run_exitfuncs() function needs to be called in order to re-order the hooks after the first process has been started via the multiprocessing module. The natural place to call this is in the ForkProcess class, using a global variable to trigger the call just once. 
Fixes: c3ebdbb42e72 ("elog/mod_custom: Spawn processes in background") Bug: https://bugs.gentoo.org/937384 Closes: https://github.com/gentoo/portage/pull/1366 Signed-off-by: Zac Medico <zmed...@gentoo.org> --- lib/portage/process.py | 37 +++++++++-- lib/portage/tests/__init__.py | 19 ++++++ lib/portage/tests/util/test_socks5.py | 66 +++++++++++++++++-- lib/portage/util/_async/ForkProcess.py | 8 +++ lib/portage/util/futures/_asyncio/__init__.py | 24 ++++--- 5 files changed, 133 insertions(+), 21 deletions(-) diff --git a/lib/portage/process.py b/lib/portage/process.py index 6e4e0d7162..23e2507b53 100644 --- a/lib/portage/process.py +++ b/lib/portage/process.py @@ -3,6 +3,7 @@ # Distributed under the terms of the GNU General Public License v2 +import asyncio as _asyncio import atexit import errno import fcntl @@ -193,6 +194,7 @@ def spawn_fakeroot(mycommand, fakeroot_state=None, opt_name=None, **keywords): _exithandlers = [] +_coroutine_exithandlers = [] def atexit_register(func, *args, **kargs): @@ -200,7 +202,12 @@ def atexit_register(func, *args, **kargs): what is registered. For example, when portage restarts itself via os.execv, the atexit module does not work so we have to do it manually by calling the run_exitfuncs() function in this module.""" - _exithandlers.append((func, args, kargs)) + # The internal asyncio wrapper module would trigger a circular import + # if used here. + if _asyncio.iscoroutinefunction(func): + _coroutine_exithandlers.append((func, args, kargs)) + else: + _exithandlers.append((func, args, kargs)) def run_exitfuncs(): @@ -232,12 +239,16 @@ async def run_coroutine_exitfuncs(): This is the same as run_exitfuncs but it uses asyncio.iscoroutinefunction to check which functions to run. It is called by the AsyncioEventLoop _close_main method just before the loop is closed. + + If the loop is explicitly closed before exit, then that will cause + run_coroutine_exitfuncs to run before run_exitfuncs. 
Otherwise, a + run_exitfuncs hook will close it, causing run_coroutine_exitfuncs to be + called via run_exitfuncs. """ tasks = [] - for index, (func, targs, kargs) in reversed(list(enumerate(_exithandlers))): - if asyncio.iscoroutinefunction(func): - del _exithandlers[index] - tasks.append(asyncio.ensure_future(func(*targs, **kargs))) + while _coroutine_exithandlers: + func, targs, kargs = _coroutine_exithandlers.pop() + tasks.append(asyncio.ensure_future(func(*targs, **kargs))) tracebacks = [] exc_info = None for task in tasks: @@ -255,7 +266,21 @@ async def run_coroutine_exitfuncs(): raise exc_info[1].with_traceback(exc_info[2]) -atexit.register(run_exitfuncs) +def _atexit_register_run_exitfuncs(): + """ + Register the run_exitfuncs atexit hook. If this hook is not called + before the multiprocessing module's _exit_function, then there will + be a deadlock. In order to prevent the deadlock, this function must + be called in order to re-order the hooks after the first process has + been started via the multiprocessing module. The natural place to + call this is in the ForkProcess class, though it should also be + called once before, in case the ForkProcess class is never called. + """ + atexit.unregister(run_exitfuncs) + atexit.register(run_exitfuncs) + + +_atexit_register_run_exitfuncs() # It used to be necessary for API consumers to remove pids from spawned_pids, # since otherwise it would accumulate a pids endlessly. 
Now, spawned_pids is diff --git a/lib/portage/tests/__init__.py b/lib/portage/tests/__init__.py index 23dd366d89..79373dfbb0 100644 --- a/lib/portage/tests/__init__.py +++ b/lib/portage/tests/__init__.py @@ -16,6 +16,7 @@ from portage import os from portage.util import no_color from portage import _encodings from portage import _unicode_decode +from portage.const import PORTAGE_PYM_PATH from portage.output import colorize from portage.proxy.objectproxy import ObjectProxy @@ -65,6 +66,24 @@ def cnf_sbindir(): return os.path.join(portage.const.EPREFIX or "/", "usr", "sbin") +def get_pythonpath(): + """ + Prefix current PYTHONPATH with PORTAGE_PYM_PATH, and normalize. + """ + pythonpath = os.environ.get("PYTHONPATH") + if pythonpath is not None and not pythonpath.strip(): + pythonpath = None + if pythonpath is not None and pythonpath.split(":")[0] == PORTAGE_PYM_PATH: + pass + else: + if pythonpath is None: + pythonpath = "" + else: + pythonpath = ":" + pythonpath + pythonpath = PORTAGE_PYM_PATH + pythonpath + return pythonpath + + class TestCase(unittest.TestCase): """ We need a way to mark a unit test as "ok to fail" diff --git a/lib/portage/tests/util/test_socks5.py b/lib/portage/tests/util/test_socks5.py index e7bc2d699f..35f919d970 100644 --- a/lib/portage/tests/util/test_socks5.py +++ b/lib/portage/tests/util/test_socks5.py @@ -3,14 +3,16 @@ import asyncio import functools +import os import shutil import socket import struct +import subprocess import tempfile import time import portage -from portage.tests import TestCase +from portage.tests import TestCase, get_pythonpath from portage.util import socks5 from portage.util.futures.executor.fork import ForkExecutor from portage.util._eventloop.global_event_loop import global_event_loop @@ -199,10 +201,10 @@ class Socks5ServerTestCase(TestCase): path = "/index.html" proxy = None tempdir = tempfile.mkdtemp() - previous_exithandlers = portage.process._exithandlers + previous_exithandlers = 
portage.process._coroutine_exithandlers try: - portage.process._exithandlers = [] + portage.process._coroutine_exithandlers = [] with AsyncHTTPServer(host, {path: content}, loop) as server: settings = { "PORTAGE_TMPDIR": tempdir, @@ -225,11 +227,11 @@ class Socks5ServerTestCase(TestCase): finally: try: # Also run_coroutine_exitfuncs to test atexit hook cleanup. - self.assertNotEqual(portage.process._exithandlers, []) + self.assertNotEqual(portage.process._coroutine_exithandlers, []) await portage.process.run_coroutine_exitfuncs() - self.assertEqual(portage.process._exithandlers, []) + self.assertEqual(portage.process._coroutine_exithandlers, []) finally: - portage.process._exithandlers = previous_exithandlers + portage.process._coroutine_exithandlers = previous_exithandlers shutil.rmtree(tempdir) @@ -269,3 +271,55 @@ class Socks5ServerLoopCloseTestCase(TestCase): shutil.rmtree(tempdir) return not socks5.proxy.is_running() + + +class Socks5ServerAtExitTestCase(TestCase): + """ + For bug 937384, test that the socks5 proxy is automatically + terminated by portage.process.run_exitfuncs(), using a subprocess + for isolation. + + Note that if the subprocess is created via fork then it will be + vulnerable to python issue 83856 which is only fixed in python3.13, + so this test uses python -c to ensure that atexit hooks will work. 
+ """ + + def testSocks5ServerAtExit(self): + tempdir = tempfile.mkdtemp() + try: + env = os.environ.copy() + env["PYTHONPATH"] = get_pythonpath() + output = subprocess.check_output( + [ + portage._python_interpreter, + "-c", + """ +import sys + +from portage.const import PORTAGE_BIN_PATH +from portage.util import socks5 +from portage.util._eventloop.global_event_loop import global_event_loop + +tempdir = sys.argv[1]  # sys.argv[0] is "-c" when run via python -c +loop = global_event_loop() + +settings = { + "PORTAGE_TMPDIR": tempdir, + "PORTAGE_BIN_PATH": PORTAGE_BIN_PATH, +} + +socks5.get_socks5_proxy(settings) +loop.run_until_complete(socks5.proxy.ready()) +print(socks5.proxy._proc.pid, flush=True) +""", + tempdir, + ], + env=env, + ) + + pid = int(output.strip()) + + with self.assertRaises(ProcessLookupError): + os.kill(pid, 0) + finally: + shutil.rmtree(tempdir) diff --git a/lib/portage/util/_async/ForkProcess.py b/lib/portage/util/_async/ForkProcess.py index e6cfdefb88..946978b301 100644 --- a/lib/portage/util/_async/ForkProcess.py +++ b/lib/portage/util/_async/ForkProcess.py @@ -15,6 +15,8 @@ from portage.cache.mappings import slot_dict_class from portage.util.futures import asyncio from _emerge.SpawnProcess import SpawnProcess +_registered_run_exitfuncs = None + class ForkProcess(SpawnProcess): # NOTE: This class overrides the meaning of the SpawnProcess 'args' @@ -206,6 +208,8 @@ class ForkProcess(SpawnProcess): promoting a healthy state for the forked interpreter.
""" + global _registered_run_exitfuncs + + if self.__class__._run is ForkProcess._run: # target replaces the deprecated self._run method target = self.target @@ -252,6 +256,10 @@ class ForkProcess(SpawnProcess): if stdin_dup is not None: os.close(stdin_dup) + if _registered_run_exitfuncs != portage.getpid(): + portage.process._atexit_register_run_exitfuncs() + _registered_run_exitfuncs = portage.getpid() + return portage.process.MultiprocessingProcess(proc) def _cancel(self): diff --git a/lib/portage/util/futures/_asyncio/__init__.py b/lib/portage/util/futures/_asyncio/__init__.py index e377a9cdd1..8942bcb67e 100644 --- a/lib/portage/util/futures/_asyncio/__init__.py +++ b/lib/portage/util/futures/_asyncio/__init__.py @@ -346,15 +346,21 @@ def _get_running_loop(): def _thread_weakrefs_atexit(): - with _thread_weakrefs.lock: - if _thread_weakrefs.pid == portage.getpid(): - while True: - try: - thread_key, loop = _thread_weakrefs.loops.popitem() - except KeyError: - break - else: - loop.close() + while True: + loop = None + with _thread_weakrefs.lock: + if _thread_weakrefs.pid != portage.getpid(): + return + + try: + thread_key, loop = _thread_weakrefs.loops.popitem() + except KeyError: + return + + # Release the lock while closing the loop, since it may call + # run_coroutine_exitfuncs internally. + if loop is not None: + loop.close() _thread_weakrefs = types.SimpleNamespace( -- 2.44.2