http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java new file mode 100644 index 0000000..4d6d39c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4126Test.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.net.Socket; +import java.net.URI; + +import javax.management.ObjectName; +import javax.net.SocketFactory; +import javax.net.ssl.SSLSocketFactory; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQSslConnectionFactory; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompConnection; +import org.apache.activemq.transport.stomp.StompFrame; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * + */ +public class AMQ4126Test { + + protected BrokerService broker; + + protected String java_security_auth_login_config = "java.security.auth.login.config"; + protected String xbean = "xbean:"; + protected String confBase = "src/test/resources/org/apache/activemq/bugs/amq4126"; + protected String certBase = "src/test/resources/org/apache/activemq/security"; + protected String JaasStompSSLBroker_xml = "JaasStompSSLBroker.xml"; + protected StompConnection stompConnection = new StompConnection(); + private final static String destinationName = "TEST.QUEUE"; + protected String oldLoginConf = null; + + @Before + public void before() throws Exception { + if (System.getProperty(java_security_auth_login_config) != null) { + oldLoginConf = System.getProperty(java_security_auth_login_config); + } + System.setProperty(java_security_auth_login_config, confBase + "/login.config"); + broker = BrokerFactory.createBroker(xbean + confBase + "/" + JaasStompSSLBroker_xml); + + broker.setDeleteAllMessagesOnStartup(true); + broker.setUseJmx(true); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void after() throws Exception { + broker.stop(); + + if (oldLoginConf != null) { + System.setProperty(java_security_auth_login_config, oldLoginConf); + } + } + + public Socket createSocket(String host, int port) throws Exception { + System.setProperty("javax.net.ssl.trustStore", certBase + "/broker1.ks"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + System.setProperty("javax.net.ssl.trustStoreType", "jks"); + System.setProperty("javax.net.ssl.keyStore", certBase + "/client.ks"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "jks"); + + SocketFactory factory = SSLSocketFactory.getDefault(); + return factory.createSocket(host, port); + } + + public void stompConnectTo(String connectorName, String extraHeaders) throws Exception { + String host = broker.getConnectorByName(connectorName).getConnectUri().getHost(); + int port = broker.getConnectorByName(connectorName).getConnectUri().getPort(); + stompConnection.open(createSocket(host, port)); + String extra = extraHeaders != null ? extraHeaders : "\n"; + stompConnection.sendFrame("CONNECT\n" + extra + "\n" + Stomp.NULL); + + StompFrame f = stompConnection.receive(); + TestCase.assertEquals(f.getBody(), "CONNECTED", f.getAction()); + stompConnection.close(); + } + + @Test + public void testStompSSLWithUsernameAndPassword() throws Exception { + stompConnectTo("stomp+ssl", "login:system\n" + "passcode:manager\n"); + } + + @Test + public void testStompSSLWithCertificate() throws Exception { + stompConnectTo("stomp+ssl", null); + } + + @Test + public void testStompNIOSSLWithUsernameAndPassword() throws Exception { + stompConnectTo("stomp+nio+ssl", "login:system\n" + "passcode:manager\n"); + } + + @Test + public void testStompNIOSSLWithCertificate() throws Exception { + stompConnectTo("stomp+nio+ssl", null); + } + + public void openwireConnectTo(String connectorName, String username, String password) throws Exception { + URI brokerURI = broker.getConnectorByName(connectorName).getConnectUri(); + String uri = "ssl://" + brokerURI.getHost() + ":" + brokerURI.getPort(); + ActiveMQSslConnectionFactory cf = new ActiveMQSslConnectionFactory(uri); + cf.setTrustStore("org/apache/activemq/security/broker1.ks"); + cf.setTrustStorePassword("password"); + cf.setKeyStore("org/apache/activemq/security/client.ks"); + cf.setKeyStorePassword("password"); + ActiveMQConnection connection = null; + if (username != null || password != null) { + connection = (ActiveMQConnection)cf.createConnection(username, password); + } else { + connection = (ActiveMQConnection)cf.createConnection(); + } + TestCase.assertNotNull(connection); + connection.start(); + connection.stop(); + } + + @Test + public void testOpenwireSSLWithUsernameAndPassword() throws Exception { + openwireConnectTo("openwire+ssl", "system", "manager"); + } + + @Test + public void testOpenwireSSLWithCertificate() throws Exception { + openwireConnectTo("openwire+ssl", null, null); + } + + @Test + public void testOpenwireNIOSSLWithUsernameAndPassword() throws Exception { + openwireConnectTo("openwire+nio+ssl", "system", "mmanager"); + } + + @Test + public void testOpenwireNIOSSLWithCertificate() throws Exception { + openwireConnectTo("openwire+nio+ssl", null, null); + } + + @Test + public void testJmx() throws Exception { + TestCase.assertFalse(findDestination(destinationName)); + broker.getAdminView().addQueue(destinationName); + TestCase.assertTrue(findDestination(destinationName)); + broker.getAdminView().removeQueue(destinationName); + TestCase.assertFalse(findDestination(destinationName)); + } + + private boolean findDestination(String name) throws Exception { + ObjectName[] destinations = broker.getAdminView().getQueues(); + for (ObjectName destination : destinations) { + if (destination.toString().contains(name)) { + return true; + } + } + return false; + } + +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java new file mode 100644 index 0000000..9ca08bb --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4133Test.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.net.Socket; + +import javax.net.SocketFactory; +import javax.net.ssl.SSLSocketFactory; + +import junit.framework.TestCase; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompConnection; +import org.apache.activemq.transport.stomp.StompFrame; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ4133Test { + + protected String java_security_auth_login_config = "java.security.auth.login.config"; + protected String xbean = "xbean:"; + protected String confBase = "src/test/resources/org/apache/activemq/bugs/amq4126"; + protected String certBase = "src/test/resources/org/apache/activemq/security"; + protected String activemqXml = "InconsistentConnectorPropertiesBehaviour.xml"; + protected BrokerService broker; + + protected String oldLoginConf = null; + + @Before + public void before() throws Exception { + if (System.getProperty(java_security_auth_login_config) != null) { + oldLoginConf = System.getProperty(java_security_auth_login_config); + } + System.setProperty(java_security_auth_login_config, confBase + "/" + "login.config"); + broker = BrokerFactory.createBroker(xbean + confBase + "/" + activemqXml); + + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void after() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void stompSSLTransportNeedClientAuthTrue() throws Exception { + stompConnectTo("localhost", broker.getConnectorByName("stomp+ssl").getConnectUri().getPort()); + } + + @Test + public void stompSSLNeedClientAuthTrue() throws Exception { + stompConnectTo("localhost", broker.getConnectorByName("stomp+ssl+special").getConnectUri().getPort()); + } + + @Test + public void stompNIOSSLTransportNeedClientAuthTrue() throws Exception { + stompConnectTo("localhost", broker.getConnectorByName("stomp+nio+ssl").getConnectUri().getPort()); + } + + @Test + public void stompNIOSSLNeedClientAuthTrue() throws Exception { + stompConnectTo("localhost", broker.getConnectorByName("stomp+nio+ssl+special").getConnectUri().getPort()); + } + + public Socket createSocket(String host, int port) throws Exception { + System.setProperty("javax.net.ssl.trustStore", certBase + "/" + "broker1.ks"); + System.setProperty("javax.net.ssl.trustStorePassword", "password"); + System.setProperty("javax.net.ssl.trustStoreType", "jks"); + System.setProperty("javax.net.ssl.keyStore", certBase + "/" + "client.ks"); + System.setProperty("javax.net.ssl.keyStorePassword", "password"); + System.setProperty("javax.net.ssl.keyStoreType", "jks"); + + SocketFactory factory = SSLSocketFactory.getDefault(); + return factory.createSocket(host, port); + } + + public void stompConnectTo(String host, int port) throws Exception { + StompConnection stompConnection = new StompConnection(); + stompConnection.open(createSocket(host, port)); + stompConnection.sendFrame("CONNECT\n" + "\n" + Stomp.NULL); + StompFrame f = stompConnection.receive(); + TestCase.assertEquals(f.getBody(), "CONNECTED", f.getAction()); + stompConnection.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java new file mode 100644 index 0000000..cf7ca45 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java @@ -0,0 +1,211 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.net.URI; +import java.util.concurrent.Semaphore; + +import javax.jms.Message; +import javax.jms.MessageListener; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.network.DemandForwardingBridgeSupport; +import org.apache.activemq.util.MessageIdList; +import org.apache.activemq.util.Wait; + +/** + * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} when + * bridges are VM-to-VM. Specifically, memory usage from the local broker is + * manipulated by the remote broker. + */ +public class AMQ4147Test extends JmsMultipleBrokersTestSupport { + /** + * This test demonstrates the bug: namely, when a message is bridged over + * the VMTransport, its memory usage continues to refer to the originating + * broker. As a result, memory usage is never accounted for on the remote + * broker, and the local broker's memory usage is only decreased once the + * message is consumed on the remote broker. + */ + public void testVMTransportRemoteMemoryUsage() throws Exception { + BrokerService broker1 = createBroker(new URI( + "broker:(vm://broker1)/broker1?persistent=false")); + + BrokerService broker2 = createBroker(new URI( + "broker:(vm://broker2)/broker2?persistent=false")); + + startAllBrokers(); + + // Forward messages from broker1 to broker2 over the VM transport. + bridgeBrokers("broker1", "broker2").start(); + + // Verify that broker1 and broker2's test queues have no memory usage. + ActiveMQDestination testQueue = createDestination( + AMQ4147Test.class.getSimpleName() + ".queue", false); + final Destination broker1TestQueue = broker1.getDestination(testQueue); + final Destination broker2TestQueue = broker2.getDestination(testQueue); + + assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage()); + assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage()); + + // Produce a message to broker1's test queue and verify that broker1's + // memory usage has increased, but broker2 still has no memory usage. + sendMessages("broker1", testQueue, 1); + assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0); + assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage()); + + // Create a consumer on broker2 that is synchronized to allow detection + // of "in flight" messages to the consumer. + MessageIdList broker2Messages = getBrokerMessages("broker2"); + final Semaphore consumerReady = new Semaphore(0); + final Semaphore consumerProceed = new Semaphore(0); + + broker2Messages.setParent(new MessageListener() { + @Override + public void onMessage(Message message) { + consumerReady.release(); + try { + consumerProceed.acquire(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + }); + + createConsumer("broker2", testQueue); + + // Verify that when broker2's consumer receives the message, the memory + // usage has moved broker1 to broker2. The first assertion is expected + // to fail due to the bug; the try/finally ensures the consumer is + // released prior to failure so that the broker can shut down. + consumerReady.acquire(); + + try { + assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker1TestQueue.getMemoryUsage().getUsage() == 0; + } + })); + assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0); + } finally { + // Consume the message and verify that there is no more memory + // usage. + consumerProceed.release(); + } + + assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker1TestQueue.getMemoryUsage().getUsage() == 0; + } + })); + assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker2TestQueue.getMemoryUsage().getUsage() == 0; + } + })); + } + + /** + * This test demonstrates that the bug is VMTransport-specific and does not + * occur when bridges occur using other protocols. + */ + public void testTcpTransportRemoteMemoryUsage() throws Exception { + BrokerService broker1 = createBroker(new URI( + "broker:(vm://broker1)/broker1?persistent=false")); + + BrokerService broker2 = createBroker(new URI( + "broker:(tcp://localhost:61616)/broker2?persistent=false")); + + startAllBrokers(); + + // Forward messages from broker1 to broker2 over the TCP transport. + bridgeBrokers("broker1", "broker2").start(); + + // Verify that broker1 and broker2's test queues have no memory usage. + ActiveMQDestination testQueue = createDestination( + AMQ4147Test.class.getSimpleName() + ".queue", false); + final Destination broker1TestQueue = broker1.getDestination(testQueue); + final Destination broker2TestQueue = broker2.getDestination(testQueue); + + assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage()); + assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage()); + + // Produce a message to broker1's test queue and verify that broker1's + // memory usage has increased, but broker2 still has no memory usage. + sendMessages("broker1", testQueue, 1); + assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0); + assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage()); + + // Create a consumer on broker2 that is synchronized to allow detection + // of "in flight" messages to the consumer. + MessageIdList broker2Messages = getBrokerMessages("broker2"); + final Semaphore consumerReady = new Semaphore(0); + final Semaphore consumerProceed = new Semaphore(0); + + broker2Messages.setParent(new MessageListener() { + @Override + public void onMessage(Message message) { + consumerReady.release(); + try { + consumerProceed.acquire(); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } + }); + + createConsumer("broker2", testQueue); + + // Verify that when broker2's consumer receives the message, the memory + // usage has moved broker1 to broker2. + consumerReady.acquire(); + + try { + assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker1TestQueue.getMemoryUsage().getUsage() == 0; + } + })); + assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0); + } finally { + // Consume the message and verify that there is no more memory + // usage. + consumerProceed.release(); + } + + // Pause to allow ACK to be processed. + assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker1TestQueue.getMemoryUsage().getUsage() == 0; + } + })); + assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker2TestQueue.getMemoryUsage().getUsage() == 0; + } + })); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java new file mode 100644 index 0000000..906131e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4148Test.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.net.URI; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.network.DemandForwardingBridgeSupport; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.util.Wait; +import org.junit.Assert; + +/** + * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} whereby + * a static subscription from broker1 to broker2 is forwarded to broker3 even + * though the network TTL is 1. This results in duplicate subscriptions on + * broker3. + */ +public class AMQ4148Test extends JmsMultipleBrokersTestSupport { + + public void test() throws Exception { + // Create a hub-and-spoke network where each hub-spoke pair share + // messages on a test queue. + BrokerService hub = createBroker(new URI("broker:(vm://hub)/hub?persistent=false")); + + final BrokerService[] spokes = new BrokerService[4]; + for (int i = 0; i < spokes.length; i++) { + spokes[i] = createBroker(new URI("broker:(vm://spoke" + i + ")/spoke" + i + "?persistent=false")); + + } + startAllBrokers(); + + ActiveMQDestination testQueue = createDestination(AMQ4148Test.class.getSimpleName() + ".queue", false); + + NetworkConnector[] ncs = new NetworkConnector[spokes.length]; + for (int i = 0; i < spokes.length; i++) { + NetworkConnector nc = bridgeBrokers("hub", "spoke" + i); + nc.setNetworkTTL(1); + nc.setDuplex(true); + nc.setConduitSubscriptions(false); + nc.setStaticallyIncludedDestinations(Arrays.asList(testQueue)); + nc.start(); + + ncs[i] = nc; + } + + waitForBridgeFormation(); + + // Pause to allow subscriptions to be created. + TimeUnit.SECONDS.sleep(5); + + // Verify that the hub has a subscription from each spoke, but that each + // spoke has a single subscription from the hub (since the network TTL is 1). + final Destination hubTestQueue = hub.getDestination(testQueue); + assertTrue("Expecting {" + spokes.length + "} consumer but was {" + hubTestQueue.getConsumers().size() + "}", + Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return spokes.length == hubTestQueue.getConsumers().size(); + } + }) + ); + + // Now check each spoke has exactly one consumer on the Queue. + for (int i = 0; i < 4; i++) { + Destination spokeTestQueue = spokes[i].getDestination(testQueue); + Assert.assertEquals(1, spokeTestQueue.getConsumers().size()); + } + + for (NetworkConnector nc : ncs) { + nc.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java new file mode 100644 index 0000000..d29ec08 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4157Test.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.util.Vector; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionControl; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class AMQ4157Test { + static final Logger LOG = LoggerFactory.getLogger(AMQ4157Test.class); + private BrokerService broker; + private ActiveMQConnectionFactory connectionFactory; + private final Destination destination = new ActiveMQQueue("Test"); + private final String payloadString = new String(new byte[8*1024]); + private final boolean useBytesMessage= true; + private final int parallelProducer = 20; + private final int parallelConsumer = 100; + + private final Vector<Exception> exceptions = new Vector<Exception>(); + long toSend = 1000; + + @Test + public void testPublishCountsWithRollbackConsumer() throws Exception { + + startBroker(true); + + final AtomicLong sharedCount = new AtomicLong(toSend); + ExecutorService executorService = Executors.newCachedThreadPool(); + + for (int i=0; i< parallelConsumer; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + consumeOneAndRollback(); + } catch (Exception e) { + exceptions.add(e); + } + } + }); + } + + for (int i=0; i< parallelProducer; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + publishMessages(sharedCount, 0); + } catch (Exception e) { + exceptions.add(e); + } + } + }); + } + + executorService.shutdown(); + executorService.awaitTermination(30, TimeUnit.MINUTES); + assertTrue("Producers done in time", executorService.isTerminated()); + assertTrue("No exceptions: " + exceptions, exceptions.isEmpty()); + + restartBroker(500); + + LOG.info("Attempting consume of {} messages", toSend); + + consumeMessages(toSend); + } + + private void consumeOneAndRollback() throws Exception { + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(destination); + Message message = null; + while (message == null) { + message = consumer.receive(1000); + } + session.rollback(); + connection.close(); + } + + private void consumeMessages(long count) throws Exception { + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + for (int i=0; i<count; i++) { + assertNotNull("got message "+ i, consumer.receive(20000)); + } + assertNull("none left over", consumer.receive(2000)); + } + + private void restartBroker(int restartDelay) throws Exception { + stopBroker(); + TimeUnit.MILLISECONDS.sleep(restartDelay); + startBroker(false); + } + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + private void publishMessages(AtomicLong count, int expiry) throws Exception { + ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.setWatchTopicAdvisories(false); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(destination); + while ( (count.getAndDecrement()) > 0) { + Message message = null; + if (useBytesMessage) { + message = session.createBytesMessage(); + ((BytesMessage) message).writeBytes(payloadString.getBytes()); + } else { + message = session.createTextMessage(payloadString); + } + producer.send(message, DeliveryMode.PERSISTENT, 5, expiry); + } + connection.syncSendPacket(new ConnectionControl()); + connection.close(); + } + + public void startBroker(boolean deleteAllMessages) throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(deleteAllMessages); + broker.addConnector("tcp://0.0.0.0:0"); + broker.start(); + + String options = "?jms.redeliveryPolicy.maximumRedeliveries=-1&jms.prefetchPolicy.all=1000&jms.watchTopicAdvisories=false&jms.useAsyncSend=true&jms.alwaysSessionAsync=false&jms.dispatchAsync=false&socketBufferSize=131072&ioBufferSize=16384&wireFormat.tightEncodingEnabled=false&wireFormat.cacheSize=8192"; + connectionFactory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri() + options); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java new file mode 100644 index 0000000..4867f28 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4160Test.java @@ -0,0 +1,393 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.io.IOException; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.management.ObjectName; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerFilter; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.DiscoveryEvent; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkBridge; +import org.apache.activemq.network.NetworkBridgeListener; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.thread.TaskRunnerFactory; +import org.apache.activemq.transport.Transport; +import org.apache.activemq.transport.discovery.DiscoveryAgent; +import org.apache.activemq.transport.discovery.DiscoveryListener; +import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent; +import org.junit.Assert; + +/** + * This test demonstrates a number of race conditions in + * {@link DiscoveryNetworkConnector} that can result in an active bridge no + * longer being reported as active and vice-versa, an inactive bridge still + * being reported as active. + */ +public class AMQ4160Test extends JmsMultipleBrokersTestSupport { + final long MAX_TEST_TIME = TimeUnit.MINUTES.toMillis(2); + + /** + * Since these tests involve wait conditions, protect against indefinite + * waits (due to unanticipated issues). + */ + public void setUp() throws Exception { + setAutoFail(true); + setMaxTestTime(MAX_TEST_TIME); + super.setUp(); + } + + /** + * This test demonstrates how concurrent attempts to establish a bridge to + * the same remote broker are allowed to occur. Connection uniqueness will + * cause whichever bridge creation attempt is second to fail. However, this + * failure erases the entry in + * {@link DiscoveryNetworkConnector#activeBridges()} that represents the + * successful first bridge creation attempt. + */ + public void testLostActiveBridge() throws Exception { + final long ATTEMPT_TO_CREATE_DELAY = TimeUnit.SECONDS.toMillis(15); + + // Start two brokers with a bridge from broker1 to broker2. + BrokerService broker1 = createBroker(new URI( + "broker:(vm://broker1)/broker1?persistent=false")); + final BrokerService broker2 = createBroker(new URI( + "broker:(vm://broker2)/broker2?persistent=false")); + + // Allow the concurrent local bridge connections to be made even though + // they are duplicated; this prevents both of the bridge attempts from + // failing in the case that the local and remote bridges are established + // out-of-order. + BrokerPlugin ignoreAddConnectionPlugin = new BrokerPlugin() { + @Override + public Broker installPlugin(Broker broker) throws Exception { + return new BrokerFilter(broker) { + @Override + public void addConnection(ConnectionContext context, + ConnectionInfo info) throws Exception { + // ignore + } + }; + } + }; + + broker1.setPlugins(new BrokerPlugin[] { ignoreAddConnectionPlugin }); + + startAllBrokers(); + + // Start a bridge from broker1 to broker2. The discovery agent attempts + // to create the bridge concurrently with two threads, and the + // synchronization in createBridge ensures that pre-patch both threads + // actually attempt to start bridges. Post-patch, only one thread is + // allowed to start the bridge. + final CountDownLatch attemptLatch = new CountDownLatch(2); + final CountDownLatch createLatch = new CountDownLatch(2); + + DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() { + @Override + public void onServiceAdd(DiscoveryEvent event) { + // Pre-and-post patch, two threads attempt to establish a bridge + // to the same remote broker. + attemptLatch.countDown(); + super.onServiceAdd(event); + } + + @Override + protected NetworkBridge createBridge(Transport localTransport, + Transport remoteTransport, final DiscoveryEvent event) { + // Pre-patch, the two threads are allowed to create the bridge. + // Post-patch, only the first thread is allowed. Wait a + // reasonable delay once both attempts are detected to allow + // the two bridge creations to occur concurrently (pre-patch). + // Post-patch, the wait will timeout and allow the first (and + // only) bridge creation to occur. + try { + attemptLatch.await(); + createLatch.countDown(); + createLatch.await(ATTEMPT_TO_CREATE_DELAY, + TimeUnit.MILLISECONDS); + return super.createBridge(localTransport, remoteTransport, + event); + } catch (InterruptedException e) { + Thread.interrupted(); + return null; + } + } + }; + + nc.setDiscoveryAgent(new DiscoveryAgent() { + TaskRunnerFactory taskRunner = new TaskRunnerFactory(); + DiscoveryListener listener; + + @Override + public void start() throws Exception { + taskRunner.init(); + taskRunner.execute(new Runnable() { + @Override + public void run() { + listener.onServiceAdd(new DiscoveryEvent(broker2 + .getVmConnectorURI().toString())); + } + }); + taskRunner.execute(new Runnable() { + @Override + public void run() { + listener.onServiceAdd(new DiscoveryEvent(broker2 + .getVmConnectorURI().toString())); + } + }); + } + + @Override + public void stop() throws Exception { + taskRunner.shutdown(); + } + + @Override + public void setDiscoveryListener(DiscoveryListener listener) { + this.listener = listener; + } + + @Override + public void registerService(String name) throws IOException { + } + + @Override + public void serviceFailed(DiscoveryEvent event) throws IOException { + listener.onServiceRemove(event); + } + }); + + broker1.addNetworkConnector(nc); + nc.start(); + + // Wait for the bridge to be formed by the first attempt. + waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), + MAX_TEST_TIME, TimeUnit.MILLISECONDS); + + // Pre-patch, the second bridge creation attempt fails and removes the + // first (successful) bridge creation attempt from the + // list of active bridges. Post-patch, the second bridge creation + // attempt is prevented, so the first bridge creation attempt + // remains "active". This assertion is expected to fail pre-patch and + // pass post-patch. + Assert.assertFalse(nc.activeBridges().isEmpty()); + } + + /** + * This test demonstrates a race condition where a failed bridge can be + * removed from the list of active bridges in + * {@link DiscoveryNetworkConnector} before it has been added. Eventually, + * the failed bridge is added, but never removed, which causes subsequent + * bridge creation attempts to be ignored. The result is a network connector + * that thinks it has an active bridge, when in fact it doesn't. + */ + public void testInactiveBridgStillActive() throws Exception { + // Start two brokers with a bridge from broker1 to broker2. + BrokerService broker1 = createBroker(new URI( + "broker:(vm://broker1)/broker1?persistent=false")); + final BrokerService broker2 = createBroker(new URI( + "broker:(vm://broker2)/broker2?persistent=false")); + + // Force bridge failure by having broker1 disallow connections. + BrokerPlugin disallowAddConnectionPlugin = new BrokerPlugin() { + @Override + public Broker installPlugin(Broker broker) throws Exception { + return new BrokerFilter(broker) { + @Override + public void addConnection(ConnectionContext context, + ConnectionInfo info) throws Exception { + throw new Exception( + "Test exception to force bridge failure"); + } + }; + } + }; + + broker1.setPlugins(new BrokerPlugin[] { disallowAddConnectionPlugin }); + + startAllBrokers(); + + // Start a bridge from broker1 to broker2. The bridge delays returning + // from start until after the bridge failure has been processed; + // this leaves the first bridge creation attempt recorded as active, + // even though it failed. + final SimpleDiscoveryAgent da = new SimpleDiscoveryAgent(); + da.setServices(new URI[] { broker2.getVmConnectorURI() }); + + final CountDownLatch attemptLatch = new CountDownLatch(3); + final CountDownLatch removedLatch = new CountDownLatch(1); + + DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector() { + @Override + public void onServiceAdd(DiscoveryEvent event) { + attemptLatch.countDown(); + super.onServiceAdd(event); + } + + @Override + public void onServiceRemove(DiscoveryEvent event) { + super.onServiceRemove(event); + removedLatch.countDown(); + } + + @Override + protected NetworkBridge createBridge(Transport localTransport, + Transport remoteTransport, final DiscoveryEvent event) { + final NetworkBridge next = super.createBridge(localTransport, + remoteTransport, event); + return new NetworkBridge() { + + @Override + public void start() throws Exception { + next.start(); + // Delay returning until the failed service has been + // removed. + removedLatch.await(); + } + + @Override + public void stop() throws Exception { + next.stop(); + } + + @Override + public void serviceRemoteException(Throwable error) { + next.serviceRemoteException(error); + } + + @Override + public void serviceLocalException(Throwable error) { + next.serviceLocalException(error); + } + + @Override + public void setNetworkBridgeListener( + NetworkBridgeListener listener) { + next.setNetworkBridgeListener(listener); + } + + @Override + public String getRemoteAddress() { + return next.getRemoteAddress(); + } + + @Override + public String getRemoteBrokerName() { + return next.getRemoteBrokerName(); + } + + @Override + public String getRemoteBrokerId() { + return next.getRemoteBrokerId(); + } + + @Override + public String getLocalAddress() { + return next.getLocalAddress(); + } + + @Override + public String getLocalBrokerName() { + return next.getLocalBrokerName(); + } + + @Override + public long getEnqueueCounter() { + return next.getEnqueueCounter(); + } + + @Override + public long getDequeueCounter() { + return next.getDequeueCounter(); + } + + @Override + public void setMbeanObjectName(ObjectName objectName) { + next.setMbeanObjectName(objectName); + } + + @Override + public ObjectName getMbeanObjectName() { + return next.getMbeanObjectName(); + } + + public void resetStats(){ + next.resetStats(); + } + }; + } + }; + nc.setDiscoveryAgent(da); + + broker1.addNetworkConnector(nc); + nc.start(); + + // All bridge attempts should fail, so the attempt latch should get + // triggered. However, because of the race condition, the first attempt + // is considered successful and causes further attempts to stop. + // Therefore, this wait will time out and cause the test to fail. + Assert.assertTrue(attemptLatch.await(30, TimeUnit.SECONDS)); + } + + /** + * This test verifies that when a network connector is restarted, any + * bridges that were active at the time of the stop are allowed to be + * re-established (i.e., the "active events" data structure in + * {@link DiscoveryNetworkConnector} is reset. + */ + public void testAllowAttemptsAfterRestart() throws Exception { + final long STOP_DELAY = TimeUnit.SECONDS.toMillis(10); + + // Start two brokers with a bridge from broker1 to broker2. + BrokerService broker1 = createBroker(new URI( + "broker:(vm://broker1)/broker1?persistent=false")); + final BrokerService broker2 = createBroker(new URI( + "broker:(vm://broker2)/broker2?persistent=false")); + + startAllBrokers(); + + // Start a bridge from broker1 to broker2. + NetworkConnector nc = bridgeBrokers(broker1.getBrokerName(), + broker2.getBrokerName()); + nc.start(); + + waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), + MAX_TEST_TIME, TimeUnit.MILLISECONDS); + + // Restart the network connector and verify that the bridge is + // re-established. The pause between start/stop is to account for the + // asynchronous closure. + nc.stop(); + Thread.sleep(STOP_DELAY); + nc.start(); + + waitForBridge(broker1.getBrokerName(), broker2.getBrokerName(), + MAX_TEST_TIME, TimeUnit.MILLISECONDS); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java new file mode 100644 index 0000000..141a881 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4212Test.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.File; +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4212Test { + + private static final Logger LOG = LoggerFactory.getLogger(AMQ4212Test.class); + + private BrokerService service; + private String connectionUri; + private ActiveMQConnectionFactory cf; + + private final int MSG_COUNT = 256; + + @Before + public void setUp() throws IOException, Exception { + createBroker(true, false); + } + + public void createBroker(boolean deleteAllMessages, boolean recover) throws Exception { + service = new BrokerService(); + service.setBrokerName("InactiveSubTest"); + service.setDeleteAllMessagesOnStartup(deleteAllMessages); + service.setAdvisorySupport(false); + service.setPersistent(true); + service.setUseJmx(true); + service.setKeepDurableSubsActive(false); + + KahaDBPersistenceAdapter pa=new KahaDBPersistenceAdapter(); + File dataFile=new File("KahaDB"); + pa.setDirectory(dataFile); + pa.setJournalMaxFileLength(10*1024); + pa.setCheckpointInterval(TimeUnit.SECONDS.toMillis(5)); + pa.setCleanupInterval(TimeUnit.SECONDS.toMillis(5)); + pa.setForceRecoverIndex(recover); + + service.setPersistenceAdapter(pa); + service.start(); + service.waitUntilStarted(); + + connectionUri = "vm://InactiveSubTest?create=false"; + cf = new ActiveMQConnectionFactory(connectionUri); + } + + private void restartBroker() throws Exception { + stopBroker(); + createBroker(false, false); + } + + private void recoverBroker() throws Exception { + stopBroker(); + createBroker(false, true); + } + + @After + public void stopBroker() throws Exception { + if (service != null) { + service.stop(); + service.waitUntilStopped(); + service = null; + } + } + + @Test + public void testDirableSubPrefetchRecovered() throws Exception { + + ActiveMQQueue queue = new ActiveMQQueue("MyQueue"); + ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic"); + + // Send to a Queue to create some journal files + sendMessages(queue); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + + createInactiveDurableSub(topic); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + + // Now send some more to the queue to create even more files. + sendMessages(queue); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + assertTrue(getNumberOfJournalFiles() > 1); + + LOG.info("Restarting the broker."); + restartBroker(); + LOG.info("Restarted the broker."); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + assertTrue(getNumberOfJournalFiles() > 1); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + + // Clear out all queue data + service.getAdminView().removeQueue(queue.getQueueName()); + + assertTrue("Less than two journal files expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getNumberOfJournalFiles() <= 2; + } + }, TimeUnit.MINUTES.toMillis(2))); + + LOG.info("Sending {} Messages to the Topic.", MSG_COUNT); + // Send some messages to the inactive destination + sendMessages(topic); + + LOG.info("Attempt to consume {} messages from the Topic.", MSG_COUNT); + assertEquals(MSG_COUNT, consumeFromInactiveDurableSub(topic)); + + LOG.info("Recovering the broker."); + recoverBroker(); + LOG.info("Recovering the broker."); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + } + + @Test + public void testDurableAcksNotDropped() throws Exception { + + ActiveMQQueue queue = new ActiveMQQueue("MyQueue"); + ActiveMQTopic topic = new ActiveMQTopic("MyDurableTopic"); + + // Create durable sub in first data file. + createInactiveDurableSub(topic); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + + // Send to a Topic + sendMessages(topic, 1); + + // Send to a Queue to create some journal files + sendMessages(queue); + + LOG.info("Before consume there are currently [{}] journal log files.", getNumberOfJournalFiles()); + + // Consume all the Messages leaving acks behind. + consumeDurableMessages(topic, 1); + + LOG.info("After consume there are currently [{}] journal log files.", getNumberOfJournalFiles()); + + // Now send some more to the queue to create even more files. + sendMessages(queue); + + LOG.info("More Queued. There are currently [{}] journal log files.", getNumberOfJournalFiles()); + assertTrue(getNumberOfJournalFiles() > 1); + + LOG.info("Restarting the broker."); + restartBroker(); + LOG.info("Restarted the broker."); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + assertTrue(getNumberOfJournalFiles() > 1); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + + // Clear out all queue data + service.getAdminView().removeQueue(queue.getQueueName()); + + assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getNumberOfJournalFiles() <= 3; + } + }, TimeUnit.MINUTES.toMillis(3))); + + // See if we receive any message they should all be acked. + tryConsumeExpectNone(topic); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + + LOG.info("Recovering the broker."); + recoverBroker(); + LOG.info("Recovering the broker."); + + LOG.info("There are currently [{}] journal log files.", getNumberOfJournalFiles()); + + assertTrue("Should have an inactive durable sub", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + ObjectName[] subs = service.getAdminView().getInactiveDurableTopicSubscribers(); + return subs != null && subs.length == 1 ? true : false; + } + })); + + // See if we receive any message they should all be acked. + tryConsumeExpectNone(topic); + + assertTrue("Less than three journal file expected, was " + getNumberOfJournalFiles(), Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return getNumberOfJournalFiles() == 1; + } + }, TimeUnit.MINUTES.toMillis(1))); + } + + private int getNumberOfJournalFiles() throws IOException { + Collection<DataFile> files = + ((KahaDBPersistenceAdapter) service.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); + int reality = 0; + for (DataFile file : files) { + if (file != null) { + reality++; + } + } + + return reality; + } + + private void createInactiveDurableSub(Topic topic) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); + consumer.close(); + connection.close(); + } + + private void consumeDurableMessages(Topic topic, int count) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); + connection.start(); + for (int i = 0; i < count; ++i) { + if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) == null) { + fail("should have received a message"); + } + } + consumer.close(); + connection.close(); + } + + private void tryConsumeExpectNone(Topic topic) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); + connection.start(); + if (consumer.receive(TimeUnit.SECONDS.toMillis(10)) != null) { + fail("Should be no messages for this durable."); + } + consumer.close(); + connection.close(); + } + + private int consumeFromInactiveDurableSub(Topic topic) throws Exception { + Connection connection = cf.createConnection(); + connection.setClientID("Inactive"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(topic, "Inactive"); + + int count = 0; + + while (consumer.receive(10000) != null) { + count++; + } + + consumer.close(); + connection.close(); + + return count; + } + + private void sendMessages(Destination destination) throws Exception { + sendMessages(destination, MSG_COUNT); + } + + private void sendMessages(Destination destination, int count) throws Exception { + Connection connection = cf.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + for (int i = 0; i < count; ++i) { + TextMessage message = session.createTextMessage("Message #" + i + " for destination: " + destination); + producer.send(message); + } + connection.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java new file mode 100644 index 0000000..fddb6b1 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4213Test.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.fail; + +import javax.jms.JMSException; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerPluginSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.ConnectionContext; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ProducerInfo; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ4213Test { + + private static BrokerService brokerService; + private static String BROKER_ADDRESS = "tcp://localhost:0"; + private static String TEST_QUEUE = "testQueue"; + private static ActiveMQQueue queue = new ActiveMQQueue(TEST_QUEUE); + + private String connectionUri; + + @SuppressWarnings("unchecked") + @Before + public void setUp() throws Exception { + brokerService = new BrokerService(); + brokerService.setPersistent(false); + brokerService.setUseJmx(true); + brokerService.setDeleteAllMessagesOnStartup(true); + connectionUri = brokerService.addConnector(BROKER_ADDRESS).getPublishableConnectString(); + + brokerService.setPlugins(new BrokerPlugin[]{ + new BrokerPluginSupport() { + + @Override + public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception { + throw new javax.jms.JMSSecurityException(connectionUri); + } + } + }); + + brokerService.start(); + brokerService.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + } + + @Test + public void testExceptionOnProducerCreateThrows() throws Exception { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri); + ActiveMQConnection connection = (ActiveMQConnection) factory.createConnection(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + connection.start(); + + try { + session.createProducer(queue); + fail("Should not be able to create this producer."); + } catch (JMSException ex) { + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java new file mode 100644 index 0000000..7084bde --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4220Test.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.kahadb.FilteredKahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4220Test { + + static final Logger LOG = LoggerFactory.getLogger(AMQ4220Test.class); + private final static int maxFileLength = 1024*1024*32; + private final static String destinationName = "TEST.QUEUE"; + BrokerService broker; + + @Before + public void setUp() throws Exception { + prepareBrokerWithMultiStore(true); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + protected BrokerService createBroker(PersistenceAdapter kaha) throws Exception { + BrokerService broker = new BrokerService(); + broker.setUseJmx(true); + broker.setBrokerName("localhost"); + broker.setPersistenceAdapter(kaha); + return broker; + } + + @Test + public void testRestartAfterQueueDelete() throws Exception { + + // Ensure we have an Admin View. + assertTrue("Broker doesn't have an Admin View.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return (broker.getAdminView()) != null; + } + })); + + + LOG.info("Adding initial destination: {}", destinationName); + + broker.getAdminView().addQueue(destinationName); + + assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName))); + + LOG.info("Removing initial destination: {}", destinationName); + + broker.getAdminView().removeQueue(destinationName); + + LOG.info("Adding back destination: {}", destinationName); + + broker.getAdminView().addQueue(destinationName); + + assertNotNull(broker.getDestination(new ActiveMQQueue(destinationName))); + } + + protected KahaDBPersistenceAdapter createStore(boolean delete) throws IOException { + KahaDBPersistenceAdapter kaha = new KahaDBPersistenceAdapter(); + kaha.setJournalMaxFileLength(maxFileLength); + kaha.setCleanupInterval(5000); + if (delete) { + kaha.deleteAllMessages(); + } + return kaha; + } + + public void prepareBrokerWithMultiStore(boolean deleteAllMessages) throws Exception { + + MultiKahaDBPersistenceAdapter multiKahaDBPersistenceAdapter = new MultiKahaDBPersistenceAdapter(); + if (deleteAllMessages) { + multiKahaDBPersistenceAdapter.deleteAllMessages(); + } + ArrayList<FilteredKahaDBPersistenceAdapter> adapters = new ArrayList<FilteredKahaDBPersistenceAdapter>(); + + FilteredKahaDBPersistenceAdapter template = new FilteredKahaDBPersistenceAdapter(); + template.setPersistenceAdapter(createStore(deleteAllMessages)); + template.setPerDestination(true); + adapters.add(template); + + multiKahaDBPersistenceAdapter.setFilteredPersistenceAdapters(adapters); + broker = createBroker(multiKahaDBPersistenceAdapter); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java new file mode 100644 index 0000000..55e8027 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4221Test.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.HashSet; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import junit.framework.Test; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.policy.FilePendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.plist.PListStoreImpl; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Level; +import org.apache.log4j.LogManager; +import org.apache.log4j.spi.LoggingEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ4221Test extends TestSupport { + private static final Logger LOG = LoggerFactory.getLogger(AMQ4221Test.class); + public int PAYLOAD_SIZE_BYTES = 4 * 1024; + public int NUM_TO_SEND = 60000; + public int NUM_CONCURRENT_PRODUCERS = 20; + public int QUEUE_COUNT = 1; + public int TMP_JOURNAL_MAX_FILE_SIZE = 10 * 1024 * 1024; + + public int DLQ_PURGE_INTERVAL = 30000; + + public int MESSAGE_TIME_TO_LIVE = 20000; + public int EXPIRE_SWEEP_PERIOD = 200; + public int TMP_JOURNAL_GC_PERIOD = 50; + public int RECEIVE_POLL_PERIOD = 4000; + private int RECEIVE_BATCH = 5000; + + final byte[] payload = new byte[PAYLOAD_SIZE_BYTES]; + final AtomicInteger counter = new AtomicInteger(0); + final HashSet<Throwable> exceptions = new HashSet<Throwable>(); + BrokerService brokerService; + private String brokerUrlString; + ExecutorService executorService = Executors.newCachedThreadPool(); + final AtomicBoolean done = new AtomicBoolean(false); + + public static Test suite() { + return suite(AMQ4221Test.class); + } + + @Override + public void setUp() throws Exception { + + LogManager.getRootLogger().addAppender(new DefaultTestAppender() { + + @Override + public void doAppend(LoggingEvent event) { + if (event.getLevel().isGreaterOrEqual(Level.ERROR)) { + System.err.println("exit on error: " + event.getMessage()); + done.set(true); + new Thread() { + public void run() { + System.exit(787); + } + }.start(); + } + } + }); + + done.set(false); + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setDestinations(new ActiveMQDestination[]{new ActiveMQQueue("ActiveMQ.DLQ")}); + + PolicyEntry defaultPolicy = new PolicyEntry(); + defaultPolicy.setPendingQueuePolicy(new FilePendingQueueMessageStoragePolicy()); + defaultPolicy.setExpireMessagesPeriod(EXPIRE_SWEEP_PERIOD); + defaultPolicy.setProducerFlowControl(false); + defaultPolicy.setMemoryLimit(50 * 1024 * 1024); + + brokerService.getSystemUsage().getMemoryUsage().setLimit(50 * 1024 * 1024); + + + PolicyMap destinationPolicyMap = new PolicyMap(); + destinationPolicyMap.setDefaultEntry(defaultPolicy); + brokerService.setDestinationPolicy(destinationPolicyMap); + + + PListStoreImpl tempDataStore = new PListStoreImpl(); + tempDataStore.setDirectory(brokerService.getTmpDataDirectory()); + tempDataStore.setJournalMaxFileLength(TMP_JOURNAL_MAX_FILE_SIZE); + tempDataStore.setCleanupInterval(TMP_JOURNAL_GC_PERIOD); + tempDataStore.setIndexPageSize(200); + tempDataStore.setIndexEnablePageCaching(false); + + brokerService.setTempDataStore(tempDataStore); + brokerService.setAdvisorySupport(false); + TransportConnector tcp = brokerService.addConnector("tcp://localhost:0"); + brokerService.start(); + brokerUrlString = tcp.getPublishableConnectString(); + } + + @Override + public void tearDown() throws Exception { + brokerService.stop(); + brokerService.waitUntilStopped(); + executorService.shutdownNow(); + } + + public void testProduceConsumeExpireHalf() throws Exception { + + final org.apache.activemq.broker.region.Queue dlq = + (org.apache.activemq.broker.region.Queue) getDestination(brokerService, new ActiveMQQueue("ActiveMQ.DLQ")); + + if (DLQ_PURGE_INTERVAL > 0) { + executorService.execute(new Runnable() { + @Override + public void run() { + while (!done.get()) { + try { + Thread.sleep(DLQ_PURGE_INTERVAL); + LOG.info("Purge DLQ, current size: " + dlq.getDestinationStatistics().getMessages().getCount()); + dlq.purge(); + } catch (InterruptedException allDone) { + } catch (Throwable e) { + e.printStackTrace(); + exceptions.add(e); + } + } + } + }); + + } + + final CountDownLatch latch = new CountDownLatch(QUEUE_COUNT); + for (int i = 0; i < QUEUE_COUNT; i++) { + final int id = i; + executorService.execute(new Runnable() { + @Override + public void run() { + try { + doProduceConsumeExpireHalf(id, latch); + } catch (Throwable e) { + e.printStackTrace(); + exceptions.add(e); + } + } + }); + } + + while (!done.get()) { + done.set(latch.await(5, TimeUnit.SECONDS)); + } + executorService.shutdown(); + executorService.awaitTermination(5, TimeUnit.MINUTES); + + assertTrue("no exceptions:" + exceptions, exceptions.isEmpty()); + + } + + public void doProduceConsumeExpireHalf(int id, CountDownLatch latch) throws Exception { + + final ActiveMQQueue queue = new ActiveMQQueue("Q" + id); + + final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrlString); + ActiveMQPrefetchPolicy prefecthPolicy = new ActiveMQPrefetchPolicy(); + prefecthPolicy.setAll(0); + factory.setPrefetchPolicy(prefecthPolicy); + Connection connection = factory.createConnection(); + connection.start(); + final MessageConsumer consumer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createConsumer(queue, "on = 'true'"); + + executorService.execute(new Runnable() { + @Override + public void run() { + try { + while (!done.get()) { + Thread.sleep(RECEIVE_POLL_PERIOD); + for (int i = 0; i < RECEIVE_BATCH && !done.get(); i++) { + + Message message = consumer.receive(1000); + if (message != null) { + counter.incrementAndGet(); + if (counter.get() > 0 && counter.get() % 500 == 0) { + LOG.info("received: " + counter.get() + ", " + message.getJMSDestination().toString()); + } + } + } + } + } catch (JMSException ignored) { + + } catch (Exception e) { + e.printStackTrace(); + exceptions.add(e); + } + } + }); + + final AtomicInteger accumulator = new AtomicInteger(0); + final CountDownLatch producersDone = new CountDownLatch(NUM_CONCURRENT_PRODUCERS); + + for (int i = 0; i < NUM_CONCURRENT_PRODUCERS; i++) { + executorService.execute(new Runnable() { + @Override + public void run() { + try { + Connection sendConnection = factory.createConnection(); + sendConnection.start(); + Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(queue); + producer.setTimeToLive(MESSAGE_TIME_TO_LIVE); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + while (accumulator.incrementAndGet() < NUM_TO_SEND && !done.get()) { + BytesMessage message = sendSession.createBytesMessage(); + message.writeBytes(payload); + message.setStringProperty("on", String.valueOf(accumulator.get() % 2 == 0)); + producer.send(message); + + } + producersDone.countDown(); + } catch (Exception e) { + e.printStackTrace(); + exceptions.add(e); + } + } + }); + } + + producersDone.await(10, TimeUnit.MINUTES); + + final DestinationStatistics view = getDestinationStatistics(brokerService, queue); + LOG.info("total expired so far " + view.getExpired().getCount() + ", " + queue.getQueueName()); + latch.countDown(); + } +}
