http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6371d64f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index eb5bc61..c129791 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -16,29 +16,23 @@ */ package org.apache.activemq.transport.failover; -import junit.framework.Test; - import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.AutoFailTestSupport; -import org.apache.activemq.TestSupport; -import org.apache.activemq.broker.BrokerPlugin; -import org.apache.activemq.broker.BrokerPluginSupport; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.ConnectionContext; -import org.apache.activemq.broker.ConsumerBrokerExchange; -import org.apache.activemq.broker.ProducerBrokerExchange; -import org.apache.activemq.broker.region.RegionBroker; -import org.apache.activemq.broker.util.DestinationPathSeparatorBroker; -import org.apache.activemq.command.ActiveMQDestination; -import org.apache.activemq.command.ConsumerInfo; -import org.apache.activemq.command.MessageAck; -import org.apache.activemq.command.TransactionId; -import org.apache.activemq.store.PersistenceAdapter; -import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.util.SocketProxy; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,28 +63,30 @@ import java.util.concurrent.atomic.AtomicBoolean; // see https://issues.apache.org/activemq/browse/AMQ-2473 // https://issues.apache.org/activemq/browse/AMQ-2590 -public class FailoverTransactionTest extends TestSupport { +@RunWith(BMUnitRunner.class) +public class FailoverTransactionTest extends OpenwireArtemisBaseTest { private static final Logger LOG = LoggerFactory.getLogger(FailoverTransactionTest.class); private static final String QUEUE_NAME = "Failover.WithTx"; - private static final String TRANSPORT_URI = "tcp://localhost:0"; - private String url; - BrokerService broker; + private String url = newURI(0); - public static Test suite() { - return suite(FailoverTransactionTest.class); - } + private static final AtomicBoolean doByteman = new AtomicBoolean(false); + private static CountDownLatch brokerStopLatch; + + private static SocketProxy proxy; + private static boolean firstSend; + private static int count; + + private static EmbeddedJMS broker; - @Override + @Before public void setUp() throws Exception { - super.setMaxTestTime(2 * 60 * 1000); // some boxes can be real slow - super.setAutoFail(true); - super.setUp(); + doByteman.set(false); + brokerStopLatch = new CountDownLatch(1); } - @Override + @After public void tearDown() throws Exception { - super.tearDown(); stopBroker(); } @@ -101,39 +97,19 @@ public class FailoverTransactionTest extends TestSupport { } private void startCleanBroker() throws Exception { - startBroker(true); - } - - public void startBroker(boolean deleteAllMessagesOnStartup) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup); - broker.start(); + startBroker(); } - public void startBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { - broker = createBroker(deleteAllMessagesOnStartup, bindAddress); + public void startBroker() throws Exception { + broker = createBroker(); broker.start(); } - public BrokerService createBroker(boolean deleteAllMessagesOnStartup) throws Exception { - return createBroker(deleteAllMessagesOnStartup, TRANSPORT_URI); - } - - public BrokerService createBroker(boolean deleteAllMessagesOnStartup, String bindAddress) throws Exception { - broker = new BrokerService(); - broker.setUseJmx(false); - broker.setAdvisorySupport(false); - broker.addConnector(bindAddress); - broker.setDeleteAllMessagesOnStartup(deleteAllMessagesOnStartup); - - url = broker.getTransportConnectors().get(0).getConnectUri().toString(); - - return broker; - } - public void configureConnectionFactory(ActiveMQConnectionFactory factory) { // nothing to do } + @Test public void testFailoverProducerCloseBeforeTransaction() throws Exception { startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); @@ -148,55 +124,31 @@ public class FailoverTransactionTest extends TestSupport { // restart to force failover and connection state recovery before the commit broker.stop(); - startBroker(false, url); + startBroker(); session.commit(); - assertNotNull("we got the message", consumer.receive(20000)); + Assert.assertNotNull("we got the message", consumer.receive(20000)); session.commit(); connection.close(); } - public void initCombosForTestFailoverCommitReplyLost() { - String osName = System.getProperty("os.name"); - Object[] persistenceAdapters; - if (!osName.equalsIgnoreCase("AIX") && !osName.equalsIgnoreCase("SunOS")) { - persistenceAdapters = new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.LevelDB, PersistenceAdapterChoice.JDBC}; - } - else { - persistenceAdapters = new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC}; - } - addCombinationValues("defaultPersistenceAdapter", persistenceAdapters); - } - - @SuppressWarnings("unchecked") + @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response and stop the broker", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processCommitTransactionOnePhase", + targetLocation = "EXIT", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker(context)") + } + ) public void testFailoverCommitReplyLost() throws Exception { - broker = createBroker(true); - setDefaultPersistenceAdapter(broker); - - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - @Override - public void commitTransaction(ConnectionContext context, - TransactionId xid, - boolean onePhase) throws Exception { - super.commitTransaction(context, xid, onePhase); - // so commit will hang as if reply is lost - context.setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("Stopping broker post commit..."); - try { - broker.stop(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - }}); - broker.start(); + broker = createBroker(); + startBrokerWithDurableQueue(); + doByteman.set(true); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); configureConnectionFactory(cf); @@ -211,14 +163,13 @@ public class FailoverTransactionTest extends TestSupport { final CountDownLatch commitDoneLatch = new CountDownLatch(1); // broker will die on commit reply so this will hang till restart Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override public void run() { LOG.info("doing async commit..."); try { session.commit(); } catch (JMSException e) { - assertTrue(e instanceof TransactionRolledBackException); + Assert.assertTrue(e instanceof TransactionRolledBackException); LOG.info("got commit exception: ", e); } commitDoneLatch.countDown(); @@ -227,29 +178,27 @@ public class FailoverTransactionTest extends TestSupport { }); // will be stopped by the plugin - broker.waitUntilStopped(); - broker = createBroker(false, url); - setDefaultPersistenceAdapter(broker); + brokerStopLatch.await(); + doByteman.set(false); + broker = createBroker(); broker.start(); - assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); + Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); // new transaction Message msg = consumer.receive(20000); LOG.info("Received: " + msg); - assertNotNull("we got the message", msg); - assertNull("we got just one message", consumer.receive(2000)); + Assert.assertNotNull("we got the message", msg); + Assert.assertNull("we got just one message", consumer.receive(2000)); session.commit(); consumer.close(); connection.close(); // ensure no dangling messages with fresh broker etc broker.stop(); - broker.waitUntilStopped(); LOG.info("Checking for remaining/hung messages.."); - broker = createBroker(false, url); - setDefaultPersistenceAdapter(broker); + broker = createBroker(); broker.start(); // after restart, ensure no dangling messages @@ -264,152 +213,38 @@ public class FailoverTransactionTest extends TestSupport { msg = consumer.receive(5000); } LOG.info("Received: " + msg); - assertNull("no messges left dangling but got: " + msg, msg); + Assert.assertNull("no messges left dangling but got: " + msg, msg); connection.close(); } @SuppressWarnings("unchecked") + @Test public void testFailoverCommitReplyLostWithDestinationPathSeparator() throws Exception { - - broker = createBroker(true); - setDefaultPersistenceAdapter(broker); - - broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker(), new BrokerPluginSupport() { - @Override - public void commitTransaction(ConnectionContext context, - TransactionId xid, - boolean onePhase) throws Exception { - super.commitTransaction(context, xid, onePhase); - // so commit will hang as if reply is lost - context.setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("Stopping broker post commit..."); - try { - broker.stop(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - }}); - broker.start(); - - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); - configureConnectionFactory(cf); - Connection connection = cf.createConnection(); - connection.start(); - final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - Queue destination = session.createQueue(QUEUE_NAME.replace('.', '/') + "?consumer.prefetchSize=0"); - - MessageConsumer consumer = session.createConsumer(destination); - produceMessage(session, destination); - - final CountDownLatch commitDoneLatch = new CountDownLatch(1); - // broker will die on commit reply so this will hang till restart - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("doing async commit..."); - try { - session.commit(); - } - catch (JMSException e) { - assertTrue(e instanceof TransactionRolledBackException); - LOG.info("got commit exception: ", e); - } - commitDoneLatch.countDown(); - LOG.info("done async commit"); - } - }); - - // will be stopped by the plugin - broker.waitUntilStopped(); - broker = createBroker(false, url); - setDefaultPersistenceAdapter(broker); - broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()}); - broker.start(); - - assertTrue("tx committed trough failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); - - // new transaction - Message msg = consumer.receive(20000); - LOG.info("Received: " + msg); - assertNotNull("we got the message", msg); - assertNull("we got just one message", consumer.receive(2000)); - session.commit(); - consumer.close(); - connection.close(); - - // ensure no dangling messages with fresh broker etc - broker.stop(); - broker.waitUntilStopped(); - - LOG.info("Checking for remaining/hung messages.."); - broker = createBroker(false, url); - setDefaultPersistenceAdapter(broker); - broker.setPlugins(new BrokerPlugin[]{new DestinationPathSeparatorBroker()}); - broker.start(); - - // after restart, ensure no dangling messages - cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); - configureConnectionFactory(cf); - connection = cf.createConnection(); - connection.start(); - Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - consumer = session2.createConsumer(destination); - msg = consumer.receive(1000); - if (msg == null) { - msg = consumer.receive(5000); - } - LOG.info("Received: " + msg); - assertNull("no messges left dangling but got: " + msg, msg); - connection.close(); - - ActiveMQDestination[] destinations = broker.getRegionBroker().getDestinations(); - for (ActiveMQDestination dest : destinations) { - LOG.info("Destinations list: " + dest); - } - assertEquals("Only one destination", 1, broker.getRegionBroker().getDestinations().length); - } - - public void initCombosForTestFailoverSendReplyLost() { - addCombinationValues("defaultPersistenceAdapter", new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC - // not implemented for AMQ store or PersistenceAdapterChoice.LevelDB - }); + //the original test validates destinations using forward slash (/) as + //separators instead of dot (.). The broker internally uses a plugin + //called DestinationPathSeparatorBroker to convert every occurrence of + // "/" into "." inside the server. + //Artemis doesn't support "/" so far and this test doesn't make sense therefore. } @SuppressWarnings("unchecked") + @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response and stop the broker", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processMessage", + targetLocation = "EXIT", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker(context)") + } + ) public void testFailoverSendReplyLost() throws Exception { - broker = createBroker(true); - setDefaultPersistenceAdapter(broker); - - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - @Override - public void send(ProducerBrokerExchange producerExchange, - org.apache.activemq.command.Message messageSend) throws Exception { - // so send will hang as if reply is lost - super.send(producerExchange, messageSend); - producerExchange.getConnectionContext().setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("Stopping broker post send..."); - try { - broker.stop(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - }}); - broker.start(); + broker = createBroker(); + startBrokerWithDurableQueue(); + doByteman.set(true); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.watchTopicAdvisories=false"); configureConnectionFactory(cf); @@ -422,7 +257,6 @@ public class FailoverTransactionTest extends TestSupport { final CountDownLatch sendDoneLatch = new CountDownLatch(1); // broker will die on send reply so this will hang till restart Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override public void run() { LOG.info("doing async send..."); try { @@ -431,7 +265,7 @@ public class FailoverTransactionTest extends TestSupport { catch (JMSException e) { //assertTrue(e instanceof TransactionRolledBackException); LOG.error("got send exception: ", e); - fail("got unexpected send exception" + e); + Assert.fail("got unexpected send exception" + e); } sendDoneLatch.countDown(); LOG.info("done async send"); @@ -439,33 +273,27 @@ public class FailoverTransactionTest extends TestSupport { }); // will be stopped by the plugin - broker.waitUntilStopped(); - broker = createBroker(false, url); - setDefaultPersistenceAdapter(broker); + brokerStopLatch.await(); + doByteman.set(false); + broker = createBroker(); LOG.info("restarting...."); broker.start(); - assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS)); + Assert.assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS)); // new transaction Message msg = consumer.receive(20000); LOG.info("Received: " + msg); - assertNotNull("we got the message", msg); - assertNull("we got just one message", consumer.receive(2000)); + Assert.assertNotNull("we got the message", msg); + Assert.assertNull("we got just one message", consumer.receive(2000)); consumer.close(); connection.close(); - // verify stats - assertEquals("no newly queued messages", 0, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); - assertEquals("1 dequeue", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getDequeues().getCount()); - // ensure no dangling messages with fresh broker etc broker.stop(); - broker.waitUntilStopped(); LOG.info("Checking for remaining/hung messages with second restart.."); - broker = createBroker(false, url); - setDefaultPersistenceAdapter(broker); + broker = createBroker(); broker.start(); // after restart, ensure no dangling messages @@ -480,64 +308,33 @@ public class FailoverTransactionTest extends TestSupport { msg = consumer.receive(5000); } LOG.info("Received: " + msg); - assertNull("no messges left dangling but got: " + msg, msg); + Assert.assertNull("no messges left dangling but got: " + msg, msg); connection.close(); } - public void initCombosForTestFailoverConnectionSendReplyLost() { - addCombinationValues("defaultPersistenceAdapter", new Object[]{PersistenceAdapterChoice.KahaDB, PersistenceAdapterChoice.JDBC - // last producer message id store feature not implemented for AMQ store - // or PersistenceAdapterChoice.LevelDB - }); - } - @SuppressWarnings("unchecked") + @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response and stop the broker", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processMessage", + targetLocation = "EXIT", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopProxyOnFirstSend(context)") + } + ) public void testFailoverConnectionSendReplyLost() throws Exception { - broker = createBroker(true); - PersistenceAdapter store = setDefaultPersistenceAdapter(broker); - if (store instanceof KahaDBPersistenceAdapter) { - // duplicate checker not updated on canceled tasks, even it - // it was, recovery of the audit would fail as the message is - // not recorded in the store and the audit may not be up to date. - // So if duplicate messages are an absolute no no after restarts, - // ConcurrentStoreAndDispatchQueues must be disabled - ((KahaDBPersistenceAdapter) store).setConcurrentStoreAndDispatchQueues(false); - } - - final SocketProxy proxy = new SocketProxy(); - - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - private boolean firstSend = true; - - @Override - public void send(ProducerBrokerExchange producerExchange, - org.apache.activemq.command.Message messageSend) throws Exception { - // so send will hang as if reply is lost - super.send(producerExchange, messageSend); - if (firstSend) { - firstSend = false; - - producerExchange.getConnectionContext().setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("Stopping connection post send..."); - try { - proxy.close(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - } - }}); - broker.start(); + broker = createBroker(); + proxy = new SocketProxy(); + firstSend = true; + startBrokerWithDurableQueue(); proxy.setTarget(new URI(url)); proxy.open(); + doByteman.set(true); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + proxy.getUrl().toASCIIString() + ")?jms.watchTopicAdvisories=false"); configureConnectionFactory(cf); @@ -550,7 +347,6 @@ public class FailoverTransactionTest extends TestSupport { final CountDownLatch sendDoneLatch = new CountDownLatch(1); // proxy connection will die on send reply so this will hang on failover reconnect till open Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override public void run() { LOG.info("doing async send..."); try { @@ -566,29 +362,24 @@ public class FailoverTransactionTest extends TestSupport { }); // will be closed by the plugin - assertTrue("proxy was closed", proxy.waitUntilClosed(30)); + Assert.assertTrue("proxy was closed", proxy.waitUntilClosed(30)); LOG.info("restarting proxy"); proxy.open(); - assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS)); + Assert.assertTrue("message sent through failover", sendDoneLatch.await(30, TimeUnit.SECONDS)); Message msg = consumer.receive(20000); LOG.info("Received: " + msg); - assertNotNull("we got the message", msg); - assertNull("we got just one message", consumer.receive(2000)); + Assert.assertNotNull("we got the message", msg); + Assert.assertNull("we got just one message", consumer.receive(2000)); consumer.close(); connection.close(); - // verify stats, connection dup suppression means dups don't get to broker - assertEquals("one queued message", 1, ((RegionBroker) broker.getRegionBroker()).getDestinationStatistics().getEnqueues().getCount()); - // ensure no dangling messages with fresh broker etc broker.stop(); - broker.waitUntilStopped(); LOG.info("Checking for remaining/hung messages with restart.."); - broker = createBroker(false, url); - setDefaultPersistenceAdapter(broker); + broker = createBroker(); broker.start(); // after restart, ensure no dangling messages @@ -603,10 +394,11 @@ public class FailoverTransactionTest extends TestSupport { msg = consumer.receive(5000); } LOG.info("Received: " + msg); - assertNull("no messges left dangling but got: " + msg, msg); + Assert.assertNull("no messges left dangling but got: " + msg, msg); connection.close(); } + @Test public void testFailoverProducerCloseBeforeTransactionFailWhenDisabled() throws Exception { startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?trackTransactionProducers=false"); @@ -621,16 +413,17 @@ public class FailoverTransactionTest extends TestSupport { // restart to force failover and connection state recovery before the commit broker.stop(); - startBroker(false, url); + startBroker(); session.commit(); // without tracking producers, message will not be replayed on recovery - assertNull("we got the message", consumer.receive(5000)); + Assert.assertNull("we got the message", consumer.receive(5000)); session.commit(); connection.close(); } + @Test public void testFailoverMultipleProducerCloseBeforeTransaction() throws Exception { startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); @@ -653,17 +446,18 @@ public class FailoverTransactionTest extends TestSupport { // restart to force failover and connection state recovery before the commit broker.stop(); - startBroker(false, url); + startBroker(); session.commit(); for (int i = 0; i < count; i++) { - assertNotNull("we got all the message: " + count, consumer.receive(20000)); + Assert.assertNotNull("we got all the message: " + count, consumer.receive(20000)); } session.commit(); connection.close(); } // https://issues.apache.org/activemq/browse/AMQ-2772 + @Test public void testFailoverWithConnectionConsumer() throws Exception { startCleanBroker(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); @@ -677,15 +471,12 @@ public class FailoverTransactionTest extends TestSupport { final CountDownLatch connectionConsumerGotOne = new CountDownLatch(1); final Session poolSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.createConnectionConsumer(destination, null, new ServerSessionPool() { - @Override public ServerSession getServerSession() throws JMSException { return new ServerSession() { - @Override public Session getSession() throws JMSException { return poolSession; } - @Override public void start() throws JMSException { connectionConsumerGotOne.countDown(); poolSession.run(); @@ -707,18 +498,30 @@ public class FailoverTransactionTest extends TestSupport { // restart to force failover and connection state recovery before the commit broker.stop(); - startBroker(false, url); + startBroker(); session.commit(); for (int i = 0; i < count - 1; i++) { - assertNotNull("Failed to get message: " + count, consumer.receive(20000)); + Assert.assertNotNull("Failed to get message: " + count, consumer.receive(20000)); } session.commit(); connection.close(); - assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS)); + Assert.assertTrue("connectionconsumer did not get a message", connectionConsumerGotOne.await(10, TimeUnit.SECONDS)); } + @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response and stop the broker", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processMessageAck", + targetLocation = "ENTRY", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverTransactionTest.holdResponseAndStopBroker(context)") + } + ) public void testFailoverConsumerAckLost() throws Exception { // as failure depends on hash order of state tracker recovery, do a few times for (int i = 0; i < 3; i++) { @@ -734,31 +537,10 @@ public class FailoverTransactionTest extends TestSupport { @SuppressWarnings("unchecked") public void doTestFailoverConsumerAckLost(final int pauseSeconds) throws Exception { - broker = createBroker(true); - setDefaultPersistenceAdapter(broker); - - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - - // broker is killed on delivered ack as prefetch is 1 - @Override - public void acknowledge(ConsumerBrokerExchange consumerExchange, final MessageAck ack) throws Exception { - - consumerExchange.getConnectionContext().setDontSendReponse(true); - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("Stopping broker on ack: " + ack); - try { - broker.stop(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - }}); + broker = createBroker(); broker.start(); + brokerStopLatch = new CountDownLatch(1); + doByteman.set(true); Vector<Connection> connections = new Vector<>(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); @@ -789,7 +571,6 @@ public class FailoverTransactionTest extends TestSupport { final CountDownLatch commitDoneLatch = new CountDownLatch(1); final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false); Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override public void run() { LOG.info("doing async commit after consume..."); try { @@ -839,12 +620,12 @@ public class FailoverTransactionTest extends TestSupport { }); // will be stopped by the plugin - broker.waitUntilStopped(); - broker = createBroker(false, url); - setDefaultPersistenceAdapter(broker); + brokerStopLatch.await(); + broker = createBroker(); broker.start(); + doByteman.set(false); - assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); + Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); LOG.info("received message count: " + receivedMessages.size()); @@ -852,10 +633,10 @@ public class FailoverTransactionTest extends TestSupport { Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000); LOG.info("post: from consumer1 received: " + msg); if (gotTransactionRolledBackException.get()) { - assertNotNull("should be available again after commit rollback ex", msg); + Assert.assertNotNull("should be available again after commit rollback ex", msg); } else { - assertNull("should be nothing left for consumer as receive should have committed", msg); + Assert.assertNull("should be nothing left for consumer as receive should have committed", msg); } consumerSession1.commit(); @@ -864,7 +645,7 @@ public class FailoverTransactionTest extends TestSupport { // consumer2 should get other message msg = consumer2.receive(10000); LOG.info("post: from consumer2 received: " + msg); - assertNotNull("got second message on consumer2", msg); + Assert.assertNotNull("got second message on consumer2", msg); consumerSession2.commit(); } @@ -874,11 +655,9 @@ public class FailoverTransactionTest extends TestSupport { // ensure no dangling messages with fresh broker etc broker.stop(); - broker.waitUntilStopped(); LOG.info("Checking for remaining/hung messages.."); - broker = createBroker(false, url); - setDefaultPersistenceAdapter(broker); + broker = createBroker(); broker.start(); // after restart, ensure no dangling messages @@ -893,36 +672,29 @@ public class FailoverTransactionTest extends TestSupport { msg = sweeper.receive(5000); } LOG.info("Sweep received: " + msg); - assertNull("no messges left dangling but got: " + msg, msg); + Assert.assertNull("no messges left dangling but got: " + msg, msg); connection.close(); + + broker.stop(); } + @Test + @BMRules( + rules = { + @BMRule( + name = "set no return response and stop the broker", + targetClass = "org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection", + targetMethod = "processRemoveConsumer", + targetLocation = "ENTRY", + binding = "owconn:OpenWireConnection = $0; context = owconn.getContext()", + action = "org.apache.activemq.transport.failover.FailoverTransactionTest.stopBrokerOnCounter(context)") + } + ) public void testPoolingNConsumesAfterReconnect() throws Exception { - broker = createBroker(true); - setDefaultPersistenceAdapter(broker); + broker = createBroker(); + startBrokerWithDurableQueue(); - broker.setPlugins(new BrokerPlugin[]{new BrokerPluginSupport() { - int count = 0; - - @Override - public void removeConsumer(ConnectionContext context, final ConsumerInfo info) throws Exception { - if (count++ == 1) { - Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override - public void run() { - LOG.info("Stopping broker on removeConsumer: " + info); - try { - broker.stop(); - } - catch (Exception e) { - e.printStackTrace(); - } - } - }); - } - } - }}); - broker.start(); + doByteman.set(true); Vector<Connection> connections = new Vector<>(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); @@ -951,6 +723,7 @@ public class FailoverTransactionTest extends TestSupport { for (int i = 0; i < consumerCount; i++) { consumers.push(consumerSession.createConsumer(destination)); } + final ExecutorService executorService = Executors.newCachedThreadPool(); final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class); @@ -973,7 +746,6 @@ public class FailoverTransactionTest extends TestSupport { for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) { executorService.execute(new Runnable() { - @Override public void run() { MessageConsumer localConsumer = null; try { @@ -1011,9 +783,9 @@ public class FailoverTransactionTest extends TestSupport { consumer.close(); // will be stopped by the plugin - broker.waitUntilStopped(); - broker = createBroker(false, url); - setDefaultPersistenceAdapter(broker); + brokerStopLatch.await(); + doByteman.set(false); + broker = createBroker(); broker.start(); consumer = consumerSession.createConsumer(destination); @@ -1023,8 +795,9 @@ public class FailoverTransactionTest extends TestSupport { for (int i = 0; i < 4 && msg == null; i++) { msg = consumer.receive(1000); } + LOG.info("post: from consumer1 received: " + msg); - assertNotNull("got message after failover", msg); + Assert.assertNotNull("got message after failover", msg); msg.acknowledge(); for (Connection c : connections) { @@ -1032,8 +805,15 @@ public class FailoverTransactionTest extends TestSupport { } } + private void startBrokerWithDurableQueue() throws Exception { + broker.start(); + //auto created queue can't survive a restart, so we need this + broker.getJMSServerManager().createQueue(false, QUEUE_NAME, null, true, QUEUE_NAME); + } + + @Test public void testAutoRollbackWithMissingRedeliveries() throws Exception { - broker = createBroker(true); + broker = createBroker(); broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); configureConnectionFactory(cf); @@ -1047,32 +827,32 @@ public class FailoverTransactionTest extends TestSupport { produceMessage(producerSession, destination); Message msg = consumer.receive(20000); - assertNotNull(msg); + Assert.assertNotNull(msg); broker.stop(); - broker = createBroker(false, url); + broker = createBroker(); // use empty jdbc store so that default wait(0) for redeliveries will timeout after failover - setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC); broker.start(); try { consumerSession.commit(); - fail("expected transaciton rolledback ex"); + Assert.fail("expected transaciton rolledback ex"); } catch (TransactionRolledBackException expected) { } broker.stop(); - broker = createBroker(false, url); + broker = createBroker(); broker.start(); - assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000)); + Assert.assertNotNull("should get rolledback message from original restarted broker", consumer.receive(20000)); connection.close(); } + @Test public void testWaitForMissingRedeliveries() throws Exception { LOG.info("testWaitForMissingRedeliveries()"); - broker = createBroker(true); + broker = createBroker(); broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=30000"); configureConnectionFactory(cf); @@ -1088,18 +868,15 @@ public class FailoverTransactionTest extends TestSupport { if (msg == null) { AutoFailTestSupport.dumpAllThreads("missing-"); } - assertNotNull("got message just produced", msg); + Assert.assertNotNull("got message just produced", msg); broker.stop(); - broker = createBroker(false, url); - // use empty jdbc store so that wait for re-deliveries occur when failover resumes - setPersistenceAdapter(broker, PersistenceAdapterChoice.JDBC); + broker = createBroker(); broker.start(); final CountDownLatch commitDone = new CountDownLatch(1); // will block pending re-deliveries Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override public void run() { LOG.info("doing async commit..."); try { @@ -1112,18 +889,19 @@ public class FailoverTransactionTest extends TestSupport { }); broker.stop(); - broker = createBroker(false, url); + broker = createBroker(); broker.start(); - assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS)); + Assert.assertTrue("commit was successful", commitDone.await(30, TimeUnit.SECONDS)); - assertNull("should not get committed message", consumer.receive(5000)); + Assert.assertNull("should not get committed message", consumer.receive(5000)); connection.close(); } + @Test public void testReDeliveryWhilePending() throws Exception { LOG.info("testReDeliveryWhilePending()"); - broker = createBroker(true); + broker = createBroker(); broker.start(); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")?jms.consumerFailoverRedeliveryWaitPeriod=10000"); configureConnectionFactory(cf); @@ -1139,13 +917,13 @@ public class FailoverTransactionTest extends TestSupport { if (msg == null) { AutoFailTestSupport.dumpAllThreads("missing-"); } - assertNotNull("got message just produced", msg); + Assert.assertNotNull("got message just produced", msg); // add another consumer into the mix that may get the message after restart MessageConsumer consumer2 = consumerSession.createConsumer(consumerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1")); broker.stop(); - broker = createBroker(false, url); + broker = createBroker(); broker.start(); final CountDownLatch commitDone = new CountDownLatch(1); @@ -1154,7 +932,6 @@ public class FailoverTransactionTest extends TestSupport { // commit may fail if other consumer gets the message on restart Executors.newSingleThreadExecutor().execute(new Runnable() { - @Override public void run() { LOG.info("doing async commit..."); try { @@ -1169,24 +946,24 @@ public class FailoverTransactionTest extends TestSupport { } }); - assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS)); + Assert.assertTrue("commit completed ", commitDone.await(15, TimeUnit.SECONDS)); // either message redelivered in existing tx or consumed by consumer2 // should not be available again in any event - assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000)); + Assert.assertNull("consumer should not get rolled back on non redelivered message or duplicate", consumer.receive(5000)); // consumer replay is hashmap order dependent on a failover connection state recover so need to deal with both cases if (exceptions.isEmpty()) { LOG.info("commit succeeded, message was redelivered to the correct consumer after restart so commit was fine"); - assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000)); + Assert.assertNull("consumer2 not get a second message consumed by 1", consumer2.receive(2000)); } else { LOG.info("commit failed, consumer2 should get it", exceptions.get(0)); - assertNotNull("consumer2 got message", consumer2.receive(2000)); + Assert.assertNotNull("consumer2 got message", consumer2.receive(2000)); consumerSession.commit(); // no message should be in dlq MessageConsumer dlqConsumer = consumerSession.createConsumer(consumerSession.createQueue("ActiveMQ.DLQ")); - assertNull("nothing in the dlq", dlqConsumer.receive(5000)); + Assert.assertNull("nothing in the dlq", dlqConsumer.receive(5000)); } connection.close(); } @@ -1198,4 +975,63 @@ public class FailoverTransactionTest extends TestSupport { producer.close(); } + public static void holdResponseAndStopBroker(final AMQConnectionContext context) { + if (doByteman.get()) { + context.setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping broker post commit..."); + try { + broker.stop(); + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + brokerStopLatch.countDown(); + } + } + }); + } + } + + public static void holdResponseAndStopProxyOnFirstSend(final AMQConnectionContext context) { + if (doByteman.get()) { + if (firstSend) { + firstSend = false; + context.setDontSendReponse(true); + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + LOG.info("Stopping connection post send..."); + try { + proxy.close(); + } + catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + } + } + + public static void stopBrokerOnCounter(final AMQConnectionContext context) { + if (doByteman.get()) { + if (count++ == 1) { + Executors.newSingleThreadExecutor().execute(new Runnable() { + public void run() { + try { + broker.stop(); + } + catch (Exception e) { + e.printStackTrace(); + } + finally { + brokerStopLatch.countDown(); + } + } + }); + } + } + } }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6371d64f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java index 0ba3939..149af92 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java @@ -23,7 +23,8 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.URI; -import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; @@ -34,7 +35,7 @@ import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FailoverTransportBackupsTest { +public class FailoverTransportBackupsTest extends OpenwireArtemisBaseTest { private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBackupsTest.class); @@ -43,23 +44,11 @@ public class FailoverTransportBackupsTest { private int transportInterruptions; private int transportResumptions; - BrokerService broker1; - BrokerService broker2; - BrokerService broker3; + EmbeddedJMS[] servers = new EmbeddedJMS[3]; @Before public void setUp() throws Exception { - broker1 = createBroker("1"); - broker2 = createBroker("2"); - broker3 = createBroker("3"); - - broker1.start(); - broker2.start(); - broker3.start(); - - broker1.waitUntilStarted(); - broker2.waitUntilStarted(); - broker3.waitUntilStarted(); + setUpClusterServers(servers); // Reset stats transportInterruptions = 0; @@ -71,13 +60,7 @@ public class FailoverTransportBackupsTest { if (transport != null) { transport.stop(); } - - broker1.stop(); - broker1.waitUntilStopped(); - broker2.stop(); - broker2.waitUntilStopped(); - broker3.stop(); - broker3.waitUntilStopped(); + shutDownClusterServers(servers); } @Test @@ -111,7 +94,7 @@ public class FailoverTransportBackupsTest { } })); - broker1.stop(); + servers[0].stop(); assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() { @Override @@ -124,7 +107,7 @@ public class FailoverTransportBackupsTest { assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1); assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1); - broker2.stop(); + servers[1].stop(); assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() { @Override @@ -153,7 +136,7 @@ public class FailoverTransportBackupsTest { } })); - broker1.stop(); + servers[0].stop(); assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() { @Override @@ -163,7 +146,7 @@ public class FailoverTransportBackupsTest { } })); - broker2.stop(); + servers[1].stop(); assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() { @Override @@ -174,20 +157,11 @@ public class FailoverTransportBackupsTest { })); } - private BrokerService createBroker(String name) throws Exception { - BrokerService bs = new BrokerService(); - bs.setBrokerName(name); - bs.setUseJmx(false); - bs.setPersistent(false); - bs.addConnector("tcp://localhost:0"); - return bs; - } - protected Transport createTransport(int backups) throws Exception { String connectionUri = "failover://(" + - broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," + - broker2.getTransportConnectors().get(0).getPublishableConnectString() + "," + - broker3.getTransportConnectors().get(0).getPublishableConnectString() + ")"; + newURI(0) + "," + + newURI(1) + "," + + newURI(2) + ")"; if (backups > 0) { connectionUri += "?randomize=false&backup=true&backupPoolSize=" + backups; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6371d64f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java index 806faca..15d28d3 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBrokerTest.java @@ -18,41 +18,205 @@ package org.apache.activemq.transport.failover; import java.io.IOException; import java.net.URI; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.concurrent.TimeUnit; import javax.jms.DeliveryMode; +import javax.jms.MessageNotWriteableException; -import junit.framework.Test; - +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.StubConnection; -import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; import org.apache.activemq.command.ActiveMQTopic; import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.ConnectionId; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageDispatch; +import org.apache.activemq.command.MessageId; import org.apache.activemq.command.ProducerInfo; import org.apache.activemq.command.SessionInfo; -import org.apache.activemq.network.NetworkTestSupport; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportListener; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class FailoverTransportBrokerTest extends NetworkTestSupport { +@RunWith(Parameterized.class) +public class FailoverTransportBrokerTest extends OpenwireArtemisBaseTest { private static final Logger LOG = LoggerFactory.getLogger(FailoverTransportBrokerTest.class); + protected ArrayList<StubConnection> connections = new ArrayList<StubConnection>(); + protected long idGenerator; + protected int msgIdGenerator; + protected int maxWait = 10000; + public static final boolean FAST_NO_MESSAGE_LEFT_ASSERT = System.getProperty("FAST_NO_MESSAGE_LEFT_ASSERT", "true").equals("true"); + + @Parameterized.Parameters + public static Collection<Object[]> getParams() + { + return Arrays.asList(new Object[][] { + {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQQueue("TEST")}, + {Integer.valueOf(DeliveryMode.NON_PERSISTENT), new ActiveMQTopic("TEST")}, + {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQQueue("TEST")}, + {Integer.valueOf(DeliveryMode.PERSISTENT), new ActiveMQTopic("TEST")} + }); + } + + private EmbeddedJMS server; + private EmbeddedJMS remoteServer; public ActiveMQDestination destination; public int deliveryMode; - public void initCombosForTestPublisherFailsOver() { - addCombinationValues("deliveryMode", new Object[]{Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); - addCombinationValues("destination", new Object[]{new ActiveMQQueue("TEST"), new ActiveMQTopic("TEST")}); + public FailoverTransportBrokerTest(int deliveryMode, ActiveMQDestination destination) { + this.deliveryMode = deliveryMode; + this.destination = destination; + } + + @Before + public void setUp() throws Exception { + Configuration config0 = createConfig(0); + server = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + Configuration config1 = createConfig(1); + remoteServer = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + server.start(); + remoteServer.start(); + } + + @After + public void tearDown() throws Exception { + for (StubConnection conn : connections) { + try { + conn.stop(); + } + catch (Exception e) { + } + } + try { + remoteServer.stop(); + } + catch (Exception e) { + } + try { + server.stop(); + } + catch (Exception e) { + } + } + + protected StubConnection createConnection() throws Exception { + Transport transport = TransportFactory.connect(new URI(newURI(0))); + StubConnection connection = new StubConnection(transport); + connections.add(connection); + return connection; + } + + protected StubConnection createRemoteConnection() throws Exception { + Transport transport = TransportFactory.connect(new URI(newURI(1))); + StubConnection connection = new StubConnection(transport); + connections.add(connection); + return connection; + } + + protected ConnectionInfo createConnectionInfo() throws Exception { + ConnectionInfo info = new ConnectionInfo(); + info.setConnectionId(new ConnectionId("connection:" + (++idGenerator))); + info.setClientId(info.getConnectionId().getValue()); + return info; + } + + protected SessionInfo createSessionInfo(ConnectionInfo connectionInfo) throws Exception { + SessionInfo info = new SessionInfo(connectionInfo, ++idGenerator); + return info; + } + + protected ConsumerInfo createConsumerInfo(SessionInfo sessionInfo, + ActiveMQDestination destination) throws Exception { + ConsumerInfo info = new ConsumerInfo(sessionInfo, ++idGenerator); + info.setBrowser(false); + info.setDestination(destination); + info.setPrefetchSize(1000); + info.setDispatchAsync(false); + return info; + } + + protected ProducerInfo createProducerInfo(SessionInfo sessionInfo) throws Exception { + ProducerInfo info = new ProducerInfo(sessionInfo, ++idGenerator); + return info; } + protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination, int deliveryMode) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(deliveryMode == DeliveryMode.PERSISTENT); + return message; + } + + protected Message createMessage(ProducerInfo producerInfo, ActiveMQDestination destination) { + ActiveMQTextMessage message = new ActiveMQTextMessage(); + message.setMessageId(new MessageId(producerInfo, ++msgIdGenerator)); + message.setDestination(destination); + message.setPersistent(false); + try { + message.setText("Test Message Payload."); + } + catch (MessageNotWriteableException e) { + } + return message; + } + + public Message receiveMessage(StubConnection connection) throws InterruptedException { + return receiveMessage(connection, maxWait); + } + + public Message receiveMessage(StubConnection connection, long timeout) throws InterruptedException { + while (true) { + Object o = connection.getDispatchQueue().poll(timeout, TimeUnit.MILLISECONDS); + + if (o == null) { + return null; + } + if (o instanceof MessageDispatch) { + + MessageDispatch dispatch = (MessageDispatch) o; + if (dispatch.getMessage() == null) { + return null; + } + dispatch.setMessage(dispatch.getMessage().copy()); + dispatch.getMessage().setRedeliveryCounter(dispatch.getRedeliveryCounter()); + return dispatch.getMessage(); + } + } + } + + protected void assertNoMessagesLeft(StubConnection connection) throws InterruptedException { + long wait = FAST_NO_MESSAGE_LEFT_ASSERT ? 0 : maxWait; + while (true) { + Object o = connection.getDispatchQueue().poll(wait, TimeUnit.MILLISECONDS); + if (o == null) { + return; + } + if (o instanceof MessageDispatch && ((MessageDispatch) o).getMessage() != null) { + Assert.fail("Received a message: " + ((MessageDispatch) o).getMessage().getMessageId()); + } + } + } + + @Test public void testPublisherFailsOver() throws Exception { // Start a normal consumer on the local broker @@ -92,19 +256,22 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport { // See which broker we were connected to. StubConnection connectionA; StubConnection connectionB; - TransportConnector serverA; - if (connector.getServer().getConnectURI().equals(ft.getConnectedTransportURI())) { + + + EmbeddedJMS serverA; + + if (new URI(newURI(0)).equals(ft.getConnectedTransportURI())) { connectionA = connection1; connectionB = connection2; - serverA = connector; + serverA = server; } else { connectionA = connection2; connectionB = connection1; - serverA = remoteConnector; + serverA = remoteServer; } - assertNotNull(receiveMessage(connectionA)); + Assert.assertNotNull(receiveMessage(connectionA)); assertNoMessagesLeft(connectionB); // Dispose the server so that it fails over to the other server. @@ -113,7 +280,7 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport { connection3.request(createMessage(producerInfo3, destination, deliveryMode)); - assertNotNull(receiveMessage(connectionB)); + Assert.assertNotNull(receiveMessage(connectionB)); assertNoMessagesLeft(connectionA); } @@ -150,34 +317,16 @@ public class FailoverTransportBrokerTest extends NetworkTestSupport { while (count++ < 20 && info[0] == null) { TimeUnit.SECONDS.sleep(1); } - assertNotNull("got a valid brokerInfo after 20 secs", info[0]); - assertNull("no peer brokers present", info[0].getPeerBrokerInfos()); - } - - @Override - protected String getLocalURI() { - return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; - } - - @Override - protected String getRemoteURI() { - return "tcp://localhost:0?wireFormat.tcpNoDelayEnabled=true"; + Assert.assertNotNull("got a valid brokerInfo after 20 secs", info[0]); + Assert.assertNull("no peer brokers present", info[0].getPeerBrokerInfos()); } protected StubConnection createFailoverConnection(TransportListener listener) throws Exception { - URI failoverURI = new URI("failover://" + connector.getServer().getConnectURI() + "," + remoteConnector.getServer().getConnectURI() + ""); + URI failoverURI = new URI("failover://" + newURI(0) + "," + newURI(1) + ""); Transport transport = TransportFactory.connect(failoverURI); StubConnection connection = new StubConnection(transport, listener); connections.add(connection); return connection; } - public static Test suite() { - return suite(FailoverTransportBrokerTest.class); - } - - public static void main(String[] args) { - junit.textui.TestRunner.run(suite()); - } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6371d64f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java index 8155575..d64cc58 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportUriHandlingTest.java @@ -22,7 +22,6 @@ import java.lang.reflect.Field; import java.net.URI; import java.util.Collection; -import org.apache.activemq.transport.failover.FailoverTransport; import org.junit.Test; public class FailoverTransportUriHandlingTest { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6371d64f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java index e792228..002a788 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUpdateURIsTest.java @@ -30,38 +30,46 @@ import javax.jms.Session; import junit.framework.TestCase; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.network.NetworkConnector; import org.apache.log4j.Logger; +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; -public class FailoverUpdateURIsTest extends TestCase { +public class FailoverUpdateURIsTest extends OpenwireArtemisBaseTest { private static final String QUEUE_NAME = "test.failoverupdateuris"; private static final Logger LOG = Logger.getLogger(FailoverUpdateURIsTest.class); - String firstTcpUri = "tcp://localhost:61616"; - String secondTcpUri = "tcp://localhost:61626"; + String firstTcpUri = newURI(0); + String secondTcpUri = newURI(10); Connection connection = null; - BrokerService bs1 = null; - BrokerService bs2 = null; + EmbeddedJMS server0 = null; + EmbeddedJMS server1 = null; - @Override + @After public void tearDown() throws Exception { if (connection != null) { connection.close(); } - if (bs1 != null) { - bs1.stop(); + if (server0 != null) { + server0.stop(); } - if (bs2 != null) { - bs2.stop(); + if (server1 != null) { + server1.stop(); } } + @Test public void testUpdateURIsViaFile() throws Exception { - String targetDir = "target/" + getName(); + String targetDir = "target/testUpdateURIsViaFile"; new File(targetDir).mkdir(); File updateFile = new File(targetDir + "/updateURIsFile.txt"); LOG.info(updateFile); @@ -72,8 +80,9 @@ public class FailoverUpdateURIsTest extends TestCase { out.write(firstTcpUri.getBytes()); out.close(); - bs1 = createBroker("bs1", firstTcpUri); - bs1.start(); + Configuration config0 = createConfig(0); + server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + server0.start(); // no failover uri's to start with, must be read from file... ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:()?updateURIsURL=file:///" + updateFile.getAbsoluteFile()); @@ -86,14 +95,14 @@ public class FailoverUpdateURIsTest extends TestCase { Message message = session.createTextMessage("Test message"); producer.send(message); Message msg = consumer.receive(2000); - assertNotNull(msg); + Assert.assertNotNull(msg); - bs1.stop(); - bs1.waitUntilStopped(); - bs1 = null; + server0.stop(); + server0 = null; - bs2 = createBroker("bs2", secondTcpUri); - bs2.start(); + Configuration config1 = createConfig(10); + server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + server1.start(); // add the transport uri for broker number 2 out = new FileOutputStream(updateFile, true); @@ -103,25 +112,16 @@ public class FailoverUpdateURIsTest extends TestCase { producer.send(message); msg = consumer.receive(2000); - assertNotNull(msg); - } - - private BrokerService createBroker(String name, String tcpUri) throws Exception { - BrokerService bs = new BrokerService(); - bs.setBrokerName(name); - bs.setUseJmx(false); - bs.setPersistent(false); - bs.addConnector(tcpUri); - return bs; + Assert.assertNotNull(msg); } + @Test public void testAutoUpdateURIs() throws Exception { - - bs1 = new BrokerService(); - bs1.setUseJmx(false); - TransportConnector transportConnector = bs1.addConnector(firstTcpUri); - transportConnector.setUpdateClusterClients(true); - bs1.start(); + Configuration config0 = createConfig(0); + deployClusterConfiguration(config0, 10); + server0 = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl()); + server0.start(); + Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 1)); ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + firstTcpUri + ")"); connection = cf.createConnection(); @@ -133,24 +133,23 @@ public class FailoverUpdateURIsTest extends TestCase { Message message = session.createTextMessage("Test message"); producer.send(message); Message msg = consumer.receive(4000); - assertNotNull(msg); + Assert.assertNotNull(msg); - bs2 = createBroker("bs2", secondTcpUri); - NetworkConnector networkConnector = bs2.addNetworkConnector("static:(" + firstTcpUri + ")"); - networkConnector.setDuplex(true); - bs2.start(); - LOG.info("started brokerService 2"); - bs2.waitUntilStarted(); + Configuration config1 = createConfig(10); + deployClusterConfiguration(config1, 0); + server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + server1.start(); + Assert.assertTrue(server0.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); TimeUnit.SECONDS.sleep(4); LOG.info("stopping brokerService 1"); - bs1.stop(); - bs1.waitUntilStopped(); - bs1 = null; + server0.stop(); + server0 = null; producer.send(message); msg = consumer.receive(4000); - assertNotNull(msg); + Assert.assertNotNull(msg); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6371d64f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java index ae637ef..a028832 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverUriTest.java @@ -43,4 +43,5 @@ public class FailoverUriTest extends TransportUriTest { public static Test suite() { return suite(FailoverUriTest.class); } + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6371d64f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java index 34e7333..dad241c 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/InitalReconnectDelayTest.java @@ -18,6 +18,7 @@ package org.apache.activemq.transport.failover; import java.io.IOException; import java.util.Date; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import javax.jms.Connection; import javax.jms.Message; @@ -26,9 +27,13 @@ import javax.jms.Queue; import javax.jms.Session; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.transport.TransportListener; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.slf4j.Logger; @@ -36,19 +41,20 @@ import org.slf4j.LoggerFactory; import static org.junit.Assert.*; -public class InitalReconnectDelayTest { +public class InitalReconnectDelayTest extends OpenwireArtemisBaseTest { private static final transient Logger LOG = LoggerFactory.getLogger(InitalReconnectDelayTest.class); - protected BrokerService broker1; - protected BrokerService broker2; + protected EmbeddedJMS server1; + protected EmbeddedJMS server2; + +// protected BrokerService broker1; +// protected BrokerService broker2; @Test public void testInitialReconnectDelay() throws Exception { - String uriString = "failover://(tcp://localhost:" + - broker1.getTransportConnectors().get(0).getConnectUri().getPort() + - ",tcp://localhost:" + - broker2.getTransportConnectors().get(0).getConnectUri().getPort() + + String uriString = "failover://(" + newURI(1) + + "," + newURI(2) + ")?randomize=false&initialReconnectDelay=15000"; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString); @@ -67,7 +73,7 @@ public class InitalReconnectDelayTest { //Halt the broker1... LOG.info("Stopping the Broker1..."); start = (new Date()).getTime(); - broker1.stop(); + server1.stop(); LOG.info("Attempting to send... failover should kick in..."); producer.send(session.createTextMessage("TEST")); @@ -81,10 +87,8 @@ public class InitalReconnectDelayTest { @Test public void testNoSuspendedCallbackOnNoReconnect() throws Exception { - String uriString = "failover://(tcp://localhost:" + - broker1.getTransportConnectors().get(0).getConnectUri().getPort() + - ",tcp://localhost:" + - broker2.getTransportConnectors().get(0).getConnectUri().getPort() + + String uriString = "failover://(" + newURI(1) + + "," + newURI(2) + ")?randomize=false&maxReconnectAttempts=0"; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(uriString); @@ -124,7 +128,7 @@ public class InitalReconnectDelayTest { calls.set(0); LOG.info("Stopping the Broker1..."); - broker1.stop(); + server1.stop(); LOG.info("Attempting to send... failover should throw on disconnect"); try { @@ -140,25 +144,19 @@ public class InitalReconnectDelayTest { @Before public void setUp() throws Exception { - final String dataDir = "target/data/shared"; + Configuration config1 = createConfig(1); + Configuration config2 = createConfig(2); - broker1 = new BrokerService(); + deployClusterConfiguration(config1, 2); + deployClusterConfiguration(config2, 1); - broker1.setBrokerName("broker1"); - broker1.setDeleteAllMessagesOnStartup(true); - broker1.setDataDirectory(dataDir); - broker1.addConnector("tcp://localhost:0"); - broker1.setUseJmx(false); - broker1.start(); - broker1.waitUntilStarted(); + server1 = new EmbeddedJMS().setConfiguration(config1).setJmsConfiguration(new JMSConfigurationImpl()); + server2 = new EmbeddedJMS().setConfiguration(config2).setJmsConfiguration(new JMSConfigurationImpl()); - broker2 = new BrokerService(); - broker2.setBrokerName("broker2"); - broker2.setDataDirectory(dataDir); - broker2.setUseJmx(false); - broker2.addConnector("tcp://localhost:0"); - broker2.start(); - broker2.waitUntilStarted(); + server1.start(); + server2.start(); + Assert.assertTrue(server1.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); + Assert.assertTrue(server2.waitClusterForming(100, TimeUnit.MILLISECONDS, 20, 2)); } @@ -172,16 +170,8 @@ public class InitalReconnectDelayTest { @After public void tearDown() throws Exception { - - if (broker1.isStarted()) { - broker1.stop(); - broker1.waitUntilStopped(); - } - - if (broker2.isStarted()) { - broker2.stop(); - broker2.waitUntilStopped(); - } + server1.stop(); + server2.stop(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6371d64f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java index 4ba5516..83d43af 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/ReconnectTest.java @@ -28,29 +28,33 @@ import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; - -import junit.framework.TestCase; +import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; -import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; +import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; +import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.transport.TransportListener; import org.apache.activemq.transport.mock.MockTransport; -import org.apache.activemq.util.ServiceStopper; import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class ReconnectTest extends TestCase { +public class ReconnectTest extends OpenwireArtemisBaseTest { public static final int MESSAGES_PER_ITTERATION = 10; public static final int WORKER_COUNT = 10; private static final Logger LOG = LoggerFactory.getLogger(ReconnectTest.class); - private BrokerService bs; + private EmbeddedJMS bs; private URI tcpUri; private final AtomicInteger resumedCount = new AtomicInteger(); private final AtomicInteger interruptedCount = new AtomicInteger(); @@ -102,7 +106,7 @@ public class ReconnectTest extends TestCase { } public void start() { - new Thread(this).start(); + new Thread(this, name).start(); } public void stop() { @@ -129,13 +133,19 @@ public class ReconnectTest extends TestCase { MessageConsumer consumer = session.createConsumer(queue); MessageProducer producer = session.createProducer(queue); producer.setDeliveryMode(DeliveryMode.PERSISTENT); + while (!stop.get()) { + for (int i = 0; i < MESSAGES_PER_ITTERATION; i++) { - producer.send(session.createTextMessage("TEST:" + i)); + TextMessage text = session.createTextMessage(name + " TEST:" + i); + text.setStringProperty("myprop", name + " TEST:" + i); + producer.send(text); } + for (int i = 0; i < MESSAGES_PER_ITTERATION; i++) { - consumer.receive(); + TextMessage m = (TextMessage) consumer.receive(); } + iterations.incrementAndGet(); } session.close(); @@ -159,11 +169,12 @@ public class ReconnectTest extends TestCase { public synchronized void assertNoErrors() { if (error != null) { error.printStackTrace(); - fail("Worker " + name + " got Exception: " + error); + Assert.fail("Worker " + name + " got Exception: " + error); } } } + @Test public void testReconnects() throws Exception { for (int k = 1; k < 10; k++) { @@ -181,7 +192,7 @@ public class ReconnectTest extends TestCase { LOG.info("Test run " + k + ": Waiting for worker " + i + " to finish an iteration."); Thread.sleep(1000); } - assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0); + Assert.assertTrue("Test run " + k + ": Worker " + i + " never completed an interation.", c != 0); workers[i].assertNoErrors(); } @@ -192,7 +203,7 @@ public class ReconnectTest extends TestCase { workers[i].failConnection(); } - assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition() { + Assert.assertTrue("Timed out waiting for all connections to be interrupted.", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { LOG.debug("Test run waiting for connections to get interrupted.. at: " + interruptedCount.get()); @@ -201,7 +212,7 @@ public class ReconnectTest extends TestCase { }, TimeUnit.SECONDS.toMillis(60))); // Wait for the connections to re-establish... - assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition() { + Assert.assertTrue("Timed out waiting for all connections to be resumed.", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { LOG.debug("Test run waiting for connections to get resumed.. at: " + resumedCount.get()); @@ -220,26 +231,25 @@ public class ReconnectTest extends TestCase { } } - @Override - protected void setUp() throws Exception { - bs = new BrokerService(); - bs.setPersistent(false); - bs.setUseJmx(true); - TransportConnector connector = bs.addConnector("tcp://localhost:0"); + @Before + public void setUp() throws Exception { + Configuration config = createConfig(0); + bs = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl()); bs.start(); - tcpUri = connector.getConnectUri(); + tcpUri = new URI(newURI(0)); + workers = new Worker[WORKER_COUNT]; for (int i = 0; i < WORKER_COUNT; i++) { - workers[i] = new Worker("" + i); + workers[i] = new Worker("worker-" + i); workers[i].start(); } } - @Override - protected void tearDown() throws Exception { + @After + public void tearDown() throws Exception { for (int i = 0; i < WORKER_COUNT; i++) { workers[i].stop(); } - new ServiceStopper().stop(bs); + bs.stop(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6371d64f/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java index 3a55473..ed6040d 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/SlowConnectionTest.java @@ -24,15 +24,16 @@ import java.util.concurrent.CountDownLatch; import javax.jms.Connection; import javax.net.ServerSocketFactory; -import junit.framework.TestCase; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.util.Wait; +import org.junit.Assert; +import org.junit.Test; -public class SlowConnectionTest extends TestCase { +public class SlowConnectionTest { private CountDownLatch socketReadyLatch = new CountDownLatch(1); + @Test public void testSlowConnection() throws Exception { MockBroker broker = new MockBroker(); @@ -57,7 +58,7 @@ public class SlowConnectionTest extends TestCase { }).start(); int count = 0; - assertTrue("Transport count: " + count + ", expected <= 1", Wait.waitFor(new Wait.Condition() { + Assert.assertTrue("Transport count: " + count + ", expected <= 1", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { int count = 0;