commit:     f1e9389d64b6ded41d0dac99979a049cfa27e75c
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Wed Apr  8 05:00:11 2020 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Wed Apr  8 05:29:48 2020 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=f1e9389d

Revert "PipeLogger: non-blocking write to pipe (bug 709746)"

This reverts commit 27712651aa7014a960b012dc89457df09677edc1.

Bug: https://bugs.gentoo.org/716636
Signed-off-by: Zac Medico <zmedico <AT> gentoo.org>

 lib/portage/tests/process/test_PopenProcess.py | 41 +---------------
 lib/portage/util/_async/PipeLogger.py          | 67 +++++---------------------
 2 files changed, 14 insertions(+), 94 deletions(-)

diff --git a/lib/portage/tests/process/test_PopenProcess.py 
b/lib/portage/tests/process/test_PopenProcess.py
index d4e97f210..ed506b814 100644
--- a/lib/portage/tests/process/test_PopenProcess.py
+++ b/lib/portage/tests/process/test_PopenProcess.py
@@ -9,8 +9,6 @@ from portage.tests import TestCase
 from portage.util._async.PipeLogger import PipeLogger
 from portage.util._async.PopenProcess import PopenProcess
 from portage.util._eventloop.global_event_loop import global_event_loop
-from portage.util.futures._asyncio.streams import _reader
-from portage.util.futures.compat_coroutine import coroutine, coroutine_return
 from _emerge.PipeReader import PipeReader
 
 class PopenPipeTestCase(TestCase):
@@ -75,41 +73,8 @@ class PopenPipeTestCase(TestCase):
 
                return content.decode('ascii', 'replace')
 
-       @coroutine
-       def _testPipeLoggerToPipe(self, test_string, loop=None):
-               """
-               Test PipeLogger writing to a pipe connected to a PipeReader.
-               This verifies that PipeLogger does not deadlock when writing
-               to a pipe that's drained by a PipeReader running in the same
-               process (requires non-blocking write).
-               """
-
-               producer = PopenProcess(proc=subprocess.Popen(
-                       ["bash", "-c", self._echo_cmd % test_string],
-                       stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
-                       scheduler=loop)
-
-               pr, pw = os.pipe()
-
-               consumer = producer.pipe_reader = PipeLogger(background=True,
-                       input_fd=producer.proc.stdout,
-                       log_file_path=os.fdopen(pw, 'wb', 0))
-
-               reader = _reader(pr, loop=loop)
-               yield producer.async_start()
-               content = yield reader
-               yield producer.async_wait()
-               yield consumer.async_wait()
-
-               self.assertEqual(producer.returncode, os.EX_OK)
-               self.assertEqual(consumer.returncode, os.EX_OK)
-
-               coroutine_return(content.decode('ascii', 'replace'))
-
        def testPopenPipe(self):
-               loop = global_event_loop()
-
-               for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 
2**15, 2**16):
+               for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
                        test_string = x * "a"
                        output = self._testPipeReader(test_string)
                        self.assertEqual(test_string, output,
@@ -118,7 +83,3 @@ class PopenPipeTestCase(TestCase):
                        output = self._testPipeLogger(test_string)
                        self.assertEqual(test_string, output,
                                "x = %s, len(output) = %s" % (x, len(output)))
-
-                       output = 
loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop))
-                       self.assertEqual(test_string, output,
-                               "x = %s, len(output) = %s" % (x, len(output)))

diff --git a/lib/portage/util/_async/PipeLogger.py 
b/lib/portage/util/_async/PipeLogger.py
index 6b03988a1..a4258f350 100644
--- a/lib/portage/util/_async/PipeLogger.py
+++ b/lib/portage/util/_async/PipeLogger.py
@@ -8,9 +8,6 @@ import sys
 
 import portage
 from portage import os, _encodings, _unicode_encode
-from portage.util.futures import asyncio
-from portage.util.futures.compat_coroutine import coroutine
-from portage.util.futures.unix_events import _set_nonblocking
 from _emerge.AbstractPollTask import AbstractPollTask
 
 class PipeLogger(AbstractPollTask):
