Author: gsim
Date: Wed Oct 29 21:21:54 2014
New Revision: 1635306
URL: http://svn.apache.org/r1635306
Log:
Support for heartbeats
Modified:
qpid/proton/branches/examples/tutorial/proton_events.py
Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1635306&r1=1635305&r2=1635306&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Wed Oct 29 21:21:54 2014
@@ -24,10 +24,11 @@ from select import select
class AmqpConnection(object):
- def __init__(self, conn, sock, events):
+ def __init__(self, conn, sock, events, heartbeat=None):
self.events = events
self.conn = conn
self.transport = Transport()
+ if heartbeat: self.transport.idle_timeout = heartbeat
self.transport.bind(self.conn)
self.socket = sock
self.socket.setblocking(0)
@@ -144,6 +145,11 @@ class AmqpConnection(object):
self.transport.unbind()
self.events.dispatch(ApplicationEvent("disconnected", connection=self.conn))
+ def tick(self):
+ t = self.transport.tick(time.time())
+ if t: return t - time.time()
+ else: return None
+
class Acceptor:
def __init__(self, events, loop, host, port):
@@ -182,6 +188,7 @@ class Acceptor:
self.loop.add(AmqpConnection(self.events.connection(), sock, self.events).accept())
def removed(self): pass
+ def tick(self): return None
class EventInjector(object):
def __init__(self, events):
@@ -215,6 +222,7 @@ class EventInjector(object):
self.events.dispatch(self.queue.get())
def removed(self): pass
+ def tick(self): return None
class Events(object):
@@ -304,6 +312,11 @@ class ScheduledEvents(Events):
def empty(self):
return super(ScheduledEvents, self).empty and len(self._events) == 0
+def _min(a, b):
+ if a and b: return min(a, b)
+ elif a: return a
+ else: return b
+
class SelectLoop(object):
def __init__(self, events):
@@ -342,10 +355,12 @@ class SelectLoop(object):
reading = []
writing = []
closed = []
+ tick = None
for s in self.selectables:
if s.reading(): reading.append(s)
if s.writing(): writing.append(s)
if s.closed(): closed.append(s)
+ else: tick = _min(tick, s.tick())
for s in closed:
self.selectables.remove(s)
@@ -359,9 +374,12 @@ class SelectLoop(object):
timeout = 0
if self.events.next_interval and (timeout is None or self.events.next_interval < timeout):
timeout = self.events.next_interval
+ if tick:
+ timeout = _min(tick, timeout)
if reading or writing or timeout:
readable, writable, _ = select(reading, writing, [], timeout)
-
+ for s in self.selectables:
+ s.tick()
for s in readable:
s.readable()
for s in writable:
@@ -832,8 +850,9 @@ class Connector(Handler):
def _connect(self, connection):
host, port = connection.address.next()
- print "connecting to %s:%i" % (host, port)
- self.loop.add(AmqpConnection(connection, socket.socket(), self.loop.events).connect(host, port))
+ #print "connecting to %s:%i" % (host, port)
+ heartbeat = connection.heartbeat if hasattr(connection, 'heartbeat') else None
+ self.loop.add(AmqpConnection(connection, socket.socket(), self.loop.events, heartbeat=heartbeat).connect(host, port))
def on_connection_local_open(self, event):
if hasattr(event.connection, "address"):
@@ -959,9 +978,10 @@ class EventLoop(object):
self.trigger = None
self.container_id = str(generate_uuid())
- def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None):
+ def connect(self, url=None, urls=None, address=None, handler=None, reconnect=None, heartbeat=None):
context = MessagingContext(self.events.connection(), handler=handler)
context.conn.container = self.container_id or str(generate_uuid())
+ context.conn.heartbeat = heartbeat
if url: context.conn.address = Url(url)
elif urls: context.conn.address = Urls(urls)
elif address: context.conn.address = address
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]