http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java new file mode 100644 index 0000000..fd71558 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3140Test.java @@ -0,0 +1,145 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.io.File; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.IOHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ3140Test { + + private static final int MESSAGES_PER_THREAD = 100; + + private static final int THREAD_COUNT = 10; + + private BrokerService broker; + + private static final String QUEUE_NAME = "test"; + + private static class Sender extends Thread { + + private static final int DELAY = 3000; + + @Override + public void run() { + try { + ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + Message message = session.createTextMessage("test"); + for (int i = 0; i < MESSAGES_PER_THREAD; i++) { + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY); + producer.send(message); + } + session.close(); + connection.close(); + } catch (JMSException e) { + fail(e.getMessage()); + } + } + } + + @Before + public void setup() throws Exception { + File schedulerDirectory = new File("target/test/ScheduledDB"); + + IOHelper.mkdirs(schedulerDirectory); + IOHelper.deleteChildren(schedulerDirectory); + + broker = new BrokerService(); + broker.setSchedulerSupport(true); + broker.setPersistent(true); + broker.setDeleteAllMessagesOnStartup(true); + broker.setDataDirectory("target"); + broker.setSchedulerDirectoryFile(schedulerDirectory); + broker.setUseJmx(false); + broker.addConnector("vm://localhost"); + + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + @Test + public void noMessageLostOnConcurrentScheduling() throws JMSException, InterruptedException { + + final AtomicLong receiveCounter = new AtomicLong(); + + ConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + consumer.setMessageListener(new MessageListener() { + + @Override + public void onMessage(Message message) { + receiveCounter.incrementAndGet(); + } + }); + + List<Sender> senderThreads = new ArrayList<Sender>(); + for (int i = 0; i < THREAD_COUNT; i++) { + Sender sender = new Sender(); + senderThreads.add(sender); + } + for (Sender sender : senderThreads) { + sender.start(); + } + for (Sender sender : senderThreads) { + sender.join(); + } + + // wait until all scheduled messages has been received + TimeUnit.MINUTES.sleep(2); + + session.close(); + connection.close(); + + assertEquals(MESSAGES_PER_THREAD * THREAD_COUNT, receiveCounter.get()); + } + +} \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java new file mode 100644 index 0000000..1209bd7 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3141Test.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ScheduledMessage; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.IOHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ3141Test { + + private static final int MAX_MESSAGES = 100; + + private static final long DELAY_IN_MS = 100; + + private static final String QUEUE_NAME = "target.queue"; + + private BrokerService broker; + + private final CountDownLatch messageCountDown = new CountDownLatch(MAX_MESSAGES); + + private ConnectionFactory factory; + + @Before + public void setup() throws Exception { + + broker = new BrokerService(); + broker.setPersistent(true); + broker.setSchedulerSupport(true); + broker.setDataDirectory("target"); + broker.setUseJmx(false); + broker.addConnector("vm://localhost"); + + File schedulerDirectory = new File("target/test/ScheduledDB"); + IOHelper.mkdirs(schedulerDirectory); + IOHelper.deleteChildren(schedulerDirectory); + broker.setSchedulerDirectoryFile(schedulerDirectory); + + broker.start(); + broker.waitUntilStarted(); + + factory = new ActiveMQConnectionFactory("vm://localhost"); + } + + private void sendMessages() throws Exception { + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + for (int i = 0; i < MAX_MESSAGES; i++) { + Message message = session.createTextMessage(); + message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, DELAY_IN_MS); + producer.send(message); + } + connection.close(); + } + + @Test + public void testNoMissingMessagesOnShortScheduleDelay() throws Exception { + + Connection connection = factory.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(session.createQueue(QUEUE_NAME)); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + messageCountDown.countDown(); + } + }); + sendMessages(); + + boolean receiveComplete = messageCountDown.await(5, TimeUnit.SECONDS); + + connection.close(); + + assertTrue("expect all messages received but " + messageCountDown.getCount() + " are missing", receiveComplete); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java new file mode 100644 index 0000000..81128bd --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3145Test.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3145Test { + private static final Logger LOG = LoggerFactory.getLogger(AMQ3145Test.class); + private final String MESSAGE_TEXT = new String(new byte[1024]); + BrokerService broker; + ConnectionFactory factory; + Connection connection; + Session session; + Queue queue; + MessageConsumer consumer; + + @Before + public void createBroker() throws Exception { + createBroker(true); + } + + public void createBroker(boolean deleteAll) throws Exception { + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(deleteAll); + broker.setDataDirectory("target/AMQ3145Test"); + broker.setUseJmx(true); + broker.getManagementContext().setCreateConnector(false); + broker.addConnector("tcp://localhost:0"); + broker.start(); + broker.waitUntilStarted(); + factory = new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getConnectUri().toString()); + connection = factory.createConnection(); + connection.start(); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + } + + @After + public void tearDown() throws Exception { + if (consumer != null) { + consumer.close(); + } + session.close(); + connection.stop(); + connection.close(); + broker.stop(); + } + + @Test + public void testCacheDisableReEnable() throws Exception { + createProducerAndSendMessages(1); + QueueViewMBean proxy = getProxyToQueueViewMBean(); + assertTrue("cache is enabled", proxy.isCacheEnabled()); + tearDown(); + createBroker(false); + proxy = getProxyToQueueViewMBean(); + assertEquals("one pending message", 1, proxy.getQueueSize()); + assertTrue("cache is disabled when there is a pending message", !proxy.isCacheEnabled()); + + createConsumer(1); + createProducerAndSendMessages(1); + assertTrue("cache is enabled again on next send when there are no messages", proxy.isCacheEnabled()); + } + + private QueueViewMBean getProxyToQueueViewMBean() + throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq" + + ":destinationType=Queue,destinationName=" + queue.getQueueName() + + ",type=Broker,brokerName=localhost"); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, + QueueViewMBean.class, true); + return proxy; + } + + private void createProducerAndSendMessages(int numToSend) throws Exception { + queue = session.createQueue("test1"); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < numToSend; i++) { + TextMessage message = session.createTextMessage(MESSAGE_TEXT + i); + if (i != 0 && i % 50000 == 0) { + LOG.info("sent: " + i); + } + producer.send(message); + } + producer.close(); + } + + private void createConsumer(int numToConsume) throws Exception { + consumer = session.createConsumer(queue); + // wait for buffer fill out + for (int i = 0; i < numToConsume; ++i) { + Message message = consumer.receive(2000); + message.acknowledge(); + } + consumer.close(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java new file mode 100644 index 0000000..f18af6f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3157Test.java @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.virtual.MirroredQueue; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.spring.ConsumerBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3157Test extends EmbeddedBrokerTestSupport { + + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3157Test.class); + private Connection connection; + + public void testInactiveMirroredQueueIsCleanedUp() throws Exception { + + if (connection == null) { + connection = createConnection(); + } + connection.start(); + + ConsumerBean messageList = new ConsumerBean(); + messageList.setVerbose(true); + + ActiveMQDestination consumeDestination = createConsumeDestination(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + LOG.info("Consuming from: " + consumeDestination); + + MessageConsumer c1 = session.createConsumer(consumeDestination); + c1.setMessageListener(messageList); + + // create topic producer + ActiveMQQueue sendDestination = new ActiveMQQueue(getQueueName()); + LOG.info("Sending to: " + sendDestination); + + MessageProducer producer = session.createProducer(sendDestination); + assertNotNull(producer); + + final int total = 10; + for (int i = 0; i < total; i++) { + producer.send(session.createTextMessage("message: " + i)); + } + + messageList.assertMessagesArrived(total); + LOG.info("Received: " + messageList); + messageList.flushMessages(); + + MessageConsumer c2 = session.createConsumer(sendDestination); + c2.setMessageListener(messageList); + messageList.assertMessagesArrived(total); + LOG.info("Q Received: " + messageList); + + connection.close(); + + List<ObjectName> topics = Arrays.asList(broker.getAdminView().getTopics()); + assertTrue(topics.contains(createObjectName(consumeDestination))); + List<ObjectName> queues = Arrays.asList(broker.getAdminView().getQueues()); + assertTrue(queues.contains(createObjectName(sendDestination))); + + Thread.sleep(TimeUnit.SECONDS.toMillis(10)); + + topics = Arrays.asList(broker.getAdminView().getTopics()); + if (topics != null) { + assertFalse("Virtual Topic Desination did not get cleaned up.", + topics.contains(createObjectName(consumeDestination))); + } + queues = Arrays.asList(broker.getAdminView().getQueues()); + if (queues != null) { + assertFalse("Mirrored Queue Desination did not get cleaned up.", + queues.contains(createObjectName(sendDestination))); + } + } + + protected ActiveMQDestination createConsumeDestination() { + return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName()); + } + + protected String getQueueName() { + return "My.Queue"; + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setUseMirroredQueues(true); + answer.setPersistent(isPersistent()); + answer.setSchedulePeriodForDestinationPurge(1000); + + PolicyEntry entry = new PolicyEntry(); + entry.setGcInactiveDestinations(true); + entry.setInactiveTimoutBeforeGC(5000); + entry.setProducerFlowControl(true); + PolicyMap map = new PolicyMap(); + map.setDefaultEntry(entry); + + MirroredQueue mirrorQ = new MirroredQueue(); + mirrorQ.setCopyMessage(true); + DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ}; + answer.setDestinationInterceptors(destinationInterceptors); + + answer.setDestinationPolicy(map); + answer.addConnector(bindAddress); + + return answer; + } + + protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { + String domain = "org.apache.activemq"; + ObjectName name; + if (destination.isQueue()) { + name = new ObjectName(domain + ":BrokerName=localhost,Type=Queue,Destination=" + destination.getPhysicalName()); + } else { + name = new ObjectName(domain + ":BrokerName=localhost,Type=Topic,Destination=" + destination.getPhysicalName()); + } + return (DestinationViewMBean) broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, + true); + } + + protected ObjectName createObjectName(ActiveMQDestination destination) throws Exception { + String domain = "org.apache.activemq"; + ObjectName name; + if (destination.isQueue()) { + name = new ObjectName(domain + ":type=Broker,brokerName=localhost," + + "destinationType=Queue,destinationName=" + destination.getPhysicalName()); + } else { + name = new ObjectName(domain + ":type=Broker,brokerName=localhost," + + "destinationType=Topic,destinationName=" + destination.getPhysicalName()); + } + + return name; + } + + @Override + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + super.tearDown(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java new file mode 100644 index 0000000..7e3048a --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3167Test.java @@ -0,0 +1,462 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.ArrayList; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +/** + * Test the loss of messages detected during testing with ActiveMQ 5.4.1 and 5.4.2. + * <p/> + * Symptoms: - 1 record is lost "early" in the stream. - no more records lost. + * <p/> + * Test Configuration: - Broker Settings: - Destination Policy - Occurs with "Destination Policy" using Store Cursor and + * a memory limit - Not reproduced without "Destination Policy" defined - Persistence Adapter - Memory: Does not occur. + * - KahaDB: Occurs. - Messages - Occurs with TextMessage and BinaryMessage - Persistent messages. + * <p/> + * Notes: - Lower memory limits increase the rate of occurrence. - Higher memory limits may prevent the problem + * (probably because memory limits not reached). - Producers sending a number of messages before consumers come online + * increases rate of occurrence. + */ + +public class AMQ3167Test { + protected BrokerService embeddedBroker; + + protected static final int MEMORY_LIMIT = 16 * 1024; + + protected static boolean Debug_f = false; + + protected long Producer_stop_time = 0; + protected long Consumer_stop_time = 0; + protected long Consumer_startup_delay_ms = 2000; + protected boolean Stop_after_error = true; + + protected Connection JMS_conn; + protected long Num_error = 0; + + // // //// + // // UTILITIES //// + // // //// + + /** + * Create a new, unsecured, client connection to the test broker using the given username and password. This + * connection bypasses all security. + * <p/> + * Don't forget to start the connection or no messages will be received by consumers even though producers will work + * fine. + * + * @username name of the JMS user for the connection; may be null. + * @password Password for the JMS user; may be null. + */ + + protected Connection createUnsecuredConnection(String username, String password) throws javax.jms.JMSException { + ActiveMQConnectionFactory conn_fact; + + conn_fact = new ActiveMQConnectionFactory(embeddedBroker.getVmConnectorURI()); + + return conn_fact.createConnection(username, password); + } + + // // //// + // // TEST FUNCTIONALITY //// + // // //// + + @Before + public void testPrep() throws Exception { + embeddedBroker = new BrokerService(); + configureBroker(embeddedBroker); + embeddedBroker.start(); + embeddedBroker.waitUntilStarted(); + + // Prepare the connection + JMS_conn = createUnsecuredConnection(null, null); + JMS_conn.start(); + } + + @After + public void testCleanup() throws java.lang.Exception { + JMS_conn.stop(); + embeddedBroker.stop(); + } + + protected void configureBroker(BrokerService broker_svc) throws Exception { + + broker_svc.setBrokerName("testbroker1"); + + broker_svc.setUseJmx(false); + broker_svc.setPersistent(true); + broker_svc.setDataDirectory("target/AMQ3167Test"); + configureDestinationPolicy(broker_svc); + } + + /** + * NOTE: overrides any prior policy map defined for the broker service. + */ + + protected void configureDestinationPolicy(BrokerService broker_svc) { + PolicyMap pol_map; + PolicyEntry pol_ent; + ArrayList<PolicyEntry> ent_list; + + ent_list = new ArrayList<PolicyEntry>(); + + // + // QUEUES + // + + pol_ent = new PolicyEntry(); + pol_ent.setQueue(">"); + pol_ent.setMemoryLimit(MEMORY_LIMIT); + pol_ent.setProducerFlowControl(false); + ent_list.add(pol_ent); + + // + // COMPLETE POLICY MAP + // + + pol_map = new PolicyMap(); + pol_map.setPolicyEntries(ent_list); + + broker_svc.setDestinationPolicy(pol_map); + } + + // // //// + // // TEST //// + // // //// + + @Test + public void testQueueLostMessage() throws Exception { + Destination dest; + + dest = ActiveMQDestination.createDestination("lostmsgtest.queue", ActiveMQDestination.QUEUE_TYPE); + + // 10 seconds from now + Producer_stop_time = java.lang.System.nanoTime() + (10L * 1000000000L); + + // 15 seconds from now + Consumer_stop_time = Producer_stop_time + (5L * 1000000000L); + + runLostMsgTest(dest, 1000000, 1, 1, false); + + // Make sure failures in the threads are thoroughly reported in the JUnit framework. + assertTrue(Num_error == 0); + } + + /** + * + */ + + protected static void log(String msg) { + if (Debug_f) + java.lang.System.err.println(msg); + } + + /** + * Main body of the lost-message test. + */ + + protected void runLostMsgTest(Destination dest, int num_msg, int num_send_per_sess, int num_recv_per_sess, boolean topic_f) throws Exception { + Thread prod_thread; + Thread cons_thread; + String tag; + Session sess; + MessageProducer prod; + MessageConsumer cons; + int ack_mode; + + // + // Start the producer + // + + tag = "prod"; + log(">> Starting producer " + tag); + + sess = JMS_conn.createSession((num_send_per_sess > 1), Session.AUTO_ACKNOWLEDGE); + prod = sess.createProducer(dest); + + prod_thread = new producerThread(sess, prod, tag, num_msg, num_send_per_sess); + prod_thread.start(); + log("Started producer " + tag); + + // + // Delay before starting consumers + // + + log("Waiting before starting consumers"); + java.lang.Thread.sleep(Consumer_startup_delay_ms); + + // + // Now create and start the consumer + // + + tag = "cons"; + log(">> Starting consumer"); + + if (num_recv_per_sess > 1) + ack_mode = Session.CLIENT_ACKNOWLEDGE; + else + ack_mode = Session.AUTO_ACKNOWLEDGE; + + sess = JMS_conn.createSession(false, ack_mode); + cons = sess.createConsumer(dest); + + cons_thread = new consumerThread(sess, cons, tag, num_msg, num_recv_per_sess); + cons_thread.start(); + log("Started consumer " + tag); + + // + // Wait for the producer and consumer to finish. + // + + log("< waiting for producer."); + prod_thread.join(); + + log("< waiting for consumer."); + cons_thread.join(); + + log("Shutting down"); + } + + // // //// + // // INTERNAL CLASSES //// + // // //// + + /** + * Producer thread - runs a single producer until the maximum number of messages is sent, the producer stop time is + * reached, or a test error is detected. + */ + + protected class producerThread extends Thread { + protected Session msgSess; + protected MessageProducer msgProd; + protected String producerTag; + protected int numMsg; + protected int numPerSess; + protected long producer_stop_time; + + producerThread(Session sess, MessageProducer prod, String tag, int num_msg, int sess_size) { + super(); + + producer_stop_time = 0; + msgSess = sess; + msgProd = prod; + producerTag = tag; + numMsg = num_msg; + numPerSess = sess_size; + } + + public void execTest() throws Exception { + Message msg; + int sess_start; + int cur; + + sess_start = 0; + cur = 0; + while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) { + msg = msgSess.createTextMessage("test message from " + producerTag); + msg.setStringProperty("testprodtag", producerTag); + msg.setIntProperty("seq", cur); + + if (msg instanceof ActiveMQMessage) { + ((ActiveMQMessage) msg).setResponseRequired(true); + } + + // + // Send the message. + // + + msgProd.send(msg); + cur++; + + // + // Commit if the number of messages per session has been reached, and + // transactions are being used (only when > 1 msg per sess). + // + + if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) { + msgSess.commit(); + sess_start = cur; + } + } + + // Make sure to send the final commit, if there were sends since the last commit. + if ((numPerSess > 1) && ((cur - sess_start) > 0)) + msgSess.commit(); + + if (cur < numMsg) + log("* Producer " + producerTag + " timed out at " + java.lang.System.nanoTime() + " (stop time " + producer_stop_time + ")"); + } + + /** + * Check whether it is time for the producer to terminate. + */ + + protected boolean didTimeOut() { + if ((Producer_stop_time > 0) && (java.lang.System.nanoTime() >= Producer_stop_time)) + return true; + + return false; + } + + /** + * Run the producer. + */ + + @Override + public void run() { + try { + log("- running producer " + producerTag); + execTest(); + log("- finished running producer " + producerTag); + } catch (Throwable thrown) { + Num_error++; + fail("producer " + producerTag + " failed: " + thrown.getMessage()); + throw new Error("producer " + producerTag + " failed", thrown); + } + } + + @Override + public String toString() { + return producerTag; + } + } + + /** + * Producer thread - runs a single consumer until the maximum number of messages is received, the consumer stop time + * is reached, or a test error is detected. + */ + + protected class consumerThread extends Thread { + protected Session msgSess; + protected MessageConsumer msgCons; + protected String consumerTag; + protected int numMsg; + protected int numPerSess; + + consumerThread(Session sess, MessageConsumer cons, String tag, int num_msg, int sess_size) { + super(); + + msgSess = sess; + msgCons = cons; + consumerTag = tag; + numMsg = num_msg; + numPerSess = sess_size; + } + + public void execTest() throws Exception { + Message msg; + int sess_start; + int cur; + + msg = null; + sess_start = 0; + cur = 0; + + while ((cur < numMsg) && (!didTimeOut()) && ((!Stop_after_error) || (Num_error == 0))) { + // + // Use a timeout of 1 second to periodically check the consumer timeout. + // + msg = msgCons.receive(1000); + if (msg != null) { + checkMessage(msg, cur); + cur++; + + if ((numPerSess > 1) && ((cur - sess_start) >= numPerSess)) { + msg.acknowledge(); + sess_start = cur; + } + } + } + + // Acknowledge the last messages, if they were not yet acknowledged. + if ((numPerSess > 1) && ((cur - sess_start) > 0)) + msg.acknowledge(); + + if (cur < numMsg) + log("* Consumer " + consumerTag + " timed out"); + } + + /** + * Check whether it is time for the consumer to terminate. + */ + + protected boolean didTimeOut() { + if ((Consumer_stop_time > 0) && (java.lang.System.nanoTime() >= Consumer_stop_time)) + return true; + + return false; + } + + /** + * Verify the message received. Sequence numbers are checked and are expected to exactly match the message + * number (starting at 0). + */ + + protected void checkMessage(Message msg, int exp_seq) throws javax.jms.JMSException { + int seq; + + seq = msg.getIntProperty("seq"); + + if (exp_seq != seq) { + Num_error++; + fail("*** Consumer " + consumerTag + " expected seq " + exp_seq + "; received " + seq); + } + } + + /** + * Run the consumer. + */ + + @Override + public void run() { + try { + log("- running consumer " + consumerTag); + execTest(); + log("- running consumer " + consumerTag); + } catch (Throwable thrown) { + Num_error++; + fail("consumer " + consumerTag + " failed: " + thrown.getMessage()); + throw new Error("consumer " + consumerTag + " failed", thrown); + } + } + + @Override + public String toString() { + return consumerTag; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java new file mode 100644 index 0000000..48c5cbb --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3274Test.java @@ -0,0 +1,734 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3274Test { + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3274Test.class); + + protected static int Next_broker_num = 0; + protected EmbeddedTcpBroker broker1; + protected EmbeddedTcpBroker broker2; + + protected int nextEchoId = 0; + protected boolean testError = false; + + protected int echoResponseFill = 0; // Number of "filler" response messages per request + + public AMQ3274Test() throws Exception { + broker1 = new EmbeddedTcpBroker(); + broker2 = new EmbeddedTcpBroker(); + + broker1.coreConnectTo(broker2, true); + broker2.coreConnectTo(broker1, true); + } + + public void logMessage(String msg) { + System.out.println(msg); + System.out.flush(); + } + + public void testMessages(Session sess, MessageProducer req_prod, Destination resp_dest, int num_msg) throws Exception { + MessageConsumer resp_cons; + TextMessage msg; + MessageClient cons_client; + int cur; + int tot_expected; + + resp_cons = sess.createConsumer(resp_dest); + + cons_client = new MessageClient(resp_cons, num_msg); + cons_client.start(); + + cur = 0; + while ((cur < num_msg) && (!testError)) { + msg = sess.createTextMessage("MSG AAAA " + cur); + msg.setIntProperty("SEQ", 100 + cur); + msg.setStringProperty("TEST", "TOPO"); + msg.setJMSReplyTo(resp_dest); + + if (cur == (num_msg - 1)) + msg.setBooleanProperty("end-of-response", true); + + req_prod.send(msg); + + cur++; + } + + cons_client.waitShutdown(5000); + + if (cons_client.shutdown()) { + LOG.debug("Consumer client shutdown complete"); + } else { + LOG.debug("Consumer client shutdown incomplete!!!"); + } + + tot_expected = num_msg * (echoResponseFill + 1); + + if (cons_client.getNumMsgReceived() == tot_expected) { + LOG.info("Have " + tot_expected + " messages, as-expected"); + } else { + LOG.error("Have " + cons_client.getNumMsgReceived() + " messages; expected " + tot_expected); + testError = true; + } + + resp_cons.close(); + } + + /** + * Test one destination between the given "producer broker" and + * "consumer broker" specified. + */ + public void testOneDest(Connection conn, Session sess, Destination cons_dest, String prod_broker_url, String cons_broker_url, int num_msg) throws Exception { + int echo_id; + + EchoService echo_svc; + String echo_queue_name; + Destination prod_dest; + MessageProducer msg_prod; + + synchronized (this) { + echo_id = this.nextEchoId; + this.nextEchoId++; + } + + echo_queue_name = "echo.queue." + echo_id; + + LOG.trace("destroying the echo queue in case an old one exists"); + removeQueue(conn, echo_queue_name); + + echo_svc = new EchoService(echo_queue_name, prod_broker_url); + echo_svc.start(); + + LOG.trace("Creating echo queue and producer"); + prod_dest = sess.createQueue(echo_queue_name); + msg_prod = sess.createProducer(prod_dest); + + testMessages(sess, msg_prod, cons_dest, num_msg); + + echo_svc.shutdown(); + msg_prod.close(); + } + + /** + * TEST TEMPORARY TOPICS + */ + public void testTempTopic(String prod_broker_url, String cons_broker_url) throws Exception { + Connection conn; + Session sess; + Destination cons_dest; + int num_msg; + + num_msg = 5; + + LOG.info("TESTING TEMP TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"); + + conn = createConnection(cons_broker_url); + conn.start(); + sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + LOG.trace("Creating destination"); + cons_dest = sess.createTemporaryTopic(); + + testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg); + + sess.close(); + conn.close(); + } + + /** + * TEST TOPICS + */ + public void testTopic(String prod_broker_url, String cons_broker_url) throws Exception { + int num_msg; + + Connection conn; + Session sess; + String topic_name; + + Destination cons_dest; + + num_msg = 5; + + LOG.info("TESTING TOPICS " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"); + + conn = createConnection(cons_broker_url); + conn.start(); + sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + topic_name = "topotest2.perm.topic"; + LOG.trace("Removing existing Topic"); + removeTopic(conn, topic_name); + LOG.trace("Creating Topic, " + topic_name); + cons_dest = sess.createTopic(topic_name); + + testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg); + + removeTopic(conn, topic_name); + sess.close(); + conn.close(); + } + + /** + * TEST TEMPORARY QUEUES + */ + public void testTempQueue(String prod_broker_url, String cons_broker_url) throws Exception { + int num_msg; + + Connection conn; + Session sess; + + Destination cons_dest; + + num_msg = 5; + + LOG.info("TESTING TEMP QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"); + + conn = createConnection(cons_broker_url); + conn.start(); + sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + LOG.trace("Creating destination"); + cons_dest = sess.createTemporaryQueue(); + + testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg); + + sess.close(); + conn.close(); + } + + /** + * TEST QUEUES + */ + public void testQueue(String prod_broker_url, String cons_broker_url) throws Exception { + int num_msg; + + Connection conn; + Session sess; + String queue_name; + + Destination cons_dest; + + num_msg = 5; + + LOG.info("TESTING QUEUES " + prod_broker_url + " -> " + cons_broker_url + " (" + num_msg + " messages)"); + + conn = createConnection(cons_broker_url); + conn.start(); + sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + queue_name = "topotest2.perm.queue"; + LOG.trace("Removing existing Queue"); + removeQueue(conn, queue_name); + LOG.trace("Creating Queue, " + queue_name); + cons_dest = sess.createQueue(queue_name); + + testOneDest(conn, sess, cons_dest, prod_broker_url, cons_broker_url, num_msg); + + removeQueue(conn, queue_name); + sess.close(); + conn.close(); + } + + @Test + public void run() throws Exception { + Thread start1; + Thread start2; + + testError = false; + + // Use threads to avoid startup deadlock since the first broker started waits until + // it knows the name of the remote broker before finishing its startup, which means + // the remote must already be running. + + start1 = new Thread() { + public void run() { + try { + broker1.start(); + } catch (Exception ex) { + LOG.error(null, ex); + } + } + }; + + start2 = new Thread() { + public void run() { + try { + broker2.start(); + } catch (Exception ex) { + LOG.error(null, ex); + } + } + }; + + start1.start(); + start2.start(); + + start1.join(); + start2.join(); + + if (!testError) { + this.testTempTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl()); + } + if (!testError) { + this.testTempQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl()); + } + if (!testError) { + this.testTopic(broker1.getConnectionUrl(), broker2.getConnectionUrl()); + } + if (!testError) { + this.testQueue(broker1.getConnectionUrl(), broker2.getConnectionUrl()); + } + Thread.sleep(100); + + shutdown(); + + assertTrue(!testError); + } + + public void shutdown() throws Exception { + broker1.stop(); + broker2.stop(); + } + + /** + * @param args + * the command line arguments + */ + public static void main(String[] args) { + AMQ3274Test main_obj; + + try { + main_obj = new AMQ3274Test(); + main_obj.run(); + } catch (Exception ex) { + ex.printStackTrace(); + LOG.error(null, ex); + System.exit(0); + } + } + + protected Connection createConnection(String url) throws Exception { + return org.apache.activemq.ActiveMQConnection.makeConnection(url); + } + + protected static void removeQueue(Connection conn, String dest_name) throws java.lang.Exception { + org.apache.activemq.command.ActiveMQDestination dest; + + if (conn instanceof org.apache.activemq.ActiveMQConnection) { + dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name, + (byte) org.apache.activemq.command.ActiveMQDestination.QUEUE_TYPE); + ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest); + } + } + + protected static void removeTopic(Connection conn, String dest_name) throws java.lang.Exception { + org.apache.activemq.command.ActiveMQDestination dest; + + if (conn instanceof org.apache.activemq.ActiveMQConnection) { + dest = org.apache.activemq.command.ActiveMQDestination.createDestination(dest_name, + (byte) org.apache.activemq.command.ActiveMQDestination.TOPIC_TYPE); + ((org.apache.activemq.ActiveMQConnection) conn).destroyDestination(dest); + } + } + + @SuppressWarnings("rawtypes") + public static String fmtMsgInfo(Message msg) throws Exception { + StringBuilder msg_desc; + String prop; + Enumeration prop_enum; + + msg_desc = new StringBuilder(); + msg_desc = new StringBuilder(); + + if (msg instanceof TextMessage) { + msg_desc.append(((TextMessage) msg).getText()); + } else { + msg_desc.append("["); + msg_desc.append(msg.getClass().getName()); + msg_desc.append("]"); + } + + prop_enum = msg.getPropertyNames(); + while (prop_enum.hasMoreElements()) { + prop = (String) prop_enum.nextElement(); + msg_desc.append("; "); + msg_desc.append(prop); + msg_desc.append("="); + msg_desc.append(msg.getStringProperty(prop)); + } + + return msg_desc.toString(); + } + + // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + // /////////////////////////////////////////////// INTERNAL CLASSES + // ///////////////////////////////////////////////// + // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + protected class EmbeddedTcpBroker { + protected BrokerService brokerSvc; + protected int brokerNum; + protected String brokerName; + protected String brokerId; + protected int port; + protected String tcpUrl; + + public EmbeddedTcpBroker() throws Exception { + brokerSvc = new BrokerService(); + + synchronized (this.getClass()) { + brokerNum = Next_broker_num; + Next_broker_num++; + } + + brokerName = "broker" + brokerNum; + brokerId = "b" + brokerNum; + + brokerSvc.setBrokerName(brokerName); + brokerSvc.setBrokerId(brokerId); + brokerSvc.setPersistent(false); + brokerSvc.setUseJmx(false); + tcpUrl = brokerSvc.addConnector("tcp://localhost:0").getPublishableConnectString(); + } + + public Connection createConnection() throws URISyntaxException, JMSException { + Connection result; + + result = org.apache.activemq.ActiveMQConnection.makeConnection(this.tcpUrl); + + return result; + } + + public String getConnectionUrl() { + return this.tcpUrl; + } + + /** + * Create network connections to the given broker using the + * network-connector configuration of CORE brokers (e.g. + * core1.bus.dev1.coresys.tmcs) + * + * @param other + * @param duplex_f + */ + public void coreConnectTo(EmbeddedTcpBroker other, boolean duplex_f) throws Exception { + this.makeConnectionTo(other, duplex_f, true); + this.makeConnectionTo(other, duplex_f, false); + } + + public void start() throws Exception { + brokerSvc.start(); + } + + public void stop() throws Exception { + brokerSvc.stop(); + } + + /** + * Make one connection to the other embedded broker, of the specified + * type (queue or topic) using the standard CORE broker networking. + * + * @param other + * @param duplex_f + * @param queue_f + * @throws Exception + */ + protected void makeConnectionTo(EmbeddedTcpBroker other, boolean duplex_f, boolean queue_f) throws Exception { + NetworkConnector nw_conn; + String prefix; + ActiveMQDestination excl_dest; + ArrayList<ActiveMQDestination> excludes; + + nw_conn = new DiscoveryNetworkConnector(new URI("static:(" + other.tcpUrl + ")")); + nw_conn.setDuplex(duplex_f); + + if (queue_f) + nw_conn.setConduitSubscriptions(false); + else + nw_conn.setConduitSubscriptions(true); + + nw_conn.setNetworkTTL(5); + nw_conn.setSuppressDuplicateQueueSubscriptions(true); + nw_conn.setDecreaseNetworkConsumerPriority(true); + nw_conn.setBridgeTempDestinations(true); + + if (queue_f) { + prefix = "queue"; + excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.QUEUE_TYPE); + } else { + prefix = "topic"; + excl_dest = ActiveMQDestination.createDestination(">", ActiveMQDestination.TOPIC_TYPE); + } + + excludes = new ArrayList<ActiveMQDestination>(); + excludes.add(excl_dest); + nw_conn.setExcludedDestinations(excludes); + + if (duplex_f) + nw_conn.setName(this.brokerId + "<-" + prefix + "->" + other.brokerId); + else + nw_conn.setName(this.brokerId + "-" + prefix + "->" + other.brokerId); + + brokerSvc.addNetworkConnector(nw_conn); + } + } + + protected class MessageClient extends java.lang.Thread { + protected MessageConsumer msgCons; + protected boolean shutdownInd; + protected int expectedCount; + protected int lastSeq = 0; + protected int msgCount = 0; + protected boolean haveFirstSeq; + protected CountDownLatch shutdownLatch; + + public MessageClient(MessageConsumer cons, int num_to_expect) { + msgCons = cons; + expectedCount = (num_to_expect * (echoResponseFill + 1)); + shutdownLatch = new CountDownLatch(1); + } + + public void run() { + CountDownLatch latch; + + try { + synchronized (this) { + latch = shutdownLatch; + } + + shutdownInd = false; + processMessages(); + + latch.countDown(); + } catch (Exception exc) { + LOG.error("message client error", exc); + } + } + + public void waitShutdown(long timeout) { + CountDownLatch latch; + + try { + synchronized (this) { + latch = shutdownLatch; + } + + if (latch != null) + latch.await(timeout, TimeUnit.MILLISECONDS); + else + LOG.info("echo client shutdown: client does not appear to be active"); + } catch (InterruptedException int_exc) { + LOG.warn("wait for message client shutdown interrupted", int_exc); + } + } + + public boolean shutdown() { + boolean down_ind; + + if (!shutdownInd) { + shutdownInd = true; + } + + waitShutdown(200); + + synchronized (this) { + if ((shutdownLatch == null) || (shutdownLatch.getCount() == 0)) + down_ind = true; + else + down_ind = false; + } + + return down_ind; + } + + public int getNumMsgReceived() { + return msgCount; + } + + protected void processMessages() throws Exception { + Message in_msg; + + haveFirstSeq = false; + while ((!shutdownInd) && (!testError)) { + in_msg = msgCons.receive(100); + + if (in_msg != null) { + msgCount++; + checkMessage(in_msg); + } + } + } + + protected void checkMessage(Message in_msg) throws Exception { + int seq; + + LOG.debug("received message " + fmtMsgInfo(in_msg)); + + if (in_msg.propertyExists("SEQ")) { + seq = in_msg.getIntProperty("SEQ"); + + if ((haveFirstSeq) && (seq != (lastSeq + 1))) { + LOG.error("***ERROR*** incorrect sequence number; expected " + Integer.toString(lastSeq + 1) + " but have " + Integer.toString(seq)); + + testError = true; + } + + lastSeq = seq; + + if (msgCount > expectedCount) { + LOG.warn("*** have more messages than expected; have " + msgCount + "; expect " + expectedCount); + + testError = true; + } + } + + if (in_msg.propertyExists("end-of-response")) { + LOG.trace("received end-of-response message"); + shutdownInd = true; + } + } + } + + protected class EchoService extends java.lang.Thread { + protected String destName; + protected Connection jmsConn; + protected Session sess; + protected MessageConsumer msg_cons; + protected boolean Shutdown_ind; + + protected Destination req_dest; + protected Destination resp_dest; + protected MessageProducer msg_prod; + + protected CountDownLatch waitShutdown; + + public EchoService(String dest, Connection broker_conn) throws Exception { + destName = dest; + jmsConn = broker_conn; + + Shutdown_ind = false; + + sess = jmsConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + req_dest = sess.createQueue(destName); + msg_cons = sess.createConsumer(req_dest); + + jmsConn.start(); + + waitShutdown = new CountDownLatch(1); + } + + public EchoService(String dest, String broker_url) throws Exception { + this(dest, ActiveMQConnection.makeConnection(broker_url)); + } + + public void run() { + Message req; + + try { + LOG.info("STARTING ECHO SERVICE"); + + while (!Shutdown_ind) { + req = msg_cons.receive(100); + if (req != null) { + if (LOG.isDebugEnabled()) + LOG.debug("ECHO request message " + req.toString()); + + resp_dest = req.getJMSReplyTo(); + if (resp_dest != null) { + msg_prod = sess.createProducer(resp_dest); + msg_prod.send(req); + msg_prod.close(); + msg_prod = null; + } else { + LOG.warn("invalid request: no reply-to destination given"); + } + } + } + } catch (Exception ex) { + LOG.error(null, ex); + } finally { + LOG.info("shutting down test echo service"); + + try { + jmsConn.stop(); + } catch (javax.jms.JMSException jms_exc) { + LOG.warn("error on shutting down JMS connection", jms_exc); + } + + synchronized (this) { + waitShutdown.countDown(); + } + } + } + + /** + * Shut down the service, waiting up to 3 seconds for the service to + * terminate. + */ + public void shutdown() { + CountDownLatch wait_l; + + synchronized (this) { + wait_l = waitShutdown; + } + + Shutdown_ind = true; + + try { + if (wait_l != null) { + if (wait_l.await(3000, TimeUnit.MILLISECONDS)) { + LOG.info("echo service shutdown complete"); + } else { + LOG.warn("timeout waiting for echo service shutdown"); + } + } else { + LOG.info("echo service shutdown: service does not appear to be active"); + } + } catch (InterruptedException int_exc) { + LOG.warn("interrupted while waiting for echo service shutdown"); + } + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java new file mode 100644 index 0000000..a1e9b93 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3324Test.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import javax.jms.BytesMessage; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.Topic; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.virtual.MirroredQueue; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3324Test { + + private static final transient Logger LOG = LoggerFactory.getLogger(AMQ3324Test.class); + + private static final String bindAddress = "tcp://0.0.0.0:0"; + private BrokerService broker; + private ActiveMQConnectionFactory cf; + + private static final int MESSAGE_COUNT = 100; + + @Before + public void setUp() throws Exception { + broker = this.createBroker(); + String address = broker.getTransportConnectors().get(0).getPublishableConnectString(); + broker.start(); + broker.waitUntilStarted(); + + cf = new ActiveMQConnectionFactory(address); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void testTempMessageConsumedAdvisoryConnectionClose() throws Exception { + + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + final TemporaryQueue queue = session.createTemporaryQueue(); + MessageConsumer consumer = session.createConsumer(queue); + + final Topic advisoryTopic = AdvisorySupport.getMessageConsumedAdvisoryTopic((ActiveMQDestination) queue); + + MessageConsumer advisoryConsumer = session.createConsumer(advisoryTopic); + MessageProducer producer = session.createProducer(queue); + + // send lots of messages to the tempQueue + for (int i = 0; i < MESSAGE_COUNT; i++) { + BytesMessage m = session.createBytesMessage(); + m.writeBytes(new byte[1024]); + producer.send(m); + } + + // consume one message from tempQueue + Message msg = consumer.receive(5000); + assertNotNull(msg); + + // check one advisory message has produced on the advisoryTopic + Message advCmsg = advisoryConsumer.receive(5000); + assertNotNull(advCmsg); + + connection.close(); + LOG.debug("Connection closed, destinations should now become inactive."); + + assertTrue("The destination " + advisoryTopic + "was not removed. ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getAdminView().getTopics().length == 0; + } + })); + + assertTrue("The destination " + queue + " was not removed. ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return broker.getAdminView().getTemporaryQueues().length == 0; + } + })); + } + + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setUseMirroredQueues(true); + answer.setPersistent(false); + answer.setSchedulePeriodForDestinationPurge(1000); + + PolicyEntry entry = new PolicyEntry(); + entry.setGcInactiveDestinations(true); + entry.setInactiveTimoutBeforeGC(2000); + entry.setProducerFlowControl(true); + entry.setAdvisoryForConsumed(true); + entry.setAdvisoryForFastProducers(true); + entry.setAdvisoryForDelivery(true); + PolicyMap map = new PolicyMap(); + map.setDefaultEntry(entry); + + MirroredQueue mirrorQ = new MirroredQueue(); + mirrorQ.setCopyMessage(true); + DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[]{mirrorQ}; + answer.setDestinationInterceptors(destinationInterceptors); + + answer.setDestinationPolicy(map); + answer.addConnector(bindAddress); + + return answer; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java new file mode 100644 index 0000000..aa84d2d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3352Test.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class AMQ3352Test +{ + TransportConnector connector; + BrokerService brokerService; + + @Before + public void startBroker() throws Exception { + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + connector = brokerService.addConnector("tcp://0.0.0.0:0"); + brokerService.start(); + } + + @After + public void stopBroker() throws Exception { + brokerService.stop(); + } + + @Test + public void verifyEnqueueLargeNumWithStateTracker() throws Exception { + String url = "failover:(" + connector.getPublishableConnectString() + ")?jms.useAsyncSend=true&trackMessages=true&maxCacheSize=131072"; + + ActiveMQConnection conn = (ActiveMQConnection)new ActiveMQConnectionFactory(url).createConnection(null, null); + + Session session = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + MessageProducer producer = session.createProducer(session.createQueue("EVENTQ")); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + producer.setDisableMessageID(true); + producer.setDisableMessageTimestamp(true); + + StringBuffer buffer = new StringBuffer(); + for (int i=0;i<1024;i++) + { + buffer.append(String.valueOf(Math.random())); + } + String payload = buffer.toString(); + + for (int i=0; i<10000; i++) { + StringBuffer buff = new StringBuffer("x"); + buff.append(payload); + producer.send(session.createTextMessage(buff.toString())); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java new file mode 100644 index 0000000..9711d06 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ3405Test.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.TestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.policy.DeadLetterStrategy; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class AMQ3405Test extends TestSupport { + private static final Logger LOG = LoggerFactory.getLogger(AMQ3405Test.class); + + private Connection connection; + private Session session; + private MessageConsumer consumer; + private MessageProducer producer; + private int deliveryMode = DeliveryMode.PERSISTENT; + private Destination dlqDestination; + private MessageConsumer dlqConsumer; + private BrokerService broker; + + private int messageCount; + private Destination destination; + private int rollbackCount; + private Session dlqSession; + private final Error[] error = new Error[1]; + private boolean topic = true; + private boolean durableSubscriber = true; + + public void testTransientTopicMessage() throws Exception { + topic = true; + deliveryMode = DeliveryMode.NON_PERSISTENT; + durableSubscriber = true; + doTest(); + } + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + PolicyEntry policy = new PolicyEntry(); + DeadLetterStrategy defaultDeadLetterStrategy = policy.getDeadLetterStrategy(); + if(defaultDeadLetterStrategy!=null) { + defaultDeadLetterStrategy.setProcessNonPersistent(true); + } + PolicyMap pMap = new PolicyMap(); + pMap.setDefaultEntry(policy); + broker.setDestinationPolicy(pMap); + return broker; + } + + protected void doTest() throws Exception { + messageCount = 200; + connection.start(); + + final QueueViewMBean dlqView = getProxyToDLQ(); + + ActiveMQConnection amqConnection = (ActiveMQConnection) connection; + rollbackCount = amqConnection.getRedeliveryPolicy().getMaximumRedeliveries() + 1; + LOG.info("Will redeliver messages: " + rollbackCount + " times"); + + makeConsumer(); + makeDlqConsumer(); + dlqConsumer.close(); + + sendMessages(); + + // now lets receive and rollback N times + int maxRollbacks = messageCount * rollbackCount; + + consumer.setMessageListener(new RollbackMessageListener(maxRollbacks, rollbackCount)); + + // We receive and rollback into the DLQ N times moving the DLQ messages back to their + // original Q to test that they are continually placed back in the DLQ. + for (int i = 0; i < 2; ++i) { + + assertTrue("DLQ was not filled as expected", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return dlqView.getQueueSize() == messageCount; + } + })); + + connection.stop(); + + assertEquals("DLQ should be full now.", messageCount, dlqView.getQueueSize()); + + String moveTo; + if (topic) { + moveTo = "topic://" + ((Topic) getDestination()).getTopicName(); + } else { + moveTo = "queue://" + ((Queue) getDestination()).getQueueName(); + } + + LOG.debug("Moving " + messageCount + " messages from ActiveMQ.DLQ to " + moveTo); + dlqView.moveMatchingMessagesTo("", moveTo); + + assertTrue("DLQ was not emptied as expected", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return dlqView.getQueueSize() == 0; + } + })); + + connection.start(); + } + } + + protected void makeConsumer() throws JMSException { + Destination destination = getDestination(); + LOG.info("Consuming from: " + destination); + if (durableSubscriber) { + consumer = session.createDurableSubscriber((Topic)destination, destination.toString()); + } else { + consumer = session.createConsumer(destination); + } + } + + protected void makeDlqConsumer() throws JMSException { + dlqDestination = createDlqDestination(); + + LOG.info("Consuming from dead letter on: " + dlqDestination); + dlqConsumer = dlqSession.createConsumer(dlqDestination); + } + + @Override + protected void setUp() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + + connection = createConnection(); + connection.setClientID(createClientId()); + + session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + connection.start(); + + dlqSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + @Override + protected void tearDown() throws Exception { + dlqConsumer.close(); + dlqSession.close(); + session.close(); + + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + }; + + @Override + protected ActiveMQConnectionFactory createConnectionFactory() + throws Exception { + ActiveMQConnectionFactory answer = super.createConnectionFactory(); + RedeliveryPolicy policy = new RedeliveryPolicy(); + policy.setMaximumRedeliveries(3); + policy.setBackOffMultiplier((short) 1); + policy.setRedeliveryDelay(0); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + answer.setRedeliveryPolicy(policy); + return answer; + } + + protected void sendMessages() throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producer = session.createProducer(getDestination()); + producer.setDeliveryMode(deliveryMode); + + LOG.info("Sending " + messageCount + " messages to: " + getDestination()); + for (int i = 0; i < messageCount; i++) { + Message message = createMessage(session, i); + producer.send(message); + } + } + + protected TextMessage createMessage(Session session, int i) throws JMSException { + return session.createTextMessage(getMessageText(i)); + } + + protected String getMessageText(int i) { + return "message: " + i; + } + + protected Destination createDlqDestination() { + return new ActiveMQQueue("ActiveMQ.DLQ"); + } + + private QueueViewMBean getProxyToDLQ() throws MalformedObjectNameException, JMSException { + ObjectName queueViewMBeanName = new ObjectName( + "org.apache.activemq:type=Broker,brokerName=localhost," + + "destinationType=Queue,destinationName=ActiveMQ.DLQ"); + QueueViewMBean proxy = (QueueViewMBean) broker.getManagementContext() + .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + return proxy; + } + + protected Destination getDestination() { + if (destination == null) { + destination = createDestination(); + } + return destination; + } + + protected String createClientId() { + return toString(); + } + + class RollbackMessageListener implements MessageListener { + + final int maxRollbacks; + final int deliveryCount; + final AtomicInteger rollbacks = new AtomicInteger(); + + RollbackMessageListener(int c, int delvery) { + maxRollbacks = c; + deliveryCount = delvery; + } + + @Override + public void onMessage(Message message) { + try { + int expectedMessageId = rollbacks.get() / deliveryCount; + LOG.info("expecting messageId: " + expectedMessageId); + rollbacks.incrementAndGet(); + session.rollback(); + } catch (Throwable e) { + LOG.error("unexpected exception:" + e, e); + // propagating assertError to execution task will cause a hang + // at shutdown + if (e instanceof Error) { + error[0] = (Error) e; + } else { + fail("unexpected exception: " + e); + } + } + } + } +}
