commit:     2b6e90fadfb1adcd8ccd2f313aa009b3d19ffefe
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Apr 15 23:42:49 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Apr 15 23:58:05 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=2b6e90fa

EventLoop: support add_reader/writer fd overlap (bug 649588)

The AbstractEventLoop add_reader and add_writer methods need to support
simultaneous registration of reader and writer callbacks for the same
fd. For example, this feature is used by the standard library's
asyncio.unix_events._UnixWritePipeTransport class, which is used to
implement AbstractEventLoop.subprocess_exec(stdin=subprocess.PIPE).

Bug: https://bugs.gentoo.org/649588

 pym/portage/util/_eventloop/EventLoop.py | 83 ++++++++++++++++++++++++++++----
 1 file changed, 73 insertions(+), 10 deletions(-)

diff --git a/pym/portage/util/_eventloop/EventLoop.py 
b/pym/portage/util/_eventloop/EventLoop.py
index 32dc2fc9d..35d0a35ba 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -4,6 +4,7 @@
 from __future__ import division
 
 import errno
+import functools
 import logging
 import os
 import select
@@ -95,19 +96,20 @@ class EventLoop(object):
                        self._callback(*self._args)
                        return False
 
-       class _repeat_callback(object):
+       class _selector_callback(object):
                """
                Wraps an callback, and always returns True, for callbacks that
                are supposed to run repeatedly.
                """
-               __slots__ = ("_args", "_callback")
+               __slots__ = ("_args", "_callbacks")
 
-               def __init__(self, callback, args):
-                       self._callback = callback
-                       self._args = args
+               def __init__(self, callbacks):
+                       self._callbacks = callbacks
 
                def __call__(self, fd, event):
-                       self._callback(*self._args)
+                       for callback, mask in self._callbacks:
+                               if event & mask:
+                                       callback()
                        return True
 
        def __init__(self, main=True):
@@ -189,6 +191,9 @@ class EventLoop(object):
                        self.IO_OUT = PollConstants.POLLOUT
                        self.IO_PRI = PollConstants.POLLPRI
 
+               self._EVENT_READ = self.IO_IN | self.IO_HUP
+               self._EVENT_WRITE = self.IO_OUT
+
                self._child_handlers = {}
                self._sigchld_read = None
                self._sigchld_write = None
@@ -602,7 +607,19 @@ class EventLoop(object):
 
                Use functools.partial to pass keywords to the callback.
                """
-               self.io_add_watch(fd, self.IO_IN, 
self._repeat_callback(callback, args))
+               handler = self._poll_event_handlers.get(fd)
+               callbacks = [(functools.partial(callback, *args), 
self._EVENT_READ)]
+               selector_mask = self._EVENT_READ
+               if handler is not None:
+                       if not isinstance(handler.callback, 
self._selector_callback):
+                               raise AssertionError("add_reader called with fd 
"
+                                       "registered directly via io_add_watch")
+                       for item in handler.callback._callbacks:
+                               callback, mask = item
+                               if mask != self._EVENT_READ:
+                                       selector_mask |= mask
+                                       callbacks.append(item)
+               self.io_add_watch(fd, selector_mask, 
self._selector_callback(callbacks))
 
        def remove_reader(self, fd):
                """
@@ -610,7 +627,24 @@ class EventLoop(object):
                """
                handler = self._poll_event_handlers.get(fd)
                if handler is not None:
-                       return self.source_remove(handler.source_id)
+                       if not isinstance(handler.callback, 
self._selector_callback):
+                               raise AssertionError("remove_reader called with 
fd "
+                                       "registered directly via io_add_watch")
+                       callbacks = []
+                       selector_mask = 0
+                       removed = False
+                       for item in handler.callback._callbacks:
+                               callback, mask = item
+                               if mask == self._EVENT_READ:
+                                       removed = True
+                               else:
+                                       selector_mask |= mask
+                                       callbacks.append(item)
+                       self.source_remove(handler.source_id)
+                       if callbacks:
+                               self.io_add_watch(fd, selector_mask,
+                                       self._selector_callback(callbacks))
+                       return removed
                return False
 
        def add_writer(self, fd, callback, *args):
@@ -620,7 +654,19 @@ class EventLoop(object):
 
                Use functools.partial to pass keywords to the callback.
                """
-               self.io_add_watch(fd, self.IO_OUT, 
self._repeat_callback(callback, args))
+               handler = self._poll_event_handlers.get(fd)
+               callbacks = [(functools.partial(callback, *args), 
self._EVENT_WRITE)]
+               selector_mask = self._EVENT_WRITE
+               if handler is not None:
+                       if not isinstance(handler.callback, 
self._selector_callback):
+                               raise AssertionError("add_reader called with fd 
"
+                                       "registered directly via io_add_watch")
+                       for item in handler.callback._callbacks:
+                               callback, mask = item
+                               if mask != self._EVENT_WRITE:
+                                       selector_mask |= mask
+                                       callbacks.append(item)
+               self.io_add_watch(fd, selector_mask, 
self._selector_callback(callbacks))
 
        def remove_writer(self, fd):
                """
@@ -628,7 +674,24 @@ class EventLoop(object):
                """
                handler = self._poll_event_handlers.get(fd)
                if handler is not None:
-                       return self.source_remove(handler.source_id)
+                       if not isinstance(handler.callback, 
self._selector_callback):
+                               raise AssertionError("remove_reader called with 
fd "
+                                       "registered directly via io_add_watch")
+                       callbacks = []
+                       selector_mask = 0
+                       removed = False
+                       for item in handler.callback._callbacks:
+                               callback, mask = item
+                               if mask == self._EVENT_WRITE:
+                                       removed = True
+                               else:
+                                       selector_mask |= mask
+                                       callbacks.append(item)
+                       self.source_remove(handler.source_id)
+                       if callbacks:
+                               self.io_add_watch(fd, selector_mask,
+                                       self._selector_callback(callbacks))
+                       return removed
                return False
 
        def io_add_watch(self, f, condition, callback, *args):

Reply via email to