http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java new file mode 100644 index 0000000..aa97a60 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsClientAckTest.java @@ -0,0 +1,361 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.After;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test that Session CLIENT_ACKNOWLEDGE works as expected.
+ *
+ * Every test creates its own connection and stores it in {@code connection};
+ * {@link #tearDown()} closes it.
+ */
+public class JmsClientAckTest extends AmqpTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsClientAckTest.class);
+
+    private Connection connection;
+
+    /**
+     * Closes the test connection if one was created, and always runs the parent
+     * tear down, even when closing the connection throws.
+     */
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Asserts that the given broker queue view eventually reports a size of zero.
+     */
+    private void assertQueueEmpty(final QueueViewMBean proxy) throws Exception {
+        assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getQueueSize() == 0;
+            }
+        }));
+    }
+
+    /**
+     * A single message that is received and acknowledged must leave the queue.
+     */
+    @Test(timeout = 60000)
+    public void testAckedMessageAreConsumed() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        // The connection is closed by tearDown().
+        assertQueueEmpty(proxy);
+    }
+
+    /**
+     * Receives three messages and acknowledges only the last one; the queue is
+     * then expected to drain completely.
+     */
+    @Test(timeout = 60000)
+    public void testLastMessageAcked() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+        producer.send(session.createTextMessage("Hello2"));
+        producer.send(session.createTextMessage("Hello3"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(3, proxy.getQueueSize());
+
+        // Consume the messages, acknowledging only the last one.
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg = consumer.receive(1000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        assertQueueEmpty(proxy);
+    }
+
+    /**
+     * A message received but not acknowledged must still be on the queue after
+     * the session is closed, and must be consumable from a new session.
+     */
+    @Test(timeout = 60000)
+    public void testUnAckedMessageAreNotConsumedOnSessionClose() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...but don't ack it.
+        MessageConsumer consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(1000);
+        assertNotNull(msg);
+        session.close();
+
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...and this time we ack it.
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = session.createConsumer(queue);
+        msg = consumer.receive(2000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        assertQueueEmpty(proxy);
+    }
+
+    /**
+     * A message acknowledged from inside a MessageListener must leave the queue.
+     */
+    @Test(timeout = 60000)
+    public void testAckedMessageAreConsumedByAsync() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    message.acknowledge();
+                } catch (JMSException e) {
+                    LOG.warn("Unexpected exception on acknowledge: {}", e.getMessage());
+                }
+            }
+        });
+
+        assertQueueEmpty(proxy);
+    }
+
+    /**
+     * A message handed to a listener that never acknowledges it must still be
+     * on the queue after the session is closed.
+     */
+    @Test(timeout = 60000)
+    public void testUnAckedAsyncMessageAreNotConsumedOnSessionClose() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("Hello"));
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(1, proxy.getQueueSize());
+
+        // Consume the message...
+        MessageConsumer consumer = session.createConsumer(queue);
+        consumer.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                // Don't ack the message.
+            }
+        });
+
+        session.close();
+        assertEquals(1, proxy.getQueueSize());
+
+        // Now we consume and ack the Message.
+        session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        consumer = session.createConsumer(queue);
+        Message msg = consumer.receive(2000);
+        assertNotNull(msg);
+        msg.acknowledge();
+
+        assertQueueEmpty(proxy);
+    }
+
+    /**
+     * Three listeners on one session receive MSG_COUNT messages between them;
+     * acknowledging the last message seen is expected to drain the queue.
+     */
+    @Test(timeout=90000)
+    public void testAckMarksAllConsumerMessageAsConsumed() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+
+        final int MSG_COUNT = 30;
+        final AtomicReference<Message> lastMessage = new AtomicReference<Message>();
+        final CountDownLatch done = new CountDownLatch(MSG_COUNT);
+
+        MessageListener myListener = new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                lastMessage.set(message);
+                done.countDown();
+            }
+        };
+
+        MessageConsumer consumer1 = session.createConsumer(queue);
+        consumer1.setMessageListener(myListener);
+        MessageConsumer consumer2 = session.createConsumer(queue);
+        consumer2.setMessageListener(myListener);
+        MessageConsumer consumer3 = session.createConsumer(queue);
+        consumer3.setMessageListener(myListener);
+
+        MessageProducer producer = session.createProducer(queue);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            producer.send(session.createTextMessage("Hello: " + i));
+        }
+
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertEquals(MSG_COUNT, proxy.getQueueSize());
+
+        assertTrue("Failed to consume all messages.", done.await(20, TimeUnit.SECONDS));
+        assertNotNull(lastMessage.get());
+        assertEquals(MSG_COUNT, proxy.getInFlightCount());
+
+        lastMessage.get().acknowledge();
+
+        assertQueueEmpty(proxy);
+    }
+
+    /**
+     * After acknowledging the first three messages, a fourth message that was
+     * received but not acknowledged must be redelivered by Session.recover().
+     */
+    @Test(timeout=60000)
+    public void testUnackedAreRecovered() throws Exception {
+        connection = createAmqpConnection();
+        connection.start();
+        Session consumerSession = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = consumerSession.createQueue(name.getMethodName());
+        MessageConsumer consumer = consumerSession.createConsumer(queue);
+        Session producerSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer producer = producerSession.createProducer(queue);
+        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+        // Each message gets its own body (previously all three texts were set on sent1).
+        TextMessage sent1 = producerSession.createTextMessage();
+        sent1.setText("msg1");
+        producer.send(sent1);
+        TextMessage sent2 = producerSession.createTextMessage();
+        sent2.setText("msg2");
+        producer.send(sent2);
+        TextMessage sent3 = producerSession.createTextMessage();
+        sent3.setText("msg3");
+        producer.send(sent3);
+
+        // Fail with a clear assertion rather than an NPE if a receive times out.
+        Message rec1 = consumer.receive(5000);
+        assertNotNull(rec1);
+        Message rec2 = consumer.receive(5000);
+        assertNotNull(rec2);
+        Message rec3 = consumer.receive(5000);
+        assertNotNull(rec3);
+        rec2.acknowledge();
+
+        TextMessage sent4 = producerSession.createTextMessage();
+        sent4.setText("msg4");
+        producer.send(sent4);
+
+        Message rec4 = consumer.receive(5000);
+        assertNotNull(rec4);
+        assertTrue(rec4.equals(sent4));
+        consumerSession.recover();
+        rec4 = consumer.receive(5000);
+        assertNotNull(rec4);
+        assertTrue(rec4.equals(sent4));
+        assertTrue(rec4.getJMSRedelivered());
+        rec4.acknowledge();
+    }
+
+    /**
+     * The listener calls Session.recover() on every delivery; the test waits
+     * until six of those deliveries are flagged as redelivered.
+     */
+    @Test(timeout=60000)
+    public void testRecoverRedelivery() throws Exception {
+        final CountDownLatch redelivery = new CountDownLatch(6);
+        connection = createAmqpConnection();
+        connection.start();
+
+        final Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        consumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                try {
+                    LOG.info("Got message: {}", message.getJMSMessageID());
+                    if (message.getJMSRedelivered()) {
+                        LOG.info("It's a redelivery.");
+                        redelivery.countDown();
+                    }
+                    LOG.info("calling recover() on the session to force redelivery.");
+                    session.recover();
+                } catch (JMSException e) {
+                    LOG.warn("Unexpected exception in onMessage", e);
+                }
+            }
+        });
+
+        MessageProducer producer = session.createProducer(queue);
+        producer.send(session.createTextMessage("test"));
+
+        assertTrue("Did not get 6 redeliveries", redelivery.await(20, TimeUnit.SECONDS));
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java new file mode 100644 index 0000000..018d6bc --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsConsumerPriorityDispatchTest.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.JmsConnection;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Test for Message priority ordering.
+ */
+public class JmsConsumerPriorityDispatchTest extends AmqpTestSupport {
+
+    /** Number of messages sent per test, one for each priority 0..9. */
+    private static final int MSG_COUNT = 10;
+
+    private Connection connection;
+
+    /**
+     * Runs the parent set up and then creates (but does not start) the connection.
+     */
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        super.setUp();
+        connection = createAmqpConnection();
+    }
+
+    /**
+     * Closes the connection created in {@link #setUp()} and always runs the
+     * parent tear down, even when closing the connection throws.
+     */
+    @Override
+    @After
+    public void tearDown() throws Exception {
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Starts the connection, sends one message for each priority 0..9 in
+     * ascending order and waits until the broker reports all of them in flight.
+     *
+     * @return the consumer the messages were dispatched to
+     */
+    private MessageConsumer sendPrioritizedMessagesAndAwaitDispatch() throws Exception {
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageProducer producer = session.createProducer(queue);
+        MessageConsumer consumer = session.createConsumer(queue);
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            Message message = session.createTextMessage();
+            producer.setPriority(i);
+            producer.send(message);
+        }
+
+        // Wait for all sent to be dispatched; fail here instead of silently
+        // continuing when the broker never reports them in flight.
+        final QueueViewMBean proxy = getProxyToQueue(name.getMethodName());
+        assertTrue("Not all messages were dispatched.", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return proxy.getInFlightCount() == MSG_COUNT;
+            }
+        }));
+
+        // We need to make sure that all messages are in the prefetch buffer.
+        TimeUnit.SECONDS.sleep(4);
+
+        return consumer;
+    }
+
+    /**
+     * With the connection's default settings the prefetched messages are
+     * expected highest priority first.
+     */
+    @Test(timeout = 60000)
+    public void testPrefetchedMessageArePriorityOrdered() throws Exception {
+        MessageConsumer consumer = sendPrioritizedMessagesAndAwaitDispatch();
+
+        for (int i = MSG_COUNT - 1; i >= 0; i--) {
+            Message message = consumer.receive(5000);
+            assertNotNull(message);
+            assertEquals(i, message.getJMSPriority());
+        }
+    }
+
+    /**
+     * With client side priority support switched off the prefetched messages
+     * are expected in the order they were sent.
+     */
+    @Test(timeout = 60000)
+    public void testPrefetchedMessageAreNotPriorityOrdered() throws Exception {
+        // We are assuming that Broker side priority support is not enabled in the create
+        // broker method in AmqpTestSupport. If that changes then this test will sometimes
+        // fail.
+        ((JmsConnection) connection).setMessagePrioritySupported(false);
+        MessageConsumer consumer = sendPrioritizedMessagesAndAwaitDispatch();
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            Message message = consumer.receive(5000);
+            assertNotNull(message);
+            assertEquals(i, message.getJMSPriority());
+        }
+    }
+}
 http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java
