Ultimately the loop arguments that necessitate the _wrap_loop
function can be removed, because our aim since bug 761538 should
be to eliminate them. Meanwhile, we don't want _wrap_loop to return
redundant AsyncioEventLoop instances if we can easily prevent it.

Therefore, use _safe_loop(create=False) to look up the AsyncioEventLoop
instance associated with the current thread, and avoid creating
redundant instances. This serves to guard against garbage collection
of AsyncioEventLoop instances which may have _coroutine_exithandlers
added by the atexit_register function since commit cb0c09d8cecb from
bug 937740.

If _safe_loop(create=False) fails to associate a loop with the current
thread, raise an AssertionError for portage internal API consumers.
It's not known whether external API consumers will trigger this case,
so if it happens then emit a UserWarning and return a temporary
AsyncioEventLoop instance.

Fixes: cb0c09d8cecb ("Support coroutine exitfuncs for non-main loops")
Bug: https://bugs.gentoo.org/938127
Bug: https://bugs.gentoo.org/937740
Bug: https://bugs.gentoo.org/761538
Closes: https://github.com/gentoo/portage/pull/1372
Signed-off-by: Zac Medico <zmed...@gentoo.org>
---
 lib/portage/util/futures/_asyncio/__init__.py | 43 ++++++++++++++++---
 1 file changed, 38 insertions(+), 5 deletions(-)

diff --git a/lib/portage/util/futures/_asyncio/__init__.py 
b/lib/portage/util/futures/_asyncio/__init__.py
index 8805e35756..c960d03630 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -26,6 +26,7 @@ __all__ = (
 
 import sys
 import types
+import warnings
 import weakref
 
 import asyncio as _real_asyncio
@@ -46,6 +47,7 @@ from asyncio import (
 )
 
 import threading
+from typing import Optional
 
 import portage
 
@@ -251,11 +253,35 @@ def _wrap_loop(loop=None):
     # The default loop returned by _wrap_loop should be consistent
     # with global_event_loop, in order to avoid accidental registration
     # of callbacks with a loop that is not intended to run.
-    loop = loop or _safe_loop()
-    return loop if hasattr(loop, "_asyncio_wrapper") else 
_AsyncioEventLoop(loop=loop)
+    if hasattr(loop, "_asyncio_wrapper"):
+        return loop
+
+    # This returns a running loop if it exists, and otherwise returns
+    # a loop associated with the current thread.
+    safe_loop = _safe_loop(create=loop is None)
+    if safe_loop is not None and (loop is None or safe_loop._loop is loop):
+        return safe_loop
+
+    if safe_loop is None:
+        msg = f"_wrap_loop argument '{loop}' not associated with thread 
'{threading.get_ident()}'"
+    else:
+        msg = f"_wrap_loop argument '{loop}' different frome loop 
'{safe_loop._loop}' already associated with thread '{threading.get_ident()}'"
+
+    if portage._internal_caller:
+        raise AssertionError(msg)
 
+    # It's not known whether external API consumers will trigger this case,
+    # so if it happens then emit a UserWarning before returning a temporary
+    # AsyncioEventLoop instance.
+    warnings.warn(msg, UserWarning, stacklevel=2)
 
-def _safe_loop():
+    # We could possibly add a weak reference in _thread_weakrefs.loops when
+    # safe_loop is None, but if safe_loop is not None, then there is a
+    # collision in _thread_weakrefs.loops that would need to be resolved.
+    return _AsyncioEventLoop(loop=loop)
+
+
+def _safe_loop(create: Optional[bool] = True) -> Optional[_AsyncioEventLoop]:
     """
     Return an event loop that's safe to use within the current context.
     For portage internal callers or external API consumers calling from
@@ -276,8 +302,13 @@ def _safe_loop():
     are added to a WeakValueDictionary, and closed via an atexit hook
     if they still exist during exit for the current pid.
 
-    @rtype: asyncio.AbstractEventLoop (or compatible)
-    @return: event loop instance
+    @type create: bool
+    @param create: Create a loop by default if a loop is not already associated
+        with the current thread. If create is False, then return None if a loop
+        is not already associated with the current thread.
+    @rtype: AsyncioEventLoop or None
+    @return: event loop instance, or None if the create parameter is False and
+        a loop is not already associated with the current thread.
     """
     loop = _get_running_loop()
     if loop is not None:
@@ -292,6 +323,8 @@ def _safe_loop():
         try:
             loop = _thread_weakrefs.loops[thread_key]
         except KeyError:
+            if not create:
+                return None
             try:
                 try:
                     _loop = _real_asyncio.get_running_loop()
-- 
2.44.2


Reply via email to