Repository: activemq
Updated Branches:
  refs/heads/master 6c01b641b -> 7c293b661


https://issues.apache.org/jira/browse/AMQ-6430

When a nolocal durable consumer reconnects, the new connectionId is properly
captured for the NoLocal expression so that nolocal works on reconnect. Also
fixed the detection of the nolocal value changing on consumer connect.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/7c293b66
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/7c293b66
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/7c293b66

Branch: refs/heads/master
Commit: 7c293b661f22245ce21bf2b5aa1c5bf4192cb8c5
Parents: 6c01b64
Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Authored: Wed Sep 21 09:32:37 2016 -0400
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Wed Sep 21 09:34:46 2016 -0400

----------------------------------------------------------------------
 .../activemq/broker/region/TopicRegion.java     |  15 +-
 .../DurableSubscriptionWithNoLocalTest.java     | 143 ++++++++++++++++++-
 2 files changed, 153 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/7c293b66/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
index 51c9beb..eca3449 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicRegion.java
@@ -162,6 +162,12 @@ public class TopicRegion extends AbstractRegion {
                         sub.context = context;
                         sub.deactivate(keepDurableSubsActive, 
info.getLastDeliveredSequenceId());
                     }
+                    //If NoLocal we need to update the NoLocal selector with the new connectionId
+                    //Simply setting the selector with the current one will trigger a
+                    //refresh of the connectionId for the NoLocal expression
+                    if (info.isNoLocal()) {
+                        sub.setSelector(sub.getSelector());
+                    }
                     subscriptions.put(info.getConsumerId(), sub);
                 }
             } else {
@@ -189,8 +195,9 @@ public class TopicRegion extends AbstractRegion {
                 // deactivate only if given context is same
                 // as what is in the sub. otherwise, during linksteal
                 // sub will get new context, but will be removed here
-                if (sub.getContext() == context)
+                if (sub.getContext() == context) {
                     sub.deactivate(keepDurableSubsActive, 
info.getLastDeliveredSequenceId());
+                }
             }
         } else {
             super.removeConsumer(context, info);
@@ -373,6 +380,12 @@ public class TopicRegion extends AbstractRegion {
         if (info1.getSelector() != null && 
!info1.getSelector().equals(info2.getSelector())) {
             return true;
         }
+        // Prior to V11 the broker did not store the noLocal value for durable subs.
+        if (broker.getBrokerService().getStoreOpenWireVersion() >= 11) {
+            if (info1.isNoLocal() ^ info2.isNoLocal()) {
+                return true;
+            }
+        }
         return !info1.getDestination().equals(info2.getDestination());
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/7c293b66/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
index 4ecf811..ecbfac1 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionWithNoLocalTest.java
@@ -20,18 +20,29 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import java.io.File;
-
+import java.util.Arrays;
+import java.util.Collection;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 import javax.jms.Topic;
 import javax.jms.TopicConnection;
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
 
+import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.CommandTypes;
+import org.apache.activemq.network.DurableSyncNetworkBridgeTest.FLOW;
 import org.apache.activemq.store.kahadb.KahaDBStore;
 import org.junit.After;
 import org.junit.Before;
@@ -39,12 +50,16 @@ import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
  * Test for spec compliance for durable subscriptions that change the noLocal flag.
  */
+@RunWith(Parameterized.class)
 public class DurableSubscriptionWithNoLocalTest {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(DurableSubscriptionWithNoLocalTest.class);
@@ -57,6 +72,19 @@ public class DurableSubscriptionWithNoLocalTest {
     private BrokerService brokerService;
     private String connectionUri;
     private ActiveMQConnectionFactory factory;
+    private final boolean keepDurableSubsActive;
+
+    @Parameters(name="keepDurableSubsActive={0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+                {true},
+                {false}
+        });
+    }
+
+    public DurableSubscriptionWithNoLocalTest(final boolean 
keepDurableSubsActive) {
+        this.keepDurableSubsActive = keepDurableSubsActive;
+    }
 
     @Before
     public void setUp() throws Exception {
@@ -69,7 +97,115 @@ public class DurableSubscriptionWithNoLocalTest {
         brokerService.waitUntilStopped();
     }
 
-    @Ignore("Requires Broker be able to remove and recreate on noLocal change")
+    /**
+     * Make sure that NoLocal works for connection started/stopped
+     *
+     * @throws JMSException
+     */
+    @Test(timeout = 60000)
+    public void testNoLocalStillWorkWithConnectionRestart() throws Exception {
+        ActiveMQConnection connection = null;
+        try {
+            connection = (ActiveMQConnection) factory.createConnection();
+            connection.setClientID("test-client");
+            connection.start();
+            test(connection, "test message 1");
+            connection.stop();
+            connection.start();
+            test(connection, "test message 2");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Make sure that NoLocal works for multiple connections to the same subscription
+     *
+     * @throws JMSException
+     */
+    @Test(timeout = 60000)
+    public void testNoLocalStillWorksNewConnection() throws Exception {
+        ActiveMQConnection connection = null;
+        try {
+            connection = (ActiveMQConnection) factory.createConnection();
+            connection.setClientID("test-client");
+            connection.start();
+            test(connection, "test message 1");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+
+        try {
+            connection = (ActiveMQConnection) factory.createConnection();
+            connection.setClientID("test-client");
+            connection.start();
+            test(connection, "test message 2");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    /**
+     * Make sure that NoLocal works after restart
+     *
+     * @throws JMSException
+     */
+    @Test(timeout = 60000)
+    public void testNoLocalStillWorksRestartBroker() throws Exception {
+        ActiveMQConnection connection = null;
+        try {
+            connection = (ActiveMQConnection) factory.createConnection();
+            connection.setClientID("test-client");
+            connection.start();
+            test(connection, "test message 1");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+
+        tearDown();
+        createBroker(false);
+
+        try {
+            connection = (ActiveMQConnection) factory.createConnection();
+            connection.setClientID("test-client");
+            connection.start();
+            test(connection, "test message 2");
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    void test(final ActiveMQConnection connection, final String body) throws 
Exception {
+
+        Session incomingMessagesSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Topic topic = incomingMessagesSession.createTopic("test.topic");
+        TopicSubscriber consumer = 
incomingMessagesSession.createDurableSubscriber(topic, "test-subscription", 
null, true);
+
+        Session outgoingMessagesSession = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        Destination destination = 
outgoingMessagesSession.createTopic("test.topic");
+        MessageProducer producer = 
outgoingMessagesSession.createProducer(destination);
+        TextMessage textMessage = 
outgoingMessagesSession.createTextMessage(body);
+        producer.send(textMessage);
+        producer.close();
+        System.out.println("message sent: " + textMessage.getJMSMessageID() + 
"; body: " + textMessage.getText());
+        outgoingMessagesSession.close();
+
+        assertNull(consumer.receive(2000));
+
+        consumer.close();
+        incomingMessagesSession.close();
+    }
+
     @Test(timeout = 60000)
     public void testDurableSubWithNoLocalChange() throws Exception {
         TopicConnection connection = factory.createTopicConnection();
@@ -126,7 +262,6 @@ public class DurableSubscriptionWithNoLocalTest {
         assertNull(durableSub.receive(100));
     }
 
-    @Ignore("Requires Broker be able to remove and recreate on noLocal change")
     @Test(timeout = 60000)
     public void testInvertedDurableSubWithNoLocalChange() throws Exception {
         TopicConnection connection = factory.createTopicConnection();
@@ -247,7 +382,6 @@ public class DurableSubscriptionWithNoLocalTest {
         assertNull(durableSub.receive(100));
     }
 
-    @Ignore("Requires Broker be able to remove and recreate on noLocal change")
     @Test(timeout = 60000)
     public void testInvertedDurableSubWithNoLocalChangeAfterRestart() throws 
Exception {
         TopicConnection connection = factory.createTopicConnection();
@@ -322,6 +456,7 @@ public class DurableSubscriptionWithNoLocalTest {
         brokerService.setStoreOpenWireVersion(CommandTypes.PROTOCOL_VERSION);
         brokerService.setUseJmx(false);
         brokerService.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        brokerService.setKeepDurableSubsActive(keepDurableSubsActive);
         TransportConnector connector = 
brokerService.addConnector("tcp://0.0.0.0:0");
 
         brokerService.start();

Reply via email to