http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java new file mode 100644 index 0000000..b689664 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueWildcardSendReceiveTest.java @@ -0,0 +1,174 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.test.JmsTopicSendReceiveTest;

/**
 * Sends text messages to concrete queues and consumes them through a queue
 * whose name contains a wildcard ({@code *} or {@code >}), checking that the
 * wildcard consumer receives exactly the messages each test expects.
 */
public class JmsQueueWildcardSendReceiveTest extends JmsTopicSendReceiveTest {

    private final String destination1String = "TEST.ONE.ONE";
    private final String destination2String = "TEST.ONE.ONE.ONE";
    private final String destination3String = "TEST.ONE.TWO";
    private final String destination4String = "TEST.TWO.ONE";

    /**
     * Sets a test to have a queue destination and non-persistent delivery mode.
     *
     * @see junit.framework.TestCase#setUp()
     */
    @Override
    protected void setUp() throws Exception {
        topic = false;
        deliveryMode = DeliveryMode.NON_PERSISTENT;
        super.setUp();
    }

    /**
     * Returns the consumer subject.
     *
     * @return String - consumer subject
     * @see org.apache.activemq.test.TestSupport#getConsumerSubject()
     */
    @Override
    protected String getConsumerSubject() {
        return "FOO.>";
    }

    /**
     * Returns the producer subject.
     *
     * @return String - producer subject
     * @see org.apache.activemq.test.TestSupport#getProducerSubject()
     */
    @Override
    protected String getProducerSubject() {
        return "FOO.BAR.HUMBUG";
    }

    /**
     * A consumer on {@code TEST.ONE.*} must receive the two messages sent to
     * {@code TEST.ONE.ONE} and {@code TEST.ONE.TWO}, and nothing else.
     */
    public void testReceiveWildcardQueueEndAsterisk() throws Exception {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        ActiveMQDestination destination1 = (ActiveMQDestination)session.createQueue(destination1String);
        ActiveMQDestination destination3 = (ActiveMQDestination)session.createQueue(destination3String);

        sendMessage(session, destination1, destination1String);
        sendMessage(session, destination3, destination3String);

        ActiveMQDestination destination6 = (ActiveMQDestination)session.createQueue("TEST.ONE.*");
        MessageConsumer consumer = session.createConsumer(destination6);

        assertNextTextIsOneOf(consumer, destination1String, destination3String);
        assertNextTextIsOneOf(consumer, destination1String, destination3String);
        assertNull(consumer.receiveNoWait());
    }

    /**
     * A consumer on {@code TEST.ONE.>} must receive the three messages sent to
     * {@code TEST.ONE.ONE}, {@code TEST.ONE.ONE.ONE} and {@code TEST.ONE.TWO},
     * and nothing else.
     */
    public void testReceiveWildcardQueueEndGreaterThan() throws Exception {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        ActiveMQDestination destination1 = (ActiveMQDestination)session.createQueue(destination1String);
        ActiveMQDestination destination2 = (ActiveMQDestination)session.createQueue(destination2String);
        ActiveMQDestination destination3 = (ActiveMQDestination)session.createQueue(destination3String);

        sendMessage(session, destination1, destination1String);
        sendMessage(session, destination2, destination2String);
        sendMessage(session, destination3, destination3String);

        ActiveMQDestination destination7 = (ActiveMQDestination)session.createQueue("TEST.ONE.>");
        MessageConsumer consumer = session.createConsumer(destination7);

        // Each received message is validated against its own text; previously
        // the second and third checks re-used the text of the first message.
        assertNextTextIsOneOf(consumer, destination1String, destination2String, destination3String);
        assertNextTextIsOneOf(consumer, destination1String, destination2String, destination3String);
        assertNextTextIsOneOf(consumer, destination1String, destination2String, destination3String);
        assertNull(consumer.receiveNoWait());
    }

    /**
     * A consumer on {@code TEST.*.ONE} must receive the two messages sent to
     * {@code TEST.ONE.ONE} and {@code TEST.TWO.ONE}, and nothing else.
     */
    public void testReceiveWildcardQueueMidAsterisk() throws Exception {
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        ActiveMQDestination destination1 = (ActiveMQDestination)session.createQueue(destination1String);
        ActiveMQDestination destination4 = (ActiveMQDestination)session.createQueue(destination4String);

        sendMessage(session, destination1, destination1String);
        sendMessage(session, destination4, destination4String);

        ActiveMQDestination destination8 = (ActiveMQDestination)session.createQueue("TEST.*.ONE");
        MessageConsumer consumer = session.createConsumer(destination8);

        assertNextTextIsOneOf(consumer, destination1String, destination4String);
        assertNextTextIsOneOf(consumer, destination1String, destination4String);
        assertNull(consumer.receiveNoWait());
    }

    /**
     * Receives the next message from {@code consumer}, waiting up to one
     * second, and fails unless it is present and its text equals one of
     * {@code allowedTexts}.
     *
     * @param consumer the consumer to receive from
     * @param allowedTexts the message texts that are acceptable
     * @throws JMSException if receiving or reading the message fails
     */
    private void assertNextTextIsOneOf(MessageConsumer consumer, String... allowedTexts) throws JMSException {
        Message message = consumer.receive(1000);
        assertNotNull(message);
        String text = ((TextMessage)message).getText();
        for (String allowed : allowedTexts) {
            if (allowed.equals(text)) {
                return;
            }
        }
        fail("unexpected message:" + text);
    }

