Repository: qpid-jms
Updated Branches:
  refs/heads/master cb0f1914c -> 96ddae77b


NO-JIRA: Add test to cover durable subscription update of noLocal value
against ActiveMQ.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/96ddae77
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/96ddae77
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/96ddae77

Branch: refs/heads/master
Commit: 96ddae77bf45fe909e36fa6f1ed5e1d62dd6713c
Parents: cb0f191
Author: Timothy Bish <[email protected]>
Authored: Tue May 26 15:35:09 2015 -0400
Committer: Timothy Bish <[email protected]>
Committed: Tue May 26 15:35:09 2015 -0400

----------------------------------------------------------------------
 .../jms/consumer/JmsDurableSubscriberTest.java  | 126 +++++++++++++++++--
 1 file changed, 113 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/96ddae77/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
index 9b540c5..e2bcb5b 100644
--- 
a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
+++ 
b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/consumer/JmsDurableSubscriberTest.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.consumer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -37,6 +38,8 @@ import javax.jms.TopicSubscriber;
 import org.apache.activemq.broker.jmx.BrokerViewMBean;
 import org.apache.activemq.broker.jmx.TopicViewMBean;
 import org.apache.qpid.jms.support.AmqpTestSupport;
+import org.apache.qpid.jms.support.Wait;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -48,11 +51,17 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
 
     protected static final Logger LOG = 
LoggerFactory.getLogger(JmsMessageConsumerTest.class);
 
+    private static final int MSG_COUNT = 10;
+
     @Override
     public boolean isPersistent() {
         return true;
     }
 
