http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
new file mode 100644
index 0000000..65e0783
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3436Test.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageConsumer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.broker.region.policy.SharedDeadLetterStrategy;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks that a consumer created <em>before</em> any message is produced
+ * still receives prioritized messages in non-increasing priority order
+ * (the first, prefetched message excepted).
+ */
+public class AMQ3436Test {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(AMQ3436Test.class);
+
+    /**
+     * Upper bound for the consume phase. Every message is sent with a 45 second
+     * time-to-live, so nothing new can be delivered after that; waiting longer
+     * than this would only turn a failure into a hang.
+     */
+    private static final long CONSUME_TIMEOUT_SECONDS = 60;
+
+    private BrokerService broker;
+    private PersistenceAdapter adapter;
+    private boolean useCache = true;
+    private boolean prioritizeMessages = true;
+
+    /**
+     * Builds the KahaDB adapter used by the broker, with concurrent
+     * store-and-dispatch switched off for queues and topics.
+     *
+     * @param delete whether previously stored messages are wiped first
+     * @return the configured, not yet started adapter
+     * @throws Exception if the existing store cannot be deleted
+     */
+    protected PersistenceAdapter createPersistenceAdapter(boolean delete) throws Exception {
+        KahaDBPersistenceAdapter adapter = new KahaDBPersistenceAdapter();
+        adapter.setConcurrentStoreAndDispatchQueues(false);
+        adapter.setConcurrentStoreAndDispatchTopics(false);
+        // Honour the flag; the original wiped the store unconditionally.
+        if (delete) {
+            adapter.deleteAllMessages();
+        }
+        return adapter;
+    }
+
+    /** Starts a broker named "priorityTest" with a prioritized policy on queue TEST. */
+    @Before
+    public void setUp() throws Exception {
+        broker = new BrokerService();
+        broker.setBrokerName("priorityTest");
+        broker.setAdvisorySupport(false);
+        broker.setUseJmx(false);
+        adapter = createPersistenceAdapter(true);
+        broker.setPersistenceAdapter(adapter);
+
+        PolicyEntry policy = new PolicyEntry();
+        policy.setPrioritizedMessages(prioritizeMessages);
+        policy.setUseCache(useCache);
+        policy.setProducerFlowControl(false);
+        PolicyMap policyMap = new PolicyMap();
+        policyMap.put(new ActiveMQQueue("TEST"), policy);
+
+        broker.setDestinationPolicy(policyMap);
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /**
+     * Stops the broker. Must be public and annotated for JUnit 4 to run it;
+     * without the annotation the broker outlived the test.
+     */
+    @After
+    public void tearDown() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Sends 200 persistent messages with random priorities, one transaction per
+     * message, to a consumer that already exists, then verifies through a
+     * listener that no message has a higher priority than the lowest priority
+     * seen so far.
+     */
+    @Test
+    public void testPriorityWhenConsumerCreatedBeforeProduction() throws Exception {
+
+        final int messageCount = 200;
+        URI brokerUri = new URI("vm://priorityTest?jms.prefetchPolicy.all=1");
+        ActiveMQQueue dest = new ActiveMQQueue("TEST?consumer.dispatchAsync=false");
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri);
+        cf.setDispatchAsync(false);
+
+        final CountDownLatch latch = new CountDownLatch(messageCount);
+        final StringBuffer failureMessage = new StringBuffer();
+        final String lineSeparator = System.getProperty("line.separator");
+        boolean allReceived;
+
+        ActiveMQConnection producerConnection = null;
+        ActiveMQConnection consumerConnection = null;
+        try {
+            // Create producer
+            producerConnection = (ActiveMQConnection) cf.createConnection();
+            producerConnection.setMessagePrioritySupported(true);
+            producerConnection.start();
+            final Session producerSession = producerConnection.createSession(true, Session.SESSION_TRANSACTED);
+            MessageProducer producer = producerSession.createProducer(dest);
+
+            // Create consumer on separate connection. It is deliberately created
+            // before production starts: creating it after the messages are sent
+            // yields priority order anyway and would not exercise the bug.
+            consumerConnection = (ActiveMQConnection) cf.createConnection();
+            consumerConnection.setMessagePrioritySupported(true);
+            consumerConnection.start();
+            final ActiveMQSession consumerSession = (ActiveMQSession) consumerConnection.createSession(true,
+                Session.SESSION_TRANSACTED);
+            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumerSession.createConsumer(dest);
+
+            // Produce X number of messages with a session commit after each message
+            Random random = new Random();
+            for (int i = 0; i < messageCount; ++i) {
+                Message message = producerSession.createTextMessage("Test message #" + i);
+                producer.send(message, DeliveryMode.PERSISTENT, random.nextInt(10), 45 * 1000);
+                producerSession.commit();
+            }
+            producer.close();
+
+            // Consume all of the messages we produce using a listener.
+            consumer.setMessageListener(new MessageListener() {
+                private int lowestPrioritySeen = 10;
+                private boolean firstMessage = true;
+
+                @Override
+                public void onMessage(Message msg) {
+                    try {
+                        int currentPriority = msg.getJMSPriority();
+                        LOG.debug("{}<={}", currentPriority, lowestPrioritySeen);
+
+                        if (firstMessage) {
+                            // Ignore the first message priority since it is
+                            // prefetched and is out of order by design
+                            firstMessage = false;
+                            LOG.debug("Ignoring first message since it was prefetched");
+                        } else if (currentPriority < lowestPrioritySeen) {
+                            lowestPrioritySeen = currentPriority;
+                        } else if (currentPriority > lowestPrioritySeen) {
+                            // We must never see a priority higher than the
+                            // lowest priority seen so far
+                            failureMessage.append("Incorrect priority seen (Lowest Priority = " + lowestPrioritySeen
+                                + " Current Priority = " + currentPriority + ")"
+                                + lineSeparator);
+                        }
+                    } catch (JMSException e) {
+                        // Report instead of printing, so the test cannot pass
+                        // with unverified messages.
+                        LOG.error("Failed to read message priority", e);
+                        failureMessage.append("Failed to read message priority: " + e + lineSeparator);
+                    } finally {
+                        latch.countDown();
+                        LOG.debug("Messages remaining = {}", latch.getCount());
+                    }
+                }
+            });
+
+            // Bounded wait: an unbounded await() hangs the build when a message
+            // expires or is lost.
+            allReceived = latch.await(CONSUME_TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            consumer.close();
+        } finally {
+            // Closing a connection also closes its sessions, producers and consumers.
+            try {
+                if (producerConnection != null) {
+                    producerConnection.close();
+                }
+            } finally {
+                if (consumerConnection != null) {
+                    consumerConnection.close();
+                }
+            }
+        }
+
+        Assert.assertTrue("Timed out with " + latch.getCount() + " of " + messageCount
+            + " messages not received", allReceived);
+
+        // Report the failure if found
+        if (failureMessage.length() > 0) {
+            Assert.fail(failureMessage.toString());
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java
new file mode 100644
index 0000000..73035e2
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3445Test.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that a broker backed by the JDBC persistence adapter still knows its
+ * queue, and the queue's message count, after being restarted.
+ */
+public class AMQ3445Test {
+
+    private ConnectionFactory connectionFactory;
+    private BrokerService broker;
+    private String connectionUri;
+
+    private final String queueName = "Consumer.MyApp.VirtualTopic.FOO";
+    private final String topicName = "VirtualTopic.FOO";
+
+    /** Starts from an empty message store. */
+    @Before
+    public void startBroker() throws Exception {
+        createBroker(true);
+    }
+
+    /**
+     * Creates and starts a JDBC-backed broker on an ephemeral TCP port and
+     * points {@link #connectionFactory} at it.
+     *
+     * @param deleteMessages whether the store is wiped on start-up
+     */
+    private void createBroker(boolean deleteMessages) throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(deleteMessages);
+        broker.setPersistenceAdapter(new JDBCPersistenceAdapter());
+        broker.setAdvisorySupport(false);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+        broker.waitUntilStarted();
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    /** Stops the current broker, if any, and waits for it to be down. */
+    private void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /** Stops the broker and starts a new one that keeps the stored messages. */
+    private void restartBroker() throws Exception {
+        stopBroker();
+        createBroker(false);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        stopBroker();
+    }
+
+    /**
+     * Registers the queue and topic, then restarts the broker three times,
+     * asserting after each restart that the queue is still listed and that its
+     * size reflects the messages published to the topic so far (0, 1, then 2).
+     */
+    @Test
+    public void testJDBCRetiansDestinationAfterRestart() throws Exception {
+
+        broker.getAdminView().addQueue(queueName);
+        broker.getAdminView().addTopic(topicName);
+
+        assertTrue(findDestination(queueName, false));
+        assertTrue(findDestination(topicName, true));
+
+        QueueViewMBean queue = getProxyToQueueViewMBean();
+        assertEquals(0, queue.getQueueSize());
+
+        restartBroker();
+
+        assertTrue(findDestination(queueName, false));
+        queue = getProxyToQueueViewMBean();
+        assertEquals(0, queue.getQueueSize());
+
+        sendMessage();
+        restartBroker();
+        assertTrue(findDestination(queueName, false));
+
+        queue = getProxyToQueueViewMBean();
+        assertEquals(1, queue.getQueueSize());
+        sendMessage();
+        assertEquals(2, queue.getQueueSize());
+
+        restartBroker();
+        assertTrue(findDestination(queueName, false));
+        queue = getProxyToQueueViewMBean();
+        assertEquals(2, queue.getQueueSize());
+    }
+
+    /** Publishes one persistent text message to {@link #topicName}. */
+    private void sendMessage() throws Exception {
+        Connection connection = connectionFactory.createConnection();
+        try {
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer producer = session.createProducer(session.createTopic(topicName));
+            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+            producer.send(session.createTextMessage("Testing"));
+            producer.close();
+        } finally {
+            // Always release the connection, even when the send fails.
+            connection.close();
+        }
+    }
+
+    /** Returns a JMX proxy for {@link #queueName} on the broker named "localhost". */
+    private QueueViewMBean getProxyToQueueViewMBean() throws Exception {
+        ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq"
+            + ":destinationType=Queue,destinationName=" + queueName
+            + ",type=Broker,brokerName=localhost");
+        return (QueueViewMBean) broker.getManagementContext()
+            .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true);
+    }
+
+    /**
+     * Tells whether the broker's admin view lists a destination whose object
+     * name contains {@code name}.
+     *
+     * @param name  text to look for in the destination's object name
+     * @param topic {@code true} to search topics, {@code false} to search queues
+     */
+    private boolean findDestination(String name, boolean topic) throws Exception {
+        ObjectName[] destinations = topic
+            ? broker.getAdminView().getTopics()
+            : broker.getAdminView().getQueues();
+
+        for (ObjectName destination : destinations) {
+            if (destination.toString().contains(name)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java
new file mode 100644
index 0000000..99c12fc
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3454Test.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Logs how long it takes to send a batch of messages to one queue on a broker
+ * that was started with a large number of pre-declared destinations.
+ */
+public class AMQ3454Test extends TestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ3454Test.class);
+
+    /** Used both as the number of pre-declared queues and of messages sent. */
+    private static final int MESSAGES_COUNT = 10000;
+
+    /**
+     * Starts a non-persistent broker with {@link #MESSAGES_COUNT} queues, sends
+     * the same number of text messages to a single queue and logs the elapsed
+     * time. There is no assertion; the test fails only if a call throws.
+     */
+    public void testSendWithLotsOfDestinations() throws Exception {
+        final BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.setDeleteAllMessagesOnStartup(true);
+
+        broker.addConnector("tcp://localhost:0");
+
+        // populate a bunch of destinations, validate the impact on a call to send
+        ActiveMQDestination[] destinations = new ActiveMQDestination[MESSAGES_COUNT];
+        for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
+            destinations[idx] = new ActiveMQQueue(getDestinationName() + "-" + idx);
+        }
+        broker.setDestinations(destinations);
+        broker.start();
+
+        try {
+            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
+                broker.getTransportConnectors().get(0).getPublishableConnectString());
+            final Connection connection = factory.createConnection();
+            try {
+                connection.start();
+
+                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageProducer producer = session.createProducer(new ActiveMQQueue(getDestinationName()));
+
+                long start = System.currentTimeMillis();
+                for (int idx = 0; idx < MESSAGES_COUNT; ++idx) {
+                    Message message = session.createTextMessage("" + idx);
+                    producer.send(message);
+                }
+                LOG.info("Duration: " + (System.currentTimeMillis() - start) + " millis");
+                producer.close();
+                session.close();
+            } finally {
+                // The original never closed the connection.
+                connection.close();
+            }
+        } finally {
+            // The original left the broker and its TCP connector running after the test.
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /** Returns a destination name unique to this class and test method. */
+    protected String getDestinationName() {
+        return getClass().getName() + "." + getName();
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
new file mode 100644
index 0000000..bac3829
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3465Test.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.*;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.XAConnection;
+import javax.jms.XAConnectionFactory;
+import javax.jms.XASession;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQMessageProducer;
+import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.ActiveMQXAConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that opening, using and closing an ordinary session from a plain
+ * connection factory does not disturb an XA transaction that has been ended
+ * but not yet committed on a separate XA connection.
+ */
+public class AMQ3465Test
+{
+    private final String xaDestinationName = "DestinationXA";
+    private final String destinationName = "Destination";
+    private BrokerService broker;
+    private String connectionUri;
+    private long txGenerator = System.currentTimeMillis();
+
+    private XAConnectionFactory xaConnectionFactory;
+    private ConnectionFactory connectionFactory;
+
+    /** Starts a non-persistent broker and builds a plain and an XA factory for it. */
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+        xaConnectionFactory = new ActiveMQXAConnectionFactory(connectionUri);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        // Guard against a failed start-up so the original failure is not
+        // masked by a NullPointerException here.
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /** Runs the scenario with a non-transacted, auto-acknowledge plain session. */
+    @Test
+    public void testMixedXAandNonXAorTXSessions() throws Exception {
+        runMixedSessionScenario(false);
+    }
+
+    /**
+     * Runs the scenario with a locally transacted plain session that sends one
+     * message and then rolls back.
+     */
+    @Test
+    public void testMixedXAandNonXALocalTXSessions() throws Exception {
+        runMixedSessionScenario(true);
+    }
+
+    /**
+     * Shared body of both tests: publishes and commits one message under XA,
+     * receives it in a second XA transaction, ends that transaction, exercises
+     * a plain session on another connection and only then commits the XA
+     * transaction. Both connections are closed even when an assertion fails.
+     *
+     * @param useLocalTransaction whether the plain session is created
+     *        transacted (and therefore sends a message and rolls back)
+     */
+    private void runMixedSessionScenario(boolean useLocalTransaction) throws Exception {
+        XAConnection xaConnection = xaConnectionFactory.createXAConnection();
+        try {
+            xaConnection.start();
+            XASession session = xaConnection.createXASession();
+            XAResource resource = session.getXAResource();
+            Destination dest = new ActiveMQQueue(xaDestinationName);
+
+            // publish a message
+            Xid tid = createXid();
+            resource.start(tid, XAResource.TMNOFLAGS);
+            MessageProducer producer = session.createProducer(dest);
+            ActiveMQTextMessage message = new ActiveMQTextMessage();
+            message.setText("Some Text");
+            producer.send(message);
+            resource.end(tid, XAResource.TMSUCCESS);
+            resource.commit(tid, true);
+            session.close();
+
+            // consume it in a second XA transaction, left uncommitted for now
+            session = xaConnection.createXASession();
+            MessageConsumer consumer = session.createConsumer(dest);
+            tid = createXid();
+            resource = session.getXAResource();
+            resource.start(tid, XAResource.TMNOFLAGS);
+            TextMessage receivedMessage = (TextMessage) consumer.receive(1000);
+            assertNotNull(receivedMessage);
+            assertEquals("Some Text", receivedMessage.getText());
+            resource.end(tid, XAResource.TMSUCCESS);
+
+            // Test that a normal session doesn't operate on XASession state.
+            Connection connection2 = connectionFactory.createConnection();
+            try {
+                connection2.start();
+                ActiveMQSession session2 = (ActiveMQSession) connection2.createSession(useLocalTransaction,
+                    Session.AUTO_ACKNOWLEDGE);
+
+                if (useLocalTransaction) {
+                    Destination destination = new ActiveMQQueue(destinationName);
+                    ActiveMQMessageProducer producer2 = (ActiveMQMessageProducer) session2.createProducer(destination);
+                    producer2.send(session2.createTextMessage("Local-TX"));
+                }
+
+                if (session2.isTransacted()) {
+                    session2.rollback();
+                }
+
+                session2.close();
+
+                // As in the original, commit while the plain connection is still open.
+                resource.commit(tid, true);
+            } finally {
+                connection2.close();
+            }
+        } finally {
+            xaConnection.close();
+        }
+    }
+
+    /**
+     * Creates a transaction id unique within this test instance.
+     *
+     * @return an {@link Xid} with format id 86 whose global transaction id and
+     *         branch qualifier are both the 8-byte encoding of a counter that
+     *         is incremented on every call
+     * @throws IOException if encoding the counter fails
+     */
+    public Xid createXid() throws IOException {
+
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream os = new DataOutputStream(baos);
+        os.writeLong(++txGenerator);
+        os.close();
+        final byte[] bs = baos.toByteArray();
+
+        return new Xid() {
+            @Override
+            public int getFormatId() {
+                return 86;
+            }
+
+            @Override
+            public byte[] getGlobalTransactionId() {
+                return bs;
+            }
+
+            @Override
+            public byte[] getBranchQualifier() {
+                return bs;
+            }
+        };
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
new file mode 100644
index 0000000..7e8b9d0
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3529Test.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.activemq.bugs;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.fail;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Interrupts a client thread that is blocked in {@code receive} and checks
+ * that its JMS resources can still be closed and that no non-daemon thread of
+ * the client's thread group is left running.
+ */
+public class AMQ3529Test {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ3529Test.class);
+
+    private ConnectionFactory connectionFactory;
+    private Connection connection;
+    private Session session;
+    private BrokerService broker;
+    private String connectionUri;
+    private MessageConsumer consumer;
+    private Context ctx = null;
+
+    @Before
+    public void startBroker() throws Exception {
+        broker = new BrokerService();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.addConnector("tcp://0.0.0.0:0");
+        broker.start();
+        broker.waitUntilStarted();
+
+        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();
+
+        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
+    }
+
+    @After
+    public void stopBroker() throws Exception {
+        if (broker != null) {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Starts a client thread in its own thread group, interrupts it after five
+     * seconds while it waits in {@code receive(10000)}, and fails if closing
+     * any resource threw or if a live non-daemon thread remains in the group.
+     */
+    @Test(timeout = 60000)
+    public void testInterruptionAffects() throws Exception {
+        ThreadGroup tg = new ThreadGroup("tg");
+
+        assertEquals(0, tg.activeCount());
+
+        // fail() called on the client thread only kills that thread; JUnit never
+        // sees it. Close failures are collected here and asserted on the test
+        // thread after the join.
+        final List<String> closeFailures = new CopyOnWriteArrayList<String>();
+
+        Thread client = new Thread(tg, "client") {
+
+            @Override
+            public void run() {
+                try {
+                    connection = connectionFactory.createConnection();
+                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                    assertNotNull(session);
+
+                    Properties props = new Properties();
+                    props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.activemq.jndi.ActiveMQInitialContextFactory");
+                    props.setProperty(Context.PROVIDER_URL, "tcp://0.0.0.0:0");
+                    ctx = null;
+                    try {
+                        ctx = new InitialContext(props);
+                    } catch (NoClassDefFoundError e) {
+                        throw new NamingException(e.toString());
+                    } catch (Exception e) {
+                        throw new NamingException(e.toString());
+                    }
+                    Destination destination = (Destination) ctx.lookup("dynamicTopics/example.C");
+                    consumer = session.createConsumer(destination);
+                    consumer.receive(10000);
+                } catch (Exception e) {
+                    // Expect an exception here from the interrupt.
+                } finally {
+                    // Per the original author's note, closing the consumer after
+                    // the interrupt is what this test is about. Each resource is
+                    // null-checked because set-up may have failed before it was
+                    // created.
+                    if (consumer != null) {
+                        try {
+                            consumer.close();
+                        } catch (JMSException e) {
+                            closeFailures.add("Consumer Close failed with" + e.getMessage());
+                        }
+                    }
+                    if (session != null) {
+                        try {
+                            session.close();
+                        } catch (JMSException e) {
+                            closeFailures.add("Session Close failed with" + e.getMessage());
+                        }
+                    }
+                    if (connection != null) {
+                        try {
+                            connection.close();
+                        } catch (JMSException e) {
+                            closeFailures.add("Connection Close failed with" + e.getMessage());
+                        }
+                    }
+                    if (ctx != null) {
+                        try {
+                            ctx.close();
+                        } catch (Exception e) {
+                            closeFailures.add("Context Close failed with" + e.getMessage());
+                        }
+                    }
+                }
+            }
+        };
+        client.start();
+        Thread.sleep(5000);
+        client.interrupt();
+        client.join();
+
+        if (!closeFailures.isEmpty()) {
+            fail(closeFailures.toString());
+        }
+
+        Thread.sleep(2000);
+        Thread[] remainThreads = new Thread[tg.activeCount()];
+        // Use the returned count: threads may end between activeCount() and
+        // enumerate(), leaving null slots at the end of the array.
+        int remaining = tg.enumerate(remainThreads);
+        for (int i = 0; i < remaining; i++) {
+            Thread t = remainThreads[i];
+            if (t.isAlive() && !t.isDaemon()) {
+                fail("Remaining thread: " + t.toString());
+            }
+        }
+
+        // Start from the current group itself so this also works when the
+        // current group has no parent.
+        ThreadGroup root = Thread.currentThread().getThreadGroup();
+        while (root.getParent() != null) {
+            root = root.getParent();
+        }
+        visit(root, 0);
+    }
+
+    /**
+     * Recursively logs, at debug level, the name of every thread in
+     * {@code group} and in all of its subgroups.
+     *
+     * @param group the thread group to walk
+     * @param level nesting depth of {@code group}; only passed on to the recursion
+     */
+    public static void visit(ThreadGroup group, int level) {
+        // Get threads in `group'; the array is oversized because
+        // activeCount() is only an estimate.
+        Thread[] threads = new Thread[group.activeCount() * 2];
+        int numThreads = group.enumerate(threads, false);
+
+        // Enumerate each thread in `group'
+        for (int i = 0; i < numThreads; i++) {
+            LOG.debug("Thread:{} is still running", threads[i].getName());
+        }
+
+        // Get thread subgroups of `group'
+        ThreadGroup[] groups = new ThreadGroup[group.activeGroupCount() * 2];
+        int numGroups = group.enumerate(groups, false);
+
+        // Recursively visit each subgroup
+        for (int i = 0; i < numGroups; i++) {
+            visit(groups[i], level + 1);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java
new file mode 100644
index 0000000..fe8e3fd
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3537Test.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Quick port to java to support AMQ build.
+ *
+ * This test demonstrates the classloader problem in the
+ * ClassLoadingAwareObjectInputStream impl. If the first interface in the proxy
+ * interfaces list is JDK and there are any subsequent interfaces that are NOT
+ * JDK interfaces the ClassLoadingAwareObjectInputStream will ignore their
+ * respective classloaders and cause the Proxy to throw an
+ * IllegalArgumentException because the core JDK classloader can't load the
+ * interfaces that are not JDK interfaces.
+ *
+ * See AMQ-3537
+ *
+ * @author jason.yankus
+ *
+ */
+@SuppressWarnings({ "rawtypes", "unchecked" })
+public class AMQ3537Test implements InvocationHandler, Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * If the first and second element in this array are swapped, the test will
+     * fail.
+     */
+    public static final Class[] TEST_CLASSES = new Class[] { List.class, NonJDKList.class, Serializable.class };
+
+    /** Underlying list */
+    private final List l = new ArrayList<String>();
+
+    @Before
+    public void setUp() throws Exception {
+        l.add("foo");
+    }
+
+    /**
+     * Serializes a proxy implementing {@link #TEST_CLASSES}, reads it back
+     * through {@link ClassLoadingAwareObjectInputStream} and checks that a
+     * call on the deserialized proxy still reaches the backing list.
+     */
+    @Test
+    public void testDeserializeProxy() throws Exception {
+        // create the proxy
+        List proxy = (List) java.lang.reflect.Proxy.newProxyInstance(this.getClass().getClassLoader(), TEST_CLASSES, this);
+
+        // serialize it
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        ObjectOutputStream oos = new ObjectOutputStream(baos);
+        try {
+            oos.writeObject(proxy);
+        } finally {
+            oos.close();
+        }
+        // Take the bytes only after the object stream is closed, so anything it
+        // still buffered has reached the byte array.
+        byte[] serializedProxy = baos.toByteArray();
+
+        // deserialize the proxy
+        ClassLoadingAwareObjectInputStream claois =
+            new ClassLoadingAwareObjectInputStream(new ByteArrayInputStream(serializedProxy));
+        List deserializedProxy;
+        try {
+            // this is where it fails due to the rudimentary classloader selection
+            // in ClassLoadingAwareObjectInputStream
+            deserializedProxy = (List) claois.readObject();
+        } finally {
+            claois.close();
+        }
+
+        // assert the invocation worked
+        assertEquals("foo", deserializedProxy.get(0));
+    }
+
+    /**
+     * Forwards every proxied call to the backing list.
+     *
+     * @return whatever the list method returned
+     * @throws Throwable the exception thrown by the list method itself, not
+     *         the reflective {@link InvocationTargetException} wrapper, so
+     *         callers of the proxy see the exception the interface declares
+     */
+    @Override
+    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
+        try {
+            return method.invoke(l, args);
+        } catch (InvocationTargetException e) {
+            throw e.getCause();
+        }
+    }
+
+    /** A non-JDK interface, so the proxy mixes JDK and application interfaces. */
+    public interface NonJDKList {
+        int size();
+    }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java
new file mode 100644
index 0000000..b4ce82f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3567Test.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.fail; + +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.Level; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * @author Claudio Corsi + * + */ +public class AMQ3567Test { + + private static Logger logger = LoggerFactory.getLogger(AMQ3567Test.class); + + private ActiveMQConnectionFactory factory; + private Connection connection; + private Session sessionWithListener, session; + private Queue destination; + private MessageConsumer consumer; + private Thread thread; + private BrokerService broker; + private String connectionUri; + + /** + * @throws java.lang.Exception + */ + 
@Before + public void setUp() throws Exception { + startBroker(); + initializeConsumer(); + startConsumer(); + } + + @Test + public void runTest() throws Exception { + produceSingleMessage(); + org.apache.log4j.Logger log4jLogger = org.apache.log4j.Logger.getLogger("org.apache.activemq.util.ServiceSupport"); + final AtomicBoolean failed = new AtomicBoolean(false); + + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getThrowableInformation() != null) { + if (event.getThrowableInformation().getThrowable() instanceof InterruptedException) { + InterruptedException ie = (InterruptedException)event.getThrowableInformation().getThrowable(); + if (ie.getMessage().startsWith("Could not stop service:")) { + logger.info("Received an interrupted exception : ", ie); + failed.set(true); + } + } + } + } + }; + log4jLogger.addAppender(appender); + + Level level = log4jLogger.getLevel(); + log4jLogger.setLevel(Level.DEBUG); + + try { + stopConsumer(); + stopBroker(); + if (failed.get()) { + fail("An Interrupt exception was generated"); + } + + } finally { + log4jLogger.setLevel(level); + log4jLogger.removeAppender(appender); + } + } + + private void startBroker() throws Exception { + broker = new BrokerService(); + broker.setDataDirectory("target/data"); + connectionUri = broker.addConnector("tcp://localhost:0?wireFormat.maxInactivityDuration=30000&transport.closeAsync=false&transport.threadName&soTimeout=60000&transport.keepAlive=false&transport.useInactivityMonitor=false").getPublishableConnectString(); + broker.start(true); + broker.waitUntilStarted(); + } + + private void stopBroker() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + } + + private void initializeConsumer() throws JMSException { + logger.info("Initializing the consumer messagor that will just not do anything...."); + factory = new ActiveMQConnectionFactory(); + 
factory.setBrokerURL("failover:("+connectionUri+"?wireFormat.maxInactivityDuration=30000&keepAlive=true&soTimeout=60000)?jms.watchTopicAdvisories=false&jms.useAsyncSend=false&jms.dispatchAsync=true&jms.producerWindowSize=10485760&jms.copyMessageOnSend=false&jms.disableTimeStampsByDefault=true&InitialReconnectDelay=1000&maxReconnectDelay=10000&maxReconnectAttempts=400&useExponentialBackOff=true"); + connection = factory.createConnection(); + connection.start(); + sessionWithListener = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = sessionWithListener.createQueue("EMPTY.QUEUE"); + } + + private void startConsumer() throws Exception { + logger.info("Starting the consumer"); + consumer = sessionWithListener.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + logger.info("Received a message: " + message); + } + + }); + + thread = new Thread(new Runnable() { + + private Session session; + + @Override + public void run() { + try { + destination = session.createQueue("EMPTY.QUEUE"); + MessageConsumer consumer = session.createConsumer(destination); + for (int cnt = 0; cnt < 2; cnt++) { + Message message = consumer.receive(50000); + logger.info("Received message: " + message); + } + } catch (JMSException e) { + logger.debug("Received an exception while processing messages", e); + } finally { + try { + session.close(); + } catch (JMSException e) { + logger.debug("Received an exception while closing session", e); + } + } + } + + public Runnable setSession(Session session) { + this.session = session; + return this; + } + + }.setSession(session)) { + { + start(); + } + }; + } + + private void stopConsumer() throws JMSException { + logger.info("Stopping the consumer"); + try { + thread.join(); + } catch (InterruptedException e) { + logger.debug("Received an exception while waiting for thread to 
complete", e); + } + if (sessionWithListener != null) { + sessionWithListener.close(); + } + if (connection != null) { + connection.stop(); + } + } + + private void produceSingleMessage() throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(); + factory.setBrokerURL(connectionUri); + Connection connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue("EMPTY.QUEUE"); + MessageProducer producer = session.createProducer(destination); + producer.send(session.createTextMessage("Single Message")); + producer.close(); + session.close(); + connection.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java new file mode 100644 index 0000000..e08279c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3622Test.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.fail; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.FilePendingSubscriberMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.LastImageSubscriptionRecoveryPolicy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.transport.stomp.Stomp; +import org.apache.activemq.transport.stomp.StompConnection; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ3622Test { + + protected BrokerService broker; + protected AtomicBoolean failed = new AtomicBoolean(false); + protected String connectionUri; + protected Appender appender = new DefaultTestAppender() { + + @Override + public void doAppend(LoggingEvent event) { + System.err.println(event.getMessage()); + if (event.getThrowableInformation() != null) { + if (event.getThrowableInformation().getThrowable() instanceof NullPointerException) { + failed.set(true); + } + } + } + }; + + @Before + public void before() throws Exception { + Logger.getRootLogger().addAppender(appender); + + broker = new BrokerService(); + broker.setDataDirectory("target" + File.separator + "activemq-data"); + broker.setPersistent(true); + broker.setDeleteAllMessagesOnStartup(true); + PolicyEntry policy = new PolicyEntry(); + policy.setTopic(">"); + policy.setProducerFlowControl(false); + policy.setMemoryLimit(1 * 1024 * 1024); + policy.setPendingSubscriberPolicy(new 
FilePendingSubscriberMessageStoragePolicy()); + policy.setSubscriptionRecoveryPolicy(new LastImageSubscriptionRecoveryPolicy()); + policy.setExpireMessagesPeriod(500); + List<PolicyEntry> entries = new ArrayList<PolicyEntry>(); + + entries.add(policy); + PolicyMap pMap = new PolicyMap(); + pMap.setPolicyEntries(entries); + broker.setDestinationPolicy(pMap); + + connectionUri = broker.addConnector("stomp://localhost:0").getPublishableConnectString(); + + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void after() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + Logger.getRootLogger().removeAppender(appender); + } + + @Test + public void go() throws Exception { + StompConnection connection = new StompConnection(); + Integer port = Integer.parseInt(connectionUri.split(":")[2]); + connection.open("localhost", port); + connection.connect("", ""); + connection.subscribe("/topic/foobar", Stomp.Headers.Subscribe.AckModeValues.CLIENT); + connection.disconnect(); + Thread.sleep(1000); + + if (failed.get()) { + fail("Received NullPointerException"); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java new file mode 100644 index 0000000..a386202 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3625Test.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.Logger; +import org.apache.log4j.spi.LoggingEvent; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.util.concurrent.atomic.AtomicBoolean; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * + */ + +public class AMQ3625Test { + + protected BrokerService broker1; + protected BrokerService broker2; + + protected AtomicBoolean authenticationFailed = new AtomicBoolean(false); + protected AtomicBoolean gotNPE = new AtomicBoolean(false); + + protected String java_security_auth_login_config = "java.security.auth.login.config"; + protected String xbean = "xbean:"; + protected String base = "src/test/resources/org/apache/activemq/bugs/amq3625"; + protected String conf = "conf"; + protected String keys = "keys"; + protected String JaasStompSSLBroker1_xml = "JaasStompSSLBroker1.xml"; + protected String JaasStompSSLBroker2_xml = "JaasStompSSLBroker2.xml"; + + protected String oldLoginConf = null; + + @Before + public void before() throws Exception { + if (System.getProperty(java_security_auth_login_config) != null) { + oldLoginConf = 
System.getProperty(java_security_auth_login_config); + } + System.setProperty(java_security_auth_login_config, base + "/" + conf + "/" + "login.config"); + broker1 = BrokerFactory.createBroker(xbean + base + "/" + conf + "/" + JaasStompSSLBroker1_xml); + broker2 = BrokerFactory.createBroker(xbean + base + "/" + conf + "/" + JaasStompSSLBroker2_xml); + + broker1.start(); + broker1.waitUntilStarted(); + broker2.start(); + broker2.waitUntilStarted(); + } + + @After + public void after() throws Exception { + broker1.stop(); + broker2.stop(); + + if (oldLoginConf != null) { + System.setProperty(java_security_auth_login_config, oldLoginConf); + } + } + + @Test + public void go() throws Exception { + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getThrowableInformation() != null) { + Throwable t = event.getThrowableInformation().getThrowable(); + if (t instanceof SecurityException) { + authenticationFailed.set(true); + } + if (t instanceof NullPointerException) { + gotNPE.set(true); + } + } + } + }; + Logger.getRootLogger().addAppender(appender); + + String connectURI = broker1.getConnectorByName("openwire").getConnectUri().toString(); + connectURI = connectURI.replace("?needClientAuth=true", ""); + broker2.addNetworkConnector("static:(" + connectURI + ")").start(); + + Thread.sleep(10 * 1000); + + Logger.getRootLogger().removeAppender(appender); + + assertTrue(authenticationFailed.get()); + assertFalse(gotNPE.get()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java new file mode 100644 index 0000000..47ab754 --- /dev/null +++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3674Test.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import static org.junit.Assert.*; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TopicSubscriber; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.BrokerView; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3674Test { + + private static Logger LOG = LoggerFactory.getLogger(AMQ3674Test.class); + + private final static int deliveryMode = DeliveryMode.NON_PERSISTENT; + private final static ActiveMQTopic destination = new ActiveMQTopic("XYZ"); + + private ActiveMQConnectionFactory factory; + private BrokerService broker; + + @Test + public void removeSubscription() throws Exception { + + final 
Connection producerConnection = factory.createConnection(); + producerConnection.start(); + final Connection consumerConnection = factory.createConnection(); + + consumerConnection.setClientID("subscriber1"); + Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + TopicSubscriber activeConsumer = (TopicSubscriber) consumerMQSession.createDurableSubscriber(destination, "myTopic"); + consumerConnection.start(); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + + final BrokerView brokerView = broker.getAdminView(); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + + LOG.info("Current Durable Topic Subscriptions: " + brokerView.getDurableTopicSubscribers().length); + + try { + brokerView.destroyDurableSubscriber("subscriber1", "myTopic"); + fail("Expected Exception for Durable consumer is in use"); + } catch(Exception e) { + LOG.info("Recieved expected exception: " + e.getMessage()); + } + + LOG.info("Current Durable Topic Subscriptions: " + brokerView.getDurableTopicSubscribers().length); + + assertEquals(1, brokerView.getDurableTopicSubscribers().length); + + activeConsumer.close(); + consumerConnection.stop(); + + assertTrue("The subscription should be in the inactive state.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerView.getInactiveDurableTopicSubscribers().length == 1; + } + })); + + try { + brokerView.destroyDurableSubscriber("subscriber1", "myTopic"); + } finally { + producer.close(); + producerConnection.close(); + } + } + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(true); + broker.setDeleteAllMessagesOnStartup(true); + TransportConnector connector = 
broker.addConnector("tcp://localhost:0"); + broker.start(); + + factory = new ActiveMQConnectionFactory(connector.getPublishableConnectString()); + factory.setAlwaysSyncSend(true); + factory.setDispatchAsync(false); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java new file mode 100644 index 0000000..c8e4bf4 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3675Test.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.activemq.bugs; + +import static org.junit.Assert.*; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TopicSubscriber; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.BrokerView; +import org.apache.activemq.broker.jmx.TopicViewMBean; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3675Test { + + private static Logger LOG = LoggerFactory.getLogger(AMQ3675Test.class); + + private final static int deliveryMode = DeliveryMode.NON_PERSISTENT; + private final static ActiveMQTopic destination = new ActiveMQTopic("XYZ"); + + private ActiveMQConnectionFactory factory; + private BrokerService broker; + + public TopicViewMBean getTopicView() throws Exception { + ObjectName destinationName = broker.getAdminView().getTopics()[0]; + TopicViewMBean topicView = (TopicViewMBean) broker.getManagementContext().newProxyInstance(destinationName, TopicViewMBean.class, true); + return topicView; + } + + @Test + public void countConsumers() throws Exception { + + final Connection producerConnection = factory.createConnection(); + producerConnection.start(); + final Connection consumerConnection = factory.createConnection(); + + consumerConnection.setClientID("subscriber1"); + Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + TopicSubscriber consumer = consumerMQSession.createDurableSubscriber(destination, "myTopic"); + consumerConnection.start(); + + Session session = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + 
MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + + final BrokerView brokerView = broker.getAdminView(); + final TopicViewMBean topicView = getTopicView(); + + assertTrue("Should have one consumer on topic: ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return topicView.getConsumerCount() == 1; + } + })); + + consumer.close(); + + assertTrue("Durable consumer should now be inactive.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerView.getInactiveDurableTopicSubscribers().length == 1; + } + })); + + try { + brokerView.removeTopic(destination.getTopicName()); + } catch (Exception e1) { + fail("Unable to remove destination:" + destination.getPhysicalName()); + } + + assertTrue("Should have no topics on the broker", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerView.getTopics().length == 0; + } + })); + + try { + brokerView.destroyDurableSubscriber("subscriber1", "myTopic"); + } catch(Exception e) { + fail("Exception not expected when attempting to delete Durable consumer."); + } + + assertTrue("Should be no durable consumers active or inactive.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerView.getInactiveDurableTopicSubscribers().length == 0 && + brokerView.getDurableTopicSubscribers().length == 0; + } + })); + + consumer = consumerMQSession.createDurableSubscriber(destination, "myTopic"); + + consumer.close(); + + assertTrue("Should be one consumer on the Topic.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + LOG.info("Number of inactive consumers: " + brokerView.getInactiveDurableTopicSubscribers().length); + return brokerView.getInactiveDurableTopicSubscribers().length == 1; + } + })); + + final 
TopicViewMBean recreatedTopicView = getTopicView(); + + assertTrue("Should have one consumer on topic: ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return recreatedTopicView.getConsumerCount() == 1; + } + })); + } + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(true); + broker.setAdvisorySupport(false); + broker.setDeleteAllMessagesOnStartup(true); + TransportConnector connector = broker.addConnector("tcp://localhost:0"); + broker.start(); + + factory = new ActiveMQConnectionFactory(connector.getPublishableConnectString()); + factory.setAlwaysSyncSend(true); + factory.setDispatchAsync(false); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java new file mode 100644 index 0000000..3c79fcf --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3678Test.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.ServerSocket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQTopicSubscriber; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.ManagementContext; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import static org.junit.Assert.fail; + + +public class AMQ3678Test implements MessageListener { + + public int deliveryMode = DeliveryMode.NON_PERSISTENT; + + + private BrokerService broker; + + AtomicInteger messagesSent = new AtomicInteger(0); + AtomicInteger messagesReceived = new AtomicInteger(0); + + ActiveMQTopic destination = new ActiveMQTopic("XYZ"); + + int port; + int jmxport; + + + final CountDownLatch latch = new CountDownLatch(2); + + + public static void main(String[] args) throws Exception { + + } + + + public static int findFreePort() throws IOException { + ServerSocket socket = null; + + try { + // 0 is open a socket on any free port + socket = new ServerSocket(0); + return socket.getLocalPort(); + } 
finally { + if (socket != null) { + socket.close(); + } + } + } + + + @Test + public void countConsumers() throws JMSException { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:" + port); + factory.setAlwaysSyncSend(true); + factory.setDispatchAsync(false); + + final Connection producerConnection = factory.createConnection(); + producerConnection.start(); + + final Connection consumerConnection = factory.createConnection(); + + consumerConnection.setClientID("subscriber1"); + Session consumerMQSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + ActiveMQTopicSubscriber activeConsumer = (ActiveMQTopicSubscriber) consumerMQSession.createDurableSubscriber(destination, "myTopic?consumer.prefetchSize=1"); + + activeConsumer.setMessageListener(this); + + consumerConnection.start(); + + + final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = producerSession.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + + Thread t = new Thread(new Runnable() { + + private boolean done = false; + + public void run() { + while (!done) { + if (messagesSent.get() == 50) { + try { + broker.getAdminView().removeTopic(destination.getTopicName()); + } catch (Exception e1) { + // TODO Auto-generated catch block + e1.printStackTrace(); + System.err.flush(); + fail("Unable to remove destination:" + + destination.getPhysicalName()); + } + } + + try { + producer.send(producerSession.createTextMessage()); + int val = messagesSent.incrementAndGet(); + + System.out.println("sent message (" + val + ")"); + System.out.flush(); + + if (val == 100) { + done = true; + latch.countDown(); + producer.close(); + producerSession.close(); + + } + } catch (JMSException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } + } + }); + + t.start(); + + try { + if (!latch.await(10, TimeUnit.SECONDS)) { + fail("did not receive all 
the messages"); + } + } catch (InterruptedException e) { + // TODO Auto-generated catch block + fail("did not receive all the messages, exception waiting for latch"); + e.printStackTrace(); + } + + +// + + + } + + @Before + public void setUp() throws Exception { + + try { + port = findFreePort(); + jmxport = findFreePort(); + } catch (Exception e) { + fail("Unable to obtain a free port on which to start the broker"); + } + + System.out.println("Starting broker"); + System.out.flush(); + broker = new BrokerService(); + broker.setPersistent(false); + ManagementContext ctx = new ManagementContext(ManagementFactory.getPlatformMBeanServer()); + ctx.setConnectorPort(jmxport); + broker.setManagementContext(ctx); + broker.setUseJmx(true); +// broker.setAdvisorySupport(false); +// broker.setDeleteAllMessagesOnStartup(true); + + broker.addConnector("tcp://localhost:" + port).setName("Default"); + broker.start(); + + + System.out.println("End of Broker Setup"); + System.out.flush(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + + @Override + public void onMessage(Message message) { + try { + message.acknowledge(); + int val = messagesReceived.incrementAndGet(); + System.out.println("received message (" + val + ")"); + System.out.flush(); + if (messagesReceived.get() == 100) { + latch.countDown(); + } + } catch (JMSException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java new file mode 100644 index 0000000..601901b --- /dev/null +++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3732Test.java @@ -0,0 +1,174 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.bugs;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;

import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs two consumers, one producer and one acknowledging thread against a single session
 * using individual acknowledgement and a prefetch of zero, and checks that every produced
 * message ends up acknowledged.
 */
public class AMQ3732Test {

    private static final Logger LOG = LoggerFactory.getLogger(AMQ3732Test.class);

    /** Number of messages produced, and the number that must be acknowledged. */
    private static final long NUM_MESSAGES = 25000;

    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private BrokerService broker;
    private String connectionUri;

    private final Random pause = new Random();
    private final AtomicLong totalConsumed = new AtomicLong();

    /** Set by any worker thread that hits an exception so the other workers stop as well. */
    private volatile boolean failed;

    /**
     * Starts a non-persistent broker on an ephemeral TCP port and prepares a connection
     * factory for it with every prefetch limit set to zero.
     *
     * @throws Exception if the broker cannot be started
     */
    @Before
    public void startBroker() throws Exception {
        broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setPersistent(false);
        broker.setUseJmx(false);
        broker.addConnector("tcp://0.0.0.0:0");
        broker.start();
        broker.waitUntilStarted();

        connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString();

        connectionFactory = new ActiveMQConnectionFactory(connectionUri);
        connectionFactory.getPrefetchPolicy().setAll(0);
    }

    /**
     * Closes the test connection, if one was opened, and always stops the broker.
     *
     * @throws Exception if closing the connection or stopping the broker fails
     */
    @After
    public void stopBroker() throws Exception {
        try {
            // The connection is only created inside the test method, so it may still be null.
            if (connection != null) {
                connection.close();
            }
        } finally {
            if (broker != null) {
                broker.stop();
                broker.waitUntilStopped();
            }
        }
    }

    /**
     * Produces {@link #NUM_MESSAGES} messages while two consumers hand received messages to
     * a separate acknowledging thread, then asserts that all of them were acknowledged.
     *
     * @throws Exception if the JMS objects cannot be created or a join is interrupted
     */
    @Test(timeout = 1200000)
    public void testInterruptionAffects() throws Exception {

        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);

        Queue queue = session.createQueue("AMQ3732Test");

        final LinkedBlockingQueue<Message> workQueue = new LinkedBlockingQueue<Message>();

        final MessageConsumer consumer1 = session.createConsumer(queue);
        final MessageConsumer consumer2 = session.createConsumer(queue);
        final MessageProducer producer = session.createProducer(queue);

        Thread consumer1Thread = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    while (keepRunning()) {
                        Message message = consumer1.receiveNoWait();
                        if (message != null) {
                            workQueue.add(message);
                        }
                    }
                } catch (Exception e) {
                    workerFailed(e);
                }
            }
        });
        consumer1Thread.start();

        Thread consumer2Thread = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    while (keepRunning()) {
                        Message message = consumer2.receive(50);
                        if (message != null) {
                            workQueue.add(message);
                        }
                    }
                } catch (Exception e) {
                    workerFailed(e);
                }
            }
        });
        consumer2Thread.start();

        Thread producerThread = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    for (long i = 0; i < NUM_MESSAGES && !failed; ++i) {
                        producer.send(session.createTextMessage("TEST"));
                        TimeUnit.MILLISECONDS.sleep(pause.nextInt(10));
                    }
                } catch (Exception e) {
                    workerFailed(e);
                }
            }
        });
        producerThread.start();

        Thread ackingThread = new Thread(new Runnable() {

            @Override
            public void run() {
                try {
                    while (keepRunning()) {
                        // Poll with a timeout instead of take() so this thread can notice a
                        // failure elsewhere rather than blocking on an empty queue forever.
                        Message message = workQueue.poll(100, TimeUnit.MILLISECONDS);
                        if (message == null) {
                            continue;
                        }
                        message.acknowledge();
                        long consumed = totalConsumed.incrementAndGet();
                        if ((consumed % 100) == 0) {
                            LOG.info("Consumed " + consumed + " messages so far.");
                        }
                    }
                } catch (Exception e) {
                    workerFailed(e);
                }
            }
        });
        ackingThread.start();

        producerThread.join();
        consumer1Thread.join();
        consumer2Thread.join();
        ackingThread.join();

        assertFalse("A worker thread failed; see the log for the cause", failed);
        assertEquals(NUM_MESSAGES, totalConsumed.get());
    }

    /** Returns true while no worker has failed and messages remain to be acknowledged. */
    private boolean keepRunning() {
        return !failed && totalConsumed.get() < NUM_MESSAGES;
    }

    /** Logs a worker-thread error and flags the failure so every worker loop ends. */
    private void workerFailed(Exception e) {
        failed = true;
        LOG.error("Caught an unexpected error: ", e);
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java new file mode 100644 index 0000000..5a410e8 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3779Test.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.activemq.bugs;

import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.util.LoggingBrokerPlugin;
import org.apache.activemq.util.DefaultTestAppender;
import org.apache.log4j.Appender;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;

/**
 * Checks that a broker running {@link LoggingBrokerPlugin} with per-destination logging
 * enabled emits a log event whose logger name contains the destination name.
 */
public class AMQ3779Test extends AutoFailTestSupport {

    private static final String qName = "QNameToFind";

    /**
     * Sends one persistent message to the queue {@code qName} over a VM connection and asserts
     * that an appender on the root logger saw an event whose logger name contains that name.
     *
     * @throws Exception if the broker or the JMS objects cannot be created
     */
    public void testLogPerDest() throws Exception {

        final AtomicBoolean ok = new AtomicBoolean(false);
        Appender appender = new DefaultTestAppender() {
            @Override
            public void doAppend(LoggingEvent event) {
                if (event.getLoggerName().contains(qName)) {
                    ok.set(true);
                }
            }
        };
        Logger.getRootLogger().addAppender(appender);

        try {
            BrokerService broker = new BrokerService();
            LoggingBrokerPlugin loggingBrokerPlugin = new LoggingBrokerPlugin();
            loggingBrokerPlugin.setPerDestinationLogger(true);
            loggingBrokerPlugin.setLogAll(true);
            broker.setPlugins(new LoggingBrokerPlugin[]{loggingBrokerPlugin});
            broker.start();

            try {
                Connection connection =
                    new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection();
                try {
                    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    MessageProducer messageProducer = session.createProducer(session.createQueue(qName));
                    messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    connection.start();

                    messageProducer.send(session.createTextMessage("Hi"));
                } finally {
                    connection.close();
                }

                // Checked before the broker is stopped so that only events logged while the
                // message was being sent can satisfy the assertion.
                assertTrue("got expected log message", ok.get());
            } finally {
                // Stop the broker so it does not outlive this test.
                broker.stop();
                broker.waitUntilStopped();
            }
        } finally {
            // Detach from the root logger, the logger the appender was attached to.
            Logger.getRootLogger().removeAppender(appender);
        }
    }
}
