Author: aconway
Date: Wed Jan 25 18:48:55 2012
New Revision: 1235871

URL: http://svn.apache.org/viewvc?rev=1235871&view=rev
Log:
QPID-3603: Test to verify C++ client failover is working.

- TCPConnector: set identifier early so it is available in error messages.

Modified:
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/TCPConnector.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/cluster_tests.py
    qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/TCPConnector.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/TCPConnector.cpp?rev=1235871&r1=1235870&r2=1235871&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/TCPConnector.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/client/TCPConnector.cpp Wed Jan 25 18:48:55 2012
@@ -97,7 +97,7 @@ void TCPConnector::connect(const std::st
         boost::bind(&TCPConnector::connected, this, _1),
         boost::bind(&TCPConnector::connectFailed, this, _3));
     closed = false;
-
+    identifier = str(format("[%1%]") % socket.getFullAddress());
     connector->start(poller);
 }
 
@@ -120,8 +120,6 @@ void TCPConnector::start(sys::AsynchIO* 
     for (int i = 0; i < 4; i++) {
         aio->queueReadBuffer(new Buff(maxFrameSize));
     }
-
-    identifier = str(format("[%1%]") % socket.getFullAddress());
 }
 
 void TCPConnector::initAmqp() {
@@ -131,7 +129,7 @@ void TCPConnector::initAmqp() {
 
 void TCPConnector::connectFailed(const std::string& msg) {
     connector = 0;
-    QPID_LOG(warning, "Connect failed: " << msg);
+    QPID_LOG(warning, "Connect failed: " << msg << " " << identifier);
     socket.close();
     if (!closed)
         closed = true;
@@ -185,7 +183,7 @@ sys::ShutdownHandler* TCPConnector::getS
     return shutdownHandler;
 }
 
-const std::string& TCPConnector::getIdentifier() const { 
+const std::string& TCPConnector::getIdentifier() const {
     return identifier;
 }
 

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp?rev=1235871&r1=1235870&r2=1235871&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/qpid/ha/ConnectionExcluder.cpp Wed Jan 25 18:48:55 2012
@@ -34,8 +34,6 @@ void ConnectionExcluder::opened(broker::
         && !connection.getClientProperties().isSet(ADMIN_TAG))
         throw Exception(
             QPID_MSG("HA: Backup broker rejected connection " << connection.getMgmtId()));
-    else 
-        QPID_LOG(debug, "HA: Backup broker accepted connection" << connection.getMgmtId());
 }
 
 const std::string ConnectionExcluder::ADMIN_TAG="qpid.ha-admin";

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py?rev=1235871&r1=1235870&r2=1235871&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/brokertest.py Wed Jan 25 18:48:55 2012
@@ -198,16 +198,17 @@ class Popen(subprocess.Popen):
                 os.kill( self.pid , signal.SIGTERM)
             except AttributeError: # no os.kill, using taskkill.. (Windows only)
                 os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
-        self._cleanup()
+        self.wait()
 
     def kill(self):
-        try: subprocess.Popen.kill(self)
+        try:
+            subprocess.Popen.kill(self)
         except AttributeError:          # No terminate method
             try:
                 os.kill( self.pid , signal.SIGKILL)
             except AttributeError: # no os.kill, using taskkill.. (Windows only)
                 os.popen('TASKKILL /PID ' +str(self.pid) + ' /F')
-        self._cleanup()
+        self.wait()
 
     def _cleanup(self):
         """Clean up after a dead process"""
@@ -555,22 +556,22 @@ class NumberedSender(Thread):
     """
 
     def __init__(self, broker, max_depth=None, queue="test-queue",
-                 connection_options=Cluster.CONNECTION_OPTIONS):
+                 connection_options=Cluster.CONNECTION_OPTIONS,
+                 failover_updates=True, url=None):
         """
         max_depth: enable flow control, ensure sent - received <= max_depth.
         Requires self.notify_received(n) to be called each time messages are received.
         """
         Thread.__init__(self)
+        cmd = ["qpid-send",
+             "--broker", url or broker.host_port(),
+               "--address", "%s;{create:always}"%queue,
+               "--connection-options", "{%s}"%(connection_options),
+               "--content-stdin"
+               ]
+        if failover_updates: cmd += ["--failover-updates"]
         self.sender = broker.test.popen(
-            ["qpid-send",
-             "--broker", "localhost:%s"%broker.port(),
-             "--address", "%s;{create:always}"%queue,
-             "--failover-updates",
-             "--connection-options", "{%s}"%(connection_options),
-             "--content-stdin"
-             ],
-            expect=EXPECT_RUNNING,
-            stdin=PIPE)
+            cmd, expect=EXPECT_RUNNING, stdin=PIPE)
         self.condition = Condition()
         self.max = max_depth
         self.received = 0
@@ -617,30 +618,31 @@ class NumberedReceiver(Thread):
     Thread to run a receiver client and verify it receives
     sequentially numbered messages.
     """
-    def __init__(self, broker, sender = None, queue="test-queue",
-                 connection_options=Cluster.CONNECTION_OPTIONS):
+    def __init__(self, broker, sender=None, queue="test-queue",
+                 connection_options=Cluster.CONNECTION_OPTIONS,
+                 failover_updates=True, url=None):
         """
         sender: enable flow control. Call sender.received(n) for each message received.
         """
         Thread.__init__(self)
         self.test = broker.test
+        cmd = ["qpid-receive",
+               "--broker", url or broker.host_port(),
+               "--address", "%s;{create:always}"%queue,
+               "--connection-options", "{%s}"%(connection_options),
+               "--forever"
+               ]
+        if failover_updates: cmd += [ "--failover-updates" ]
         self.receiver = self.test.popen(
-            ["qpid-receive",
-             "--broker", "localhost:%s"%broker.port(),
-             "--address", "%s;{create:always}"%queue,
-             "--failover-updates",
-             "--connection-options", "{%s}"%(connection_options),
-             "--forever"
-             ],
-            expect=EXPECT_RUNNING,
-            stdout=PIPE)
+            cmd, expect=EXPECT_RUNNING, stdout=PIPE)
         self.lock = Lock()
         self.error = None
         self.sender = sender
         self.received = 0
 
     def read_message(self):
-        return int(self.receiver.stdout.readline())
+        n = int(self.receiver.stdout.readline())
+        return n
 
     def run(self):
         try:

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/cluster_tests.py?rev=1235871&r1=1235870&r2=1235871&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/cluster_tests.py Wed Jan 25 18:48:55 2012
@@ -1046,8 +1046,8 @@ class LongTests(BrokerTest):
 
         # Start sender and receiver threads
         cluster[0].declare_queue("test-queue")
-        sender = NumberedSender(cluster[0], 1000) # Max queue depth
-        receiver = NumberedReceiver(cluster[0], sender)
+        sender = NumberedSender(cluster[0], max_depth=1000)
+        receiver = NumberedReceiver(cluster[0], sender=sender)
         receiver.start()
         sender.start()
         # Wait for sender & receiver to get up and running

Modified: qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py?rev=1235871&r1=1235870&r2=1235871&view=diff
==============================================================================
--- qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/branches/qpid-3603-2/qpid/cpp/src/tests/ha_tests.py Wed Jan 25 18:48:55 2012
@@ -22,7 +22,7 @@ import os, signal, sys, time, imp, re, s
 from qpid.messaging import Message, NotFound, ConnectionError, Connection
 from brokertest import *
 from threading import Thread, Lock, Condition
-from logging import getLogger
+from logging import getLogger, WARN, ERROR, DEBUG
 
 
 log = getLogger("qpid.ha-tests")
@@ -39,7 +39,8 @@ class ShortTests(BrokerTest):
                                   ] + args,
                       **kwargs)
 
