On Fri, 19 Jun 2020 13:39:18 -0700
Zac Medico <zmed...@gentoo.org> wrote:

> Add support to write to a non-blocking pipe instead of a
> log file. This is needed for the purposes of bug 709746,
> where PipeLogger will write to a pipe that is drained
> by another PipeLogger instance which is running in the same
> process.
> 
> Bug: https://bugs.gentoo.org/709746
> Signed-off-by: Zac Medico <zmed...@gentoo.org>
> ---
>  lib/portage/tests/process/test_PipeLogger.py | 58 ++++++++++++++++
>  lib/portage/util/_async/PipeLogger.py        | 73
> +++++++++++++++----- 2 files changed, 115 insertions(+), 16
> deletions(-) create mode 100644
> lib/portage/tests/process/test_PipeLogger.py
> 
> diff --git a/lib/portage/tests/process/test_PipeLogger.py
> b/lib/portage/tests/process/test_PipeLogger.py new file mode 100644
> index 000000000..2bd94cf39
> --- /dev/null
> +++ b/lib/portage/tests/process/test_PipeLogger.py
> @@ -0,0 +1,58 @@
> +# Copyright 2020 Gentoo Authors
> +# Distributed under the terms of the GNU General Public License v2
> +
> +from portage import os
> +from portage.tests import TestCase
> +from portage.util._async.PipeLogger import PipeLogger
> +from portage.util.futures import asyncio
> +from portage.util.futures._asyncio.streams import _reader, _writer
> +from portage.util.futures.compat_coroutine import coroutine,
> coroutine_return +from portage.util.futures.unix_events import
> _set_nonblocking +
> +
> +class PipeLoggerTestCase(TestCase):
> +
> +     @coroutine
> +     def _testPipeLoggerToPipe(self, test_string, loop=None):
> +             """
> +             Test PipeLogger writing to a pipe connected to a
> PipeReader.
> +             This verifies that PipeLogger does not deadlock when
> writing
> +             to a pipe that's drained by a PipeReader running in
> the same
> +             process (requires non-blocking write).
> +             """
> +
> +             input_fd, writer_pipe = os.pipe()
> +             _set_nonblocking(writer_pipe)
> +             writer_pipe = os.fdopen(writer_pipe, 'wb', 0)
> +             writer = asyncio.ensure_future(_writer(writer_pipe,
> test_string.encode('ascii'), loop=loop), loop=loop)
> +             writer.add_done_callback(lambda writer:
> writer_pipe.close()) +
> +             pr, pw = os.pipe()
> +
> +             consumer = PipeLogger(background=True,
> +                     input_fd=input_fd,
> +                     log_file_path=os.fdopen(pw, 'wb', 0),
> +                     scheduler=loop)
> +             consumer.start()
> +
> +             # Before starting the reader, wait here for a
> moment, in order
> +             # to exercise PipeLogger's handling of EAGAIN during
> write.
> +             yield asyncio.wait([writer], timeout=0.01)
> +
> +             reader = _reader(pr, loop=loop)
> +             yield writer
> +             content = yield reader
> +             yield consumer.async_wait()
> +
> +             self.assertEqual(consumer.returncode, os.EX_OK)
> +
> +             coroutine_return(content.decode('ascii', 'replace'))
> +
> +     def testPipeLogger(self):
> +             loop = asyncio._wrap_loop()
> +
> +             for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12,
> 2**13, 2**14, 2**17, 2**17 + 1):
> +                     test_string = x * "a"
> +                     output =
> loop.run_until_complete(self._testPipeLoggerToPipe(test_string,
> loop=loop))
> +                     self.assertEqual(test_string, output,
> +                             "x = %s, len(output) = %s" % (x,
> len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py
> b/lib/portage/util/_async/PipeLogger.py index a4258f350..ce8afb846
> 100644 --- a/lib/portage/util/_async/PipeLogger.py
> +++ b/lib/portage/util/_async/PipeLogger.py
> @@ -8,6 +8,10 @@ import sys
>  
>  import portage
>  from portage import os, _encodings, _unicode_encode
> +from portage.util.futures import asyncio
> +from portage.util.futures._asyncio.streams import _writer
> +from portage.util.futures.compat_coroutine import coroutine
> +from portage.util.futures.unix_events import _set_nonblocking
>  from _emerge.AbstractPollTask import AbstractPollTask
>  
>  class PipeLogger(AbstractPollTask):
> @@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask):
>       """
>  
>       __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
> -             ("_log_file", "_log_file_real")
> +             ("_io_loop_task", "_log_file", "_log_file_nb",
> "_log_file_real") 
>       def _start(self):
>  
>               log_file_path = self.log_file_path
> -             if log_file_path is not None:
> -
> +             if hasattr(log_file_path, 'write'):
> +                     self._log_file_nb = True
> +                     self._log_file = log_file_path
> +                     _set_nonblocking(self._log_file.fileno())
> +             elif log_file_path is not None:
>                       self._log_file =
> open(_unicode_encode(log_file_path, encoding=_encodings['fs'],
> errors='strict'), mode='ab') if log_file_path.endswith('.gz'):
> @@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask):
>                               mode=0o660)
>  
>               if isinstance(self.input_fd, int):
> -                     fd = self.input_fd
> -             else:
> -                     fd = self.input_fd.fileno()
> +                     self.input_fd = os.fdopen(self.input_fd,
> 'rb', 0) +
> +             fd = self.input_fd.fileno()
>  
>               fcntl.fcntl(fd, fcntl.F_SETFL,
>                       fcntl.fcntl(fd, fcntl.F_GETFL) |
> os.O_NONBLOCK) @@ -57,7 +64,8 @@ class PipeLogger(AbstractPollTask):
>                               fcntl.fcntl(fd, fcntl.F_SETFD,
>                                       fcntl.fcntl(fd,
> fcntl.F_GETFD) | fcntl.FD_CLOEXEC) 
> -             self.scheduler.add_reader(fd, self._output_handler,
> fd)
> +             self._io_loop_task =
> asyncio.ensure_future(self._io_loop(self.input_fd),
> loop=self.scheduler)
> +
> self._io_loop_task.add_done_callback(self._io_loop_done)
> self._registered = True 
>       def _cancel(self):
> @@ -65,25 +73,36 @@ class PipeLogger(AbstractPollTask):
>               if self.returncode is None:
>                       self.returncode = self._cancelled_returncode
>  
> -     def _output_handler(self, fd):
> -
> +     @coroutine
> +     def _io_loop(self, input_file):
>               background = self.background
>               stdout_fd = self.stdout_fd
>               log_file = self._log_file 
> +             fd = input_file.fileno()
>  
>               while True:
>                       buf = self._read_buf(fd)
>  
>                       if buf is None:
>                               # not a POLLIN event, EAGAIN, etc...
> -                             break
> +                             future =
> self.scheduler.create_future()
> +                             self.scheduler.add_reader(fd,
> future.set_result, None)
> +                             try:
> +                                     yield future
> +                             finally:
> +                                     # The loop and input file
> may have been closed.
> +                                     if not
> self.scheduler.is_closed():
> +                                             future.done() or
> future.cancel()
> +                                             # Do not call
> remove_reader in cases where fd has
> +                                             # been closed and
> then re-allocated to a concurrent
> +                                             # coroutine as in
> bug 716636.
> +                                             if not
> input_file.closed:
> +
> self.scheduler.remove_reader(fd)
> +                             continue
>  
>                       if not buf:
>                               # EOF
> -                             self._unregister()
> -                             self.returncode = self.returncode or
> os.EX_OK
> -                             self._async_wait()
> -                             break
> +                             return
>  
>                       else:
>                               if not background and stdout_fd is
> not None: @@ -120,8 +139,25 @@ class PipeLogger(AbstractPollTask):
>                                                               fcntl.F_GETFL)
> ^ os.O_NONBLOCK) 
>                               if log_file is not None:
> -                                     log_file.write(buf)
> -                                     log_file.flush()
> +                                     if self._log_file_nb:
> +                                             # Use the _writer
> function which uses os.write, since the
> +                                             # log_file.write
> method loses data when an EAGAIN occurs.
> +                                             yield
> _writer(log_file, buf, loop=self.scheduler)
> +                                     else:
> +                                             # For gzip.GzipFile
> instances, the above _writer function
> +                                             # will not work
> because data written directly to the file
> +                                             # descriptor
> bypasses compression.
> +                                             log_file.write(buf)
> +                                             log_file.flush()
> +
> +     def _io_loop_done(self, future):
> +             try:
> +                     future.result()
> +             except asyncio.CancelledError:
> +                     self.cancel()
> +                     self._was_cancelled()
> +             self.returncode = self.returncode or os.EX_OK
> +             self._async_wait()
>  
>       def _unregister(self):
>               if self.input_fd is not None:
> @@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask):
>                               self.input_fd.close()
>                       self.input_fd = None
>  
> +             if self._io_loop_task is not None:
> +                     self._io_loop_task.done() or
> self._io_loop_task.cancel()
> +                     self._io_loop_task = None
> +
>               if self.stdout_fd is not None:
>                       os.close(self.stdout_fd)
>                       self.stdout_fd = None
>  
>               if self._log_file is not None:
> +
> self.scheduler.remove_writer(self._log_file.fileno())
> self._log_file.close() self._log_file = None
>  

Looks good

Reply via email to