    /**
     * Sends one text message carrying {@code text} to {@code destination}
     * using a short-lived producer that is closed after the send.
     */
    private void sendMessage(Session session, Destination destination, String text) throws JMSException {
        MessageProducer producer = session.createProducer(destination);
        producer.send(session.createTextMessage(text));
        producer.close();
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java new file mode 100644 index 0000000..e5d90d6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRedeliveredTest.java @@ -0,0 +1,562 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq;

import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.activemq.transport.vm.VMTransport;
import org.apache.activemq.util.Wait;

/**
 * Checks the value of {@code JMSRedelivered} on messages that are delivered
 * again after a session close, {@code Session.recover()} or a transaction
 * rollback, and on messages that were dispatched but never received.
 * {@link #suite()} runs every test once per delivery mode.
 */
public class JmsRedeliveredTest extends TestCase {

    private Connection connection;

    /**
     * Opens the connection used by each test.
     *
     * @see junit.framework.TestCase#setUp()
     */
    @Override
    protected void setUp() throws Exception {
        connection = createConnection();
    }

    /**
     * Closes the connection opened in {@link #setUp()}.
     *
     * @see junit.framework.TestCase#tearDown()
     */
    @Override
    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    /**
     * Creates a connection.
     *
     * @return connection
     * @throws Exception
     */
    protected Connection createConnection() throws Exception {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(
            "vm://localhost?broker.persistent=false");
        return factory.createConnection();
    }

    /**
     * Tests that an unacknowledged message is resent, and marked as
     * redelivered, when the session is closed and a new consumer session is
     * created.
     */
    public void testQueueSessionCloseMarksMessageRedelivered() throws JMSException {
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue-" + getName());
        MessageProducer producer = createProducer(session, queue);
        producer.send(createTextMessage(session));

        // Consume the message...
        MessageConsumer consumer = session.createConsumer(queue);
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
        // Don't ack the message.

        // Reset the session. This should cause the unacked message to be
        // redelivered.
        session.close();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        // Attempt to consume the message...
        consumer = session.createConsumer(queue);
        msg = consumer.receive(2000);
        assertNotNull(msg);
        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
        msg.acknowledge();

        session.close();
    }

    /**
     * Sends two messages, acknowledges only the first, then closes the
     * session. The new session must receive the second message again, marked
     * as redelivered.
     */
    public void testQueueSessionCloseMarksUnAckedMessageRedelivered() throws JMSException {
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue-" + getName());
        MessageProducer producer = createProducer(session, queue);
        producer.send(createTextMessage(session, "1"));
        producer.send(createTextMessage(session, "2"));

        // Consume and acknowledge the first message...
        MessageConsumer consumer = session.createConsumer(queue);
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
        assertEquals("1", ((TextMessage)msg).getText());
        msg.acknowledge();

        // Don't ack the second message.
        msg = consumer.receive(1000);
        assertNotNull(msg);
        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
        assertEquals("2", ((TextMessage)msg).getText());

        // Reset the session. This should cause the unacked message to be
        // redelivered.
        session.close();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        // Attempt to consume the message...
        consumer = session.createConsumer(queue);
        msg = consumer.receive(2000);
        assertNotNull(msg);
        assertEquals("2", ((TextMessage)msg).getText());
        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
        msg.acknowledge();

        session.close();
    }

    /**
     * Tests session recovery and that the redelivered message is marked as
     * such. Session uses client acknowledgement, the destination is a queue.
     *
     * @throws JMSException
     */
    public void testQueueRecoverMarksMessageRedelivered() throws JMSException {
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue-" + getName());
        MessageProducer producer = createProducer(session, queue);
        producer.send(createTextMessage(session));

        // Consume the message...
        MessageConsumer consumer = session.createConsumer(queue);
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
        // Don't ack the message.

        // Recover the session. This should cause the unacked message to be
        // redelivered.
        session.recover();

        // Attempt to consume the message...
        msg = consumer.receive(2000);
        assertNotNull(msg);
        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
        msg.acknowledge();

        session.close();
    }

    /**
     * Tests that a rolled-back message is marked as redelivered. The session
     * is transacted (created with {@code transacted == true}) and the
     * destination is a queue.
     *
     * @throws JMSException
     */
    public void testQueueRollbackMarksMessageRedelivered() throws JMSException {
        connection.start();

        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue-" + getName());
        MessageProducer producer = createProducer(session, queue);
        producer.send(createTextMessage(session));
        session.commit();

        // Get the message... Should not be redelivered.
        MessageConsumer consumer = session.createConsumer(queue);
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());

        // Rollback.. should cause redelivery.
        session.rollback();

        // Attempt to consume the message...
        msg = consumer.receive(2000);
        assertNotNull(msg);
        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());

        session.commit();
        session.close();
    }

    /**
     * Tests if the message gets to be re-delivered when the session closes and
     * that the re-delivered message is marked as such. Session uses client
     * acknowledgment, the destination is a topic and the consumer is a durable
     * subscriber.
     *
     * @throws JMSException
     */
    public void testDurableTopicSessionCloseMarksMessageRedelivered() throws JMSException {
        connection.setClientID(getName());
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("topic-" + getName());
        MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");

        // This case only works with persistent messages since transient
        // messages are dropped when the consumer goes offline.
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(createTextMessage(session));

        // Consume the message...
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
        assertFalse("Message should not be re-delivered.", msg.getJMSRedelivered());
        // Don't ack the message.

        // Reset the session. This should cause the unacked message to be
        // re-delivered.
        session.close();
        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);

        // Attempt to consume the message...
        consumer = session.createDurableSubscriber(topic, "sub1");
        msg = consumer.receive(2000);
        assertNotNull(msg);
        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
        msg.acknowledge();

