Author: orudyy
Date: Fri Jun 5 13:37:53 2015
New Revision: 1683751
URL: http://svn.apache.org/r1683751
Log:
QPID-6567: [Python Client 0-8..0-91] Ensure the client does not send messages
after sending flow-ok when suspending, or before sending flow-ok when
resuming. Work by Lorenz Quack <[email protected]>
Modified:
qpid/trunk/qpid/python/qpid/client.py
qpid/trunk/qpid/python/qpid/peer.py
Modified: qpid/trunk/qpid/python/qpid/client.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/client.py?rev=1683751&r1=1683750&r2=1683751&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/client.py (original)
+++ qpid/trunk/qpid/python/qpid/client.py Fri Jun 5 13:37:53 2015
@@ -208,12 +208,12 @@ class ClientDelegate(Delegate):
ch.closed(msg)
def channel_flow(self, ch, msg):
- # On resuming we want to minimize the possibility of sending a message before flow-ok has been sent.
+ # On resuming we don't want to send a message before flow-ok has been sent.
# Therefore, we send flow-ok before we set the flow_control flag.
if msg.active:
msg.flow_ok()
ch.set_flow_control(not msg.active)
- # On pausing we want to minimize the possibility of sending a message after flow-ok has been sent.
+ # On suspending we don't want to send a message after flow-ok has been sent.
# Therefore, we send flow-ok after we set the flow_control flag.
if not msg.active:
msg.flow_ok()
Modified: qpid/trunk/qpid/python/qpid/peer.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/peer.py?rev=1683751&r1=1683750&r2=1683751&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/peer.py (original)
+++ qpid/trunk/qpid/python/qpid/peer.py Fri Jun 5 13:37:53 2015
@@ -232,7 +232,7 @@ class Channel:
self.synchronous = True
self._flow_control_wait_failure = options.get("qpid.flow_control_wait_failure", 60)
- self._flow_control_wc = threading.Condition()
+ self._flow_control_wait_condition = threading.Condition()
self._flow_control = False
def closed(self, reason):
@@ -347,8 +347,12 @@ class Channel:
self.futures[cmd_id] = future
if frame.method.klass.name == "basic" and frame.method.name == "publish":
+ self._flow_control_wait_condition.acquire()
self.check_flow_control()
- self.write(frame, content)
+ self.write(frame, content)
+ self._flow_control_wait_condition.release()
+ else:
+ self.write(frame, content)
try:
# here we depend on all nowait fields being named nowait
@@ -392,21 +396,19 @@ class Channel:
# part of flow control for AMQP 0-8, 0-9, and 0-9-1
def set_flow_control(self, value):
- self._flow_control_wc.acquire()
+ self._flow_control_wait_condition.acquire()
self._flow_control = value
if value == False:
- self._flow_control_wc.notify()
- self._flow_control_wc.release()
+ self._flow_control_wait_condition.notify()
+ self._flow_control_wait_condition.release()
# part of flow control for AMQP 0-8, 0-9, and 0-9-1
def check_flow_control(self):
- self._flow_control_wc.acquire()
if self._flow_control:
- self._flow_control_wc.wait(self._flow_control_wait_failure)
+ self._flow_control_wait_condition.wait(self._flow_control_wait_failure)
if self._flow_control:
- self._flow_control_wc.release()
+ self._flow_control_wait_condition.release()
raise Timeout("Unable to send message for " + str(self._flow_control_wait_failure) + " seconds due to broker enforced flow control")
- self._flow_control_wc.release()
def __getattr__(self, name):
type = self.spec.method(name)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]