Author: rhs
Date: Sun Oct 11 05:11:48 2009
New Revision: 824024

URL: http://svn.apache.org/viewvc?rev=824024&view=rev
Log:
reset linked variable; fixed possible drain failure during reconnect

Modified:
    qpid/trunk/qpid/python/qpid/driver.py
    qpid/trunk/qpid/python/qpid/messaging.py

Modified: qpid/trunk/qpid/python/qpid/driver.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=824024&r1=824023&r2=824024&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Sun Oct 11 05:11:48 2009
@@ -151,8 +151,11 @@
     for ssn in self.connection.sessions.values():
       for m in ssn.acked + ssn.unacked + ssn.incoming:
         m._transfer_id = None
+      for snd in ssn.senders:
+        snd.linked = False
       for rcv in ssn.receivers:
         rcv.impending = rcv.received
+        rcv.linked = False
 
   @synchronized
   def wakeup(self):
@@ -663,16 +666,22 @@
       sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte, 
UNLIMITED.value))
       sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta))
       rcv.impending += delta
-    elif delta < 0:
+    elif delta < 0 and not rcv.draining:
       _rcv.draining = True
-      def flush_stop_cmplt():
+      def do_stop():
         rcv.impending = rcv.received
         _rcv.draining = False
         self.grant(rcv)
-      if rcv.drain:
-        sst.write_cmd(MessageFlush(rcv.destination, sync=True), 
flush_stop_cmplt)
-      else:
-        sst.write_cmd(MessageStop(rcv.destination, sync=True), 
flush_stop_cmplt)
+      sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop)
+
+    if rcv.draining:
+      def do_flush():
+        rcv.impending = rcv.received
+        rcv.granted = rcv.impending
+        _rcv.draining = False
+        rcv.draining = False
+      sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush)
+
 
   def process_receiver(self, rcv):
     if rcv.closed: return

Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=824024&r1=824023&r2=824024&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Sun Oct 11 05:11:48 2009
@@ -646,7 +646,7 @@
     self.started = started
     self.capacity = options.get("capacity", UNLIMITED)
     self.granted = Serial(0)
-    self.drain = False
+    self.draining = False
     self.impending = Serial(0)
     self.received = Serial(0)
     self.returned = Serial(0)
@@ -722,11 +722,9 @@
     self._ewait(lambda: self.impending >= self.granted)
     msg = self.session._get(self._pred, timeout=timeout)
     if msg is None:
-      self.drain = True
-      self.granted = self.received
+      self.draining = True
       self._wakeup()
-      self._ewait(lambda: self.impending == self.received)
-      self.drain = False
+      self._ewait(lambda: not self.draining)
       self._grant()
       self._wakeup()
       msg = self.session._get(self._pred, timeout=0)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to