http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java new file mode 100644 index 0000000..9cd240f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1282.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.bugs;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Session;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * An AMQ-1282 Test.
 *
 * <p>Every typed test reads the unmapped key {@code "foo"} from an empty {@link MapMessage}
 * and compares the outcome with what the matching JDK {@code valueOf} overload does when it
 * is handed {@code null}: either the same value, or an exception of a compatible type.
 */
public class AMQ1282 extends TestCase {
    private ConnectionFactory factory;
    private Connection connection;
    private MapMessage message;

    /** Opens a connection to an embedded, non-persistent VM broker and creates the empty map message. */
    @Override
    protected void setUp() throws Exception {
        factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        connection = factory.createConnection();
        connection.start();
        Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        message = jmsSession.createMapMessage();
        super.setUp();
    }

    /** Closes the connection opened in {@link #setUp()}. */
    @Override
    protected void tearDown() throws Exception {
        connection.close();
        super.tearDown();
    }

    /**
     * Passes when the exception raised by the message getter is an instance of the class of
     * the reference outcome (which is itself an exception when the JDK conversion threw).
     */
    private static void assertCompatibleOutcome(Object reference, Exception thrown) {
        assertTrue(reference.getClass().isInstance(thrown));
    }

    public void testUnmappedBooleanMessage() throws JMSException {
        Object reference;
        try {
            // A null literal selects the String overload of Boolean.valueOf.
            reference = Boolean.valueOf((String) null);
        } catch (Exception jdkFailure) {
            reference = jdkFailure;
        }
        try {
            Boolean fromMessage = message.getBoolean("foo");
            assertEquals(reference, fromMessage);
        } catch (Exception getterFailure) {
            // Unlike the numeric tests, this one demands equality with the reference outcome.
            assertEquals(reference, getterFailure);
        }
    }

    public void testUnmappedIntegerMessage() throws JMSException {
        Object reference;
        try {
            reference = Integer.valueOf((String) null);
        } catch (Exception jdkFailure) {
            reference = jdkFailure;
        }
        try {
            Integer fromMessage = message.getInt("foo");
            assertEquals(reference, fromMessage);
        } catch (Exception getterFailure) {
            assertCompatibleOutcome(reference, getterFailure);
        }
    }

    public void testUnmappedShortMessage() throws JMSException {
        Object reference;
        try {
            reference = Short.valueOf((String) null);
        } catch (Exception jdkFailure) {
            reference = jdkFailure;
        }
        try {
            Short fromMessage = message.getShort("foo");
            assertEquals(reference, fromMessage);
        } catch (Exception getterFailure) {
            assertCompatibleOutcome(reference, getterFailure);
        }
    }

    public void testUnmappedLongMessage() throws JMSException {
        Object reference;
        try {
            reference = Long.valueOf((String) null);
        } catch (Exception jdkFailure) {
            reference = jdkFailure;
        }
        try {
            Long fromMessage = message.getLong("foo");
            assertEquals(reference, fromMessage);
        } catch (Exception getterFailure) {
            assertCompatibleOutcome(reference, getterFailure);
        }
    }

    public void testUnmappedStringMessage() throws JMSException {
        Object reference;
        try {
            // A null literal selects the char[] overload of String.valueOf (the most specific one).
            reference = String.valueOf((char[]) null);
        } catch (Exception jdkFailure) {
            reference = jdkFailure;
        }
        try {
            String fromMessage = message.getString("foo");
            assertEquals(reference, fromMessage);
        } catch (Exception getterFailure) {
            assertCompatibleOutcome(reference, getterFailure);
        }
    }

    public void testUnmappedCharMessage() throws JMSException {
        try {
            message.getChar("foo");
            fail("should have thrown NullPointerException");
        } catch (NullPointerException success) {
            assertNotNull(success);
        }
    }

    public void testUnmappedByteMessage() throws JMSException {
        Object reference;
        try {
            reference = Byte.valueOf((String) null);
        } catch (Exception jdkFailure) {
            reference = jdkFailure;
        }
        try {
            Byte fromMessage = message.getByte("foo");
            assertEquals(reference, fromMessage);
        } catch (Exception getterFailure) {
            assertCompatibleOutcome(reference, getterFailure);
        }
    }

    public void testUnmappedDoubleMessage() throws JMSException {
        Object reference;
        try {
            reference = Double.valueOf((String) null);
        } catch (Exception jdkFailure) {
            reference = jdkFailure;
        }
        try {
            Double fromMessage = message.getDouble("foo");
            assertEquals(reference, fromMessage);
        } catch (Exception getterFailure) {
            assertCompatibleOutcome(reference, getterFailure);
        }
    }

