Add support to write to a non-blocking pipe instead of a log file. This is needed for the purposes of bug 709746, where PipeLogger will write to a pipe that is drained by another PipeLogger instance which is running in the same process.
Bug: https://bugs.gentoo.org/709746 Signed-off-by: Zac Medico <zmed...@gentoo.org> --- lib/portage/tests/process/test_PipeLogger.py | 58 ++++++++++++++++ lib/portage/util/_async/PipeLogger.py | 73 +++++++++++++++----- 2 files changed, 115 insertions(+), 16 deletions(-) create mode 100644 lib/portage/tests/process/test_PipeLogger.py diff --git a/lib/portage/tests/process/test_PipeLogger.py b/lib/portage/tests/process/test_PipeLogger.py new file mode 100644 index 000000000..2bd94cf39 --- /dev/null +++ b/lib/portage/tests/process/test_PipeLogger.py @@ -0,0 +1,58 @@ +# Copyright 2020 Gentoo Authors +# Distributed under the terms of the GNU General Public License v2 + +from portage import os +from portage.tests import TestCase +from portage.util._async.PipeLogger import PipeLogger +from portage.util.futures import asyncio +from portage.util.futures._asyncio.streams import _reader, _writer +from portage.util.futures.compat_coroutine import coroutine, coroutine_return +from portage.util.futures.unix_events import _set_nonblocking + + +class PipeLoggerTestCase(TestCase): + + @coroutine + def _testPipeLoggerToPipe(self, test_string, loop=None): + """ + Test PipeLogger writing to a pipe connected to a PipeReader. + This verifies that PipeLogger does not deadlock when writing + to a pipe that's drained by a PipeReader running in the same + process (requires non-blocking write). + """ + + input_fd, writer_pipe = os.pipe() + _set_nonblocking(writer_pipe) + writer_pipe = os.fdopen(writer_pipe, 'wb', 0) + writer = asyncio.ensure_future(_writer(writer_pipe, test_string.encode('ascii'), loop=loop), loop=loop) + writer.add_done_callback(lambda writer: writer_pipe.close()) + + pr, pw = os.pipe() + + consumer = PipeLogger(background=True, + input_fd=input_fd, + log_file_path=os.fdopen(pw, 'wb', 0), + scheduler=loop) + consumer.start() + + # Before starting the reader, wait here for a moment, in order + # to exercise PipeLogger's handling of EAGAIN during write. 
+ yield asyncio.wait([writer], timeout=0.01) + + reader = _reader(pr, loop=loop) + yield writer + content = yield reader + yield consumer.async_wait() + + self.assertEqual(consumer.returncode, os.EX_OK) + + coroutine_return(content.decode('ascii', 'replace')) + + def testPipeLogger(self): + loop = asyncio._wrap_loop() + + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**17, 2**17 + 1): + test_string = x * "a" + output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop)) + self.assertEqual(test_string, output, + "x = %s, len(output) = %s" % (x, len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py index a4258f350..ce8afb846 100644 --- a/lib/portage/util/_async/PipeLogger.py +++ b/lib/portage/util/_async/PipeLogger.py @@ -8,6 +8,10 @@ import sys import portage from portage import os, _encodings, _unicode_encode +from portage.util.futures import asyncio +from portage.util.futures._asyncio.streams import _writer +from portage.util.futures.compat_coroutine import coroutine +from portage.util.futures.unix_events import _set_nonblocking from _emerge.AbstractPollTask import AbstractPollTask class PipeLogger(AbstractPollTask): @@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask): """ __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ - ("_log_file", "_log_file_real") + ("_io_loop_task", "_log_file", "_log_file_nb", "_log_file_real") def _start(self): log_file_path = self.log_file_path - if log_file_path is not None: - + if hasattr(log_file_path, 'write'): + self._log_file_nb = True + self._log_file = log_file_path + _set_nonblocking(self._log_file.fileno()) + elif log_file_path is not None: self._log_file = open(_unicode_encode(log_file_path, encoding=_encodings['fs'], errors='strict'), mode='ab') if log_file_path.endswith('.gz'): @@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask): mode=0o660) if isinstance(self.input_fd, int): - fd = self.input_fd - else: - fd = 
self.input_fd.fileno() + self.input_fd = os.fdopen(self.input_fd, 'rb', 0) + + fd = self.input_fd.fileno() fcntl.fcntl(fd, fcntl.F_SETFL, fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK) @@ -57,7 +64,8 @@ class PipeLogger(AbstractPollTask): fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC) - self.scheduler.add_reader(fd, self._output_handler, fd) + self._io_loop_task = asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler) + self._io_loop_task.add_done_callback(self._io_loop_done) self._registered = True def _cancel(self): @@ -65,25 +73,36 @@ if self.returncode is None: self.returncode = self._cancelled_returncode - def _output_handler(self, fd): - + @coroutine + def _io_loop(self, input_file): background = self.background stdout_fd = self.stdout_fd log_file = self._log_file + fd = input_file.fileno() while True: buf = self._read_buf(fd) if buf is None: # not a POLLIN event, EAGAIN, etc... - break + future = self.scheduler.create_future() + self.scheduler.add_reader(fd, future.set_result, None) + try: + yield future + finally: + # The loop and input file may have been closed. + if not self.scheduler.is_closed(): + future.done() or future.cancel() + # Do not call remove_reader in cases where fd has + # been closed and then re-allocated to a concurrent + # coroutine as in bug 716636. + if not input_file.closed: + self.scheduler.remove_reader(fd) + continue if not buf: # EOF - self._unregister() - self.returncode = self.returncode or os.EX_OK - self._async_wait() - break + return else: if not background and stdout_fd is not None: @@ -120,8 +139,25 @@ fcntl.F_GETFL) ^ os.O_NONBLOCK) if log_file is not None: - log_file.write(buf) - log_file.flush() + if self._log_file_nb: + # Use the _writer function which uses os.write, since the + # log_file.write method loses data when an EAGAIN occurs. 
+ yield _writer(log_file, buf, loop=self.scheduler) + else: + # For gzip.GzipFile instances, the above _writer function + # will not work because data written directly to the file + # descriptor bypasses compression. + log_file.write(buf) + log_file.flush() + + def _io_loop_done(self, future): + try: + future.result() + except asyncio.CancelledError: + self.cancel() + self._was_cancelled() + self.returncode = self.returncode or os.EX_OK + self._async_wait() def _unregister(self): if self.input_fd is not None: @@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask): self.input_fd.close() self.input_fd = None + if self._io_loop_task is not None: + self._io_loop_task.done() or self._io_loop_task.cancel() + self._io_loop_task = None + if self.stdout_fd is not None: os.close(self.stdout_fd) self.stdout_fd = None if self._log_file is not None: + self.scheduler.remove_writer(self._log_file.fileno()) self._log_file.close() self._log_file = None -- 2.25.3