Add support for writing to a non-blocking pipe instead of a
log file. This is needed for bug 709746, where PipeLogger
will write to a pipe that is drained by another PipeLogger
instance running in the same
process.

Bug: https://bugs.gentoo.org/709746
Signed-off-by: Zac Medico <zmed...@gentoo.org>
---
 lib/portage/tests/process/test_PipeLogger.py | 58 ++++++++++++++++
 lib/portage/util/_async/PipeLogger.py        | 73 +++++++++++++++-----
 2 files changed, 115 insertions(+), 16 deletions(-)
 create mode 100644 lib/portage/tests/process/test_PipeLogger.py

diff --git a/lib/portage/tests/process/test_PipeLogger.py 
b/lib/portage/tests/process/test_PipeLogger.py
new file mode 100644
index 000000000..2bd94cf39
--- /dev/null
+++ b/lib/portage/tests/process/test_PipeLogger.py
@@ -0,0 +1,58 @@
+# Copyright 2020 Gentoo Authors
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _reader, _writer
+from portage.util.futures.compat_coroutine import coroutine, coroutine_return
+from portage.util.futures.unix_events import _set_nonblocking
+
+
+class PipeLoggerTestCase(TestCase):
+
+       @coroutine
+       def _testPipeLoggerToPipe(self, test_string, loop=None):
+               """
+               Test PipeLogger writing to a pipe connected to a PipeReader.
+               This verifies that PipeLogger does not deadlock when writing
+               to a pipe that's drained by a PipeReader running in the same
+               process (requires non-blocking write).
+               """
+
+               input_fd, writer_pipe = os.pipe()
+               _set_nonblocking(writer_pipe)
+               writer_pipe = os.fdopen(writer_pipe, 'wb', 0)
+               writer = asyncio.ensure_future(_writer(writer_pipe, 
test_string.encode('ascii'), loop=loop), loop=loop)
+               writer.add_done_callback(lambda writer: writer_pipe.close())
+
+               pr, pw = os.pipe()
+
+               consumer = PipeLogger(background=True,
+                       input_fd=input_fd,
+                       log_file_path=os.fdopen(pw, 'wb', 0),
+                       scheduler=loop)
+               consumer.start()
+
+               # Before starting the reader, wait here for a moment, in order
+               # to exercise PipeLogger's handling of EAGAIN during write.
+               yield asyncio.wait([writer], timeout=0.01)
+
+               reader = _reader(pr, loop=loop)
+               yield writer
+               content = yield reader
+               yield consumer.async_wait()
+
+               self.assertEqual(consumer.returncode, os.EX_OK)
+
+               coroutine_return(content.decode('ascii', 'replace'))
+
+       def testPipeLogger(self):
+               loop = asyncio._wrap_loop()
+
+               for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 
2**17, 2**17 + 1):
+                       test_string = x * "a"
+                       output = 
loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
+                       self.assertEqual(test_string, output,
+                               "x = %s, len(output) = %s" % (x, len(output)))
diff --git a/lib/portage/util/_async/PipeLogger.py 
b/lib/portage/util/_async/PipeLogger.py
index a4258f350..ce8afb846 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,6 +8,10 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
+from portage.util.futures import asyncio
+from portage.util.futures._asyncio.streams import _writer
+from portage.util.futures.compat_coroutine import coroutine
+from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -21,13 +25,16 @@ class PipeLogger(AbstractPollTask):
        """
 
        __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-               ("_log_file", "_log_file_real")
+               ("_io_loop_task", "_log_file", "_log_file_nb", "_log_file_real")
 
        def _start(self):
 
                log_file_path = self.log_file_path
-               if log_file_path is not None:
-
+               if hasattr(log_file_path, 'write'):
+                       self._log_file_nb = True
+                       self._log_file = log_file_path
+                       _set_nonblocking(self._log_file.fileno())
+               elif log_file_path is not None:
                        self._log_file = open(_unicode_encode(log_file_path,
                                encoding=_encodings['fs'], errors='strict'), 
mode='ab')
                        if log_file_path.endswith('.gz'):
@@ -40,9 +47,9 @@ class PipeLogger(AbstractPollTask):
                                mode=0o660)
 
                if isinstance(self.input_fd, int):
-                       fd = self.input_fd
-               else:
-                       fd = self.input_fd.fileno()
+                       self.input_fd = os.fdopen(self.input_fd, 'rb', 0)
+
+               fd = self.input_fd.fileno()
 
                fcntl.fcntl(fd, fcntl.F_SETFL,
                        fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NONBLOCK)
@@ -57,7 +64,8 @@ class PipeLogger(AbstractPollTask):
                                fcntl.fcntl(fd, fcntl.F_SETFD,
                                        fcntl.fcntl(fd, fcntl.F_GETFD) | 
fcntl.FD_CLOEXEC)
 
-               self.scheduler.add_reader(fd, self._output_handler, fd)
+               self._io_loop_task = 
asyncio.ensure_future(self._io_loop(self.input_fd), loop=self.scheduler)
+               self._io_loop_task.add_done_callback(self._io_loop_done)
                self._registered = True
 
        def _cancel(self):
@@ -65,25 +73,36 @@ class PipeLogger(AbstractPollTask):
                if self.returncode is None:
                        self.returncode = self._cancelled_returncode
 
-       def _output_handler(self, fd):
-
+       @coroutine
+       def _io_loop(self, input_file):
                background = self.background
                stdout_fd = self.stdout_fd
                log_file = self._log_file 
+               fd = input_file.fileno()
 
                while True:
                        buf = self._read_buf(fd)
 
                        if buf is None:
                                # not a POLLIN event, EAGAIN, etc...
-                               break
+                               future = self.scheduler.create_future()
+                               self.scheduler.add_reader(fd, 
future.set_result, None)
+                               try:
+                                       yield future
+                               finally:
+                                       # The loop and input file may have been 
closed.
+                                       if not self.scheduler.is_closed():
+                                               future.done() or future.cancel()
+                                               # Do not call remove_reader in 
cases where fd has
+                                               # been closed and then 
re-allocated to a concurrent
+                                               # coroutine as in bug 716636.
+                                               if not input_file.closed:
+                                                       
self.scheduler.remove_reader(fd)
+                               continue
 
                        if not buf:
                                # EOF
-                               self._unregister()
-                               self.returncode = self.returncode or os.EX_OK
-                               self._async_wait()
-                               break
+                               return
 
                        else:
                                if not background and stdout_fd is not None:
@@ -120,8 +139,25 @@ class PipeLogger(AbstractPollTask):
                                                                fcntl.F_GETFL) 
^ os.O_NONBLOCK)
 
                                if log_file is not None:
-                                       log_file.write(buf)
-                                       log_file.flush()
+                                       if self._log_file_nb:
+                                               # Use the _writer function 
which uses os.write, since the
log_file.write method loses 
data when an EAGAIN occurs.
+                                               yield _writer(log_file, buf, 
loop=self.scheduler)
+                                       else:
+                                               # For gzip.GzipFile instances, 
the above _writer function
+                                               # will not work because data 
written directly to the file
+                                               # descriptor bypasses 
compression.
+                                               log_file.write(buf)
+                                               log_file.flush()
+
+       def _io_loop_done(self, future):
+               try:
+                       future.result()
+               except asyncio.CancelledError:
+                       self.cancel()
+                       self._was_cancelled()
+               self.returncode = self.returncode or os.EX_OK
+               self._async_wait()
 
        def _unregister(self):
                if self.input_fd is not None:
@@ -133,11 +169,16 @@ class PipeLogger(AbstractPollTask):
                                self.input_fd.close()
                        self.input_fd = None
 
+               if self._io_loop_task is not None:
+                       self._io_loop_task.done() or self._io_loop_task.cancel()
+                       self._io_loop_task = None
+
                if self.stdout_fd is not None:
                        os.close(self.stdout_fd)
                        self.stdout_fd = None
 
                if self._log_file is not None:
+                       self.scheduler.remove_writer(self._log_file.fileno())
                        self._log_file.close()
                        self._log_file = None
 
-- 
2.25.3


Reply via email to