    public void testUnmappedFloatMessage() throws JMSException {
        Object reference;
        try {
            reference = Float.valueOf((String) null);
        } catch (Exception jdkFailure) {
            reference = jdkFailure;
        }
        try {
            Float fromMessage = message.getFloat("foo");
            assertEquals(reference, fromMessage);
        } catch (Exception getterFailure) {
            assertCompatibleOutcome(reference, getterFailure);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java new file mode 100644 index 0000000..8e636d0 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1687Test.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.bugs; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.spring.ConsumerBean; + +/** + * + * + */ +public class AMQ1687Test extends EmbeddedBrokerTestSupport { + + private Connection connection; + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + //prefetch change is not required, but test will not fail w/o it, only spew errors in the AMQ log. + return new ActiveMQConnectionFactory( + broker.getTransportConnectors().get(0).getPublishableConnectString() +"?jms.prefetchPolicy.all=5"); + } + + public void testVirtualTopicCreation() throws Exception { + if (connection == null) { + connection = createConnection(); + } + connection.start(); + + ConsumerBean messageList = new ConsumerBean(); + messageList.setVerbose(true); + + String queueAName = getVirtualTopicConsumerName(); + String queueBName = getVirtualTopicConsumerNameB(); + + // create consumer 'cluster' + ActiveMQQueue queue1 = new ActiveMQQueue(queueAName); + ActiveMQQueue queue2 = new ActiveMQQueue(queueBName); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer c1 = session.createConsumer(queue1); + MessageConsumer c2 = session.createConsumer(queue2); + + c1.setMessageListener(messageList); + c2.setMessageListener(messageList); + + // create topic producer + ActiveMQTopic topic = new ActiveMQTopic(getVirtualTopicName()); + MessageProducer producer = session.createProducer(topic); + assertNotNull(producer); + + int total = 100; + for (int i = 0; i < total; i++) { + producer.send(session.createTextMessage("message: " + i)); + } + + 
messageList.assertMessagesArrived(total*2); + } + + protected String getVirtualTopicName() { + return "VirtualTopic.TEST"; + } + + protected String getVirtualTopicConsumerName() { + return "Consumer.A.VirtualTopic.TEST"; + } + + protected String getVirtualTopicConsumerNameB() { + return "Consumer.B.VirtualTopic.TEST"; + } + + protected void setUp() throws Exception { + this.bindAddress="tcp://localhost:0"; + super.setUp(); + } + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + super.tearDown(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java new file mode 100644 index 0000000..a3c3b1d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1853Test.java @@ -0,0 +1,368 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.bugs;

import static org.junit.Assert.*;

import java.net.URI;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.ActiveMQSession;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test validates that the AMQ consumer blocks on redelivery of a message,
 * through all redeliveries, until the message is either successfully consumed
 * or sent to the DLQ.
 */
public class AMQ1853Test {
    private static BrokerService broker;

    private static final Logger LOG = LoggerFactory.getLogger(AMQ1853Test.class);
    static final String jmsConnectionURI = "failover:(vm://localhost)";

    // Queue that the test publishes 10 messages to (two producer runs of 5 messages each)
    private static final String queueFail = "Queue.BlockingConsumer.QueueFail";

    // Number of messages

    private final int producerMessages = 5;
    private final int totalNumberMessages = producerMessages * 2;
    private final int maxRedeliveries = 2;
    private final int redeliveryDelay = 1000;

    // Message id -> number of deliveries seen by the consumer. Populated by the producer
    // thread and updated by the consumer's listener thread.
    private Map<String, AtomicInteger> messageList = null;

    /** Starts a fresh, non-persistent embedded broker without JMX. */
    @Before
    public void setUp() throws Exception {
        broker = BrokerFactory.createBroker(new URI("broker:()/localhost?persistent=false"));
        broker.setUseJmx(false);
        broker.setDeleteAllMessagesOnStartup(true);
        broker.start();
        broker.waitUntilStarted();
    }

    /** Stops the embedded broker, if it was created. */
    @After
    public void tearDown() throws Exception {
        if (broker != null) {
            broker.stop();
            broker.waitUntilStopped();
            broker = null;
        }
    }

    /**
     * Runs the producer twice against a consumer that rolls back every delivery with
     * non-blocking redelivery enabled, then checks that every message was delivered
     * {@code maxRedeliveries + 1} times and that deliveries were not strictly ordered.
     */
    @Test
    public void testConsumerMessagesAreNotOrdered() throws Exception {

        TestConsumer consumerAllFail = null;
        messageList = new Hashtable<String, AtomicInteger>();

        try {

            // The consumer rolls back every delivery, ultimately causing messages to land on the DLQ

            TestProducer producerAllFail = new TestProducer(queueFail);
            thread(producerAllFail, false);

            consumerAllFail = new TestConsumer(queueFail, true);
            thread(consumerAllFail, false);

            // Give the consumers a second to start
            Thread.sleep(1000);

            // Second producer run on the same TestProducer instance; both runs share one latch.
            thread(producerAllFail, false);

            // Give the consumers a second to start
            Thread.sleep(1000);

            producerAllFail.getLatch().await();

            LOG.info("producer successful, count = " + producerAllFail.getLatch().getCount());
            LOG.info("final message list size = " + messageList.size());

            assertTrue("message list size = " + messageList.size() + " exptected:" + totalNumberMessages,
                Wait.waitFor(new Condition() {
                    @Override
                    public boolean isSatisified() throws Exception {
                        return totalNumberMessages == messageList.size();
                    }
                }));

            consumerAllFail.getLatch().await();

            LOG.info("consumerAllFail successful, count = " + consumerAllFail.getLatch().getCount());

            // Walk key and value together instead of pairing two independent iterations.
            for (Map.Entry<String, AtomicInteger> entry : messageList.entrySet()) {
                String message = entry.getKey();
                AtomicInteger counter = entry.getValue();
                LOG.info("final count for message " + message + " counter = " + counter.get());
                assertTrue("for message " + message + " counter = " + counter.get(), counter.get() == maxRedeliveries + 1);
            }

            assertFalse(consumerAllFail.messageReceiptIsOrdered());
        } finally {
            if (consumerAllFail != null) {
                consumerAllFail.setStop(true);
            }
        }
    }

    /** Starts {@code runnable} on a new thread with the given daemon flag and returns the thread. */
    private static Thread thread(Runnable runnable, boolean daemon) {
        Thread brokerThread = new Thread(runnable);
        brokerThread.setDaemon(daemon);
        brokerThread.start();
        return brokerThread;
    }

    /**
     * Sends {@code producerMessages} non-persistent text messages, one per second, and
     * records each sent message id in {@code messageList}. The latch is sized for two runs.
     */
    private class TestProducer implements Runnable {

        private CountDownLatch latch = null;
        private String destinationName = null;

        public TestProducer(String destinationName) {
            this.destinationName = destinationName;
            // We run the producer 2 times
            latch = new CountDownLatch(totalNumberMessages);
        }

        public CountDownLatch getLatch() {
            return latch;
        }

        @Override
        public void run() {

            ActiveMQConnectionFactory connectionFactory = null;
            ActiveMQConnection connection = null;
            ActiveMQSession session = null;
            Destination destination = null;

            try {
                LOG.info("Started TestProducer for destination (" + destinationName + ")");

                connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
                connection = (ActiveMQConnection) connectionFactory.createConnection();
                connection.setCopyMessageOnSend(false);
                connection.start();
                session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                destination = session.createQueue(this.destinationName);

                // Create a MessageProducer from the Session to the Topic or Queue
                ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                for (int i = 0; i < (producerMessages); i++) {
                    TextMessage message = (TextMessage) session.createTextMessage();
                    message.setLongProperty("TestTime", (System.currentTimeMillis()));
                    try {
                        producer.send(message);
                        LOG.info("Producer (" + destinationName + ")\n" + message.getJMSMessageID() + " = sent messageId\n");

                        latch.countDown();
                        LOG.info(" Latch count  " + latch.getCount());
                        LOG.info("Producer message list size = " + messageList.keySet().size());
                        messageList.put(message.getJMSMessageID(), new AtomicInteger(0));
                        LOG.info("Producer message list size = " + messageList.keySet().size());

                    } catch (Exception deeperException) {
                        // Keep going with the next message, but keep the stack trace.
                        LOG.info("Producer for destination (" + destinationName + ") Caught: " + deeperException, deeperException);
                    }

                    Thread.sleep(1000);
                }

                LOG.info("Finished TestProducer for destination (" + destinationName + ")");

            } catch (Exception e) {
                LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e, e);
            } finally {
                try {
                    if (session != null) {
                        session.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (Exception e) {
                    LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e, e);
                }
            }
        }
    }

    /**
     * Transacted listener-based consumer. With {@code bFakeFail} set it rolls back every
     * delivery; otherwise it commits. It tracks whether consecutive deliveries carried the
     * same message id as the first delivery of each redelivery group.
     */
    private class TestConsumer implements Runnable, ExceptionListener, MessageListener {

        private CountDownLatch latch = null;
        private int receivedMessageCounter = 0;
        private boolean bFakeFail = false;
        String destinationName = null;
        // Written on the listener thread, read by the test thread without a lock.
        volatile boolean bMessageReceiptIsOrdered = true;
        // Polled by run() without a lock while setStop() is called from the test thread.
        volatile boolean bStop = false;
        String previousMessageId = null;

        private ActiveMQConnectionFactory connectionFactory = null;
        private ActiveMQConnection connection = null;
        private Session session = null;
        private MessageConsumer consumer = null;

        public TestConsumer(String destinationName, boolean bFakeFail) {
            this.bFakeFail = bFakeFail;
            // A failing consumer sees every message once plus maxRedeliveries redeliveries.
            latch = new CountDownLatch(totalNumberMessages * (this.bFakeFail ? (maxRedeliveries + 1) : 1));
            this.destinationName = destinationName;
        }

        public CountDownLatch getLatch() {
            return latch;
        }

        public boolean messageReceiptIsOrdered() {
            return bMessageReceiptIsOrdered;
        }

        @Override
        public void run() {

            try {
                LOG.info("Started TestConsumer for destination (" + destinationName + ")");

                connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI);
                connection = (ActiveMQConnection) connectionFactory.createConnection();
                connection.setNonBlockingRedelivery(true);
                session = connection.createSession(true, Session.SESSION_TRANSACTED);

                RedeliveryPolicy policy = connection.getRedeliveryPolicy();
                policy.setInitialRedeliveryDelay(redeliveryDelay);
                policy.setBackOffMultiplier(-1);
                policy.setRedeliveryDelay(redeliveryDelay);
                policy.setMaximumRedeliveryDelay(-1);
                policy.setUseExponentialBackOff(false);
                policy.setMaximumRedeliveries(maxRedeliveries);

                connection.setExceptionListener(this);
                Destination destination = session.createQueue(destinationName);
                consumer = session.createConsumer(destination);
                consumer.setMessageListener(this);

                connection.start();

                // Keep the connection open until the test asks us to stop.
                while (!bStop) {
                    Thread.sleep(100);
                }

                LOG.info("Finished TestConsumer for destination name (" + destinationName + ") remaining " + this.latch.getCount()
                        + " messages " + this.toString());

            } catch (Exception e) {
                LOG.error("Consumer (" + destinationName + ") Caught: " + e, e);
            } finally {
                try {
                    if (consumer != null) {
                        consumer.close();
                    }
                    if (session != null) {
                        session.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (Exception e) {
                    LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e, e);
                }
            }
        }

        @Override
        public synchronized void onException(JMSException ex) {
            LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occured.  Shutting down client.", ex);
        }

        public synchronized void setStop(boolean bStop) {
            this.bStop = bStop;
        }

        @Override
        public synchronized void onMessage(Message message) {
            receivedMessageCounter++;
            latch.countDown();

            LOG.info("Consumer for destination (" + destinationName + ") latch countdown: " + latch.getCount() +
                     " :: Number messages received " + this.receivedMessageCounter);

            try {

                // First delivery of each group of (maxRedeliveries + 1): remember its id.
                if (receivedMessageCounter % (maxRedeliveries + 1) == 1) {
                    previousMessageId = message.getJMSMessageID();
                }

                // Once a mismatch has been observed the flag stays false.
                if (bMessageReceiptIsOrdered) {
                    bMessageReceiptIsOrdered = previousMessageId.trim().equals(message.getJMSMessageID());
                }

                // The producer registers the id only after send() returns, so wait for it.
                final String jmsMessageId = message.getJMSMessageID();
                assertTrue("Did not find expected ", Wait.waitFor(new Wait.Condition() {
                    @Override
                    public boolean isSatisified() throws Exception {
                        return messageList.containsKey(jmsMessageId);
                    }
                }));

                AtomicInteger counter = messageList.get(jmsMessageId);
                counter.incrementAndGet();

                LOG.info("Consumer for destination (" + destinationName + ")\n" + message.getJMSMessageID() + " = currentMessageId\n"
                        + previousMessageId + " = previousMessageId\n" + bMessageReceiptIsOrdered + "= bMessageReceiptIsOrdered\n"
                        + ">>LATENCY " + (System.currentTimeMillis() - message.getLongProperty("TestTime")) + "\n" + "message counter = "
                        + counter.get());

                if (!bFakeFail) {
                    LOG.debug("Consumer on destination " + destinationName + " committing JMS Session for message: " + message.toString());
                    session.commit();
                } else {
                    LOG.debug("Consumer on destination " + destinationName + " rolling back JMS Session for message: " + message.toString());
                    session.rollback(); // rolls back all the consumed messages on the session to
                }

            } catch (Exception ex) {
                LOG.error("Error reading JMS Message from destination " + destinationName + ".", ex);
            }
        }
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java new file mode 100644 index 0000000..1bdd72e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1866.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.bugs;

import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.leveldb.LevelDBStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This is a test case for the issue reported at:
 * https://issues.apache.org/activemq/browse/AMQ-1866
 *
 * If you have a JMS producer sending messages to multiple fast consumers and
 * one slow consumer, eventually all consumers will run as slow as
 * the slowest consumer.
 */
public class AMQ1866 extends TestCase {

    private static final Logger log = LoggerFactory.getLogger(ConsumerThread.class);
    private BrokerService brokerService;
    private ArrayList<Thread> threads = new ArrayList<Thread>();

    private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0";
    private String ACTIVEMQ_BROKER_URI;

    AtomicBoolean shutdown = new AtomicBoolean();
    private ActiveMQQueue destination;

    /** Starts an embedded LevelDB-backed broker with a max page size of 1 on every queue. */
    @Override
    protected void setUp() throws Exception {
        // Start an embedded broker up.
        brokerService = new BrokerService();
        LevelDBStore adaptor = new LevelDBStore();
        brokerService.setPersistenceAdapter(adaptor);
        brokerService.deleteAllMessages();

        // A small max page size makes this issue occur faster.
        PolicyMap policyMap = new PolicyMap();
        PolicyEntry pe = new PolicyEntry();
        pe.setMaxPageSize(1);
        policyMap.put(new ActiveMQQueue(">"), pe);
        brokerService.setDestinationPolicy(policyMap);

        brokerService.addConnector(ACTIVEMQ_BROKER_BIND);
        brokerService.start();

        ACTIVEMQ_BROKER_URI = brokerService.getTransportConnectors().get(0).getPublishableConnectString();
        destination = new ActiveMQQueue(getName());
    }

    /** Signals shutdown, interrupts and joins every worker thread, then stops the broker. */
    @Override
    protected void tearDown() throws Exception {
        // Stop any running threads.
        shutdown.set(true);
        for (Thread t : threads) {
            t.interrupt();
            t.join();
        }
        brokerService.stop();
    }

    public void testConsumerSlowDownPrefetch0() throws Exception {
        ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=0";
        doTestConsumerSlowDown();
    }

    public void testConsumerSlowDownPrefetch10() throws Exception {
        ACTIVEMQ_BROKER_URI = ACTIVEMQ_BROKER_URI + "?jms.prefetchPolicy.queuePrefetch=10";
        doTestConsumerSlowDown();
    }

    public void testConsumerSlowDownDefaultPrefetch() throws Exception {
        doTestConsumerSlowDown();
    }

    /**
     * Preloads the queue, keeps a background producer running, starts one slow and one
     * fast consumer, and for 30 one-second intervals samples both counters. After the
     * first intervals the fast consumer must have received something in every interval.
     */
    public void doTestConsumerSlowDown() throws Exception {

        // Preload the queue.
        produce(20000);

        Thread producer = new Thread() {
            @Override
            public void run() {
                try {
                    while(!shutdown.get()) {
                        produce(1000);
                    }
                } catch (Exception e) {
                    // Expected when tearDown interrupts us or stops the broker; keep a trace for diagnosis.
                    log.debug("Background producer stopped", e);
                }
            }
        };
        threads.add(producer);
        producer.start();

        // This is the slow consumer.
        ConsumerThread c1 = new ConsumerThread("Consumer-1");
        threads.add(c1);
        c1.start();

        // Wait a bit so that the slow consumer gets assigned most of the messages.
        Thread.sleep(500);
        ConsumerThread c2 = new ConsumerThread("Consumer-2");
        threads.add(c2);
        c2.start();

        // long, to match the AtomicLong counters it accumulates (no implicit narrowing).
        long totalReceived = 0;
        for ( int i=0; i < 30; i++) {
            Thread.sleep(1000);
            long c1Counter = c1.counter.getAndSet(0);
            long c2Counter = c2.counter.getAndSet(0);
            log.debug("c1: "+c1Counter+", c2: "+c2Counter);
            totalReceived += c1Counter;
            totalReceived += c2Counter;

            // Once message have been flowing for a few seconds, start asserting that c2 always gets messages.  It should be receiving about 100 / sec
            if( i > 10 ) {
                assertTrue("Total received=" + totalReceived + ", Consumer 2 should be receiving new messages every second.", c2Counter > 0);
            }
        }
    }

    /**
     * Sends {@code count} text messages to the test destination over a fresh connection,
     * which is closed afterwards.
     *
     * @param count number of messages to send
     * @throws Exception if connecting or sending fails
     */
    public void produce(int count) throws Exception {
        Connection connection=null;
        try {
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
            factory.setDispatchAsync(true);

            connection = factory.createConnection();

            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(destination);
            connection.start();

            for( int i=0 ; i< count; i++ ) {
                // Number messages from 1 without advancing the loop index a second time,
                // so that exactly 'count' messages are sent.
                producer.send(session.createTextMessage(getName()+" Message "+(i + 1)));
            }

        } finally {
            if (connection != null) {
                try {
                    connection.close();
                } catch (Throwable e) {
                    // Best effort: a failed close must not mask the outcome of the sends.
                    log.debug("Failed to close producer connection", e);
                }
            }
        }
    }

    /**
     * Consumer that counts received messages. The thread named "Consumer-1" sleeps
     * 1000 * 1000 ms after each message (the slow consumer); any other sleeps 1 ms.
     */
    public class ConsumerThread extends Thread {
        final AtomicLong counter = new AtomicLong();

        public ConsumerThread(String threadId) {
            super(threadId);
        }

        @Override
        public void run() {
            Connection connection=null;
            try {
                log.debug(getName() + ": is running");

                ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_BROKER_URI);
                factory.setDispatchAsync(true);

                connection = factory.createConnection();

                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageConsumer consumer = session.createConsumer(destination);
                connection.start();

                while (!shutdown.get()) {
                    TextMessage msg = (TextMessage)consumer.receive(1000);
                    if ( msg!=null ) {
                        int sleepingTime;
                        if (getName().equals("Consumer-1")) {
                            sleepingTime = 1000 * 1000;
                        } else {
                            sleepingTime = 1;
                        }
                        counter.incrementAndGet();
                        Thread.sleep(sleepingTime);
                    }
                }

            } catch (Exception e) {
                // Expected when tearDown interrupts the sleep/receive; keep a trace for diagnosis.
                log.debug(getName() + ": stopped by exception", e);
            } finally {
                log.debug(getName() + ": is stopping");
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Throwable e) {
                        log.debug(getName() + ": failed to close connection", e);
                    }
                }
            }
        }

    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java new file mode 100644 index 0000000..f5ccd50 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1893Test.java @@ -0,0 +1,196 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq.bugs;

import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Produces {@code MESSAGE_COUNT_OF_ONE_GROUP} persistent messages for each value in
 * {@code PRIORITIES} (stored in an int property named "priority"), then consumes them with
 * one selector-based consumer per value and requires all of them to arrive within 60 seconds.
 */
public class AMQ1893Test extends TestCase {

    private static final Logger log = LoggerFactory.getLogger(AMQ1893Test.class);

    static final String QUEUE_NAME = "TEST";

    static final int MESSAGE_COUNT_OF_ONE_GROUP = 10000;

    static final int[] PRIORITIES = new int[]{0, 5, 10};

    static final boolean debug = false;

    private BrokerService brokerService;

    private ActiveMQQueue destination;

    /** Starts an embedded broker on a TCP connector at port 0 with a clean message store. */
    @Override
    protected void setUp() throws Exception {
        brokerService = new BrokerService();
        brokerService.setDeleteAllMessagesOnStartup(true);
        brokerService.addConnector("tcp://localhost:0");
        brokerService.start();
        destination = new ActiveMQQueue(QUEUE_NAME);
    }

    /** Stops the embedded broker. */
    @Override
    protected void tearDown() throws Exception {
        brokerService.stop();
    }


    public void testProduceConsumeWithSelector() throws Exception {
        new TestProducer().produceMessages();
        new TestConsumer().consume();
    }


    /** Sends one group of messages per entry of {@code PRIORITIES}. */
    class TestProducer {

        /**
         * Sends all groups over a single connection, which is closed even if a send fails.
         *
         * @throws Exception if connecting or sending fails
         */
        public void produceMessages() throws Exception {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    brokerService.getTransportConnectors().get(0).getConnectUri().toString()
            );
            Connection connection = connectionFactory.createConnection();
            try {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue(QUEUE_NAME);
                MessageProducer producer = session.createProducer(destination);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);

                long start = System.currentTimeMillis();

                for (int priority : PRIORITIES) {

                    // Human-readable group label used as the message text prefix.
                    String name = null;
                    if (priority == 10) {
                        name = "high";
                    } else if (priority == 5) {
                        name = "mid";
                    } else {
                        name = "low";
                    }

                    for (int i = 1; i <= MESSAGE_COUNT_OF_ONE_GROUP; i++) {

                        TextMessage message = session.createTextMessage(name + "_" + i);
                        // Application property matched by the consumers' selectors.
                        message.setIntProperty("priority", priority);

                        producer.send(message);
                    }
                }

                long end = System.currentTimeMillis();

                // Derive the total from PRIORITIES so it agrees with the consumer's expected count.
                log.info("sent " + (MESSAGE_COUNT_OF_ONE_GROUP * PRIORITIES.length) + " messages in " + (end - start) + " ms");

                producer.close();
                session.close();
            } finally {
                connection.close();
            }
        }
    }

    /** Consumes every produced message using one selector-based consumer per priority value. */
    class TestConsumer {

        private CountDownLatch finishLatch = new CountDownLatch(1);



        /**
         * Starts one connection, session and consumer per entry of {@code PRIORITIES}, all
         * sharing a counting listener, and waits up to 60 seconds for the full message count.
         * All JMS resources that were created are closed whether or not the wait succeeds.
         *
         * @throws Exception if any JMS operation fails
         */
        public void consume() throws Exception {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    brokerService.getTransportConnectors().get(0).getConnectUri().toString()
            );


            final int totalMessageCount = MESSAGE_COUNT_OF_ONE_GROUP * PRIORITIES.length;
            final AtomicInteger counter = new AtomicInteger();
            final MessageListener listener = new MessageListener() {
                @Override
                public void onMessage(Message message) {

                    if (debug) {
                        try {
                            log.info(((TextMessage) message).getText());
                        } catch (JMSException e) {
                            log.error("Failed to read message text", e);
                        }
                    }

                    // Release the waiting test thread once the last expected message arrives.
                    if (counter.incrementAndGet() == totalMessageCount) {

                        finishLatch.countDown();

                    }
                }
            };

            int consumerCount = PRIORITIES.length;
            Connection[] connections = new Connection[consumerCount];
            Session[] sessions = new Session[consumerCount];
            MessageConsumer[] consumers = new MessageConsumer[consumerCount];

            try {
                for (int i = 0; i < consumerCount; i++) {
                    String selector = "priority = " + PRIORITIES[i];

                    connections[i] = connectionFactory.createConnection();
                    sessions[i] = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);

                    consumers[i] = sessions[i].createConsumer(destination, selector);
                    consumers[i].setMessageListener(listener);
                }

                for (Connection connection : connections) {
                    connection.start();
                }

                log.info("received " + counter.get() + " messages");

                assertTrue("got all messages in time", finishLatch.await(60, TimeUnit.SECONDS));

                log.info("received " + counter.get() + " messages");
            } finally {
                // Array slots are null when set-up failed part-way through.
                for (MessageConsumer consumer : consumers) {
                    if (consumer != null) {
                        consumer.close();
                    }
                }

                for (Session session : sessions) {
                    if (session != null) {
                        session.close();
                    }
                }

                for (Connection connection : connections) {
                    if (connection != null) {
                        connection.close();
                    }
                }
            }
        }

    }

}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java new file mode 100644 index 0000000..f896342 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1917Test.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import junit.framework.TestCase; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; + + +public class AMQ1917Test extends TestCase { + + private static final int NUM_MESSAGES = 4000; + private static final int NUM_THREADS = 10; + private static final String REQUEST_QUEUE = "mock.in.queue"; + private static final String REPLY_QUEUE = "mock.out.queue"; + + private Destination requestDestination = ActiveMQDestination.createDestination( + REQUEST_QUEUE, ActiveMQDestination.QUEUE_TYPE); + private Destination replyDestination = ActiveMQDestination.createDestination( + REPLY_QUEUE, 
ActiveMQDestination.QUEUE_TYPE); + + private CountDownLatch roundTripLatch = new CountDownLatch(NUM_MESSAGES); + private CountDownLatch errorLatch = new CountDownLatch(1); + private ThreadPoolExecutor tpe; + private final String BROKER_URL = "tcp://localhost:0"; + private String connectionUri; + private BrokerService broker = null; + private boolean working = true; + + // trival session/producer pool + final Session[] sessions = new Session[NUM_THREADS]; + final MessageProducer[] producers = new MessageProducer[NUM_THREADS]; + + public void setUp() throws Exception { + broker = new BrokerService(); + broker.setPersistent(false); + broker.addConnector(BROKER_URL); + broker.start(); + + connectionUri = broker.getTransportConnectors().get(0).getPublishableConnectString(); + + BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(10000); + tpe = new ThreadPoolExecutor(NUM_THREADS, NUM_THREADS, 60000, + TimeUnit.MILLISECONDS, queue); + ThreadFactory limitedthreadFactory = new LimitedThreadFactory(tpe.getThreadFactory()); + tpe.setThreadFactory(limitedthreadFactory); + } + + public void tearDown() throws Exception { + broker.stop(); + tpe.shutdown(); + } + + public void testLoadedSendRecieveWithCorrelationId() throws Exception { + + ActiveMQConnectionFactory connectionFactory = new org.apache.activemq.ActiveMQConnectionFactory(); + connectionFactory.setBrokerURL(connectionUri); + Connection connection = connectionFactory.createConnection(); + setupReceiver(connection); + + connection = connectionFactory.createConnection(); + connection.start(); + + // trival session/producer pool + for (int i=0; i<NUM_THREADS; i++) { + sessions[i] = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + producers[i] = sessions[i].createProducer(requestDestination); + } + + for (int i = 0; i < NUM_MESSAGES; i++) { + MessageSenderReceiver msr = new MessageSenderReceiver(requestDestination, + replyDestination, "Test Message : " + i); + tpe.execute(msr); + } + + while 
(!roundTripLatch.await(4000, TimeUnit.MILLISECONDS)) { + if (errorLatch.await(1000, TimeUnit.MILLISECONDS)) { + fail("there was an error, check the console for thread or thread allocation failure"); + break; + } + } + working = false; + } + + private void setupReceiver(final Connection connection) throws Exception { + + final Session session = connection.createSession(false, + Session.AUTO_ACKNOWLEDGE); + final MessageConsumer consumer = session + .createConsumer(requestDestination); + final MessageProducer sender = session.createProducer(replyDestination); + connection.start(); + + new Thread() { + public void run() { + while (working) { + // wait for messages in infinitive loop + // time out is set to show the client is awaiting + try { + TextMessage msg = (TextMessage) consumer.receive(20000); + if (msg == null) { + errorLatch.countDown(); + fail("Response timed out." + + " latchCount=" + roundTripLatch.getCount()); + } else { + String result = msg.getText(); + //System.out.println("Request:" + (i++) + // + ", msg=" + result + ", ID" + msg.getJMSMessageID()); + TextMessage response = session.createTextMessage(); + response.setJMSCorrelationID(msg.getJMSMessageID()); + response.setText(result); + sender.send(response); + } + } catch (JMSException e) { + if (working) { + errorLatch.countDown(); + fail("Unexpected exception:" + e); + } + } + } + } + }.start(); + } + + class MessageSenderReceiver implements Runnable { + + Destination reqDest; + Destination replyDest; + String origMsg; + + public MessageSenderReceiver(Destination reqDest, + Destination replyDest, String msg) throws Exception { + this.replyDest = replyDest; + this.reqDest = reqDest; + this.origMsg = msg; + } + + private int getIndexFromCurrentThread() { + String name = Thread.currentThread().getName(); + String num = name.substring(name.lastIndexOf('-') +1); + int idx = Integer.parseInt(num) -1; + assertTrue("idx is in range: idx=" + idx, idx < NUM_THREADS); + return idx; + } + + public void run() { + 
try { + // get thread session and producer from pool + int threadIndex = getIndexFromCurrentThread(); + Session session = sessions[threadIndex]; + MessageProducer producer = producers[threadIndex]; + + final Message sendJmsMsg = session.createTextMessage(origMsg); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(sendJmsMsg); + + String jmsId = sendJmsMsg.getJMSMessageID(); + String selector = "JMSCorrelationID='" + jmsId + "'"; + + MessageConsumer consumer = session.createConsumer(replyDest, + selector); + Message receiveJmsMsg = consumer.receive(2000); + consumer.close(); + if (receiveJmsMsg == null) { + errorLatch.countDown(); + fail("Unable to receive response for:" + origMsg + + ", with selector=" + selector); + } else { + //System.out.println("received response message :" + // + ((TextMessage) receiveJmsMsg).getText() + // + " with selector : " + selector); + roundTripLatch.countDown(); + } + } catch (JMSException e) { + fail("unexpected exception:" + e); + } + } + } + + public class LimitedThreadFactory implements ThreadFactory { + int threadCount; + private ThreadFactory factory; + public LimitedThreadFactory(ThreadFactory threadFactory) { + this.factory = threadFactory; + } + + public Thread newThread(Runnable arg0) { + if (++threadCount > NUM_THREADS) { + errorLatch.countDown(); + fail("too many threads requested"); + } + return factory.newThread(arg0); + } + } + } + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java new file mode 100644 index 0000000..2c86562 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ1936Test.java @@ -0,0 +1,310 @@ +/** + 
* Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.QueueConnection; +import javax.jms.QueueConnectionFactory; +import javax.jms.QueueReceiver; +import javax.jms.QueueSender; +import javax.jms.QueueSession; +import javax.jms.TextMessage; +import javax.naming.NamingException; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.AutoFailTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; +import org.apache.log4j.Logger; + +/** + * A AMQ1936Test + * + */ +public class AMQ1936Test extends TestCase { + private final static Logger logger = Logger.getLogger(AMQ1936Test.class); + private final static String 
TEST_QUEUE_NAME = "dynamicQueues/duplicate.message.test.queue"; + // //-- + // + private final static long TEST_MESSAGE_COUNT = 6000; // The number of test messages to use + // + // //-- + private final static int CONSUMER_COUNT = 2; // The number of message receiver instances + private final static boolean TRANSACTED_RECEIVE = true; // Flag used by receiver which indicates messages should be + // processed within a JMS transaction + + private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CONSUMER_COUNT, CONSUMER_COUNT, Long.MAX_VALUE, TimeUnit.SECONDS, + new LinkedBlockingQueue<Runnable>()); + private final ThreadedMessageReceiver[] receivers = new ThreadedMessageReceiver[CONSUMER_COUNT]; + private BrokerService broker = null; + static QueueConnectionFactory connectionFactory = null; + + @Override + protected void setUp() throws Exception { + super.setUp(); + + broker = new BrokerService(); + broker.getSystemUsage().getMemoryUsage().setLimit(5 * 1024 * 1024); + broker.setBrokerName("test"); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + connectionFactory = new ActiveMQConnectionFactory("vm://test"); + ; + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + + if (threadPool != null) { + // signal receivers to stop + for (ThreadedMessageReceiver receiver : receivers) { + receiver.setShouldStop(true); + } + + logger.info("Waiting for receivers to shutdown.."); + if (!threadPool.awaitTermination(10, TimeUnit.SECONDS)) { + logger.warn("Not all receivers completed shutdown."); + } else { + logger.info("All receivers shutdown successfully.."); + } + } + + logger.debug("Stoping the broker."); + + if (broker != null) { + broker.stop(); + } + } + + private void sendTextMessage(String queueName, int i) throws JMSException, NamingException { + QueueConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://test"); + QueueConnection queueConnection = null; + QueueSession session = null; + 
QueueSender sender = null; + Queue queue = null; + TextMessage message = null; + + try { + + // Create the queue connection + queueConnection = connectionFactory.createQueueConnection(); + + session = queueConnection.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE); + queue = session.createQueue(TEST_QUEUE_NAME); + sender = session.createSender(queue); + sender.setDeliveryMode(DeliveryMode.PERSISTENT); + + message = session.createTextMessage(String.valueOf(i)); + + // send the message + sender.send(message); + + if (session.getTransacted()) { + session.commit(); + } + if (i % 1000 == 0) { + logger.info("Message successfully sent to : " + queue.getQueueName() + " messageid: " + message.getJMSMessageID() + " content:" + + message.getText()); + } + } finally { + if (sender != null) { + sender.close(); + } + if (session != null) { + session.close(); + } + if (queueConnection != null) { + queueConnection.close(); + } + } + } + + public void testForDuplicateMessages() throws Exception { + final ConcurrentHashMap<String, String> messages = new ConcurrentHashMap<String, String>(); + final Object lock = new Object(); + final CountDownLatch duplicateSignal = new CountDownLatch(1); + final AtomicInteger messageCount = new AtomicInteger(0); + + // add 1/2 the number of our total messages + for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) { + if (duplicateSignal.getCount() == 0) { + fail("Duplicate message id detected"); + } + sendTextMessage(TEST_QUEUE_NAME, i); + } + + // create a number of consumers to read of the messages and start them with a handler which simply stores the + // message ids + // in a Map and checks for a duplicate + for (int i = 0; i < CONSUMER_COUNT; i++) { + receivers[i] = new ThreadedMessageReceiver(TEST_QUEUE_NAME, new IMessageHandler() { + + @Override + public void onMessage(Message message) throws Exception { + synchronized (lock) { + int current = messageCount.incrementAndGet(); + if (current % 1000 == 0) { + logger.info("Received message:" 
+ message.getJMSMessageID() + " with content: " + ((TextMessage) message).getText()); + } + if (messages.containsKey(message.getJMSMessageID())) { + duplicateSignal.countDown(); + logger.fatal("duplicate message id detected:" + message.getJMSMessageID()); + fail("Duplicate message id detected:" + message.getJMSMessageID()); + } else { + messages.put(message.getJMSMessageID(), message.getJMSMessageID()); + } + } + } + }); + threadPool.submit(receivers[i]); + } + + // starting adding the remaining messages + for (int i = 0; i < TEST_MESSAGE_COUNT / 2; i++) { + if (duplicateSignal.getCount() == 0) { + fail("Duplicate message id detected"); + } + sendTextMessage(TEST_QUEUE_NAME, i); + } + + logger.info("sent all " + TEST_MESSAGE_COUNT + " messages"); + + // allow some time for messages to be delivered to receivers. + boolean ok = Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return TEST_MESSAGE_COUNT == messages.size(); + } + }, TimeUnit.MINUTES.toMillis(7)); + if (!ok) { + AutoFailTestSupport.dumpAllThreads("--STUCK?--"); + } + assertEquals("Number of messages received does not match the number sent", TEST_MESSAGE_COUNT, messages.size()); + assertEquals(TEST_MESSAGE_COUNT, messageCount.get()); + } + + private final static class ThreadedMessageReceiver implements Runnable { + + private IMessageHandler handler = null; + private final AtomicBoolean shouldStop = new AtomicBoolean(false); + + public ThreadedMessageReceiver(String queueName, IMessageHandler handler) { + this.handler = handler; + } + + @Override + public void run() { + + QueueConnection queueConnection = null; + QueueSession session = null; + QueueReceiver receiver = null; + Queue queue = null; + Message message = null; + try { + try { + + queueConnection = connectionFactory.createQueueConnection(); + // create a transacted session + session = queueConnection.createQueueSession(TRANSACTED_RECEIVE, QueueSession.AUTO_ACKNOWLEDGE); + queue = 
session.createQueue(TEST_QUEUE_NAME); + receiver = session.createReceiver(queue); + + // start the connection + queueConnection.start(); + + logger.info("Receiver " + Thread.currentThread().getName() + " connected."); + + // start receive loop + while (!(shouldStop.get() || Thread.currentThread().isInterrupted())) { + try { + message = receiver.receive(200); + } catch (Exception e) { + // + // ignore interrupted exceptions + // + if (e instanceof InterruptedException || e.getCause() instanceof InterruptedException) { + /* ignore */ + } else { + throw e; + } + } + + if (message != null && this.handler != null) { + this.handler.onMessage(message); + } + + // commit session on successful handling of message + if (session.getTransacted()) { + session.commit(); + } + } + + logger.info("Receiver " + Thread.currentThread().getName() + " shutting down."); + + } finally { + if (receiver != null) { + try { + receiver.close(); + } catch (JMSException e) { + logger.warn(e); + } + } + if (session != null) { + try { + session.close(); + } catch (JMSException e) { + logger.warn(e); + } + } + if (queueConnection != null) { + queueConnection.close(); + } + } + } catch (JMSException e) { + logger.error(e); + e.printStackTrace(); + } catch (NamingException e) { + logger.error(e); + } catch (Exception e) { + logger.error(e); + e.printStackTrace(); + } + } + + public void setShouldStop(Boolean shouldStop) { + this.shouldStop.set(shouldStop); + } + } + + public interface IMessageHandler { + void onMessage(Message message) throws Exception; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java new file mode 100644 index 0000000..9abab3f 
--- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2021Test.java @@ -0,0 +1,267 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.bugs; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.ArrayList; +import java.util.Vector; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This 
is a test case for the issue reported at: https://issues.apache.org/activemq/browse/AMQ-2021 Bug is modification + * of inflight message properties so the failure can manifest itself in a bunch or ways, from message receipt with null + * properties to marshall errors + */ +public class AMQ2021Test implements ExceptionListener, UncaughtExceptionHandler { + + private static final Logger log = LoggerFactory.getLogger(AMQ2021Test.class); + BrokerService brokerService; + ArrayList<Thread> threads = new ArrayList<Thread>(); + Vector<Throwable> exceptions; + + @Rule + public TestName name = new TestName(); + + AMQ2021Test testCase; + + private final String ACTIVEMQ_BROKER_BIND = "tcp://localhost:0"; + private String CONSUMER_BROKER_URL = "?jms.redeliveryPolicy.maximumRedeliveries=1&jms.redeliveryPolicy.initialRedeliveryDelay=0"; + private String PRODUCER_BROKER_URL; + + private final int numMessages = 1000; + private final int numConsumers = 2; + private final int dlqMessages = numMessages / 2; + + private CountDownLatch receivedLatch; + private ActiveMQTopic destination; + private CountDownLatch started; + + @Before + public void setUp() throws Exception { + Thread.setDefaultUncaughtExceptionHandler(this); + testCase = this; + + // Start an embedded broker up. 
+ brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.addConnector(ACTIVEMQ_BROKER_BIND); + brokerService.start(); + destination = new ActiveMQTopic(name.getMethodName()); + exceptions = new Vector<Throwable>(); + + CONSUMER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString() + CONSUMER_BROKER_URL; + PRODUCER_BROKER_URL = brokerService.getTransportConnectors().get(0).getPublishableConnectString(); + + receivedLatch = new CountDownLatch(numConsumers * (numMessages + dlqMessages)); + started = new CountDownLatch(1); + } + + @After + public void tearDown() throws Exception { + for (Thread t : threads) { + t.interrupt(); + t.join(); + } + brokerService.stop(); + } + + @Test(timeout=240000) + public void testConcurrentTopicResendToDLQ() throws Exception { + + for (int i = 0; i < numConsumers; i++) { + ConsumerThread c1 = new ConsumerThread("Consumer-" + i); + threads.add(c1); + c1.start(); + } + + assertTrue(started.await(10, TimeUnit.SECONDS)); + + Thread producer = new Thread() { + @Override + public void run() { + try { + produce(numMessages); + } catch (Exception e) { + } + } + }; + threads.add(producer); + producer.start(); + + boolean allGood = receivedLatch.await(90, TimeUnit.SECONDS); + for (Throwable t : exceptions) { + log.error("failing test with first exception", t); + fail("exception during test : " + t); + } + assertTrue("excepted messages received within time limit", allGood); + + assertEquals(0, exceptions.size()); + + for (int i = 0; i < numConsumers; i++) { + // last recovery sends message to deq so is not received again + assertEquals(dlqMessages * 2, ((ConsumerThread) threads.get(i)).recoveries); + assertEquals(numMessages + dlqMessages, ((ConsumerThread) threads.get(i)).counter); + } + + // half of the messages for each consumer should go to the dlq but duplicates will + // be suppressed + consumeFromDLQ(dlqMessages); + + } + + private void consumeFromDLQ(int 
messageCount) throws Exception { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); + int count = 0; + for (int i = 0; i < messageCount; i++) { + if (dlqConsumer.receive(1000) == null) { + break; + } + count++; + } + assertEquals(messageCount, count); + } + + public void produce(int count) throws Exception { + Connection connection = null; + try { + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(PRODUCER_BROKER_URL); + connection = factory.createConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + producer.setTimeToLive(0); + connection.start(); + + for (int i = 0; i < count; i++) { + int id = i + 1; + TextMessage message = session.createTextMessage(name.getMethodName() + " Message " + id); + message.setIntProperty("MsgNumber", id); + producer.send(message); + + if (id % 500 == 0) { + log.info("sent " + id + ", ith " + message); + } + } + } catch (JMSException e) { + log.error("unexpected ex on produce", e); + exceptions.add(e); + } finally { + try { + if (connection != null) { + connection.close(); + } + } catch (Throwable e) { + } + } + } + + public class ConsumerThread extends Thread implements MessageListener { + public long counter = 0; + public long recoveries = 0; + private Session session; + + public ConsumerThread(String threadId) { + super(threadId); + } + + @Override + public void run() { + try { + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(CONSUMER_BROKER_URL); + Connection connection = connectionFactory.createConnection(); + connection.setExceptionListener(testCase); + 
connection.setClientID(getName()); + session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = session.createDurableSubscriber(destination, getName()); + consumer.setMessageListener(this); + connection.start(); + + started.countDown(); + + } catch (JMSException exception) { + log.error("unexpected ex in consumer run", exception); + exceptions.add(exception); + } + } + + @Override + public void onMessage(Message message) { + try { + counter++; + int messageNumber = message.getIntProperty("MsgNumber"); + if (messageNumber % 2 == 0) { + session.recover(); + recoveries++; + } else { + message.acknowledge(); + } + + if (counter % 200 == 0) { + log.info("recoveries:" + recoveries + ", Received " + counter + ", counter'th " + message); + } + receivedLatch.countDown(); + } catch (Exception e) { + log.error("unexpected ex on onMessage", e); + exceptions.add(e); + } + } + + } + + @Override + public void onException(JMSException exception) { + log.info("Unexpected JMSException", exception); + exceptions.add(exception); + } + + @Override + public void uncaughtException(Thread thread, Throwable exception) { + log.info("Unexpected exception from thread " + thread + ", ex: " + exception); + exceptions.add(exception); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java new file mode 100644 index 0000000..1f31864 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ2084Test.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq.bugs;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueReceiver;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import org.apache.activemq.broker.BrokerService;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Publishes an XML text message to a topic and checks whether a queue
 * receiver that was created with an XPATH message selector gets it.
 *
 * <p>Each test runs against its own non-persistent {@link BrokerService}
 * listening on an ephemeral TCP port. Every client connection opened by the
 * helper methods is remembered and closed again in {@link #stopBroker()}.
 */
public class AMQ2084Test {

   private static final Logger LOG = LoggerFactory.getLogger(AMQ2084Test.class);

   private static final String INITIAL_CONTEXT_FACTORY = "org.apache.activemq.jndi.ActiveMQInitialContextFactory";

   BrokerService broker;
   CountDownLatch qreceived;
   String connectionUri;

   /** Client connections opened by the helpers; closed in {@link #stopBroker()}. */
   private final List<Connection> connections = new ArrayList<Connection>();

   /**
    * Starts a non-persistent broker on a free TCP port and resets the latch
    * that the queue listener counts down.
    *
    * @throws Exception if the broker cannot be configured or started
    */
   @Before
   public void startBroker() throws Exception {
      broker = new BrokerService();
      broker.setPersistent(false);
      connectionUri = broker.addConnector("tcp://localhost:0").getPublishableConnectString();
      broker.start();

      qreceived = new CountDownLatch(1);
   }

   /**
    * Closes every client connection opened during the test, then stops the
    * broker. A connection that fails to close is logged and does not prevent
    * the remaining connections or the broker from being shut down.
    *
    * @throws Exception if stopping the broker fails
    */
   @After
   public void stopBroker() throws Exception {
      try {
         for (Connection connection : connections) {
            try {
               connection.close();
            } catch (JMSException e) {
               LOG.warn("Failed to close client connection", e);
            }
         }
         connections.clear();
      } finally {
         if (broker != null) {
            broker.stop();
         }
      }
   }

   /**
    * Registers an asynchronous receiver on a queue. Each received
    * {@link TextMessage} is logged and counts down {@link #qreceived}.
    *
    * @param queueName physical name of the queue to consume from
    * @param selectors message selector passed to the receiver
    * @throws IllegalStateException if the receiver cannot be set up; the
    *         original failure is attached as the cause
    */
   public void listenQueue(final String queueName, final String selectors) {
      try {
         javax.naming.Context ctx = createContext("queue.queueName", queueName);
         QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("ConnectionFactory");
         QueueConnection conn = factory.createQueueConnection();
         connections.add(conn);
         final Queue queue = (Queue) ctx.lookup("queueName");
         QueueSession session = conn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
         QueueReceiver receiver = session.createReceiver(queue, selectors);
         LOG.info("Message Selector: {}", receiver.getMessageSelector());
         receiver.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
               try {
                  if (message instanceof TextMessage) {
                     TextMessage txtMsg = (TextMessage) message;
                     String msg = txtMsg.getText();
                     LOG.info("Queue Message Received: " + queueName + " - " + msg);
                     qreceived.countDown();
                  }
                  message.acknowledge();
               } catch (Exception e) {
                  LOG.error("Failed to handle message from queue " + queueName, e);
               }
            }
         });
         conn.start();
      } catch (Exception e) {
         // Fail loudly: a silently missing consumer would let the
         // "no match" test pass without testing anything.
         throw new IllegalStateException("Unable to listen on queue " + queueName, e);
      }
   }

   /**
    * Registers an asynchronous subscriber on a topic that logs each received
    * {@link TextMessage}.
    *
    * @param topicName physical name of the topic to subscribe to
    * @param selectors message selector passed to the subscriber
    * @throws IllegalStateException if the subscriber cannot be set up; the
    *         original failure is attached as the cause
    */
   public void listenTopic(final String topicName, final String selectors) {
      try {
         javax.naming.Context ctx = createContext("topic.topicName", topicName);
         TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("ConnectionFactory");
         TopicConnection conn = factory.createTopicConnection();
         connections.add(conn);
         final Topic topic = (Topic) ctx.lookup("topicName");
         TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         TopicSubscriber receiver = session.createSubscriber(topic, selectors, false);

         receiver.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
               try {
                  if (message instanceof TextMessage) {
                     TextMessage txtMsg = (TextMessage) message;
                     String msg = txtMsg.getText();
                     LOG.info("Topic Message Received: " + topicName + " - " + msg);
                  }
                  message.acknowledge();
               } catch (Exception e) {
                  LOG.error("Failed to handle message from topic " + topicName, e);
               }
            }
         });
         conn.start();
      } catch (Exception e) {
         throw new IllegalStateException("Unable to listen on topic " + topicName, e);
      }
   }

   /**
    * Publishes a text message to a topic. Nothing is sent when
    * {@code message} is {@code null}.
    *
    * @param topicName physical name of the topic to publish to
    * @param message   text body to send, or {@code null} to send nothing
    * @throws IllegalStateException if the message cannot be published; the
    *         original failure is attached as the cause
    */
   public void publish(String topicName, String message) {
      try {
         javax.naming.Context ctx = createContext("topic.topicName", topicName);
         TopicConnectionFactory factory = (TopicConnectionFactory) ctx.lookup("ConnectionFactory");
         TopicConnection conn = factory.createTopicConnection();
         // Closed in stopBroker() rather than here so the timing of the
         // publish relative to the assertions stays as it was.
         connections.add(conn);
         Topic topic = (Topic) ctx.lookup("topicName");
         TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
         TopicPublisher publisher = session.createPublisher(topic);
         if (message != null) {
            Message msg = session.createTextMessage(message);
            publisher.send(msg);
         }
      } catch (Exception e) {
         throw new IllegalStateException("Unable to publish to topic " + topicName, e);
      }
   }

   /** A message whose book has lang="en" must reach a receiver selecting lang='en'. */
   @Test
   public void tryXpathSelectorMatch() throws Exception {
      String xPath = "XPATH '//books//book[@lang=''en'']'";
      listenQueue("Consumer.Sample.VirtualTopic.TestXpath", xPath);
      publish("VirtualTopic.TestXpath", "<?xml version=\"1.0\" encoding=\"UTF-8\"?><books><book lang=\"en\">ABC</book></books>");
      assertTrue("topic received: ", qreceived.await(20, TimeUnit.SECONDS));
   }

   /** A message whose book has lang="en" must not reach a receiver selecting lang='es'. */
   @Test
   public void tryXpathSelectorNoMatch() throws Exception {
      String xPath = "XPATH '//books//book[@lang=''es'']'";
      listenQueue("Consumer.Sample.VirtualTopic.TestXpath", xPath);
      publish("VirtualTopic.TestXpath", "<?xml version=\"1.0\" encoding=\"UTF-8\"?><books><book lang=\"en\">ABC</book></books>");
      assertFalse("topic did not receive unmatched", qreceived.await(5, TimeUnit.SECONDS));
   }

   /** Builds a JNDI context bound to the test broker with one destination entry registered. */
   private javax.naming.Context createContext(String destinationKey, String destinationName) throws NamingException {
      Properties props = new Properties();
      props.put("java.naming.factory.initial", INITIAL_CONTEXT_FACTORY);
      props.put("java.naming.provider.url", connectionUri);
      props.put(destinationKey, destinationName);
      return new InitialContext(props);
   }

}
