Author: aconway
Date: Mon Feb 13 16:18:53 2012
New Revision: 1243585

URL: http://svn.apache.org/viewvc?rev=1243585&view=rev
Log:
QPID-3603: Added failover test for HA brokers.

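The new test is disabled because it fails due to a known issue in the current code (messages are dropped when they are sent to a new primary before its backups are ready); re-enable it when the issue is fixed.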

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py?rev=1243585&r1=1243584&r2=1243585&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py Mon Feb 13 16:18:53 2012
@@ -658,10 +658,14 @@ class NumberedReceiver(Thread):
         except Exception:
             self.error = RethrownException(self.receiver.pname)
 
+    def check(self):
+        """Raise an exception if there has been an error"""
+        if self.error: raise self.error
+
     def stop(self):
         """Returns when termination message is received"""
         join(self)
-        if self.error: raise self.error
+        self.check()
 
 class ErrorGenerator(StoppableThread):
     """

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1243585&r1=1243584&r2=1243585&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Mon Feb 13 16:18:53 2012
@@ -52,7 +52,6 @@ class HaBroker(Broker):
 class ShortTests(BrokerTest):
     """Short HA functionality tests."""
 
-    # FIXME aconway 2011-11-15: work around async configuration replication.
     # Wait for an address to become valid.
     def wait(self, session, address):
         def check():
@@ -62,7 +61,6 @@ class ShortTests(BrokerTest):
             except NotFound: return False
         assert retry(check), "Timed out waiting for %s"%(address)
 
-    # FIXME aconway 2012-01-23: work around async configuration replication.
     # Wait for address to become valid on a backup broker.
     def wait_backup(self, backup, address):
         bs = self.connect_admin(backup).session()
@@ -114,15 +112,14 @@ class ShortTests(BrokerTest):
             us = primary.connect_old().session(str(qpid.datatypes.uuid4()))
             us.exchange_unbind(exchange=prefix+"e4", binding_key="", queue=prefix+"q4")
             p.sender(prefix+"e4").send(Message("drop1")) # Should be dropped
-            # FIXME aconway 2011-11-24: need a marker so we can wait till sync is done.
+            # Need a marker so we can wait till sync is done.
             p.sender(queue(prefix+"x", "configuration"))
 
         def verify(b, prefix, p):
             """Verify setup was replicated to backup b"""
 
-            # FIXME aconway 2011-11-21: wait for configuration to replicate.
+            # Wait for configuration to replicate.
             self.wait(b, prefix+"x");
-            # FIXME aconway 2011-11-24: assert_browse_retry to deal with async replication.
             self.assert_browse_retry(b, prefix+"q1", ["b", "1", "4"])
 
             self.assertEqual(p.receiver(prefix+"q1").fetch(timeout=0).content, "b")
@@ -295,13 +292,64 @@ class ShortTests(BrokerTest):
         brokers[0].connect().session().sender(
             "q;{create:always,%s}"%(self.qpid_replicate())).send("a")
         for b in brokers[1:]: self.assert_browse_backup(b, "q", ["a"])
-        # FIXME aconway 2012-01-30: failing - not using set URL?
         brokers[0].kill()
         brokers[2].promote()            # c must fail over to b.
         brokers[2].connect().session().sender("q").send("b")
         self.assert_browse_backup(brokers[1], "q", ["a","b"])
         for b in brokers[1:]: b.kill()
 
+class LongTests(BrokerTest):
+    """Tests that can run for a long time if -DDURATION=<minutes> is set"""
+
+    def duration(self):
+        d = self.config.defines.get("DURATION")
+        if d: return float(d)*60
+        else: return 3                  # Default is to be quick
+
+
+    def disable_test_failover(self):
+        """Test failover with continuous send-receive"""
+        # FIXME aconway 2012-02-03: fails due to dropped messages,
+        # known issue: sending messages to new primary before
+        # backups are ready.
+
+        # Start a cluster, all members will be killed during the test.
+        brokers = [ HaBroker(self, name=name, expect=EXPECT_EXIT_FAIL)
+                    for name in ["ha0","ha1","ha2"] ]
+        url = ",".join([b.host_port() for b in brokers])
+        for b in brokers: b.set_broker_url(url)
+        brokers[0].promote()
+
+        # Start sender and receiver threads
+        sender = NumberedSender(brokers[0], max_depth=1000, failover_updates=False)
+        receiver = NumberedReceiver(brokers[0], sender=sender, failover_updates=False)
+        receiver.start()
+        sender.start()
+        # Wait for sender & receiver to get up and running
+        assert retry(lambda: receiver.received > 100)
+        # Kill and restart brokers in a cycle:
+        endtime = time.time() + self.duration()
+        i = 0
+        while time.time() < endtime or i < 3: # At least 3 iterations
+            sender.sender.assert_running()
+            receiver.receiver.assert_running()
+            port = brokers[i].port()
+            brokers[i].kill()
+            brokers.append(
+                HaBroker(self, name="ha%d"%(i+3), broker_url=url, port=port,
+                         expect=EXPECT_EXIT_FAIL))
+            i += 1
+            brokers[i].promote()
+            n = receiver.received       # Verify we're still running
+            def enough():
+                receiver.check()        # Verify no exceptions
+                return receiver.received > n + 100
+            assert retry(enough, timeout=5)
+
+        sender.stop()
+        receiver.stop()
+        for b in brokers[i:]: b.kill()
+
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)
     os.execvp("qpid-python-test", ["qpid-python-test", "-m", "ha_tests"] + sys.argv[1:])



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to