        session.close();
    }

    /**
     * Tests session recovery and that the redelivered message is marked as
     * such. Session uses client acknowledgement, the destination is a topic and
     * the consumer is a durable subscriber.
     *
     * @throws JMSException
     */
    public void testDurableTopicRecoverMarksMessageRedelivered() throws JMSException {
        connection.setClientID(getName());
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("topic-" + getName());
        MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");

        MessageProducer producer = createProducer(session, topic);
        producer.send(createTextMessage(session));

        // Consume the message...
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
        // Don't ack the message.

        // Recover the session. This should cause the unacked message to be
        // redelivered.
        session.recover();

        // Attempt to consume the message...
        msg = consumer.receive(2000);
        assertNotNull(msg);
        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
        msg.acknowledge();

        session.close();
    }

    /**
     * Tests that a rolled-back message is marked as redelivered. The session
     * is transacted, the destination is a topic and the consumer is a durable
     * subscriber.
     *
     * @throws JMSException
     */
    public void testDurableTopicRollbackMarksMessageRedelivered() throws JMSException {
        connection.setClientID(getName());
        connection.start();

        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("topic-" + getName());
        MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1");

        MessageProducer producer = createProducer(session, topic);
        producer.send(createTextMessage(session));
        session.commit();

        // Get the message... Should not be redelivered.
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());

        // Rollback.. should cause redelivery.
        session.rollback();

        // Attempt to consume the message...
        msg = consumer.receive(2000);
        assertNotNull(msg);
        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());

        session.commit();
        session.close();
    }

    /**
     * Tests session recovery and that the redelivered message is marked as
     * such. Session uses client acknowledgement, the destination is a topic
     * and the consumer is a plain (non-durable) topic consumer.
     *
     * @throws JMSException
     */
    public void testTopicRecoverMarksMessageRedelivered() throws JMSException {

        connection.setClientID(getName());
        connection.start();

        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("topic-" + getName());
        MessageConsumer consumer = session.createConsumer(topic);

        MessageProducer producer = createProducer(session, topic);
        producer.send(createTextMessage(session));

        // Consume the message...
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
        // Don't ack the message.

        // Recover the session. This should cause the unacked message to be
        // redelivered.
        session.recover();

        // Attempt to consume the message...
        msg = consumer.receive(2000);
        assertNotNull(msg);
        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());
        msg.acknowledge();

        session.close();
    }

    /**
     * Tests that a rolled-back message is marked as redelivered. The session
     * is transacted and the destination is a topic.
     *
     * @throws JMSException
     */
    public void testTopicRollbackMarksMessageRedelivered() throws JMSException {
        connection.setClientID(getName());
        connection.start();

        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("topic-" + getName());
        MessageConsumer consumer = session.createConsumer(topic);

        MessageProducer producer = createProducer(session, topic);
        producer.send(createTextMessage(session));
        session.commit();

        // Get the message... Should not be redelivered.
        Message msg = consumer.receive(1000);
        assertNotNull(msg);
        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());

        // Rollback.. should cause redelivery.
        session.rollback();

        // Attempt to consume the message...
        msg = consumer.receive(2000);
        assertNotNull(msg);
        assertTrue("Message should be redelivered.", msg.getJMSRedelivered());

        session.commit();
        session.close();
    }

    /**
     * A message that reached a consumer but was never received by the
     * application must not be flagged as redelivered when that consumer's
     * transport is stopped and a consumer on another connection receives it.
     *
     * @throws Exception
     */
    public void testNoReceiveConsumerDisconnectDoesNotIncrementRedelivery() throws Exception {
        connection.setClientID(getName());
        connection.start();

        Connection keepBrokerAliveConnection = createConnection();
        try {
            keepBrokerAliveConnection.start();

            Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
            Queue queue = session.createQueue("queue-" + getName());
            final MessageConsumer consumer = session.createConsumer(queue);

            MessageProducer producer = createProducer(session, queue);
            producer.send(createTextMessage(session));
            session.commit();

            // The scenario is only meaningful once the first consumer holds
            // the message, so a timed-out wait is a test failure.
            boolean dispatched = Wait.waitFor(new Wait.Condition() {
                @Override
                public boolean isSatisified() throws Exception {
                    return ((ActiveMQMessageConsumer)consumer).getMessageSize() == 1;
                }
            });
            assertTrue("Message should have reached the first consumer.", dispatched);

            // whack the connection - like a rebalance or tcp drop
            ((ActiveMQConnection)connection).getTransport().narrow(VMTransport.class).stop();

            session = keepBrokerAliveConnection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
            MessageConsumer messageConsumer = session.createConsumer(queue);
            Message msg = messageConsumer.receive(1000);
            assertNotNull(msg);
            msg.acknowledge();

            assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
            session.close();
        } finally {
            // Close the second connection even when an assertion above fails.
            keepBrokerAliveConnection.close();
        }
    }

    /**
     * A queue consumer that is closed without ever calling receive must not
     * cause the message to be flagged as redelivered for the next consumer.
     *
     * @throws Exception
     */
    public void testNoReceiveConsumerDoesNotIncrementRedelivery() throws Exception {
        connection.setClientID(getName());
        connection.start();

        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        Queue queue = session.createQueue("queue-" + getName());
        MessageConsumer consumer = session.createConsumer(queue);

        MessageProducer producer = createProducer(session, queue);
        producer.send(createTextMessage(session));
        session.commit();

        TimeUnit.SECONDS.sleep(1);
        consumer.close();

        consumer = session.createConsumer(queue);
        Message msg = consumer.receive(1000);
        assertNotNull(msg);

        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
        session.close();
    }

    /**
     * A durable subscriber that is closed without ever calling receive must
     * not cause the message to be flagged as redelivered when the
     * subscription is re-opened.
     *
     * @throws Exception
     */
    public void testNoReceiveDurableConsumerDoesNotIncrementRedelivery() throws Exception {
        connection.setClientID(getName());
        connection.start();

        Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
        Topic topic = session.createTopic("topic-" + getName());
        MessageConsumer consumer = session.createDurableSubscriber(topic, "sub");

        MessageProducer producer = createProducer(session, topic);
        producer.send(createTextMessage(session));
        session.commit();

        TimeUnit.SECONDS.sleep(1);
        consumer.close();

        consumer = session.createDurableSubscriber(topic, "sub");
        Message msg = consumer.receive(1000);
        assertNotNull(msg);

        assertFalse("Message should not be redelivered.", msg.getJMSRedelivered());
        session.close();
    }

    /**
     * Creates a text message with the body {@code "Hello"}.
     *
     * @param session
     * @return TextMessage.
     * @throws JMSException
     */
    private TextMessage createTextMessage(Session session) throws JMSException {
        return createTextMessage(session, "Hello");
    }

    /**
     * Creates a text message with the given body.
     *
     * @param session
     * @param txt - message body.
     * @return TextMessage.
     * @throws JMSException
     */
    private TextMessage createTextMessage(Session session, String txt) throws JMSException {
        return session.createTextMessage(txt);
    }

    /**
     * Creates a producer that uses the delivery mode returned by
     * {@link #getDeliveryMode()}.
     *
     * @param session
     * @param queue - destination.
     * @return MessageProducer
     * @throws JMSException
     */
    private MessageProducer createProducer(Session session, Destination queue) throws JMSException {
        MessageProducer producer = session.createProducer(queue);
        producer.setDeliveryMode(getDeliveryMode());
        return producer;
    }

    /**
     * Returns delivery mode.
     *
     * @return int - persistent delivery mode.
     */
    protected int getDeliveryMode() {
        return DeliveryMode.PERSISTENT;
    }

    /**
     * Run the JmsRedeliverTest with the delivery mode set as persistent.
     */
    public static final class PersistentCase extends JmsRedeliveredTest {

        /**
         * Returns delivery mode.
         *
         * @return int - persistent delivery mode.
         */
        @Override
        protected int getDeliveryMode() {
            return DeliveryMode.PERSISTENT;
        }
    }

    /**
     * Run the JmsRedeliverTest with the delivery mode set as non-persistent.
     */
    public static final class TransientCase extends JmsRedeliveredTest {

        /**
         * Returns delivery mode.
         *
         * @return int - non-persistent delivery mode.
         */
        @Override
        protected int getDeliveryMode() {
            return DeliveryMode.NON_PERSISTENT;
        }
    }

    /**
     * Builds a suite that runs all tests once with persistent and once with
     * non-persistent delivery.
     *
     * @return the combined suite
     */
    public static Test suite() {
        TestSuite suite = new TestSuite();
        suite.addTestSuite(PersistentCase.class);
        suite.addTestSuite(TransientCase.class);
        return suite;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java new file mode 100644 index 0000000..91e29c0 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsRollbackRedeliveryTest.java @@ -0,0 +1,358 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.BrokerService; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.*; + +public class JmsRollbackRedeliveryTest { + @Rule + public TestName testName = new TestName(); + + protected static final Logger LOG = LoggerFactory.getLogger(JmsRollbackRedeliveryTest.class); + final int nbMessages = 10; + final String destinationName = "Destination"; + final String brokerUrl = "vm://localhost?create=false"; + boolean consumerClose = true; + boolean rollback = true; + BrokerService broker; + + @Before + public void setUp() throws Exception { + LOG.debug("Starting " + testName.getMethodName()); + broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + LOG.debug("Finishing " + testName.getMethodName()); + Thread.sleep(100); + } + + 
@Test + public void testRedelivery() throws Exception { + doTestRedelivery(brokerUrl, false); + } + + @Test + public void testRedeliveryWithInterleavedProducer() throws Exception { + doTestRedelivery(brokerUrl, true); + } + + + @Test + public void testRedeliveryWithPrefetch0() throws Exception { + doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0", true); + } + + @Test + public void testRedeliveryWithPrefetch1() throws Exception { + doTestRedelivery(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=1", true); + } + + public void doTestRedelivery(String brokerUrl, boolean interleaveProducer) throws Exception { + LOG.debug("entering doTestRedelivery interleaveProducer is " + interleaveProducer); + ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl); + + Connection connection = connectionFactory.createConnection(); + connection.start(); + + if (interleaveProducer) { + populateDestinationWithInterleavedProducer(nbMessages, destinationName, connection); + } else { + populateDestination(nbMessages, destinationName, connection); + } + + // Consume messages and rollback transactions + { + AtomicInteger received = new AtomicInteger(); + Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>(); + while (received.get() < nbMessages) { + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(6000000); + if (msg != null) { + if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertTrue(msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); + session.commit(); + } else { + LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID()); 
+ assertFalse("should not have redelivery flag set, id: " + msg.getJMSMessageID(), msg.getJMSRedelivered()); + session.rollback(); + } + } + consumer.close(); + session.close(); + } + } + } + + @Test + public void testRedeliveryOnSingleConsumer() throws Exception { + + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(brokerUrl); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestinationWithInterleavedProducer(nbMessages, destinationName, connection); + + // Consume messages and rollback transactions + { + AtomicInteger received = new AtomicInteger(); + Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + while (received.get() < nbMessages) { + TextMessage msg = (TextMessage) consumer.receive(6000000); + if (msg != null) { + if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertTrue(msg.getJMSRedelivered()); + session.commit(); + } else { + LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID()); + session.rollback(); + } + } + } + consumer.close(); + session.close(); + } + } + + + @Test + public void testRedeliveryOnSingleSession() throws Exception { + + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(brokerUrl); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(nbMessages, destinationName, connection); + + // Consume messages and rollback transactions + { + AtomicInteger received = new AtomicInteger(); + Map<String, Boolean> rolledback = new ConcurrentHashMap<String, Boolean>(); + Session session = 
connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + while (received.get() < nbMessages) { + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(6000000); + if (msg != null) { + if (msg != null && rolledback.put(msg.getText(), Boolean.TRUE) != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertTrue(msg.getJMSRedelivered()); + session.commit(); + } else { + LOG.info("Rollback message " + msg.getText() + " id: " + msg.getJMSMessageID()); + session.rollback(); + } + } + consumer.close(); + } + session.close(); + } + } + + // AMQ-1593 + @Test + public void testValidateRedeliveryCountOnRollback() throws Exception { + + final int numMessages = 1; + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(brokerUrl); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(numMessages, destinationName, connection); + + { + AtomicInteger received = new AtomicInteger(); + final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries(); + while (received.get() < maxRetries) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(destinationName); + + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(1000); + if (msg != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount")); + session.rollback(); + } + session.close(); + } + consumeMessage(connection, maxRetries + 1); + } + } + + // AMQ-1593 + @Test + public void testValidateRedeliveryCountOnRollbackWithPrefetch0() throws 
Exception { + + final int numMessages = 1; + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(brokerUrl + "?jms.prefetchPolicy.queuePrefetch=0"); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(numMessages, destinationName, connection); + + { + AtomicInteger received = new AtomicInteger(); + final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries(); + while (received.get() < maxRetries) { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(destinationName); + + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(1000); + if (msg != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount")); + session.rollback(); + } + session.close(); + } + + consumeMessage(connection, maxRetries + 1); + } + } + + + private void consumeMessage(Connection connection, final int deliveryCount) + throws JMSException { + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(destinationName); + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(1000); + assertNotNull(msg); + assertEquals("redelivery property matches deliveries", deliveryCount, msg.getLongProperty("JMSXDeliveryCount")); + session.commit(); + session.close(); + } + + @Test + public void testRedeliveryPropertyWithNoRollback() throws Exception { + final int numMessages = 1; + ConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(brokerUrl); + Connection connection = connectionFactory.createConnection(); + connection.start(); + + populateDestination(numMessages, 
destinationName, connection); + connection.close(); + + { + AtomicInteger received = new AtomicInteger(); + final int maxRetries = new RedeliveryPolicy().getMaximumRedeliveries(); + while (received.get() < maxRetries) { + connection = connectionFactory.createConnection(); + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + Destination destination = session.createQueue(destinationName); + + MessageConsumer consumer = session.createConsumer(destination); + TextMessage msg = (TextMessage) consumer.receive(2000); + if (msg != null) { + LOG.info("Received message " + msg.getText() + " (" + received.getAndIncrement() + ")" + msg.getJMSMessageID()); + assertEquals("redelivery property matches deliveries", received.get(), msg.getLongProperty("JMSXDeliveryCount")); + } + session.close(); + connection.close(); + } + connection = connectionFactory.createConnection(); + connection.start(); + consumeMessage(connection, maxRetries + 1); + } + } + + private void populateDestination(final int nbMessages, + final String destinationName, Connection connection) + throws JMSException { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination = session.createQueue(destinationName); + MessageProducer producer = session.createProducer(destination); + for (int i = 1; i <= nbMessages; i++) { + producer.send(session.createTextMessage("<hello id='" + i + "'/>")); + } + producer.close(); + session.close(); + } + + + private void populateDestinationWithInterleavedProducer(final int nbMessages, + final String destinationName, Connection connection) + throws JMSException { + Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination1 = session1.createQueue(destinationName); + MessageProducer producer1 = session1.createProducer(destination1); + Session session2 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination destination2 = 
session2.createQueue(destinationName); + MessageProducer producer2 = session2.createProducer(destination2); + + for (int i = 1; i <= nbMessages; i++) { + if (i%2 == 0) { + producer1.send(session1.createTextMessage("<hello id='" + i + "'/>")); + } else { + producer2.send(session2.createTextMessage("<hello id='" + i + "'/>")); + } + } + producer1.close(); + session1.close(); + producer2.close(); + session2.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java new file mode 100644 index 0000000..d852f54 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveTestSupport.java @@ -0,0 +1,237 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsSendReceiveTestSupport extends TestSupport implements MessageListener { + private static final Logger LOG = LoggerFactory.getLogger(JmsSendReceiveTestSupport.class); + + protected int messageCount = 100; + protected String[] data; + protected Session session; + protected MessageConsumer consumer; + protected MessageProducer producer; + protected Destination consumerDestination; + protected Destination producerDestination; + protected List<Message> messages = createConcurrentList(); + protected boolean topic = true; + protected boolean durable; + protected int deliveryMode = DeliveryMode.PERSISTENT; + protected final Object lock = new Object(); + protected boolean verbose; + + /* + * @see junit.framework.TestCase#setUp() + */ + protected void setUp() throws Exception { + super.setUp(); + String temp = System.getProperty("messageCount"); + + if (temp != null) { + int i = Integer.parseInt(temp); + if (i > 0) { + messageCount = i; + } + } + + LOG.info("Message count for test case is: " + messageCount); + data = new String[messageCount]; + + for (int i = 0; i < messageCount; i++) { + data[i] = "Text for message: " + i + " at " + new Date(); + } + } + + /** + * Sends and consumes the messages. 
+ * + * @throws Exception + */ + public void testSendReceive() throws Exception { + messages.clear(); + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(data[i]); + message.setStringProperty("stringProperty", data[i]); + message.setIntProperty("intProperty", i); + + if (verbose) { + if (LOG.isDebugEnabled()) { + LOG.debug("About to send a message: " + message + " with text: " + data[i]); + } + } + + sendToProducer(producer, producerDestination, message); + messageSent(); + } + + assertMessagesAreReceived(); + LOG.info("" + data.length + " messages(s) received, closing down connections"); + } + + /** + * Sends a message to a destination using the supplied producer + * @param producer + * @param producerDestination + * @param message + * @throws JMSException + */ + protected void sendToProducer(MessageProducer producer, + Destination producerDestination, Message message) throws JMSException { + producer.send(producerDestination, message); + } + + /** + * Asserts messages are received. + * + * @throws JMSException + */ + protected void assertMessagesAreReceived() throws JMSException { + waitForMessagesToBeDelivered(); + assertMessagesReceivedAreValid(messages); + } + + /** + * Tests if the messages received are valid. + * + * @param receivedMessages - list of received messages. 
+ * @throws JMSException + */ + protected void assertMessagesReceivedAreValid(List<Message> receivedMessages) throws JMSException { + List<Object> copyOfMessages = Arrays.asList(receivedMessages.toArray()); + int counter = 0; + + if (data.length != copyOfMessages.size()) { + for (Iterator<Object> iter = copyOfMessages.iterator(); iter.hasNext();) { + TextMessage message = (TextMessage)iter.next(); + if (LOG.isInfoEnabled()) { + LOG.info("<== " + counter++ + " = " + message.getText()); + } + } + } + + assertEquals("Not enough messages received", data.length, receivedMessages.size()); + + for (int i = 0; i < data.length; i++) { + TextMessage received = (TextMessage)receivedMessages.get(i); + String text = received.getText(); + String stringProperty = received.getStringProperty("stringProperty"); + int intProperty = received.getIntProperty("intProperty"); + + if (verbose) { + if (LOG.isDebugEnabled()) { + LOG.info("Received Text: " + text); + } + } + + assertEquals("Message: " + i, data[i], text); + assertEquals(data[i], stringProperty); + assertEquals(i, intProperty); + } + } + + /** + * Waits for messages to be delivered. + */ + protected void waitForMessagesToBeDelivered() { + long maxWaitTime = 60000; + long waitTime = maxWaitTime; + long start = (maxWaitTime <= 0) ? 0 : System.currentTimeMillis(); + + synchronized (lock) { + while (messages.size() < data.length && waitTime >= 0) { + try { + lock.wait(200); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + waitTime = maxWaitTime - (System.currentTimeMillis() - start); + } + } + } + + /* + * (non-Javadoc) + * + * @see javax.jms.MessageListener#onMessage(javax.jms.Message) + */ + public synchronized void onMessage(Message message) { + consumeMessage(message, messages); + } + + /** + * Consumes messages. + * + * @param message - message to be consumed. + * @param messageList -list of consumed messages. 
+ */ + protected void consumeMessage(Message message, List<Message> messageList) { + if (verbose) { + if (LOG.isDebugEnabled()) { + LOG.info("Received message: " + message); + } + } + + messageList.add(message); + + if (messageList.size() >= data.length) { + synchronized (lock) { + lock.notifyAll(); + } + } + } + + /** + * Returns the ArrayList as a synchronized list. + * + * @return List + */ + protected List<Message> createConcurrentList() { + return Collections.synchronizedList(new ArrayList<Message>()); + } + + /** + * Just a hook so can insert failure tests + * + * @throws Exception + */ + protected void messageSent() throws Exception { + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java new file mode 100644 index 0000000..391253e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendReceiveWithMessageExpirationTest.java @@ -0,0 +1,310 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.Date; +import java.util.Vector; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.Topic; + +import org.apache.activemq.broker.BrokerRegistry; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.util.Wait; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JmsSendReceiveWithMessageExpirationTest extends TestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(JmsSendReceiveWithMessageExpirationTest.class); + + protected int messageCount = 100; + protected String[] data; + protected Session session; + protected Destination consumerDestination; + protected Destination producerDestination; + protected boolean durable; + protected int deliveryMode = DeliveryMode.PERSISTENT; + protected long timeToLive = 5000; + protected boolean verbose; + + protected Connection connection; + + protected void setUp() throws Exception { + + super.setUp(); + + data = new String[messageCount]; + + for (int i = 0; i < messageCount; i++) { + data[i] = "Text for message: " + i + " at " + new Date(); + } + + connectionFactory = createConnectionFactory(); + connection = createConnection(); + + if (durable) { + 
connection.setClientID(getClass().getName()); + } + + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + /** + * Test consuming an expired queue. + * + * @throws Exception + */ + public void testConsumeExpiredQueue() throws Exception { + + MessageProducer producer = createProducer(timeToLive); + + consumerDestination = session.createQueue(getConsumerSubject()); + producerDestination = session.createQueue(getProducerSubject()); + + MessageConsumer consumer = createConsumer(); + connection.start(); + + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(data[i]); + message.setStringProperty("stringProperty", data[i]); + message.setIntProperty("intProperty", i); + + if (verbose) { + if (LOG.isDebugEnabled()) { + LOG.debug("About to send a queue message: " + message + " with text: " + data[i]); + } + } + + producer.send(producerDestination, message); + } + + // sleeps a second longer than the expiration time. + // Basically waits till queue expires. + Thread.sleep(timeToLive + 1000); + + // message should have expired. 
+ assertNull(consumer.receive(1000)); + } + + public void testConsumeExpiredQueueAndDlq() throws Exception { + + MessageProducer producerNormal = createProducer(0); + MessageProducer producerExpire = createProducer(500); + + consumerDestination = session.createQueue("ActiveMQ.DLQ"); + MessageConsumer dlqConsumer = createConsumer(); + + consumerDestination = session.createQueue(getConsumerSubject()); + producerDestination = session.createQueue(getProducerSubject()); + + + Connection consumerConnection = createConnection(); + ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy(); + prefetchPolicy.setAll(10); + ((ActiveMQConnection)consumerConnection).setPrefetchPolicy(prefetchPolicy); + Session consumerSession = consumerConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(consumerDestination); + consumerConnection.start(); + connection.start(); + + String msgBody = new String(new byte[20*1024]); + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(msgBody); + producerExpire.send(producerDestination, message); + } + + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(msgBody); + producerNormal.send(producerDestination, message); + } + + Vector<Message> messages = new Vector<Message>(); + Message received; + while ((received = consumer.receive(1000)) != null) { + messages.add(received); + if (messages.size() == 1) { + TimeUnit.SECONDS.sleep(1); + } + received.acknowledge(); + }; + + assertEquals("got all (normal plus one with ttl) messages", messageCount + 1, messages.size()); + + Vector<Message> dlqMessages = new Vector<Message>(); + while ((received = dlqConsumer.receive(1000)) != null) { + dlqMessages.add(received); + }; + + assertEquals("got dlq messages", data.length - 1, dlqMessages.size()); + + final DestinationStatistics view = getDestinationStatistics(BrokerRegistry.getInstance().findFirst(), 
ActiveMQDestination.transform(consumerDestination)); + + // wait for all to inflight to expire + assertTrue("all inflight messages expired ", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return view.getInflight().getCount() == 0; + } + })); + assertEquals("Wrong inFlightCount: ", 0, view.getInflight().getCount()); + + LOG.info("Stats: received: " + messages.size() + ", messages: " + view.getMessages().getCount() + ", enqueues: " + view.getEnqueues().getCount() + ", dequeues: " + view.getDequeues().getCount() + + ", dispatched: " + view.getDispatched().getCount() + ", inflight: " + view.getInflight().getCount() + ", expired: " + view.getExpired().getCount()); + + } + + /** + * Sends and consumes the messages to a queue destination. + * + * @throws Exception + */ + public void testConsumeQueue() throws Exception { + + MessageProducer producer = createProducer(0); + + consumerDestination = session.createQueue(getConsumerSubject()); + producerDestination = session.createQueue(getProducerSubject()); + + MessageConsumer consumer = createConsumer(); + connection.start(); + + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(data[i]); + message.setStringProperty("stringProperty", data[i]); + message.setIntProperty("intProperty", i); + + if (verbose) { + if (LOG.isDebugEnabled()) { + LOG.debug("About to send a queue message: " + message + " with text: " + data[i]); + } + } + + producer.send(producerDestination, message); + } + + // should receive a queue since there is no expiration. + assertNotNull(consumer.receive(1000)); + } + + /** + * Test consuming an expired topic. 
+ * + * @throws Exception + */ + public void testConsumeExpiredTopic() throws Exception { + + MessageProducer producer = createProducer(timeToLive); + + consumerDestination = session.createTopic(getConsumerSubject()); + producerDestination = session.createTopic(getProducerSubject()); + + MessageConsumer consumer = createConsumer(); + connection.start(); + + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(data[i]); + message.setStringProperty("stringProperty", data[i]); + message.setIntProperty("intProperty", i); + + if (verbose) { + if (LOG.isDebugEnabled()) { + LOG.debug("About to send a topic message: " + message + " with text: " + data[i]); + } + } + + producer.send(producerDestination, message); + } + + // sleeps a second longer than the expiration time. + // Basically waits till topic expires. + Thread.sleep(timeToLive + 1000); + + // message should have expired. + assertNull(consumer.receive(1000)); + } + + /** + * Sends and consumes the messages to a topic destination. + * + * @throws Exception + */ + public void testConsumeTopic() throws Exception { + + MessageProducer producer = createProducer(0); + + consumerDestination = session.createTopic(getConsumerSubject()); + producerDestination = session.createTopic(getProducerSubject()); + + MessageConsumer consumer = createConsumer(); + connection.start(); + + for (int i = 0; i < data.length; i++) { + Message message = session.createTextMessage(data[i]); + message.setStringProperty("stringProperty", data[i]); + message.setIntProperty("intProperty", i); + + if (verbose) { + if (LOG.isDebugEnabled()) { + LOG.debug("About to send a topic message: " + message + " with text: " + data[i]); + } + } + + producer.send(producerDestination, message); + } + + // should receive a topic since there is no expiration. 
+ assertNotNull(consumer.receive(1000)); + } + + protected MessageProducer createProducer(long timeToLive) throws JMSException { + MessageProducer producer = session.createProducer(null); + producer.setDeliveryMode(deliveryMode); + producer.setTimeToLive(timeToLive); + + return producer; + } + + protected MessageConsumer createConsumer() throws JMSException { + if (durable) { + LOG.info("Creating durable consumer"); + return session.createDurableSubscriber((Topic)consumerDestination, getName()); + } + return session.createConsumer(consumerDestination); + } + + protected void tearDown() throws Exception { + LOG.info("Dumping stats..."); + LOG.info("Closing down connection"); + + session.close(); + connection.close(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java new file mode 100644 index 0000000..cf14453 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSendWithAsyncCallbackTest.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */
package org.apache.activemq;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Compares the send rate of persistent messages sent with an
 * {@link AsyncCallback} against the send rate of plain {@code send(Message)}
 * calls on the same connection.
 */
public class JmsSendWithAsyncCallbackTest extends TestSupport {

    private static final Logger LOG = LoggerFactory.getLogger(JmsSendWithAsyncCallbackTest.class);

    /** Number of messages sent by each benchmark run. */
    private static final int MESSAGE_COUNT = 1000;

    /** Number of untimed warm-up rounds executed before the measured runs. */
    private static final int WARMUP_ROUNDS = 10;

    private Connection connection;

    @Override
    protected void setUp() throws Exception {
        super.setUp();
        connection = createConnection();
    }

    /**
     * Closes the connection opened in {@link #setUp()}.
     *
     * @see junit.framework.TestCase#tearDown()
     */
    @Override
    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
            connection = null;
        }
        super.tearDown();
    }

    /**
     * Runs both benchmarks after a warm-up phase and asserts that the callback
     * style is more than 1.5 times as fast as the non-callback style.
     *
     * @throws JMSException if a JMS operation or an asynchronous send fails
     * @throws InterruptedException if interrupted while waiting for send callbacks
     */
    public void testAsyncCallbackIsFaster() throws JMSException, InterruptedException {
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(getName());

        // setup a consumer to drain messages..
        MessageConsumer consumer = session.createConsumer(queue);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                // Intentionally empty: messages only need to be removed from the queue.
            }
        });

        // warmup...
        for (int i = 0; i < WARMUP_ROUNDS; i++) {
            benchmarkNonCallbackRate();
            benchmarkCallbackRate();
        }

        double callbackRate = benchmarkCallbackRate();
        double nonCallbackRate = benchmarkNonCallbackRate();

        LOG.info(String.format("AsyncCallback Send rate: %,.2f m/s", callbackRate));
        LOG.info(String.format("NonAsyncCallback Send rate: %,.2f m/s", nonCallbackRate));

        // The async style HAS to be faster than the non-async style..
        assertTrue("async rate[" + callbackRate + "] should beat non-async rate[" + nonCallbackRate + "]", callbackRate / nonCallbackRate > 1.5);
    }

    /**
     * Sends {@link #MESSAGE_COUNT} persistent messages with the plain
     * {@code send(Message)} call and measures the throughput.
     *
     * @return the observed send rate in messages per second
     * @throws JMSException if a JMS operation fails
     */
    private double benchmarkNonCallbackRate() throws JMSException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            Queue queue = session.createQueue(getName());
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            long startNanos = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                producer.send(session.createTextMessage("Hello"));
            }
            return messagesPerSecond(MESSAGE_COUNT, startNanos);
        } finally {
            // Each benchmark run opens its own session; release it instead of
            // letting sessions pile up until the connection is closed.
            session.close();
        }
    }

    /**
     * Sends {@link #MESSAGE_COUNT} persistent messages using the
     * {@link AsyncCallback} variant of {@code send} and measures the throughput
     * once every callback has fired.
     *
     * @return the observed send rate in messages per second
     * @throws JMSException if a JMS operation fails, or if any callback reported
     *         a failure (the first reported exception is rethrown)
     * @throws InterruptedException if interrupted while waiting for the callbacks
     */
    private double benchmarkCallbackRate() throws JMSException, InterruptedException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        try {
            Queue queue = session.createQueue(getName());
            final CountDownLatch messagesSent = new CountDownLatch(MESSAGE_COUNT);
            final AtomicReference<JMSException> firstFailure = new AtomicReference<JMSException>();
            ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
            long startNanos = System.nanoTime();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                producer.send(session.createTextMessage("Hello"), new AsyncCallback() {
                    @Override
                    public void onSuccess() {
                        messagesSent.countDown();
                    }

                    @Override
                    public void onException(JMSException exception) {
                        LOG.error("Async send failed", exception);
                        firstFailure.compareAndSet(null, exception);
                        // Count failures too; otherwise await() below would never return.
                        messagesSent.countDown();
                    }
                });
            }
            messagesSent.await();
            double rate = messagesPerSecond(MESSAGE_COUNT, startNanos);

            JMSException failure = firstFailure.get();
            if (failure != null) {
                throw failure;
            }
            return rate;
        } finally {
            session.close();
        }
    }

    /**
     * Converts a message count and a {@link System#nanoTime()} start stamp into
     * a rate.
     *
     * @param count number of messages sent
     * @param startNanos value of {@code System.nanoTime()} taken before sending
     * @return messages per second; the elapsed time is clamped to at least one
     *         nanosecond so the division is always finite
     */
    private static double messagesPerSecond(int count, long startNanos) {
        long elapsedNanos = Math.max(1L, System.nanoTime() - startNanos);
        return count * 1000000000.0 / elapsedNanos;
    }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSessionRecoverTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSessionRecoverTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSessionRecoverTest.java new file mode 100644 index 0000000..a9e1b24 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsSessionRecoverTest.java @@ -0,0 +1,294 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
