http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java ---------------------------------------------------------------------- diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java index b80f002..ad9df21 100644 --- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java +++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/MultiNodeTest.java @@ -19,20 +19,33 @@ */ package org.apache.qpid.server.store.berkeleydb.replication; +import static junit.framework.TestCase.assertEquals; +import static org.apache.qpid.systests.Utils.INDEX; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.File; import java.net.URI; import java.nio.file.Files; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.jms.Connection; import javax.jms.Destination; @@ -48,11 +61,11 @@ import com.sleepycat.je.Durability; import com.sleepycat.je.EnvironmentConfig; import com.sleepycat.je.rep.ReplicatedEnvironment; import com.sleepycat.je.rep.ReplicationConfig; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.qpid.server.model.Broker; -import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.model.State; import org.apache.qpid.server.util.FileUtils; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHARemoteReplicationNode; @@ -60,120 +73,94 @@ import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; import org.apache.qpid.server.virtualhostnode.berkeleydb.NodeRole; import org.apache.qpid.systests.ConnectionBuilder; import org.apache.qpid.systests.GenericConnectionListener; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.systests.Utils; +import org.apache.qpid.test.utils.PortHelper; import org.apache.qpid.test.utils.TestUtils; +import org.apache.qpid.tests.utils.ConfigItem; +import org.apache.qpid.tests.utils.RunBrokerAdmin; -public class MultiNodeTest extends QpidBrokerTestCase +@RunBrokerAdmin(type = "BDB-HA") +@GroupConfig(numberOfNodes = 3, groupName = "test") +@ConfigItem(name = Broker.BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD, value = "false") +public class MultiNodeTest extends GroupJmsTestBase { private static final Logger LOGGER = LoggerFactory.getLogger(MultiNodeTest.class); - private static final String VIRTUAL_HOST = "test"; - private static final int NUMBER_OF_NODES = 3; - private static final int FAILOVER_COMPLETION_TIMEOUT = 60000; - - private GroupCreator _groupCreator; - - private FailoverAwaitingListener _failoverListener; - - /** Used when expectation is client will (re)-connect */ - private ConnectionBuilder _positiveFailoverBuilder; - - /** Used when expectation is client will not (re)-connect */ - private ConnectionBuilder _negativeFailoverBuilder; - - @Override - protected void setUp() throws Exception - { - assertTrue(isJavaBroker()); - assertTrue(isBrokerStorePersistent()); - _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - _groupCreator.configureClusterNodes(); + private FailoverAwaitingListener _failoverListener = new FailoverAwaitingListener(); - _positiveFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(); - _negativeFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(200, 2); - - _groupCreator.startCluster(); - _failoverListener = new FailoverAwaitingListener(); - - super.setUp(); - } - - @Override - public void startDefaultBroker() throws Exception - { - // Don't start default broker provided by QBTC. - } + private static final int FAILOVER_COMPLETION_TIMEOUT = 60000; + @Test public void testLossOfMasterNodeCausesClientToFailover() throws Exception { - final Connection connection = _positiveFailoverBuilder.build(); - getJmsProvider().addGenericConnectionListener(connection, _failoverListener); + final Connection connection = getConnectionBuilder().build(); + try + { + getJmsProvider().addGenericConnectionListener(connection, _failoverListener); - final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port {}", activeBrokerPort); + final int masterPort = getJmsProvider().getConnectedURI(connection).getPort(); + LOGGER.info("Active connection port {}", masterPort); - _groupCreator.stopNode(activeBrokerPort); - LOGGER.info("Node is stopped"); - _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); - LOGGER.info("Listener has finished"); - // any op to ensure connection remains - connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + getBrokerAdmin().stopNode(masterPort); + LOGGER.info("Node is stopped"); + _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); + LOGGER.info("Listener has finished"); + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + finally + { + connection.close(); + } } + @Test public void testLossOfReplicaNodeDoesNotCauseClientToFailover() throws Exception { - final Connection connection = _positiveFailoverBuilder.build(); - getJmsProvider().addGenericConnectionListener(connection, _failoverListener); - - final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port {}", activeBrokerPort); + final Connection connection = getConnectionBuilder().build(); + try + { + getJmsProvider().addGenericConnectionListener(connection, _failoverListener); - final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection); - LOGGER.info("Stopping inactive broker on port {} ", inactiveBrokerPort); + final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort(); + LOGGER.info("Active connection port {}", activeBrokerPort); - _groupCreator.stopNode(inactiveBrokerPort); + final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort); + LOGGER.info("Stopping inactive broker on port {} ", inactiveBrokerPort); - _failoverListener.assertNoFailoverCompletionWithin(2000); + getBrokerAdmin().stopNode(inactiveBrokerPort); - // any op to ensure connection remains - connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - } + _failoverListener.assertNoFailoverCompletionWithin(2000); - public void testLossOfQuorumCausesClientDisconnection() throws Exception - { - if (getBrokerProtocol().equals(Protocol.AMQP_1_0)) - { - // TODO - QPIDJMS-366 - there seems to be a client defect when a JMS operation is interrupted - // by a graceful connection close from the client side. - return; + // any op to ensure connection remains + connection.createSession(false, Session.AUTO_ACKNOWLEDGE); } - - final Connection connection = _negativeFailoverBuilder.build(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - Destination destination = session.createQueue(getTestQueueName()); - getJmsProvider().createQueue(session, getTestQueueName()); - - Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes(); - - final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); - ports.remove(activeBrokerPort); - - // Stop all other nodes - for (Integer p : ports) + finally { - _groupCreator.stopNode(p); + connection.close(); } + } + @Test + public void testLossOfQuorumCausesClientDisconnection() throws Exception + { + final Connection connection = getConnectionBuilder().build(); try { + getJmsProvider().addGenericConnectionListener(connection, _failoverListener); + Set<Integer> ports = + Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet()); + final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort(); + ports.remove(activeBrokerPort); + + // Stop all other nodes + for (Integer p : ports) + { + getBrokerAdmin().stopNode(p); + } - sendMessage(session, destination, 1); - fail("Exception not thrown - sending message within a transaction should fail without quorum"); - } - catch(JMSException jms) - { - // PASS + _failoverListener.awaitPreFailover(2000); } finally { @@ -184,7 +171,10 @@ public class MultiNodeTest extends QpidBrokerTestCase // New connections should now fail as vhost will be unavailable try { - Connection unexpectedConnection = _negativeFailoverBuilder.build(); + Connection unexpectedConnection = getConnectionBuilder() + .setFailoverReconnectAttempts(SHORT_FAILOVER_CYCLECOUNT) + .setFailoverReconnectDelay(SHORT_FAILOVER_CONNECTDELAY) + .build(); fail("Got unexpected connection to node in group without quorum " + unexpectedConnection); } catch (JMSException je) @@ -198,442 +188,490 @@ public class MultiNodeTest extends QpidBrokerTestCase * test ensures that open messaging transactions are correctly rolled-back as quorum is lost, * and later the node rejoins the group in either master or replica role. */ + @Test public void testQuorumLostAndRestored_OriginalMasterRejoinsTheGroup() throws Exception { - final Connection connection = _positiveFailoverBuilder.build(); - getJmsProvider().addGenericConnectionListener(connection, _failoverListener); + final Connection connection = getConnectionBuilder().build(); + try + { + getJmsProvider().addGenericConnectionListener(connection, _failoverListener); + Destination dest = createTestQueue(connection); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - Destination dest = session.createQueue(getTestQueueName()); - session.close(); + Set<Integer> ports = + Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet()); - Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes(); + final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort(); + ports.remove(activeBrokerPort); - final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); - ports.remove(activeBrokerPort); + Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED); + Session session2 = connection.createSession(true, Session.SESSION_TRANSACTED); - Session session1 = connection.createSession(true, Session.SESSION_TRANSACTED); - Session session2 = connection.createSession(true, Session.SESSION_TRANSACTED); + session1.createConsumer(dest).close(); - session1.createConsumer(dest).close(); + MessageProducer producer1 = session1.createProducer(dest); + producer1.send(session1.createMessage()); + MessageProducer producer2 = session2.createProducer(dest); + producer2.send(session2.createMessage()); - MessageProducer producer1 = session1.createProducer(dest); - producer1.send(session1.createMessage()); - MessageProducer producer2 = session2.createProducer(dest); - producer2.send(session2.createMessage()); + // Leave transactions open, this will leave two store transactions open on the store - // Leave transactions open, this will leave two store transactions open on the store + // Stop all other nodes + for (Integer p : ports) + { + getBrokerAdmin().stopNode(p); + } - // Stop all other nodes - for (Integer p : ports) - { - _groupCreator.stopNode(p); - } + // Await the old master discovering that it is all alone + getBrokerAdmin().awaitNodeRole(activeBrokerPort, "WAITING"); - // Await the old master discovering that it is all alone - _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "WAITING"); + // Restart all other nodes + for (Integer p : ports) + { + getBrokerAdmin().startNode(p); + } - // Restart all other nodes - for (Integer p : ports) + _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); + + getBrokerAdmin().awaitNodeRole(activeBrokerPort, "MASTER", "REPLICA"); + } + finally { - _groupCreator.startNode(p); + connection.close(); } - - _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); - - _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "MASTER", "REPLICA"); } + @Test public void testPersistentMessagesAvailableAfterFailover() throws Exception { - final Connection connection = _positiveFailoverBuilder.build(); - getJmsProvider().addGenericConnectionListener(connection, _failoverListener); + final Connection connection = getConnectionBuilder().build(); + try + { + getJmsProvider().addGenericConnectionListener(connection, _failoverListener); + Destination queue = createTestQueue(connection); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - Destination queue = session.createQueue(getTestQueueName()); - session.close(); + final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort(); - final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); + Session producingSession = connection.createSession(true, Session.SESSION_TRANSACTED); + Utils.sendMessages(producingSession, queue, 10); - Session producingSession = connection.createSession(true, Session.SESSION_TRANSACTED); - sendMessage(producingSession, queue, 10); + getBrokerAdmin().stopNode(activeBrokerPort); + LOGGER.info("Old master (broker port {}) is stopped", activeBrokerPort); - _groupCreator.stopNode(activeBrokerPort); - LOGGER.info("Old master (broker port {}) is stopped", activeBrokerPort); + _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); + LOGGER.info("Failover has finished"); - _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); - LOGGER.info("Failover has finished"); + final int activeBrokerPortAfterFailover = getJmsProvider().getConnectedURI(connection).getPort(); + LOGGER.info("New master (broker port {}) after failover", activeBrokerPortAfterFailover); - final int activeBrokerPortAfterFailover = _groupCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("New master (broker port {}) after failover", activeBrokerPortAfterFailover); + Session consumingSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = consumingSession.createConsumer(queue); - Session consumingSession = connection.createSession(true, Session.SESSION_TRANSACTED); - MessageConsumer consumer = consumingSession.createConsumer(queue); - - connection.start(); - for(int i = 0; i < 10; i++) + connection.start(); + for (int i = 0; i < 10; i++) + { + Message m = consumer.receive(getReceiveTimeout()); + assertNotNull("Message " + i + " is not received", m); + assertEquals("Unexpected message received", i, m.getIntProperty(INDEX)); + } + consumingSession.commit(); + } + finally { - Message m = consumer.receive(getReceiveTimeout()); - assertNotNull("Message " + i + " is not received", m); - assertEquals("Unexpected message received", i, m.getIntProperty(INDEX)); + connection.close(); } - consumingSession.commit(); } + @Test public void testTransferMasterFromLocalNode() throws Exception { - final Connection connection = _positiveFailoverBuilder.build(); - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - session.close(); + final Connection connection = getConnectionBuilder().build(); + try + { + Destination queue = createTestQueue(connection); - final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port {}", activeBrokerPort); + final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort(); + LOGGER.info("Active connection port {}", activeBrokerPort); - final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection); - LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort); + final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort); + LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort); - // transfer mastership 3 times in order to verify - // that repeated mastership transfer to the same node works, See QPID-6996 - transferMasterFromLocalNode(connection, inactiveBrokerPort, activeBrokerPort); - transferMasterFromLocalNode(connection, activeBrokerPort, inactiveBrokerPort); - transferMasterFromLocalNode(connection, inactiveBrokerPort, activeBrokerPort); + // transfer mastership 3 times in order to verify + // that repeated mastership transfer to the same node works, See QPID-6996 + transferMasterFromLocalNode(connection, queue, inactiveBrokerPort, activeBrokerPort); + transferMasterFromLocalNode(connection, queue, activeBrokerPort, inactiveBrokerPort); + transferMasterFromLocalNode(connection, queue, inactiveBrokerPort, activeBrokerPort); + } + finally + { + connection.close(); + } } private void transferMasterFromLocalNode(final Connection connection, + final Destination queue, final int inactiveBrokerPort, final int activeBrokerPort) throws Exception { _failoverListener = new FailoverAwaitingListener(); getJmsProvider().addGenericConnectionListener(connection, _failoverListener); - Map<String, Object> attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort); + Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort); assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); - _groupCreator.setNodeAttributes(inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + getBrokerAdmin().setNodeAttributes(inactiveBrokerPort, + Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); LOGGER.info("Listener has finished"); - attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort); + attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort); assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); - assertProducingConsuming(connection); + assertThat(Utils.produceConsume(connection, queue), is(equalTo(true))); - _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA"); } + @Test public void testTransferMasterFromRemoteNode() throws Exception { - final Connection connection = _positiveFailoverBuilder.build(); - - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - session.close(); - - final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port {}", activeBrokerPort); - - final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection); - LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort); - - // transfer mastership 3 times in order to verify - // that repeated mastership transfer to the same node works, See QPID-6996 - transferMasterFromRemoteNode(connection, activeBrokerPort, inactiveBrokerPort); - transferMasterFromRemoteNode(connection, inactiveBrokerPort, activeBrokerPort); - transferMasterFromRemoteNode(connection, activeBrokerPort, inactiveBrokerPort); + final Connection connection = getConnectionBuilder().build(); + try + { + Destination queue = createTestQueue(connection); + final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort(); + LOGGER.info("Active connection port {}", activeBrokerPort); + + final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort); + LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort); + + // transfer mastership 3 times in order to verify + // that repeated mastership transfer to the same node works, See QPID-6996 + transferMasterFromRemoteNode(connection, queue, activeBrokerPort, inactiveBrokerPort); + transferMasterFromRemoteNode(connection, queue, inactiveBrokerPort, activeBrokerPort); + transferMasterFromRemoteNode(connection, queue, activeBrokerPort, inactiveBrokerPort); + } + finally + { + connection.close(); + } } private void transferMasterFromRemoteNode(final Connection connection, + final Destination queue, final int activeBrokerPort, final int inactiveBrokerPort) throws Exception { _failoverListener = new FailoverAwaitingListener(); getJmsProvider().addGenericConnectionListener(connection, _failoverListener); - _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA"); - Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort); + getBrokerAdmin().awaitRemoteNodeRole(activeBrokerPort, inactiveBrokerPort, "REPLICA"); + Map<String, Object> attributes = getBrokerAdmin().getRemoteNodeAttributes(activeBrokerPort, inactiveBrokerPort); assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); - _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + getBrokerAdmin().setRemoteNodeAttributes(activeBrokerPort, + inactiveBrokerPort, + Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); LOGGER.info("Listener has finished"); - attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort); + attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort); assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); - assertProducingConsuming(connection); + assertThat(Utils.produceConsume(connection, queue), is(equalTo(true))); - _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA"); } + + @Test public void testTransferMasterWhilstMessagesInFlight() throws Exception { - final Connection connection = _positiveFailoverBuilder.build(); - getJmsProvider().addGenericConnectionListener(connection, _failoverListener); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - final Destination destination = session.createQueue(getTestQueueName()); - - final AtomicBoolean masterTransferred = new AtomicBoolean(false); - final AtomicBoolean keepRunning = new AtomicBoolean(true); - final AtomicReference<Exception> workerException = new AtomicReference<>(); - final CountDownLatch producedOneBefore = new CountDownLatch(1); - final CountDownLatch producedOneAfter = new CountDownLatch(1); - final CountDownLatch workerShutdown = new CountDownLatch(1); - - Runnable producer = () -> { - try - { - int count = 0; - MessageProducer producer1 = session.createProducer(destination); + final Connection connection = getConnectionBuilder().build(); + try + { + getJmsProvider().addGenericConnectionListener(connection, _failoverListener); + + final Destination destination = createTestQueue(connection); - while (keepRunning.get()) + final AtomicBoolean masterTransferred = new AtomicBoolean(false); + final AtomicBoolean keepRunning = new AtomicBoolean(true); + final AtomicReference<Exception> workerException = new AtomicReference<>(); + final CountDownLatch producedOneBefore = new CountDownLatch(1); + final CountDownLatch producedOneAfter = new CountDownLatch(1); + final CountDownLatch workerShutdown = new CountDownLatch(1); + + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Runnable producer = () -> { + try { - String messageText = "message" + count; - try + int count = 0; + MessageProducer producer1 = session.createProducer(destination); + + while (keepRunning.get()) { - Message message = session.createTextMessage(messageText); - producer1.send(message); - session.commit(); - LOGGER.debug("Sent message " + count); + String messageText = "message" + count; + try + { + Message message = session.createTextMessage(messageText); + producer1.send(message); + session.commit(); + LOGGER.debug("Sent message " + count); - producedOneBefore.countDown(); + producedOneBefore.countDown(); - if (masterTransferred.get()) + if (masterTransferred.get()) + { + producedOneAfter.countDown(); + } + count++; + } + catch (javax.jms.IllegalStateException ise) { - producedOneAfter.countDown(); + throw ise; + } + catch (TransactionRolledBackException trbe) + { + // Pass - failover in prgoress + } + catch (JMSException je) + { + // Pass - failover in progress } - count++; - } - catch (javax.jms.IllegalStateException ise) - { - throw ise; - } - catch (TransactionRolledBackException trbe) - { - // Pass - failover in prgoress - } - catch(JMSException je) - { - // Pass - failover in progress } } - } - catch (Exception e) - { - workerException.set(e); - } - finally - { - workerShutdown.countDown(); - } - }; + catch (Exception e) + { + workerException.set(e); + } + finally + { + workerShutdown.countDown(); + } + }; - Thread backgroundWorker = new Thread(producer); - backgroundWorker.start(); + Thread backgroundWorker = new Thread(producer); + backgroundWorker.start(); - boolean workerRunning = producedOneBefore.await(5000, TimeUnit.MILLISECONDS); - assertTrue(workerRunning); + boolean workerRunning = producedOneBefore.await(5000, TimeUnit.MILLISECONDS); + assertTrue(workerRunning); - final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port {}", activeBrokerPort); + final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort(); + LOGGER.info("Active connection port {}", activeBrokerPort); - final int inactiveBrokerPort = _groupCreator.getPortNumberOfAnInactiveBroker(connection); - LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort); + final int inactiveBrokerPort = getBrokerAdmin().getAmqpPort(activeBrokerPort); + LOGGER.info("Update role attribute on inactive broker on port {}", inactiveBrokerPort); - _groupCreator.awaitNodeToAttainRole(activeBrokerPort, inactiveBrokerPort, "REPLICA"); - Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort, inactiveBrokerPort); - assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); + getBrokerAdmin().awaitNodeRole(inactiveBrokerPort, "REPLICA"); + Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort); + assertEquals("Inactive broker has unexpected role", "REPLICA", attributes.get(BDBHAVirtualHostNode.ROLE)); - _groupCreator.setNodeAttributes(activeBrokerPort, inactiveBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); + getBrokerAdmin().setNodeAttributes(inactiveBrokerPort, + Collections.singletonMap(BDBHAVirtualHostNode.ROLE, "MASTER")); - _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); - LOGGER.info("Failover has finished"); + _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); + LOGGER.info("Failover has finished"); - attributes = _groupCreator.getNodeAttributes(inactiveBrokerPort); - assertEquals("New master has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + attributes = getBrokerAdmin().getNodeAttributes(inactiveBrokerPort); + assertEquals("New master has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); - _groupCreator.awaitNodeToAttainRole(activeBrokerPort, "REPLICA"); + getBrokerAdmin().awaitNodeRole(activeBrokerPort, "REPLICA"); - LOGGER.info("Master transfer known to have completed successfully."); - masterTransferred.set(true); + LOGGER.info("Master transfer known to have completed successfully."); + masterTransferred.set(true); - boolean producedMore = producedOneAfter.await(5000, TimeUnit.MILLISECONDS); - assertTrue("Should have successfully produced at least one message after transfer complete", producedMore); + boolean producedMore = producedOneAfter.await(5000, TimeUnit.MILLISECONDS); + assertTrue("Should have successfully produced at least one message after transfer complete", producedMore); - keepRunning.set(false); - boolean shutdown = workerShutdown.await(5000, TimeUnit.MILLISECONDS); - assertTrue("Worker thread should have shutdown", shutdown); + keepRunning.set(false); + boolean shutdown = workerShutdown.await(5000, TimeUnit.MILLISECONDS); + assertTrue("Worker thread should have shutdown", shutdown); - backgroundWorker.join(5000); - assertNull(workerException.get()); + backgroundWorker.join(5000); + assertThat(workerException.get(), is(nullValue())); - assertNotNull(session.createTemporaryQueue()); + assertNotNull(session.createTemporaryQueue()); + } + finally + { + connection.close(); + } } + @Test public void testInFlightTransactionsWhilstMajorityIsLost() throws Exception { - if (getBrokerProtocol().equals(Protocol.AMQP_1_0)) - { - // TODO - QPIDJMS-366 - there seems to be a client defect when a JMS operation is interrupted - // by a graceful connection close from the client side. - return; - } - int connectionNumber = Integer.getInteger("MultiNodeTest.testInFlightTransactionsWhilstMajorityIsLost.numberOfConnections", 20); - ExecutorService executorService = Executors.newFixedThreadPool(connectionNumber + NUMBER_OF_NODES - 1); + int connectionNumber = Integer.getInteger( + "MultiNodeTest.testInFlightTransactionsWhilstMajorityIsLost.numberOfConnections", + 20); + ExecutorService executorService = Executors.newFixedThreadPool(connectionNumber + 2); try { - final ConnectionBuilder consumerBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(100, 100); - final Connection consumerConnection = consumerBuilder.build(); - Session s = consumerConnection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(s, getTestQueueName()); - s.close(); + final ConnectionBuilder connectionBuilder = + getConnectionBuilder().setFailoverReconnectDelay(100).setFailoverReconnectAttempts(100); + final Connection consumerConnection = connectionBuilder.build(); + try + { + Destination destination = createTestQueue(consumerConnection); + consumerConnection.start(); - consumerConnection.start(); + final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumerSession.createConsumer(destination).setMessageListener(message -> { + try + { + LOGGER.info("Message received: " + ((TextMessage) message).getText()); + } + catch (JMSException e) + { + LOGGER.error("Failure to get message text", e); + } + }); - final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Destination destination = consumerSession.createQueue(getTestQueueName()); - consumerSession.createConsumer(destination).setMessageListener(message -> { - try + final Connection[] connections = new Connection[connectionNumber]; + final Session[] sessions = new Session[connectionNumber]; + for (int i = 0; i < sessions.length; i++) { - LOGGER.info("Message received: " + ((TextMessage) message).getText()); + connections[i] = connectionBuilder.setClientId("test-" + UUID.randomUUID()).build(); + sessions[i] = connections[i].createSession(true, Session.SESSION_TRANSACTED); + LOGGER.info("Session {} is created", i); } - catch (JMSException e) + try { - LOGGER.error("Failure to get message text", e); - } - }); - - final Connection[] connections = new Connection[connectionNumber]; - final Session[] sessions = new Session[connectionNumber]; - for (int i = 0; i < sessions.length; i++) - { - final ConnectionBuilder builder = _groupCreator.getConnectionBuilderForAllClusterNodes(100, 100); - connections[i] = builder.build(); - sessions[i] = connections[i].createSession(true, Session.SESSION_TRANSACTED); - LOGGER.info("Session {} is created", i); - } + Set<Integer> ports = + Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet()); - List<Integer> ports = new ArrayList<>(_groupCreator.getBrokerPortNumbersForNodes()); + int maxMessageSize = 10; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < maxMessageSize - 2; i++) + { + sb.append("X"); + } + String messageText = sb.toString(); + for (int n = 0; n < 3; n++) + { + LOGGER.info("Starting iteration {}", n); - int maxMessageSize = 10; - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < maxMessageSize - 2; i++) - { - sb.append("X"); - } - String messageText = sb.toString(); - for (int n = 0; n < NUMBER_OF_NODES; n++) - { - LOGGER.info("Starting iteration {}", n); + FailoverAwaitingListener failoverListener = new FailoverAwaitingListener(connectionNumber); - FailoverAwaitingListener failoverListener = new FailoverAwaitingListener(connectionNumber); + for (int i = 0; i < sessions.length; i++) + { + Connection connection = connections[i]; + getJmsProvider().addGenericConnectionListener(connection, failoverListener); - for (int i = 0; i < sessions.length; i++) - { - Connection connection = connections[i]; - getJmsProvider().addGenericConnectionListener(connection, failoverListener); + MessageProducer producer = sessions[i].createProducer(destination); + Message message = sessions[i].createTextMessage(messageText + "-" + i); + producer.send(message); + } - MessageProducer producer = sessions[i].createProducer(destination); - Message message = sessions[i].createTextMessage(messageText + "-" + i); - producer.send(message); - } + LOGGER.info("All publishing sessions have uncommitted transactions"); - LOGGER.info("All publishing sessions have uncommitted transactions"); + final int activeBrokerPort = getJmsProvider().getConnectedURI(connections[0]).getPort(); + LOGGER.info("Active connection port {}", activeBrokerPort); - final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connections[0]); - LOGGER.info("Active connection port {}", activeBrokerPort); + List<Integer> inactivePorts = new ArrayList<>(ports); + inactivePorts.remove(new Integer(activeBrokerPort)); - List<Integer> inactivePorts = new ArrayList<>(ports); - inactivePorts.remove(new Integer(activeBrokerPort)); + final CountDownLatch latch = new CountDownLatch(inactivePorts.size()); + for (int port : inactivePorts) + { + final int inactiveBrokerPort = port; + LOGGER.info("Stop node for inactive broker on port " + inactiveBrokerPort); + + executorService.submit(() -> { + try + { + getBrokerAdmin().setNodeAttributes(inactiveBrokerPort, + Collections.singletonMap( + BDBHAVirtualHostNode.DESIRED_STATE, + State.STOPPED.name())); + } + catch (Exception e) + { + LOGGER.error("Failed to stop node on broker with port {}", inactiveBrokerPort, e); + } + finally + { + latch.countDown(); + } + }); + } - final CountDownLatch latch = new CountDownLatch(inactivePorts.size()); - for (int port : inactivePorts) - { - final int inactiveBrokerPort = port; - LOGGER.info("Stop node for inactive broker on port " + inactiveBrokerPort); + latch.await(500, TimeUnit.MILLISECONDS); - executorService.submit(() -> { - try + LOGGER.info("Committing transactions in parallel to provoke a lot of syncing to disk"); + for (final Session session : sessions) { - _groupCreator.setNodeAttributes(inactiveBrokerPort, - inactiveBrokerPort, - Collections.singletonMap( - BDBHAVirtualHostNode.DESIRED_STATE, - State.STOPPED.name())); + executorService.submit(() -> { + try + { + session.commit(); + } + catch (JMSException e) + { + // majority of commits might fail due to insufficient replicas + } + }); } - catch (Exception e) + + LOGGER.info("Verify that stopped nodes are in detached role"); + for (int port : inactivePorts) { - LOGGER.error("Failed to stop node on broker with port {}", inactiveBrokerPort, e); + getBrokerAdmin().awaitNodeRole(port, NodeRole.DETACHED.name()); } - finally + + LOGGER.info("Start stopped nodes"); + for (int port : inactivePorts) { - latch.countDown(); + LOGGER.info("Starting node for inactive broker on port " + port); + try + { + getBrokerAdmin().setNodeAttributes(port, + Collections.singletonMap( + BDBHAVirtualHostNode.DESIRED_STATE, + State.ACTIVE.name())); + } + catch (Exception e) + { + LOGGER.error("Failed to start node on broker with port " + port, e); + } } - }); - } - latch.await(500, TimeUnit.MILLISECONDS); + for (int port : ports) + { + getBrokerAdmin().awaitNodeRole(port, "REPLICA", "MASTER"); + } - LOGGER.info("Committing transactions in parallel to provoke a lot of syncing to disk"); - for (final Session session : sessions) - { - executorService.submit(() -> { - try + if (failoverListener.isFailoverStarted()) { - session.commit(); + LOGGER.info("Waiting for failover completion"); + failoverListener.awaitFailoverCompletion(20000 * connectionNumber); + LOGGER.info("Failover has finished"); } - catch (JMSException e) + else { - // majority of commits might fail due to insufficient replicas + LOGGER.info("Failover never started"); } - }); - } - - LOGGER.info("Verify that stopped nodes are in detached role"); - for (int port : inactivePorts) - { - _groupCreator.awaitNodeToAttainRole(port, NodeRole.DETACHED.name()); + } } - - LOGGER.info("Start stopped nodes"); - for (int port : inactivePorts) + finally { - LOGGER.info("Starting node for inactive broker on port " + port); - try - { - _groupCreator.setNodeAttributes(port, - port, - Collections.singletonMap( - BDBHAVirtualHostNode.DESIRED_STATE, - State.ACTIVE.name())); - } - catch (Exception e) + for (Connection c: connections) { - LOGGER.error("Failed to start node on broker with port " + port, e); + try + { + c.close(); + } + finally + { + LOGGER.error("Unexpected exception on connection close"); + } } } - - for (int port : ports) - { - _groupCreator.awaitNodeToAttainRole(port, "REPLICA", "MASTER"); - } - - if (failoverListener.isFailoverStarted()) - { - LOGGER.info("Waiting for failover completion"); - failoverListener.awaitFailoverCompletion(20000 * connectionNumber); - LOGGER.info("Failover has finished"); - } - else - { - LOGGER.info("Failover never started"); - } + } + finally + { + consumerConnection.close(); } } finally @@ -646,127 +684,148 @@ public class MultiNodeTest extends QpidBrokerTestCase * Tests aims to demonstrate that in a disaster situation (where all nodes except the master is lost), that operation * can be continued from a single node using the QUORUM_OVERRIDE feature. */ + @Test public void testQuorumOverride() throws Exception { - final Connection connection = _positiveFailoverBuilder.build(); - getJmsProvider().addGenericConnectionListener(connection, _failoverListener); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - session.close(); + final Connection connection = getConnectionBuilder().build(); + try + { + getJmsProvider().addGenericConnectionListener(connection, _failoverListener); + Destination queue = createTestQueue(connection); - Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes(); + Set<Integer> ports = + Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet()); - final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); - ports.remove(activeBrokerPort); + final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort(); + ports.remove(activeBrokerPort); - // Stop all other nodes - for (Integer p : ports) - { - _groupCreator.stopNode(p); - } + // Stop all other nodes + for (Integer p : ports) + { + getBrokerAdmin().stopNode(p); + } - LOGGER.info("Awaiting failover to start"); - _failoverListener.awaitPreFailover(FAILOVER_COMPLETION_TIMEOUT); - LOGGER.info("Failover has begun"); + LOGGER.info("Awaiting failover to start"); + _failoverListener.awaitPreFailover(FAILOVER_COMPLETION_TIMEOUT); + LOGGER.info("Failover has begun"); - Map<String, Object> attributes = _groupCreator.getNodeAttributes(activeBrokerPort); - assertEquals("Broker has unexpected quorum override", new Integer(0), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); - _groupCreator.setNodeAttributes(activeBrokerPort, Collections.singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1)); + Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(activeBrokerPort); + assertEquals("Broker has unexpected quorum override", + new Integer(0), + attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); + getBrokerAdmin().setNodeAttributes(activeBrokerPort, + Collections.singletonMap(BDBHAVirtualHostNode.QUORUM_OVERRIDE, 1)); - attributes = _groupCreator.getNodeAttributes(activeBrokerPort); - assertEquals("Broker has unexpected quorum override", new Integer(1), attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); + attributes = getBrokerAdmin().getNodeAttributes(activeBrokerPort); + assertEquals("Broker has unexpected quorum override", + new Integer(1), + attributes.get(BDBHAVirtualHostNode.QUORUM_OVERRIDE)); - _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); - LOGGER.info("Failover has finished"); + _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); + LOGGER.info("Failover has finished"); - assertProducingConsuming(connection); + assertThat(Utils.produceConsume(connection, queue), is(equalTo(true))); + } + finally + { + connection.close(); + } } + @Test public void testPriority() throws Exception { - final Connection connection = _positiveFailoverBuilder.build(); - getJmsProvider().addGenericConnectionListener(connection, _failoverListener); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - session.close(); + final Connection connection = getConnectionBuilder().build(); + try + { + getJmsProvider().addGenericConnectionListener(connection, _failoverListener); + Destination queue = createTestQueue(connection); - final int activeBrokerPort = _groupCreator.getBrokerPortNumberFromConnection(connection); - LOGGER.info("Active connection port {}", activeBrokerPort); + final int activeBrokerPort = getJmsProvider().getConnectedURI(connection).getPort(); + LOGGER.info("Active connection port {}", activeBrokerPort); - int priority = 1; - Integer highestPriorityBrokerPort = null; - Set<Integer> ports = _groupCreator.getBrokerPortNumbersForNodes(); - for (Integer port : ports) - { - if (activeBrokerPort != port) + int priority = 1; + Integer highestPriorityBrokerPort = null; + Set<Integer> ports = + Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet()); + for (Integer port : ports) { - priority = priority + 1; - highestPriorityBrokerPort = port; - _groupCreator.setNodeAttributes(port, port, Collections.singletonMap(BDBHAVirtualHostNode.PRIORITY, priority)); - Map<String, Object> attributes = _groupCreator.getNodeAttributes(port, port); - assertEquals("Broker has unexpected priority", priority, attributes.get(BDBHAVirtualHostNode.PRIORITY)); + if (activeBrokerPort != port) + { + priority = priority + 1; + highestPriorityBrokerPort = port; + getBrokerAdmin().setNodeAttributes(port, + Collections.singletonMap(BDBHAVirtualHostNode.PRIORITY, + priority)); + Map<String, Object> attributes = getBrokerAdmin().getNodeAttributes(port); + assertEquals("Broker has unexpected priority", + priority, + attributes.get(BDBHAVirtualHostNode.PRIORITY)); + } } - } - LOGGER.info("Broker on port {} has the highest priority of {}", highestPriorityBrokerPort, priority); + LOGGER.info("Broker on port {} has the highest priority of {}", highestPriorityBrokerPort, priority); - // make sure all remote nodes are materialized on the master - // in order to make sure that DBPing is not invoked - for (Integer port : ports) - { - if (activeBrokerPort != port) + for (Integer port : ports) { - _groupCreator.awaitNodeToAttainAttributeValue(activeBrokerPort, port, BDBHARemoteReplicationNode.ROLE, "REPLICA"); + if (activeBrokerPort != port) + { + getBrokerAdmin().awaitNodeRole(port, BDBHARemoteReplicationNode.ROLE, "REPLICA"); + } } - } - // do work on master - assertProducingConsuming(connection); + // do work on master + assertThat(Utils.produceConsume(connection, queue), is(equalTo(true))); - Map<String, Object> masterNodeAttributes = _groupCreator.getNodeAttributes(activeBrokerPort); + Map<String, Object> masterNodeAttributes = getBrokerAdmin().getNodeAttributes(activeBrokerPort); - Object lastTransactionId = masterNodeAttributes.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID); - assertTrue("Unexpected last transaction id: " + lastTransactionId, lastTransactionId instanceof Number); + Object lastTransactionId = + masterNodeAttributes.get(BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID); + assertTrue("Unexpected last transaction id: " + lastTransactionId, lastTransactionId instanceof Number); - // make sure all remote nodes have the same transaction id as master - for (Integer port : ports) - { - if (activeBrokerPort != port) + // make sure all remote nodes have the same transaction id as master + for (Integer port : ports) { - _groupCreator.awaitNodeToAttainAttributeValue(activeBrokerPort, - port, - BDBHARemoteReplicationNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID, - String.valueOf(lastTransactionId)); + if (activeBrokerPort != port) + { + getBrokerAdmin().awaitNodeToAttainAttributeValue(activeBrokerPort, + BDBHAVirtualHostNode.LAST_KNOWN_REPLICATION_TRANSACTION_ID, + lastTransactionId); + } } - } - LOGGER.info("Shutting down the MASTER"); - _groupCreator.stopNode(activeBrokerPort); + LOGGER.info("Shutting down the MASTER"); + getBrokerAdmin().stopNode(activeBrokerPort); - _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); - LOGGER.info("Listener has finished"); + _failoverListener.awaitFailoverCompletion(FAILOVER_COMPLETION_TIMEOUT); + LOGGER.info("Listener has finished"); - Map<String, Object> attributes = _groupCreator.getNodeAttributes(highestPriorityBrokerPort, highestPriorityBrokerPort); - assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); + Map<String, Object> attributes = + getBrokerAdmin().getNodeAttributes(highestPriorityBrokerPort); + assertEquals("Inactive broker has unexpected role", "MASTER", attributes.get(BDBHAVirtualHostNode.ROLE)); - assertProducingConsuming(connection); + assertThat(Utils.produceConsume(connection, queue), is(equalTo(true))); + } + finally + { + connection.close(); + } } + @Test public void testClusterCannotStartWithIntruder() throws Exception { - //set property explicitly as test requires broker to start to enable check for ERRORED nodes - setSystemProperty(Broker.BROKER_FAIL_STARTUP_WITH_ERRORED_CHILD, String.valueOf(Boolean.FALSE)); - - int intruderPort = getNextAvailable(Collections.max(_groupCreator.getBdbPortNumbers()) + 1); + int intruderPort = + new PortHelper().getNextAvailable(Arrays.stream(getBrokerAdmin().getBdbPorts()).max().getAsInt() + 1); String nodeName = "intruder"; - String nodeHostPort = _groupCreator.getIpAddressOfBrokerHost() + ":" + intruderPort; + String nodeHostPort = getBrokerAdmin().getHost() + ":" + intruderPort; File environmentPathFile = Files.createTempDirectory("qpid-work-intruder").toFile(); try { environmentPathFile.mkdirs(); ReplicationConfig replicationConfig = - new ReplicationConfig(_groupCreator.getGroupName(), nodeName, nodeHostPort); - replicationConfig.setHelperHosts(_groupCreator.getHelperHostPort()); + new ReplicationConfig("test", nodeName, nodeHostPort); + replicationConfig.setHelperHosts(getBrokerAdmin().getHelperHostPort()); EnvironmentConfig envConfig = new EnvironmentConfig(); envConfig.setAllowCreate(true); envConfig.setTransactional(true); @@ -775,7 +834,9 @@ public class MultiNodeTest extends QpidBrokerTestCase Durability.ReplicaAckPolicy.SIMPLE_MAJORITY)); final String currentThreadName = Thread.currentThread().getName(); - try(ReplicatedEnvironment intruder = new ReplicatedEnvironment(environmentPathFile, replicationConfig, envConfig)) + try (ReplicatedEnvironment intruder = new ReplicatedEnvironment(environmentPathFile, + replicationConfig, + envConfig)) { LOGGER.debug("Intruder started"); } @@ -784,23 +845,24 @@ public class MultiNodeTest extends QpidBrokerTestCase Thread.currentThread().setName(currentThreadName); } - for (int port : _groupCreator.getBrokerPortNumbersForNodes()) + Set<Integer> ports = + Arrays.stream(getBrokerAdmin().getGroupAmqpPorts()).boxed().collect(Collectors.toSet()); + for (int port : ports) { - _groupCreator.awaitNodeToAttainAttributeValue(port, - port, - BDBHAVirtualHostNode.STATE, - State.ERRORED.name()); + getBrokerAdmin().awaitNodeToAttainAttributeValue(port, + BDBHAVirtualHostNode.STATE, + State.ERRORED.name()); } - _groupCreator.stopCluster(); - _groupCreator.startCluster(); + getBrokerAdmin().stop(); + getBrokerAdmin().start(false); - for (int port : _groupCreator.getBrokerPortNumbersForNodes()) + for (int port : ports) { - _groupCreator.awaitNodeToAttainAttributeValue(port, - port, - BDBHAVirtualHostNode.STATE, - State.ERRORED.name()); + getBrokerAdmin().awaitNodeToAttainAttributeValue(port, + + BDBHAVirtualHostNode.STATE, + State.ERRORED.name()); } } finally @@ -844,10 +906,10 @@ public class MultiNodeTest extends QpidBrokerTestCase if (!_failoverCompletionLatch.await(delay, TimeUnit.MILLISECONDS)) { LOGGER.warn("Failover did not occur, dumping threads:\n\n" + TestUtils.dumpThreads() + "\n"); - Map<Integer,String> threadDumps = _groupCreator.groupThreadumps(); - for (Map.Entry<Integer,String> entry : threadDumps.entrySet()) + Map<Integer, String> threadDumps = getBrokerAdmin().groupThreadDumps(); + for (Map.Entry<Integer, String> entry : threadDumps.entrySet()) { - LOGGER.warn("Broker {} thread dump:\n\n {}" , entry.getKey(), entry.getValue()); + LOGGER.warn("Broker {} thread dump:\n\n {}", entry.getKey(), entry.getValue()); } } assertEquals("Failover did not occur", 0, _failoverCompletionLatch.getCount()); @@ -870,5 +932,4 @@ public class MultiNodeTest extends QpidBrokerTestCase return _failoverStarted; } } - }
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java ---------------------------------------------------------------------- diff --git a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java index 3e0783d..151592c 100644 --- a/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java +++ b/bdbstore/systests/src/test/java/org/apache/qpid/server/store/berkeleydb/replication/TwoNodeTest.java @@ -19,157 +19,151 @@ */ package org.apache.qpid.server.store.berkeleydb.replication; -import java.util.Collections; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; + import java.util.Map; import javax.jms.Connection; import javax.jms.JMSException; -import javax.jms.Session; +import javax.jms.Queue; + +import org.junit.Test; -import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.virtualhostnode.berkeleydb.BDBHAVirtualHostNode; -import org.apache.qpid.systests.ConnectionBuilder; -import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.apache.qpid.systests.Utils; +import org.apache.qpid.tests.utils.RunBrokerAdmin; -public class TwoNodeTest extends QpidBrokerTestCase +@RunBrokerAdmin(type = "BDB-HA") +@GroupConfig(numberOfNodes = 2, groupName = "test") +public class TwoNodeTest extends GroupJmsTestBase { - private static final String VIRTUAL_HOST = "test"; - - private static final int NUMBER_OF_NODES = 2; - - private GroupCreator _groupCreator; - /** Used when expectation is client will not (re)-connect */ - private ConnectionBuilder _positiveFailoverBuilder; - - /** Used when expectation is client will not (re)-connect */ - private ConnectionBuilder _negativeFailoverBuilder; - - @Override - protected void setUp() throws Exception + @Test + public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception { - assertTrue(isJavaBroker()); - assertTrue(isBrokerStorePersistent()); - - super.setUp(); - - _groupCreator = new GroupCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES); - } + final Connection initialConnection = getConnectionBuilder().build(); + int masterPort; + Queue queue; + try + { + queue = createTestQueue(initialConnection); + masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort(); + getBrokerAdmin().setDesignatedPrimary(masterPort, true); + } + finally + { + initialConnection.close(); + } + getBrokerAdmin().stop(); + getBrokerAdmin().startNode(masterPort); - @Override - public void startDefaultBroker() throws Exception - { - // Don't start default broker provided by QBTC. + assertProduceConsume(queue); } - private void startCluster(boolean designedPrimary) throws Exception + @Test + public void testClusterRestartWithoutDesignatedPrimary() throws Exception { - _groupCreator.configureClusterNodes(); - _groupCreator.setDesignatedPrimaryOnFirstBroker(designedPrimary); - _positiveFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(); - _negativeFailoverBuilder = _groupCreator.getConnectionBuilderForAllClusterNodes(200, 2); - _groupCreator.startCluster(); - } + Queue queue; + final Connection initialConnection = getConnectionBuilder().build(); + try + { + queue = createTestQueue(initialConnection); + assertThat(Utils.produceConsume(initialConnection, queue), is(equalTo(true))); + } + finally + { + initialConnection.close(); + } - public void testMasterDesignatedPrimaryCanBeRestartedWithoutReplica() throws Exception - { - startCluster(true); - - final Connection initialConnection = _positiveFailoverBuilder.build(); - Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - session.close(); - - int masterPort = _groupCreator.getBrokerPortNumberFromConnection(initialConnection); - assertProducingConsuming(initialConnection); - initialConnection.close(); - _groupCreator.stopCluster(); - _groupCreator.startNode(masterPort); - final Connection secondConnection = _positiveFailoverBuilder.build(); - assertProducingConsuming(secondConnection); - secondConnection.close(); - } + getBrokerAdmin().stop(); + getBrokerAdmin().start(); - public void testClusterRestartWithoutDesignatedPrimary() throws Exception - { - startCluster(false); - - final Connection initialConnection = _positiveFailoverBuilder.build(); - Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - session.close(); - - assertProducingConsuming(initialConnection); - initialConnection.close(); - _groupCreator.stopCluster(); - _groupCreator.startClusterParallel(); - final Connection secondConnection = _positiveFailoverBuilder.build(); - assertProducingConsuming(secondConnection); - secondConnection.close(); + assertProduceConsume(queue); } + @Test public void testDesignatedPrimaryContinuesAfterSecondaryStopped() throws Exception { - startCluster(true); - _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode()); + int masterPort; + Queue queue; + final Connection initialConnection = getConnectionBuilder().build(); + try + { + masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort(); + queue = createTestQueue(initialConnection); + getBrokerAdmin().setDesignatedPrimary(masterPort, true); + } + finally + { + initialConnection.close(); + } - final Connection connection = _positiveFailoverBuilder.build(); - Session session = connection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - session.close(); + int replicaPort = getBrokerAdmin().getAmqpPort(masterPort); + getBrokerAdmin().stopNode(replicaPort); - assertNotNull("Expected to get a valid connection to primary", connection); - assertProducingConsuming(connection); + assertProduceConsume(queue); } + @Test public void testPersistentOperationsFailOnNonDesignatedPrimaryAfterSecondaryStopped() throws Exception { - if (getBrokerProtocol().equals(Protocol.AMQP_1_0)) + int masterPort; + Queue queue ; + final Connection initialConnection = getConnectionBuilder().build(); + try { - // TODO - there seems to be a client defect when a JMS operation is interrupted - // by a graceful connection close from the client side. - return; + masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort(); + queue = createTestQueue(initialConnection); + } + finally + { + initialConnection.close(); } - startCluster(false); - - final Connection initialConnection = _negativeFailoverBuilder.build(); - Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - initialConnection.close(); + int replicaPort = getBrokerAdmin().getAmqpPort(masterPort); - _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode()); + getBrokerAdmin().stopNode(replicaPort); try { - - Connection connection = _negativeFailoverBuilder.build(); - assertProducingConsuming(connection); + Connection connection = getConnectionBuilder().setFailoverReconnectDelay(SHORT_FAILOVER_CONNECTDELAY) + .setFailoverReconnectAttempts(SHORT_FAILOVER_CYCLECOUNT) + .build(); + Utils.produceConsume(connection, queue); fail("Exception not thrown"); } catch(JMSException e) { - // JMSException should be thrown either on getConnection, or produce/consume + // JMSException should be thrown either on connection open, or produce/consume // depending on whether the relative timing of the node discovering that the // secondary has gone. } } + @Test public void testSecondaryDoesNotBecomePrimaryWhenDesignatedPrimaryStopped() throws Exception { - if (getBrokerProtocol().equals(Protocol.AMQP_1_0)) + int masterPort; + final Connection initialConnection = getConnectionBuilder().build(); + try { - // TODO - there seems to be a client defect when a JMS operation is interrupted - // by a graceful connection close from the client side. - return; + masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort(); + getBrokerAdmin().setDesignatedPrimary(masterPort, true); + } + finally + { + initialConnection.close(); } - startCluster(true); - _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary()); - + getBrokerAdmin().stopNode(masterPort); try { - _negativeFailoverBuilder.build(); + getConnectionBuilder().setFailoverReconnectDelay(SHORT_FAILOVER_CONNECTDELAY) + .setFailoverReconnectAttempts(SHORT_FAILOVER_CYCLECOUNT) + .build(); fail("Connection not expected"); } catch (JMSException e) @@ -178,70 +172,92 @@ public class TwoNodeTest extends QpidBrokerTestCase } } + @Test public void testInitialDesignatedPrimaryStateOfNodes() throws Exception { - startCluster(true); + int masterPort; + final Connection initialConnection = getConnectionBuilder().build(); + try + { + masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort(); + getBrokerAdmin().setDesignatedPrimary(masterPort, true); + } + finally + { + initialConnection.close(); + } + + Map<String, Object> + primaryNodeAttributes = getBrokerAdmin().getNodeAttributes(masterPort); + assertThat("Expected primary node to be set as designated primary", + primaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(true))); - Map<String, Object> primaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary()); - assertTrue("Expected primary node to be set as designated primary", - (Boolean) primaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)); + int replicaPort = getBrokerAdmin().getAmqpPort(masterPort); - Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode()); - assertFalse("Expected secondary node to NOT be set as designated primary", - (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)); + Map<String, Object> secondaryNodeAttributes = getBrokerAdmin().getNodeAttributes(replicaPort); + assertThat("Expected secondary node to NOT be set as designated primary", + secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(false))); } + @Test public void testSecondaryDesignatedAsPrimaryAfterOriginalPrimaryStopped() throws Exception { - startCluster(true); - - final Connection initialConnection = _positiveFailoverBuilder.build(); - Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - initialConnection.close(); + int masterPort; + Queue queue; + final Connection initialConnection = getConnectionBuilder().build(); + try + { + masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort(); + getBrokerAdmin().setDesignatedPrimary(masterPort, true); + queue = createTestQueue(initialConnection); + } + finally + { + initialConnection.close(); + } - _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfPrimary()); + getBrokerAdmin().stopNode(masterPort); + int replicaPort = getBrokerAdmin().getAmqpPort(masterPort); - Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode()); - assertFalse("Expected node to NOT be set as designated primary", (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)); + Map<String, Object> secondaryNodeAttributes = getBrokerAdmin().getNodeAttributes(replicaPort); + assertThat("Expected secondary node to NOT be set as designated primary", + secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(false))); - _groupCreator.setNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode(), Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true)); + getBrokerAdmin().setDesignatedPrimary(replicaPort, true); + getBrokerAdmin().awaitNodeRole(replicaPort, "MASTER"); - int timeout = 5000; - long limit = System.currentTimeMillis() + timeout; - while( !((Boolean)secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)) && System.currentTimeMillis() < limit) - { - Thread.sleep(100); - secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfSecondaryNode()); - } - assertTrue("Expected secondary to transition to primary within " + timeout, (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)); - - final Connection connection = _positiveFailoverBuilder.build(); - assertNotNull("Expected to get a valid connection to new primary", connection); - assertProducingConsuming(connection); + assertProduceConsume(queue); } + @Test public void testSetDesignatedAfterReplicaBeingStopped() throws Exception { - startCluster(false); - - final Connection initialConnection = _positiveFailoverBuilder.build(); - Session session = initialConnection.createSession(true, Session.SESSION_TRANSACTED); - getJmsProvider().createQueue(session, getTestQueueName()); - initialConnection.close(); + final Connection initialConnection = getConnectionBuilder().build(); + int masterPort; + Queue queue; + try + { + masterPort = getJmsProvider().getConnectedURI(initialConnection).getPort(); + queue = createTestQueue(initialConnection); + } + finally + { + initialConnection.close(); + } - _groupCreator.stopNode(_groupCreator.getBrokerPortNumberOfSecondaryNode()); + int replicaPort = getBrokerAdmin().getAmqpPort(masterPort); + getBrokerAdmin().stopNode(replicaPort); - Map<String, Object> secondaryNodeAttributes = _groupCreator.getNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary()); - assertFalse("Expected node to NOT be set as designated primary", (Boolean) secondaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY)); + Map<String, Object> + primaryNodeAttributes = getBrokerAdmin().getNodeAttributes(masterPort); + assertThat("Expected node to NOT be set as designated primary", + primaryNodeAttributes.get(BDBHAVirtualHostNode.DESIGNATED_PRIMARY), is(equalTo(false))); - _groupCreator.setNodeAttributes(_groupCreator.getBrokerPortNumberOfPrimary(), Collections.<String, Object>singletonMap(BDBHAVirtualHostNode.DESIGNATED_PRIMARY, true)); - _groupCreator.awaitNodeToAttainRole(_groupCreator.getBrokerPortNumberOfPrimary(), "MASTER" ); + getBrokerAdmin().setDesignatedPrimary(masterPort, true); + getBrokerAdmin().awaitNodeRole(masterPort, "MASTER"); - final Connection connection = _positiveFailoverBuilder.build(); - assertNotNull("Expected to get a valid connection to primary", connection); - assertProducingConsuming(connection); + assertProduceConsume(queue); } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2e8d623..7cd016a 100644 --- a/pom.xml +++ b/pom.xml @@ -201,6 +201,7 @@ <module>systests/protocol-tests-amqp-0-8</module> <module>systests/protocol-tests-amqp-0-10</module> <module>systests/protocol-tests-amqp-1-0</module> + <module>systests/qpid-systests-spawn-admin</module> <module>systests/end-to-end-conversion-tests</module> <module>perftests</module> <module>qpid-perftests-systests</module> @@ -453,6 +454,12 @@ <version>${project.version}</version> </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>qpid-systests-spawn-admin</artifactId> + <version>${project.version}</version> + </dependency> + <!-- External dependencies --> <dependency> <groupId>org.apache.qpid</groupId> http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java b/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java index a41ee39..91255ef 100644 --- a/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java +++ b/systests/qpid-systests-http-management/src/main/java/org/apache/qpid/tests/http/HttpTestBase.java @@ -20,6 +20,8 @@ package org.apache.qpid.tests.http; +import static org.apache.qpid.systests.Utils.getJmsProvider; + import java.net.InetSocketAddress; import javax.jms.Connection; @@ -32,11 +34,9 @@ import org.junit.Rule; import org.junit.rules.TestName; import org.apache.qpid.server.model.Protocol; -import org.apache.qpid.systests.AmqpManagementFacade; import org.apache.qpid.systests.ConnectionBuilder; import org.apache.qpid.systests.JmsProvider; -import org.apache.qpid.systests.QpidJmsClient0xProvider; -import org.apache.qpid.systests.QpidJmsClientProvider; +import org.apache.qpid.systests.Utils; import org.apache.qpid.tests.utils.BrokerAdmin; import org.apache.qpid.tests.utils.BrokerAdminUsingTestBase; @@ -59,17 +59,7 @@ public abstract class HttpTestBase extends BrokerAdminUsingTestBase _helper = new HttpTestHelper(getBrokerAdmin(), config != null && config.useVirtualHostAsHost() ? getVirtualHost() : null); - Protocol protocol = getProtocol(); - AmqpManagementFacade managementFacade = new AmqpManagementFacade(protocol); - if (protocol == Protocol.AMQP_1_0) - { - _jmsProvider = new QpidJmsClientProvider(managementFacade); - } - else - { - _jmsProvider = new QpidJmsClient0xProvider(); - } - + _jmsProvider = getJmsProvider(); } @After @@ -121,14 +111,12 @@ public abstract class HttpTestBase extends BrokerAdminUsingTestBase protected static long getReceiveTimeout() { - return Long.getLong("qpid.test_receive_timeout", 1000L); + return Utils.getReceiveTimeout(); } protected static Protocol getProtocol() { - return Protocol.valueOf("AMQP_" + System.getProperty("broker.version", "0-9-1") - .replace('-', '_') - .replace('.', '_')); + return Utils.getProtocol(); } protected String getTestName() http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/88a12e8c/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java ---------------------------------------------------------------------- diff --git a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java index 647d34a..5ac57cf 100644 --- a/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java +++ b/systests/qpid-systests-jms-core/src/main/java/org/apache/qpid/systests/AmqpManagementFacade.java @@ -492,10 +492,12 @@ public class AmqpManagementFacade return new HashMap<>(bodyMap); } } - throw new IllegalArgumentException("Management read failed : " - + response.getStringProperty("statusCode") - + " - " - + response.getStringProperty("statusDescription")); + throw new AmqpManagementFacade.OperationUnsuccessfulException("Management read failed : " + + response.getStringProperty("statusCode") + + " - " + + response.getStringProperty( + "statusDescription"), + response.getIntProperty("statusCode")); } finally { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org