http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java new file mode 100644 index 0000000..70e15ec --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ProducerFlowControlTest.java @@ -0,0 +1,348 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy; +import org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProducerFlowControlTest extends JmsTestSupport { + static final Logger LOG = LoggerFactory.getLogger(ProducerFlowControlTest.class); + ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A"); + ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B"); + protected TransportConnector connector; + protected ActiveMQConnection connection; + // used to test sendFailIfNoSpace on SystemUsage + protected final AtomicBoolean gotResourceException = new AtomicBoolean(false); + + public void test2ndPubisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception { + ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory(); + factory.setProducerWindowSize(1024 * 64); + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queueB); + + // Test sending to 
Queue A + // 1 few sends should not block until the producer window is used up. + fillQueue(queueA); + + // Test sending to Queue B it should not block since the connection + // should not be blocked. + CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1"); + assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS)); + + TextMessage msg = (TextMessage)consumer.receive(); + assertEquals("Message 1", msg.getText()); + msg.acknowledge(); + + pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2"); + assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS)); + + msg = (TextMessage)consumer.receive(); + assertEquals("Message 2", msg.getText()); + msg.acknowledge(); + } + + public void testPubisherRecoverAfterBlock() throws Exception { + ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory(); + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + + final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(queueA); + + final AtomicBoolean done = new AtomicBoolean(true); + final AtomicBoolean keepGoing = new AtomicBoolean(true); + + + Thread thread = new Thread("Filler") { + int i; + @Override + public void run() { + while (keepGoing.get()) { + done.set(false); + try { + producer.send(session.createTextMessage("Test message " + ++i)); + LOG.info("sent: " + i); + } catch (JMSException e) { + } + } + } + }; + thread.start(); + waitForBlockedOrResourceLimit(done); + + // after receiveing messges, producer should continue sending messages + // (done == false) + MessageConsumer consumer = session.createConsumer(queueA); + TextMessage msg; + for (int idx = 0; idx < 5; ++idx) { + msg = (TextMessage) consumer.receive(1000); + LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID()); + msg.acknowledge(); + } + Thread.sleep(1000); + keepGoing.set(false); + + assertFalse("producer has 
resumed", done.get()); + } + + public void testAsyncPubisherRecoverAfterBlock() throws Exception { + ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory(); + factory.setProducerWindowSize(1024 * 5); + factory.setUseAsyncSend(true); + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + + final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(queueA); + + final AtomicBoolean done = new AtomicBoolean(true); + final AtomicBoolean keepGoing = new AtomicBoolean(true); + + + Thread thread = new Thread("Filler") { + int i; + @Override + public void run() { + while (keepGoing.get()) { + done.set(false); + try { + producer.send(session.createTextMessage("Test message " + ++i)); + LOG.info("sent: " + i); + } catch (JMSException e) { + } + } + } + }; + thread.start(); + waitForBlockedOrResourceLimit(done); + + // after receiveing messges, producer should continue sending messages + // (done == false) + MessageConsumer consumer = session.createConsumer(queueA); + TextMessage msg; + for (int idx = 0; idx < 5; ++idx) { + msg = (TextMessage) consumer.receive(1000); + assertNotNull("Got a message", msg); + LOG.info("received: " + idx + ", msg: " + msg.getJMSMessageID()); + msg.acknowledge(); + } + Thread.sleep(1000); + keepGoing.set(false); + + assertFalse("producer has resumed", done.get()); + } + + public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws Exception { + ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory(); + factory.setAlwaysSyncSend(true); + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queueB); + + // Test sending to Queue A + // 
1st send should not block. But the rest will. + fillQueue(queueA); + + // Test sending to Queue B it should not block. + CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1"); + assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS)); + + TextMessage msg = (TextMessage)consumer.receive(); + assertEquals("Message 1", msg.getText()); + msg.acknowledge(); + + pubishDoneToQeueuB = asyncSendTo(queueB, "Message 2"); + assertTrue(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS)); + + msg = (TextMessage)consumer.receive(); + assertEquals("Message 2", msg.getText()); + msg.acknowledge(); + } + + public void testSimpleSendReceive() throws Exception { + ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)createConnectionFactory(); + factory.setAlwaysSyncSend(true); + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queueA); + + // Test sending to Queue B it should not block. + CountDownLatch pubishDoneToQeueuA = asyncSendTo(queueA, "Message 1"); + assertTrue(pubishDoneToQeueuA.await(2, TimeUnit.SECONDS)); + + TextMessage msg = (TextMessage)consumer.receive(); + assertEquals("Message 1", msg.getText()); + msg.acknowledge(); + + pubishDoneToQeueuA = asyncSendTo(queueA, "Message 2"); + assertTrue(pubishDoneToQeueuA.await(2, TimeUnit.SECONDS)); + + msg = (TextMessage)consumer.receive(); + assertEquals("Message 2", msg.getText()); + msg.acknowledge(); + } + + public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws Exception { + ConnectionFactory factory = createConnectionFactory(); + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.start(); + + // Test sending to Queue A + // 1st send should not block. + fillQueue(queueA); + + // Test sending to Queue B it should block. 
+ // Since even though the it's queue limits have not been reached, the + // connection + // is blocked. + CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1"); + assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS)); + } + + private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException { + final AtomicBoolean done = new AtomicBoolean(true); + final AtomicBoolean keepGoing = new AtomicBoolean(true); + + // Starts an async thread that every time it publishes it sets the done + // flag to false. + // Once the send starts to block it will not reset the done flag + // anymore. + new Thread("Fill thread.") { + public void run() { + Session session = null; + try { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + while (keepGoing.get()) { + done.set(false); + producer.send(session.createTextMessage("Hello World")); + } + } catch (JMSException e) { + } finally { + safeClose(session); + } + } + }.start(); + + waitForBlockedOrResourceLimit(done); + keepGoing.set(false); + } + + protected void waitForBlockedOrResourceLimit(final AtomicBoolean done) + throws InterruptedException { + while (true) { + Thread.sleep(1000); + // the producer is blocked once the done flag stays true or there is a resource exception + if (done.get() || gotResourceException.get()) { + break; + } + done.set(true); + } + } + + private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException { + final CountDownLatch done = new CountDownLatch(1); + new Thread("Send thread.") { + public void run() { + Session session = null; + try { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(session.createTextMessage(message)); + 
done.countDown(); + } catch (JMSException e) { + } finally { + safeClose(session); + } + } + }.start(); + return done; + } + + protected BrokerService createBroker() throws Exception { + BrokerService service = new BrokerService(); + service.setPersistent(false); + service.setUseJmx(false); + + // Setup a destination policy where it takes only 1 message at a time. + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policy = new PolicyEntry(); + policy.setMemoryLimit(1); + policy.setPendingSubscriberPolicy(new VMPendingSubscriberMessageStoragePolicy()); + policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy()); + policy.setProducerFlowControl(true); + policyMap.setDefaultEntry(policy); + service.setDestinationPolicy(policyMap); + + connector = service.addConnector("tcp://localhost:0"); + return service; + } + + public void setUp() throws Exception { + setAutoFail(true); + super.setUp(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + TcpTransport t = (TcpTransport)connection.getTransport().narrow(TcpTransport.class); + t.getTransportListener().onException(new IOException("Disposed.")); + connection.getTransport().stop(); + } + super.tearDown(); + } + + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory(connector.getConnectUri()); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java new file mode 100644 index 0000000..2a61820 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/QueueConsumerPriorityTest.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.command.ActiveMQQueue; + +public class QueueConsumerPriorityTest extends TestCase { + + private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true"; + + public QueueConsumerPriorityTest(String name) { + super(name); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + private Connection createConnection(final boolean start) throws JMSException { + ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL); + Connection conn = cf.createConnection(); + if (start) { + conn.start(); + } + return conn; + } + + public void testQueueConsumerPriority() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session consumerLowPriority = null; + Session consumerHighPriority = null; + Session senderSession = null; + + try { + + consumerLowPriority = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + consumerHighPriority = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(consumerHighPriority); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = getClass().getName(); + ActiveMQQueue low = new ActiveMQQueue(queueName+"?consumer.priority=1"); + MessageConsumer lowConsumer = consumerLowPriority.createConsumer(low); + + ActiveMQQueue high = new ActiveMQQueue(queueName+"?consumer.priority=2"); + MessageConsumer highConsumer = consumerLowPriority.createConsumer(high); + + ActiveMQQueue senderQueue = new ActiveMQQueue(queueName); + + MessageProducer producer = 
senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + for (int i =0; i< 10000;i++) { + producer.send(msg); + assertNotNull("null on iteration: " + i, highConsumer.receive(500)); + } + assertNull(lowConsumer.receive(2000)); + + } finally { + conn.close(); + } + } +} + http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java new file mode 100644 index 0000000..81d13b2 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ReconnectWithSameClientIDTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import java.util.concurrent.atomic.AtomicBoolean; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.InvalidClientIDException; +import javax.jms.JMSException; +import javax.jms.Session; + +import junit.framework.Test; +import org.apache.activemq.util.DefaultTestAppender; +import org.apache.log4j.Appender; +import org.apache.log4j.spi.LoggingEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ReconnectWithSameClientIDTest extends EmbeddedBrokerTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(ReconnectWithSameClientIDTest.class); + + protected Connection connection; + protected boolean transacted; + protected int authMode = Session.AUTO_ACKNOWLEDGE; + public boolean useFailover = false; + + public static Test suite() { + return suite(ReconnectWithSameClientIDTest.class); + } + + public void initCombosForTestReconnectMultipleTimesWithSameClientID() { + addCombinationValues("useFailover", new Object[]{Boolean.FALSE, Boolean.TRUE}); + } + + public void testReconnectMultipleTimesWithSameClientID() throws Exception { + + org.apache.log4j.Logger log4jLogger = + org.apache.log4j.Logger.getLogger(org.apache.activemq.broker.jmx.ManagedTransportConnection.class); + final AtomicBoolean failed = new AtomicBoolean(false); + + Appender appender = new DefaultTestAppender() { + @Override + public void doAppend(LoggingEvent event) { + if (event.getMessage().toString().startsWith("Failed to register MBean")) { + LOG.info("received unexpected log message: " + event.getMessage()); + failed.set(true); + } + } + }; + log4jLogger.addAppender(appender); + try { + connection = connectionFactory.createConnection(); + useConnection(connection); + + // now lets create another which should fail + for (int i = 1; i < 11; i++) { + Connection connection2 = connectionFactory.createConnection(); + try { + useConnection(connection2); + fail("Should have thrown 
InvalidClientIDException on attempt" + i); + } catch (InvalidClientIDException e) { + LOG.info("Caught expected: " + e); + } finally { + connection2.close(); + } + } + + // now lets try closing the original connection and creating a new + // connection with the same ID + connection.close(); + connection = connectionFactory.createConnection(); + useConnection(connection); + } finally { + log4jLogger.removeAppender(appender); + } + assertFalse("failed on unexpected log event", failed.get()); + } + + @Override + protected ConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory((useFailover ? "failover:" : "") + + broker.getTransportConnectors().get(0).getPublishableConnectString()); + } + + protected void setUp() throws Exception { + bindAddress = "tcp://localhost:0"; + super.setUp(); + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + protected void useConnection(Connection connection) throws JMSException { + connection.setClientID("foo"); + connection.start(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java new file mode 100644 index 0000000..e2b5867 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RedeliveryPolicyTest.java @@ -0,0 +1,830 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.ServerSession; +import javax.jms.ServerSessionPool; +import javax.jms.Session; +import javax.jms.TextMessage; +import junit.framework.Test; + +import org.apache.activemq.broker.region.policy.RedeliveryPolicyMap; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTextMessage; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.Wait; + +public class RedeliveryPolicyTest extends JmsTestSupport { + + public static Test suite() { + return suite(RedeliveryPolicyTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + + public void testGetNext() throws Exception { + + RedeliveryPolicy policy = new RedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setRedeliveryDelay(500); + policy.setBackOffMultiplier((short) 2); + policy.setUseExponentialBackOff(true); + + long delay = policy.getNextRedeliveryDelay(0); + assertEquals(500, delay); + delay = 
policy.getNextRedeliveryDelay(delay); + assertEquals(500*2, delay); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(500*4, delay); + + policy.setUseExponentialBackOff(false); + delay = policy.getNextRedeliveryDelay(delay); + assertEquals(500, delay); + } + + /** + * @throws Exception + */ + public void testExponentialRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setRedeliveryDelay(500); + policy.setBackOffMultiplier((short) 2); + policy.setUseExponentialBackOff(true); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue(getName()); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + // No delay on first rollback.. + m = (TextMessage)consumer.receive(100); + assertNotNull(m); + session.rollback(); + + // Show subsequent re-delivery delay is incrementing. 
+ m = (TextMessage)consumer.receive(100); + assertNull(m); + + m = (TextMessage)consumer.receive(700); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + // Show re-delivery delay is incrementing exponentially + m = (TextMessage)consumer.receive(100); + assertNull(m); + m = (TextMessage)consumer.receive(500); + assertNull(m); + m = (TextMessage)consumer.receive(700); + assertNotNull(m); + assertEquals("1st", m.getText()); + + } + + + /** + * @throws Exception + */ + public void testNornalRedeliveryPolicyDelaysDeliveryOnRollback() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setRedeliveryDelay(500); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue(getName()); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + // No delay on first rollback.. + m = (TextMessage)consumer.receive(100); + assertNotNull(m); + session.rollback(); + + // Show subsequent re-delivery delay is incrementing. + m = (TextMessage)consumer.receive(100); + assertNull(m); + m = (TextMessage)consumer.receive(700); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + // The message gets redelivered after 500 ms every time since + // we are not using exponential backoff. 
+ m = (TextMessage)consumer.receive(100); + assertNull(m); + m = (TextMessage)consumer.receive(700); + assertNotNull(m); + assertEquals("1st", m.getText()); + + } + + /** + * @throws Exception + */ + public void testDLQHandling() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(100); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(2); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + MessageConsumer dlqConsumer = session.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + // The last rollback should cause the 1st message to get sent to the DLQ + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + + // We should be able to get the message off the DLQ now. 
+ m = (TextMessage)dlqConsumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy")); + session.commit(); + + } + + + /** + * @throws Exception + */ + public void testInfiniteMaximumNumberOfRedeliveries() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(100); + policy.setUseExponentialBackOff(false); + // let's set the maximum redeliveries to no maximum (ie. infinite) + policy.setMaximumRedeliveries(-1); + + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + //we should be able to get the 1st message redelivered until a session.commit is called + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.commit(); + + 
m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + + } + + /** + * @throws Exception + */ + public void testMaximumRedeliveryDelay() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(10); + policy.setUseExponentialBackOff(true); + policy.setMaximumRedeliveries(-1); + policy.setRedeliveryDelay(50); + policy.setMaximumRedeliveryDelay(1000); + policy.setBackOffMultiplier((short) 2); + policy.setUseExponentialBackOff(true); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + + for(int i = 0; i < 10; ++i) { + // we should be able to get the 1st message redelivered until a session.commit is called + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + } + + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.commit(); + + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + + assertTrue(policy.getNextRedeliveryDelay(Long.MAX_VALUE) == 1000 ); + } + + /** + * @throws Exception + */ + public void testZeroMaximumNumberOfRedeliveries() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(100); + policy.setUseExponentialBackOff(false); + //let's set the maximum redeliveries to 0 + policy.setMaximumRedeliveries(0); 
+ + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + //the 1st message should not be redelivered since maximumRedeliveries is set to 0 + m = (TextMessage)consumer.receive(1000); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + + + + } + + public void testRepeatedRedeliveryReceiveNoCommit() throws Exception { + + connection.start(); + Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = dlqSession.createProducer(destination); + + // Send the messages + producer.send(dlqSession.createTextMessage("1st")); + + dlqSession.commit(); + MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); + + final int maxRedeliveries = 4; + for (int i=0;i<=maxRedeliveries +1;i++) { + + connection = (ActiveMQConnection)factory.createConnection(userName, password); + connections.add(connection); + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(maxRedeliveries); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(destination); + + ActiveMQTextMessage m = ((ActiveMQTextMessage)consumer.receive(4000)); + if (i<=maxRedeliveries) { + 
assertEquals("1st", m.getText()); + assertEquals(i, m.getRedeliveryCounter()); + } else { + assertNull("null on exceeding redelivery count", m); + } + connection.close(); + connections.remove(connection); + } + + // We should be able to get the message off the DLQ now. + TextMessage m = (TextMessage)dlqConsumer.receive(1000); + assertNotNull("Got message from DLQ", m); + assertEquals("1st", m.getText()); + String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy")); + dlqSession.commit(); + + } + + + public void testRepeatedRedeliveryOnMessageNoCommit() throws Exception { + + connection.start(); + Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = dlqSession.createProducer(destination); + + // Send the messages + producer.send(dlqSession.createTextMessage("1st")); + + dlqSession.commit(); + MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); + + final int maxRedeliveries = 4; + final AtomicInteger receivedCount = new AtomicInteger(0); + + for (int i=0;i<=maxRedeliveries+1;i++) { + + connection = (ActiveMQConnection)factory.createConnection(userName, password); + connections.add(connection); + + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(maxRedeliveries); + + connection.start(); + final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer consumer = session.createConsumer(destination); + final CountDownLatch done = new CountDownLatch(1); + + consumer.setMessageListener(new MessageListener(){ + @Override + public void onMessage(Message message) { + try { + ActiveMQTextMessage m = (ActiveMQTextMessage)message; + assertEquals("1st", 
m.getText()); + assertEquals(receivedCount.get(), m.getRedeliveryCounter()); + receivedCount.incrementAndGet(); + done.countDown(); + } catch (Exception ignored) { + ignored.printStackTrace(); + } + } + }); + + if (i<=maxRedeliveries) { + assertTrue("listener done", done.await(5, TimeUnit.SECONDS)); + } else { + // final redlivery gets poisoned before dispatch + assertFalse("listener done", done.await(1, TimeUnit.SECONDS)); + } + connection.close(); + connections.remove(connection); + } + + // We should be able to get the message off the DLQ now. + TextMessage m = (TextMessage)dlqConsumer.receive(1000); + assertNotNull("Got message from DLQ", m); + assertEquals("1st", m.getText()); + String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy")); + dlqSession.commit(); + + } + + public void testRepeatedRedeliveryServerSessionNoCommit() throws Exception { + + connection.start(); + Session dlqSession = connection.createSession(true, Session.SESSION_TRANSACTED); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = dlqSession.createProducer(destination); + + // Send the messages + producer.send(dlqSession.createTextMessage("1st")); + + dlqSession.commit(); + MessageConsumer dlqConsumer = dlqSession.createConsumer(new ActiveMQQueue("ActiveMQ.DLQ")); + + final int maxRedeliveries = 4; + final AtomicInteger receivedCount = new AtomicInteger(0); + + for (int i=0;i<=maxRedeliveries+1;i++) { + + connection = (ActiveMQConnection)factory.createConnection(userName, password); + connections.add(connection); + + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(maxRedeliveries); + + connection.start(); + final CountDownLatch done = new CountDownLatch(1); + + final ActiveMQSession session = (ActiveMQSession) 
connection.createSession(true, Session.SESSION_TRANSACTED); + session.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + try { + ActiveMQTextMessage m = (ActiveMQTextMessage) message; + assertEquals("1st", m.getText()); + assertEquals(receivedCount.get(), m.getRedeliveryCounter()); + receivedCount.incrementAndGet(); + done.countDown(); + } catch (Exception ignored) { + ignored.printStackTrace(); + } + } + }); + + connection.createConnectionConsumer( + destination, + null, + new ServerSessionPool() { + @Override + public ServerSession getServerSession() throws JMSException { + return new ServerSession() { + @Override + public Session getSession() throws JMSException { + return session; + } + + @Override + public void start() throws JMSException { + } + }; + } + }, + 100, + false); + + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + session.run(); + return done.await(10, TimeUnit.MILLISECONDS); + } + }); + + if (i<=maxRedeliveries) { + assertTrue("listener done @" + i, done.await(5, TimeUnit.SECONDS)); + } else { + // final redlivery gets poisoned before dispatch + assertFalse("listener not done @" + i, done.await(1, TimeUnit.SECONDS)); + } + connection.close(); + connections.remove(connection); + } + + // We should be able to get the message off the DLQ now. 
+ TextMessage m = (TextMessage)dlqConsumer.receive(1000); + assertNotNull("Got message from DLQ", m); + assertEquals("1st", m.getText()); + String cause = m.getStringProperty(ActiveMQMessage.DLQ_DELIVERY_FAILURE_CAUSE_PROPERTY); + assertTrue("cause exception has policy ref", cause.contains("RedeliveryPolicy")); + dlqSession.commit(); + + } + + public void testInitialRedeliveryDelayZero() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(1); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + m = (TextMessage)consumer.receive(100); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(100); + assertNotNull(m); + assertEquals("1st", m.getText()); + + m = (TextMessage)consumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + + session.commit(); + } + + + public void testInitialRedeliveryDelayOne() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(1000); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(1); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer 
consumer = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + m = (TextMessage)consumer.receive(100); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(100); + assertNull(m); + + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + + m = (TextMessage)consumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + } + + public void testRedeliveryDelayOne() throws Exception { + + // Receive a message with the JMS API + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(0); + policy.setRedeliveryDelay(1000); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(2); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + session.commit(); + + TextMessage m; + m = (TextMessage)consumer.receive(100); + assertNotNull(m); + assertEquals("1st", m.getText()); + session.rollback(); + + m = (TextMessage)consumer.receive(100); + assertNotNull("first immediate redelivery", m); + session.rollback(); + + m = (TextMessage)consumer.receive(100); + assertNull("second delivery delayed: " + m, m); + + m = (TextMessage)consumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + + m = (TextMessage)consumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + } + + public void 
testRedeliveryPolicyPerDestination() throws Exception { + + RedeliveryPolicy queuePolicy = new RedeliveryPolicy(); + queuePolicy.setInitialRedeliveryDelay(0); + queuePolicy.setRedeliveryDelay(1000); + queuePolicy.setUseExponentialBackOff(false); + queuePolicy.setMaximumRedeliveries(2); + + RedeliveryPolicy topicPolicy = new RedeliveryPolicy(); + topicPolicy.setInitialRedeliveryDelay(0); + topicPolicy.setRedeliveryDelay(1000); + topicPolicy.setUseExponentialBackOff(false); + topicPolicy.setMaximumRedeliveries(3); + + // Receive a message with the JMS API + RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap(); + map.put(new ActiveMQTopic(">"), topicPolicy); + map.put(new ActiveMQQueue(">"), queuePolicy); + + connection.start(); + Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue queue = new ActiveMQQueue("TEST"); + ActiveMQTopic topic = new ActiveMQTopic("TEST"); + + MessageProducer producer = session.createProducer(null); + + MessageConsumer queueConsumer = session.createConsumer(queue); + MessageConsumer topicConsumer = session.createConsumer(topic); + + // Send the messages + producer.send(queue, session.createTextMessage("1st")); + producer.send(queue, session.createTextMessage("2nd")); + producer.send(topic, session.createTextMessage("1st")); + producer.send(topic, session.createTextMessage("2nd")); + + session.commit(); + + TextMessage m; + m = (TextMessage)queueConsumer.receive(100); + assertNotNull(m); + assertEquals("1st", m.getText()); + m = (TextMessage)topicConsumer.receive(100); + assertNotNull(m); + assertEquals("1st", m.getText()); + m = (TextMessage)queueConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + m = (TextMessage)topicConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.rollback(); + + m = (TextMessage)queueConsumer.receive(100); + assertNotNull("first immediate redelivery", m); + m = (TextMessage)topicConsumer.receive(100); + 
assertNotNull("first immediate redelivery", m); + session.rollback(); + + m = (TextMessage)queueConsumer.receive(100); + assertNull("second delivery delayed: " + m, m); + m = (TextMessage)topicConsumer.receive(100); + assertNull("second delivery delayed: " + m, m); + + m = (TextMessage)queueConsumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + m = (TextMessage)topicConsumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + + m = (TextMessage)queueConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + m = (TextMessage)topicConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.rollback(); + + m = (TextMessage)queueConsumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + m = (TextMessage)topicConsumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + + m = (TextMessage)queueConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + m = (TextMessage)topicConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.rollback(); + + // No third attempt for the Queue consumer + m = (TextMessage)queueConsumer.receive(2000); + assertNull(m); + m = (TextMessage)topicConsumer.receive(2000); + assertNotNull(m); + assertEquals("1st", m.getText()); + + m = (TextMessage)queueConsumer.receive(100); + assertNull(m); + m = (TextMessage)topicConsumer.receive(100); + assertNotNull(m); + assertEquals("2nd", m.getText()); + session.commit(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java new file mode 100644 
index 0000000..009221b --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/RemoveDestinationTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.net.URI; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.management.ObjectName; + +import org.apache.activemq.advisory.DestinationSource; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class RemoveDestinationTest { + + private static final String VM_BROKER_URL = 
"vm://localhost?create=false"; + private static final String BROKER_URL = "broker:vm://localhost?broker.persistent=false&broker.useJmx=true"; + + BrokerService broker; + + @Before + public void setUp() throws Exception { + broker = BrokerFactory.createBroker(new URI(BROKER_URL)); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + + private Connection createConnection(final boolean start) throws JMSException { + ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL); + Connection conn = cf.createConnection(); + if (start) { + conn.start(); + } + return conn; + } + + @Test + public void testRemoveDestinationWithoutSubscriber() throws Exception { + + ActiveMQConnection amqConnection = (ActiveMQConnection) createConnection(true); + DestinationSource destinationSource = amqConnection.getDestinationSource(); + Session session = amqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("TEST.FOO"); + MessageProducer producer = session.createProducer(topic); + MessageConsumer consumer = session.createConsumer(topic); + + TextMessage msg = session.createTextMessage("Hellow World"); + producer.send(msg); + assertNotNull(consumer.receive(5000)); + Thread.sleep(1000); + + ActiveMQTopic amqTopic = (ActiveMQTopic) topic; + assertTrue(destinationSource.getTopics().contains(amqTopic)); + + consumer.close(); + producer.close(); + session.close(); + + Thread.sleep(3000); + amqConnection.destroyDestination((ActiveMQDestination) topic); + Thread.sleep(3000); + assertFalse(destinationSource.getTopics().contains(amqTopic)); + } + + @Test + public void testRemoveDestinationWithSubscriber() throws Exception { + ActiveMQConnection amqConnection = (ActiveMQConnection) createConnection(true); + DestinationSource destinationSource = amqConnection.getDestinationSource(); + + Session session = 
amqConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic("TEST.FOO"); + MessageProducer producer = session.createProducer(topic); + MessageConsumer consumer = session.createConsumer(topic); + + TextMessage msg = session.createTextMessage("Hellow World"); + producer.send(msg); + assertNotNull(consumer.receive(5000)); + Thread.sleep(1000); + + ActiveMQTopic amqTopic = (ActiveMQTopic) topic; + + assertTrue(destinationPresentInAdminView(broker, amqTopic)); + assertTrue(destinationSource.getTopics().contains(amqTopic)); + + // This line generates a broker error since the consumer is still active. + try { + amqConnection.destroyDestination((ActiveMQDestination) topic); + fail("expect exception on destroy if comsumer present"); + } catch (JMSException expected) { + assertTrue(expected.getMessage().indexOf(amqTopic.getTopicName()) != -1); + } + + Thread.sleep(3000); + + assertTrue(destinationSource.getTopics().contains(amqTopic)); + assertTrue(destinationPresentInAdminView(broker, amqTopic)); + + consumer.close(); + producer.close(); + session.close(); + + Thread.sleep(3000); + + // The destination will not be removed with this call, but if you remove + // the call above that generates the error it will. 
+ amqConnection.destroyDestination(amqTopic); + Thread.sleep(3000); + assertFalse(destinationSource.getTopics().contains(amqTopic)); + assertFalse(destinationPresentInAdminView(broker, amqTopic)); + } + + /** + * Checks whether the given topic is listed in the admin view of the given broker. + * + * @param broker2 the broker whose admin view is inspected + * @param amqTopic the topic to look for, matched by physical name + * @return true if one of the broker's topic MBeans carries the topic's physical name + * @throws Exception if the admin view or an MBean proxy cannot be obtained + */ + private boolean destinationPresentInAdminView(BrokerService broker2, ActiveMQTopic amqTopic) throws Exception { + // Query the broker that was passed in rather than the test's own field. + for (ObjectName name : broker2.getAdminView().getTopics()) { + + DestinationViewMBean proxy = (DestinationViewMBean) + broker2.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); + + if (proxy.getName().equals(amqTopic.getPhysicalName())) { + return true; + } + } + return false; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/SpringTestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/SpringTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/SpringTestSupport.java new file mode 100644 index 0000000..f713120 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/SpringTestSupport.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import junit.framework.TestCase; + +import org.springframework.context.support.AbstractApplicationContext; + +/** + * A useful base class for spring based unit test cases + * + * + */ +public abstract class SpringTestSupport extends TestCase { + + protected AbstractApplicationContext context; + + @Override + protected void setUp() throws Exception { + context = createApplicationContext(); + } + + protected abstract AbstractApplicationContext createApplicationContext();; + + @Override + protected void tearDown() throws Exception { + if (context != null) { + context.destroy(); + } + } + + protected Object getBean(String name) { + Object bean = context.getBean(name); + if (bean == null) { + fail("Should have found bean named '" + name + "' in the Spring ApplicationContext"); + } + return bean; + } + + protected void assertSetEquals(String description, Object[] expected, Set<?> actual) { + Set<Object> expectedSet = new HashSet<Object>(); + expectedSet.addAll(Arrays.asList(expected)); + assertEquals(description, expectedSet, actual); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TestSupport.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TestSupport.java new file mode 100644 index 0000000..a762f89 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TestSupport.java @@ -0,0 +1,228 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.io.File; +import java.io.IOException; +import java.util.Map; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.TextMessage; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationStatistics; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.activemq.store.jdbc.JDBCPersistenceAdapter; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.store.leveldb.LevelDBPersistenceAdapter; +import org.apache.activemq.store.memory.MemoryPersistenceAdapter; + +/** + * Useful base class for unit test cases + * + * + */ +public abstract class TestSupport extends CombinationTestSupport { + + protected ActiveMQConnectionFactory connectionFactory; + protected boolean topic = true; + public PersistenceAdapterChoice defaultPersistenceAdapter = PersistenceAdapterChoice.KahaDB; + + protected 
ActiveMQMessage createMessage() { + return new ActiveMQMessage(); + } + + protected Destination createDestination(String subject) { + if (topic) { + return new ActiveMQTopic(subject); + } else { + return new ActiveMQQueue(subject); + } + } + + protected Destination createDestination() { + return createDestination(getDestinationString()); + } + + /** + * Returns the name of the destination used in this test case + */ + protected String getDestinationString() { + return getClass().getName() + "." + getName(true); + } + + /** + * Asserts that two arrays of text messages have the same length and pairwise equal text. + * A pair in which both entries are null counts as equal. + * + * @param messsage description included in the failure messages + * @param firstSet the expected messages + * @param secondSet the actual messages + * @throws JMSException if the text of a message cannot be read + */ + protected void assertTextMessagesEqual(String messsage, Message[] firstSet, Message[] secondSet) + throws JMSException { + assertEquals("Message count does not match: " + messsage, firstSet.length, secondSet.length); + for (int i = 0; i < secondSet.length; i++) { + TextMessage m1 = (TextMessage)firstSet[i]; + TextMessage m2 = (TextMessage)secondSet[i]; + assertFalse("Message " + (i + 1) + " did not match : " + messsage + ": expected {" + m1 + + "}, but was {" + m2 + "}", m1 == null ^ m2 == null); + if (m1 == null) { + // the check above guarantees m2 is null as well, so there is no text to compare + continue; + } + assertEquals("Message " + (i + 1) + " did not match: " + messsage + ": expected {" + m1 + + "}, but was {" + m2 + "}", m1.getText(), m2.getText()); + } + } + + protected ActiveMQConnectionFactory createConnectionFactory() throws Exception { + return new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false"); + } + + /** + * Factory method to create a new connection + */ + protected Connection createConnection() throws Exception { + return getConnectionFactory().createConnection(); + } + + public ActiveMQConnectionFactory getConnectionFactory() throws Exception { + if (connectionFactory == null) { + connectionFactory = createConnectionFactory(); + assertTrue("Should have created a connection factory!", connectionFactory != null); + } + return connectionFactory; + } + + protected String getConsumerSubject() { + return getSubject(); + } + + protected String 
getProducerSubject() { + return getSubject(); + } + + protected String getSubject() { + return getName(); + } + + /** + * Deletes the given file or, for a directory, the directory and everything below it. + * Deletion is best effort: the result of {@link File#delete()} is not checked. + * + * @param f the file or directory to remove + */ + public static void recursiveDelete(File f) { + if (f.isDirectory()) { + // listFiles() returns null when the directory cannot be read + File[] files = f.listFiles(); + if (files != null) { + for (File child : files) { + recursiveDelete(child); + } + } + } + f.delete(); + } + + public static void removeMessageStore() { + if (System.getProperty("activemq.store.dir") != null) { + recursiveDelete(new File(System.getProperty("activemq.store.dir"))); + } + if (System.getProperty("derby.system.home") != null) { + recursiveDelete(new File(System.getProperty("derby.system.home"))); + } + } + + public static DestinationStatistics getDestinationStatistics(BrokerService broker, ActiveMQDestination destination) { + DestinationStatistics result = null; + org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination); + if (dest != null) { + result = dest.getDestinationStatistics(); + } + return result; + } + + public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) { + org.apache.activemq.broker.region.Destination result = null; + for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) { + if (dest.getName().equals(destination.getPhysicalName())) { + result = dest; + break; + } + } + return result; + } + + private static Map<ActiveMQDestination, org.apache.activemq.broker.region.Destination> getDestinationMap(BrokerService target, + ActiveMQDestination destination) { + RegionBroker regionBroker = (RegionBroker) target.getRegionBroker(); + if (destination.isTemporary()) { + return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap() : + regionBroker.getTempTopicRegion().getDestinationMap(); + } + return destination.isQueue() ? 
+ regionBroker.getQueueRegion().getDestinationMap() : + regionBroker.getTopicRegion().getDestinationMap(); + } + + public static enum PersistenceAdapterChoice {LevelDB, KahaDB, AMQ, JDBC, MEM }; + + public PersistenceAdapter setDefaultPersistenceAdapter(BrokerService broker) throws IOException { + return setPersistenceAdapter(broker, defaultPersistenceAdapter); + } + + /** + * Creates the persistence adapter for the given choice, installs it on the broker and + * points it at a directory named after the choice below the broker's data directory. + * + * @param broker the broker to configure + * @param choice the kind of adapter to create + * @return the adapter that was installed on the broker + * @throws IOException declared for the broker and adapter calls made here + * @throws IllegalArgumentException if no adapter is available for the choice + */ + public static PersistenceAdapter setPersistenceAdapter(BrokerService broker, PersistenceAdapterChoice choice) throws IOException { + final PersistenceAdapter adapter; + switch (choice) { + case JDBC: + JDBCPersistenceAdapter jdbcPersistenceAdapter = new JDBCPersistenceAdapter(); + jdbcPersistenceAdapter.setUseLock(false); // rollback (at shutdown) on derby can take a long time with file io etc + adapter = jdbcPersistenceAdapter; + break; + case KahaDB: + adapter = new KahaDBPersistenceAdapter(); + break; + case LevelDB: + adapter = new LevelDBPersistenceAdapter(); + break; + case MEM: + adapter = new MemoryPersistenceAdapter(); + break; + default: + // fail before touching the broker instead of installing a null adapter + throw new IllegalArgumentException("Unsupported persistence adapter choice: " + choice); + } + broker.setPersistenceAdapter(adapter); + adapter.setDirectory(new File(broker.getBrokerDataDirectory(), choice.name())); + return adapter; + } + + public void stopBrokerWithStoreFailure(BrokerService broker, PersistenceAdapterChoice choice) throws Exception { + switch (choice) { + case KahaDB: + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + + // have the broker stop with an IOException on next checkpoint so it has a pending local transaction to recover + kahaDBPersistenceAdapter.getStore().getJournal().close(); + break; + default: + // just stop normally by default + broker.stop(); + } + broker.waitUntilStopped(); + } + + + /** + * Asserts that the absolute path of the base directory (system property "basedir", + * defaulting to the current directory) does not contain a space. + */ + protected void assertBaseDirectoryContainsSpaces() { + assertFalse("Base directory cannot contain spaces.", new File(System.getProperty("basedir", ".")).getAbsoluteFile().toString().contains(" ")); + } + +} 
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java new file mode 100644 index 0000000..2ec57b5 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TimeStampTest.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.TestCase; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.util.UDPTraceBrokerPlugin; +import org.apache.activemq.broker.view.ConnectionDotFilePlugin; + +public class TimeStampTest extends TestCase { + public void test() throws Exception { + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + broker.setUseJmx(true); + broker.setPlugins(new BrokerPlugin[] {new ConnectionDotFilePlugin(), new UDPTraceBrokerPlugin()}); + TransportConnector tcpConnector = broker.addConnector("tcp://localhost:0"); + broker.addConnector("stomp://localhost:0"); + broker.start(); + + // Create a ConnectionFactory + ActiveMQConnectionFactory connectionFactory = + new ActiveMQConnectionFactory(tcpConnector.getConnectUri()); + + // Create a Connection + Connection connection = connectionFactory.createConnection(); + connection.start(); + + // Create a Session + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // Create the destination Queue + Destination destination = session.createQueue("TEST.FOO"); + + // Create a MessageProducer from the Session to the Topic or Queue + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + // Create a messages + Message sentMessage = session.createMessage(); + + // Tell the producer to send the message + long beforeSend = System.currentTimeMillis(); + producer.send(sentMessage); + long afterSend = System.currentTimeMillis(); + + // assert message timestamp is in window + assertTrue(beforeSend <= 
sentMessage.getJMSTimestamp() && sentMessage.getJMSTimestamp() <= afterSend); + + // Create a MessageConsumer from the Session to the Topic or Queue + MessageConsumer consumer = session.createConsumer(destination); + + // Wait for a message + Message receivedMessage = consumer.receive(1000); + + // assert we got the same message ID we sent + assertEquals(sentMessage.getJMSMessageID(), receivedMessage.getJMSMessageID()); + + // assert message timestamp is in window + assertTrue("JMS Message Timestamp should be set during the send method: \n" + " beforeSend = " + beforeSend + "\n" + " getJMSTimestamp = " + + receivedMessage.getJMSTimestamp() + "\n" + " afterSend = " + afterSend + "\n", beforeSend <= receivedMessage.getJMSTimestamp() + && receivedMessage.getJMSTimestamp() <= afterSend); + + // assert message timestamp is unchanged + assertEquals("JMS Message Timestamp of recieved message should be the same as the sent message\n ", sentMessage.getJMSTimestamp(), receivedMessage.getJMSTimestamp()); + + // Clean up + producer.close(); + consumer.close(); + session.close(); + connection.close(); + broker.stop(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java new file mode 100644 index 0000000..5e45d52 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/TransactionContextTest.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.TransactionRolledBackException; + +import org.apache.activemq.transaction.Synchronization; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class TransactionContextTest { + + TransactionContext underTest; + ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost"); + ActiveMQConnection connection; + + + @Before + public void setup() throws Exception { + connection = factory.createActiveMQConnection(); + underTest = new TransactionContext(connection); + } + + @After + public void tearDown() throws Exception { + connection.close(); + } + + @Test + public void testSyncBeforeEndCalledOnceOnRollback() throws Exception { + final AtomicInteger beforeEndCountA = new AtomicInteger(0); + final AtomicInteger beforeEndCountB = new AtomicInteger(0); + final AtomicInteger rollbackCountA = new AtomicInteger(0); + final AtomicInteger rollbackCountB = new AtomicInteger(0); + underTest.addSynchronization(new Synchronization() { + @Override + public void beforeEnd() throws Exception { + if (beforeEndCountA.getAndIncrement() == 0) { + throw new TransactionRolledBackException("force rollback"); + } + } + + @Override + public void afterCommit() throws 
Exception { + fail("exepcted rollback exception"); + } + + @Override + public void afterRollback() throws Exception { + rollbackCountA.incrementAndGet(); + } + + }); + + underTest.addSynchronization(new Synchronization() { + @Override + public void beforeEnd() throws Exception { + beforeEndCountB.getAndIncrement(); + } + + @Override + public void afterCommit() throws Exception { + fail("exepcted rollback exception"); + } + + @Override + public void afterRollback() throws Exception { + rollbackCountB.incrementAndGet(); + } + + }); + + + try { + underTest.commit(); + fail("exepcted rollback exception"); + } catch (TransactionRolledBackException expected) { + } + + assertEquals("beforeEnd A called once", 1, beforeEndCountA.get()); + assertEquals("beforeEnd B called once", 1, beforeEndCountA.get()); + assertEquals("rollbackCount B 0", 1, rollbackCountB.get()); + assertEquals("rollbackCount A B", rollbackCountA.get(), rollbackCountB.get()); + } + + @Test + public void testSyncIndexCleared() throws Exception { + final AtomicInteger beforeEndCountA = new AtomicInteger(0); + final AtomicInteger rollbackCountA = new AtomicInteger(0); + Synchronization sync = new Synchronization() { + @Override + public void beforeEnd() throws Exception { + beforeEndCountA.getAndIncrement(); + } + @Override + public void afterCommit() throws Exception { + fail("exepcted rollback exception"); + } + @Override + public void afterRollback() throws Exception { + rollbackCountA.incrementAndGet(); + } + }; + + underTest.begin(); + underTest.addSynchronization(sync); + underTest.rollback(); + + assertEquals("beforeEnd", 1, beforeEndCountA.get()); + assertEquals("rollback", 1, rollbackCountA.get()); + + // do it again + underTest.begin(); + underTest.addSynchronization(sync); + underTest.rollback(); + + assertEquals("beforeEnd", 2, beforeEndCountA.get()); + assertEquals("rollback", 2, rollbackCountA.get()); + } +}