package org.apache.activemq;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.TestCase;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;

/**
 * Test cases checking that {@link Session#recover()} leads to redelivery of a
 * message that has not been acknowledged, for queues and topics, with
 * synchronous and asynchronous consumers.
 */
public class JmsSessionRecoverTest extends TestCase {

    /** Maximum time, in seconds, to wait for an asynchronous scenario to finish. */
    private static final long ASYNC_COMPLETION_TIMEOUT_SECONDS = 5;

    private Connection connection;
    private ActiveMQConnectionFactory factory;
    private Destination dest;

    /**
     * Creates a connection to a non-persistent in-VM broker.
     *
     * @see junit.framework.TestCase#setUp()
     */
    @Override
    protected void setUp() throws Exception {
        factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        connection = factory.createConnection();
    }

    /**
     * Closes the connection created in {@link #setUp()}.
     *
     * @see junit.framework.TestCase#tearDown()
     */
    @Override
    protected void tearDown() throws Exception {
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }

    /**
     * Runs the synchronous recover scenario against a uniquely named queue.
     *
     * @throws JMSException if a JMS operation fails
     * @throws InterruptedException declared for symmetry with the asynchronous tests
     */
    public void testQueueSynchRecover() throws JMSException, InterruptedException {
        dest = new ActiveMQQueue("Queue-" + System.currentTimeMillis());
        doTestSynchRecover();
    }

