This provides minimal interoperability with existing asyncio code,
by adding a portage.util.futures.unix_events.DefaultEventLoopPolicy
class that makes asyncio use portage's internal event loop when an
instance is passed into asyncio.set_event_loop_policy(). The
get_event_loop() method of this policy returns an instance of a
_PortageEventLoop class that wraps portage's internal event loop and
implements asyncio's AbstractEventLoop interface.

The portage.util.futures.asyncio module refers to the real
asyncio module when available, and otherwise falls back to a
minimal implementation that works with python2.7. The included
EventLoopInForkTestCase demonstrates usage, and works with all
supported versions of python, include python2.7.

Bug: https://bugs.gentoo.org/649588
---
 pym/portage/tests/util/futures/asyncio/__init__.py |   0
 pym/portage/tests/util/futures/asyncio/__test__.py |   0
 .../futures/asyncio/test_event_loop_in_fork.py     |  62 +++++++
 pym/portage/util/_eventloop/EventLoop.py           |  11 +-
 pym/portage/util/futures/__init__.py               |   9 +
 pym/portage/util/futures/_asyncio.py               | 116 +++++++++++++
 pym/portage/util/futures/events.py                 | 191 +++++++++++++++++++++
 pym/portage/util/futures/futures.py                |   7 +-
 pym/portage/util/futures/unix_events.py            |  91 ++++++++++
 9 files changed, 479 insertions(+), 8 deletions(-)
 create mode 100644 pym/portage/tests/util/futures/asyncio/__init__.py
 create mode 100644 pym/portage/tests/util/futures/asyncio/__test__.py
 create mode 100644 
pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
 create mode 100644 pym/portage/util/futures/_asyncio.py
 create mode 100644 pym/portage/util/futures/events.py
 create mode 100644 pym/portage/util/futures/unix_events.py

diff --git a/pym/portage/tests/util/futures/asyncio/__init__.py 
b/pym/portage/tests/util/futures/asyncio/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pym/portage/tests/util/futures/asyncio/__test__.py 
b/pym/portage/tests/util/futures/asyncio/__test__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py 
b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
new file mode 100644
index 000000000..1ef46229b
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py
@@ -0,0 +1,62 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import multiprocessing
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+
+
+def fork_main(parent_conn, child_conn):
+       parent_conn.close()
+       loop = asyncio.get_event_loop()
+       # This fails with python's default event loop policy,
+       # see https://bugs.python.org/issue22087.
+       loop.run_until_complete(asyncio.sleep(0.1))
+
+
+def async_main(loop=None):
+       loop = loop or asyncio.get_event_loop()
+       future = loop.create_future()
+
+       # Since python2.7 does not support Process.sentinel, use Pipe to
+       # monitor for process exit.
+       parent_conn, child_conn = multiprocessing.Pipe()
+
+       def eof_callback():
+               loop.remove_reader(parent_conn.fileno())
+               parent_conn.close()
+               future.set_result(None)
+
+       loop.add_reader(parent_conn.fileno(), eof_callback)
+       proc = multiprocessing.Process(target=fork_main, args=(parent_conn, 
child_conn))
+       proc.start()
+       child_conn.close()
+
+       return future
+
+
+class EventLoopInForkTestCase(TestCase):
+       """
+       The default asyncio event loop policy does not support loops
+       running in forks, see https://bugs.python.org/issue22087.
+       Portage's DefaultEventLoopPolicy supports forks.
+       """
+
+       def testEventLoopInForkTestCase(self):
+               initial_policy = asyncio.get_event_loop_policy()
+               if not isinstance(initial_policy, DefaultEventLoopPolicy):
+                       asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+               try:
+                       loop = asyncio.get_event_loop()
+                       exit_future = loop.create_future()
+                       def trigger_exit(*args):
+                               exit_future.set_result(True)
+
+                       def start_async_main():
+                               
async_main(loop=loop).add_done_callback(trigger_exit)
+
+                       loop.call_soon(start_async_main)
+                       loop.run_until_complete(exit_future)
+               finally:
+                       asyncio.set_event_loop_policy(initial_policy)
diff --git a/pym/portage/util/_eventloop/EventLoop.py 
b/pym/portage/util/_eventloop/EventLoop.py
index 72eb407fc..d53a76ba1 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -23,8 +23,9 @@ except ImportError:
 
 import portage
 portage.proxy.lazyimport.lazyimport(globals(),
-       'portage.util.futures.futures:_EventLoopFuture',
+       'portage.util.futures.futures:Future',
        'portage.util.futures.executor.fork:ForkExecutor',
+       'portage.util.futures.unix_events:_PortageEventLoop',
 )
 
 from portage import OrderedDict
