commit:     d31db4dfb58fcd95f2590dfaed19bce4ef31bbd2
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Thu Apr 12 03:56:25 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Fri Apr 13 07:10:10 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=d31db4df

Implement _PortageEventLoop.subprocess_exec (bug 649588)

In Python versions that support asyncio, this allows API consumers
to use the asyncio.create_subprocess_exec() function with portage's
internal event loop. Currently, subprocess.PIPE is not implemented
because that would require an implementation of asyncio's private
asyncio.unix_events._UnixReadPipeTransport class. However, it's
possible to use pipes created with os.pipe() for stdin, stdout,
and stderr, as demonstrated in the included unit tests.

Bug: https://bugs.gentoo.org/649588

 .../util/futures/asyncio/test_subprocess_exec.py   | 163 +++++++++++++++++++++
 pym/portage/util/futures/unix_events.py            |  98 +++++++++++++
 2 files changed, 261 insertions(+)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
new file mode 100644
index 000000000..d30f48c43
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -0,0 +1,163 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import os
+
+from portage.process import find_binary
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.executor.fork import ForkExecutor
+from portage.util.futures.unix_events import DefaultEventLoopPolicy
+from _emerge.PipeReader import PipeReader
+
+
+def reader(input_file, loop=None):
+       """
+       Asynchronously read a binary input file.
+
+       @param input_file: binary input file
+       @type input_file: file
+       @param loop: event loop
+       @type loop: EventLoop
+       @return: bytes
+       @rtype: asyncio.Future (or compatible)
+       """
+       loop = loop or asyncio.get_event_loop()
+       loop = getattr(loop, '_asyncio_wrapper', loop)
+       future = loop.create_future()
+       _Reader(future, input_file, loop)
+       return future
+
+
+class _Reader(object):
+       def __init__(self, future, input_file, loop):
+               self._future = future
+               self._pipe_reader = PipeReader(
+                       input_files={'input_file':input_file}, 
scheduler=loop._loop)
+
+               self._future.add_done_callback(self._cancel_callback)
+               self._pipe_reader.addExitListener(self._eof)
+               self._pipe_reader.start()
+
+       def _cancel_callback(self, future):
+               if future.cancelled():
+                       self._cancel()
+
+       def _eof(self, pipe_reader):
+               self._pipe_reader = None
+               self._future.set_result(pipe_reader.getvalue())
+
+       def _cancel(self):
+               if self._pipe_reader is not None and self._pipe_reader.poll() 
is None:
+                       self._pipe_reader.removeExitListener(self._eof)
+                       self._pipe_reader.cancel()
+                       self._pipe_reader = None
+
+
+class SubprocessExecTestCase(TestCase):
+       def _run_test(self, test):
+               initial_policy = asyncio.get_event_loop_policy()
+               if not isinstance(initial_policy, DefaultEventLoopPolicy):
+                       asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+               try:
+                       test(asyncio.get_event_loop())
+               finally:
+                       asyncio.set_event_loop_policy(initial_policy)
+
+       def testEcho(self):
+               if not hasattr(asyncio, 'create_subprocess_exec'):
+                       self.skipTest('create_subprocess_exec not implemented 
for python2')
+
+               args_tuple = (b'hello', b'world')
+               echo_binary = find_binary("echo")
+               self.assertNotEqual(echo_binary, None)
+               echo_binary = echo_binary.encode()
+
+               # Use os.pipe(), since this loop does not implement the
+               # ReadTransport necessary for subprocess.PIPE support.
+               stdout_pr, stdout_pw = os.pipe()
+               stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+               stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+               files = [stdout_pr, stdout_pw]
+
+               def test(loop):
+                       output = None
+                       try:
+                               with open(os.devnull, 'rb', 0) as devnull:
+                                       proc = loop.run_until_complete(
+                                               asyncio.create_subprocess_exec(
+                                               echo_binary, *args_tuple,
+                                               stdin=devnull, 
stdout=stdout_pw, stderr=stdout_pw))
+
+                               # This belongs exclusively to the subprocess 
now.
+                               stdout_pw.close()
+
+                               output = asyncio.ensure_future(
+                                       reader(stdout_pr, loop=loop), loop=loop)
+
+                               self.assertEqual(
+                                       loop.run_until_complete(proc.wait()), 
os.EX_OK)
+                               self.assertEqual(
+                                       
tuple(loop.run_until_complete(output).split()), args_tuple)
+                       finally:
+                               if output is not None and not output.done():
+                                       output.cancel()
+                               for f in files:
+                                       f.close()
+
+               self._run_test(test)
+
+       def testCat(self):
+               if not hasattr(asyncio, 'create_subprocess_exec'):
+                       self.skipTest('create_subprocess_exec not implemented 
for python2')
+
+               stdin_data = b'hello world'
+               cat_binary = find_binary("cat")
+               self.assertNotEqual(cat_binary, None)
+               cat_binary = cat_binary.encode()
+
+               # Use os.pipe(), since this loop does not implement the
+               # ReadTransport necessary for subprocess.PIPE support.
+               stdout_pr, stdout_pw = os.pipe()
+               stdout_pr = os.fdopen(stdout_pr, 'rb', 0)
+               stdout_pw = os.fdopen(stdout_pw, 'wb', 0)
+
+               stdin_pr, stdin_pw = os.pipe()
+               stdin_pr = os.fdopen(stdin_pr, 'rb', 0)
+               stdin_pw = os.fdopen(stdin_pw, 'wb', 0)
+
+               files = [stdout_pr, stdout_pw, stdin_pr, stdin_pw]
+
+               def test(loop):
+                       output = None
+                       try:
+                               proc = loop.run_until_complete(
+                                       asyncio.create_subprocess_exec(
+                                       cat_binary,
+                                       stdin=stdin_pr, stdout=stdout_pw, 
stderr=stdout_pw))
+
+                               # These belong exclusively to the subprocess 
now.
+                               stdout_pw.close()
+                               stdin_pr.close()
+
+                               output = asyncio.ensure_future(
+                                       reader(stdout_pr, loop=loop), loop=loop)
+
+                               with ForkExecutor(loop=loop) as executor:
+                                       writer = 
asyncio.ensure_future(loop.run_in_executor(
+                                               executor, stdin_pw.write, 
stdin_data), loop=loop)
+
+                                       # This belongs exclusively to the 
writer now.
+                                       stdin_pw.close()
+                                       loop.run_until_complete(writer)
+
+                               
self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK)
+                               
self.assertEqual(loop.run_until_complete(output), stdin_data)
+                       finally:
+                               if output is not None and not output.done():
+                                       output.cancel()
+                               for f in files:
+                                       f.close()
+
+               self._run_test(test)

diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py
index 5434cd942..1abc420e1 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -7,11 +7,15 @@ __all__ = (
 )
 
 try:
+       from asyncio.base_subprocess import BaseSubprocessTransport as 
_BaseSubprocessTransport
        from asyncio.unix_events import AbstractChildWatcher as 
_AbstractChildWatcher
 except ImportError:
        _AbstractChildWatcher = object
+       _BaseSubprocessTransport = object
 
+import functools
 import os
+import subprocess
 
 from portage.util._eventloop.global_event_loop import (
        global_event_loop as _global_event_loop,
@@ -77,6 +81,100 @@ class _PortageEventLoop(events.AbstractEventLoop):
                """
                return asyncio.Task(coro, loop=self)
 
+       def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
+               """
+               Run subprocesses asynchronously using the subprocess module.
+
+               @type protocol_factory: callable
+               @param protocol_factory: must instantiate a subclass of the
+                       asyncio.SubprocessProtocol class
+               @type program: str or bytes
+               @param program: the program to execute
+               @type args: str or bytes
+               @param args: program's arguments
+               @type kwargs: varies
+               @param kwargs: subprocess.Popen parameters
+               @rtype: asyncio.Future
+               @return: Returns a pair of (transport, protocol), where 
transport
+                       is an instance of BaseSubprocessTransport
+               """
+
+               # python2.7 does not allow arguments with defaults after *args
+               stdin = kwargs.pop('stdin', subprocess.PIPE)
+               stdout = kwargs.pop('stdout', subprocess.PIPE)
+               stderr = kwargs.pop('stderr', subprocess.PIPE)
+
+               if subprocess.PIPE in (stdin, stdout, stderr):
+                       # Requires connect_read/write_pipe implementation, for 
example
+                       # see asyncio.unix_events._UnixReadPipeTransport.
+                       raise NotImplementedError()
+
+               universal_newlines = kwargs.pop('universal_newlines', False)
+               shell = kwargs.pop('shell', False)
+               bufsize = kwargs.pop('bufsize', 0)
+
+               if universal_newlines:
+                       raise ValueError("universal_newlines must be False")
+               if shell:
+                       raise ValueError("shell must be False")
+               if bufsize != 0:
+                       raise ValueError("bufsize must be 0")
+               popen_args = (program,) + args
+               for arg in popen_args:
+                       if not isinstance(arg, (str, bytes)):
+                               raise TypeError("program arguments must be "
+                                                               "a bytes or 
text string, not %s"
+                                                               % 
type(arg).__name__)
+               result = self.create_future()
+               self._make_subprocess_transport(
+                       result, protocol_factory(), popen_args, False, stdin, 
stdout, stderr,
+                       bufsize, **kwargs)
+               return result
+
+       def _make_subprocess_transport(self, result, protocol, args, shell,
+               stdin, stdout, stderr, bufsize, extra=None, **kwargs):
+               waiter = self.create_future()
+               transp = _UnixSubprocessTransport(self,
+                       protocol, args, shell, stdin, stdout, stderr, bufsize,
+                       waiter=waiter, extra=extra,
+                       **kwargs)
+
+               self._loop._asyncio_child_watcher.add_child_handler(
+                       transp.get_pid(), self._child_watcher_callback, transp)
+
+               waiter.add_done_callback(functools.partial(
+                       self._subprocess_transport_callback, transp, protocol, 
result))
+
+       def _subprocess_transport_callback(self, transp, protocol, result, 
waiter):
+               if waiter.exception() is None:
+                       result.set_result((transp, protocol))
+               else:
+                       transp.close()
+                       wait_transp = asyncio.ensure_future(transp._wait(), 
loop=self)
+                       wait_transp.add_done_callback(
+                               
functools.partial(self._subprocess_transport_failure,
+                               result, waiter.exception()))
+
+       def _child_watcher_callback(self, pid, returncode, transp):
+               self.call_soon_threadsafe(transp._process_exited, returncode)
+
+       def _subprocess_transport_failure(self, result, exception, wait_transp):
+               result.set_exception(wait_transp.exception() or exception)
+
+
+class _UnixSubprocessTransport(_BaseSubprocessTransport):
+       """
+       This is identical to the standard library's private
+       asyncio.unix_events._UnixSubprocessTransport class, except that
+       subprocess.PIPE is not implemented for stdin, since that would
+       require connect_write_pipe support in the event loop. For example,
+       see the asyncio.unix_events._UnixWritePipeTransport class.
+       """
+       def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
+               self._proc = subprocess.Popen(
+                       args, shell=shell, stdin=stdin, stdout=stdout, 
stderr=stderr,
+                       universal_newlines=False, bufsize=bufsize, **kwargs)
+
 
 class AbstractChildWatcher(_AbstractChildWatcher):
        def add_child_handler(self, pid, callback, *args):

Reply via email to