In python versions that support asyncio, this allows API consumers
to use subprocess.PIPE for asyncio.create_subprocess_exec() stdout
and stderr parameters.

Bug: https://bugs.gentoo.org/649588
---
 .../util/futures/asyncio/test_subprocess_exec.py   |  30 ++++
 pym/portage/util/futures/unix_events.py            | 157 ++++++++++++++++++++-
 2 files changed, 184 insertions(+), 3 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py 
b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
index d30f48c43..94984fc93 100644
--- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
+++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py
@@ -2,6 +2,7 @@
 # Distributed under the terms of the GNU General Public License v2
 
 import os
+import subprocess
 
 from portage.process import find_binary
 from portage.tests import TestCase
@@ -161,3 +162,32 @@ class SubprocessExecTestCase(TestCase):
                                        f.close()
 
                self._run_test(test)
+
+       def testReadTransport(self):
+               """
+               Test asyncio.create_subprocess_exec(stdout=subprocess.PIPE) 
which
+               requires an AbstractEventLoop.connect_read_pipe implementation
+               (and a ReadTransport implementation for it to return).
+               """
+               if not hasattr(asyncio, 'create_subprocess_exec'):
+                       self.skipTest('create_subprocess_exec not implemented 
for python2')
+
+               args_tuple = (b'hello', b'world')
+               echo_binary = find_binary("echo")
+               self.assertNotEqual(echo_binary, None)
+               echo_binary = echo_binary.encode()
+
+               def test(loop):
+                       with open(os.devnull, 'rb', 0) as devnull:
+                               proc = loop.run_until_complete(
+                                       asyncio.create_subprocess_exec(
+                                       echo_binary, *args_tuple,
+                                       stdin=devnull,
+                                       stdout=subprocess.PIPE, 
stderr=subprocess.STDOUT))
+
+                       self.assertEqual(
+                               
tuple(loop.run_until_complete(proc.stdout.read()).split()),
+                               args_tuple)
+                       self.assertEqual(loop.run_until_complete(proc.wait()), 
os.EX_OK)
+
+               self._run_test(test)
diff --git a/pym/portage/util/futures/unix_events.py 
b/pym/portage/util/futures/unix_events.py
index 1abc420e1..6ba0adff6 100644
--- a/pym/portage/util/futures/unix_events.py
+++ b/pym/portage/util/futures/unix_events.py
@@ -9,12 +9,18 @@ __all__ = (
 try:
        from asyncio.base_subprocess import BaseSubprocessTransport as 
_BaseSubprocessTransport
        from asyncio.unix_events import AbstractChildWatcher as 
_AbstractChildWatcher
+       from asyncio.transports import ReadTransport as _ReadTransport
 except ImportError:
        _AbstractChildWatcher = object
        _BaseSubprocessTransport = object
+       _ReadTransport = object
 
+import errno
+import fcntl
 import functools
+import logging
 import os
+import stat
 import subprocess
 
 from portage.util._eventloop.global_event_loop import (
@@ -81,6 +87,35 @@ class _PortageEventLoop(events.AbstractEventLoop):
                """
                return asyncio.Task(coro, loop=self)
 
+       def connect_read_pipe(self, protocol_factory, pipe):
+               """
+               Register read pipe in event loop. Set the pipe to non-blocking 
mode.
+
+               @type protocol_factory: callable
+               @param protocol_factory: must instantiate object with Protocol 
interface
+               @type pipe: file
+               @param pipe: a pipe to read from
+               @rtype: asyncio.Future
+               @return: Return pair (transport, protocol), where transport 
supports the
+                       ReadTransport interface.
+               """
+               protocol = protocol_factory()
+               result = self.create_future()
+               waiter = self.create_future()
+               transport = self._make_read_pipe_transport(pipe, protocol, 
waiter=waiter)
+
+               def waiter_callback(waiter):
+                       try:
+                               waiter.result()
+                       except Exception as e:
+                               transport.close()
+                               result.set_exception(e)
+                       else:
+                               result.set_result((transport, protocol))
+
+               waiter.add_done_callback(waiter_callback)
+               return result
+
        def subprocess_exec(self, protocol_factory, program, *args, **kwargs):
                """
                Run subprocesses asynchronously using the subprocess module.
@@ -104,9 +139,9 @@ class _PortageEventLoop(events.AbstractEventLoop):
                stdout = kwargs.pop('stdout', subprocess.PIPE)
                stderr = kwargs.pop('stderr', subprocess.PIPE)
 
-               if subprocess.PIPE in (stdin, stdout, stderr):
-                       # Requires connect_read/write_pipe implementation, for 
example
-                       # see asyncio.unix_events._UnixReadPipeTransport.
+               if stdin == subprocess.PIPE:
+                       # Requires connect_write_pipe implementation, for 
example
+                       # see asyncio.unix_events._UnixWritePipeTransport.
                        raise NotImplementedError()
 
                universal_newlines = kwargs.pop('universal_newlines', False)
@@ -131,6 +166,10 @@ class _PortageEventLoop(events.AbstractEventLoop):
                        bufsize, **kwargs)
                return result
 
+       def _make_read_pipe_transport(self, pipe, protocol, waiter=None,
+                                                                 extra=None):
+               return _UnixReadPipeTransport(self, pipe, protocol, waiter, 
extra)
+
        def _make_subprocess_transport(self, result, protocol, args, shell,
                stdin, stdout, stderr, bufsize, extra=None, **kwargs):
                waiter = self.create_future()
@@ -162,6 +201,118 @@ class _PortageEventLoop(events.AbstractEventLoop):
                result.set_exception(wait_transp.exception() or exception)
 
 
+if hasattr(os, 'set_blocking'):
+       def _set_nonblocking(fd):
+               os.set_blocking(fd, False)
+else:
+       def _set_nonblocking(fd):
+               flags = fcntl.fcntl(fd, fcntl.F_GETFL)
+               flags = flags | os.O_NONBLOCK
+               fcntl.fcntl(fd, fcntl.F_SETFL, flags)
+
+
+class _UnixReadPipeTransport(_ReadTransport):
+       """
+       This is identical to the standard library's private
+       asyncio.unix_events._UnixReadPipeTransport class, except that it
+       only calls public AbstractEventLoop methods.
+       """
+
+       max_size = 256 * 1024  # max bytes we read in one event loop iteration
+
+       def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
+               super().__init__(extra)
+               self._extra['pipe'] = pipe
+               self._loop = loop
+               self._pipe = pipe
+               self._fileno = pipe.fileno()
+               self._protocol = protocol
+               self._closing = False
+
+               mode = os.fstat(self._fileno).st_mode
+               if not (stat.S_ISFIFO(mode) or
+                               stat.S_ISSOCK(mode) or
+                               stat.S_ISCHR(mode)):
+                       self._pipe = None
+                       self._fileno = None
+                       self._protocol = None
+                       raise ValueError("Pipe transport is for pipes/sockets 
only.")
+
+               _set_nonblocking(self._fileno)
+
+               self._loop.call_soon(self._protocol.connection_made, self)
+               # only start reading when connection_made() has been called
+               self._loop.call_soon(self._loop.add_reader,
+                                                        self._fileno, 
self._read_ready)
+               if waiter is not None:
+                       # only wake up the waiter when connection_made() has 
been called
+                       self._loop.call_soon(
+                               lambda: None if waiter.cancelled() else 
waiter.set_result(None))
+
+       def _read_ready(self):
+               try:
+                       data = os.read(self._fileno, self.max_size)
+               except (BlockingIOError, InterruptedError):
+                       pass
+               except OSError as exc:
+                       self._fatal_error(exc, 'Fatal read error on pipe 
transport')
+               else:
+                       if data:
+                               self._protocol.data_received(data)
+                       else:
+                               self._closing = True
+                               self._loop.remove_reader(self._fileno)
+                               
self._loop.call_soon(self._protocol.eof_received)
+                               
self._loop.call_soon(self._call_connection_lost, None)
+
+       def pause_reading(self):
+               self._loop.remove_reader(self._fileno)
+
+       def resume_reading(self):
+               self._loop.add_reader(self._fileno, self._read_ready)
+
+       def set_protocol(self, protocol):
+               self._protocol = protocol
+
+       def get_protocol(self):
+               return self._protocol
+
+       def is_closing(self):
+               return self._closing
+
+       def close(self):
+               if not self._closing:
+                       self._close(None)
+
+       def _fatal_error(self, exc, message='Fatal error on pipe transport'):
+               # should be called by exception handler only
+               if (isinstance(exc, OSError) and exc.errno == errno.EIO):
+                       if self._loop.get_debug():
+                               logging.debug("%r: %s", self, message, 
exc_info=True)
+               else:
+                       self._loop.call_exception_handler({
+                               'message': message,
+                               'exception': exc,
+                               'transport': self,
+                               'protocol': self._protocol,
+                       })
+               self._close(exc)
+
+       def _close(self, exc):
+               self._closing = True
+               self._loop.remove_reader(self._fileno)
+               self._loop.call_soon(self._call_connection_lost, exc)
+
+       def _call_connection_lost(self, exc):
+               try:
+                       self._protocol.connection_lost(exc)
+               finally:
+                       self._pipe.close()
+                       self._pipe = None
+                       self._protocol = None
+                       self._loop = None
+
+
 class _UnixSubprocessTransport(_BaseSubprocessTransport):
        """
        This is identical to the standard library's private
-- 
2.13.6


Reply via email to