-    # FIXME aconway 2011-11-15: work around async replication.
+    # FIXME aconway 2011-11-15: work around async wiring replication.
+    # Wait for an address to become valid.
     def wait(self, session, address):
         def check():
             try:
@@ -48,6 +49,13 @@ class ShortTests(BrokerTest):
             except NotFound: return False
         assert retry(check), "Timed out waiting for %s"%(address)
 
+    # FIXME aconway 2012-01-23: workaround: we need to give the
+    # backup a chance to attach to the queue.
+    def wait_backup(self, backup, address):
+        bs = self.connect_admin(backup).session()
+        self.wait(bs, address)
+        bs.connection.close()
+
     def set_ha_status(self, address, status):
         os.system("qpid-ha-status %s %s"%(address, status))
 
@@ -209,7 +217,8 @@ class ShortTests(BrokerTest):
             raise
 
     def test_failover(self):
-        """Verify that backups rejects connections and that fail-over works"""
+        """Verify that backups rejects connections and that fail-over works in python client"""
+        getLogger().setLevel(ERROR) # Disable WARNING log messages due to failover
         primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
         backup = self.ha_broker(name="backup", broker_url=primary.host_port())
         # Check that backup rejects normal connections
@@ -223,20 +232,36 @@ class ShortTests(BrokerTest):
         # Test discovery: should connect to primary after reject by backup
         c = backup.connect(reconnect_urls=[primary.host_port(), backup.host_port()], reconnect=True)
         s = c.session()
-        s.sender("q;{create:always,%s}"%(self.qpid_replicate())).send("foo", sync=True)
-        # FIXME aconway 2012-01-23: we shouldn't need the wait and retry here,
-        # send(sync=True) shouldn't return till the backup acknowledges.
-        bs = self.connect_admin(backup).session()
-        self.wait(bs, "q")
-        self.assert_browse_retry(bs, "q", ["foo"])
-        bs.connection.close()
+        sender = s.sender("q;{create:always,%s}"%(self.qpid_replicate()))
+        self.wait_backup(backup, "q")
+        sender.send("foo")
+        primary.kill()
+        assert retry(lambda: not is_running(primary.pid))
+        self.set_ha_status(backup.host_port(), "primary")         # Promote the backup
+        self.assert_browse_retry(s, "q", ["foo"])
+        c.close()
+
+    def test_failover_cpp(self):
+        primary = self.ha_broker(name="primary", expect=EXPECT_EXIT_FAIL, broker_url="primary") # Temp hack to identify primary
+        backup = self.ha_broker(name="backup", broker_url=primary.host_port())
+        url="%s,%s"%(primary.host_port(), backup.host_port())
+        primary.connect().session().sender("q;{create:always,%s}"%(self.qpid_replicate()))
+        self.wait_backup(backup, "q")
+
+        sender = NumberedSender(primary, url=url, queue="q", failover_updates = False)
+        receiver = NumberedReceiver(primary, url=url, queue="q", failover_updates = False)
+        receiver.start()
+        sender.start()
+        self.wait_backup(backup, "q")
+        assert retry(lambda: receiver.received > 10) # Wait for some messages to get thru
 
         primary.kill()
-        # Promote the backup
+        assert retry(lambda: not is_running(primary.pid)) # Wait for primary to die
         self.set_ha_status(backup.host_port(), "primary")
-        # FIXME aconway 2012-01-23: should re-use session s below
-        self.assert_browse_retry(c.session(), "q", ["foo"])
-        c.close()
+        n = receiver.received       # Make sure we are still running
+        assert retry(lambda: receiver.received > n + 10)
+        sender.stop()
+        receiver.stop()
 
 if __name__ == "__main__":
     shutil.rmtree("brokertest.tmp", True)



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to