Repository: qpid-jms Updated Branches: refs/heads/master cb0f1914c -> 96ddae77b
NO-JIRA: Add test to cover durable subscription update of noLocal value against ActiveMQ. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/96ddae77 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/96ddae77 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/96ddae77 Branch: refs/heads/master Commit: 96ddae77bf45fe909e36fa6f1ed5e1d62dd6713c Parents: cb0f191 Author: Timothy Bish <[email protected]> Authored: Tue May 26 15:35:09 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue May 26 15:35:09 2015 -0400 ---------------------------------------------------------------------- .../jms/consumer/JmsDurableSubscriberTest.java | 126 +++++++++++++++++-- 1 file changed, 113 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/96ddae77/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java index 9b540c5..e2bcb5b 100644 --- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java +++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java @@ -18,6 +18,7 @@ package org.apache.qpid.jms.consumer; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -37,6 +38,8 @@ import javax.jms.TopicSubscriber; 
import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.qpid.jms.support.AmqpTestSupport; +import org.apache.qpid.jms.support.Wait; +import org.junit.Ignore; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,11 +51,17 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { protected static final Logger LOG = LoggerFactory.getLogger(JmsMessageConsumerTest.class); + private static final int MSG_COUNT = 10; + @Override public boolean isPersistent() { return true; } + public String getSubscriptionName() { + return name.getMethodName() + "-subscriber"; + } + @Test(timeout = 60000) public void testCreateDurableSubscriber() throws Exception { connection = createAmqpConnection(); @@ -62,7 +71,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertNotNull(session); Topic topic = session.createTopic(name.getMethodName()); - MessageConsumer consumer = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber"); + MessageConsumer consumer = session.createDurableSubscriber(topic, getSubscriptionName()); TopicViewMBean proxy = getProxyToTopic(name.getMethodName()); assertEquals(0, proxy.getQueueSize()); @@ -85,12 +94,12 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertNotNull(session); Topic topic = session.createTopic(name.getMethodName()); - session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber").close(); + session.createDurableSubscriber(topic, getSubscriptionName()).close(); BrokerViewMBean broker = getProxyToBroker(); assertEquals(1, broker.getInactiveDurableTopicSubscribers().length); - session.unsubscribe(name.getMethodName() + "-subscriber"); + session.unsubscribe(getSubscriptionName()); assertEquals(0, 
broker.getInactiveDurableTopicSubscribers().length); assertEquals(0, broker.getDurableTopicSubscribers().length); @@ -110,7 +119,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { assertEquals(0, broker.getInactiveDurableTopicSubscribers().length); try { - session.unsubscribe(name.getMethodName() + "-subscriber"); + session.unsubscribe(getSubscriptionName()); fail("Should have thrown an InvalidDestinationException"); } catch (InvalidDestinationException ide) { } @@ -125,7 +134,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertNotNull(session); Topic topic = session.createTopic(name.getMethodName()); - MessageConsumer consumer = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber"); + MessageConsumer consumer = session.createDurableSubscriber(topic, getSubscriptionName()); assertNotNull(consumer); BrokerViewMBean broker = getProxyToBroker(); @@ -133,7 +142,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { assertEquals(0, broker.getInactiveDurableTopicSubscribers().length); try { - session.unsubscribe(name.getMethodName() + "-subscriber"); + session.unsubscribe(getSubscriptionName()); fail("Should have thrown a JMSException"); } catch (JMSException ex) { } @@ -151,7 +160,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertNotNull(session); Topic topic = session.createTopic(name.getMethodName()); - MessageConsumer consumer = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber"); + MessageConsumer consumer = session.createDurableSubscriber(topic, getSubscriptionName()); assertNotNull(consumer); BrokerViewMBean broker = getProxyToBroker(); @@ -159,7 +168,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { assertEquals(0, broker.getInactiveDurableTopicSubscribers().length); 
try { - session.unsubscribe(name.getMethodName() + "-subscriber"); + session.unsubscribe(getSubscriptionName()); fail("Should have thrown a JMSException"); } catch (JMSException ex) { } @@ -172,7 +181,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { assertEquals(0, broker.getDurableTopicSubscribers().length); assertEquals(1, broker.getInactiveDurableTopicSubscribers().length); - session.unsubscribe(name.getMethodName() + "-subscriber"); + session.unsubscribe(getSubscriptionName()); assertEquals(0, broker.getDurableTopicSubscribers().length); assertEquals(0, broker.getInactiveDurableTopicSubscribers().length); @@ -187,7 +196,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertNotNull(session); Topic topic = session.createTopic(name.getMethodName()); - TopicSubscriber subscriber = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber"); + TopicSubscriber subscriber = session.createDurableSubscriber(topic, getSubscriptionName()); TopicViewMBean proxy = getProxyToTopic(name.getMethodName()); assertEquals(0, proxy.getQueueSize()); @@ -200,7 +209,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { assertEquals(0, brokerService.getAdminView().getDurableTopicSubscribers().length); assertEquals(1, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); - subscriber = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber"); + subscriber = session.createDurableSubscriber(topic, getSubscriptionName()); assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); @@ -217,7 +226,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); assertNotNull(session); Topic topic = 
session.createTopic(name.getMethodName()); - TopicSubscriber subscriber = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber"); + TopicSubscriber subscriber = session.createDurableSubscriber(topic, getSubscriptionName()); TopicViewMBean proxy = getProxyToTopic(name.getMethodName()); assertEquals(0, proxy.getQueueSize()); @@ -236,7 +245,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { producer.close(); LOG.info("Bringing offline subscription back online."); - subscriber = session.createDurableSubscriber(topic, name.getMethodName() + "-subscriber"); + subscriber = session.createDurableSubscriber(topic, getSubscriptionName()); assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); @@ -253,4 +262,95 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport { assertTrue("Only recieved messages: " + messages.getCount(), messages.await(30, TimeUnit.SECONDS)); } + + @Ignore("Fails currently as ActiveMQ doesn't update the recovered subscription") + @Test + public void testDurableResubscribeWithNewNoLocalValue() throws Exception { + connection = createAmqpConnection(); + connection.setClientID("DURABLE-AMQP"); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Topic topic = session.createTopic(getDestinationName()); + + // Create a Durable Topic Subscription with noLocal set to true. + MessageConsumer durableSubscriber = session.createDurableSubscriber(topic, getSubscriptionName(), null, true); + + // Create a non-durable Topic Subscription without specifying a noLocal value. + MessageConsumer nonDurableSubscriber = session.createConsumer(topic); + + // Publish first set, only the non durable sub should get these. 
+ publishToTopic(session, topic); + + LOG.debug("Testing that noLocal=true subscription doesn't get any messages."); + + // Standard subscriber should receive them + for (int i = 0; i < MSG_COUNT; ++i) { + Message message = nonDurableSubscriber.receive(5000); + assertNotNull(message); + } + + // Durable noLocal=true subscription should not receive them + { + Message message = durableSubscriber.receive(2000); + assertNull(message); + } + + // Publish second set for testing durable sub changed. + publishToTopic(session, topic); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Durable now goes inactive. + durableSubscriber.close(); + + assertTrue("Should have no durables.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getDurableTopicSubscribers().length == 0; + } + })); + assertTrue("Should have an inactive sub.", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1; + } + })); + + LOG.debug("Testing that updated noLocal=false subscription does get any messages."); + + // Recreate a Durable Topic Subscription with noLocal set to false. + durableSubscriber = session.createDurableSubscriber(topic, getSubscriptionName(), null, false); + + assertEquals(1, brokerService.getAdminView().getDurableTopicSubscribers().length); + assertEquals(0, brokerService.getAdminView().getInactiveDurableTopicSubscribers().length); + + // Durable noLocal=false subscription should not receive them as the subscriptions should + // have been removed and recreated to update the noLocal flag. 
+ { + Message message = durableSubscriber.receive(2000); + assertNull(message); + } + + // Publish third set which should get queued for the durable sub with noLocal=false + publishToTopic(session, topic); + + // Durable subscriber should receive them + for (int i = 0; i < MSG_COUNT; ++i) { + Message message = durableSubscriber.receive(5000); + assertNotNull("Should get local messages now", message); + } + } + + private void publishToTopic(Session session, Topic destination) throws Exception { + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < MSG_COUNT; ++i) { + producer.send(session.createMessage()); + } + + producer.close(); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