@@ -24,15 +21,13 @@ class PipeLogger(AbstractPollTask):
        """
 
        __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
-               ("_io_loop_task", "_log_file", "_log_file_real")
+               ("_log_file", "_log_file_real")
 
        def _start(self):
 
                log_file_path = self.log_file_path
-               if hasattr(log_file_path, 'write'):
-                       self._log_file = log_file_path
-                       _set_nonblocking(self._log_file.fileno())
-               elif log_file_path is not None:
+               if log_file_path is not None:
+
                        self._log_file = open(_unicode_encode(log_file_path,
                                encoding=_encodings['fs'], errors='strict'), 
mode='ab')
                        if log_file_path.endswith('.gz'):
@@ -62,8 +57,7 @@ class PipeLogger(AbstractPollTask):
                                fcntl.fcntl(fd, fcntl.F_SETFD,
                                        fcntl.fcntl(fd, fcntl.F_GETFD) | 
fcntl.FD_CLOEXEC)
 
-               self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), 
loop=self.scheduler)
-               self._io_loop_task.add_done_callback(self._io_loop_done)
+               self.scheduler.add_reader(fd, self._output_handler, fd)
                self._registered = True
 
        def _cancel(self):
@@ -71,8 +65,8 @@ class PipeLogger(AbstractPollTask):
                if self.returncode is None:
                        self.returncode = self._cancelled_returncode
 
-       @coroutine
-       def _io_loop(self, fd):
+       def _output_handler(self, fd):
+
                background = self.background
                stdout_fd = self.stdout_fd
                log_file = self._log_file 
@@ -82,18 +76,14 @@ class PipeLogger(AbstractPollTask):
 
                        if buf is None:
                                # not a POLLIN event, EAGAIN, etc...
-                               future = self.scheduler.create_future()
-                               self.scheduler.add_reader(fd, 
future.set_result, None)
-                               try:
-                                       yield future
-                               finally:
-                                       self.scheduler.remove_reader(fd)
-                                       future.done() or future.cancel()
-                               continue
+                               break
 
                        if not buf:
                                # EOF
-                               return
+                               self._unregister()
+                               self.returncode = self.returncode or os.EX_OK
+                               self._async_wait()
+                               break
 
                        else:
                                if not background and stdout_fd is not None:
@@ -130,34 +120,8 @@ class PipeLogger(AbstractPollTask):
                                                                fcntl.F_GETFL) 
^ os.O_NONBLOCK)
 
                                if log_file is not None:
-                                       write_buf = buf
-                                       while True:
-                                               try:
-                                                       if write_buf is not 
None:
-                                                               
log_file.write(write_buf)
-                                                               write_buf = None
-                                                       log_file.flush()
-                                               except EnvironmentError as e:
-                                                       if e.errno != 
errno.EAGAIN:
-                                                               raise
-                                                       future = 
self.scheduler.create_future()
-                                                       
self.scheduler.add_writer(self._log_file.fileno(), future.set_result, None)
-                                                       try:
-                                                               yield future
-                                                       finally:
-                                                               
self.scheduler.remove_writer(self._log_file.fileno())
-                                                               future.done() 
or future.cancel()
-                                               else:
-                                                       break
-
-       def _io_loop_done(self, future):
-               try:
-                       future.result()
-               except asyncio.CancelledError:
-                       self.cancel()
-                       self._was_cancelled()
-               self.returncode = self.returncode or os.EX_OK
-               self._async_wait()
+                                       log_file.write(buf)
+                                       log_file.flush()
 
        def _unregister(self):
                if self.input_fd is not None:
@@ -169,16 +133,11 @@ class PipeLogger(AbstractPollTask):
                                self.input_fd.close()
                        self.input_fd = None
 
-               if self._io_loop_task is not None:
-                       self._io_loop_task.done() or self._io_loop_task.cancel()
-                       self._io_loop_task = None
-
                if self.stdout_fd is not None:
                        os.close(self.stdout_fd)
                        self.stdout_fd = None
 
                if self._log_file is not None:
-                       self.scheduler.remove_writer(self._log_file.fileno())
                        self._log_file.close()
                        self._log_file = None
 

Reply via email to