new file mode 100644
index 0000000..dbbee26
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsCreateResourcesInOnMessageTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Test;
+
+/**
+ * Test the case where messages are sent and consumers are created in onMessage.
+ */
+public class JmsCreateResourcesInOnMessageTest extends AmqpTestSupport {
+
+    /**
+     * Sends one message to a queue, forwards it to a second queue from inside
+     * the listener using a producer created in onMessage, and checks that the
+     * first queue drains and the second one ends up holding one message.
+     */
+    @Test(timeout = 60000)
+    public void testCreateProducerInOnMessage() throws Exception {
+        Connection connection = createAmqpConnection();
+        try {
+            connection.start();
+
+            final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            assertNotNull(session);
+            Queue queue = session.createQueue(name.getMethodName());
+            MessageConsumer consumer = session.createConsumer(queue);
+            final Queue forwardQ = session.createQueue(name.getMethodName() + "-forwarded");
+            MessageProducer producer = session.createProducer(queue);
+            producer.send(session.createTextMessage("TEST-MESSAGE"));
+            producer.close();
+
+            final QueueViewMBean proxy = getProxyToQueue(queue.getQueueName());
+            assertEquals(1, proxy.getQueueSize());
+
+            // LOG is not declared in this class; presumably inherited from AmqpTestSupport.
+            consumer.setMessageListener(new MessageListener() {
+
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        LOG.debug("Received async message: {}", message);
+                        MessageProducer forwarder = session.createProducer(forwardQ);
+                        forwarder.send(message);
+                        LOG.debug("forwarded async message: {}", message);
+                    } catch (Throwable e) {
+                        LOG.debug("Caught exception: ", e);
+                        // Keep the original failure attached as the cause.
+                        throw new RuntimeException(e.getMessage(), e);
+                    }
+                }
+            });
+
+            assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return proxy.getQueueSize() == 0;
+                }
+            }));
+
+            final QueueViewMBean proxy2 = getProxyToQueue(forwardQ.getQueueName());
+            assertTrue("Message was not forwarded.", Wait.waitFor(new Wait.Condition() {
+
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return proxy2.getQueueSize() == 1;
+                }
+            }));
+        } finally {
+            // Close the connection even when an assertion above fails.
+            connection.close();
+        }
+    }
+}
 http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
