Author: kpvdr
Date: Wed Jun  3 15:25:56 2009
New Revision: 781431

URL: http://svn.apache.org/viewvc?rev=781431&view=rev
Log:
Python cluster test improvements and some additional tests

Modified:
    qpid/trunk/qpid/cpp/src/tests/cluster.py
    qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
    qpid/trunk/qpid/cpp/src/tests/testlib.py

Modified: qpid/trunk/qpid/cpp/src/tests/cluster.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster.py?rev=781431&r1=781430&r2=781431&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster.py Wed Jun  3 15:25:56 2009
@@ -29,10 +29,9 @@
         try:
             clusterName = "cluster-01"
             self.createCheckCluster(clusterName, 5)
-            self.checkNumBrokers(5)
             self.stopCheckCluster(clusterName)
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
 
     def test_Cluster_02_MultipleClusterInitialization(self):
@@ -40,13 +39,13 @@
         try:
             for i in range(0, 5):
                 clusterName = "cluster-02.%d" % i
-                self.createCluster(clusterName, 5)
+                self.createCheckCluster(clusterName, 5)
             self.checkNumBrokers(25)
             self.killCluster("cluster-02.2")
             self.checkNumBrokers(20)
             self.stopCheckAll()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
         
     def test_Cluster_03_AddRemoveNodes(self):
@@ -66,7 +65,7 @@
             self.checkNumClusterBrokers(clusterName, 7)
             self.stopCheckAll()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
 
     def test_Cluster_04_RemoveRestoreNodes(self):
@@ -93,7 +92,7 @@
             self.checkNumClusterBrokers(clusterName, 6)
             self.stopCheckAll()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
         
     def test_Cluster_05_KillAllNodesThenRecover(self):
@@ -105,170 +104,126 @@
             self.createCheckCluster(clusterName, 6)
             self.stopCheckAll()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
     
     def test_Cluster_06_PublishConsume(self):
         """Publish then consume 100 messages from a single cluster"""
         try:
-            clusterName = "cluster-06"
-            self.createCheckCluster(clusterName, 3)
-            self.sendReceiveMsgs(0, clusterName, "test-exchange-06", 
"test-queue-06", 100)
-            self.stopCheckAll()
+            dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-06", 3, "test-exchange-06", ["test-queue-06"])
+            dh.sendMsgs(100)
+            dh.finalizeTest()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
     
     def test_Cluster_07_MultiplePublishConsume(self):
         """Staggered publish and consume on a single cluster"""
         try:
-            clusterName = "cluster-07"
-            exchangeName = "test-exchange-07"
-            queueName = "test-queue-07"
-            self.createCheckCluster(clusterName, 3)
-            self.createBindDirectExchangeQueue(0, clusterName, exchangeName, 
queueName)
-            txMsgs  = self.sendMsgs(0, clusterName, exchangeName, queueName, 
20) # 20, 0
-            rxMsgs  = self.receiveMsgs(1, clusterName, queueName, 10) # 10, 10
-            txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 
20) # 30, 10
-            rxMsgs += self.receiveMsgs(0, clusterName, queueName, 20) # 10, 30
-            txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 
20) # 30, 30 
-            rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50
-            txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 
20) # 30, 50
-            rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80
-            self.stopCheckAll()
-            if txMsgs != rxMsgs:
-                print "txMsgs=%s" % txMsgs
-                print "rxMsgs=%s" % rxMsgs
-                self.fail("Send - receive message mismatch")
+            dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-07", 3, "test-exchange-07", ["test-queue-07"])
+                                  #  tx  rx  nodes
+                                  #   0   0  0 1 2
+            dh.sendMsgs(20)       #  20   0  *
+            dh.receiveMsgs(10, 1) #  20  10    *
+            dh.sendMsgs(20, 2)    #  40  10      *
+            dh.receiveMsgs(20, 0) #  40  30  *
+            dh.sendMsgs(20, 1)    #  60  30    *
+            dh.receiveMsgs(20, 2) #  60  50       *
+            dh.sendMsgs(20, 0)    #  80  50  *
+            dh.finalizeTest()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
     
     def test_Cluster_08_MsgPublishConsumeAddRemoveNodes(self):
         """Staggered publish and consume interleaved with adding and removing 
nodes on a single cluster"""
         try:
-            clusterName = "cluster-08"
-            exchangeName = "test-exchange-08"
-            queueName = "test-queue-08"
-            self.createCheckCluster(clusterName, 3)
-            self.createBindDirectExchangeQueue(0, clusterName, exchangeName, 
queueName)
-            txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 
20) # 20, 0
-            for i in range(3,6):
-                self.createClusterNode(i, clusterName)
-            self.checkNumClusterBrokers(clusterName, 6)
-            txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 
20) # 40, 0
-            self.killNode(0, clusterName)
-            self.checkNumClusterBrokers(clusterName, 5)
-            rxMsgs = self.receiveMsgs(2, clusterName, queueName, 10) # 30, 10
-            self.killNode(2, clusterName)
-            self.checkNumClusterBrokers(clusterName, 4)
-            rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20) # 10, 30
-            self.createClusterNode(6, clusterName)
-            self.checkNumClusterBrokers(clusterName, 5)
-            txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 
20) # 30, 30 
-            rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 50
-            self.createClusterNode(7, clusterName)
-            self.checkNumClusterBrokers(clusterName, 6)
-            txMsgs += self.sendMsgs(6, clusterName, exchangeName, queueName, 
20) # 30, 50
-            rxMsgs += self.receiveMsgs(1, clusterName, queueName, 30) # 0, 80
-            self.stopCheckAll()
-            if txMsgs != rxMsgs:
-                print "txMsgs=%s" % txMsgs
-                print "rxMsgs=%s" % rxMsgs
-                self.fail("Send - receive message mismatch")
+            dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-08", 3, "test-exchange-08", ["test-queue-08"])
+                                  #  tx  rx  nodes
+                                  #   0   0  0 1 2
+            dh.sendMsgs(20)       #  20   0  *
+            dh.addNodes(2)        #          0 1 2 3 4
+            dh.sendMsgs(20, 1)    #  40   0    *  
+            dh.killNode(0)        #          . 1 2 3 4
+            dh.receiveMsgs(10, 2) #  40  10      *
+            dh.killNode(2)        #          . 1 . 3 4
+            dh.receiveMsgs(20, 3) #  40  30        *
+            dh.addNodes()         #          . 1 . 3 4 5
+            dh.sendMsgs(20, 4)    #  60  30          *
+            dh.receiveMsgs(20, 5) #  60  50            *
+            dh.addNodes()         #          . 1 . 3 4 5 6
+            dh.sendMsgs(20, 6)    #  80  50              *
+            dh.killNode(5)        #          . 1 . 3 4 . 6
+            dh.finalizeTest()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
      
     def test_Cluster_09_MsgPublishConsumeRemoveRestoreNodes(self):
         """Publish and consume messages interleaved with adding and restoring 
previous nodes on a single cluster"""
         try:
