Author: kgiusti
Date: Wed Nov 11 22:45:42 2015
New Revision: 1713943
URL: http://svn.apache.org/viewvc?rev=1713943&view=rev
Log:
QPID-6839: python-qpid: Log the failure of the Selector thread
Modified:
qpid/trunk/qpid/python/qpid/messaging/driver.py
qpid/trunk/qpid/python/qpid/selector.py
Modified: qpid/trunk/qpid/python/qpid/messaging/driver.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging/driver.py?rev=1713943&r1=1713942&r2=1713943&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging/driver.py Wed Nov 11 22:45:42 2015
@@ -390,18 +390,37 @@ class Driver:
@synchronized
def reading(self):
+ """Called by the Selector I/O thread to determine if the driver needs to
+ wait on the arrival of network data (call self.readable() callback)
+ """
return self._transport is not None and \
self._transport.reading(True)
@synchronized
def writing(self):
+ """Called by the Selector I/O thread to determine if it should block
+ waiting for output bandwidth (call the self.writeable() callback)
+ """
return self._transport is not None and \
self._transport.writing(self.engine.pending())
@synchronized
def timing(self):
+ """Called by the Selector I/O thread to determine if it should wake up the
+ driver (call the timeout() callback
+ """
return self._timeout
+ @synchronized
+ def abort(self, exc, info):
+ """Called if the Selector I/O thread hits an unrecoverable error and fails.
+ """
+ try:
+ self.connection.error = exc
+ log.error("I/O Thread Fatal error: %s\n%s" % (str(exc), info))
+ except:
+ pass
+
def _check_retry_ok(self):
"""We consider a reconnect to have suceeded only when we have received
open-ok from the peer.
Modified: qpid/trunk/qpid/python/qpid/selector.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/selector.py?rev=1713943&r1=1713942&r2=1713943&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/selector.py (original)
+++ qpid/trunk/qpid/python/qpid/selector.py Wed Nov 11 22:45:42 2015
@@ -17,8 +17,11 @@
# under the License.
#
import atexit, time, errno, os
-from compat import select, SelectError, set, selectable_waiter
+from compat import select, SelectError, set, selectable_waiter, format_exc
from threading import Thread, Lock
+from logging import getLogger
+
+log = getLogger("qpid.messaging")
class Acceptor:
@@ -67,6 +70,7 @@ class Selector:
self.reading.add(self.waiter)
self.stopped = False
self.thread = None
+ self.exception = None
def wakeup(self):
self.waiter.wakeup()
@@ -103,48 +107,58 @@ class Selector:
self.thread.start();
def run(self):
- while not self.stopped:
- wakeup = None
- for sel in self.selectables.copy():
- t = self._update(sel)
- if t is not None:
- if wakeup is None:
- wakeup = t
- else:
- wakeup = min(wakeup, t)
-
- rd = []
- wr = []
- ex = []
-
- while True:
- try:
- if wakeup is None:
- timeout = None
- else:
- timeout = max(0, wakeup - time.time())
- rd, wr, ex = select(self.reading, self.writing, (), timeout)
- break
- except SelectError, e:
- # Repeat the select call if we were interrupted.
- if e[0] == errno.EINTR:
- continue
- else:
- raise
-
- for sel in wr:
- if sel.writing():
- sel.writeable()
-
- for sel in rd:
- if sel.reading():
- sel.readable()
-
- now = time.time()
+ try:
+ while not self.stopped:
+ wakeup = None
+ for sel in self.selectables.copy():
+ t = self._update(sel)
+ if t is not None:
+ if wakeup is None:
+ wakeup = t
+ else:
+ wakeup = min(wakeup, t)
+
+ rd = []
+ wr = []
+ ex = []
+
+ while True:
+ try:
+ if wakeup is None:
+ timeout = None
+ else:
+ timeout = max(0, wakeup - time.time())
+ rd, wr, ex = select(self.reading, self.writing, (), timeout)
+ break
+ except SelectError, e:
+ # Repeat the select call if we were interrupted.
+ if e[0] == errno.EINTR:
+ continue
+ else:
+ # unrecoverable: promote to outer try block
+ raise
+
+ for sel in wr:
+ if sel.writing():
+ sel.writeable()
+
+ for sel in rd:
+ if sel.reading():
+ sel.readable()
+
+ now = time.time()
+ for sel in self.selectables.copy():
+ w = sel.timing()
+ if w is not None and now > w:
+ sel.timeout()
+ except Exception, e:
+ self.exception = e
+ info = format_exc()
+ log.error("qpid.messaging I/O thread has died: %s" % str(e))
for sel in self.selectables.copy():
- w = sel.timing()
- if w is not None and now > w:
- sel.timeout()
+ if hasattr(sel, "abort"):
+ sel.abort(e, info)
+ raise
def stop(self, timeout=None):
self.stopped = True
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]