@@ -188,15 +189,13 @@ class EventLoop(object):
                self._sigchld_write = None
                self._sigchld_src_id = None
                self._pid = os.getpid()
+               self._asyncio_wrapper = _PortageEventLoop(loop=self)
 
        def create_future(self):
                """
-               Create a Future object attached to the loop. This returns
-               an instance of _EventLoopFuture, because EventLoop is currently
-               missing some of the asyncio.AbstractEventLoop methods that
-               asyncio.Future requires.
+               Create a Future object attached to the loop.
                """
-               return _EventLoopFuture(loop=self)
+               return Future(loop=self._asyncio_wrapper)
 
        def _new_source_id(self):
                """
diff --git a/pym/portage/util/futures/__init__.py 
b/pym/portage/util/futures/__init__.py
index e69de29bb..789080c85 100644
--- a/pym/portage/util/futures/__init__.py
+++ b/pym/portage/util/futures/__init__.py
@@ -0,0 +1,9 @@
+
+__all__ = (
+       'asyncio',
+)
+
+try:
+       import asyncio
+except ImportError:
+       from portage.util.futures import _asyncio as asyncio
diff --git a/pym/portage/util/futures/_asyncio.py 
b/pym/portage/util/futures/_asyncio.py
new file mode 100644
index 000000000..6874e133f
--- /dev/null
+++ b/pym/portage/util/futures/_asyncio.py
@@ -0,0 +1,116 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+       'ensure_future',
+       'get_event_loop',
+       'get_event_loop_policy',
+       'set_event_loop_policy',
+       'sleep',
+       'Task',
+)
+
+import functools
+
+try:
+       import threading
+except ImportError:
+       import dummy_threading as threading
+
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+       'portage.util.futures.unix_events:DefaultEventLoopPolicy',
+)
+from portage.util.futures.futures import Future
+
+_lock = threading.Lock()
+_policy = None
+
+
+def get_event_loop_policy():
+       """
+       Get the current event loop policy.
+
+       @rtype: asyncio.AbstractEventLoopPolicy (or compatible)
+       @return: the current event loop policy
+       """
+       global _lock, _policy
+       with _lock:
+               if _policy is None:
+                       _policy = DefaultEventLoopPolicy()
+               return _policy
+
+
+def set_event_loop_policy(policy):
+       """
+       Set the current event loop policy. If policy is None, the default
+       policy is restored.
+
+       @type policy: asyncio.AbstractEventLoopPolicy or None
+       @param policy: new event loop policy
+       """
+       global _lock, _policy
+       with _lock:
+               _policy = policy or DefaultEventLoopPolicy()
+
+
+def get_event_loop():
+       """
+       Equivalent to calling get_event_loop_policy().get_event_loop().
+
+       @rtype: asyncio.AbstractEventLoop (or compatible)
+       @return: the event loop for the current context
+       """
+       return get_event_loop_policy().get_event_loop()
+
+
+class Task(Future):
+       """
+       Schedule the execution of a coroutine: wrap it in a future. A task
+       is a subclass of Future.
+       """
+       def __init__(self, coro, loop=None):
+               raise NotImplementedError
+
+
+def ensure_future(coro_or_future, loop=None):
+       """
+       Wrap a coroutine or an awaitable in a future.
+
+       If the argument is a Future, it is returned directly.
+
+       @type coro_or_future: coroutine or Future
+       @param coro_or_future: coroutine or future to wrap
+       @type loop: asyncio.AbstractEventLoop (or compatible)
+       @param loop: event loop
+       @rtype: asyncio.Future (or compatible)
+       @return: an instance of Future
+       """
+       if isinstance(coro_or_future, Future):
+               return coro_or_future
+       raise NotImplementedError
+
+
+def sleep(delay, result=None, loop=None):
+       """
+       Create a future that completes after a given time (in seconds). If
+       result is provided, it is produced to the caller when the future
+       completes.
+
+       @type delay: int or float
+       @param delay: delay seconds
+       @type result: object
+       @param result: result of the future
+       @type loop: asyncio.AbstractEventLoop (or compatible)
+       @param loop: event loop
+       @rtype: asyncio.Future (or compatible)
+       @return: an instance of Future
+       """
+       loop = loop or get_event_loop()
+       future = loop.create_future()
+       handle = loop.call_later(delay, functools.partial(future.set_result, 
result))
+       def cancel_callback(future):
+               if future.cancelled():
+                       handle.cancel()
+       future.add_done_callback(cancel_callback)
+       return future
diff --git a/pym/portage/util/futures/events.py 
b/pym/portage/util/futures/events.py
new file mode 100644
index 000000000..b772bc242
--- /dev/null
+++ b/pym/portage/util/futures/events.py
@@ -0,0 +1,191 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+       'AbstractEventLoopPolicy',
+       'AbstractEventLoop',
+)
+
+import socket
+import subprocess
+
+try:
+       from asyncio.events import (
+               AbstractEventLoop as _AbstractEventLoop,
+               AbstractEventLoopPolicy as _AbstractEventLoopPolicy,
+       )
+except ImportError:
+       _AbstractEventLoop = object
+       _AbstractEventLoopPolicy = object
+
+
+class AbstractEventLoopPolicy(_AbstractEventLoopPolicy):
+    """Abstract policy for accessing the event loop."""
+
+    def get_event_loop(self):
+        raise NotImplementedError
+
+    def set_event_loop(self, loop):
+        raise NotImplementedError
+
+    def new_event_loop(self):
+        raise NotImplementedError
+
+    def get_child_watcher(self):
+        raise NotImplementedError
+
+    def set_child_watcher(self, watcher):
+        raise NotImplementedError
+
+
+class AbstractEventLoop(_AbstractEventLoop):
+       """Abstract event loop."""
+
+       def run_forever(self):
+               raise NotImplementedError
+
+       def run_until_complete(self, future):
+               raise NotImplementedError
+
+       def stop(self):
+               raise NotImplementedError
+
+       def is_running(self):
+               raise NotImplementedError
+
+       def is_closed(self):
+               raise NotImplementedError
+
+       def close(self):
+               raise NotImplementedError
+
+       def shutdown_asyncgens(self):
+               raise NotImplementedError
+
+       def _timer_handle_cancelled(self, handle):
+               raise NotImplementedError
+
+       def call_soon(self, callback, *args):
+               return self.call_later(0, callback, *args)
+
+       def call_later(self, delay, callback, *args):
+               raise NotImplementedError
+
+       def call_at(self, when, callback, *args):
+               raise NotImplementedError
+
+       def time(self):
+               raise NotImplementedError
+
+       def create_future(self):
+               raise NotImplementedError
+
+       def create_task(self, coro):
+               raise NotImplementedError
+
+       def call_soon_threadsafe(self, callback, *args):
+               raise NotImplementedError
+
+       def run_in_executor(self, executor, func, *args):
+               raise NotImplementedError
+
+       def set_default_executor(self, executor):
+               raise NotImplementedError
+
+       def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
+               raise NotImplementedError
+
+       def getnameinfo(self, sockaddr, flags=0):
+               raise NotImplementedError
+
+       def create_connection(self, protocol_factory, host=None, port=None,
+                                                 ssl=None, family=0, proto=0, 
flags=0, sock=None,
+                                                 local_addr=None, 
server_hostname=None):
+               raise NotImplementedError
+
+       def create_server(self, protocol_factory, host=None, port=None,
+                                         family=socket.AF_UNSPEC, 
flags=socket.AI_PASSIVE,
+                                         sock=None, backlog=100, ssl=None, 
reuse_address=None,
+                                         reuse_port=None):
+               raise NotImplementedError
+
+       def create_unix_connection(self, protocol_factory, path,
+                                                          ssl=None, sock=None,
+                                                          
server_hostname=None):
+               raise NotImplementedError
+
+       def create_unix_server(self, protocol_factory, path,
+                                                  sock=None, backlog=100, 
ssl=None):
+               raise NotImplementedError
+
+       def create_datagram_endpoint(self, protocol_factory,
+                                                                
local_addr=None, remote_addr=None,
+                                                                family=0, 
proto=0, flags=0,
+                                                                
reuse_address=None, reuse_port=None,
+                                                                
allow_broadcast=None, sock=None):
+               raise NotImplementedError
+
+       def connect_read_pipe(self, protocol_factory, pipe):
+               raise NotImplementedError
+
+       def connect_write_pipe(self, protocol_factory, pipe):
+               raise NotImplementedError
+
+       def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
+                                                stdout=subprocess.PIPE, 
stderr=subprocess.PIPE,
+                                                **kwargs):
+               raise NotImplementedError
+
+       def subprocess_exec(self, protocol_factory, *args, **kwargs):
+               for k in ('stdin', 'stdout', 'stderr'):
+                       kwargs.setdefault(k, subprocess.PIPE)
+               raise NotImplementedError
+
+       def add_writer(self, fd, callback, *args):
+               raise NotImplementedError
+
+       def remove_writer(self, fd):
+               raise NotImplementedError
+
+       def sock_recv(self, sock, nbytes):
+               raise NotImplementedError
+
+       def sock_sendall(self, sock, data):
+               raise NotImplementedError
+
+       def sock_connect(self, sock, address):
+               raise NotImplementedError
+
+       def sock_accept(self, sock):
+               raise NotImplementedError
+
+       def add_signal_handler(self, sig, callback, *args):
+               raise NotImplementedError
+
+       def remove_signal_handler(self, sig):
+               raise NotImplementedError
+
+       def set_task_factory(self, factory):
+               raise NotImplementedError
+
+       def get_task_factory(self):
+               raise NotImplementedError
+
+       def get_exception_handler(self):
+               raise NotImplementedError
+
+       def set_exception_handler(self, handler):
+               raise NotImplementedError
+
+       def default_exception_handler(self, context):
+               raise NotImplementedError
+
+       def call_exception_handler(self, context):
+               raise NotImplementedError
+
+       def get_debug(self):
+               raise NotImplementedError
+
+       def set_debug(self, enabled):
+               raise NotImplementedError
+
diff --git a/pym/portage/util/futures/futures.py 
b/pym/portage/util/futures/futures.py
index cd56a27eb..dc4e0a7d7 100644
--- a/pym/portage/util/futures/futures.py
+++ b/pym/portage/util/futures/futures.py
@@ -41,7 +41,10 @@ except ImportError:
 
        Future = None
 
-from portage.util._eventloop.global_event_loop import global_event_loop
+import portage
+portage.proxy.lazyimport.lazyimport(globals(),
+       
'portage.util._eventloop.global_event_loop:global_event_loop@_global_event_loop',
+)
 
 _PENDING = 'PENDING'
 _CANCELLED = 'CANCELLED'
@@ -69,7 +72,7 @@ class _EventLoopFuture(object):
                the default event loop.
                """
                if loop is None:
-                       self._loop = global_event_loop()
+                       self._loop = _global_event_loop()._asyncio_wrapper
                else:
                        self._loop = loop
                self._callbacks = []
diff --git a/pym/portage/util/futures/unix_events.py 
b/pym/portage/util/futures/unix_events.py
new file mode 100644
index 000000000..ed4c6e519
--- /dev/null
+++ b/pym/portage/util/futures/unix_events.py
@@ -0,0 +1,91 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+__all__ = (
+       'DefaultEventLoopPolicy',
+)
+
+from portage.util._eventloop.global_event_loop import (
+       global_event_loop as _global_event_loop,
+)
+from portage.util.futures import (
+       asyncio,
+       events,
+)
+from portage.util.futures.futures import Future
+
+
+class _PortageEventLoop(events.AbstractEventLoop):
+       """
+       Implementation of asyncio.AbstractEventLoop which wraps portage's
+       internal event loop.
+       """
+
+       def __init__(self, loop):
+               """
+               @type loop: EventLoop
+               @param loop: an instance of portage's internal event loop
+               """
+               self._loop = loop
+               self.call_soon = loop.call_soon
+               self.call_soon_threadsafe = loop.call_soon_threadsafe
+               self.call_later = loop.call_later
+               self.call_at = loop.call_at
+               self.is_closed = loop.is_closed
+               self.close = loop.close
+               self.create_future = loop.create_future
+               self.add_reader = loop.add_reader
+               self.remove_reader = loop.remove_reader
+               self.add_writer = loop.add_writer
+               self.remove_writer = loop.remove_writer
+               self.run_in_executor = loop.run_in_executor
+               self.time = loop.time
+               self.set_debug = loop.set_debug
+               self.get_debug = loop.get_debug
+
+       def run_until_complete(self, future):
+               """
+               Run the event loop until a Future is done.
+
+               @type future: asyncio.Future
+               @param future: a Future to wait for
+               @rtype: object
+               @return: the Future's result
+               @raise: the Future's exception
+               """
+               return self._loop.run_until_complete(
+                       asyncio.ensure_future(future, loop=self))
+
+       def create_task(self, coro):
+               """
+               Schedule a coroutine object.
+
+               @type coro: coroutine
+               @param coro: a coroutine to schedule
+               @rtype: asyncio.Task
+               @return: a task object
+               """
+               return asyncio.Task(coro, loop=self)
+
+
+class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy):
+       """
+       Implementation of asyncio.AbstractEventLoopPolicy based on portage's
+       internal event loop. This supports running event loops in forks,
+       which is not supported by the default asyncio event loop policy,
+       see https://bugs.python.org/issue22087.
+       """
+       def get_event_loop(self):
+               """
+               Get the event loop for the current context.
+
+               Returns an event loop object implementing the AbstractEventLoop
+               interface.
+
+               @rtype: asyncio.AbstractEventLoop (or compatible)
+               @return: the current event loop policy
+               """
+               return _global_event_loop()._asyncio_wrapper
+
+
+DefaultEventLoopPolicy = _PortageEventLoopPolicy
-- 
2.13.6


Reply via email to