-            clusterName = "cluster-09"
-            exchangeName = "test-exchange-09"
-            queueName = "test-queue-09"
-            self.createCheckCluster(clusterName, 6)
-            self.createBindDirectExchangeQueue(0, clusterName, exchangeName, 
queueName)
-            txMsgs = self.sendMsgs(0, clusterName, exchangeName, queueName, 
20) # 20, 0
-            self.killNode(2, clusterName)
-            self.checkNumClusterBrokers(clusterName, 5)
-            txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 
20) # 40, 0
-            self.killNode(0, clusterName)
-            self.checkNumClusterBrokers(clusterName, 4)
-            rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10) # 30, 10
-            self.killNode(4, clusterName)
-            self.checkNumClusterBrokers(clusterName, 3)
-            rxMsgs += self.receiveMsgs(5, clusterName, queueName, 20) # 10, 30
-            self.createClusterNode(2, clusterName)
-            self.checkNumClusterBrokers(clusterName, 4)
-            txMsgs += self.sendMsgs(1, clusterName, exchangeName, queueName, 
20) # 30, 30
-            self.createClusterNode(0, clusterName)
-            self.checkNumClusterBrokers(clusterName, 5)
-            rxMsgs += self.receiveMsgs(2, clusterName, queueName, 20) # 10, 50
-            self.createClusterNode(4, clusterName)
-            self.checkNumClusterBrokers(clusterName, 6)
-            txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 
20) # 30, 50
-            rxMsgs += self.receiveMsgs(4, clusterName, queueName, 30) # 0, 80
-            self.stopCheckAll()
-            if txMsgs != rxMsgs:
-                print "txMsgs=%s" % txMsgs
-                print "rxMsgs=%s" % rxMsgs
-                self.fail("Send - receive message mismatch")
+            dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-09", 6, "test-exchange-09", ["test-queue-09"])
+                                  #  tx  rx  nodes
+                                  #   0   0  0 1 2 3 4 5
+            dh.sendMsgs(20)       #  20   0  *
+            dh.killNode(2)        #          0 1 . 3 4 5
+            dh.sendMsgs(20, 1)    #  40   0    *
+            dh.killNode(0)        #          . 1 . 3 4 5
+            dh.receiveMsgs(10, 3) #  40  10        *
+            dh.killNode(4)        #          . 1 . 3 . 5
+            dh.receiveMsgs(20, 5) #  40  30            *
+            dh.restoreNode(2)     #          . 1 2 3 . 5
+            dh.sendMsgs(20, 1)    #  60  30    *
+            dh.restoreNode(0)     #          0 1 2 3 . 5
+            dh.receiveMsgs(20, 0) #  60  50  *
+            dh.killNode(2)        #          0 1 . 3 . 5
+            dh.restoreNode(2)     #          0 1 2 3 . 5
+            dh.sendMsgs(20, 2)    #  80  50      *
+            dh.finalizeTest()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
    
     def test_Cluster_10_LinearNodeKillCreateProgression(self):
         """Publish and consume messages while linearly killing all original 
nodes and replacing them with new ones"""
         try:
-            clusterName = "cluster-10"
-            exchangeName = "test-exchange-10"
-            queueName = "test-queue-10"
-            self.createCheckCluster(clusterName, 4)
-            self.createBindDirectExchangeQueue(2, clusterName, exchangeName, 
queueName)
-            txMsgs = self.sendMsgs(2, clusterName, exchangeName, queueName, 20)
-            rxMsgs = self.receiveMsgs(3, clusterName, queueName, 10)
-            for i in range(0, 16):
-                self.killNode(i, clusterName)
-                self.createClusterNode(i+4, clusterName)
-                self.checkNumClusterBrokers(clusterName, 4)
-                txMsgs += self.sendMsgs(i+1, clusterName, exchangeName, 
queueName, 20)
-                rxMsgs += self.receiveMsgs(i+2, clusterName, queueName, 20)
-            rxMsgs += self.receiveMsgs(16, clusterName, queueName, 10)
-            self.stopCheckAll()
-            if txMsgs != rxMsgs:
-                print "txMsgs=%s" % txMsgs
-                print "rxMsgs=%s" % rxMsgs
-                self.fail("Send - receive message mismatch")
+            dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-10", 4, "test-exchange-10", ["test-queue-10"])
+                                        #  tx  rx  nodes
+                                        #   0   0  0 1 2 3
+            dh.sendMsgs(20)             #  20   0  *
+            dh.receiveMsgs(10, 1)       #  20  10    *
+            for i in range(0, 16):      # First loop:
+                dh.killNode(i)          #          . 1 2 3
+                dh.addNodes()           #          . 1 2 3 4
+                dh.sendMsgs(20, i+1)    #  40  10    *
+                dh.receiveMsgs(20, i+2) #  40  30      *
+                                        # After loop:
+                                        # 340 330  . . . . . . . . . . . . . . . . 16 17 18 19
+            dh.finalizeTest()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
     
     def test_Cluster_11_CircularNodeKillRestoreProgression(self):
         """Publish and consume messages while circularly killing all original 
nodes and restoring them again"""
         try:
-            clusterName = "cluster-11"
-            exchangeName = "test-exchange-11"
-            queueName = "test-queue-11"
-            self.createCheckCluster(clusterName, 4)
-            self.createBindDirectExchangeQueue(2, clusterName, exchangeName, 
queueName)
-            txMsgs = self.sendMsgs(3, clusterName, exchangeName, queueName, 20)
-            rxMsgs = self.receiveMsgs(0, clusterName, queueName, 10)
-            self.killNode(0, clusterName)
-            self.killNode(1, clusterName)
-            for i in range(0, 16):
-                self.killNode((i + 2) % 4, clusterName)
-                self.createClusterNode(i % 4, clusterName)
-                self.checkNumClusterBrokers(clusterName, 2)
-                txMsgs += self.sendMsgs((i + 3) % 4, clusterName, 
exchangeName, queueName, 20)
-                rxMsgs += self.receiveMsgs((i + 4) % 4, clusterName, 
queueName, 20)
-            rxMsgs += self.receiveMsgs(3, clusterName, queueName, 10)
-            self.stopCheckAll()
-            if txMsgs != rxMsgs:
-                print "txMsgs=%s" % txMsgs
-                print "rxMsgs=%s" % rxMsgs
-                self.fail("Send - receive message mismatch")
+            dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-11", 4, "test-exchange-11", ["test-queue-11"])
+                                                #  tx  rx  nodes
+                                                #   0   0  0 1 2 3
+            dh.sendMsgs(20, 3)                  #  20   0        *
+            dh.receiveMsgs(10)                  #  20  10  *
+            dh.killNode(0)                      #          . 1 2 3
+            dh.killNode(1)                      #          . . 2 3
+            for i in range(0, 16):              # First loop:
+                dh.killNode((i + 2) % 4)        #          . . . 3
+                dh.restoreNode(i % 4)           #          0 . . 3
+                dh.sendMsgs(20, (i + 3) % 4)    #  40  10        *
+                dh.receiveMsgs(20, (i + 4) % 4) #  40  30  *
+                                                # After loop:
+                                                # 340 330  . . 2 3
+            dh.finalizeTest()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
         
     def test_Cluster_12_KillAllNodesRecoverMessages(self):
