commit:     96cc073263910e7c1b0f48cc08a4db4b56ffe408
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Tue May  1 08:15:39 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Tue May  8 16:42:03 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=96cc0732

global_event_loop: use asyncio event loop (bug 654390)

For python3.4 and later, in the main process, replace portage's
internal event loop with the standard library's asyncio event
loop. Continue to use portage's internal event loop in subprocesses,
since asyncio's event loop is not guaranteed to work well in
subprocesses (see upstream python issues 22087 and 29703).

An _AsyncioEventLoopPolicy class, derived from _PortageEventLoopPolicy,
is needed for some unit tests that modify asyncio's event loop policy.
This policy is not needed for anything other than unit testing. Portage
uses asyncio's default event loop policy, and API consumers are free to
use any desired event loop policy.

Portage's asynchronous functions that accept a 'loop' parameter will
work with any compatible asyncio.AbstractEventLoop implementation, since
an internal _wrap_loop function automatically adapts the loop for
internal use.

Bug: https://bugs.gentoo.org/654390
Reviewed-by: Brian Dolbec <dolsen <AT> gentoo.org>
Reviewed-by: Alec Warner <antarus <AT> gentoo.org>

 pym/portage/util/_async/SchedulerInterface.py     |  3 +
 pym/portage/util/_eventloop/asyncio_event_loop.py | 77 +++++++++++++++++++++++
 pym/portage/util/_eventloop/global_event_loop.py  |  5 +-
 pym/portage/util/futures/_asyncio/__init__.py     | 17 +++++
 pym/portage/util/futures/unix_events.py           | 43 ++++++++++++-
 5 files changed, 143 insertions(+), 2 deletions(-)

diff --git a/pym/portage/util/_async/SchedulerInterface.py b/pym/portage/util/_async/SchedulerInterface.py
index f1a3e9b0b..ec6417da1 100644
--- a/pym/portage/util/_async/SchedulerInterface.py
+++ b/pym/portage/util/_async/SchedulerInterface.py
@@ -33,6 +33,9 @@ class SchedulerInterface(SlotObject):
                "time",
 
                "_asyncio_child_watcher",
+               # This attribute is used by _wrap_loop to detect if the
+               # loop already has a suitable wrapper.
+               "_asyncio_wrapper",
        )
 
        __slots__ = _event_loop_attrs + ("_event_loop", "_is_background")

diff --git a/pym/portage/util/_eventloop/asyncio_event_loop.py b/pym/portage/util/_eventloop/asyncio_event_loop.py
new file mode 100644
index 000000000..b365939b0
--- /dev/null
+++ b/pym/portage/util/_eventloop/asyncio_event_loop.py
@@ -0,0 +1,77 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+try:
+       import asyncio as _real_asyncio
+       from asyncio.events import AbstractEventLoop as _AbstractEventLoop
+except ImportError:
+       # Allow ImportModulesTestCase to succeed.
+       _real_asyncio = None
+       _AbstractEventLoop = object
+
+
+class AsyncioEventLoop(_AbstractEventLoop):
+       """
+       Implementation of asyncio.AbstractEventLoop which wraps asyncio's
+       event loop and is minimally compatible with _PortageEventLoop.
+       """
+
+       # Use portage's internal event loop in subprocesses, as a workaround
+       # for https://bugs.python.org/issue22087, and also
+       # https://bugs.python.org/issue29703 which affects pypy3-5.10.1.
+       supports_multiprocessing = False
+
+       def __init__(self, loop=None):
+               loop = loop or _real_asyncio.get_event_loop()
+               self._loop = loop
+               self.run_until_complete = loop.run_until_complete
+               self.call_soon = loop.call_soon
+               self.call_soon_threadsafe = loop.call_soon_threadsafe
+               self.call_later = loop.call_later
+               self.call_at = loop.call_at
+               self.is_running = loop.is_running
+               self.is_closed = loop.is_closed
+               self.close = loop.close
+               self.create_future = (loop.create_future
+                       if hasattr(loop, 'create_future') else self._create_future)
+               self.create_task = loop.create_task
+               self.add_reader = loop.add_reader
+               self.remove_reader = loop.remove_reader
+               self.add_writer = loop.add_writer
+               self.remove_writer = loop.remove_writer
+               self.run_in_executor = loop.run_in_executor
+               self.time = loop.time
+               self.default_exception_handler = loop.default_exception_handler
+               self.call_exception_handler = loop.call_exception_handler
+               self.set_debug = loop.set_debug
+               self.get_debug = loop.get_debug
+
+       def _create_future(self):
+               """
+               Provide AbstractEventLoop.create_future() for python3.4.
+               """
+               return _real_asyncio.Future(loop=self._loop)
+
+       @property
+       def _asyncio_child_watcher(self):
+               """
+               Portage internals use this as a layer of indirection for
+               asyncio.get_child_watcher(), in order to support versions of
+               python where asyncio is not available.
+
+               @rtype: asyncio.AbstractChildWatcher
+               @return: the internal event loop's AbstractChildWatcher interface
+               """
+               return _real_asyncio.get_child_watcher()
+
+       @property
+       def _asyncio_wrapper(self):
+               """
+               Portage internals use this as a layer of indirection in cases
+               where a wrapper around an asyncio.AbstractEventLoop implementation
+               is needed for purposes of compatibility.
+
+               @rtype: asyncio.AbstractEventLoop
+               @return: the internal event loop's AbstractEventLoop interface
+               """
+               return self

