In Python versions that support asyncio, this allows API consumers to use the asyncio.create_subprocess_exec() function with portage's internal event loop. Currently, subprocess.PIPE is not implemented because that would require implementations of asyncio's private pipe transports (asyncio.unix_events._UnixReadPipeTransport for stdout/stderr and asyncio.unix_events._UnixWritePipeTransport for stdin). However, it's possible to use pipes created with os.pipe() for stdin, stdout, and stderr, as demonstrated in the included unit tests.
Bug: https://bugs.gentoo.org/649588 --- .../util/futures/asyncio/test_subprocess_exec.py | 136 +++++++++++++++++++++ pym/portage/util/futures/unix_events.py | 98 +++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py new file mode 100644 index 000000000..f77d362f3 --- /dev/null +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py @@ -0,0 +1,136 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import errno +import fcntl +import os + +from portage.process import find_binary +from portage.tests import TestCase +from portage.util.futures import asyncio +from portage.util.futures.executor.fork import ForkExecutor +from portage.util.futures.unix_events import DefaultEventLoopPolicy + + +class SubprocessExecTestCase(TestCase): + def _run_test(self, test, cleanup): + initial_policy = asyncio.get_event_loop_policy() + if not isinstance(initial_policy, DefaultEventLoopPolicy): + asyncio.set_event_loop_policy(DefaultEventLoopPolicy()) + + try: + test(asyncio.get_event_loop()) + finally: + cleanup(asyncio.get_event_loop()) + asyncio.set_event_loop_policy(initial_policy) + + def testEcho(self): + if not hasattr(asyncio, 'create_subprocess_exec'): + self.skipTest('create_subprocess_exec not implemented for python2') + + args_tuple = (b'hello', b'world') + echo_binary = find_binary("echo") + self.assertNotEqual(echo_binary, None) + echo_binary = os.fsencode(echo_binary) + + # Use os.pipe(), since this loop does not implement the + # ReadTransport necessary for subprocess.PIPE support. 
+ stdout_pr, stdout_pw = os.pipe() + stdout_pr = open(stdout_pr, 'rb', 0) + stdout_pw = open(stdout_pw, 'wb', 0) + files = [stdout_pr, stdout_pw] + + def test(loop): + with open(os.devnull, 'rb', 0) as devnull: + proc = loop.run_until_complete( + asyncio.create_subprocess_exec( + echo_binary, *args_tuple, + stdin=devnull, stdout=stdout_pw, stderr=stdout_pw)) + + # This belongs exclusively to the subprocess now. + stdout_pw.close() + + read_buffer = [] + fcntl.fcntl(stdout_pr.fileno(), fcntl.F_SETFL, + fcntl.fcntl(stdout_pr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) + loop.add_reader(stdout_pr.fileno(), + lambda: read_buffer.extend(self._read_stdio(stdout_pr))) + + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) + self.assertEqual(tuple(b''.join(read_buffer).split()), args_tuple) + + def cleanup(loop): + loop.remove_reader(stdout_pr.fileno()) + for f in files: + f.close() + + self._run_test(test, cleanup) + + def testCat(self): + if not hasattr(asyncio, 'create_subprocess_exec'): + self.skipTest('create_subprocess_exec not implemented for python2') + + stdin_data = b'hello world' + cat_binary = find_binary("cat") + self.assertNotEqual(cat_binary, None) + cat_binary = os.fsencode(cat_binary) + + # Use os.pipe(), since this loop does not implement the + # ReadTransport necessary for subprocess.PIPE support. + stdout_pr, stdout_pw = os.pipe() + stdout_pr = open(stdout_pr, 'rb', 0) + stdout_pw = open(stdout_pw, 'wb', 0) + + stdin_pr, stdin_pw = os.pipe() + stdin_pr = open(stdin_pr, 'rb', 0) + stdin_pw = open(stdin_pw, 'wb', 0) + + files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw] + + def test(loop): + proc = loop.run_until_complete( + asyncio.create_subprocess_exec( + cat_binary, + stdin=stdin_pr, stdout=stdout_pw, stderr=stdout_pw)) + + # These belong exclusively to the subprocess now. 
+ stdout_pw.close() + stdin_pr.close() + + read_buffer = [] + fcntl.fcntl(stdout_pr.fileno(), fcntl.F_SETFL, + fcntl.fcntl(stdout_pr.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) + loop.add_reader(stdout_pr.fileno(), + lambda: read_buffer.extend(self._read_stdio(stdout_pr))) + + with ForkExecutor(loop=loop) as executor: + writer = asyncio.ensure_future(loop.run_in_executor( + executor, stdin_pw.write, stdin_data), loop=loop) + + # This belongs exclusively to the writer now. + stdin_pw.close() + loop.run_until_complete(writer) + + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) + self.assertEqual(b''.join(read_buffer), stdin_data) + + def cleanup(loop): + loop.remove_reader(stdout_pr.fileno()) + for f in files: + f.close() + + self._run_test(test, cleanup) + + @staticmethod + def _read_stdio(stdio_pr): + while True: + try: + buf = stdio_pr.read() + except OSError as e: + if e.errno == errno.EAGAIN: + break + else: + if buf: + yield buf + else: + break diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py index 6fcef45fa..d5c0480bf 100644 --- a/pym/portage/util/futures/unix_events.py +++ b/pym/portage/util/futures/unix_events.py @@ -7,11 +7,15 @@ __all__ = ( ) try: + from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport from asyncio.unix_events import AbstractChildWatcher as _AbstractChildWatcher except ImportError: _AbstractChildWatcher = object + _BaseSubprocessTransport = object +import functools import os +import subprocess from portage.util._eventloop.global_event_loop import ( global_event_loop as _global_event_loop, @@ -75,6 +79,100 @@ class _PortageEventLoop(events.AbstractEventLoop): """ return asyncio.Task(coro, loop=self) + def subprocess_exec(self, protocol_factory, program, *args, **kwargs): + """ + Run subprocesses asynchronously using the subprocess module. 
+ + @type protocol_factory: callable + @param protocol_factory: must instantiate a subclass of the + asyncio.SubprocessProtocol class + @type program: str or bytes + @param program: the program to execute + @type args: str or bytes + @param args: program's arguments + @type kwargs: varies + @param kwargs: subprocess.Popen parameters + @rtype: asyncio.Future + @return: Returns a pair of (transport, protocol), where transport + is an instance of BaseSubprocessTransport + """ + + # python2.7 does not allow arguments with defaults after *args + stdin = kwargs.pop('stdin', subprocess.PIPE) + stdout = kwargs.pop('stdout', subprocess.PIPE) + stderr = kwargs.pop('stderr', subprocess.PIPE) + + if subprocess.PIPE in (stdin, stdout, stderr): + # Requires connect_read/write_pipe implementation, for example + # see asyncio.unix_events._UnixReadPipeTransport. + raise NotImplementedError() + + universal_newlines = kwargs.pop('universal_newlines', False) + shell = kwargs.pop('shell', False) + bufsize = kwargs.pop('bufsize', 0) + + if universal_newlines: + raise ValueError("universal_newlines must be False") + if shell: + raise ValueError("shell must be False") + if bufsize != 0: + raise ValueError("bufsize must be 0") + popen_args = (program,) + args + for arg in popen_args: + if not isinstance(arg, (str, bytes)): + raise TypeError("program arguments must be " + "a bytes or text string, not %s" + % type(arg).__name__) + result = self.create_future() + self._make_subprocess_transport( + result, protocol_factory(), popen_args, False, stdin, stdout, stderr, + bufsize, **kwargs) + return result + + def _make_subprocess_transport(self, result, protocol, args, shell, + stdin, stdout, stderr, bufsize, extra=None, **kwargs): + waiter = self.create_future() + transp = _UnixSubprocessTransport(self, + protocol, args, shell, stdin, stdout, stderr, bufsize, + waiter=waiter, extra=extra, + **kwargs) + + self._loop._asyncio_child_watcher.add_child_handler( + transp.get_pid(), 
self._child_watcher_callback, transp) + + waiter.add_done_callback(functools.partial( + self._subprocess_transport_callback, transp, protocol, result)) + + def _subprocess_transport_callback(self, transp, protocol, result, waiter): + if waiter.exception() is None: + result.set_result((transp, protocol)) + else: + transp.close() + wait_transp = asyncio.ensure_future(transp._wait(), loop=self) + wait_transp.add_done_callback( + functools.partial(self._subprocess_transport_failure, + result, waiter.exception())) + + def _child_watcher_callback(self, pid, returncode, transp): + self.call_soon_threadsafe(transp._process_exited, returncode) + + def _subprocess_transport_failure(self, result, exception, wait_transp): + result.set_exception(wait_transp.exception() or exception) + + +class _UnixSubprocessTransport(_BaseSubprocessTransport): + """ + This is identical to the standard library's private + asyncio.unix_events._UnixSubprocessTransport class, except that + subprocess.PIPE is not implemented for stdin, since that would + require connect_write_pipe support in the event loop. For example, + see the asyncio.unix_events._UnixWritePipeTransport class. + """ + def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): + self._proc = subprocess.Popen( + args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, + universal_newlines=False, bufsize=bufsize, **kwargs) + class AbstractChildWatcher(_AbstractChildWatcher): def add_child_handler(self, pid, callback, *args): -- 2.13.6