@@ -277,180 +232,94 @@
             print " No store loaded, skipped"
             return
         try:
-            clusterName = "cluster-12"
-            exchangeName = "test-exchange-12"
-            queueName = "test-queue-12"
-            self.createCheckCluster(clusterName, 4)
-            self.createBindDirectExchangeQueue(2, clusterName, exchangeName, 
queueName)
-            txMsgs  = self.sendMsgs(0, clusterName, exchangeName, queueName, 
20)
-            rxMsgs  = self.receiveMsgs(1, clusterName, queueName, 10)
-            txMsgs += self.sendMsgs(2, clusterName, exchangeName, queueName, 
20)
-            rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20)
-            self.killNode(0, clusterName)
-            self.createClusterNode(4, clusterName)
-            self.checkNumClusterBrokers(clusterName, 4)
-            txMsgs += self.sendMsgs(4, clusterName, exchangeName, queueName, 
20)
-            rxMsgs += self.receiveMsgs(1, clusterName, queueName, 20)
-            self.killNode(2, clusterName)
-            self.createClusterNode(0, clusterName)
-            self.createClusterNode(5, clusterName)
-            self.checkNumClusterBrokers(clusterName, 5)
-            txMsgs += self.sendMsgs(0, clusterName, exchangeName, queueName, 
20)
-            rxMsgs += self.receiveMsgs(3, clusterName, queueName, 20)
-            self.killAllClusters()
-            self.checkNumClusterBrokers(clusterName, 0)
-            self.createCluster(clusterName)
-            self.createClusterNode(3, clusterName) # last node to be used
-            self.createClusterNode(0, clusterName)
-            self.createClusterNode(1, clusterName)
-            self.createClusterNode(2, clusterName)
-            self.createClusterNode(4, clusterName)
-            self.createClusterNode(5, clusterName)
-            rxMsgs += self.receiveMsgs(0, clusterName, queueName, 10)
-            if txMsgs != rxMsgs:
-                print "txMsgs=%s" % txMsgs
-                print "rxMsgs=%s" % rxMsgs
-                self.fail("Send - receive message mismatch")
+            dh = TestBaseCluster.DirectExchangeTestHelper(self, "cluster-12", 4, "test-exchange-12", ["test-queue-12"])
+                                  #  tx  rx  nodes
+                                  #   0   0  0 1 2 3
+            dh.sendMsgs(20, 2)    #  20   0      *
+            dh.receiveMsgs(10, 1) #  20  10    *
+            dh.killNode(1)        #          0 . 2 3
+            dh.sendMsgs(20, 0)    #  40  10  *
+            dh.receiveMsgs(20, 3) #  40  30        *
+            dh.killNode(2)        #          0 . . 3
+            dh.addNodes(2)        #          0 . . 3 4 5
+            dh.sendMsgs(20, 5)    #  60  30            *
+            dh.receiveMsgs(20, 4) #  60  50          *
+            dh.killCluster()      # cluster does not exist
+            dh.restoreCluster()   #  60  50  . . . . . .
+            dh.restoreNodes()     #          0 1 2 3 4 5
+            dh.finalizeTest()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise         
     
     def test_Cluster_13_TopicExchange(self):
-        """Create topic exchange in a cluster and make sure it replicates 
correctly"""
+        """Create topic exchange in a cluster and make sure it behaves 
correctly"""
         try:
-            clusterName = "cluster-13"
-            self.createCheckCluster(clusterName, 4)
-            topicExchangeName = "test-exchange-13"
-            topicQueueNameKeyList = {"test-queue-13-A" : "#.A", 
"test-queue-13-B" : "#.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.#"}
-            self.createBindTopicExchangeQueues(2, clusterName, 
topicExchangeName, topicQueueNameKeyList)
-            
-            # Place initial messages
-            txMsgsA = txMsgsC = self.sendMsgs(3, clusterName, 
topicExchangeName, "C.hello.A", 10) # (10, 0, 10, 0)
-            self.sendMsgs(2, clusterName, topicExchangeName, "hello", 10) # 
Should not go to any queue
-            txMsgsD = self.sendMsgs(1, clusterName, topicExchangeName, 
"D.hello.A", 10) # (20, 0, 10, 10)
-            txMsgsA += txMsgsD
-            txMsgsB = self.sendMsgs(0, clusterName, topicExchangeName, 
"hello.B", 20) # (20, 20, 10, 10)
+            topicQueueNameKeyList = {"test-queue-13-A" : "#.A", "test-queue-13-B" : "*.B", "test-queue-13-C" : "C.#", "test-queue-13-D" : "D.*"}
+            th = TestBaseCluster.TopicExchangeTestHelper(self, "cluster-13", 4, "test-exchange-13", topicQueueNameKeyList)
+             # Place initial messages
+            th.sendMsgs("C.hello.A", 10)
+            th.sendMsgs("hello.world", 10) # matches none of the queues
+            th.sendMsgs("D.hello.A", 10)
+            th.sendMsgs("hello.B", 20)
+            th.sendMsgs("D.hello", 20)
             # Kill and add some nodes
-            self.killNode(0, clusterName)
-            self.killNode(2, clusterName)
-            self.createClusterNode(4, clusterName)
-            self.createClusterNode(5, clusterName)
-            self.checkNumClusterBrokers(clusterName, 4)
+            th.killNode(0)
+            th.killNode(2)
+            th.addNodes(2)
             # Pull 10 messages from each queue
-            rxMsgsA =  self.receiveMsgs(1, clusterName, "test-queue-13-A", 10) 
# (10, 20, 10, 10)           
-            rxMsgsB =  self.receiveMsgs(3, clusterName, "test-queue-13-B", 10) 
# (10, 10, 10, 10)                
-            rxMsgsC =  self.receiveMsgs(4, clusterName, "test-queue-13-C", 10) 
# (10, 10, 0, 10)            
-            rxMsgsD =  self.receiveMsgs(5, clusterName, "test-queue-13-D", 10) 
# (10, 10, 0, 0)
+            th.receiveMsgs(10)
             # Kill and add another node
-            self.killNode(4, clusterName)
-            self.createClusterNode(6, clusterName)
-            self.checkNumClusterBrokers(clusterName, 4)
+            th.killNode(4)
+            th.addNodes()
             # Add two more queues
-            self.createBindTopicExchangeQueues(6, clusterName, 
topicExchangeName, {"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : 
"#.bye.B"})
+            th.addQueues({"test-queue-13-E" : "#.bye.A", "test-queue-13-F" : "#.bye.B"})
             # Place more messages
