On 6/18/20 7:13 AM, Brian Dolbec wrote: > On Thu, 18 Jun 2020 00:35:48 -0700 > Zac Medico <zmed...@gentoo.org> wrote: > >> In the coroutine finally clause, do not call remove_writer in cases >> where fd has been closed and then re-allocated to a concurrent >> coroutine as in bug 716636. >> >> Also, assume that the caller will put the file in non-blocking mode >> and close the file when done, so that this function is suitable for >> use within a loop. >> >> Bug: https://bugs.gentoo.org/728580 >> Signed-off-by: Zac Medico <zmed...@gentoo.org> >> --- >> lib/portage/util/futures/_asyncio/process.py | 11 ++++- >> lib/portage/util/futures/_asyncio/streams.py | 50 >> ++++++++++---------- 2 files changed, 33 insertions(+), 28 >> deletions(-) >> >> diff --git a/lib/portage/util/futures/_asyncio/process.py >> b/lib/portage/util/futures/_asyncio/process.py index >> 020164c9b..2d3e9b0fd 100644 --- >> a/lib/portage/util/futures/_asyncio/process.py +++ >> b/lib/portage/util/futures/_asyncio/process.py @@ -1,9 +1,12 @@ >> -# Copyright 2018 Gentoo Foundation >> +# Copyright 2018-2020 Gentoo Authors >> # Distributed under the terms of the GNU General Public License v2 >> >> +import os >> + >> import portage >> portage.proxy.lazyimport.lazyimport(globals(), >> 'portage.util.futures:asyncio', >> + 'portage.util.futures.unix_events:_set_nonblocking', >> ) >> from portage.util.futures._asyncio.streams import _reader, _writer >> from portage.util.futures.compat_coroutine import coroutine, >> coroutine_return @@ -59,7 +62,11 @@ class _Process(object): >> if input is not None: >> if self._proc.stdin is None: >> raise TypeError('communicate: >> expected file or int, got {}'.format(type(self._proc.stdin))) >> - writer = >> asyncio.ensure_future(_writer(self._proc.stdin, input), >> loop=self._loop) >> + stdin = self._proc.stdin >> + stdin = os.fdopen(stdin, 'wb', 0) if >> isinstance(stdin, int) else stdin >> + _set_nonblocking(stdin.fileno()) >> + writer = >> 
asyncio.ensure_future(_writer(stdin, input, loop=self._loop), >> loop=self._loop) >> + writer.add_done_callback(lambda writer: >> stdin.close()) >> try: >> yield asyncio.wait(futures + [self.wait()], >> loop=self._loop) diff --git >> a/lib/portage/util/futures/_asyncio/streams.py >> b/lib/portage/util/futures/_asyncio/streams.py index >> 650a16491..870307e1e 100644 --- >> a/lib/portage/util/futures/_asyncio/streams.py +++ >> b/lib/portage/util/futures/_asyncio/streams.py @@ -1,4 +1,4 @@ -# >> Copyright 2018 Gentoo Foundation +# Copyright 2018-2020 Gentoo Authors >> # Distributed under the terms of the GNU General Public License v2 >> >> import errno >> @@ -8,7 +8,6 @@ import portage >> portage.proxy.lazyimport.lazyimport(globals(), >> '_emerge.PipeReader:PipeReader', >> 'portage.util.futures:asyncio', >> - 'portage.util.futures.unix_events:_set_nonblocking', >> ) >> from portage.util.futures.compat_coroutine import coroutine >> >> @@ -59,38 +58,37 @@ class _Reader(object): >> @coroutine >> def _writer(output_file, content, loop=None): >> """ >> - Asynchronously write bytes to output file, and close it when >> - done. If an EnvironmentError other than EAGAIN is >> encountered, >> - which typically indicates that the other end of the pipe has >> - close, the error is raised. This function is a coroutine. >> + Asynchronously write bytes to output file. The output file is >> + assumed to be in non-blocking mode. If an EnvironmentError >> + other than EAGAIN is encountered, which typically indicates >> that >> + the other end of the pipe has closed, the error is raised. >> + This function is a coroutine. 
>> >> - @param output_file: output file descriptor >> - @type output_file: file or int >> + @param output_file: output file >> + @type output_file: file object >> @param content: content to write >> @type content: bytes >> @param loop: asyncio.AbstractEventLoop (or compatible) >> @type loop: event loop >> """ >> - fd = output_file if isinstance(output_file, int) else >> output_file.fileno() >> - _set_nonblocking(fd) >> loop = asyncio._wrap_loop(loop) >> - try: >> - while content: >> + fd = output_file.fileno() >> + while content: >> + try: >> + content = content[os.write(fd, content):] >> + except EnvironmentError as e: >> + if e.errno != errno.EAGAIN: >> + raise >> waiter = loop.create_future() >> - loop.add_writer(fd, lambda: >> waiter.set_result(None)) >> + loop.add_writer(fd, lambda: waiter.done() or >> waiter.set_result(None)) try: >> yield waiter >> - while content: >> - try: >> - content = >> content[os.write(fd, content):] >> - except EnvironmentError as e: >> - if e.errno == >> errno.EAGAIN: >> - break >> - else: >> - raise >> finally: >> - loop.remove_writer(fd) >> - except GeneratorExit: >> - raise >> - finally: >> - os.close(output_file) if isinstance(output_file, >> int) else output_file.close() >> + # The loop and output file may have >> been closed. >> + if not loop.is_closed(): >> + waiter.done() or >> waiter.cancel() >> + # Do not call remove_writer >> in cases where fd has >> + # been closed and then >> re-allocated to a concurrent >> + # coroutine as in bug 716636. >> + if not output_file.closed: >> + >> loop.remove_writer(fd) > > > looks fine to me >
Thanks, merged:

https://gitweb.gentoo.org/proj/portage.git/commit/?id=92be5a02e452eb0810d2974bc7fa5ee2056ef8e7

-- 
Thanks,
Zac
signature.asc
Description: OpenPGP digital signature