Author: aconway
Date: Tue Nov 24 22:40:53 2009
New Revision: 883909

URL: http://svn.apache.org/viewvc?rev=883909&view=rev
Log:
Added flow control to failover_test in cluster_tests.py.
Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py qpid/trunk/qpid/python/qpid/brokertest.py Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=883909&r1=883908&r2=883909&view=diff ============================================================================== --- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original) +++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Nov 24 22:40:53 2009 @@ -73,9 +73,9 @@ # Start sender and receiver threads cluster[0].declare_queue("test-queue") - receiver = NumberedReceiver(cluster[1]) + sender = NumberedSender(cluster[1], 1000) # Max queue depth + receiver = NumberedReceiver(cluster[2], sender) receiver.start() - sender = NumberedSender(cluster[2]) sender.start() # Kill original brokers, start new ones for the duration. Modified: qpid/trunk/qpid/python/qpid/brokertest.py URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/python/qpid/brokertest.py?rev=883909&r1=883908&r2=883909&view=diff ============================================================================== --- qpid/trunk/qpid/python/qpid/brokertest.py (original) +++ qpid/trunk/qpid/python/qpid/brokertest.py Tue Nov 24 22:40:53 2009 @@ -324,26 +324,48 @@ Thread to run a sender client and send numbered messages until stopped. """ - def __init__(self, broker): + def __init__(self, broker, max_depth=None): + """ + max_depth: enable flow control, ensure sent - received <= max_depth. + Requires self.received(n) to be called each time messages are received. 
+ """ StoppableThread.__init__(self) self.sender = broker.test.popen( [broker.test.sender_exec, "--port", broker.port()], expect=EXPECT_RUNNING) + self.condition = Condition() + self.max = max_depth + self.received = 0 def run(self): try: self.sent = 0 while not self.stopped: + if self.max: + self.condition.acquire() + while self.sent - self.received > self.max: + self.condition.wait() + self.condition.release() self.sender.stdin.write(str(self.sent)+"\n") self.sender.stdin.flush() self.sent += 1 except Exception, e: self.error = RethrownException(e, self.sender.pname) + def notify_received(self, count): + """Called by receiver to enable flow control. count = messages received so far.""" + self.condition.acquire() + self.received = count + self.condition.notify() + self.condition.release() + class NumberedReceiver(Thread): """ Thread to run a receiver client and verify it receives sequentially numbered messages. """ - def __init__(self, broker): + def __init__(self, broker, sender = None): + """ + sender: enable flow control. Call sender.received(n) for each message received. + """ Thread.__init__(self) self.test = broker.test self.receiver = self.test.popen( @@ -351,7 +373,8 @@ self.stopat = None self.lock = Lock() self.error = None - + self.sender = sender + def run(self): try: self.received = 0 @@ -360,7 +383,10 @@ try: m = int(self.receiver.stdout.readline()) assert(m <= self.received) # Allow for duplicates - if (m == self.received): self.received += 1 + if (m == self.received): + self.received += 1 + if self.sender: + self.sender.notify_received(self.received) finally: self.lock.release() except Exception, e: --------------------------------------------------------------------- Apache Qpid - AMQP Messaging Implementation Project: http://qpid.apache.org Use/Interact: mailto:commits-subscr...@qpid.apache.org