-            txMsgs = self.sendMsgs(3, clusterName, topicExchangeName, 
"C.bye.A", 10) # (20, 10, 10, 0, 10, 0)
-            txMsgsA += txMsgs
-            txMsgsC += txMsgs
-            txMsgsE  = txMsgs
-            self.sendMsgs(1, clusterName, topicExchangeName, "bye", 20) # 
Should not go to any queue
-            txMsgs = self.sendMsgs(5, clusterName, topicExchangeName, 
"D.bye.B", 20) # (20, 30, 10, 20, 10, 20)
-            txMsgsB += txMsgs
-            txMsgsD += txMsgs
-            txMsgsF  = txMsgs
+            th.sendMsgs("C.bye.A", 10)
+            th.sendMsgs("hello.bye", 20) # matches none of the queues
+            th.sendMsgs("hello.bye.B", 20)
+            th.sendMsgs("D.bye", 20)
             # Kill all nodes but one
-            self.killNode(1, clusterName)
-            self.killNode(3, clusterName)
-            self.killNode(6, clusterName)
-            self.checkNumClusterBrokers(clusterName, 1)
-            # Pull all remaining messages from each queue
-            rxMsgsA += self.receiveMsgs(5, clusterName, "test-queue-13-A", 20) 
        
-            rxMsgsB += self.receiveMsgs(5, clusterName, "test-queue-13-B", 30) 
              
-            rxMsgsC += self.receiveMsgs(5, clusterName, "test-queue-13-C", 10) 
         
-            rxMsgsD += self.receiveMsgs(5, clusterName, "test-queue-13-D", 20)
-            rxMsgsE  = self.receiveMsgs(5, clusterName, "test-queue-13-E", 10)
-            rxMsgsF  = self.receiveMsgs(5, clusterName, "test-queue-13-F", 20)
-            # Check messages
-            self.stopCheckAll()
-            if txMsgsA != rxMsgsA:
-                self.fail("Send - receive message mismatch for queue A")
-            if txMsgsB != rxMsgsB:
-                self.fail("Send - receive message mismatch for queue B")
-            if txMsgsC != rxMsgsC:
-                self.fail("Send - receive message mismatch for queue C")
-            if txMsgsD != rxMsgsD:
-                self.fail("Send - receive message mismatch for queue D")
-            if txMsgsE != rxMsgsE:
-                self.fail("Send - receive message mismatch for queue E")
-            if txMsgsF != rxMsgsF:
-                self.fail("Send - receive message mismatch for queue F")
+            th.killNode(1)
+            th.killNode(3)
+            th.killNode(6)
+            # Pull all remaining messages from each queue and check messages
+            th.finalizeTest()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise  
      
     def test_Cluster_14_FanoutExchange(self):
-        """Create fanout exchange in a cluster and make sure it replicates 
correctly"""
+        """Create fanout exchange in a cluster and make sure it behaves 
correctly"""
         try:
-            clusterName = "cluster-14"
-            self.createCheckCluster(clusterName, 4)
-            fanoutExchangeName = "test-exchange-14"
             fanoutQueueNameList = ["test-queue-14-A", "test-queue-14-B", "test-queue-14-C"]
-            self.createBindFanoutExchangeQueues(2, clusterName, 
fanoutExchangeName, fanoutQueueNameList)
-            
+            fh = TestBaseCluster.FanoutExchangeTestHelper(self, "cluster-14", 4, "test-exchange-14", fanoutQueueNameList)
             # Place initial 20 messages, retrieve 10
-            txMsg = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
-            rxMsgA =  self.receiveMsgs(1, clusterName, "test-queue-14-A", 10)  
   
-            rxMsgB =  self.receiveMsgs(3, clusterName, "test-queue-14-B", 10)  
         
-            rxMsgC =  self.receiveMsgs(0, clusterName, "test-queue-14-C", 10)  
     
+            fh.sendMsgs(20)
+            fh.receiveMsgs(10)
             # Kill and add some nodes
-            self.killNode(0, clusterName)
-            self.killNode(2, clusterName)
-            self.createClusterNode(4, clusterName)
-            self.createClusterNode(5, clusterName)
-            self.checkNumClusterBrokers(clusterName, 4)
+            fh.killNode(0)
+            fh.killNode(2)
+            fh.addNodes(2)
             # Place another 20 messages, retrieve 20
-            txMsg += self.sendMsgs(3, clusterName, fanoutExchangeName, None, 
20)
-            rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20)  
   
-            rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20)  
         
-            rxMsgC += self.receiveMsgs(4, clusterName, "test-queue-14-C", 20)  
     
+            fh.sendMsgs(20)
+            fh.receiveMsgs(20)
             # Kill and add another node
-            self.killNode(4, clusterName)
-            self.createClusterNode(6, clusterName)
-            self.checkNumClusterBrokers(clusterName, 4)
+            fh.killNode(4)
+            fh.addNodes()
             # Add another 2 queues
-            self.createBindFanoutExchangeQueues(6, clusterName, 
fanoutExchangeName, ["test-queue-14-D", "test-queue-14-E"])
+            fh.addQueues(["test-queue-14-D", "test-queue-14-E"])
             # Place another 20 messages, retrieve 20
-            tmp = self.sendMsgs(3, clusterName, fanoutExchangeName, None, 20)
-            txMsg += tmp
-            rxMsgA += self.receiveMsgs(1, clusterName, "test-queue-14-A", 20)  
   
-            rxMsgB += self.receiveMsgs(3, clusterName, "test-queue-14-B", 20)  
         
-            rxMsgC += self.receiveMsgs(6, clusterName, "test-queue-14-C", 20)  
     
-            rxMsgD  = self.receiveMsgs(6, clusterName, "test-queue-14-D", 10)  
     
-            rxMsgE  = self.receiveMsgs(6, clusterName, "test-queue-14-E", 10)  
     
+            fh.sendMsgs(20)
+            fh.receiveMsgs(20)     
             # Kill all nodes but one
-            self.killNode(1, clusterName)
-            self.killNode(3, clusterName)
-            self.killNode(6, clusterName)
-            self.checkNumClusterBrokers(clusterName, 1)
-            # Pull all remaining messages from each queue
-            rxMsgA += self.receiveMsgs(5, clusterName, "test-queue-14-A", 10)  
         
-            rxMsgB += self.receiveMsgs(5, clusterName, "test-queue-14-B", 10)  
           
-            rxMsgC += self.receiveMsgs(5, clusterName, "test-queue-14-C", 10)  
          
-            rxMsgD += self.receiveMsgs(5, clusterName, "test-queue-14-D", 10)  
          
-            rxMsgE += self.receiveMsgs(5, clusterName, "test-queue-14-E", 10)  
          
+            fh.killNode(1)
+            fh.killNode(3)
+            fh.killNode(6)
             # Check messages
-            self.stopCheckAll()
-            if txMsg != rxMsgA:
-                self.fail("Send - receive message mismatch for queue A")
-            if txMsg != rxMsgB:
-                self.fail("Send - receive message mismatch for queue B")
-            if txMsg != rxMsgC:
-                self.fail("Send - receive message mismatch for queue C")
-            if tmp != rxMsgD:
-                self.fail("Send - receive message mismatch for queue D")
-            if tmp != rxMsgE:
-                self.fail("Send - receive message mismatch for queue E")
+            fh.finalizeTest()
         except:
