In python versions that support asyncio, this allows API consumers
to use the asyncio.create_subprocess_exec() function with portage's
internal event loop. Currently, subprocess.PIPE is not implemented
because that would require an implementation of asyncio's private
asyncio.unix_events._UnixReadPipeTransport class. However, it's
possible to use pipes created with os.pipe() for stdin, stdout,
and stderr, as demonstrated in the included unit tests.

Bug: https://bugs.gentoo.org/649588
---
 .../util/futures/asyncio/test_subprocess_exec.py   | 136 +++++++++++++++++++++
 pym/portage/util/futures/unix_events.py            |  98 +++++++++++++++
 2 files changed, 234 insertions(+)
 create mode 100644 
pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py 
b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
new file mode 100644
index 000000000..f77d362f3
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -0,0 +1,136 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import fcntl
+import os
+
+from portage.process import find_binary
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.executor.fork import ForkExecutor
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+
+
+class SubprocessExecTestCase(TestCase):
+       def _run_test(self, test, cleanup):
+               initial_policy = asyncio.get_event_loop_policy()
+               if not isinstance(initial_policy, DefaultEventLoopPolicy):
+                       asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+               try:
+                       test(asyncio.get_event_loop())
+               finally:
+                       cleanup(asyncio.get_event_loop())
+                       asyncio.set_event_loop_policy(initial_policy)
+
+       def testEcho(self):
+               if not hasattr(asyncio, 'create_subprocess_exec'):
+                       self.skipTest('create_subprocess_exec not implemented 
for python2')
+
+               args_tuple = (b'hello', b'world')
+               echo_binary = find_binary("echo")
+               self.assertNotEqual(echo_binary, None)
+               echo_binary = os.fsencode(echo_binary)
+
+               # Use os.pipe(), since this loop does not implement the
+               # ReadTransport necessary for subprocess.PIPE support.
+               stdout_pr, stdout_pw = os.pipe()
+               stdout_pr = open(stdout_pr, 'rb', 0)
+               stdout_pw = open(stdout_pw, 'wb', 0)
+               files = [stdout_pr, stdout_pw]
+
+               def test(loop):
+                       with open(os.devnull, 'rb', 0) as devnull:
+                               proc = loop.run_until_complete(
+                                       asyncio.create_subprocess_exec(
+                                       echo_binary, *args_tuple,
+                                       stdin=devnull, stdout=stdout_pw, 
stderr=stdout_pw))
+
+                       # This belongs exclusively to the subprocess now.
+                       stdout_pw.close()
+
+                       read_buffer = []
+                       fcntl.fcntl(stdout_pr.fileno(), fcntl.F_SETFL,
+                               fcntl.fcntl(stdout_pr.fileno(), fcntl.F_GETFL) 
| os.O_NONBLOCK)
+                       loop.add_reader(stdout_pr.fileno(),
+                               lambda: 
read_buffer.extend(self._read_stdio(stdout_pr)))
+
+                       self.assertEqual(loop.run_until_complete(proc.wait()), 
os.EX_OK)
+                       self.assertEqual(tuple(b''.join(read_buffer).split()), 
args_tuple)
+
+               def cleanup(loop):
+                       loop.remove_reader(stdout_pr.fileno())
+                       for f in files:
+                               f.close()
+
+               self._run_test(test, cleanup)
+
+       def testCat(self):
+               if not hasattr(asyncio, 'create_subprocess_exec'):
+                       self.skipTest('create_subprocess_exec not implemented 
for python2')
+
+               stdin_data = b'hello world'
+               cat_binary = find_binary("cat")
+               self.assertNotEqual(cat_binary, None)
+               cat_binary = os.fsencode(cat_binary)
+
+               # Use os.pipe(), since this loop does not implement the
+               # ReadTransport necessary for subprocess.PIPE support.
+               stdout_pr, stdout_pw = os.pipe()
+               stdout_pr = open(stdout_pr, 'rb', 0)
+               stdout_pw = open(stdout_pw, 'wb', 0)
+
+               stdin_pr, stdin_pw = os.pipe()
+               stdin_pr = open(stdin_pr, 'rb', 0)
+               stdin_pw = open(stdin_pw, 'wb', 0)
+
+               files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw]
+
+               def test(loop):
+                       proc = loop.run_until_complete(
+                               asyncio.create_subprocess_exec(
+                               cat_binary,
+                               stdin=stdin_pr, stdout=stdout_pw, 
stderr=stdout_pw))
+
+                       # These belong exclusively to the subprocess now.
+                       stdout_pw.close()
+                       stdin_pr.close()
+
+                       read_buffer = []
+                       fcntl.fcntl(stdout_pr.fileno(), fcntl.F_SETFL,
+                               fcntl.fcntl(stdout_pr.fileno(), fcntl.F_GETFL) 
| os.O_NONBLOCK)
+                       loop.add_reader(stdout_pr.fileno(),
+                               lambda: 
read_buffer.extend(self._read_stdio(stdout_pr)))
+
+                       with ForkExecutor(loop=loop) as executor:
+                               writer = 
asyncio.ensure_future(loop.run_in_executor(
+                                       executor, stdin_pw.write, stdin_data), 
loop=loop)
+
+                               # This belongs exlusively to the writer now.
+                               stdin_pw.close()
+                               loop.run_until_complete(writer)
+
+                       self.assertEqual(loop.run_until_complete(proc.wait()), 
os.EX_OK)
+                       self.assertEqual(b''.join(read_buffer), stdin_data)
+
+               def cleanup(loop):
+                       loop.remove_reader(stdout_pr.fileno())
+                       for f in files:
+                               f.close()
+
+               self._run_test(test, cleanup)
+
+       @staticmethod
+       def _read_stdio(stdio_pr):
+               while True:
+                       try:
+                               buf = stdio_pr.read()
+                       except OSError as e:
+                               if e.errno == errno.EAGAIN:
+                                       break
+                       else:
+                               if buf:
+                                       yield buf
+                               else:
+                                       break
diff --git a/pym/portage/util/futures/unix_events.py 
b/pym/portage/util/futures/unix_events.py
index 6fcef45fa..d5c0480bf 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -7,11 +7,15 @@ __all__ = (
 )
 
 try:
+       from asyncio.base_subprocess import BaseSubprocessTransport as 
_BaseSubprocessTransport
        from asyncio.unix_events import AbstractChildWatcher as 
_AbstractChildWatcher
 except ImportError:
        _AbstractChildWatcher = object
+       _BaseSubprocessTransport = object
 
+import functools
 import os
+import subprocess
 
 from portage.util._eventloop.global_event_loop import (
        global_event_loop as _global_event_loop,
@@ -75,6 +79,100 @@ class _PortageEventLoop(events.AbstractEventLoop):
                """
                return asyncio.Task(coro, loop=self)
 
+       def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
+               """
+               Run subprocesses asynchronously using the subprocess module.
+
+               @type protocol_factory: callable
+               @param protocol_factory: must instantiate a subclass of the
+                       asyncio.SubprocessProtocol class
+               @type program: str or bytes
+               @param program: the program to execute
+               @type args: str or bytes
+               @param args: program's arguments
+               @type kwargs: varies
+               @param kwargs: subprocess.Popen parameters
+               @rtype: asyncio.Future
+               @return: Returns a pair of (transport, protocol), where 
transport
+                       is an instance of BaseSubprocessTransport
+               """
+
+               # python2.7 does not allow arguments with defaults after *args
+               stdin = kwargs.pop('stdin', subprocess.PIPE)
+               stdout = kwargs.pop('stdout', subprocess.PIPE)
+               stderr = kwargs.pop('stderr', subprocess.PIPE)
+
+               if subprocess.PIPE in (stdin, stdout, stderr):
+                       # Requires connect_read/write_pipe implementation, for 
example
+                       # see asyncio.unix_events._UnixReadPipeTransport.
+                       raise NotImplementedError()
+
+               universal_newlines = kwargs.pop('universal_newlines', False)
+               shell = kwargs.pop('shell', False)
+               bufsize = kwargs.pop('bufsize', 0)
+
+               if universal_newlines:
+                       raise ValueError("universal_newlines must be False")
+               if shell:
+                       raise ValueError("shell must be False")
+               if bufsize != 0:
+                       raise ValueError("bufsize must be 0")
+               popen_args = (program,) + args
+               for arg in popen_args:
+                       if not isinstance(arg, (str, bytes)):
+                               raise TypeError("program arguments must be "
+                                                               "a bytes or 
text string, not %s"
+                                                               % 
type(arg).__name__)
+               result = self.create_future()
+               self._make_subprocess_transport(
+                       result, protocol_factory(), popen_args, False, stdin, 
stdout, stderr,
+                       bufsize, **kwargs)
+               return result
+
+       def _make_subprocess_transport(self, result, protocol, args, shell,
+               stdin, stdout, stderr, bufsize, extra=None, **kwargs):
+               waiter = self.create_future()
+               transp = _UnixSubprocessTransport(self,
+                       protocol, args, shell, stdin, stdout, stderr, bufsize,
+                       waiter=waiter, extra=extra,
+                       **kwargs)
+
+               self._loop._asyncio_child_watcher.add_child_handler(
+                       transp.get_pid(), self._child_watcher_callback, transp)
+
+               waiter.add_done_callback(functools.partial(
+                       self._subprocess_transport_callback, transp, protocol, 
result))
+
+       def _subprocess_transport_callback(self, transp, protocol, result, 
waiter):
+               if waiter.exception() is None:
+                       result.set_result((transp, protocol))
+               else:
+                       transp.close()
+                       wait_transp = asyncio.ensure_future(transp._wait(), 
loop=self)
+                       wait_transp.add_done_callback(
+                               
functools.partial(self._subprocess_transport_failure,
+                               result, waiter.exception()))
+
+       def _child_watcher_callback(self, pid, returncode, transp):
+               self.call_soon_threadsafe(transp._process_exited, returncode)
+
+       def _subprocess_transport_failure(self, result, exception, wait_transp):
+               result.set_exception(wait_transp.exception() or exception)
+
+
+class _UnixSubprocessTransport(_BaseSubprocessTransport):
+       """
+       This is identical to the standard library's private
+       asyncio.unix_events._UnixSubprocessTransport class, except that
+       subprocess.PIPE is not implemented for stdin, since that would
+       require connect_write_pipe support in the event loop. For example,
+       see the asyncio.unix_events._UnixWritePipeTransport class.
+       """
+       def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+               self._proc = subprocess.Popen(
+                       args, shell=shell, stdin=stdin, stdout=stdout, 
stderr=stderr,
+                       universal_newlines=False, bufsize=bufsize, **kwargs)
+
 
 class AbstractChildWatcher(_AbstractChildWatcher):
        def add_child_handler(self, pid, callback, *args):
-- 
2.13.6


Reply via email to