commit: f1e9389d64b6ded41d0dac99979a049cfa27e75c Author: Zac Medico <zmedico <AT> gentoo <DOT> org> AuthorDate: Wed Apr 8 05:00:11 2020 +0000 Commit: Zac Medico <zmedico <AT> gentoo <DOT> org> CommitDate: Wed Apr 8 05:29:48 2020 +0000 URL: https://gitweb.gentoo.org/proj/portage.git/commit/?id=f1e9389d
Revert "PipeLogger: non-blocking write to pipe (bug 709746)" This reverts commit 27712651aa7014a960b012dc89457df09677edc1. Bug: https://bugs.gentoo.org/716636 Signed-off-by: Zac Medico <zmedico <AT> gentoo.org> lib/portage/tests/process/test_PopenProcess.py | 41 +--------------- lib/portage/util/_async/PipeLogger.py | 67 +++++--------------------- 2 files changed, 14 insertions(+), 94 deletions(-) diff --git a/lib/portage/tests/process/test_PopenProcess.py b/lib/portage/tests/process/test_PopenProcess.py index d4e97f210..ed506b814 100644 --- a/lib/portage/tests/process/test_PopenProcess.py +++ b/lib/portage/tests/process/test_PopenProcess.py @@ -9,8 +9,6 @@ from portage.tests import TestCase from portage.util._async.PipeLogger import PipeLogger from portage.util._async.PopenProcess import PopenProcess from portage.util._eventloop.global_event_loop import global_event_loop -from portage.util.futures._asyncio.streams import _reader -from portage.util.futures.compat_coroutine import coroutine, coroutine_return from _emerge.PipeReader import PipeReader class PopenPipeTestCase(TestCase): @@ -75,41 +73,8 @@ class PopenPipeTestCase(TestCase): return content.decode('ascii', 'replace') - @coroutine - def _testPipeLoggerToPipe(self, test_string, loop=None): - """ - Test PipeLogger writing to a pipe connected to a PipeReader. - This verifies that PipeLogger does not deadlock when writing - to a pipe that's drained by a PipeReader running in the same - process (requires non-blocking write). - """ - - producer = PopenProcess(proc=subprocess.Popen( - ["bash", "-c", self._echo_cmd % test_string], - stdout=subprocess.PIPE, stderr=subprocess.STDOUT), - scheduler=loop) - - pr, pw = os.pipe() - - consumer = producer.pipe_reader = PipeLogger(background=True, - input_fd=producer.proc.stdout, - log_file_path=os.fdopen(pw, 'wb', 0)) - - reader = _reader(pr, loop=loop) - yield producer.async_start() - content = yield reader - yield producer.async_wait() - yield consumer.async_wait() - - self.assertEqual(producer.returncode, os.EX_OK) - self.assertEqual(consumer.returncode, os.EX_OK) - - coroutine_return(content.decode('ascii', 'replace')) - def testPopenPipe(self): - loop = global_event_loop() - - for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14, 2**15, 2**16): + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14): test_string = x * "a" output = self._testPipeReader(test_string) self.assertEqual(test_string, output, @@ -118,7 +83,3 @@ class PopenPipeTestCase(TestCase): output = self._testPipeLogger(test_string) self.assertEqual(test_string, output, "x = %s, len(output) = %s" % (x, len(output))) - - output = loop.run_until_complete(self._testPipeLoggerToPipe(test_string, loop=loop)) - self.assertEqual(test_string, output, - "x = %s, len(output) = %s" % (x, len(output))) diff --git a/lib/portage/util/_async/PipeLogger.py b/lib/portage/util/_async/PipeLogger.py index 6b03988a1..a4258f350 100644 --- a/lib/portage/util/_async/PipeLogger.py +++ b/lib/portage/util/_async/PipeLogger.py @@ -8,9 +8,6 @@ import sys import portage from portage import os, _encodings, _unicode_encode -from portage.util.futures import asyncio -from portage.util.futures.compat_coroutine import coroutine -from portage.util.futures.unix_events import _set_nonblocking from _emerge.AbstractPollTask import AbstractPollTask class PipeLogger(AbstractPollTask): @@ -24,15 +21,13 @@ class PipeLogger(AbstractPollTask): """ __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ - ("_io_loop_task", "_log_file", "_log_file_real") + ("_log_file", "_log_file_real") def _start(self): log_file_path = self.log_file_path - if hasattr(log_file_path, 'write'): - self._log_file = log_file_path - _set_nonblocking(self._log_file.fileno()) - elif log_file_path is not None: + if log_file_path is not None: + self._log_file = open(_unicode_encode(log_file_path, encoding=_encodings['fs'], errors='strict'), mode='ab') if log_file_path.endswith('.gz'): @@ -62,8 +57,7 @@ class PipeLogger(AbstractPollTask): fcntl.fcntl(fd, fcntl.F_SETFD, fcntl.fcntl(fd, fcntl.F_GETFD) | fcntl.FD_CLOEXEC) - self._io_loop_task = asyncio.ensure_future(self._io_loop(fd), loop=self.scheduler) - self._io_loop_task.add_done_callback(self._io_loop_done) + self.scheduler.add_reader(fd, self._output_handler, fd) self._registered = True def _cancel(self): @@ -71,8 +65,8 @@ class PipeLogger(AbstractPollTask): if self.returncode is None: self.returncode = self._cancelled_returncode - @coroutine - def _io_loop(self, fd): + def _output_handler(self, fd): + background = self.background stdout_fd = self.stdout_fd log_file = self._log_file @@ -82,18 +76,14 @@ class PipeLogger(AbstractPollTask): if buf is None: # not a POLLIN event, EAGAIN, etc... - future = self.scheduler.create_future() - self.scheduler.add_reader(fd, future.set_result, None) - try: - yield future - finally: - self.scheduler.remove_reader(fd) - future.done() or future.cancel() - continue + break if not buf: # EOF - return + self._unregister() + self.returncode = self.returncode or os.EX_OK + self._async_wait() + break else: if not background and stdout_fd is not None: @@ -130,34 +120,8 @@ class PipeLogger(AbstractPollTask): fcntl.F_GETFL) ^ os.O_NONBLOCK) if log_file is not None: - write_buf = buf - while True: - try: - if write_buf is not None: - log_file.write(write_buf) - write_buf = None - log_file.flush() - except EnvironmentError as e: - if e.errno != errno.EAGAIN: - raise - future = self.scheduler.create_future() - self.scheduler.add_writer(self._log_file.fileno(), future.set_result, None) - try: - yield future - finally: - self.scheduler.remove_writer(self._log_file.fileno()) - future.done() or future.cancel() - else: - break - - def _io_loop_done(self, future): - try: - future.result() - except asyncio.CancelledError: - self.cancel() - self._was_cancelled() - self.returncode = self.returncode or os.EX_OK - self._async_wait() + log_file.write(buf) + log_file.flush() def _unregister(self): if self.input_fd is not None: @@ -169,16 +133,11 @@ class PipeLogger(AbstractPollTask): self.input_fd.close() self.input_fd = None - if self._io_loop_task is not None: - self._io_loop_task.done() or self._io_loop_task.cancel() - self._io_loop_task = None - if self.stdout_fd is not None: os.close(self.stdout_fd) self.stdout_fd = None if self._log_file is not None: - self.scheduler.remove_writer(self._log_file.fileno()) self._log_file.close() self._log_file = None