    /**
     * Runs the asynchronous recover scenario against a uniquely named queue.
     *
     * @throws JMSException if a JMS operation fails
     * @throws InterruptedException if interrupted while waiting for completion
     */
    public void testQueueAsynchRecover() throws JMSException, InterruptedException {
        dest = new ActiveMQQueue("Queue-" + System.currentTimeMillis());
        doTestAsynchRecover();
    }

    /**
     * Runs the synchronous recover scenario against a uniquely named topic.
     *
     * @throws JMSException if a JMS operation fails
     * @throws InterruptedException declared for symmetry with the asynchronous tests
     */
    public void testTopicSynchRecover() throws JMSException, InterruptedException {
        dest = new ActiveMQTopic("Topic-" + System.currentTimeMillis());
        doTestSynchRecover();
    }

    /**
     * Runs the asynchronous recover scenario against a uniquely named topic.
     *
     * @throws JMSException if a JMS operation fails
     * @throws InterruptedException if interrupted while waiting for completion
     */
    public void testTopicAsynchRecover() throws JMSException, InterruptedException {
        dest = new ActiveMQTopic("Topic-" + System.currentTimeMillis());
        doTestAsynchRecover();
    }

    /**
     * Runs the asynchronous AUTO_ACKNOWLEDGE recover scenario against a
     * uniquely named queue.
     *
     * @throws JMSException if a JMS operation fails
     * @throws InterruptedException if interrupted while waiting for completion
     */
    public void testQueueAsynchRecoverWithAutoAck() throws JMSException, InterruptedException {
        dest = new ActiveMQQueue("Queue-" + System.currentTimeMillis());
        doTestAsynchRecoverWithAutoAck();
    }

