commit:     dcbcac809213537afaa6b4f9822146a2e984f773
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Dec  7 06:05:04 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Dec  7 07:48:10 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=dcbcac80

_safe_loop: instantiate asyncio loop for API consumer thread

In order to maintain compatibility with an API consumer thread which
has not instantiated an asyncio loop for the current thread prior
to calling the portage API, instantiate a loop on its behalf. Since
a ResourceWarning will be triggered if the loop has not been closed
before the process exits, add the loop to a WeakValueDictionary,
and close it if it still exists during exit for the current pid.

Fixes: cecd2f8a259c ("Use default asyncio event loop implementation in API 
consumer threads")
Bug: https://bugs.gentoo.org/758755
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/util/futures/_asyncio/__init__.py | 49 ++++++++++++++++++++++-----
 1 file changed, 40 insertions(+), 9 deletions(-)

diff --git a/lib/portage/util/futures/_asyncio/__init__.py 
b/lib/portage/util/futures/_asyncio/__init__.py
index 6f3395a91..d39f31786 100644
--- a/lib/portage/util/futures/_asyncio/__init__.py
+++ b/lib/portage/util/futures/_asyncio/__init__.py
@@ -21,7 +21,8 @@ __all__ = (
 )
 
 import subprocess
-import sys
+import types
+import weakref
 
 import asyncio as _real_asyncio
 
@@ -249,23 +250,53 @@ def _safe_loop():
        the main thread, this returns a globally shared event loop instance.
 
        For external API consumers calling from a non-main thread, an
-       asyncio loop must be registered for the current thread, or else an
-       error will be raised like this:
+       asyncio loop must be registered for the current thread, or else the
+       asyncio.get_event_loop() function will raise an error like this:
 
          RuntimeError: There is no current event loop in thread 'Thread-1'.
 
-       In order to avoid this RuntimeError, the external API consumer
-       is responsible for setting an event loop and managing its lifecycle.
-       For example, this code will set an event loop for the current thread:
+       In order to avoid this RuntimeError, a loop will be automatically
+       created like this:
 
          asyncio.set_event_loop(asyncio.new_event_loop())
 
-       In order to avoid a ResourceWarning, the caller should also close the
-       corresponding loop before the current thread terminates.
+       In order to avoid a ResourceWarning, automatically created loops
+       are added to a WeakValueDictionary, and closed via an atexit hook
+       if they still exist during exit for the current pid.
 
        @rtype: asyncio.AbstractEventLoop (or compatible)
        @return: event loop instance
        """
        if portage._internal_caller or threading.current_thread() is 
threading.main_thread():
                return _global_event_loop()
-       return _AsyncioEventLoop()
+
+       thread_key = threading.get_ident()
+       with _thread_weakrefs.lock:
+               if _thread_weakrefs.pid != portage.getpid():
+                       _thread_weakrefs.pid = portage.getpid()
+                       _thread_weakrefs.loops = weakref.WeakValueDictionary()
+               try:
+                       loop = _thread_weakrefs.loops[thread_key]
+               except KeyError:
+                       try:
+                               _real_asyncio.get_event_loop()
+                       except RuntimeError:
+                               
_real_asyncio.set_event_loop(_real_asyncio.new_event_loop())
+                       loop = _thread_weakrefs.loops[thread_key] = 
_AsyncioEventLoop()
+       return loop
+
+
+def _thread_weakrefs_atexit():
+       with _thread_weakrefs.lock:
+               if _thread_weakrefs.pid == portage.getpid():
+                       while True:
+                               try:
+                                       thread_key, loop = 
_thread_weakrefs.loops.popitem()
+                               except KeyError:
+                                       break
+                               else:
+                                       loop.close()
+
+
+_thread_weakrefs = types.SimpleNamespace(lock=threading.Lock(), loops=None, 
pid=None)
+portage.process.atexit_register(_thread_weakrefs_atexit)

Reply via email to