-            self.killAllClusters()
+            self.killAllClusters(True)
             raise
 
 # Start the test here

Modified: qpid/trunk/qpid/cpp/src/tests/run_cluster_tests
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/run_cluster_tests?rev=781431&r1=781430&r2=781431&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/run_cluster_tests (original)
+++ qpid/trunk/qpid/cpp/src/tests/run_cluster_tests Wed Jun  3 15:25:56 2009
@@ -1,4 +1,4 @@
-#!/bin/sh
+#!/bin/bash
 
 #
 # Licensed to the Apache Software Foundation (ASF) under one
@@ -23,13 +23,13 @@
 TEST_DIR=${top_builddir}/src/tests
 
 # Check AIS requirements
-id -nG | grep '\<ais\>' >/dev/null || NOGROUP="You are not a member of the ais 
group."
-ps -u root | grep 'aisexec\|corosync' >/dev/null || NOAISEXEC="The aisexec or 
corosync daemon is not running as root"
+id -nG | grep '\<ais\>' > /dev/null || NOGROUP="You are not a member of the ais group."
+ps -u root | grep 'aisexec\|corosync' > /dev/null || NOAISEXEC="The aisexec or corosync daemon is not running as root"
 
 if test -n "${NOGROUP}" -o -n "${NOAISEXEC}"; then
     cat <<EOF
 
-    ========= WARNING: CLUSTERING TESTS DISABLED ==============
+    ======== WARNING: PYTHON CLUSTER TESTS DISABLED ===========
 
     Tests that depend on the openais library (used for clustering)
     will not be run because:
@@ -43,22 +43,28 @@
        exit 0
 fi
 
-export PYTHONPATH=$srcdir:$srcdir/../../../python
-export RUN_CLUSTER_TESTS=1
+# Check XML exchange requirements
+XML_LIB=$srcdir/../.libs/xml.so
+if [ -f ${XML_LIB} ]; then
+       export XML_LIB
+fi
+
+export PYTHONPATH=${srcdir}:${srcdir}/../../../python
 export QPIDD_EXEC=${top_builddir}/src/qpidd
 export CLUSTER_LIB=${top_builddir}/src/.libs/cluster.so
-export QPID_CONFIG_EXEC=$srcdir/../../../python/commands/qpid-config
-export QPID_ROUTE_EXEC=$srcdir/../../../python/commands/qpid-route
+export QPID_CONFIG_EXEC=${srcdir}/../../../python/commands/qpid-config
+export QPID_ROUTE_EXEC=${srcdir}/../../../python/commands/qpid-route
 export RECEIVER_EXEC=${top_builddir}/src/tests/receiver
 export SENDER_EXEC=${top_builddir}/src/tests/sender
 
+
 #Make sure temp dir exists if this is the first to use it
 TMP_STORE_DIR=${TEST_DIR}/test_tmp
 if ! test -d ${TMP_STORE_DIR} ; then
-    mkdir -p ${TMP_STORE_DIR}
        mkdir -p ${TMP_STORE_DIR}/cluster
 else
-       rm -rf "${TMP_STORE_DIR}/cluster"
+    # Delete old cluster test dirs
+    rm -rf "${TMP_STORE_DIR}/cluster"
     mkdir -p "${TMP_STORE_DIR}/cluster"
 fi
 export TMP_STORE_DIR
@@ -66,6 +72,6 @@
 
 sg ais -c "${srcdir}/cluster.py -v"
 RETCODE=$?
-if test x$RETCODE != x0; then 
-    echo "FAIL cluster tests"; exit 1;
+if test x${RETCODE} != x0; then 
+    exit 1;
 fi

Modified: qpid/trunk/qpid/cpp/src/tests/testlib.py
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/testlib.py?rev=781431&r1=781430&r2=781431&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/testlib.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/testlib.py Wed Jun  3 15:25:56 2009
@@ -21,7 +21,7 @@
 # Support library for qpid python tests.
 #
 
-import os, signal, subprocess, unittest
+import os, re, signal, subprocess, unittest
 
 class TestBase(unittest.TestCase):
     """
@@ -32,8 +32,8 @@
     The following environment vars control if and how the test is run, and 
determine where many of the helper
     executables/libs are to be found.
     """
-    _storeEnable = os.getenv("STORE_ENABLE") != None # Must be True for 
durability to be enabled during the test
     _storeLib = os.getenv("STORE_LIB")
+    _storeEnable = _storeLib != None # Must be True for durability to be 
enabled during the test
     _qpiddExec = os.getenv("QPIDD_EXEC", "/usr/sbin/qpidd")
     _tempStoreDir = os.path.abspath(os.getenv("TMP_STORE_DIR", "/tmp/qpid"))
     
@@ -55,22 +55,7 @@
                 return " --%s yes" % key
             else:
                 return " --%s no" % key
-    
-    def _paramNum(self, key, val):
-        if val != None:
-            return " --%s %d" % (key, val)
-        return ""
-    
-    def _paramString(self, key, val):
-        if val != None:
-            return " --%s %s" % (key, val)
-        return ""
-    
-    def _paramStringList(self, key, valList, val):
-        if val in valList:
-            return " --%s %s" % (key, val)
-        return ""
-    
+       
     # --- Helper functions for message creation ---
     
     def _makeMessage(self, msgSize):
@@ -115,17 +100,37 @@
         #print "started broker: pid=%d, port=%d args: %s" % (pid, port, 
qpiddArgs)
         return (pid, port)
     
-    def killBroker(self, pid):
+    def killBroker(self, nodeTuple, ignoreFailures = False):
         """Kill a broker using kill -9"""
-        os.kill(pid, signal.SIGTERM)
-        #print "killed broker: pid=%d" % pid
+        try:
+            os.kill(nodeTuple[self.PID], signal.SIGKILL)
+            try:
+                os.waitpid(nodeTuple[self.PID], 0)
+            except:
+                pass
+            #print "killed broker: port=%d pid=%d" % (nodeTuple[self.PORT], 
nodeTuple[self.PID])
+        except:
+            if ignoreFailures:
+                print "WARNING: killBroker (port=%d pid=%d) failed - 
ignoring." % (nodeTuple[self.PORT], nodeTuple[self.PID])
+            else:
+                raise
     
-    def stopBroker(self, port):
+    def stopBroker(self, nodeTuple, ignoreFailures = False):
         """Stop a broker using qpidd -q"""
