This provides minimal interoperability with existing asyncio code, by adding a portage.util.futures.unix_events.DefaultEventLoopPolicy class that makes asyncio use portage's internal event loop when an instance is passed into asyncio.set_event_loop_policy(). The get_event_loop() method of this policy returns an instance of a _PortageEventLoop class that wraps portage's internal event loop and implements asyncio's AbstractEventLoop interface.
The portage.util.futures.asyncio module refers to the real asyncio module when available, and otherwise falls back to a minimal implementation that works with python2.7. The included EventLoopInForkTestCase demonstrates usage, and works with all supported versions of python, including python2.7. Bug: https://bugs.gentoo.org/649588 --- pym/portage/tests/util/futures/asyncio/__init__.py | 0 pym/portage/tests/util/futures/asyncio/__test__.py | 0 .../futures/asyncio/test_event_loop_in_fork.py | 62 +++++++ pym/portage/util/_eventloop/EventLoop.py | 11 +- pym/portage/util/futures/__init__.py | 9 + pym/portage/util/futures/_asyncio.py | 116 +++++++++++++ pym/portage/util/futures/events.py | 191 +++++++++++++++++++++ pym/portage/util/futures/futures.py | 7 +- pym/portage/util/futures/unix_events.py | 91 ++++++++++ 9 files changed, 479 insertions(+), 8 deletions(-) create mode 100644 pym/portage/tests/util/futures/asyncio/__init__.py create mode 100644 pym/portage/tests/util/futures/asyncio/__test__.py create mode 100644 pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py create mode 100644 pym/portage/util/futures/_asyncio.py create mode 100644 pym/portage/util/futures/events.py create mode 100644 pym/portage/util/futures/unix_events.py diff --git a/pym/portage/tests/util/futures/asyncio/__init__.py b/pym/portage/tests/util/futures/asyncio/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pym/portage/tests/util/futures/asyncio/__test__.py b/pym/portage/tests/util/futures/asyncio/__test__.py new file mode 100644 index 000000000..e69de29bb diff --git a/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py new file mode 100644 index 000000000..1ef46229b --- /dev/null +++ b/pym/portage/tests/util/futures/asyncio/test_event_loop_in_fork.py @@ -0,0 +1,62 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + 
+import multiprocessing +from portage.tests import TestCase +from portage.util.futures import asyncio +from portage.util.futures.unix_events import DefaultEventLoopPolicy + + +def fork_main(parent_conn, child_conn): + parent_conn.close() + loop = asyncio.get_event_loop() + # This fails with python's default event loop policy, + # see https://bugs.python.org/issue22087. + loop.run_until_complete(asyncio.sleep(0.1)) + + +def async_main(loop=None): + loop = loop or asyncio.get_event_loop() + future = loop.create_future() + + # Since python2.7 does not support Process.sentinel, use Pipe to + # monitor for process exit. + parent_conn, child_conn = multiprocessing.Pipe() + + def eof_callback(): + loop.remove_reader(parent_conn.fileno()) + parent_conn.close() + future.set_result(None) + + loop.add_reader(parent_conn.fileno(), eof_callback) + proc = multiprocessing.Process(target=fork_main, args=(parent_conn, child_conn)) + proc.start() + child_conn.close() + + return future + + +class EventLoopInForkTestCase(TestCase): + """ + The default asyncio event loop policy does not support loops + running in forks, see https://bugs.python.org/issue22087. + Portage's DefaultEventLoopPolicy supports forks. 
+ """ + + def testEventLoopInForkTestCase(self): + initial_policy = asyncio.get_event_loop_policy() + if not isinstance(initial_policy, DefaultEventLoopPolicy): + asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) + try: + loop = asyncio.get_event_loop() + exit_future = loop.create_future() + def trigger_exit(*args): + exit_future.set_result(True) + + def start_async_main(): + async_main(loop=loop).add_done_callback(trigger_exit) + + loop.call_soon(start_async_main) + loop.run_until_complete(exit_future) + finally: + asyncio.set_event_loop_policy(initial_policy) diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index 72eb407fc..d53a76ba1 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -23,8 +23,9 @@ except ImportError: import portage portage.proxy.lazyimport.lazyimport(globals(), - 'portage.util.futures.futures:_EventLoopFuture', + 'portage.util.futures.futures:Future', 'portage.util.futures.executor.fork:ForkExecutor', + 'portage.util.futures.unix_events:_PortageEventLoop', ) from portage import OrderedDict @@ -188,15 +189,13 @@ class EventLoop(object): self._sigchld_write = None self._sigchld_src_id = None self._pid = os.getpid() + self._asyncio_wrapper = _PortageEventLoop(loop=self) def create_future(self): """ - Create a Future object attached to the loop. This returns - an instance of _EventLoopFuture, because EventLoop is currently - missing some of the asyncio.AbstractEventLoop methods that - asyncio.Future requires. + Create a Future object attached to the loop. 
""" - return _EventLoopFuture(loop=self) + return Future(loop=self._asyncio_wrapper) def _new_source_id(self): """ diff --git a/pym/portage/util/futures/__init__.py b/pym/portage/util/futures/__init__.py index e69de29bb..789080c85 100644 --- a/pym/portage/util/futures/__init__.py +++ b/pym/portage/util/futures/__init__.py @@ -0,0 +1,9 @@ + +__all__ = ( + 'asyncio', +) + +try: + import asyncio +except ImportError: + from portage.util.futures import _asyncio as asyncio diff --git a/pym/portage/util/futures/_asyncio.py b/pym/portage/util/futures/_asyncio.py new file mode 100644 index 000000000..6874e133f --- /dev/null +++ b/pym/portage/util/futures/_asyncio.py @@ -0,0 +1,116 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +__all__ = ( + 'ensure_future', + 'get_event_loop', + 'get_event_loop_policy', + 'set_event_loop_policy', + 'sleep', + 'Task', +) + +import functools + +try: + import threading +except ImportError: + import dummy_threading as threading + +import portage +portage.proxy.lazyimport.lazyimport(globals(), + 'portage.util.futures.unix_events:DefaultEventLoopPolicy', +) +from portage.util.futures.futures import Future + +_lock = threading.Lock() +_policy = None + + +def get_event_loop_policy(): + """ + Get the current event loop policy. + + @rtype: asyncio.AbstractEventLoopPolicy (or compatible) + @return: the current event loop policy + """ + global _lock, _policy + with _lock: + if _policy is None: + _policy = DefaultEventLoopPolicy() + return _policy + + +def set_event_loop_policy(policy): + """ + Set the current event loop policy. If policy is None, the default + policy is restored. + + @type policy: asyncio.AbstractEventLoopPolicy or None + @param policy: new event loop policy + """ + global _lock, _policy + with _lock: + _policy = policy or DefaultEventLoopPolicy() + + +def get_event_loop(): + """ + Equivalent to calling get_event_loop_policy().get_event_loop(). 
+ + @rtype: asyncio.AbstractEventLoop (or compatible) + @return: the event loop for the current context + """ + return get_event_loop_policy().get_event_loop() + + +class Task(Future): + """ + Schedule the execution of a coroutine: wrap it in a future. A task + is a subclass of Future. + """ + def __init__(self, coro, loop=None): + raise NotImplementedError + + +def ensure_future(coro_or_future, loop=None): + """ + Wrap a coroutine or an awaitable in a future. + + If the argument is a Future, it is returned directly. + + @type coro_or_future: coroutine or Future + @param coro_or_future: coroutine or future to wrap + @type loop: asyncio.AbstractEventLoop (or compatible) + @param loop: event loop + @rtype: asyncio.Future (or compatible) + @return: an instance of Future + """ + if isinstance(coro_or_future, Future): + return coro_or_future + raise NotImplementedError + + +def sleep(delay, result=None, loop=None): + """ + Create a future that completes after a given time (in seconds). If + result is provided, it is produced to the caller when the future + completes. 
+ + @type delay: int or float + @param delay: delay seconds + @type result: object + @param result: result of the future + @type loop: asyncio.AbstractEventLoop (or compatible) + @param loop: event loop + @rtype: asyncio.Future (or compatible) + @return: an instance of Future + """ + loop = loop or get_event_loop() + future = loop.create_future() + handle = loop.call_later(delay, functools.partial(future.set_result, result)) + def cancel_callback(future): + if future.cancelled(): + handle.cancel() + future.add_done_callback(cancel_callback) + return future diff --git a/pym/portage/util/futures/events.py b/pym/portage/util/futures/events.py new file mode 100644 index 000000000..b772bc242 --- /dev/null +++ b/pym/portage/util/futures/events.py @@ -0,0 +1,191 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +__all__ = ( + 'AbstractEventLoopPolicy', + 'AbstractEventLoop', +) + +import socket +import subprocess + +try: + from asyncio.events import ( + AbstractEventLoop as _AbstractEventLoop, + AbstractEventLoopPolicy as _AbstractEventLoopPolicy, + ) +except ImportError: + _AbstractEventLoop = object + _AbstractEventLoopPolicy = object + + +class AbstractEventLoopPolicy(_AbstractEventLoopPolicy): + """Abstract policy for accessing the event loop.""" + + def get_event_loop(self): + raise NotImplementedError + + def set_event_loop(self, loop): + raise NotImplementedError + + def new_event_loop(self): + raise NotImplementedError + + def get_child_watcher(self): + raise NotImplementedError + + def set_child_watcher(self, watcher): + raise NotImplementedError + + +class AbstractEventLoop(_AbstractEventLoop): + """Abstract event loop.""" + + def run_forever(self): + raise NotImplementedError + + def run_until_complete(self, future): + raise NotImplementedError + + def stop(self): + raise NotImplementedError + + def is_running(self): + raise NotImplementedError + + def is_closed(self): + raise NotImplementedError + + 
def close(self): + raise NotImplementedError + + def shutdown_asyncgens(self): + raise NotImplementedError + + def _timer_handle_cancelled(self, handle): + raise NotImplementedError + + def call_soon(self, callback, *args): + return self.call_later(0, callback, *args) + + def call_later(self, delay, callback, *args): + raise NotImplementedError + + def call_at(self, when, callback, *args): + raise NotImplementedError + + def time(self): + raise NotImplementedError + + def create_future(self): + raise NotImplementedError + + def create_task(self, coro): + raise NotImplementedError + + def call_soon_threadsafe(self, callback, *args): + raise NotImplementedError + + def run_in_executor(self, executor, func, *args): + raise NotImplementedError + + def set_default_executor(self, executor): + raise NotImplementedError + + def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0): + raise NotImplementedError + + def getnameinfo(self, sockaddr, flags=0): + raise NotImplementedError + + def create_connection(self, protocol_factory, host=None, port=None, + ssl=None, family=0, proto=0, flags=0, sock=None, + local_addr=None, server_hostname=None): + raise NotImplementedError + + def create_server(self, protocol_factory, host=None, port=None, + family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, + sock=None, backlog=100, ssl=None, reuse_address=None, + reuse_port=None): + raise NotImplementedError + + def create_unix_connection(self, protocol_factory, path, + ssl=None, sock=None, + server_hostname=None): + raise NotImplementedError + + def create_unix_server(self, protocol_factory, path, + sock=None, backlog=100, ssl=None): + raise NotImplementedError + + def create_datagram_endpoint(self, protocol_factory, + local_addr=None, remote_addr=None, + family=0, proto=0, flags=0, + reuse_address=None, reuse_port=None, + allow_broadcast=None, sock=None): + raise NotImplementedError + + def connect_read_pipe(self, protocol_factory, pipe): + raise NotImplementedError + + def 
connect_write_pipe(self, protocol_factory, pipe): + raise NotImplementedError + + def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.PIPE, + **kwargs): + raise NotImplementedError + + def subprocess_exec(self, protocol_factory, *args, **kwargs): + for k in ('stdin', 'stdout', 'stderr'): + kwargs.setdefault(k, subprocess.PIPE) + raise NotImplementedError + + def add_writer(self, fd, callback, *args): + raise NotImplementedError + + def remove_writer(self, fd): + raise NotImplementedError + + def sock_recv(self, sock, nbytes): + raise NotImplementedError + + def sock_sendall(self, sock, data): + raise NotImplementedError + + def sock_connect(self, sock, address): + raise NotImplementedError + + def sock_accept(self, sock): + raise NotImplementedError + + def add_signal_handler(self, sig, callback, *args): + raise NotImplementedError + + def remove_signal_handler(self, sig): + raise NotImplementedError + + def set_task_factory(self, factory): + raise NotImplementedError + + def get_task_factory(self): + raise NotImplementedError + + def get_exception_handler(self): + raise NotImplementedError + + def set_exception_handler(self, handler): + raise NotImplementedError + + def default_exception_handler(self, context): + raise NotImplementedError + + def call_exception_handler(self, context): + raise NotImplementedError + + def get_debug(self): + raise NotImplementedError + + def set_debug(self, enabled): + raise NotImplementedError + diff --git a/pym/portage/util/futures/futures.py b/pym/portage/util/futures/futures.py index cd56a27eb..dc4e0a7d7 100644 --- a/pym/portage/util/futures/futures.py +++ b/pym/portage/util/futures/futures.py @@ -41,7 +41,10 @@ except ImportError: Future = None -from portage.util._eventloop.global_event_loop import global_event_loop +import portage +portage.proxy.lazyimport.lazyimport(globals(), + 'portage.util._eventloop.global_event_loop:global_event_loop@_global_event_loop', +) 
_PENDING = 'PENDING' _CANCELLED = 'CANCELLED' @@ -69,7 +72,7 @@ class _EventLoopFuture(object): the default event loop. """ if loop is None: - self._loop = global_event_loop() + self._loop = _global_event_loop()._asyncio_wrapper else: self._loop = loop self._callbacks = [] diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py new file mode 100644 index 000000000..ed4c6e519 --- /dev/null +++ b/pym/portage/util/futures/unix_events.py @@ -0,0 +1,91 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +__all__ = ( + 'DefaultEventLoopPolicy', +) + +from portage.util._eventloop.global_event_loop import ( + global_event_loop as _global_event_loop, +) +from portage.util.futures import ( + asyncio, + events, +) +from portage.util.futures.futures import Future + + +class _PortageEventLoop(events.AbstractEventLoop): + """ + Implementation of asyncio.AbstractEventLoop which wraps portage's + internal event loop. + """ + + def __init__(self, loop): + """ + @type loop: EventLoop + @param loop: an instance of portage's internal event loop + """ + self._loop = loop + self.call_soon = loop.call_soon + self.call_soon_threadsafe = loop.call_soon_threadsafe + self.call_later = loop.call_later + self.call_at = loop.call_at + self.is_closed = loop.is_closed + self.close = loop.close + self.create_future = loop.create_future + self.add_reader = loop.add_reader + self.remove_reader = loop.remove_reader + self.add_writer = loop.add_writer + self.remove_writer = loop.remove_writer + self.run_in_executor = loop.run_in_executor + self.time = loop.time + self.set_debug = loop.set_debug + self.get_debug = loop.get_debug + + def run_until_complete(self, future): + """ + Run the event loop until a Future is done. 
+ + @type future: asyncio.Future + @param future: a Future to wait for + @rtype: object + @return: the Future's result + @raise: the Future's exception + """ + return self._loop.run_until_complete( + asyncio.ensure_future(future, loop=self)) + + def create_task(self, coro): + """ + Schedule a coroutine object. + + @type coro: coroutine + @param coro: a coroutine to schedule + @rtype: asyncio.Task + @return: a task object + """ + return asyncio.Task(coro, loop=self) + + +class _PortageEventLoopPolicy(events.AbstractEventLoopPolicy): + """ + Implementation of asyncio.AbstractEventLoopPolicy based on portage's + internal event loop. This supports running event loops in forks, + which is not supported by the default asyncio event loop policy, + see https://bugs.python.org/issue22087. + """ + def get_event_loop(self): + """ + Get the event loop for the current context. + + Returns an event loop object implementing the AbstractEventLoop + interface. + + @rtype: asyncio.AbstractEventLoop (or compatible) + @return: the current event loop policy + """ + return _global_event_loop()._asyncio_wrapper + + +DefaultEventLoopPolicy = _PortageEventLoopPolicy -- 2.13.6