    /**
     * Runs the asynchronous AUTO_ACKNOWLEDGE recover scenario against a
     * uniquely named topic.
     *
     * @throws JMSException if a JMS operation fails
     * @throws InterruptedException if interrupted while waiting for completion
     */
    public void testTopicAsynchRecoverWithAutoAck() throws JMSException, InterruptedException {
        dest = new ActiveMQTopic("Topic-" + System.currentTimeMillis());
        doTestAsynchRecoverWithAutoAck();
    }

    /**
     * Test to make sure that a Sync recover works: the first message is
     * acknowledged, the second is received but not acknowledged, and after
     * {@code recover()} the second must arrive again flagged as redelivered.
     *
     * @throws JMSException if a JMS operation fails
     */
    public void doTestSynchRecover() throws JMSException {
        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(dest);
        connection.start();

        MessageProducer producer = session.createProducer(dest);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.send(session.createTextMessage("First"));
        producer.send(session.createTextMessage("Second"));

        TextMessage message = receiveText(consumer, 1000);
        assertEquals("First", message.getText());
        assertFalse(message.getJMSRedelivered());
        message.acknowledge();

        message = receiveText(consumer, 1000);
        assertEquals("Second", message.getText());
        assertFalse(message.getJMSRedelivered());

        session.recover();

        message = receiveText(consumer, 2000);
        assertEquals("Second", message.getText());
        assertTrue(message.getJMSRedelivered());

        message.acknowledge();
    }