-        ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, 
"--port=%d" % port, "-q")
-        if ret != 0:
-            raise Exception("stopBroker(): port=%d: qpidd -q returned %d" % 
(port, ret))
-        #print "stopped broker: port=%d" % port 
+        try:
+            ret = os.spawnl(os.P_WAIT, self._qpiddExec, self._qpiddExec, 
"--port=%d" % nodeTuple[self.PORT], "--quit")
+            if ret != 0:
+                raise Exception("stopBroker(): port=%d: qpidd -q returned %d" 
% (port, ret))
+            try:
+                os.waitpid(nodeTuple[self.PID], 0)
+            except:
+                pass
+            #print "stopped broker: port=%d pid=%d" % (nodeTuple[self.PORT], 
nodeTuple[self.PID])
+        except:
+            if ignoreFailures:
+                print "WARNING: stopBroker (port=%d pid=%d) failed - 
ignoring." % (nodeTuple[self.PORT], nodeTuple[self.PID])
+            else:
+                raise
 
 
 
@@ -138,8 +143,10 @@
     The following environment vars control if and how the test is run, and 
determine where many of the helper
     executables/libs are to be found.
     """
-    _runClusterTests = os.getenv("RUN_CLUSTER_TESTS") != None # Must be True 
for these cluster tests to run
     _clusterLib = os.getenv("CLUSTER_LIB")
+    _clusterTestEnable = _clusterLib != None # Must be True for these cluster 
tests to run
+    _xmlLib = os.getenv("XML_LIB")
+    _xmlEnable = _xmlLib != None
     _qpidConfigExec = os.getenv("QPID_CONFIG_EXEC", "/usr/bin/qpid-config")
     _qpidRouteExec = os.getenv("QPID_ROUTE_EXEC", "/usr/bin/qpid-route")
     _receiverExec = os.getenv("RECEIVER_EXEC", 
"/usr/libexec/qpid/test/receiver")
@@ -164,10 +171,19 @@
     
     def run(self, res):
         """ Skip cluster testing if env var RUN_CLUSTER_TESTS is not 
defined."""
-        if not self._runClusterTests:
+        if not self._clusterTestEnable:
             return
         unittest.TestCase.run(self, res)
     
+    # --- Private helper / convenience functions ---
+    
+    def _checkPids(self, clusterName = None):
+        for pid, port in self.getTupleList():
+            try:
+                os.kill(pid, 0)
+            except:
+                raise Exception("_checkPids(): Broker with pid %d expected but 
does not exist! (crashed?)" % pid)
+        
     
     # --- Starting cluster node(s) ---
     
@@ -196,22 +212,25 @@
     
     # --- Cluster and node status ---
     
-    def getTupleList(self):
+    def getTupleList(self, clusterName = None):
         """Get list of (pid, port) tuples of all known cluster brokers"""
         tList = []
-        for l in self._clusterDict.itervalues():
-            for t in l.itervalues():
-                tList.append(t)
+        for c, l in self._clusterDict.iteritems():
+            if clusterName == None or c == clusterName:
+                for t in l.itervalues():
+                    tList.append(t)
         return tList
     
     def getNumBrokers(self):
         """Get total number of brokers in all known clusters"""
         return len(self.getTupleList())
     
-    def checkNumBrokers(self, expected):
+    def checkNumBrokers(self, expected = None, checkPids = True):
         """Check that the total number of brokers in all known clusters is the 
expected value"""
-        if self.getNumBrokers() != expected:
+        if expected != None and self.getNumBrokers() != expected:
             raise Exception("Unexpected number of brokers: expected %d, found 
%d" % (expected, self.getNumBrokers()))
+        if checkPids:
+            self._checkPids()
 
     def getClusterTupleList(self, clusterName):
         """Get list of (pid, port) tuples of all nodes in named cluster"""
@@ -227,11 +246,13 @@
         """Get the (pid, port) tuple for the given cluster node"""
         return self._clusterDict[clusterName][nodeNumber]
     
-    def checkNumClusterBrokers(self, clusterName, expected):
+    def checkNumClusterBrokers(self, clusterName, expected = None, checkPids = 
True):
         """Check that the total number of brokers in the named cluster is the 
expected value"""
-        if self.getNumClusterBrokers(clusterName) != expected:
+        if expected != None and self.getNumClusterBrokers(clusterName) != 
expected:
             raise Exception("Unexpected number of brokers in cluster %s: 
expected %d, found %d" % \
                             (clusterName, expected, 
self.getNumClusterBrokers(clusterName)))
+        if checkPids:
+            self._checkPids(clusterName)
 
     def clusterExists(self, clusterName):
         """ Return True if clusterName exists, False otherwise"""
@@ -250,16 +271,16 @@
     
     # --- Kill cluster nodes using signal 9 ---
     
-    def killNode(self, nodeNumber, clusterName, updateDict = True):
+    def killNode(self, nodeNumber, clusterName, updateDict = True, 
ignoreFailures = False):
         """Kill the given node in the named cluster using kill -9"""
-        self.killBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PID])
+        self.killBroker(self.getNodeTuple(nodeNumber, clusterName), 
ignoreFailures)
         if updateDict:
             del(self._clusterDict[clusterName][nodeNumber])
     
-    def killCluster(self, clusterName, updateDict = True):
+    def killCluster(self, clusterName, updateDict = True, ignoreFailures = 
False):
         """Kill all nodes in the named cluster"""
         for n in self._clusterDict[clusterName].iterkeys():
-            self.killNode(n, clusterName, False)
+            self.killNode(n, clusterName, False, ignoreFailures)
         if updateDict:
             del(self._clusterDict[clusterName])
     
@@ -270,46 +291,46 @@
             raise Exception("Unable to kill cluster %s; %d nodes still exist" 
% \
                             (clusterName, 
self.getNumClusterBrokers(clusterName)))
     
-    def killAllClusters(self):
+    def killAllClusters(self, ignoreFailures = False):
         """Kill all known clusters"""
         for n in self._clusterDict.iterkeys():
-            self.killCluster(n, False)
+            self.killCluster(n, False, ignoreFailures)
         self._clusterDict.clear() 
     
-    def killAllClustersCheck(self):
+    def killAllClustersCheck(self, ignoreFailures = False):
         """Kill all known clusters and check that the cluster dictionary is 
empty"""
-        self.killAllClusters()
+        self.killAllClusters(ignoreFailures)
         self.checkNumBrokers(0)
     
     # --- Stop cluster nodes using qpidd -q ---
     
-    def stopNode(self, nodeNumber, clusterName, updateDict = True):
+    def stopNode(self, nodeNumber, clusterName, updateDict = True, 
ignoreFailures = False):
         """Stop the given node in the named cluster using qpidd -q"""
-        self.stopBroker(self.getNodeTuple(nodeNumber, clusterName)[self.PORT])
+        self.stopBroker(self.getNodeTuple(nodeNumber, clusterName), 
ignoreFailures)
         if updateDict:
             del(self._clusterDict[clusterName][nodeNumber])
     
-    def stopAllClusters(self):
+    def stopAllClusters(self, ignoreFailures = False):
         """Stop all known clusters"""
         for n in self._clusterDict.iterkeys():
-            self.stopCluster(n, False)
+            self.stopCluster(n, False, ignoreFailures)
         self._clusterDict.clear() 
 
     
-    def stopCluster(self, clusterName, updateDict = True):
+    def stopCluster(self, clusterName, updateDict = True, ignoreFailures = 
False):
         """Stop all nodes in the named cluster"""
         for n in self._clusterDict[clusterName].iterkeys():
-            self.stopNode(n, clusterName, False)
+            self.stopNode(n, clusterName, False, ignoreFailures)
         if updateDict:
             del(self._clusterDict[clusterName])
     
-    def stopCheckCluster(self, clusterName):
+    def stopCheckCluster(self, clusterName, ignoreFailures = False):
         """Stop the named cluster and check that the name is removed from the 
