Fix thread leak in tests
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e30a1868 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e30a1868 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e30a1868 Branch: refs/heads/refactor-openwire Commit: e30a186853eda5b714c6a95e3b2d400038908335 Parents: b333a8f Author: Howard Gao <[email protected]> Authored: Wed Mar 16 20:41:40 2016 +0800 Committer: Clebert Suconic <[email protected]> Committed: Thu Mar 17 14:10:46 2016 -0400 ---------------------------------------------------------------------- .../artemiswrapper/OpenwireArtemisBaseTest.java | 2 - .../transport/failover/FailoverTimeoutTest.java | 57 +-- .../failover/FailoverTransactionTest.java | 373 ++++++++++--------- .../failover/TwoBrokerFailoverClusterTest.java | 12 +- 4 files changed, 235 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e30a1868/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java index 2f3a330..b523433 100644 --- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java +++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java @@ -34,7 +34,6 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.artemis.tests.util.ThreadLeakCheckRule; -import org.apache.activemq.artemis.utils.uri.URISchema; import org.apache.activemq.artemis.utils.uri.URISupport; import org.apache.activemq.broker.BrokerService; import org.junit.Assert; @@ -47,7 +46,6 @@ import javax.management.MBeanServerInvocationHandler; import javax.management.ObjectName; public class OpenwireArtemisBaseTest { - @Rule public CleanupThreadRule cleanupRules = new CleanupThreadRule(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e30a1868/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java index c5ee02f..72b8c43 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTimeoutTest.java @@ -33,7 +33,6 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl; import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; -import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.junit.After; import org.junit.Before; @@ -97,28 +96,35 @@ public class FailoverTimeoutTest extends OpenwireArtemisBaseTest { long timeout = 1000; ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?timeout=" + timeout + "&useExponentialBackOff=false"); Connection connection = cf.createConnection(); - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); - TextMessage message = session.createTextMessage("Test message"); - producer.send(message); - - server.stop(); - try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + TextMessage message = session.createTextMessage("Test message"); producer.send(message); - } - catch (JMSException jmse) { - assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage()); - } - Configuration config = createConfig(0); - server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl()); - server.start(); + server.stop(); - producer.send(message); + try { + producer.send(message); + } + catch (JMSException jmse) { + assertEquals("Failover timeout of " + timeout + " ms reached.", jmse.getMessage()); + } - server.stop(); - server = null; + Configuration config = createConfig(0); + server = new EmbeddedJMS().setConfiguration(config).setJmsConfiguration(new JMSConfigurationImpl()); + server.start(); + + producer.send(message); + + server.stop(); + server = null; + } + finally { + if (connection != null) { + connection.close(); + } + } } @Test @@ -126,10 +132,17 @@ public class FailoverTimeoutTest extends OpenwireArtemisBaseTest { ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + tcpUri + ")?useExponentialBackOff=false"); ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection(); - connection.start(); - FailoverTransport failoverTransport = connection.getTransport().narrow(FailoverTransport.class); + try { + connection.start(); + FailoverTransport failoverTransport = connection.getTransport().narrow(FailoverTransport.class); - URI[] bunchOfUnknownAndOneKnown = new URI[]{new URI("tcp://unknownHost:" + tcpUri.getPort()), new URI("tcp://unknownHost2:" + tcpUri.getPort()), new URI("tcp://localhost:2222")}; - failoverTransport.add(false, bunchOfUnknownAndOneKnown); + URI[] bunchOfUnknownAndOneKnown = new URI[]{new URI("tcp://unknownHost:" + tcpUri.getPort()), new URI("tcp://unknownHost2:" + tcpUri.getPort()), new URI("tcp://localhost:2222")}; + failoverTransport.add(false, bunchOfUnknownAndOneKnown); + } + finally { + if (connection != null) { + connection.close(); + } + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e30a1868/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java index 6cd6942..4aaec57 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransactionTest.java @@ -21,7 +21,6 @@ import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.ActiveMQMessageConsumer; import org.apache.activemq.AutoFailTestSupport; import org.apache.activemq.artemis.core.protocol.openwire.OpenWireConnection; -import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext; import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS; import org.apache.activemq.broker.artemiswrapper.OpenwireArtemisBaseTest; import org.apache.activemq.transport.TransportListener; @@ -541,114 +540,120 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest { doByteman.set(true); Vector<Connection> connections = new Vector<>(); + Connection connection = null; + Message msg = null; + Queue destination = null; ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); - configureConnectionFactory(cf); - Connection connection = cf.createConnection(); - connection.start(); - connections.add(connection); - final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"); - - connection = cf.createConnection(); - connection.start(); - connections.add(connection); - final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - - connection = cf.createConnection(); - connection.start(); - connections.add(connection); - final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); - - final MessageConsumer consumer1 = consumerSession1.createConsumer(destination); - final MessageConsumer consumer2 = consumerSession2.createConsumer(destination); - - produceMessage(producerSession, destination); - produceMessage(producerSession, destination); - - final Vector<Message> receivedMessages = new Vector<>(); - final CountDownLatch commitDoneLatch = new CountDownLatch(1); - final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false); - new Thread() { - public void run() { - LOG.info("doing async commit after consume..."); - try { - Message msg = consumer1.receive(20000); - LOG.info("consumer1 first attempt got message: " + msg); - receivedMessages.add(msg); - - // give some variance to the runs - TimeUnit.SECONDS.sleep(pauseSeconds * 2); - - // should not get a second message as there are two messages and two consumers - // and prefetch=1, but with failover and unordered connection restore it can get the second - // message. - - // For the transaction to complete it needs to get the same one or two messages - // again so that the acks line up. - // If redelivery order is different, the commit should fail with an ex - // - msg = consumer1.receive(5000); - LOG.info("consumer1 second attempt got message: " + msg); - if (msg != null) { + try { + configureConnectionFactory(cf); + connection = cf.createConnection(); + connection.start(); + connections.add(connection); + final Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"); + + connection = cf.createConnection(); + connection.start(); + connections.add(connection); + final Session consumerSession1 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + connection = cf.createConnection(); + connection.start(); + connections.add(connection); + final Session consumerSession2 = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + + final MessageConsumer consumer1 = consumerSession1.createConsumer(destination); + final MessageConsumer consumer2 = consumerSession2.createConsumer(destination); + + produceMessage(producerSession, destination); + produceMessage(producerSession, destination); + + final Vector<Message> receivedMessages = new Vector<>(); + final CountDownLatch commitDoneLatch = new CountDownLatch(1); + final AtomicBoolean gotTransactionRolledBackException = new AtomicBoolean(false); + new Thread() { + public void run() { + LOG.info("doing async commit after consume..."); + try { + Message msg = consumer1.receive(20000); + LOG.info("consumer1 first attempt got message: " + msg); receivedMessages.add(msg); - } - LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)"); - try { - consumerSession1.commit(); - } - catch (JMSException expectedSometimes) { - LOG.info("got exception ex on commit", expectedSometimes); - if (expectedSometimes instanceof TransactionRolledBackException) { - gotTransactionRolledBackException.set(true); - // ok, message one was not replayed so we expect the rollback + // give some variance to the runs + TimeUnit.SECONDS.sleep(pauseSeconds * 2); + + // should not get a second message as there are two messages and two consumers + // and prefetch=1, but with failover and unordered connection restore it can get the second + // message. + + // For the transaction to complete it needs to get the same one or two messages + // again so that the acks line up. + // If redelivery order is different, the commit should fail with an ex + // + msg = consumer1.receive(5000); + LOG.info("consumer1 second attempt got message: " + msg); + if (msg != null) { + receivedMessages.add(msg); } - else { - throw expectedSometimes; + + LOG.info("committing consumer1 session: " + receivedMessages.size() + " messsage(s)"); + try { + consumerSession1.commit(); } + catch (JMSException expectedSometimes) { + LOG.info("got exception ex on commit", expectedSometimes); + if (expectedSometimes instanceof TransactionRolledBackException) { + gotTransactionRolledBackException.set(true); + // ok, message one was not replayed so we expect the rollback + } + else { + throw expectedSometimes; + } + } + commitDoneLatch.countDown(); + LOG.info("done async commit"); + } + catch (Exception e) { + e.printStackTrace(); } - commitDoneLatch.countDown(); - LOG.info("done async commit"); - } - catch (Exception e) { - e.printStackTrace(); } - } - }.start(); + }.start(); - // will be stopped by the plugin - brokerStopLatch.await(); - broker = createBroker(); - broker.start(); - doByteman.set(false); + // will be stopped by the plugin + brokerStopLatch.await(); + broker = createBroker(); + broker.start(); + doByteman.set(false); - Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); + Assert.assertTrue("tx committed through failover", commitDoneLatch.await(30, TimeUnit.SECONDS)); - LOG.info("received message count: " + receivedMessages.size()); + LOG.info("received message count: " + receivedMessages.size()); - // new transaction - Message msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000); - LOG.info("post: from consumer1 received: " + msg); - if (gotTransactionRolledBackException.get()) { - Assert.assertNotNull("should be available again after commit rollback ex", msg); - } - else { - Assert.assertNull("should be nothing left for consumer as receive should have committed", msg); - } - consumerSession1.commit(); - - if (gotTransactionRolledBackException.get() || !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) { - // just one message successfully consumed or none consumed - // consumer2 should get other message - msg = consumer2.receive(10000); - LOG.info("post: from consumer2 received: " + msg); - Assert.assertNotNull("got second message on consumer2", msg); - consumerSession2.commit(); + // new transaction + msg = consumer1.receive(gotTransactionRolledBackException.get() ? 5000 : 20000); + LOG.info("post: from consumer1 received: " + msg); + if (gotTransactionRolledBackException.get()) { + Assert.assertNotNull("should be available again after commit rollback ex", msg); + } + else { + Assert.assertNull("should be nothing left for consumer as receive should have committed", msg); + } + consumerSession1.commit(); + + if (gotTransactionRolledBackException.get() || !gotTransactionRolledBackException.get() && receivedMessages.size() == 1) { + // just one message successfully consumed or none consumed + // consumer2 should get other message + msg = consumer2.receive(10000); + LOG.info("post: from consumer2 received: " + msg); + Assert.assertNotNull("got second message on consumer2", msg); + consumerSession2.commit(); + } } - - for (Connection c : connections) { - c.close(); + finally { + for (Connection c : connections) { + c.close(); + } } // ensure no dangling messages with fresh broker etc @@ -694,111 +699,115 @@ public class FailoverTransactionTest extends OpenwireArtemisBaseTest { doByteman.set(true); Vector<Connection> connections = new Vector<>(); - ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); - configureConnectionFactory(cf); - Connection connection = cf.createConnection(); - connection.start(); - Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"); - - produceMessage(producerSession, destination); - connection.close(); - - connection = cf.createConnection(); - connection.start(); - connections.add(connection); - final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); - - final int sessionCount = 10; - final Stack<Session> sessions = new Stack<>(); - for (int i = 0; i < sessionCount; i++) { - sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); - } - - final int consumerCount = 1000; - final Deque<MessageConsumer> consumers = new ArrayDeque<>(); - for (int i = 0; i < consumerCount; i++) { - consumers.push(consumerSession.createConsumer(destination)); - } - final ExecutorService executorService = Executors.newCachedThreadPool(); - final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class); - final TransportListener delegate = failoverTransport.getTransportListener(); - failoverTransport.setTransportListener(new TransportListener() { - @Override - public void onCommand(Object command) { - delegate.onCommand(command); + try { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(" + url + ")"); + configureConnectionFactory(cf); + Connection connection = cf.createConnection(); + connection.start(); + Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue destination = producerSession.createQueue(QUEUE_NAME + "?consumer.prefetchSize=1"); + + produceMessage(producerSession, destination); + connection.close(); + + connection = cf.createConnection(); + connection.start(); + connections.add(connection); + final Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + final int sessionCount = 10; + final Stack<Session> sessions = new Stack<>(); + for (int i = 0; i < sessionCount; i++) { + sessions.push(connection.createSession(false, Session.AUTO_ACKNOWLEDGE)); } - @Override - public void onException(IOException error) { - delegate.onException(error); + final int consumerCount = 1000; + final Deque<MessageConsumer> consumers = new ArrayDeque<>(); + for (int i = 0; i < consumerCount; i++) { + consumers.push(consumerSession.createConsumer(destination)); } - @Override - public void transportInterupted() { + final FailoverTransport failoverTransport = ((ActiveMQConnection) connection).getTransport().narrow(FailoverTransport.class); + final TransportListener delegate = failoverTransport.getTransportListener(); + failoverTransport.setTransportListener(new TransportListener() { + @Override + public void onCommand(Object command) { + delegate.onCommand(command); + } - LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE")); - for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) { + @Override + public void onException(IOException error) { + delegate.onException(error); + } - executorService.execute(new Runnable() { - public void run() { - MessageConsumer localConsumer = null; - try { - synchronized (delegate) { - localConsumer = consumers.pop(); - } - localConsumer.receive(1); + @Override + public void transportInterupted() { - LOG.info("calling close() " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId()); - localConsumer.close(); - } - catch (NoSuchElementException nse) { - } - catch (Exception ignored) { - LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(), ignored); + LOG.error("Transport interrupted: " + failoverTransport, new RuntimeException("HERE")); + for (int i = 0; i < consumerCount && !consumers.isEmpty(); i++) { + + executorService.execute(new Runnable() { + public void run() { + MessageConsumer localConsumer = null; + try { + synchronized (delegate) { + localConsumer = consumers.pop(); + } + localConsumer.receive(1); + + LOG.info("calling close() " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId()); + localConsumer.close(); + } + catch (NoSuchElementException nse) { + } + catch (Exception ignored) { + LOG.error("Ex on: " + ((ActiveMQMessageConsumer) localConsumer).getConsumerId(), ignored); + } } - } - }); + }); + } + + delegate.transportInterupted(); } - delegate.transportInterupted(); - } + @Override + public void transportResumed() { + delegate.transportResumed(); + } + }); - @Override - public void transportResumed() { - delegate.transportResumed(); + MessageConsumer consumer = null; + synchronized (delegate) { + consumer = consumers.pop(); } - }); + LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId()); + consumer.close(); - MessageConsumer consumer = null; - synchronized (delegate) { - consumer = consumers.pop(); - } - LOG.info("calling close to trigger broker stop " + ((ActiveMQMessageConsumer) consumer).getConsumerId()); - consumer.close(); + // will be stopped by the plugin + brokerStopLatch.await(); + doByteman.set(false); + broker = createBroker(); + broker.start(); - // will be stopped by the plugin - brokerStopLatch.await(); - doByteman.set(false); - broker = createBroker(); - broker.start(); + consumer = consumerSession.createConsumer(destination); + LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId()); - consumer = consumerSession.createConsumer(destination); - LOG.info("finally consuming message: " + ((ActiveMQMessageConsumer) consumer).getConsumerId()); + Message msg = null; + for (int i = 0; i < 4 && msg == null; i++) { + msg = consumer.receive(1000); + } - Message msg = null; - for (int i = 0; i < 4 && msg == null; i++) { - msg = consumer.receive(1000); + LOG.info("post: from consumer1 received: " + msg); + Assert.assertNotNull("got message after failover", msg); + msg.acknowledge(); } - - LOG.info("post: from consumer1 received: " + msg); - Assert.assertNotNull("got message after failover", msg); - msg.acknowledge(); - - for (Connection c : connections) { - c.close(); + finally { + executorService.shutdown(); + for (Connection c : connections) { + c.close(); + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e30a1868/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java index dc91873..5759547 100644 --- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/transport/failover/TwoBrokerFailoverClusterTest.java @@ -27,7 +27,6 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import javax.jms.Connection; import javax.jms.MessageConsumer; import javax.jms.Queue; import javax.jms.Session; @@ -80,8 +79,12 @@ public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest { @Before public void setUp() throws Exception { - Configuration config0 = createConfig("127.0.0.1", 0); - Configuration config1 = createConfig("127.0.0.1", 1); + HashMap<String, String> map = new HashMap<>(); + map.put("rebalanceClusterClients", "true"); + map.put("updateClusterClients", "true"); + map.put("updateClusterClientsOnRemove", "true"); + Configuration config0 = createConfig("127.0.0.1", 0, map); + Configuration config1 = createConfig("127.0.0.1", 1, map); deployClusterConfiguration(config0, 1); deployClusterConfiguration(config1, 0); @@ -99,6 +102,9 @@ public class TwoBrokerFailoverClusterTest extends OpenwireArtemisBaseTest { @After public void tearDown() throws Exception { + for (ActiveMQConnection conn : connections) { + conn.close(); + } server0.stop(); server1.stop(); }
