commit:     c77afbc31fa687cc612a6f946b324bf4d74d8175
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Mon Apr 30 01:49:18 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Mon Apr 30 02:14:41 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=c77afbc3

EventLoop: call add_reader/writer callbacks after pipe is closed (bug 654382)

Callbacks registered via add_reader/writer methods need to be called
when the other end of a pipe is closed, which does not result in a
normal read or write event. Therefore, respond to other event types
as well, for compatibility with the asyncio event loop implementation.

The included unit tests demonstrate asyncio compatible behavior for
both reader and writer callbacks.

Bug: https://bugs.gentoo.org/654382

 .../tests/util/futures/asyncio/test_pipe_closed.py | 133 +++++++++++++++++++++
 pym/portage/util/_eventloop/EventLoop.py           |   7 +-
 2 files changed, 138 insertions(+), 2 deletions(-)

diff --git a/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py 
b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
new file mode 100644
index 000000000..1ecddab78
--- /dev/null
+++ b/pym/portage/tests/util/futures/asyncio/test_pipe_closed.py
@@ -0,0 +1,133 @@
+# Copyright 2018 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import os
+import pty
+import shutil
+import socket
+import sys
+import tempfile
+
+from portage.tests import TestCase
+from portage.util.futures import asyncio
+from portage.util.futures.unix_events import (
+       DefaultEventLoopPolicy,
+       _set_nonblocking,
+)
+
+
+class _PipeClosedTestCase(object):
+
+       def test_pipe(self):
+               read_end, write_end = os.pipe()
+               self._do_test(read_end, write_end)
+
+       def test_pty_device(self):
+               try:
+                       read_end, write_end = pty.openpty()
+               except EnvironmentError:
+                       self.skipTest('pty not available')
+               self._do_test(read_end, write_end)
+
+       def test_domain_socket(self):
+               if sys.version_info >= (3, 2):
+                       read_end, write_end = socket.socketpair()
+               else:
+                       self.skipTest('socket detach not supported')
+               self._do_test(read_end.detach(), write_end.detach())
+
+       def test_named_pipe(self):
+               tempdir = tempfile.mkdtemp()
+               try:
+                       fifo_path = os.path.join(tempdir, 'fifo')
+                       os.mkfifo(fifo_path)
+                       self._do_test(os.open(fifo_path, 
os.O_NONBLOCK|os.O_RDONLY),
+                               os.open(fifo_path, os.O_NONBLOCK|os.O_WRONLY))
+               finally:
+                       shutil.rmtree(tempdir)
+
+
+class ReaderPipeClosedTestCase(_PipeClosedTestCase, TestCase):
+       """
+       Test that a reader callback is called after the other end of
+       the pipe has been closed.
+       """
+       def _do_test(self, read_end, write_end):
+               initial_policy = asyncio.get_event_loop_policy()
+               if not isinstance(initial_policy, DefaultEventLoopPolicy):
+                       asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+               loop = asyncio.get_event_loop()
+               read_end = os.fdopen(read_end, 'rb', 0)
+               write_end = os.fdopen(write_end, 'wb', 0)
+               try:
+                       def reader_callback():
+                               if not reader_callback.called.done():
+                                       reader_callback.called.set_result(None)
+
+                       reader_callback.called = loop.create_future()
+                       loop.add_reader(read_end.fileno(), reader_callback)
+
+                       # Allow the loop to check for IO events, and assert
+                       # that our future is still not done.
+                       loop.run_until_complete(asyncio.sleep(0, loop=loop))
+                       self.assertFalse(reader_callback.called.done())
+
+                       # Demonstrate that the callback is called afer the
+                       # other end of the pipe has been closed.
+                       write_end.close()
+                       loop.run_until_complete(reader_callback.called)
+               finally:
+                       loop.remove_reader(read_end.fileno())
+                       write_end.close()
+                       read_end.close()
+                       asyncio.set_event_loop_policy(initial_policy)
+
+
+class WriterPipeClosedTestCase(_PipeClosedTestCase, TestCase):
+       """
+       Test that a writer callback is called after the other end of
+       the pipe has been closed.
+       """
+       def _do_test(self, read_end, write_end):
+               initial_policy = asyncio.get_event_loop_policy()
+               if not isinstance(initial_policy, DefaultEventLoopPolicy):
+                       asyncio.set_event_loop_policy(DefaultEventLoopPolicy())
+
+               loop = asyncio.get_event_loop()
+               read_end = os.fdopen(read_end, 'rb', 0)
+               write_end = os.fdopen(write_end, 'wb', 0)
+               try:
+                       def writer_callback():
+                               if not writer_callback.called.done():
+                                       writer_callback.called.set_result(None)
+
+                       writer_callback.called = loop.create_future()
+                       _set_nonblocking(write_end.fileno())
+                       loop.add_writer(write_end.fileno(), writer_callback)
+
+                       # Fill up the pipe, so that no writer callbacks should 
be
+                       # received until the state has changed.
+                       while True:
+                               try:
+                                       os.write(write_end.fileno(), 512 * b'0')
+                               except EnvironmentError as e:
+                                       if e.errno != errno.EAGAIN:
+                                               raise
+                                       break
+
+                       # Allow the loop to check for IO events, and assert
+                       # that our future is still not done.
+                       loop.run_until_complete(asyncio.sleep(0, loop=loop))
+                       self.assertFalse(writer_callback.called.done())
+
+                       # Demonstrate that the callback is called afer the
+                       # other end of the pipe has been closed.
+                       read_end.close()
+                       loop.run_until_complete(writer_callback.called)
+               finally:
+                       loop.remove_writer(write_end.fileno())
+                       write_end.close()
+                       read_end.close()
+                       asyncio.set_event_loop_policy(initial_policy)

diff --git a/pym/portage/util/_eventloop/EventLoop.py 
b/pym/portage/util/_eventloop/EventLoop.py
index 6a8b906ed..fc7380b03 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -192,8 +192,11 @@ class EventLoop(object):
                        self.IO_OUT = PollConstants.POLLOUT
                        self.IO_PRI = PollConstants.POLLPRI
 
-               self._EVENT_READ = self.IO_IN | self.IO_HUP
-               self._EVENT_WRITE = self.IO_OUT
+               # These trigger both reader and writer callbacks.
+               EVENT_SHARED = self.IO_HUP | self.IO_ERR | self.IO_NVAL
+
+               self._EVENT_READ = self.IO_IN | EVENT_SHARED
+               self._EVENT_WRITE = self.IO_OUT | EVENT_SHARED
 
                self._child_handlers = {}
                self._sigchld_read = None

Reply via email to