new file mode 100644
index 0000000..5994184
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.broker.jmx.TopicViewMBean;
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Test Durable Topic Subscriber functionality.
+ */
+public class JmsDurableSubscriberTest extends AmqpTestSupport {
+
+    // Log under this test's own name (was created for JmsMessageConsumerTest).
+    protected static final Logger LOG = LoggerFactory.getLogger(JmsDurableSubscriberTest.class);
+
+    /** The tests in this class run with persistence switched on. */
+    @Override
+    public boolean isPersistent() {
+        return true;
+    }
+
+    /**
+     * Asserts the number of active and inactive durable topic subscribers
+     * currently reported by the broker admin view.
+     */
+    private void assertDurableSubscribers(int active, int inactive) throws Exception {
+        assertEquals(active, brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(inactive, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+    }
+
+    /**
+     * Creating a durable subscriber must register one durable subscription on the broker.
+     */
+    @Test(timeout = 60000)
+    public void testCreateDuableSubscriber() throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber");
+
+        TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length);
+    }
+
+    /**
+     * Closing a durable subscriber must move its subscription to the inactive
+     * list; re-creating it must move it back to the active list.
+     */
+    @Test(timeout = 60000)
+    public void testDurableGoesOfflineAndReturns() throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber");
+
+        TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+
+        assertDurableSubscribers(1, 0);
+
+        subscriber.close();
+
+        assertDurableSubscribers(0, 1);
+
+        subscriber = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber");
+
+        assertDurableSubscribers(1, 0);
+    }
+
+    /**
+     * Messages published while the durable subscriber is closed must be
+     * delivered once the subscription is brought back online.
+     */
+    @Test(timeout = 60000)
+    public void testOfflineSubscriberGetsItsMessages() throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        final int MSG_COUNT = 5;
+
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        assertNotNull(session);
+        Topic topic = session.createTopic(name.getMethodName());
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber");
+
+        TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
+        assertEquals(0, proxy.getQueueSize());
+        assertDurableSubscribers(1, 0);
+
+        subscriber.close();
+
+        assertDurableSubscribers(0, 1);
+
+        MessageProducer producer = session.createProducer(topic);
+        for (int i = 0; i < MSG_COUNT; i++) {
+            producer.send(session.createTextMessage("Message: " + i));
+        }
+        producer.close();
+
+        LOG.info("Bringing offline subscription back online.");
+        subscriber = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber");
+
+        assertDurableSubscribers(1, 0);
+
+        final CountDownLatch messages = new CountDownLatch(MSG_COUNT);
+        subscriber.setMessageListener(new MessageListener() {
+
+            @Override
+            public void onMessage(Message message) {
+                LOG.info("Consumer got a message: {}", message);
+                messages.countDown();
+            }
+        });
+
+        // Read the latch count only after waiting, so the failure message
+        // reports what was really received.
+        boolean allReceived = messages.await(30, TimeUnit.SECONDS);
+        assertTrue("Only received " + (MSG_COUNT - messages.getCount()) + " of " + MSG_COUNT + " messages",
+            allReceived);
+    }
+}
 http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java