cluster dictionary"""
-        self.stopCluster(clusterName)
+        self.stopCluster(clusterName, True, ignoreFailures)
         if self.clusterExists(clusterName):
             raise Exception("Unable to kill cluster %s; %d nodes still exist" 
% (clusterName, self.getNumClusterBrokers(clusterName)))
     
-    def stopCheckAll(self):
+    def stopCheckAll(self, ignoreFailures = False):
         """Kill all known clusters and check that the cluster dictionary is 
empty"""
         self.stopAllClusters()
         self.checkNumBrokers(0)
@@ -479,11 +500,252 @@
         if wait:
             receiver.wait()
         return msgs
-   
-    def sendReceiveMsgs(self, nodeNumber, clusterName, exchangeName, 
queueName, numMsgs, wait = True, msgSize = None):
-        self.createBindDirectExchangeQueue(nodeNumber, clusterName, 
exchangeName, queueName)
-        txMsgs = self.sendMsgs(nodeNumber, clusterName, exchangeName, 
queueName, numMsgs, msgSize, wait)
-        rxMsgs = self.receiveMsgs(nodeNumber, clusterName, queueName, numMsgs, 
wait)
-        if txMsgs != rxMsgs:
-            self.fail("Send - receive message mismatch")
+
+
+    # --- Exchange-specific helper inner classes ---
+
+    class TestHelper:
+        """
+        This is a "virtual" superclass for test helpers, and is not useful on 
its own, but the
+        per-exchange subclasses are designed to keep track of the messages 
sent to and received
+        from queues which have bindings to that exchange type.
+        """
+        
+        def __init__(self, testBaseCluster, clusterName, numNodes, 
exchangeName, queueNameList):
+        
+            """Dictionary of queues and lists of messages sent to them."""
+            self._txMsgs = {}
+            """Dictionary of queues and lists of messages received from 
them."""
+            self._rxMsgs = {}
+            """List of node numbers currently in the cluster"""
+            self._nodes = []
+            """List of node numbers which have been killed and can therefore 
be recovered"""
+            self._deadNodes = []
+            """Last node to be used"""
+            self._lastNode = None
+            
+            self._testBaseCluster = testBaseCluster
+            self._clusterName = clusterName
+            self._exchangeName = exchangeName
+            self._queueNameList = queueNameList
+            self._addQueues(queueNameList)
+            self._testBaseCluster.createCheckCluster(clusterName, numNodes)
+            self._nodes.extend(range(0, numNodes))
+        
+        def _addQueues(self, queueNameList):
+             for qn in queueNameList:
+                if not qn in self._txMsgs:
+                    self._txMsgs[qn] = []
+                if not qn in self._rxMsgs:
+                    self._rxMsgs[qn] = []
+       
+        def _bindQueue(self, queueName, bindingKey, nodeNumber = None):
+            """Bind a queue to an exchange using a binding key."""
+            if nodeNumber == None:
+                nodeNumber = self._nodes[0] # first available node
+            self._testBaseCluster.addQueue(nodeNumber, self._clusterName, 
queueName)
+            self._testBaseCluster.bind(nodeNumber, self._clusterName, 
self._exchangeName, queueName, bindingKey)
+        
+        def _highestNodeNumber(self):
+            """Find the highest node number used so far between the current 
nodes and those stopped/killed."""
+            highestNode = self._nodes[-1]
+            if len(self._deadNodes) == 0:
+                return highestNode
+            highestDeadNode = self._deadNodes[-1]
+            if highestNode > highestDeadNode:
+                return highestNode
+            return highestDeadNode
+        
+        def killCluster(self):
+            """Kill all nodes in the cluster"""
+            self._testBaseCluster.killCluster(self._clusterName)
+            self._testBaseCluster.checkNumClusterBrokers(self._clusterName, 0)
+            self._deadNodes.extend(self._nodes)
+            self._deadNodes.sort()
+            del self._nodes[:]
+        
+        def restoreCluster(self, lastNode = None, restoreNodes = True):
+            """Restore a previously killed cluster"""
+            self._testBaseCluster.createCluster(self._clusterName)
+            if restoreNodes:
+                numNodes = len(self._deadNodes)
+                self.restoreNodes(lastNode)
+                
self._testBaseCluster.checkNumClusterBrokers(self._clusterName, numNodes)
+        
+        def addNodes(self, numberOfNodes = 1):
+            """Add a fixed number of nodes to the cluster."""
+            nodeStart = self._highestNodeNumber() + 1
+            for i in range(0, numberOfNodes):
+                nodeNumber = nodeStart + i
+                self._testBaseCluster.createClusterNode(nodeNumber, 
self._clusterName)
+                self._nodes.append(nodeNumber)
+            self._testBaseCluster.checkNumClusterBrokers(self._clusterName, 
len(self._nodes))
+        
+        def restoreNode(self, nodeNumber):
+            """Restore a cluster node that has been previously killed"""
+            if nodeNumber not in self._deadNodes:
+                raise Exception("restoreNode(): Node number %d not in dead 
node list %s" % (nodeNumber, self._deadNodes))
+            self._testBaseCluster.createClusterNode(nodeNumber, 
self._clusterName)
+            self._deadNodes.remove(nodeNumber)
+            self._nodes.append(nodeNumber)
+            self._nodes.sort()
+        
+        def restoreNodes(self, lastNode = None):
+            """Restore all known cluster nodes that have been previously 
killed starting with a known last-used node"""
+            if len(self._nodes) == 0: # restore last-used node first
+                if lastNode == None:
+                    lastNode = self._lastNode
+                self.restoreNode(lastNode)
+            while len(self._deadNodes) > 0:
+                self.restoreNode(self._deadNodes[0])
+        
+        def killNode(self, nodeNumber):
+            """Kill a cluster node (if it is in the _nodes list)."""
+            if nodeNumber not in self._nodes:
+                raise Exception("killNode(): Node number %d not in node list 
%s" % (nodeNumber, self._nodes))
+            self._testBaseCluster.killNode(nodeNumber, self._clusterName)
+            self._nodes.remove(nodeNumber)
+            self._deadNodes.append(nodeNumber)
+            self._deadNodes.sort()
+        
+        def sendMsgs(self, routingKey, numMsgs, nodeNumber = None, msgSize = 
None, wait = True):
+            """Send a fixed number of messages using the given routing key."""
+            if nodeNumber == None:
+                nodeNumber = self._nodes[0] # Use first available node
+            msgs = self._testBaseCluster._makeMessageList(numMsgs, msgSize)
+            sender = self._testBaseCluster.createSender(nodeNumber, 
self._clusterName, self._exchangeName, routingKey)
+            sender.stdin.write(msgs)
+            sender.stdin.close()
+            if wait:
+                sender.wait()
+            self._lastNode = nodeNumber
+            return msgs.split()
+        
+        # TODO - this i/f is messy: one numMsgs can be given, but a list of 
queues
+        #        so assuming numMsgs for each queue
+        #        A mechanism is needed to specify a different numMsgs per queue
+        def receiveMsgs(self, numMsgs, nodeNumber = None, queueNameList = 
None, wait = True):
+            """Receive a fixed number of messages from a named queue. If 
numMsgs == None, get all remaining messages."""
+            if nodeNumber == None:
+                nodeNumber = self._nodes[0] # Use first available node
+            if queueNameList == None:
+                queueNameList = self._txMsgs.iterkeys()
+            for qn in queueNameList:
+                nm = numMsgs
+                if nm == None:
+                    nm = len(self._txMsgs[qn]) - len(self._rxMsgs[qn]) # get 
all remaining messages
+                if nm > 0:
+                    receiver = 
self._testBaseCluster.createReciever(nodeNumber, self._clusterName, qn, nm)
+                    cnt = 0
+                    while cnt < nm:
+                        rx = receiver.stdout.readline().strip()
+                        if rx == "" and receiver.poll() != None: break
+                        self._rxMsgs[qn].append(rx)
+                        cnt = cnt + 1
+                    if wait:
+                        receiver.wait()
+                    self._lastNode = nodeNumber
+        
+        def receiveRemainingMsgs(self, nodeNumber = None, queueNameList = 
None, wait = True):
+            """Receive all remaining messages on named queue."""
+            self.receiveMsgs(None, nodeNumber, queueNameList, wait)
+        
+        def checkMsgs(self):
+            """Return True if all expected messages have been received (i.e. the 
transmit and receive lists are identical)."""
+            txMsgTot = 0
+            rxMsgTot = 0
+            for qn, txMsgList in self._txMsgs.iteritems():
+                rxMsgList = self._rxMsgs[qn]
+                txMsgTot = txMsgTot + len(txMsgList)
+                rxMsgTot = rxMsgTot + len(rxMsgList)
+                if len(txMsgList) != len(rxMsgList):
+                    return False
+                for i, m in enumerate(txMsgList):
+                    if m != rxMsgList[i]:
+                        return False
+            if txMsgTot == 0 and rxMsgTot == 0:
+                print "WARNING: No messages were either sent or received"
+            return True
+        
+        def finalizeTest(self):
+            """Recover all the remaining messages on all queues, then check 
that all expected messages were received."""
+            self.receiveRemainingMsgs()
+            self._testBaseCluster.stopCheckAll()
+            if not self.checkMsgs():
+                self._testBaseCluster.fail("Send - receive message mismatch")
+                self.printMsgs()                
+        
+        def printMsgs(self, txMsgs = True, rxMsgs = True):
+            """Print all messages transmitted and received."""
+            for qn, txMsgList in self._txMsgs.iteritems():
+                print "Queue: %s" % qn
+                if txMsgs:
+                    print "  txMsgList = %s" % txMsgList
+                if rxMsgs:
+                    rxMsgList = self._rxMsgs[qn]
+                    print "  rxMsgList = %s" % rxMsgList
+
+
+    class DirectExchangeTestHelper(TestHelper):
+        
+        def __init__(self, testBaseCluster, clusterName, numNodes, 
exchangeName, queueNameList):
+            TestBaseCluster.TestHelper.__init__(self, testBaseCluster, 
clusterName, numNodes, exchangeName, queueNameList)
+            self._testBaseCluster.addExchange(0, clusterName, "direct", 
exchangeName)
+            for qn in queueNameList:
+                self._bindQueue(qn, qn)
+        
+        def addQueues(self, queueNameList):
+            self._addQueues(queueNameList)
+            for qn in queueNameList:
+                self._bindQueue(qn, qn)
+                
+        def sendMsgs(self, numMsgs, nodeNumber = None, queueNameList = None, 
msgSize = None, wait = True):
+            if queueNameList == None:
+                queueNameList = self._txMsgs.iterkeys()
+            for qn in queueNameList:
+                
self._txMsgs[qn].extend(TestBaseCluster.TestHelper.sendMsgs(self, qn, numMsgs, 
nodeNumber, msgSize, wait))
+
+
+    class TopicExchangeTestHelper(TestHelper):
+        
+        def __init__(self, testBaseCluster, clusterName, numNodes, 
exchangeName, queueNameKeyList):
+            self._queueNameKeyList = queueNameKeyList
+            TestBaseCluster.TestHelper.__init__(self, testBaseCluster, 
clusterName, numNodes, exchangeName, queueNameKeyList.iterkeys())
+            self._testBaseCluster.addExchange(0, clusterName, "topic", 
exchangeName)
+            for qn, bk in queueNameKeyList.iteritems():
+                self._bindQueue(qn, bk)                
+        
+        def addQueues(self, queueNameKeyList):
+            self._addQueues(queueNameKeyList.iterkeys())
+            for qn, bk in queueNameKeyList.iteritems():
+                self._bindQueue(qn, bk)
+        
+        def _prepareRegex(self, bk):
+            # This regex conversion is not very complete - there are other 
chars that should be escaped too
+            return "^%s$" % bk.replace(".", r"\.").replace("*", 
r"[^.]*").replace("#", ".*")
+        
+        def sendMsgs(self, routingKey, numMsgs, nodeNumber = None, msgSize = 
None, wait = True):
+            msgList = TestBaseCluster.TestHelper.sendMsgs(self, routingKey, 
numMsgs, nodeNumber, msgSize, wait)
+            for qn, bk in self._queueNameKeyList.iteritems():
+                if re.match(self._prepareRegex(bk), routingKey):
+                    self._txMsgs[qn].extend(msgList)
+
+
+    class FanoutExchangeTestHelper(TestHelper):
+        
+        def __init__(self, testBaseCluster, clusterName, numNodes, 
exchangeName, queueNameList):
+            TestBaseCluster.TestHelper.__init__(self, testBaseCluster, 
clusterName, numNodes, exchangeName, queueNameList)
+            self._testBaseCluster.addExchange(0, clusterName, "fanout", 
exchangeName)
+            for qn in queueNameList:
+                self._bindQueue(qn, "")
+        
+        def addQueues(self, queueNameList):
+            self._addQueues(queueNameList)
+            for qn in queueNameList:
+                self._bindQueue(qn, "")
+                
+        def sendMsgs(self, numMsgs, nodeNumber = None, msgSize = None, wait = 
True):
+            msgList = TestBaseCluster.TestHelper.sendMsgs(self, "", numMsgs, 
nodeNumber, msgSize, wait)
+            for ml in self._txMsgs.itervalues():
+                ml.extend(msgList) 
             
\ No newline at end of file



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to