In the finally clause of the _writer coroutine, do not call
remove_writer in cases where fd has been closed and then re-allocated
to a concurrent coroutine, as in bug 716636.

Also, assume that the caller will put the file in non-blocking mode
and close the file when done, so that this function is suitable for
use within a loop.

Bug: https://bugs.gentoo.org/728580
Signed-off-by: Zac Medico <zmed...@gentoo.org>
---
 lib/portage/util/futures/_asyncio/process.py | 11 ++++-
 lib/portage/util/futures/_asyncio/streams.py | 50 ++++++++++----------
 2 files changed, 33 insertions(+), 28 deletions(-)

diff --git a/lib/portage/util/futures/_asyncio/process.py b/lib/portage/util/futures/_asyncio/process.py
index 020164c9b..2d3e9b0fd 100644
--- a/lib/portage/util/futures/_asyncio/process.py
+++ b/lib/portage/util/futures/_asyncio/process.py
@@ -1,9 +1,12 @@
-# Copyright 2018 Gentoo Foundation
+# Copyright 2018-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
+import os
+
 import portage
 portage.proxy.lazyimport.lazyimport(globals(),
        'portage.util.futures:asyncio',
+       'portage.util.futures.unix_events:_set_nonblocking',
 )
 from portage.util.futures._asyncio.streams import _reader, _writer
 from portage.util.futures.compat_coroutine import coroutine, coroutine_return
@@ -59,7 +62,11 @@ class _Process(object):
                if input is not None:
                        if self._proc.stdin is None:
                                raise TypeError('communicate: expected file or 
int, got {}'.format(type(self._proc.stdin)))
-                       writer = 
asyncio.ensure_future(_writer(self._proc.stdin, input), loop=self._loop)
+                       stdin = self._proc.stdin
+                       stdin = os.fdopen(stdin, 'wb', 0) if isinstance(stdin, 
int) else stdin
+                       _set_nonblocking(stdin.fileno())
+                       writer = asyncio.ensure_future(_writer(stdin, input, 
loop=self._loop), loop=self._loop)
+                       writer.add_done_callback(lambda writer: stdin.close())
 
                try:
                        yield asyncio.wait(futures + [self.wait()], 
loop=self._loop)
diff --git a/lib/portage/util/futures/_asyncio/streams.py b/lib/portage/util/futures/_asyncio/streams.py
index 650a16491..870307e1e 100644
--- a/lib/portage/util/futures/_asyncio/streams.py
+++ b/lib/portage/util/futures/_asyncio/streams.py
@@ -1,4 +1,4 @@
-# Copyright 2018 Gentoo Foundation
+# Copyright 2018-2020 Gentoo Authors
 # Distributed under the terms of the GNU General Public License v2
 
 import errno
@@ -8,7 +8,6 @@ import portage
 portage.proxy.lazyimport.lazyimport(globals(),
        '_emerge.PipeReader:PipeReader',
        'portage.util.futures:asyncio',
-       'portage.util.futures.unix_events:_set_nonblocking',
 )
 from portage.util.futures.compat_coroutine import coroutine
 
@@ -59,38 +58,37 @@ class _Reader(object):
 @coroutine
 def _writer(output_file, content, loop=None):
        """
-       Asynchronously write bytes to output file, and close it when
-       done. If an EnvironmentError other than EAGAIN is encountered,
-       which typically indicates that the other end of the pipe has
-       close, the error is raised. This function is a coroutine.
+       Asynchronously write bytes to output file. The output file is
+       assumed to be in non-blocking mode. If an EnvironmentError
+       other than EAGAIN is encountered, which typically indicates that
+       the other end of the pipe has closed, the error is raised.
+       This function is a coroutine.
 
-       @param output_file: output file descriptor
-       @type output_file: file or int
+       @param output_file: output file
+       @type output_file: file object
        @param content: content to write
        @type content: bytes
        @param loop: asyncio.AbstractEventLoop (or compatible)
        @type loop: event loop
        """
-       fd = output_file if isinstance(output_file, int) else 
output_file.fileno()
-       _set_nonblocking(fd)
        loop = asyncio._wrap_loop(loop)
-       try:
-               while content:
+       fd = output_file.fileno()
+       while content:
+               try:
+                       content = content[os.write(fd, content):]
+               except EnvironmentError as e:
+                       if e.errno != errno.EAGAIN:
+                               raise
                        waiter = loop.create_future()
-                       loop.add_writer(fd, lambda: waiter.set_result(None))
+                       loop.add_writer(fd, lambda: waiter.done() or 
waiter.set_result(None))
                        try:
                                yield waiter
-                               while content:
-                                       try:
-                                               content = content[os.write(fd, 
content):]
-                                       except EnvironmentError as e:
-                                               if e.errno == errno.EAGAIN:
-                                                       break
-                                               else:
-                                                       raise
                        finally:
-                               loop.remove_writer(fd)
-       except GeneratorExit:
-               raise
-       finally:
-               os.close(output_file) if isinstance(output_file, int) else 
output_file.close()
+                               # The loop and output file may have been closed.
+                               if not loop.is_closed():
+                                       waiter.done() or waiter.cancel()
+                                       # Do not call remove_writer in cases where fd has
+                                       # been closed and then re-allocated to a concurrent
+                                       # coroutine as in bug 716636.
+                                       if not output_file.closed:
+                                               loop.remove_writer(fd)
-- 
2.25.3


Reply via email to