    /**
     * Test to make sure that a Async recover works with CLIENT_ACKNOWLEDGE.
     *
     * @throws JMSException if a JMS operation fails
     * @throws InterruptedException if interrupted while waiting for completion
     */
    public void doTestAsynchRecover() throws JMSException, InterruptedException {
        runAsynchRecover(Session.CLIENT_ACKNOWLEDGE);
    }

    /**
     * Test to make sure that a Async recover works when using AUTO_ACKNOWLEDGE.
     *
     * @throws JMSException if a JMS operation fails
     * @throws InterruptedException if interrupted while waiting for completion
     */
    public void doTestAsynchRecoverWithAutoAck() throws JMSException, InterruptedException {
        runAsynchRecover(Session.AUTO_ACKNOWLEDGE);
    }

    /**
     * Receives one message and fails the test with a descriptive message, rather
     * than a {@code NullPointerException}, when nothing arrives in time.
     *
     * @param consumer the consumer to receive from
     * @param timeoutMillis how long to wait for a message, in milliseconds
     * @return the received message cast to {@link TextMessage}
     * @throws JMSException if the receive call fails
     */
    private static TextMessage receiveText(MessageConsumer consumer, long timeoutMillis) throws JMSException {
        Message received = consumer.receive(timeoutMillis);
        assertNotNull("No message received within " + timeoutMillis + " ms", received);
        return (TextMessage) received;
    }