new file mode 100644
index 0000000..8fbcbad
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerClosedTest.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.consumer;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.junit.Test;
+
+/**
+ * Tests MessageConsumer method contracts after the MessageConsumer is closed.
+ */
+public class JmsMessageConsumerClosedTest extends AmqpTestSupport {
+
+    /** The consumer under test, produced by {@link #createConsumer()} during set up. */
+    protected MessageConsumer consumer;
+
+    /**
+     * Creates a connection, session and queue consumer and closes the consumer
+     * again before handing it back.
+     *
+     * @return a consumer that has already been closed
+     */
+    protected MessageConsumer createConsumer() throws Exception {
+        connection = createAmqpConnection();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue queue = session.createQueue(name.getMethodName());
+        MessageConsumer closedConsumer = session.createConsumer(queue);
+        closedConsumer.close();
+        return closedConsumer;
+    }
+
+    /**
+     * Runs the parent set up and then prepares the consumer under test.
+     */
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        consumer = createConsumer();
+    }
+
+    /** Reading the selector is expected to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetMessageSelectorFails() throws JMSException {
+        consumer.getMessageSelector();
+    }
+
+    /** Reading the listener is expected to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testGetMessageListenerFails() throws JMSException {
+        consumer.getMessageListener();
+    }
+
+    /** Installing a listener is expected to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testSetMessageListenerFails() throws JMSException {
+        MessageListener noOpListener = new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+            }
+        };
+        consumer.setMessageListener(noOpListener);
+    }
+
+    /** A blocking receive is expected to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testRreceiveFails() throws JMSException {
+        consumer.receive();
+    }
+
+    /** A timed receive is expected to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testRreceiveTimedFails() throws JMSException {
+        consumer.receive(11);
+    }
+
+    /** A non-blocking receive is expected to throw a JMSException. */
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testRreceiveNoWaitFails() throws JMSException {
+        consumer.receiveNoWait();
+    }
+
+    /** Calling close again is expected to complete without an exception. */
+    @Test(timeout=30000)
+    public void testClose() throws JMSException {
+        consumer.close();
+    }
+}
 http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java
