commit:     2b6e90fadfb1adcd8ccd2f313aa009b3d19ffefe
Author:     Zac Medico <zmedico <AT> gentoo <DOT> org>
AuthorDate: Sun Apr 15 23:42:49 2018 +0000
Commit:     Zac Medico <zmedico <AT> gentoo <DOT> org>
CommitDate: Sun Apr 15 23:58:05 2018 +0000
URL:        https://gitweb.gentoo.org/proj/portage.git/commit/?id=2b6e90fa

EventLoop: support add_reader/writer fd overlap (bug 649588)

The AbstractEventLoop add_reader and add_writer methods need to support
simultaneous registration of reader and writer callbacks for the same
fd. For example, this feature is used by the standard library's
asyncio.unix_events._UnixWritePipeTransport class, which is used to
implement AbstractEventLoop.subprocess_exec(stdin=subprocess.PIPE).

Bug: https://bugs.gentoo.org/649588

 pym/portage/util/_eventloop/EventLoop.py | 83 ++++++++++++++++++++++++++++----
 1 file changed, 73 insertions(+), 10 deletions(-)

diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
index 32dc2fc9d..35d0a35ba 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -4,6 +4,7 @@
 from __future__ import division
 
 import errno
+import functools
 import logging
 import os
 import select
@@ -95,19 +96,20 @@ class EventLoop(object):
 			self._callback(*self._args)
 			return False
 
-	class _repeat_callback(object):
+	class _selector_callback(object):
 		"""
 		Wraps an callback, and always returns True, for callbacks that
 		are supposed to run repeatedly.
 		"""
-		__slots__ = ("_args", "_callback")
+		__slots__ = ("_args", "_callbacks")
 
-		def __init__(self, callback, args):
-			self._callback = callback
-			self._args = args
+		def __init__(self, callbacks):
+			self._callbacks = callbacks
 
 		def __call__(self, fd, event):
-			self._callback(*self._args)
+			for callback, mask in self._callbacks:
+				if event & mask:
+					callback()
 			return True
 
 	def __init__(self, main=True):
@@ -189,6 +191,9 @@ class EventLoop(object):
 		self.IO_OUT = PollConstants.POLLOUT
 		self.IO_PRI = PollConstants.POLLPRI
 
+		self._EVENT_READ = self.IO_IN | self.IO_HUP
+		self._EVENT_WRITE = self.IO_OUT
+
 		self._child_handlers = {}
 		self._sigchld_read = None
 		self._sigchld_write = None
@@ -602,7 +607,19 @@ class EventLoop(object):
 
 		Use functools.partial to pass keywords to the callback.
 		"""
-		self.io_add_watch(fd, self.IO_IN, self._repeat_callback(callback, args))
+		handler = self._poll_event_handlers.get(fd)
+		callbacks = [(functools.partial(callback, *args), self._EVENT_READ)]
+		selector_mask = self._EVENT_READ
+		if handler is not None:
+			if not isinstance(handler.callback, self._selector_callback):
+				raise AssertionError("add_reader called with fd "
+					"registered directly via io_add_watch")
+			for item in handler.callback._callbacks:
+				callback, mask = item
+				if mask != self._EVENT_READ:
+					selector_mask |= mask
+					callbacks.append(item)
+		self.io_add_watch(fd, selector_mask, self._selector_callback(callbacks))
 
 	def remove_reader(self, fd):
 		"""
@@ -610,7 +627,24 @@ class EventLoop(object):
 		"""
 		handler = self._poll_event_handlers.get(fd)
 		if handler is not None:
-			return self.source_remove(handler.source_id)
+			if not isinstance(handler.callback, self._selector_callback):
+				raise AssertionError("remove_reader called with fd "
+					"registered directly via io_add_watch")
+			callbacks = []
+			selector_mask = 0
+			removed = False
+			for item in handler.callback._callbacks:
+				callback, mask = item
+				if mask == self._EVENT_READ:
+					removed = True
+				else:
+					selector_mask |= mask
+					callbacks.append(item)
+			self.source_remove(handler.source_id)
+			if callbacks:
+				self.io_add_watch(fd, selector_mask,
+					self._selector_callback(callbacks))
+			return removed
 		return False
 
 	def add_writer(self, fd, callback, *args):
@@ -620,7 +654,19 @@ class EventLoop(object):
 
 		Use functools.partial to pass keywords to the callback.
 		"""
-		self.io_add_watch(fd, self.IO_OUT, self._repeat_callback(callback, args))
+		handler = self._poll_event_handlers.get(fd)
+		callbacks = [(functools.partial(callback, *args), self._EVENT_WRITE)]
+		selector_mask = self._EVENT_WRITE
+		if handler is not None:
+			if not isinstance(handler.callback, self._selector_callback):
+				raise AssertionError("add_reader called with fd "
+					"registered directly via io_add_watch")
+			for item in handler.callback._callbacks:
+				callback, mask = item
+				if mask != self._EVENT_WRITE:
+					selector_mask |= mask
+					callbacks.append(item)
+		self.io_add_watch(fd, selector_mask, self._selector_callback(callbacks))
 
 	def remove_writer(self, fd):
 		"""
@@ -628,7 +674,24 @@ class EventLoop(object):
 		"""
 		handler = self._poll_event_handlers.get(fd)
 		if handler is not None:
-			return self.source_remove(handler.source_id)
+			if not isinstance(handler.callback, self._selector_callback):
+				raise AssertionError("remove_reader called with fd "
+					"registered directly via io_add_watch")
+			callbacks = []
+			selector_mask = 0
+			removed = False
+			for item in handler.callback._callbacks:
+				callback, mask = item
+				if mask == self._EVENT_WRITE:
+					removed = True
+				else:
+					selector_mask |= mask
+					callbacks.append(item)
+			self.source_remove(handler.source_id)
+			if callbacks:
+				self.io_add_watch(fd, selector_mask,
+					self._selector_callback(callbacks))
+			return removed
 		return False
 
 	def io_add_watch(self, f, condition, callback, *args):
