Author: rhs
Date: Sun Oct 11 05:11:48 2009
New Revision: 824024
URL: http://svn.apache.org/viewvc?rev=824024&view=rev
Log:
resent linked variable; fixed possible drain failure during reconnect
Modified:
qpid/trunk/qpid/python/qpid/driver.py
qpid/trunk/qpid/python/qpid/messaging.py
Modified: qpid/trunk/qpid/python/qpid/driver.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/driver.py?rev=824024&r1=824023&r2=824024&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/driver.py (original)
+++ qpid/trunk/qpid/python/qpid/driver.py Sun Oct 11 05:11:48 2009
@@ -151,8 +151,11 @@
for ssn in self.connection.sessions.values():
for m in ssn.acked + ssn.unacked + ssn.incoming:
m._transfer_id = None
+ for snd in ssn.senders:
+ snd.linked = False
for rcv in ssn.receivers:
rcv.impending = rcv.received
+ rcv.linked = False
@synchronized
def wakeup(self):
@@ -663,16 +666,22 @@
sst.write_cmd(MessageFlow(rcv.destination, credit_unit.byte,
UNLIMITED.value))
sst.write_cmd(MessageFlow(rcv.destination, credit_unit.message, delta))
rcv.impending += delta
- elif delta < 0:
+ elif delta < 0 and not rcv.draining:
_rcv.draining = True
- def flush_stop_cmplt():
+ def do_stop():
rcv.impending = rcv.received
_rcv.draining = False
self.grant(rcv)
- if rcv.drain:
- sst.write_cmd(MessageFlush(rcv.destination, sync=True),
flush_stop_cmplt)
- else:
- sst.write_cmd(MessageStop(rcv.destination, sync=True),
flush_stop_cmplt)
+ sst.write_cmd(MessageStop(rcv.destination, sync=True), do_stop)
+
+ if rcv.draining:
+ def do_flush():
+ rcv.impending = rcv.received
+ rcv.granted = rcv.impending
+ _rcv.draining = False
+ rcv.draining = False
+ sst.write_cmd(MessageFlush(rcv.destination, sync=True), do_flush)
+
def process_receiver(self, rcv):
if rcv.closed: return
Modified: qpid/trunk/qpid/python/qpid/messaging.py
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/messaging.py?rev=824024&r1=824023&r2=824024&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/messaging.py (original)
+++ qpid/trunk/qpid/python/qpid/messaging.py Sun Oct 11 05:11:48 2009
@@ -646,7 +646,7 @@
self.started = started
self.capacity = options.get("capacity", UNLIMITED)
self.granted = Serial(0)
- self.drain = False
+ self.draining = False
self.impending = Serial(0)
self.received = Serial(0)
self.returned = Serial(0)
@@ -722,11 +722,9 @@
self._ewait(lambda: self.impending >= self.granted)
msg = self.session._get(self._pred, timeout=timeout)
if msg is None:
- self.drain = True
- self.granted = self.received
+ self.draining = True
self._wakeup()
- self._ewait(lambda: self.impending == self.received)
- self.drain = False
+ self._ewait(lambda: not self.draining)
self._grant()
self._wakeup()
msg = self.session._get(self._pred, timeout=0)
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]