Author: orudyy
Date: Fri Jun  5 13:37:53 2015
New Revision: 1683751

URL: http://svn.apache.org/r1683751
Log:
QPID-6567: [Python Client 0-8..0-91] ensure client won't send messages 
after/before sending flow-ok on suspending/resuming respectively. work by 
Lorenz Quack <[email protected]>

Modified:
    qpid/trunk/qpid/python/qpid/client.py
    qpid/trunk/qpid/python/qpid/peer.py

Modified: qpid/trunk/qpid/python/qpid/client.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/client.py?rev=1683751&r1=1683750&r2=1683751&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/client.py (original)
+++ qpid/trunk/qpid/python/qpid/client.py Fri Jun  5 13:37:53 2015
@@ -208,12 +208,12 @@ class ClientDelegate(Delegate):
     ch.closed(msg)
 
   def channel_flow(self, ch, msg):
-    # On resuming we want to minimize the possibility of sending a message 
before flow-ok has been sent.
+    # On resuming we don't want to send a message before flow-ok has been sent.
     # Therefore, we send flow-ok before we set the flow_control flag.
     if msg.active:
         msg.flow_ok()
     ch.set_flow_control(not msg.active)
-    # On pausing we want to minimize the possibility of sending a message 
after flow-ok has been sent.
+    # On suspending we don't want to send a message after flow-ok has been 
sent.
     # Therefore, we send flow-ok after we set the flow_control flag.
     if not msg.active:
         msg.flow_ok()

Modified: qpid/trunk/qpid/python/qpid/peer.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/peer.py?rev=1683751&r1=1683750&r2=1683751&view=diff
==============================================================================
--- qpid/trunk/qpid/python/qpid/peer.py (original)
+++ qpid/trunk/qpid/python/qpid/peer.py Fri Jun  5 13:37:53 2015
@@ -232,7 +232,7 @@ class Channel:
     self.synchronous = True
 
     self._flow_control_wait_failure = 
options.get("qpid.flow_control_wait_failure", 60)
-    self._flow_control_wc = threading.Condition()
+    self._flow_control_wait_condition = threading.Condition()
     self._flow_control = False
 
   def closed(self, reason):
@@ -347,8 +347,12 @@ class Channel:
       self.futures[cmd_id] = future
 
     if frame.method.klass.name == "basic" and frame.method.name == "publish":
+      self._flow_control_wait_condition.acquire()
       self.check_flow_control()
-    self.write(frame, content)
+      self.write(frame, content)
+      self._flow_control_wait_condition.release()
+    else:
+      self.write(frame, content)
 
     try:
       # here we depend on all nowait fields being named nowait
@@ -392,21 +396,19 @@ class Channel:
 
   # part of flow control for AMQP 0-8, 0-9, and 0-9-1
   def set_flow_control(self, value):
-    self._flow_control_wc.acquire()
+    self._flow_control_wait_condition.acquire()
     self._flow_control = value
     if value == False:
-      self._flow_control_wc.notify()
-    self._flow_control_wc.release()
+      self._flow_control_wait_condition.notify()
+    self._flow_control_wait_condition.release()
 
   # part of flow control for AMQP 0-8, 0-9, and 0-9-1
   def check_flow_control(self):
-    self._flow_control_wc.acquire()
     if self._flow_control:
-      self._flow_control_wc.wait(self._flow_control_wait_failure)
+      self._flow_control_wait_condition.wait(self._flow_control_wait_failure)
     if self._flow_control:
-      self._flow_control_wc.release()
+      self._flow_control_wait_condition.release()
       raise Timeout("Unable to send message for " + 
str(self._flow_control_wait_failure) + " seconds due to broker enforced flow 
control")
-    self._flow_control_wc.release()
 
   def __getattr__(self, name):
     type = self.spec.method(name)



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to