Author: gsim
Date: Wed Oct 29 21:21:54 2014
New Revision: 1635306

URL: http://svn.apache.org/r1635306
Log:
Support for heartbeats

Modified:
    qpid/proton/branches/examples/tutorial/proton_events.py

Modified: qpid/proton/branches/examples/tutorial/proton_events.py
URL: 
http://svn.apache.org/viewvc/qpid/proton/branches/examples/tutorial/proton_events.py?rev=1635306&r1=1635305&r2=1635306&view=diff
==============================================================================
--- qpid/proton/branches/examples/tutorial/proton_events.py (original)
+++ qpid/proton/branches/examples/tutorial/proton_events.py Wed Oct 29 21:21:54 
2014
@@ -24,10 +24,11 @@ from select import select
 
 class AmqpConnection(object):
 
-    def __init__(self, conn, sock, events):
+    def __init__(self, conn, sock, events, heartbeat=None):
         self.events = events
         self.conn = conn
         self.transport = Transport()
+        if heartbeat: self.transport.idle_timeout = heartbeat
         self.transport.bind(self.conn)
         self.socket = sock
         self.socket.setblocking(0)
@@ -144,6 +145,11 @@ class AmqpConnection(object):
             self.transport.unbind()
             self.events.dispatch(ApplicationEvent("disconnected", 
connection=self.conn))
 
+    def tick(self):
+        t = self.transport.tick(time.time())
+        if t: return t - time.time()
+        else: return None
+
 class Acceptor:
 
     def __init__(self, events, loop, host, port):
@@ -182,6 +188,7 @@ class Acceptor:
             self.loop.add(AmqpConnection(self.events.connection(), sock, 
self.events).accept())
 
     def removed(self): pass
+    def tick(self): return None
 
 class EventInjector(object):
     def __init__(self, events):
@@ -215,6 +222,7 @@ class EventInjector(object):
             self.events.dispatch(self.queue.get())
 
     def removed(self): pass
+    def tick(self): return None
 
 
 class Events(object):
@@ -304,6 +312,11 @@ class ScheduledEvents(Events):
     def empty(self):
         return super(ScheduledEvents, self).empty and len(self._events) == 0
 
+def _min(a, b):
+    if a and b: return min(a, b)
+    elif a: return a
+    else: return b
+
 class SelectLoop(object):
 
     def __init__(self, events):
@@ -342,10 +355,12 @@ class SelectLoop(object):
             reading = []
             writing = []
             closed = []
+            tick = None
             for s in self.selectables:
                 if s.reading(): reading.append(s)
                 if s.writing(): writing.append(s)
                 if s.closed(): closed.append(s)
+                else: tick = _min(tick, s.tick())
 
             for s in closed:
                 self.selectables.remove(s)
@@ -359,9 +374,12 @@ class SelectLoop(object):
             timeout = 0
         if self.events.next_interval and (timeout is None or 
self.events.next_interval < timeout):
             timeout = self.events.next_interval
+        if tick:
+            timeout = _min(tick, timeout)
         if reading or writing or timeout:
             readable, writable, _ = select(reading, writing, [], timeout)
-
+            for s in self.selectables:
+                s.tick()
             for s in readable:
                 s.readable()
             for s in writable:
@@ -832,8 +850,9 @@ class Connector(Handler):
 
     def _connect(self, connection):
         host, port = connection.address.next()
-        print "connecting to %s:%i" % (host, port)
-        self.loop.add(AmqpConnection(connection, socket.socket(), 
self.loop.events).connect(host, port))
+        #print "connecting to %s:%i" % (host, port)
+        heartbeat = connection.heartbeat if hasattr(connection, 'heartbeat') 
else None
+        self.loop.add(AmqpConnection(connection, socket.socket(), 
self.loop.events, heartbeat=heartbeat).connect(host, port))
 
     def on_connection_local_open(self, event):
         if hasattr(event.connection, "address"):
@@ -959,9 +978,10 @@ class EventLoop(object):
         self.trigger = None
         self.container_id = str(generate_uuid())
 
-    def connect(self, url=None, urls=None, address=None, handler=None, 
reconnect=None):
+    def connect(self, url=None, urls=None, address=None, handler=None, 
reconnect=None, heartbeat=None):
         context = MessagingContext(self.events.connection(), handler=handler)
         context.conn.container = self.container_id or str(generate_uuid())
+        context.conn.heartbeat = heartbeat
         if url: context.conn.address = Url(url)
         elif urls: context.conn.address = Urls(urls)
         elif address: context.conn.address = address



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to