new file mode 100644
index 0000000..85b4b37
--- /dev/null
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerFailedTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.consumer; + +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.support.Wait; + +/** + * Tests MessageConsumer method contracts after the MessageConsumer connection fails. + */ +public class JmsMessageConsumerFailedTest extends JmsMessageConsumerClosedTest { + + @Override + protected MessageConsumer createConsumer() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + latch.countDown(); + } + }); + connection.start(); + stopPrimaryBroker(); + assertTrue(latch.await(10, TimeUnit.SECONDS)); + final JmsConnection jmsConnection = (JmsConnection) connection; + assertTrue(Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return !jmsConnection.isConnected(); + } + })); + return consumer; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java new file mode 100644 index 0000000..609b46a --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageConsumerTest.java @@ -0,0 +1,467 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.InvalidSelectorException; +import javax.jms.JMSException; +import javax.jms.JMSSecurityException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.TopicViewMBean; +import org.apache.qpid.jms.JmsMessageAvailableListener; +import org.apache.qpid.jms.JmsMessageConsumer; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.apache.qpid.jms.support.Wait; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test for basic JMS MessageConsumer functionality. 
+ */ +public class JmsMessageConsumerTest extends AmqpTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumerTest.class); + + @Override + public boolean isPersistent() { + return true; + } + + @Test(timeout = 60000) + public void testCreateMessageConsumer() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + session.createConsumer(queue); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(0, proxy.getQueueSize()); + } + + @Test(timeout = 60000) + public void testSyncConsumeFromQueue() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + sendToAmqQueue(1); + + final QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(1, proxy.getQueueSize()); + + assertNotNull("Failed to receive any message.", consumer.receive(2000)); + + assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return proxy.getQueueSize() == 0; + } + })); + } + + @Test(timeout = 60000) + public void testSyncConsumeFromTopic() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Topic topic = session.createTopic(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(topic); + + sendToAmqTopic(1); + + final TopicViewMBean proxy = getProxyToTopic(name.getMethodName()); + //assertEquals(1, proxy.getQueueSize()); + + 
assertNotNull("Failed to receive any message.", consumer.receive(2000)); + + assertTrue("Published message not consumed.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return proxy.getQueueSize() == 0; + } + })); + } + + @Test(timeout = 60000) + public void testMessageAvailableConsumer() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + final int MSG_COUNT = 10; + final AtomicInteger available = new AtomicInteger(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + ((JmsMessageConsumer) consumer).setAvailableListener(new JmsMessageAvailableListener() { + + @Override + public void onMessageAvailable(MessageConsumer consumer) { + available.incrementAndGet(); + } + }); + + sendToAmqQueue(MSG_COUNT); + + final QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(MSG_COUNT, proxy.getQueueSize()); + + assertTrue("Listener not notified of correct number of messages.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return available.get() == MSG_COUNT; + } + })); + + // All should be immediately ready for consume. + for (int i = 0; i < MSG_COUNT; ++i) { + assertNotNull(consumer.receiveNoWait()); + } + + assertTrue("Queued message not consumed.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return proxy.getQueueSize() == 0; + } + })); + } + + /** + * Test to check if consumer thread wakes up inside a receive(timeout) after + * a message is dispatched to the consumer. We do a long poll here to ensure + * that a blocked receive with timeout does eventually get a Message. We don't + * want to test the short poll and retry case here since that's not what we are + * testing. 
+ * + * @throws Exception + */ + @Test(timeout=60000) + public void testConsumerReceiveBeforeMessageDispatched() throws Exception { + final Connection connection = createAmqpConnection(); + this.connection = connection; + connection.start(); + + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final Queue queue = session.createQueue(name.getMethodName()); + + Thread t = new Thread() { + @Override + public void run() { + try { + TimeUnit.SECONDS.sleep(10); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello")); + } catch (Exception e) { + LOG.warn("Caught during message send: {}", e.getMessage()); + } + } + }; + t.start(); + MessageConsumer consumer = session.createConsumer(queue); + Message msg = consumer.receive(60000); + assertNotNull(msg); + } + + @Test(timeout=60000) + public void testAsynchronousMessageConsumption() throws Exception { + final int msgCount = 4; + final Connection connection = createAmqpConnection(); + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done = new CountDownLatch(1); + this.connection = connection; + + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + LOG.debug("Async consumer got Message: {}", m); + counter.incrementAndGet(); + if (counter.get() == msgCount) { + done.countDown(); + } + } + }); + + sendToAmqQueue(msgCount); + assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); + TimeUnit.SECONDS.sleep(1); + assertEquals(msgCount, counter.get()); + } + + @Test(timeout=60000) + public void testSyncReceiveFailsWhenListenerSet() throws Exception { + final int msgCount = 4; + final Connection connection = createAmqpConnection(); + final 
AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done = new CountDownLatch(1); + this.connection = connection; + + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + LOG.debug("Async consumer got Message: {}", m); + counter.incrementAndGet(); + if (counter.get() == msgCount) { + done.countDown(); + } + } + }); + + try { + consumer.receive(); + fail("Should have thrown an exception."); + } catch (JMSException ex) { + } + + try { + consumer.receive(1000); + fail("Should have thrown an exception."); + } catch (JMSException ex) { + } + + try { + consumer.receiveNoWait(); + fail("Should have thrown an exception."); + } catch (JMSException ex) { + } + } + + @Test(timeout=60000) + public void testSetMessageListenerAfterStartAndSend() throws Exception { + final int msgCount = 4; + final Connection connection = createAmqpConnection(); + final AtomicInteger counter = new AtomicInteger(0); + final CountDownLatch done = new CountDownLatch(1); + this.connection = connection; + + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + sendToAmqQueue(msgCount); + + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message m) { + LOG.debug("Async consumer got Message: {}", m); + counter.incrementAndGet(); + if (counter.get() == msgCount) { + done.countDown(); + } + } + }); + + assertTrue(done.await(1000, TimeUnit.MILLISECONDS)); + TimeUnit.SECONDS.sleep(1); + assertEquals(msgCount, counter.get()); + } + + @Test(timeout=60000) + public void 
testNoReceivedMessagesWhenConnectionNotStarted() throws Exception { + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue destination = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(destination); + sendToAmqQueue(3); + assertNull(consumer.receive(2000)); + } + + @Test(timeout = 60000) + public void testMessagesAreAckedAMQProducer() throws Exception { + int messagesSent = 3; + assertTrue(brokerService.isPersistent()); + + connection = createActiveMQConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer p = session.createProducer(queue); + TextMessage message = null; + for (int i=0; i < messagesSent; i++) { + message = session.createTextMessage(); + String messageText = "Hello " + i + " sent at " + new java.util.Date().toString(); + message.setText(messageText); + LOG.debug(">>>> Sent [{}]", messageText); + p.send(message); + } + + // After the first restart we should get all messages sent above + restartPrimaryBroker(); + int messagesReceived = readAllMessages(); + assertEquals(messagesSent, messagesReceived); + + // This time there should be no messages on this queue + restartPrimaryBroker(); + messagesReceived = readAllMessages(); + assertEquals(0, messagesReceived); + } + + @Test(timeout = 60000) + public void testMessagesAreAckedAMQPProducer() throws Exception { + int messagesSent = 3; + + connection = createAmqpConnection(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + TextMessage message = null; + for (int i=0; i < messagesSent; i++) { + message = session.createTextMessage(); + String messageText = "Hello " + i + " sent at " 
+ new java.util.Date().toString(); + message.setText(messageText); + LOG.debug(">>>> Sent [{}]", messageText); + producer.send(message); + } + + connection.close(); + + // After the first restart we should get all messages sent above + restartPrimaryBroker(); + int messagesReceived = readAllMessages(); + assertEquals(messagesSent, messagesReceived); + + // This time there should be no messages on this queue + restartPrimaryBroker(); + messagesReceived = readAllMessages(); + assertEquals(0, messagesReceived); + } + + private int readAllMessages() throws Exception { + return readAllMessages(null); + } + + private int readAllMessages(String selector) throws Exception { + Connection connection = createAmqpConnection(); + connection.start(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + int messagesReceived = 0; + MessageConsumer consumer; + + if (selector == null) { + consumer = session.createConsumer(queue); + } else { + consumer = session.createConsumer(queue, selector); + } + + Message msg = consumer.receive(5000); + while (msg != null) { + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + TextMessage textMessage = (TextMessage) msg; + LOG.debug(">>>> Received [{}]", textMessage.getText()); + messagesReceived++; + msg = consumer.receive(5000); + } + + consumer.close(); + return messagesReceived; + } finally { + connection.close(); + } + } + + @Test(timeout=30000) + public void testSelectors() throws Exception{ + connection = createAmqpConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer p = session.createProducer(queue); + + TextMessage message = session.createTextMessage(); + message.setText("hello"); + p.send(message, DeliveryMode.PERSISTENT, 5, 0); + + message = session.createTextMessage(); + 
message.setText("hello + 9"); + p.send(message, DeliveryMode.PERSISTENT, 9, 0); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(2, proxy.getQueueSize()); + + MessageConsumer consumer = session.createConsumer(queue, "JMSPriority > 8"); + Message msg = consumer.receive(5000); + assertNotNull(msg); + assertTrue(msg instanceof TextMessage); + assertEquals("hello + 9", ((TextMessage) msg).getText()); + assertNull(consumer.receive(1000)); + } + + @Test(timeout=90000, expected=JMSSecurityException.class) + public void testConsumerNotAuthorized() throws Exception{ + connection = createAmqpConnection("guest", "password"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("USERS." + name.getMethodName()); + session.createConsumer(queue); + } + + @Test(timeout=90000, expected=InvalidSelectorException.class) + public void testInvalidSelector() throws Exception{ + connection = createAmqpConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + session.createConsumer(queue, "3+5"); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java new file mode 100644 index 0000000..395edfe --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsMessageGroupTest.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * 
contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class JmsMessageGroupTest extends AmqpTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(JmsMessageGroupTest.class); + + @Ignore // TODO - FIXME + @Test(timeout = 60000) + public void testGroupedMessagesDeliveredToOnlyOneConsumer() throws Exception { + connection = createAmqpConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer1 = session.createConsumer(queue); + MessageProducer producer = session.createProducer(queue); + + // Send the messages. 
+ for (int i = 0; i < 4; i++) { + TextMessage message = session.createTextMessage("message " + i); + message.setStringProperty("JMSXGroupID", "TEST-GROUP"); + message.setIntProperty("JMSXGroupSeq", i + 1); + LOG.info("sending message: " + message); + producer.send(message); + } + + // All the messages should have been sent down connection 1.. just get + // the first 3 + for (int i = 0; i < 3; i++) { + TextMessage m1 = (TextMessage) consumer1.receive(500); + assertNotNull("m1 is null for index: " + i, m1); + assertEquals(m1.getIntProperty("JMSXGroupSeq"), i + 1); + } + + // Setup a second connection + Connection connection1 = createAmqpConnection(); + connection1.start(); + Session session2 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer2 = session2.createConsumer(queue); + + // Close the first consumer. + consumer1.close(); + + // The last messages should now go the the second consumer. + for (int i = 0; i < 1; i++) { + TextMessage m1 = (TextMessage) consumer2.receive(500); + assertNotNull("m1 is null for index: " + i, m1); + assertEquals(m1.getIntProperty("JMSXGroupSeq"), 4 + i); + } + + // assert that there are no other messages left for the consumer 2 + Message m = consumer2.receive(100); + assertNull("consumer 2 has some messages left", m); + connection1.close(); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java new file mode 100644 index 0000000..dde6451 --- /dev/null +++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsQueueBrowserTest.java @@ -0,0 +1,140 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +import java.util.Enumeration; + +import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.QueueBrowser; +import javax.jms.Session; + +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test Basic Queue Browser implementation. 
+ */ +public class JmsQueueBrowserTest extends AmqpTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(JmsQueueBrowserTest.class); + + @Test(timeout = 60000) + public void testCreateQueueBrowser() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + session.createConsumer(queue).close(); + + QueueBrowser browser = session.createBrowser(queue); + assertNotNull(browser); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(0, proxy.getQueueSize()); + } + + @SuppressWarnings("rawtypes") + @Test(timeout = 60000) + public void testNoMessagesBrowserHasNoElements() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + session.createConsumer(queue).close(); + + QueueBrowser browser = session.createBrowser(queue); + assertNotNull(browser); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(0, proxy.getQueueSize()); + + Enumeration enumeration = browser.getEnumeration(); + assertFalse(enumeration.hasMoreElements()); + } + + @SuppressWarnings("rawtypes") + @Test(timeout = 60000) + public void testBrowseAllInQueue() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + sendToAmqQueue(5); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(5, proxy.getQueueSize()); + + QueueBrowser browser = session.createBrowser(queue); + assertNotNull(browser); + Enumeration enumeration = 
browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + Message msg = (Message) enumeration.nextElement(); + assertNotNull(msg); + LOG.debug("Recv: {}", msg); + count++; + } + assertFalse(enumeration.hasMoreElements()); + assertEquals(5, count); + } + + @SuppressWarnings("rawtypes") + @Test(timeout = 90000) + public void testBrowseAllInQueueSmallPrefetch() throws Exception { + connection = createAmqpConnection(); + ((JmsConnection) connection).getPrefetchPolicy().setQueueBrowserPrefetch(10); + connection.start(); + + final int MSG_COUNT = 30; + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + Queue queue = session.createQueue(name.getMethodName()); + sendToAmqQueue(MSG_COUNT); + + QueueViewMBean proxy = getProxyToQueue(name.getMethodName()); + assertEquals(MSG_COUNT, proxy.getQueueSize()); + + QueueBrowser browser = session.createBrowser(queue); + assertNotNull(browser); + Enumeration enumeration = browser.getEnumeration(); + int count = 0; + while (enumeration.hasMoreElements()) { + Message msg = (Message) enumeration.nextElement(); + assertNotNull(msg); + LOG.debug("Recv: {}", msg); + count++; + } + assertFalse(enumeration.hasMoreElements()); + assertEquals(MSG_COUNT, count); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java new file mode 100644 index 0000000..eef250b --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsZeroPrefetchTest.java @@ -0,0 +1,110 @@ +/** + * 
Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.consumer; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Ignore; +import org.junit.Test; + +/** + * + */ +public class JmsZeroPrefetchTest extends AmqpTestSupport { + + @Test(timeout=60000, expected=JMSException.class) + public void testCannotUseMessageListener() throws Exception { + connection = createAmqpConnection(); + ((JmsConnection)connection).getPrefetchPolicy().setAll(0); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageConsumer consumer = session.createConsumer(queue); + + MessageListener listener = new MessageListener() { + + @Override + public void onMessage(Message 
message) { + } + }; + + consumer.setMessageListener(listener); + } + + @Test(timeout = 60000) + public void testPullConsumerWorks() throws Exception { + connection = createAmqpConnection(); + ((JmsConnection)connection).getPrefetchPolicy().setAll(0); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Hello World!")); + + // now lets receive it + MessageConsumer consumer = session.createConsumer(queue); + Message answer = consumer.receive(5000); + assertNotNull("Should have received a message!", answer); + // check if method will return at all and will return a null + answer = consumer.receive(1); + assertNull("Should have not received a message!", answer); + answer = consumer.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } + + @Ignore // ActiveMQ doesn't honor link credit. 
+ @Test(timeout = 60000) + public void testTwoConsumers() throws Exception { + connection = createAmqpConnection(); + ((JmsConnection)connection).getPrefetchPolicy().setAll(0); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue(name.getMethodName()); + + MessageProducer producer = session.createProducer(queue); + producer.send(session.createTextMessage("Msg1")); + producer.send(session.createTextMessage("Msg2")); + + // now lets receive it + MessageConsumer consumer1 = session.createConsumer(queue); + MessageConsumer consumer2 = session.createConsumer(queue); + TextMessage answer = (TextMessage)consumer1.receive(5000); + assertNotNull(answer); + assertEquals("Should have received a message!", answer.getText(), "Msg1"); + answer = (TextMessage)consumer2.receive(5000); + assertNotNull(answer); + assertEquals("Should have received a message!", answer.getText(), "Msg2"); + + answer = (TextMessage)consumer2.receiveNoWait(); + assertNull("Should have not received a message!", answer); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java new file mode 100644 index 0000000..1eccaf2 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryQueueTest.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.destinations; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import javax.jms.Session; +import javax.jms.TemporaryQueue; + +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test functionality of Temporary Queues. 
+ */ +public class JmsTemporaryQueueTest extends AmqpTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(JmsTemporaryQueueTest.class); + + @Test(timeout = 60000) + public void testCreateTemporaryQueue() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + TemporaryQueue queue = session.createTemporaryQueue(); + session.createConsumer(queue); + + assertEquals(1, brokerService.getAdminView().getTemporaryQueues().length); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java new file mode 100644 index 0000000..95e3303 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/destinations/JmsTemporaryTopicTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.destinations; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +import javax.jms.Session; +import javax.jms.TemporaryTopic; + +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test functionality of Temporary Topics + */ +public class JmsTemporaryTopicTest extends AmqpTestSupport { + + protected static final Logger LOG = LoggerFactory.getLogger(JmsTemporaryTopicTest.class); + + // Temp Topics not yet supported on the Broker. 
+ @Ignore + @Test(timeout = 60000) + public void testCreateTemporaryTopic() throws Exception { + connection = createAmqpConnection(); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + assertNotNull(session); + TemporaryTopic topic = session.createTemporaryTopic(); + session.createConsumer(topic); + + assertEquals(1, brokerService.getAdminView().getTemporaryTopics().length); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java new file mode 100644 index 0000000..77cf6ce --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsAmqpDiscoveryTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.discovery; + +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; + +import org.apache.qpid.jms.JmsConnection; +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsConnectionListener; +import org.apache.qpid.jms.message.JmsInboundMessageDispatch; +import org.apache.qpid.jms.support.AmqpTestSupport; +import org.apache.qpid.jms.support.Wait; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test that a Broker using AMQP can be discovered and JMS operations can be performed. + */ +public class JmsAmqpDiscoveryTest extends AmqpTestSupport implements JmsConnectionListener { + + private static final Logger LOG = LoggerFactory.getLogger(JmsAmqpDiscoveryTest.class); + + private CountDownLatch interrupted; + private CountDownLatch restored; + private JmsConnection jmsConnection; + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + + interrupted = new CountDownLatch(1); + restored = new CountDownLatch(1); + } + + @Test(timeout=60000) + public void testRunningBrokerIsDiscovered() throws Exception { + connection = createConnection(); + connection.start(); + + assertTrue("connection never connected.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return jmsConnection.isConnected(); + } + })); + } + + @Test(timeout=60000) + public void testConnectionFailsWhenBrokerGoesDown() throws Exception { + connection = createConnection(); + connection.start(); + + assertTrue("connection never connected.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return jmsConnection.isConnected(); + } + })); + + LOG.info("Connection established, stopping broker."); + stopPrimaryBroker(); + + 
assertTrue("Interrupted event never fired", interrupted.await(30, TimeUnit.SECONDS)); + } + + @Test(timeout=60000) + public void testConnectionRestoresAfterBrokerRestarted() throws Exception { + connection = createConnection(); + connection.start(); + + assertTrue("connection never connected.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return jmsConnection.isConnected(); + } + })); + + stopPrimaryBroker(); + assertTrue(interrupted.await(20, TimeUnit.SECONDS)); + startPrimaryBroker(); + assertTrue(restored.await(20, TimeUnit.SECONDS)); + } + + @Test(timeout=60000) + public void testDiscoversAndReconnectsToSecondaryBroker() throws Exception { + + connection = createConnection(); + connection.start(); + + assertTrue("connection never connected.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return jmsConnection.isConnected(); + } + })); + + startNewBroker(); + stopPrimaryBroker(); + + assertTrue(interrupted.await(20, TimeUnit.SECONDS)); + assertTrue(restored.await(20, TimeUnit.SECONDS)); + } + + @Override + protected boolean isAmqpDiscovery() { + return true; + } + + protected Connection createConnection() throws Exception { + JmsConnectionFactory factory = new JmsConnectionFactory( + "discovery:(multicast://default)?maxReconnectDelay=500"); + connection = factory.createConnection(); + jmsConnection = (JmsConnection) connection; + jmsConnection.addConnectionListener(this); + return connection; + } + + @Override + public void onConnectionFailure(Throwable error) { + LOG.info("Connection reported failover: {}", error.getMessage()); + } + + @Override + public void onConnectionInterrupted(URI remoteURI) { + LOG.info("Connection reports interrupted. Lost connection to -> {}", remoteURI); + interrupted.countDown(); + } + + @Override + public void onConnectionRestored(URI remoteURI) { + LOG.info("Connection reports restored. 
Connected to -> {}", remoteURI); + restored.countDown(); + } + + @Override + public void onMessage(JmsInboundMessageDispatch envelope) { + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java new file mode 100644 index 0000000..3d53957 --- /dev/null +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/discovery/JmsDiscoveryProviderTest.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.discovery; + +import static org.junit.Assert.assertNotNull; + +import java.io.IOException; +import java.net.URI; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.qpid.jms.provider.DefaultProviderListener; +import org.apache.qpid.jms.provider.Provider; +import org.apache.qpid.jms.provider.discovery.DiscoveryProviderFactory; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test basic discovery of remote brokers + */ +public class JmsDiscoveryProviderTest { + + protected static final Logger LOG = LoggerFactory.getLogger(JmsDiscoveryProviderTest.class); + + @Rule public TestName name = new TestName(); + + private BrokerService broker; + + @Before + public void setup() throws Exception { + broker = createBroker(); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + } + + @Test(timeout=30000) + public void testCreateDiscvoeryProvider() throws Exception { + URI discoveryUri = new URI("discovery:multicast://default"); + Provider provider = DiscoveryProviderFactory.createAsync(discoveryUri); + assertNotNull(provider); + + DefaultProviderListener listener = new DefaultProviderListener(); + provider.setProviderListener(listener); + provider.start(); + provider.close(); + } + + @Test(timeout=30000, expected=IllegalStateException.class) + public void testStartFailsWithNoListener() throws Exception { + URI discoveryUri = new URI("discovery:multicast://default"); + Provider provider = + DiscoveryProviderFactory.createAsync(discoveryUri); + assertNotNull(provider); + provider.start(); + provider.close(); + } + + @Test(timeout=30000, expected=IOException.class) + public void 
testCreateFailsWithUnknownAgent() throws Exception { + URI discoveryUri = new URI("discovery:unknown://default"); + Provider provider = DiscoveryProviderFactory.createAsync(discoveryUri); + provider.close(); + } + + protected BrokerService createBroker() throws Exception { + + BrokerService brokerService = new BrokerService(); + brokerService.setBrokerName("localhost"); + brokerService.setPersistent(false); + brokerService.setAdvisorySupport(false); + brokerService.setUseJmx(false); + + TransportConnector connector = brokerService.addConnector("amqp://0.0.0.0:0"); + connector.setName("amqp"); + connector.setDiscoveryUri(new URI("multicast://default")); + + return brokerService; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