+    public String getSubscriptionName() {
+        return name.getMethodName() + "-subscriber";
+    }
+
     @Test(timeout = 60000)
     public void testCreateDurableSubscriber() throws Exception {
         connection = createAmqpConnection();
@@ -62,7 +71,7 @@ public class JmsDurableSubscriberTest extends AmqpTestSupport 
{
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         assertNotNull(session);
         Topic topic = session.createTopic(name.getMethodName());
-        MessageConsumer consumer = session.createDurableSubscriber(topic, 
name.getMethodName() + "-subscriber");
+        MessageConsumer consumer = session.createDurableSubscriber(topic, 
getSubscriptionName());
 
         TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
         assertEquals(0, proxy.getQueueSize());
@@ -85,12 +94,12 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         assertNotNull(session);
         Topic topic = session.createTopic(name.getMethodName());
-        session.createDurableSubscriber(topic, name.getMethodName() + 
"-subscriber").close();
+        session.createDurableSubscriber(topic, getSubscriptionName()).close();
 
         BrokerViewMBean broker = getProxyToBroker();
         assertEquals(1, broker.getInactiveDurableTopicSubscribers().length);
 
-        session.unsubscribe(name.getMethodName() + "-subscriber");
+        session.unsubscribe(getSubscriptionName());
 
         assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
         assertEquals(0, broker.getDurableTopicSubscribers().length);
@@ -110,7 +119,7 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
         assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
 
         try {
-            session.unsubscribe(name.getMethodName() + "-subscriber");
+            session.unsubscribe(getSubscriptionName());
             fail("Should have thrown an InvalidDestinationException");
         } catch (InvalidDestinationException ide) {
         }
@@ -125,7 +134,7 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         assertNotNull(session);
         Topic topic = session.createTopic(name.getMethodName());
-        MessageConsumer consumer = session.createDurableSubscriber(topic, 
name.getMethodName() + "-subscriber");
+        MessageConsumer consumer = session.createDurableSubscriber(topic, 
getSubscriptionName());
         assertNotNull(consumer);
 
         BrokerViewMBean broker = getProxyToBroker();
@@ -133,7 +142,7 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
         assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
 
         try {
-            session.unsubscribe(name.getMethodName() + "-subscriber");
+            session.unsubscribe(getSubscriptionName());
             fail("Should have thrown a JMSException");
         } catch (JMSException ex) {
         }
@@ -151,7 +160,7 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         assertNotNull(session);
         Topic topic = session.createTopic(name.getMethodName());
-        MessageConsumer consumer = session.createDurableSubscriber(topic, 
name.getMethodName() + "-subscriber");
+        MessageConsumer consumer = session.createDurableSubscriber(topic, 
getSubscriptionName());
         assertNotNull(consumer);
 
         BrokerViewMBean broker = getProxyToBroker();
@@ -159,7 +168,7 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
         assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
 
         try {
-            session.unsubscribe(name.getMethodName() + "-subscriber");
+            session.unsubscribe(getSubscriptionName());
             fail("Should have thrown a JMSException");
         } catch (JMSException ex) {
         }
@@ -172,7 +181,7 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
         assertEquals(0, broker.getDurableTopicSubscribers().length);
         assertEquals(1, broker.getInactiveDurableTopicSubscribers().length);
 
-        session.unsubscribe(name.getMethodName() + "-subscriber");
+        session.unsubscribe(getSubscriptionName());
 
         assertEquals(0, broker.getDurableTopicSubscribers().length);
         assertEquals(0, broker.getInactiveDurableTopicSubscribers().length);
@@ -187,7 +196,7 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         assertNotNull(session);
         Topic topic = session.createTopic(name.getMethodName());
-        TopicSubscriber subscriber = session.createDurableSubscriber(topic, 
name.getMethodName() + "-subscriber");
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, 
getSubscriptionName());
 
         TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
         assertEquals(0, proxy.getQueueSize());
@@ -200,7 +209,7 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
         assertEquals(0, 
brokerService.getAdminView().getDurableTopicSubscribers().length);
         assertEquals(1, 
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
 
-        subscriber = session.createDurableSubscriber(topic, 
name.getMethodName() + "-subscriber");
+        subscriber = session.createDurableSubscriber(topic, 
getSubscriptionName());
 
         assertEquals(1, 
brokerService.getAdminView().getDurableTopicSubscribers().length);
         assertEquals(0, 
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
@@ -217,7 +226,7 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
         Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
         assertNotNull(session);
         Topic topic = session.createTopic(name.getMethodName());
-        TopicSubscriber subscriber = session.createDurableSubscriber(topic, 
name.getMethodName() + "-subscriber");
+        TopicSubscriber subscriber = session.createDurableSubscriber(topic, 
getSubscriptionName());
 
         TopicViewMBean proxy = getProxyToTopic(name.getMethodName());
         assertEquals(0, proxy.getQueueSize());
@@ -236,7 +245,7 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
         producer.close();
 
         LOG.info("Bringing offline subscription back online.");
-        subscriber = session.createDurableSubscriber(topic, 
name.getMethodName() + "-subscriber");
+        subscriber = session.createDurableSubscriber(topic, 
getSubscriptionName());
 
         assertEquals(1, 
brokerService.getAdminView().getDurableTopicSubscribers().length);
         assertEquals(0, 
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
@@ -253,4 +262,95 @@ public class JmsDurableSubscriberTest extends 
AmqpTestSupport {
 
         assertTrue("Only recieved messages: " + messages.getCount(), 
messages.await(30, TimeUnit.SECONDS));
     }
+
+    @Ignore("Fails currently as ActiveMQ doesn't update the recovered 
subscription")
+    @Test
+    public void testDurableResubscribeWithNewNoLocalValue() throws Exception {
+        connection = createAmqpConnection();
+        connection.setClientID("DURABLE-AMQP");
+        connection.start();
+
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic(getDestinationName());
+
+        // Create a Durable Topic Subscription with noLocal set to true.
+        MessageConsumer durableSubscriber = 
session.createDurableSubscriber(topic, getSubscriptionName(), null, true);
+
+        // Create a standard non-durable Topic Subscription (noLocal not set).
+        MessageConsumer nonDurableSubscriber = session.createConsumer(topic);
+
+        // Publish first set, only the non durable sub should get these.
+        publishToTopic(session, topic);
+
+        LOG.debug("Testing that noLocal=true subscription doesn't get any 
messages.");
+
+        // Standard subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            Message message = nonDurableSubscriber.receive(5000);
+            assertNotNull(message);
+        }
+
+        // Durable noLocal=true subscription should not receive them
+        {
+            Message message = durableSubscriber.receive(2000);
+            assertNull(message);
+        }
+
+        // Publish second set for testing durable sub changed.
+        publishToTopic(session, topic);
+
+        assertEquals(1, 
brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, 
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable now goes inactive.
+        durableSubscriber.close();
+
+        assertTrue("Should have no durables.", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 
brokerService.getAdminView().getDurableTopicSubscribers().length == 0;
+            }
+        }));
+        assertTrue("Should have an inactive sub.", Wait.waitFor(new 
Wait.Condition() {
+
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
+            }
+        }));
+
+        LOG.debug("Testing that updated noLocal=false subscription does get 
any messages.");
+
+        // Recreate a Durable Topic Subscription with noLocal set to false.
+        durableSubscriber = session.createDurableSubscriber(topic, 
getSubscriptionName(), null, false);
+
+        assertEquals(1, 
brokerService.getAdminView().getDurableTopicSubscribers().length);
+        assertEquals(0, 
brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
+
+        // Durable noLocal=false subscription should not receive them as the 
subscription should
+        // have been removed and recreated to update the noLocal flag.
+        {
+            Message message = durableSubscriber.receive(2000);
+            assertNull(message);
+        }
+
+        // Publish third set which should get queued for the durable sub with 
noLocal=false
+        publishToTopic(session, topic);
+
+        // Durable subscriber should receive them
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            Message message = durableSubscriber.receive(5000);
+            assertNotNull("Should get local messages now", message);
+        }
+    }
+
+    private void publishToTopic(Session session, Topic destination) throws 
Exception {
+        MessageProducer producer = session.createProducer(destination);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            producer.send(session.createMessage());
+        }
+
+        producer.close();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to