Repository: qpid-python Updated Branches: refs/heads/master 81f09ae33 -> 037c57387
QPID-7317: Fix hangs in qpid.messaging. A hang is observed in processes using qpid.messaging with a thread blocked waiting for the Selector to wake it, but no Selector.run thread. This patch removes all the known ways that this hang can occur. Either we function normally or immediately raise an exception and log to the "qpid.messaging" logger a message starting with "qpid.messaging:". The following issues are fixed: 1. The Selector.run() thread raises a fatal exception. Use of qpid.messaging will re-raise the exception immediately, not hang. 2. The process forks, so the child has no Selector thread. https://issues.apache.org/jira/browse/QPID-5637 resets the Selector after a fork. In addition we now: - Close Selector.waiter: its file descriptors are shared with the parent, which can cause havoc if they "steal" each other's wakeups. - Replace Endpoint._lock in related endpoints with a BrokenLock. If the parent is holding locks when it forks, they remain locked forever in the child. BrokenLock.acquire() raises instead of hanging. 3. Selector.stop() called on atexit. Selector.stop was registered via atexit, which could cause a hang if qpid.messaging was used in a later-executing atexit function. That has been removed; Selector.run() runs in a daemon thread, so there is no need for stop(). 4. User calls Selector.stop() directly. There is no reason to do this for the default Selector used by qpid.messaging, so for that case stop() is now ignored. It works as before for code that creates its own qpid.Selector instances. 
Project: http://git-wip-us.apache.org/repos/asf/qpid-python/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-python/commit/037c5738 Tree: http://git-wip-us.apache.org/repos/asf/qpid-python/tree/037c5738 Diff: http://git-wip-us.apache.org/repos/asf/qpid-python/diff/037c5738 Branch: refs/heads/master Commit: 037c5738734d8fecb7b7f7e7af4e4f14f9cd3a64 Parents: 81f09ae Author: Alan Conway <[email protected]> Authored: Fri Sep 23 17:23:55 2016 -0400 Committer: Alan Conway <[email protected]> Committed: Fri Sep 23 17:26:39 2016 -0400 ---------------------------------------------------------------------- qpid/selector.py | 72 ++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 63 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-python/blob/037c5738/qpid/selector.py ---------------------------------------------------------------------- diff --git a/qpid/selector.py b/qpid/selector.py index 4a15d56..32e542b 100644 --- a/qpid/selector.py +++ b/qpid/selector.py @@ -16,13 +16,15 @@ # specific language governing permissions and limitations # under the License. # -import atexit, time, errno, os +import time, errno, os from compat import select, SelectError, set, selectable_waiter, format_exc from threading import Thread, Lock from logging import getLogger log = getLogger("qpid.messaging") +class SelectorException(Exception): pass + class Acceptor: def __init__(self, sock, handler): @@ -53,8 +55,12 @@ class Selector: Selector.lock.acquire() try: if Selector.DEFAULT is None or Selector._current_pid != os.getpid(): + # If we forked, mark the existing Selector dead. 
+ if Selector.DEFAULT is not None: + log.warning("qpid.messaging: process was forked") + Selector.DEFAULT.dead( + SelectorException("qpid.messaging: forked child process used parent connection"), True) sel = Selector() - atexit.register(sel.stop) sel.start() Selector.DEFAULT = sel Selector._current_pid = os.getpid() @@ -73,6 +79,9 @@ class Selector: self.exception = None def wakeup(self): + if self.exception: + log.error(str(self.exception)) + raise self.exception self.waiter.wakeup() def register(self, selectable): @@ -102,13 +111,14 @@ class Selector: def start(self): self.stopped = False + self.exception = None self.thread = Thread(target=self.run) self.thread.setDaemon(True) self.thread.start(); def run(self): try: - while not self.stopped: + while not self.stopped and not self.exception: wakeup = None for sel in self.selectables.copy(): t = self._update(sel) @@ -152,16 +162,60 @@ class Selector: if w is not None and now > w: sel.timeout() except Exception, e: - self.exception = e - info = format_exc() - log.error("qpid.messaging I/O thread has died: %s" % str(e)) - for sel in self.selectables.copy(): - if hasattr(sel, "abort"): - sel.abort(e, info) + log.error("qpid.messaging: I/O thread has died: %s\n%s" % (e, format_exc())) + dead(e, False) raise + self.dead(SelectorException("qpid.messaging: I/O thread exited"), False) def stop(self, timeout=None): + """Stop the selector and wait for it's thread to exit. + Ignored for the shared default() selector, which stops when the process exits. + + """ + if self.DEFAULT == self: # Never stop the DEFAULT Selector + return self.stopped = True self.wakeup() self.thread.join(timeout) + self.dead(SelectorException("qpid.messaging: I/O thread stopped"), False) + + def dead(self, e, forked): + """Mark the Selector as dead if it is stopped for any reason. + Ensure there any future calls to wait() will raise an exception. 
+ If the thread died because of a fork() then ensure further that + attempting to take the connections lock also raises. + """ self.thread = None + self.exception = e + for sel in self.selectables.copy(): + try: + # Mark the connection as failed + sel.connection.error = e + if forked: + # Replace connection's locks, they may be permanently locked in the forked child. + c = sel.connection + c.error = e + c._lock = BrokenLock(e) + for ssn in c.sessions.values(): + ssn._lock = c._lock + for l in ssn.senders + ssn.receivers: + l._lock = c._lock + except: + pass + try: + if forked: + self.waiter.close() # Don't mess with the parent's FDs + else: + self.waiter.wakeup() # In case somebody re-waited while we were cleaning up. + except: + pass + +class BrokenLock(object): + """Dummy lock-like object that raises an exception. Used in forked child to + replace locks that may be held in the parent process.""" + def __init__(self, exception): + self.exception = exception + + def acquire(self): + log.error(str(self.exception)) + raise self.exception --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
