In Python versions that support asyncio, this allows API consumers to pass subprocess.PIPE as the stdin parameter of asyncio.create_subprocess_exec().
Bug: https://bugs.gentoo.org/649588 --- .../util/futures/asyncio/test_subprocess_exec.py | 34 +++ pym/portage/util/futures/transports.py | 90 +++++++ pym/portage/util/futures/unix_events.py | 259 ++++++++++++++++++++- 3 files changed, 372 insertions(+), 11 deletions(-) create mode 100644 pym/portage/util/futures/transports.py diff --git a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py index 94984fc93..8c8c395ca 100644 --- a/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py +++ b/pym/portage/tests/util/futures/asyncio/test_subprocess_exec.py @@ -191,3 +191,37 @@ class SubprocessExecTestCase(TestCase): self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) self._run_test(test) + + def testWriteTransport(self): + """ + Test asyncio.create_subprocess_exec(stdin=subprocess.PIPE) which + requires an AbstractEventLoop.connect_write_pipe implementation + (and a WriteTransport implementation for it to return). + """ + if not hasattr(asyncio, 'create_subprocess_exec'): + self.skipTest('create_subprocess_exec not implemented for python2') + + stdin_data = b'hello world' + cat_binary = find_binary("cat") + self.assertNotEqual(cat_binary, None) + cat_binary = cat_binary.encode() + + def test(loop): + proc = loop.run_until_complete( + asyncio.create_subprocess_exec( + cat_binary, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT)) + + # This buffers data when necessary to avoid blocking. + proc.stdin.write(stdin_data) + # Any buffered data is written asynchronously after the + # close method is called. 
+ proc.stdin.close() + + self.assertEqual( + loop.run_until_complete(proc.stdout.read()), + stdin_data) + self.assertEqual(loop.run_until_complete(proc.wait()), os.EX_OK) + + self._run_test(test) diff --git a/pym/portage/util/futures/transports.py b/pym/portage/util/futures/transports.py new file mode 100644 index 000000000..60ea93073 --- /dev/null +++ b/pym/portage/util/futures/transports.py @@ -0,0 +1,90 @@ +# Copyright 2018 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +try: + from asyncio.transports import Transport as _Transport +except ImportError: + _Transport = object + + +class _FlowControlMixin(_Transport): + """ + This is identical to the standard library's private + asyncio.transports._FlowControlMixin class. + + All the logic for (write) flow control in a mix-in base class. + + The subclass must implement get_write_buffer_size(). It must call + _maybe_pause_protocol() whenever the write buffer size increases, + and _maybe_resume_protocol() whenever it decreases. It may also + override set_write_buffer_limits() (e.g. to specify different + defaults). + + The subclass constructor must call super().__init__(extra). This + will call set_write_buffer_limits(). + + The user may call set_write_buffer_limits() and + get_write_buffer_size(), and their protocol's pause_writing() and + resume_writing() may be called. 
+ """ + + def __init__(self, extra=None, loop=None): + super().__init__(extra) + assert loop is not None + self._loop = loop + self._protocol_paused = False + self._set_write_buffer_limits() + + def _maybe_pause_protocol(self): + size = self.get_write_buffer_size() + if size <= self._high_water: + return + if not self._protocol_paused: + self._protocol_paused = True + try: + self._protocol.pause_writing() + except Exception as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.pause_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + def _maybe_resume_protocol(self): + if (self._protocol_paused and + self.get_write_buffer_size() <= self._low_water): + self._protocol_paused = False + try: + self._protocol.resume_writing() + except Exception as exc: + self._loop.call_exception_handler({ + 'message': 'protocol.resume_writing() failed', + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + + def get_write_buffer_limits(self): + return (self._low_water, self._high_water) + + def _set_write_buffer_limits(self, high=None, low=None): + if high is None: + if low is None: + high = 64*1024 + else: + high = 4*low + if low is None: + low = high // 4 + if not high >= low >= 0: + raise ValueError('high (%r) must be >= low (%r) must be >= 0' % + (high, low)) + self._high_water = high + self._low_water = low + + def set_write_buffer_limits(self, high=None, low=None): + self._set_write_buffer_limits(high=high, low=low) + self._maybe_pause_protocol() + + def get_write_buffer_size(self): + raise NotImplementedError diff --git a/pym/portage/util/futures/unix_events.py b/pym/portage/util/futures/unix_events.py index 9d84ab6aa..a1d7cac80 100644 --- a/pym/portage/util/futures/unix_events.py +++ b/pym/portage/util/futures/unix_events.py @@ -9,19 +9,25 @@ __all__ = ( try: from asyncio.base_subprocess import BaseSubprocessTransport as _BaseSubprocessTransport from asyncio.unix_events import 
AbstractChildWatcher as _AbstractChildWatcher - from asyncio.transports import ReadTransport as _ReadTransport + from asyncio.transports import ( + ReadTransport as _ReadTransport, + WriteTransport as _WriteTransport, + ) except ImportError: _AbstractChildWatcher = object _BaseSubprocessTransport = object _ReadTransport = object + _WriteTransport = object import errno import fcntl import functools import logging import os +import socket import stat import subprocess +import sys from portage.util._eventloop.global_event_loop import ( global_event_loop as _global_event_loop, @@ -30,7 +36,7 @@ from portage.util.futures import ( asyncio, events, ) -from portage.util.futures.futures import Future +from portage.util.futures.transports import _FlowControlMixin class _PortageEventLoop(events.AbstractEventLoop): @@ -117,6 +123,35 @@ class _PortageEventLoop(events.AbstractEventLoop): waiter.add_done_callback(waiter_callback) return result + def connect_write_pipe(self, protocol_factory, pipe): + """ + Register write pipe in event loop. Set the pipe to non-blocking mode. + + @type protocol_factory: callable + @param protocol_factory: must instantiate object with Protocol interface + @type pipe: file + @param pipe: a pipe to write to + @rtype: asyncio.Future + @return: Return pair (transport, protocol), where transport supports the + WriteTransport interface. + """ + protocol = protocol_factory() + result = self.create_future() + waiter = self.create_future() + transport = self._make_write_pipe_transport(pipe, protocol, waiter) + + def waiter_callback(waiter): + try: + waiter.result() + except Exception as e: + transport.close() + result.set_exception(e) + else: + result.set_result((transport, protocol)) + + waiter.add_done_callback(waiter_callback) + return result + def subprocess_exec(self, protocol_factory, program, *args, **kwargs): """ Run subprocesses asynchronously using the subprocess module. 
@@ -140,11 +175,6 @@ class _PortageEventLoop(events.AbstractEventLoop): stdout = kwargs.pop('stdout', subprocess.PIPE) stderr = kwargs.pop('stderr', subprocess.PIPE) - if stdin == subprocess.PIPE: - # Requires connect_write_pipe implementation, for example - # see asyncio.unix_events._UnixWritePipeTransport. - raise NotImplementedError() - universal_newlines = kwargs.pop('universal_newlines', False) shell = kwargs.pop('shell', False) bufsize = kwargs.pop('bufsize', 0) @@ -171,6 +201,10 @@ class _PortageEventLoop(events.AbstractEventLoop): extra=None): return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra) + def _make_write_pipe_transport(self, pipe, protocol, waiter=None, + extra=None): + return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra) + def _make_subprocess_transport(self, result, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): waiter = self.create_future() @@ -314,18 +348,221 @@ class _UnixReadPipeTransport(_ReadTransport): self._loop = None +class _UnixWritePipeTransport(_FlowControlMixin, _WriteTransport): + """ + This is identical to the standard library's private + asyncio.unix_events._UnixWritePipeTransport class, except that it + only calls public AbstractEventLoop methods. + """ + + def __init__(self, loop, pipe, protocol, waiter=None, extra=None): + super().__init__(extra, loop) + self._extra['pipe'] = pipe + self._pipe = pipe + self._fileno = pipe.fileno() + self._protocol = protocol + self._buffer = bytearray() + self._conn_lost = 0 + self._closing = False # Set when close() or write_eof() called. 
+ + mode = os.fstat(self._fileno).st_mode + is_char = stat.S_ISCHR(mode) + is_fifo = stat.S_ISFIFO(mode) + is_socket = stat.S_ISSOCK(mode) + if not (is_char or is_fifo or is_socket): + self._pipe = None + self._fileno = None + self._protocol = None + raise ValueError("Pipe transport is only for " + "pipes, sockets and character devices") + + _set_nonblocking(self._fileno) + self._loop.call_soon(self._protocol.connection_made, self) + + # On AIX, the reader trick (to be notified when the read end of the + # socket is closed) only works for sockets. On other platforms it + # works for pipes and sockets. (Exception: OS X 10.4? Issue #19294.) + if is_socket or (is_fifo and not sys.platform.startswith("aix")): + # only start reading when connection_made() has been called + self._loop.call_soon(self._loop.add_reader, + self._fileno, self._read_ready) + + if waiter is not None: + # only wake up the waiter when connection_made() has been called + self._loop.call_soon( + lambda: None if waiter.cancelled() else waiter.set_result(None)) + + def get_write_buffer_size(self): + return len(self._buffer) + + def _read_ready(self): + # Pipe was closed by peer. + if self._loop.get_debug(): + logging.info("%r was closed by peer", self) + if self._buffer: + self._close(BrokenPipeError()) + else: + self._close() + + def write(self, data): + assert isinstance(data, (bytes, bytearray, memoryview)), repr(data) + if isinstance(data, bytearray): + data = memoryview(data) + if not data: + return + + if self._conn_lost or self._closing: + self._conn_lost += 1 + return + + if not self._buffer: + # Attempt to send it right away first. 
+ try: + n = os.write(self._fileno, data) + except (BlockingIOError, InterruptedError): + n = 0 + except Exception as exc: + self._conn_lost += 1 + self._fatal_error(exc, 'Fatal write error on pipe transport') + return + if n == len(data): + return + elif n > 0: + data = memoryview(data)[n:] + self._loop.add_writer(self._fileno, self._write_ready) + + self._buffer += data + self._maybe_pause_protocol() + + def _write_ready(self): + assert self._buffer, 'Data should not be empty' + + try: + n = os.write(self._fileno, self._buffer) + except (BlockingIOError, InterruptedError): + pass + except Exception as exc: + self._buffer.clear() + self._conn_lost += 1 + # Remove writer here, _fatal_error() doesn't it + # because _buffer is empty. + self._loop.remove_writer(self._fileno) + self._fatal_error(exc, 'Fatal write error on pipe transport') + else: + if n == len(self._buffer): + self._buffer.clear() + self._loop.remove_writer(self._fileno) + self._maybe_resume_protocol() # May append to buffer. 
+ if self._closing: + self._loop.remove_reader(self._fileno) + self._call_connection_lost(None) + return + elif n > 0: + del self._buffer[:n] + + def can_write_eof(self): + return True + + def write_eof(self): + if self._closing: + return + assert self._pipe + self._closing = True + if not self._buffer: + self._loop.remove_reader(self._fileno) + self._loop.call_soon(self._call_connection_lost, None) + + def set_protocol(self, protocol): + self._protocol = protocol + + def get_protocol(self): + return self._protocol + + def is_closing(self): + return self._closing + + def close(self): + if self._pipe is not None and not self._closing: + # write_eof is all what we needed to close the write pipe + self.write_eof() + + def abort(self): + self._close(None) + + def _fatal_error(self, exc, message='Fatal error on pipe transport'): + # should be called by exception handler only + if isinstance(exc, + (BrokenPipeError, ConnectionResetError, ConnectionAbortedError)): + if self._loop.get_debug(): + logging.debug("%r: %s", self, message, exc_info=True) + else: + self._loop.call_exception_handler({ + 'message': message, + 'exception': exc, + 'transport': self, + 'protocol': self._protocol, + }) + self._close(exc) + + def _close(self, exc=None): + self._closing = True + if self._buffer: + self._loop.remove_writer(self._fileno) + self._buffer.clear() + self._loop.remove_reader(self._fileno) + self._loop.call_soon(self._call_connection_lost, exc) + + def _call_connection_lost(self, exc): + try: + self._protocol.connection_lost(exc) + finally: + self._pipe.close() + self._pipe = None + self._protocol = None + self._loop = None + + +if hasattr(os, 'set_inheritable'): + # Python 3.4 and newer + _set_inheritable = os.set_inheritable +else: + def _set_inheritable(fd, inheritable): + cloexec_flag = getattr(fcntl, 'FD_CLOEXEC', 1) + + old = fcntl.fcntl(fd, fcntl.F_GETFD) + if not inheritable: + fcntl.fcntl(fd, fcntl.F_SETFD, old | cloexec_flag) + else: + fcntl.fcntl(fd, fcntl.F_SETFD, 
old & ~cloexec_flag) + + class _UnixSubprocessTransport(_BaseSubprocessTransport): """ This is identical to the standard library's private - asyncio.unix_events._UnixSubprocessTransport class, except that - subprocess.PIPE is not implemented for stdin, since that would - require connect_write_pipe support in the event loop. For example, - see the asyncio.unix_events._UnixWritePipeTransport class. + asyncio.unix_events._UnixSubprocessTransport class, except that it + only calls public AbstractEventLoop methods. """ def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs): + stdin_w = None + if stdin == subprocess.PIPE: + # Use a socket pair for stdin, since not all platforms + # support selecting read events on the write end of a + # socket (which we use in order to detect closing of the + # other end). Notably this is needed on AIX, and works + # just fine on other platforms. + stdin, stdin_w = socket.socketpair() + + # Mark the write end of the stdin pipe as non-inheritable, + # needed by close_fds=False on Python 3.3 and older + # (Python 3.4 implements the PEP 446, socketpair returns + # non-inheritable sockets) + _set_inheritable(stdin_w.fileno(), False) self._proc = subprocess.Popen( args, shell=shell, stdin=stdin, stdout=stdout, stderr=stderr, universal_newlines=False, bufsize=bufsize, **kwargs) + if stdin_w is not None: + stdin.close() + self._proc.stdin = os.fdopen(stdin_w.detach(), 'wb', bufsize) class AbstractChildWatcher(_AbstractChildWatcher): -- 2.13.6