http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java new file mode 100644 index 0000000..53ef28f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExclusiveConsumerTest.java @@ -0,0 +1,357 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.TestCase; + +import org.apache.activemq.command.ActiveMQQueue; + +public class ExclusiveConsumerTest extends TestCase { + + private static final String VM_BROKER_URL = "vm://localhost?broker.persistent=false&broker.useJmx=true"; + + public ExclusiveConsumerTest(String name) { + super(name); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + } + + @Override + protected void tearDown() throws Exception { + super.tearDown(); + } + + private Connection createConnection(final boolean start) throws JMSException { + ConnectionFactory cf = new ActiveMQConnectionFactory(VM_BROKER_URL); + Connection conn = cf.createConnection(); + if (start) { + conn.start(); + } + return conn; + } + + public void testExclusiveConsumerSelectedCreatedFirst() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE1?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE1"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE1"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + // TODO need two send a 2nd message - bug AMQ-1024 + // producer.send(msg); + Thread.sleep(100); + + // Verify exclusive consumer receives the message. + assertNotNull(exclusiveConsumer.receive(100)); + assertNull(fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } + + public void testExclusiveConsumerSelectedCreatedAfter() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE5"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE5?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE5"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + // Verify exclusive consumer receives the message. + assertNotNull(exclusiveConsumer.receive(100)); + assertNull(fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } + + public void testFailoverToAnotherExclusiveConsumerCreatedFirst() throws JMSException, + InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession1 = null; + Session exclusiveSession2 = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // This creates the exclusive consumer first which avoids AMQ-1024 + // bug. + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE2?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue); + MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE2"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE2"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + // Verify exclusive consumer receives the message. + assertNotNull(exclusiveConsumer1.receive(100)); + assertNull(exclusiveConsumer2.receive(100)); + assertNull(fallbackConsumer.receive(100)); + + // Close the exclusive consumer to verify the non-exclusive consumer + // takes over + exclusiveConsumer1.close(); + + producer.send(msg); + producer.send(msg); + + assertNotNull(exclusiveConsumer2.receive(100)); + assertNull(fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } + + public void testFailoverToAnotherExclusiveConsumerCreatedAfter() throws JMSException, + InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession1 = null; + Session exclusiveSession2 = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession1 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + exclusiveSession2 = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // This creates the exclusive consumer first which avoids AMQ-1024 + // bug. + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE6?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer1 = exclusiveSession1.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE6"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + MessageConsumer exclusiveConsumer2 = exclusiveSession2.createConsumer(exclusiveQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE6"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + // Verify exclusive consumer receives the message. + assertNotNull(exclusiveConsumer1.receive(100)); + assertNull(exclusiveConsumer2.receive(100)); + assertNull(fallbackConsumer.receive(100)); + + // Close the exclusive consumer to verify the non-exclusive consumer + // takes over + exclusiveConsumer1.close(); + + producer.send(msg); + producer.send(msg); + + assertNotNull(exclusiveConsumer2.receive(1000)); + assertNull(fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } + + public void testFailoverToNonExclusiveConsumer() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // This creates the exclusive consumer first which avoids AMQ-1024 + // bug. + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE3?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE3"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE3"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + // Verify exclusive consumer receives the message. + assertNotNull(exclusiveConsumer.receive(100)); + assertNull(fallbackConsumer.receive(100)); + + // Close the exclusive consumer to verify the non-exclusive consumer + // takes over + exclusiveConsumer.close(); + + producer.send(msg); + + assertNotNull(fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } + + public void testFallbackToExclusiveConsumer() throws JMSException, InterruptedException { + Connection conn = createConnection(true); + + Session exclusiveSession = null; + Session fallbackSession = null; + Session senderSession = null; + + try { + + exclusiveSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + fallbackSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + senderSession = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + + // This creates the exclusive consumer first which avoids AMQ-1024 + // bug. + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.QUEUE4?consumer.exclusive=true"); + MessageConsumer exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + ActiveMQQueue fallbackQueue = new ActiveMQQueue("TEST.QUEUE4"); + MessageConsumer fallbackConsumer = fallbackSession.createConsumer(fallbackQueue); + + ActiveMQQueue senderQueue = new ActiveMQQueue("TEST.QUEUE4"); + + MessageProducer producer = senderSession.createProducer(senderQueue); + + Message msg = senderSession.createTextMessage("test"); + producer.send(msg); + Thread.sleep(100); + + // Verify exclusive consumer receives the message. + assertNotNull(exclusiveConsumer.receive(100)); + assertNull(fallbackConsumer.receive(100)); + + // Close the exclusive consumer to verify the non-exclusive consumer + // takes over + exclusiveConsumer.close(); + + producer.send(msg); + + // Verify other non-exclusive consumer receices the message. + assertNotNull(fallbackConsumer.receive(100)); + + // Create exclusive consumer to determine if it will start receiving + // the messages. + exclusiveConsumer = exclusiveSession.createConsumer(exclusiveQueue); + + producer.send(msg); + assertNotNull(exclusiveConsumer.receive(100)); + assertNull(fallbackConsumer.receive(100)); + + } finally { + fallbackSession.close(); + senderSession.close(); + conn.close(); + } + + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java new file mode 100644 index 0000000..eff6efa --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ExpiryHogTest.java @@ -0,0 +1,82 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.util.concurrent.TimeUnit; +import javax.jms.ConnectionFactory; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.BlockJUnit4ClassRunner; + +/** + * User: gtully + */ +@RunWith(BlockJUnit4ClassRunner.class) +public class ExpiryHogTest extends JmsMultipleClientsTestSupport { + boolean sleep = false; + + int numMessages = 4; + + @Test(timeout = 2 * 60 * 1000) + public void testImmediateDispatchWhenCacheDisabled() throws Exception { + ConnectionFactory f = createConnectionFactory(); + destination = createDestination(); + startConsumers(f, destination); + sleep = true; + this.startProducers(f, destination, numMessages); + allMessagesList.assertMessagesReceived(numMessages); + } + + protected BrokerService createBroker() throws Exception { + BrokerService bs = new BrokerService(); + bs.setDeleteAllMessagesOnStartup(true); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(5000); + defaultEntry.setUseCache(false); + policyMap.setDefaultEntry(defaultEntry); + bs.setDestinationPolicy(policyMap); + + KahaDBPersistenceAdapter ad = (KahaDBPersistenceAdapter) bs.getPersistenceAdapter(); + ad.setConcurrentStoreAndDispatchQueues(true); + return bs; + } + + protected TextMessage createTextMessage(Session session, String initText) throws Exception { + if (sleep) { + TimeUnit.SECONDS.sleep(10); + } + TextMessage msg = super.createTextMessage(session, initText); + msg.setJMSExpiration(4000); + return msg; + } + + @Override + @Before + public void setUp() throws Exception { + autoFail = false; + persistent = true; + super.setUp(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java new file mode 100644 index 0000000..c793dc8 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSConsumerTest.java @@ -0,0 +1,937 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.management.ObjectName; + +import junit.framework.Test; + +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test cases used to test the JMS message consumer. + * + * + */ +public class JMSConsumerTest extends JmsTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(JMSConsumerTest.class); + + public ActiveMQDestination destination; + public int deliveryMode; + public int prefetch; + public int ackMode; + public byte destinationType; + public boolean durableConsumer; + + public static Test suite() { + return suite(JMSConsumerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + public void initCombosForTestMessageListenerWithConsumerCanBeStopped() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); + } + + public void testMessageListenerWithConsumerCanBeStopped() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done1 = new CountDownLatch(1); + final CountDownLatch done2 = new CountDownLatch(1); + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + counter.incrementAndGet(); + if (counter.get() == 1) { + done1.countDown(); + } + if (counter.get() == 2) { + done2.countDown(); + } + } + }); + + // Send a first message to make sure that the consumer dispatcher is + // running + sendMessages(session, destination, 1); + assertTrue(done1.await(1, TimeUnit.SECONDS)); + assertEquals(1, counter.get()); + + // Stop the consumer. + consumer.stop(); + + // Send a message, but should not get delivered. + sendMessages(session, destination, 1); + assertFalse(done2.await(1, TimeUnit.SECONDS)); + assertEquals(1, counter.get()); + + // Start the consumer, and the message should now get delivered. + consumer.start(); + assertTrue(done2.await(1, TimeUnit.SECONDS)); + assertEquals(2, counter.get()); + } + + + public void testMessageListenerWithConsumerCanBeStoppedConcurently() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch closeDone = new CountDownLatch(1); + + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + // preload the queue + sendMessages(session, destination, 2000); + + + final ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)session.createConsumer(destination); + + final Map<Thread, Throwable> exceptions = + Collections.synchronizedMap(new HashMap<Thread, Throwable>()); + Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() { + @Override + public void uncaughtException(Thread t, Throwable e) { + LOG.error("Uncaught exception:", e); + exceptions.put(t, e); + } + }); + + final class AckAndClose implements Runnable { + private final Message message; + + public AckAndClose(Message m) { + this.message = m; + } + + @Override + public void run() { + try { + int count = counter.incrementAndGet(); + if (count == 590) { + // close in a separate thread is ok by jms + consumer.close(); + closeDone.countDown(); + } + if (count % 200 == 0) { + // ensure there are some outstanding messages + // ack every 200 + message.acknowledge(); + } + } catch (Exception e) { + LOG.error("Exception on close or ack:", e); + exceptions.put(Thread.currentThread(), e); + } + } + }; + + final ExecutorService executor = Executors.newCachedThreadPool(); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + // ack and close eventually in separate thread + executor.execute(new AckAndClose(m)); + } + }); + + assertTrue(closeDone.await(20, TimeUnit.SECONDS)); + // await possible exceptions + Thread.sleep(1000); + assertTrue("no exceptions: " + exceptions, exceptions.isEmpty()); + } + + + public void initCombosForTestMutiReceiveWithPrefetch1() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE), + Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); + } + + public void testMutiReceiveWithPrefetch1() throws Exception { + + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + // Use all the ack modes + Session session = connection.createSession(false, ackMode); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 4); + + // Make sure 4 messages were delivered. + Message message = null; + for (int i = 0; i < 4; i++) { + message = consumer.receive(1000); + assertNotNull(message); + } + assertNull(consumer.receiveNoWait()); + message.acknowledge(); + } + + public void initCombosForTestDurableConsumerSelectorChange() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); + } + + public void testDurableConsumerSelectorChange() throws Exception { + + // Receive a message with the JMS API + connection.setClientID("test"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + MessageConsumer consumer = session.createDurableSubscriber((Topic)destination, "test", "color='red'", false); + + // Send the messages + TextMessage message = session.createTextMessage("1st"); + message.setStringProperty("color", "red"); + producer.send(message); + + Message m = consumer.receive(1000); + assertNotNull(m); + assertEquals("1st", ((TextMessage)m).getText()); + + // Change the subscription. + consumer.close(); + consumer = session.createDurableSubscriber((Topic)destination, "test", "color='blue'", false); + + message = session.createTextMessage("2nd"); + message.setStringProperty("color", "red"); + producer.send(message); + message = session.createTextMessage("3rd"); + message.setStringProperty("color", "blue"); + producer.send(message); + + // Selector should skip the 2nd message. + m = consumer.receive(1000); + assertNotNull(m); + assertEquals("3rd", ((TextMessage)m).getText()); + + assertNull(consumer.receiveNoWait()); + } + + public void initCombosForTestSendReceiveBytesMessage() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); + } + + public void testSendReceiveBytesMessage() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + MessageProducer producer = session.createProducer(destination); + + BytesMessage message = session.createBytesMessage(); + message.writeBoolean(true); + message.writeBoolean(false); + producer.send(message); + + // Make sure only 1 message was delivered. + BytesMessage m = (BytesMessage)consumer.receive(1000); + assertNotNull(m); + assertTrue(m.readBoolean()); + assertFalse(m.readBoolean()); + + assertNull(consumer.receiveNoWait()); + } + + public void initCombosForTestSetMessageListenerAfterStart() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); + } + + public void testSetMessageListenerAfterStart() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done = new CountDownLatch(1); + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 4); + + // See if the message get sent to the listener + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + counter.incrementAndGet(); + if (counter.get() == 4) { + done.countDown(); + } + } + }); + + assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // Make sure only 4 messages were delivered. + assertEquals(4, counter.get()); + } + + public void initCombosForTestPassMessageListenerIntoCreateConsumer() { + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); + } + + public void testPassMessageListenerIntoCreateConsumer() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done = new CountDownLatch(1); + + // Receive a message with the JMS API + connection.start(); + ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination, new MessageListener() { + @Override + public void onMessage(Message m) { + counter.incrementAndGet(); + if (counter.get() == 4) { + done.countDown(); + } + } + }); + assertNotNull(consumer); + + // Send the messages + sendMessages(session, destination, 4); + + assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // Make sure only 4 messages were delivered. + assertEquals(4, counter.get()); + } + + public void initCombosForTestMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)}); + } + + public void testMessageListenerOnMessageCloseUnackedWithPrefetch1StayInQueue() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch sendDone = new CountDownLatch(1); + final CountDownLatch got2Done = new CountDownLatch(1); + + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + // This test case does not work if optimized message dispatch is used as + // the main thread send block until the consumer receives the + // message. This test depends on thread decoupling so that the main + // thread can stop the consumer thread. + connection.setOptimizedMessageDispatch(false); + connection.start(); + + // Use all the ack modes + Session session = connection.createSession(false, ackMode); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + try { + TextMessage tm = (TextMessage)m; + LOG.info("Got in first listener: " + tm.getText()); + assertEquals("" + counter.get(), tm.getText()); + counter.incrementAndGet(); + if (counter.get() == 2) { + sendDone.await(); + connection.close(); + got2Done.countDown(); + } + tm.acknowledge(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + + // Send the messages + sendMessages(session, destination, 4); + sendDone.countDown(); + + // Wait for first 2 messages to arrive. + assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS)); + + // Re-start connection. + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + // Pickup the remaining messages. + final CountDownLatch done2 = new CountDownLatch(1); + session = connection.createSession(false, ackMode); + consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + try { + TextMessage tm = (TextMessage)m; + LOG.info("Got in second listener: " + tm.getText()); + // order is not guaranteed as the connection is started before the listener is set. + // assertEquals("" + counter.get(), tm.getText()); + counter.incrementAndGet(); + if (counter.get() == 4) { + done2.countDown(); + } + } catch (Throwable e) { + LOG.error("unexpected ex onMessage: ", e); + } + } + }); + + assertTrue(done2.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // assert msg 2 was redelivered as close() from onMessages() will only ack in auto_ack and dups_ok mode + assertEquals(5, counter.get()); + } + + public void initCombosForTestMessageListenerAutoAckOnCloseWithPrefetch1() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)}); + } + + public void testMessageListenerAutoAckOnCloseWithPrefetch1() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch sendDone = new CountDownLatch(1); + final CountDownLatch got2Done = new CountDownLatch(1); + + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + // This test case does not work if optimized message dispatch is used as + // the main thread send block until the consumer receives the + // message. This test depends on thread decoupling so that the main + // thread can stop the consumer thread. + connection.setOptimizedMessageDispatch(false); + connection.start(); + + // Use all the ack modes + Session session = connection.createSession(false, ackMode); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + try { + TextMessage tm = (TextMessage)m; + LOG.info("Got in first listener: " + tm.getText()); + assertEquals("" + counter.get(), tm.getText()); + counter.incrementAndGet(); + m.acknowledge(); + if (counter.get() == 2) { + sendDone.await(); + connection.close(); + got2Done.countDown(); + } + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + + // Send the messages + sendMessages(session, destination, 4); + sendDone.countDown(); + + // Wait for first 2 messages to arrive. + assertTrue(got2Done.await(100000, TimeUnit.MILLISECONDS)); + + // Re-start connection. + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + // Pickup the remaining messages. + final CountDownLatch done2 = new CountDownLatch(1); + session = connection.createSession(false, ackMode); + consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + try { + TextMessage tm = (TextMessage)m; + LOG.info("Got in second listener: " + tm.getText()); + counter.incrementAndGet(); + if (counter.get() == 4) { + done2.countDown(); + } + } catch (Throwable e) { + LOG.error("unexpected ex onMessage: ", e); + } + } + }); + + assertTrue(done2.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // close from onMessage with Auto_ack will ack + // Make sure only 4 messages were delivered. + assertEquals(4, counter.get()); + } + + public void initCombosForTestMessageListenerWithConsumerWithPrefetch1() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); + } + + public void testMessageListenerWithConsumerWithPrefetch1() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done = new CountDownLatch(1); + + // Receive a message with the JMS API + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + counter.incrementAndGet(); + if (counter.get() == 4) { + done.countDown(); + } + } + }); + + // Send the messages + sendMessages(session, destination, 4); + + assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // Make sure only 4 messages were delivered. + assertEquals(4, counter.get()); + } + + public void initCombosForTestMessageListenerWithConsumer() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); + } + + public void testMessageListenerWithConsumer() throws Exception { + + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done = new CountDownLatch(1); + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + counter.incrementAndGet(); + if (counter.get() == 4) { + done.countDown(); + } + } + }); + + // Send the messages + sendMessages(session, destination, 4); + + assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); + Thread.sleep(200); + + // Make sure only 4 messages were delivered. + assertEquals(4, counter.get()); + } + + public void initCombosForTestUnackedWithPrefetch1StayInQueue() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("ackMode", new Object[] {Integer.valueOf(Session.AUTO_ACKNOWLEDGE), Integer.valueOf(Session.DUPS_OK_ACKNOWLEDGE), + Integer.valueOf(Session.CLIENT_ACKNOWLEDGE)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)}); + } + + public void testUnackedWithPrefetch1StayInQueue() throws Exception { + + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + // Use all the ack modes + Session session = connection.createSession(false, ackMode); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 4); + + // Only pick up the first 2 messages. + Message message = null; + for (int i = 0; i < 2; i++) { + message = consumer.receive(1000); + assertNotNull(message); + } + message.acknowledge(); + + connection.close(); + connection = (ActiveMQConnection)factory.createConnection(); + connections.add(connection); + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + // Use all the ack modes + session = connection.createSession(false, ackMode); + consumer = session.createConsumer(destination); + + // Pickup the rest of the messages. + for (int i = 0; i < 2; i++) { + message = consumer.receive(1000); + assertNotNull(message); + } + message.acknowledge(); + assertNull(consumer.receiveNoWait()); + + } + + public void initCombosForTestPrefetch1MessageNotDispatched() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + } + + public void testPrefetch1MessageNotDispatched() throws Exception { + + // Set prefetch to 1 + connection.getPrefetchPolicy().setAll(1); + connection.start(); + + Session session = connection.createSession(true, 0); + destination = new ActiveMQQueue("TEST"); + MessageConsumer consumer = session.createConsumer(destination); + + // Send 2 messages to the destination. + sendMessages(session, destination, 2); + session.commit(); + + // The prefetch should fill up with 1 message. + // Since prefetch is still full, the 2nd message should get dispatched + // to another consumer.. lets create the 2nd consumer test that it does + // make sure it does. + ActiveMQConnection connection2 = (ActiveMQConnection)factory.createConnection(); + connection2.start(); + connections.add(connection2); + Session session2 = connection2.createSession(true, 0); + MessageConsumer consumer2 = session2.createConsumer(destination); + + // Pick up the first message. + Message message1 = consumer.receive(1000); + assertNotNull(message1); + + // Pick up the 2nd messages. + Message message2 = consumer2.receive(5000); + assertNotNull(message2); + + session.commit(); + session2.commit(); + + assertNull(consumer.receiveNoWait()); + + } + + public void initCombosForTestDontStart() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); + } + + public void testDontStart() throws Exception { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 1); + + // Make sure no messages were delivered. + assertNull(consumer.receive(1000)); + } + + public void initCombosForTestStartAfterSend() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); + } + + public void testStartAfterSend() throws Exception { + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 1); + + // Start the conncection after the message was sent. + connection.start(); + + // Make sure only 1 message was delivered. + assertNotNull(consumer.receive(1000)); + assertNull(consumer.receiveNoWait()); + } + + public void initCombosForTestReceiveMessageWithConsumer() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE), + Byte.valueOf(ActiveMQDestination.TEMP_QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TEMP_TOPIC_TYPE)}); + } + + public void testReceiveMessageWithConsumer() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = createDestination(session, destinationType); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 1); + + // Make sure only 1 message was delivered. + Message m = consumer.receive(1000); + assertNotNull(m); + assertEquals("0", ((TextMessage)m).getText()); + assertNull(consumer.receiveNoWait()); + } + + + public void testDupsOkConsumer() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); + destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + MessageConsumer consumer = session.createConsumer(destination); + + // Send the messages + sendMessages(session, destination, 4); + + // Make sure only 4 message are delivered. + for( int i=0; i < 4; i++){ + Message m = consumer.receive(1000); + assertNotNull(m); + } + assertNull(consumer.receive(1000)); + + // Close out the consumer.. no other messages should be left on the queue. + consumer.close(); + + consumer = session.createConsumer(destination); + assertNull(consumer.receive(1000)); + } + + public void testRedispatchOfUncommittedTx() throws Exception { + + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + sendMessages(connection, destination, 2); + + MessageConsumer consumer = session.createConsumer(destination); + assertNotNull(consumer.receive(1000)); + assertNotNull(consumer.receive(1000)); + + // install another consumer while message dispatch is unacked/uncommitted + Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination); + + // no commit so will auto rollback and get re-dispatched to redisptachConsumer + session.close(); + + Message msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue("redelivered flag set", msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); + + msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); + redispatchSession.commit(); + + assertNull(redispatchConsumer.receive(500)); + redispatchSession.close(); + } + + + public void testRedispatchOfRolledbackTx() throws Exception { + + connection.start(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + destination = createDestination(session, ActiveMQDestination.QUEUE_TYPE); + + sendMessages(connection, destination, 2); + + MessageConsumer consumer = session.createConsumer(destination); + assertNotNull(consumer.receive(1000)); + assertNotNull(consumer.receive(1000)); + + // install another consumer while message dispatch is unacked/uncommitted + Session redispatchSession = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageConsumer redispatchConsumer = redispatchSession.createConsumer(destination); + + session.rollback(); + session.close(); + + Message msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); + msg = redispatchConsumer.receive(1000); + assertNotNull(msg); + assertTrue(msg.getJMSRedelivered()); + assertEquals(2, msg.getLongProperty("JMSXDeliveryCount")); + redispatchSession.commit(); + + assertNull(redispatchConsumer.receive(500)); + redispatchSession.close(); + } + + + public void initCombosForTestAckOfExpired() { + addCombinationValues("destinationType", + new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE), Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)}); + } + + public void testAckOfExpired() throws Exception { + + ActiveMQConnectionFactory fact = new ActiveMQConnectionFactory("vm://localhost?jms.prefetchPolicy.all=4&jms.sendAcksAsync=false"); + connection = fact.createActiveMQConnection(); + + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + destination = (ActiveMQDestination) (destinationType == ActiveMQDestination.QUEUE_TYPE ? + session.createQueue("test") : session.createTopic("test")); + + MessageConsumer consumer = session.createConsumer(destination); + connection.setStatsEnabled(true); + + Session sendSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = sendSession.createProducer(destination); + producer.setTimeToLive(1000); + final int count = 4; + for (int i = 0; i < count; i++) { + TextMessage message = sendSession.createTextMessage("" + i); + producer.send(message); + } + + // let first bunch in queue expire + Thread.sleep(2000); + + producer.setTimeToLive(0); + for (int i = 0; i < count; i++) { + TextMessage message = sendSession.createTextMessage("no expiry" + i); + producer.send(message); + } + + ActiveMQMessageConsumer amqConsumer = (ActiveMQMessageConsumer) consumer; + + for(int i=0; i<count; i++) { + TextMessage msg = (TextMessage) amqConsumer.receive(); + assertNotNull(msg); + assertTrue("message has \"no expiry\" text: " + msg.getText(), msg.getText().contains("no expiry")); + + // force an ack when there are expired messages + amqConsumer.acknowledge(); + } + assertEquals("consumer has expiredMessages", count, amqConsumer.getConsumerStats().getExpiredMessageCount().getCount()); + + DestinationViewMBean view = createView(destination); + + assertEquals("Wrong inFlightCount: " + view.getInFlightCount(), 0, view.getInFlightCount()); + assertEquals("Wrong dispatch count: " + view.getDispatchCount(), 8, view.getDispatchCount()); + assertEquals("Wrong dequeue count: " + view.getDequeueCount(), 8, view.getDequeueCount()); + assertEquals("Wrong expired count: " + view.getExpiredCount(), 4, view.getExpiredCount()); + } + + protected DestinationViewMBean createView(ActiveMQDestination destination) throws Exception { + + String domain = "org.apache.activemq"; + ObjectName name; + if (destination.isQueue()) { + name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Queue,destinationName=test"); + } else { + name = new ObjectName(domain + ":type=Broker,brokerName=localhost,destinationType=Topic,destinationName=test"); + } + return (DestinationViewMBean)broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java new file mode 100644 index 0000000..0380614 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSDurableTopicRedeliverTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Message; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class JMSDurableTopicRedeliverTest extends JmsTopicRedeliverTest { + + private static final Logger LOG = LoggerFactory.getLogger(JMSDurableTopicRedeliverTest.class); + + protected void setUp() throws Exception { + durable = true; + super.setUp(); + } + + /** + * Sends and consumes the messages. + * + * @throws Exception + */ + public void testRedeliverNewSession() throws Exception { + String text = "TEST: " + System.currentTimeMillis(); + Message sendMessage = session.createTextMessage(text); + + if (verbose) { + LOG.info("About to send a message: " + sendMessage + " with text: " + text); + } + producer.send(producerDestination, sendMessage); + + // receive but don't acknowledge + Message unackMessage = consumer.receive(1000); + assertNotNull(unackMessage); + String unackId = unackMessage.getJMSMessageID(); + assertEquals(((TextMessage)unackMessage).getText(), text); + assertFalse(unackMessage.getJMSRedelivered()); + assertEquals(unackMessage.getIntProperty("JMSXDeliveryCount"), 1); + consumeSession.close(); + consumer.close(); + + // receive then acknowledge + consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = createConsumer(); + Message ackMessage = consumer.receive(1000); + assertNotNull(ackMessage); + ackMessage.acknowledge(); + String ackId = ackMessage.getJMSMessageID(); + assertEquals(((TextMessage)ackMessage).getText(), text); + assertTrue(ackMessage.getJMSRedelivered()); + assertEquals(ackMessage.getIntProperty("JMSXDeliveryCount"), 2); + assertEquals(unackId, ackId); + consumeSession.close(); + consumer.close(); + + consumeSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + consumer = createConsumer(); + assertNull(consumer.receive(1000)); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSExclusiveConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSExclusiveConsumerTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSExclusiveConsumerTest.java new file mode 100644 index 0000000..eda6faa --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSExclusiveConsumerTest.java @@ -0,0 +1,152 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.DeliveryMode; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.Test; +import org.apache.activemq.command.ActiveMQQueue; + +/** + * Test cases used to test the JMS message exclusive consumers. + * + * + */ +public class JMSExclusiveConsumerTest extends JmsTestSupport { + + public int deliveryMode; + + public static Test suite() { + return suite(JMSExclusiveConsumerTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + + public void initCombosForTestRoundRobinDispatchOnNonExclusive() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + } + + /** + * Shows that by default messages are round robined across a set of + * consumers. + * + * @throws Exception + */ + public void testRoundRobinDispatchOnNonExclusive() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST"); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + + MessageConsumer consumer1 = session.createConsumer(destination); + MessageConsumer consumer2 = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + + Message m; + m = consumer2.receive(1000); + assertNotNull(m); + + m = consumer1.receive(1000); + assertNotNull(m); + + assertNull(consumer1.receiveNoWait()); + assertNull(consumer2.receiveNoWait()); + } + + public void initCombosForTestDispatchExclusive() { + addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT), Integer.valueOf(DeliveryMode.PERSISTENT)}); + } + + /** + * Shows that if the "?consumer.exclusive=true" option is added to + * destination, then all messages are routed to 1 consumer. + * + * @throws Exception + */ + public void testDispatchExclusive() throws Exception { + + // Receive a message with the JMS API + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQQueue destination = new ActiveMQQueue("TEST?consumer.exclusive=true"); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(deliveryMode); + + MessageConsumer consumer1 = session.createConsumer(destination); + MessageConsumer consumer2 = session.createConsumer(destination); + + // Send the messages + producer.send(session.createTextMessage("1st")); + producer.send(session.createTextMessage("2nd")); + producer.send(session.createTextMessage("3nd")); + + Message m; + m = consumer2.receive(1000); + if (m != null) { + // Consumer 2 should get all the messages. + for (int i = 0; i < 2; i++) { + m = consumer2.receive(1000); + assertNotNull(m); + } + } else { + // Consumer 1 should get all the messages. + for (int i = 0; i < 3; i++) { + m = consumer1.receive(1000); + assertNotNull(m); + } + } + + assertNull(consumer1.receiveNoWait()); + assertNull(consumer2.receiveNoWait()); + } + + public void testMixExclusiveWithNonExclusive() throws Exception { + ActiveMQQueue exclusiveQueue = new ActiveMQQueue("TEST.FOO?consumer.exclusive=true"); + ActiveMQQueue nonExclusiveQueue = new ActiveMQQueue("TEST.FOO?consumer.exclusive=false"); + + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + MessageConsumer nonExCon = session.createConsumer(nonExclusiveQueue); + MessageConsumer exCon = session.createConsumer(exclusiveQueue); + + MessageProducer prod = session.createProducer(exclusiveQueue); + prod.send(session.createMessage()); + prod.send(session.createMessage()); + prod.send(session.createMessage()); + + Message m; + for (int i = 0; i < 3; i++) { + m = exCon.receive(1000); + assertNotNull(m); + m = nonExCon.receive(1000); + assertNull(m); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java new file mode 100644 index 0000000..ec3c153 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +/** + * + */ +public class JMSIndividualAckTest extends TestSupport { + + private Connection connection; + + @Override + protected void setUp() throws Exception { + super.setUp(); + connection = createConnection(); + } + + /** + * @see junit.framework.TestCase#tearDown() + */ + @Override + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + connection = null; + } + super.tearDown(); + } + + /** + * Tests if acknowledged messages are being consumed. + * + * @throws JMSException + */ + public void testAckedMessageAreConsumed() throws JMSException { + connection.start(); + Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + + // Reset the session. + session.close(); + session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNull(msg); + + session.close(); + } + + /** + * Tests if acknowledged messages are being consumed. + * + * @throws JMSException + */ + public void testLastMessageAcked() throws JMSException { + connection.start(); + Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + TextMessage msg1 = session.createTextMessage("msg1"); + TextMessage msg2 = session.createTextMessage("msg2"); + TextMessage msg3 = session.createTextMessage("msg3"); + producer.send(msg1); + producer.send(msg2); + producer.send(msg3); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + msg = consumer.receive(1000); + assertNotNull(msg); + msg = consumer.receive(1000); + assertNotNull(msg); + msg.acknowledge(); + + // Reset the session. + session.close(); + session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(1000); + assertNotNull(msg); + assertEquals(msg1,msg); + msg = consumer.receive(1000); + assertNotNull(msg); + assertEquals(msg2,msg); + msg = consumer.receive(1000); + assertNull(msg); + session.close(); + } + + /** + * Tests if unacknowledged messages are being re-delivered when the consumer connects again. + * + * @throws JMSException + */ + public void testUnAckedMessageAreNotConsumedOnSessionClose() throws JMSException { + connection.start(); + Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + Queue queue = session.createQueue(getQueueName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + + // Consume the message... + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(1000); + assertNotNull(msg); + // Don't ack the message. + + // Reset the session. This should cause the unacknowledged message to be re-delivered. + session.close(); + session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + + // Attempt to Consume the message... + consumer = session.createConsumer(queue); + msg = consumer.receive(2000); + assertNotNull(msg); + msg.acknowledge(); + + session.close(); + } + + protected String getQueueName() { + return getClass().getName() + "." + getName(); + } +}