diff --git a/pym/portage/util/_eventloop/global_event_loop.py b/pym/portage/util/_eventloop/global_event_loop.py
index a3ee9248d..2f6371dc1 100644
--- a/pym/portage/util/_eventloop/global_event_loop.py
+++ b/pym/portage/util/_eventloop/global_event_loop.py
@@ -2,10 +2,13 @@
 # Distributed under the terms of the GNU General Public License v2
 
 import os
+import sys
 
 from .EventLoop import EventLoop
+from portage.util._eventloop.asyncio_event_loop import AsyncioEventLoop
 
-_default_constructor = EventLoop
+_asyncio_enabled = sys.version_info >= (3, 4)
+_default_constructor = AsyncioEventLoop if _asyncio_enabled else EventLoop
 
 # If _default_constructor doesn't support multiprocessing,
 # then _multiprocessing_constructor is used in subprocesses.

diff --git a/pym/portage/util/futures/_asyncio/__init__.py b/pym/portage/util/futures/_asyncio/__init__.py
index e62de7a69..1273afa02 100644
--- a/pym/portage/util/futures/_asyncio/__init__.py
+++ b/pym/portage/util/futures/_asyncio/__init__.py
@@ -20,6 +20,11 @@ __all__ = (
        'wait',
 )
 
+try:
+       import asyncio as _real_asyncio
+except ImportError:
+       _real_asyncio = None
+
 try:
        import threading
 except ImportError:
@@ -29,6 +34,8 @@ import portage
 portage.proxy.lazyimport.lazyimport(globals(),
        'portage.util.futures.unix_events:DefaultEventLoopPolicy',
 )
+from portage.util._eventloop.asyncio_event_loop import AsyncioEventLoop as _AsyncioEventLoop
+from portage.util._eventloop.global_event_loop import _asyncio_enabled
 from portage.util.futures.futures import (
        CancelledError,
        Future,
@@ -162,3 +169,13 @@ def _wrap_loop(loop=None):
        @return: event loop
        """
        return loop or get_event_loop()
+
+
+if _asyncio_enabled:
+       get_event_loop_policy = _real_asyncio.get_event_loop_policy
+       set_event_loop_policy = _real_asyncio.set_event_loop_policy
+
+       def _wrap_loop(loop=None):
+               loop = loop or get_event_loop()
+               return (loop if hasattr(loop, '_asyncio_wrapper')
+                       else _AsyncioEventLoop(loop=loop))

diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index 00f522b61..ce520db00 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -7,6 +7,7 @@ __all__ = (
 )
 
 try:
+       import asyncio as _real_asyncio
        from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport
        from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher
        from asyncio.transports import (
@@ -14,6 +15,7 @@ try:
                WriteTransport as _WriteTransport,
        )
 except ImportError:
+       _real_asyncio = None
        _AbstractChildWatcher = object
        _BaseSubprocessTransport = object
        _ReadTransport = object
@@ -30,6 +32,7 @@ import subprocess
 import sys
 
 from portage.util._eventloop.global_event_loop import (
+       _asyncio_enabled,
        global_event_loop as _global_event_loop,
 )
 from portage.util.futures import (
@@ -678,4 +681,42 @@ class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
                return _global_event_loop()._asyncio_child_watcher
 
 
-DefaultEventLoopPolicy = _PortageEventLoopPolicy
+class _AsyncioEventLoopPolicy(_PortageEventLoopPolicy):
+       """
+       Implementation of asyncio.AbstractEventLoopPolicy based on asyncio's
+       event loop. This supports running event loops in forks,
+       which is not supported by the default asyncio event loop policy,
+       see https://bugs.python.org/issue22087 and also
+       https://bugs.python.org/issue29703 which affects pypy3-5.10.1.
+       """
+       _MAIN_PID = os.getpid()
+
+       def __init__(self):
+               super(_AsyncioEventLoopPolicy, self).__init__()
+               self._default_policy = _real_asyncio.DefaultEventLoopPolicy()
+
+       def get_event_loop(self):
+               """
+               Get the event loop for the current context.
+
+               Returns an event loop object implementing the AbstractEventLoop
+               interface.
+
+               @rtype: asyncio.AbstractEventLoop (or compatible)
+               @return: the event loop for the current context
+               """
+               if os.getpid() == self._MAIN_PID:
+                       return self._default_policy.get_event_loop()
+               else:
+                       return super(_AsyncioEventLoopPolicy, self).get_event_loop()
+
+       def get_child_watcher(self):
+               """Get the watcher for child processes."""
+               if os.getpid() == self._MAIN_PID:
+                       return self._default_policy.get_child_watcher()
+               else:
+                       return super(_AsyncioEventLoopPolicy, self).get_child_watcher()
+
+
+DefaultEventLoopPolicy = (_AsyncioEventLoopPolicy if _asyncio_enabled
+       else _PortageEventLoopPolicy)

Reply via email to