    /**
     * Shared body of the asynchronous recover scenarios. A listener expects
     * "First", then "Second", calls {@code recover()} on the second delivery and
     * expects "Second" once more with the redelivered flag set.
     *
     * @param acknowledgeMode the session acknowledge mode; messages are explicitly
     *        acknowledged only when this is {@link Session#CLIENT_ACKNOWLEDGE}
     * @throws JMSException if a JMS operation fails
     * @throws InterruptedException if interrupted while waiting for completion
     */
    private void runAsynchRecover(final int acknowledgeMode) throws JMSException, InterruptedException {
        final Session session = connection.createSession(false, acknowledgeMode);
        final boolean explicitAck = acknowledgeMode == Session.CLIENT_ACKNOWLEDGE;
        // Written by the listener before it counts the latch down, read here after await().
        final String[] errorMessage = new String[] {null};
        final CountDownLatch doneCountDownLatch = new CountDownLatch(1);

        MessageConsumer consumer = session.createConsumer(dest);

        MessageProducer producer = session.createProducer(dest);
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        producer.send(session.createTextMessage("First"));
        producer.send(session.createTextMessage("Second"));

        consumer.setMessageListener(new MessageListener() {
            private int counter;

            @Override
            public void onMessage(Message msg) {
                counter++;
                try {
                    TextMessage message = (TextMessage) msg;
                    switch (counter) {
                        case 1:
                            assertEquals("First", message.getText());
                            assertFalse(message.getJMSRedelivered());
                            if (explicitAck) {
                                message.acknowledge();
                            }
                            break;

                        case 2:
                            // This should rollback the delivery of this message..
                            // and re-deliver.
                            assertEquals("Second", message.getText());
                            assertFalse(message.getJMSRedelivered());
                            session.recover();
                            break;

                        case 3:
                            assertEquals("Second", message.getText());
                            assertTrue(message.getJMSRedelivered());
                            if (explicitAck) {
                                message.acknowledge();
                            }
                            doneCountDownLatch.countDown();
                            break;

                        default:
                            errorMessage[0] = "Got too many messages: " + counter;
                            doneCountDownLatch.countDown();
                    }
                } catch (Throwable e) {
                    // Assertion failures raised in the listener are caught here and
                    // reported through errorMessage so the test method can fail.
                    e.printStackTrace();
                    errorMessage[0] = "Got exception: " + e;
                    doneCountDownLatch.countDown();
                }
            }
        });
        connection.start();

        if (!doneCountDownLatch.await(ASYNC_COMPLETION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
            fail("Timeout waiting for async message delivery to complete.");
        }
        if (errorMessage[0] != null) {
            fail(errorMessage[0]);
        }
    }
}
