On Fri, 19 Jun 2020 13:39:18 -0700 Zac Medico <zmed...@gentoo.org> wrote:
> Add support to write to a non-blocking pipe instead of a > log file. This is needed for the purposes of bug 709746, > where PipeLogger will write to a pipe that is drained > by anoher PipeLogger instance which is running in the same > process. > > Bug: https://bugs.gentoo.org/709746 > Signed-off-by: Zac Medico <zmed...@gentoo.org> > --- > lib/portage/tests/process/test_PipeLogger.py | 58 ++++++++++++++++ > lib/portage/util/_async/PipeLogger.py | 73 > +++++++++++++++----- 2 files changed, 115 insertions(+), 16 > deletions(-) create mode 100644 > lib/portage/tests/process/test_PipeLogger.py > > diff --git a/lib/portage/tests/process/test_PipeLogger.py > b/lib/portage/tests/process/test_PipeLogger.py new file mode 100644 > index 000000000..2bd94cf39 > --- /dev/null > +++ b/lib/portage/tests/process/test_PipeLogger.py > @@ -0,0 +1,58 @@ > +# Copyright 2020 Gentoo Authors > +# Distributed under the terms of the GNU General Public License v2 > + > +from portage import os > +from portage.tests import TestCase > +from portage.util._async.PipeLogger import PipeLogger > +from portage.util.futures import asyncio > +from portage.util.futures._asyncio.streams import _reader, _writer > +from portage.util.futures.compat_coroutine import coroutine, > coroutine_return +from portage.util.futures.unix_events import > _set_nonblocking + > + > +class PipeLoggerTestCase(TestCase): > + > + @coroutine > + def _testPipeLoggerToPipe(self, test_string, loop=None): > + """ > + Test PipeLogger writing to a pipe connected to a > PipeReader. > + This verifies that PipeLogger does not deadlock when > writing > + to a pipe that's drained by a PipeReader running in > the same > + process (requires non-blocking write). > + """ > + > + input_fd, writer_pipe = os.pipe() > + _set_nonblocking(writer_pipe) > + writer_pipe = os.fdopen(writer_pipe, 'wb', 0) > + writer = asyncio.ensure_future(_writer(writer_pipe, > test_string.encode('ascii'), loop=loop), loop=loop) > + writer.add_done_callback(lambda writer: > writer_pipe.close()) + > + pr, pw = os.pipe() > + > + consumer = PipeLogger(background=True, > + input_fd=input_fd, > + log_file_path=os.fdopen(pw, 'wb', 0), > + scheduler=loop) > + consumer.start() > + > + # Before starting the reader, wait here for a > moment, in order > + # to exercise PipeLogger's handling of EAGAIN during > write. > + yield asyncio.wait([writer], timeout=0.01) > + > + reader = _reader(pr, loop=loop) > + yield writer > + content = yield reader > + yield consumer.async_wait() > + > + self.assertEqual(consumer.returncode, os.EX_OK) > + > + coroutine_return(content.decode('ascii', 'replace')) > + > + def testPipeLogger(self): > + loop = asyncio._wrap_loop() > + > + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, > 2**13, 2**14, 2**17, 2**17 + 1): > + test_string = x * "a" > + output = > loop.run_until_complete(self._testPipeLoggerToPipe(test_string, > loop=loop)) > + self.assertEqual(test_string, output, > + "x = %s, len(output) = %s" % (x, > len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py > b/lib/portage/util/_async/PipeLogger.py index a4258f350..ce8afb846 > 100644 --- a/lib/portage/util/_async/PipeLogger.py > +++ b/lib/portage/util/_async/PipeLogger.py > @@ -8,6 +8,10 @@ import sys > > import portage > from portage import os, _encodings, _unicode_encode > +from portage.util.futures import asyncio > +from portage.util.futures._asyncio.streams import _writer > +from portage.util.futures.compat_coroutine import coroutine > +from portage.util.futures.unix_events import _set_nonblocking > from _emerge.AbstractPollTask import AbstractPollTask > > class PipeLogger(AbstractPollTask): > @@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask): > """ > > __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ > - ("_log_file", "_log_file_real") > + ("_io_loop_task", "_log_file", "_log_file_nb", > "_log_file_real") > def _start(self): > > log_file_path = self.log_file_path > - if log_file_path is not None: > - > + if hasattr(log_file_path, 'write'): > + self._log_file_nb = True > + self._log_file = log_file_path > + _set_nonblocking(self._log_file.fileno()) > + elif log_file_path is not None: > self._log_file = > open(_unicode_encode(log_file_path, encoding=_encodings['fs'], > errors='strict'), mode='ab') if log_file_path.endswith('.gz'): > @@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask): > mode=0o660) > > if isinstance(self.input_fd, int): > - fd = self.input_fd > - else: > - fd = self.input_fd.fileno() > + self.input_fd = os.fdopen(self.input_fd, > 'rb', 0) + > + fd = self.input_fd.fileno() > > fcntl.fcntl(fd, fcntl.F_SETFL, > fcntl.fcntl(fd, fcntl.F_GETFL) | > os.O_NONBLOCK) @@ -57,7 +64,8 @@ class PipeLogger(AbstractPollTask): > fcntl.fcntl(fd, fcntl.F_SETFD, > fcntl.fcntl(fd, > fcntl.F_GETFD) | fcntl.FD_CLOEXEC) > - self.scheduler.add_reader(fd, self._output_handler, > fd) > + self._io_loop_task = > asyncio.ensure_future(self._io_loop(self.input_fd), > loop=self.scheduler) > + > self._io_loop_task.add_done_callback(self._io_loop_done) > self._registered = True > def _cancel(self): > @@ -65,25 +73,36 @@ class PipeLogger(AbstractPollTask): > if self.returncode is None: > self.returncode = self._cancelled_returncode > > - def _output_handler(self, fd): > - > + @coroutine > + def _io_loop(self, input_file): > background = self.background > stdout_fd = self.stdout_fd > log_file = self._log_file > + fd = input_file.fileno() > > while True: > buf = self._read_buf(fd) > > if buf is None: > # not a POLLIN event, EAGAIN, etc... > - break > + future = > self.scheduler.create_future() > + self.scheduler.add_reader(fd, > future.set_result, None) > + try: > + yield future > + finally: > + # The loop and input file > may have been closed. > + if not > self.scheduler.is_closed(): > + future.done() or > future.cancel() > + # Do not call > remove_reader in cases where fd has > + # been closed and > then re-allocated to a concurrent > + # coroutine as in > bug 716636. > + if not > input_file.closed: > + > self.scheduler.remove_reader(fd) > + continue > > if not buf: > # EOF > - self._unregister() > - self.returncode = self.returncode or > os.EX_OK > - self._async_wait() > - break > + return > > else: > if not background and stdout_fd is > not None: @@ -120,8 +139,25 @@ class PipeLogger(AbstractPollTask): > fcntl.F_GETFL) > ^ os.O_NONBLOCK) > if log_file is not None: > - log_file.write(buf) > - log_file.flush() > + if self._log_file_nb: > + # Use the _writer > function which uses os.write, since the > + # log_file.write > method looses data when an EAGAIN occurs. > + yield > _writer(log_file, buf, loop=self.scheduler) > + else: > + # For gzip.GzipFile > instances, the above _writer function > + # will not work > because data written directly to the file > + # descriptor > bypasses compression. > + log_file.write(buf) > + log_file.flush() > + > + def _io_loop_done(self, future): > + try: > + future.result() > + except asyncio.CancelledError: > + self.cancel() > + self._was_cancelled() > + self.returncode = self.returncode or os.EX_OK > + self._async_wait() > > def _unregister(self): > if self.input_fd is not None: > @@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask): > self.input_fd.close() > self.input_fd = None > > + if self._io_loop_task is not None: > + self._io_loop_task.done() or > self._io_loop_task.cancel() > + self._io_loop_task = None > + > if self.stdout_fd is not None: > os.close(self.stdout_fd) > self.stdout_fd = None > > if self._log_file is not None: > + > self.scheduler.remove_writer(self._log_file.fileno()) > self._log_file.close() self._log_file = None > Looks good