Repository: activemq
Updated Branches:
  refs/heads/master 0050f22b9 -> a03865560


https://issues.apache.org/jira/browse/AMQ-6423

Fixing durable subscription sync over a network bridge so that network
durable subscriptions for destinations that are no longer permissible are
also cleaned up


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a0386556
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a0386556
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a0386556

Branch: refs/heads/master
Commit: a038655605e8fa1de279b37989ba69a68f83c601
Parents: 0050f22
Author: Christopher L. Shannon (cshannon) <[email protected]>
Authored: Thu Sep 8 08:27:49 2016 -0400
Committer: Christopher L. Shannon (cshannon) <[email protected]>
Committed: Thu Sep 8 08:28:26 2016 -0400

----------------------------------------------------------------------
 .../network/DemandForwardingBridgeSupport.java  |  12 +-
 .../activemq/network/DurableConduitBridge.java  |  25 ++++
 .../network/DurableSyncNetworkBridgeTest.java   | 129 ++++++++++++++++++-
 .../network/DynamicNetworkTestSupport.java      |  12 ++
 4 files changed, 170 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a0386556/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index 8a3a56a..70449f0 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -652,12 +652,14 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                             this.brokerService.getBrokerName(), 
subInfo.getBrokerName());
 
                     if (configuration.isSyncDurableSubs() && 
configuration.isConduitSubscriptions()
-                            && !configuration.isDynamicOnly() && 
subInfo.getSubscriptionInfos() != null) {
+                            && !configuration.isDynamicOnly()) {
                         if (started.get()) {
-                            for (ConsumerInfo info : 
subInfo.getSubscriptionInfos()) {
-                                
if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
-                                        
matchesDynamicallyIncludedDestinations(info.getDestination())) {
-                                    serviceRemoteConsumerAdvisory(info);
+                            if (subInfo.getSubscriptionInfos() != null) {
+                                for (ConsumerInfo info : 
subInfo.getSubscriptionInfos()) {
+                                    
if(!info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
+                                            
matchesDynamicallyIncludedDestinations(info.getDestination())) {
+                                        serviceRemoteConsumerAdvisory(info);
+                                    }
                                 }
                             }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0386556/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index e699272..969c386 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -26,6 +26,7 @@ import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.filter.DestinationFilter;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.util.TypeConversionSupport;
@@ -88,6 +89,30 @@ public class DurableConduitBridge extends ConduitBridge {
                         LOG.error("Failed to add static destination {}", dest, 
e);
                     }
                     LOG.trace("Forwarding messages for durable destination: 
{}", dest);
+                } else if (configuration.isSyncDurableSubs() && 
!isPermissableDestination(dest)) {
+                    if (dest.isTopic()) {
+                        RegionBroker regionBroker = (RegionBroker) 
brokerService.getRegionBroker();
+                        TopicRegion topicRegion = (TopicRegion) 
regionBroker.getTopicRegion();
+
+                        String candidateSubName = getSubscriberName(dest);
+                        for (Subscription subscription : 
topicRegion.getDurableSubscriptions().values()) {
+                            String subName = 
subscription.getConsumerInfo().getSubscriptionName();
+                            if (subName != null && 
subName.equals(candidateSubName)) {
+                               try {
+                                    // remove the NC subscription as it is no 
longer for a permissable dest
+                                    RemoveSubscriptionInfo sending = new 
RemoveSubscriptionInfo();
+                                    sending.setClientId(localClientId);
+                                    sending.setSubscriptionName(subName);
+                                    
sending.setConnectionId(this.localConnectionInfo.getConnectionId());
+                                    localBroker.oneway(sending);
+                                } catch (IOException e) {
+                                    LOG.debug("Exception removing NC durable 
subscription: {}", subName, e);
+                                    serviceRemoteException(e);
+                                }
+                                break;
+                            }
+                        }
+                    }
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0386556/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
index 67e9e24..62b3dec 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableSyncNetworkBridgeTest.java
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
 import javax.jms.Session;
 
 import org.apache.activemq.ActiveMQConnectionFactory;
@@ -56,14 +57,18 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
 
     protected static final Logger LOG = 
LoggerFactory.getLogger(DurableSyncNetworkBridgeTest.class);
 
+    protected String staticIncludeTopics = "include.static.test";
+    protected String includedTopics = "include.test.>";
     protected String testTopicName2 = "include.test.bar2";
     private boolean dynamicOnly = false;
+    private boolean forceDurable = false;
     private byte remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
     public static enum FLOW {FORWARD, REVERSE};
 
     private BrokerService broker1;
     private BrokerService broker2;
     private Session session1;
+    private Session session2;
     private final FLOW flow;
 
     @Rule
@@ -98,7 +103,10 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
 
     @Before
     public void setUp() throws Exception {
+        includedTopics = "include.test.>";
+        staticIncludeTopics = "include.static.test";
         dynamicOnly = false;
+        forceDurable = false;
         remoteBrokerWireFormatVersion = CommandTypes.PROTOCOL_VERSION;
         doSetUp(true, true, tempFolder.newFolder(), tempFolder.newFolder());
     }
@@ -135,6 +143,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         assertNCDurableSubsCount(broker2, topic, 1);
 
         restartBrokers(true);
+        assertBridgeStarted();
 
         assertSubscriptionsCount(broker1, topic, 1);
         assertNCDurableSubsCount(broker2, topic, 1);
@@ -157,6 +166,43 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
 
         doTearDown();
         restartBroker(broker1, false);
+        restartBroker(broker2, false);
+
+        //Send some messages to the NC sub and make sure it can still be 
deleted
+        MessageProducer prod = session2.createProducer(topic);
+        for (int i = 0; i < 10; i++) {
+            prod.send(session2.createTextMessage("test"));
+        }
+
+        assertSubscriptionsCount(broker1, topic, 1);
+        removeSubscription(broker1, topic, subName);
+        assertSubscriptionsCount(broker1, topic, 0);
+        doTearDown();
+
+        //Test that on successful reconnection of the bridge that
+        //the NC sub will be removed
+        restartBroker(broker2, true);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        restartBroker(broker1, true);
+        assertBridgeStarted();
+        assertNCDurableSubsCount(broker2, topic, 0);
+
+    }
+
+    @Test
+    public void testRemoveSubscriptionWithBridgeOfflineIncludedChanged() 
throws Exception {
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+        MessageConsumer sub1 = session1.createDurableSubscriber(topic, 
subName);
+        sub1.close();
+
+        assertSubscriptionsCount(broker1, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        doTearDown();
+
+        //change the included topics to make sure we still cleanup 
non-matching NC durables
+        includedTopics = "different.topic";
+        restartBroker(broker1, false);
         assertSubscriptionsCount(broker1, topic, 1);
         removeSubscription(broker1, topic, subName);
         assertSubscriptionsCount(broker1, topic, 0);
@@ -166,8 +212,74 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         restartBroker(broker2, true);
         assertNCDurableSubsCount(broker2, topic, 1);
         restartBroker(broker1, true);
+        assertBridgeStarted();
+        assertNCDurableSubsCount(broker2, topic, 0);
+
+    }
+
+    @Test
+    public void testSubscriptionRemovedAfterIncludedChanged() throws Exception 
{
+        final ActiveMQTopic topic = new ActiveMQTopic(testTopicName);
+        MessageConsumer sub1 = session1.createDurableSubscriber(topic, 
subName);
+        sub1.close();
+
+        assertSubscriptionsCount(broker1, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        doTearDown();
+
+        //change the included topics to make sure we still cleanup 
non-matching NC durables
+        includedTopics = "different.topic";
+        restartBroker(broker1, false);
+        assertSubscriptionsCount(broker1, topic, 1);
+
+        //Test that on successful reconnection of the bridge that
+        //the NC sub will be removed because even though the local 
subscription exists,
+        //it no longer matches the included filter
+        restartBroker(broker2, true);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        restartBroker(broker1, true);
+        assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 0);
+        assertSubscriptionsCount(broker1, topic, 1);
+
+    }
+
+    @Test
+    public void testSubscriptionRemovedAfterStaticChanged() throws Exception {
+        forceDurable = true;
+        this.restartBrokers(true);
+
+        final ActiveMQTopic topic = new 
ActiveMQTopic(this.staticIncludeTopics);
+        MessageConsumer sub1 = session1.createDurableSubscriber(topic, 
subName);
+        sub1.close();
+
+        assertSubscriptionsCount(broker1, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        doTearDown();
 
+        //change the included topics to make sure we still cleanup 
non-matching NC durables
+        staticIncludeTopics = "different.topic";
+        this.restartBrokers(false);
+        assertSubscriptionsCount(broker1, topic, 1);
+        assertNCDurableSubsCount(broker2, topic, 1);
+
+        //Send some messages to the NC sub and make sure it can still be 
deleted
+        MessageProducer prod = session2.createProducer(topic);
+        for (int i = 0; i < 10; i++) {
+            prod.send(session2.createTextMessage("test"));
+        }
+
+        //Test that on successful reconnection of the bridge that
+        //the NC sub will be removed because even though the local 
subscription exists,
+        //it no longer matches the included static filter
+        restartBroker(broker2, true);
+        assertNCDurableSubsCount(broker2, topic, 1);
+        restartBroker(broker1, true);
+        assertBridgeStarted();
+        assertNCDurableSubsCount(broker2, topic, 0);
+        assertSubscriptionsCount(broker1, topic, 1);
     }
 
     @Test
@@ -199,9 +311,9 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
 
         //After sync, remove old NC and create one for topic 2
         restartBroker(broker1, true);
+        assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 0);
         assertNCDurableSubsCount(broker2, topic2, 1);
-
     }
 
     @Test
@@ -225,6 +337,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         assertSubscriptionsCount(broker1, topic2, 1);
 
         restartBrokers(true);
+        assertBridgeStarted();
         assertNCDurableSubsCount(broker2, topic, 1);
         assertNCDurableSubsCount(broker2, topic2, 1);
         assertNCDurableSubsCount(broker2, excludeTopic, 0);
@@ -265,6 +378,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
             assertNCDurableSubsCount(broker2, new 
ActiveMQTopic("include.test." + i), 1);
         }
 
+        assertBridgeStarted();
     }
 
 
@@ -291,6 +405,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         //not be added
         restartBrokers(true);
         assertNCDurableSubsCount(broker2, topic, 0);
+        assertBridgeStarted();
 
     }
 
@@ -312,6 +427,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
 
         restartBrokers(true);
         assertNCDurableSubsCount(broker2, topic, 0);
+        assertBridgeStarted();
     }
 
     @Test
@@ -335,6 +451,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         //bring online again
         session1.createDurableSubscriber(topic, subName);
         assertNCDurableSubsCount(broker2, topic, 1);
+        assertBridgeStarted();
 
     }
 
@@ -358,6 +475,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         restartBrokers(true);
         assertNCDurableSubsCount(broker2, topic, 1);
         assertNCDurableSubsCount(broker2, excludeTopic, 0);
+        assertBridgeStarted();
 
     }
 
@@ -389,6 +507,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         //between the sync command and the online durables that are added over
         //the consumer advisory
         restartBrokers(true);
+        assertBridgeStarted();
 
         //Re-create
         session1.createDurableSubscriber(topic, subName);
@@ -460,7 +579,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
                 public boolean isSatisified() throws Exception {
                     return 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
                 }
-            }, 10000, 500);
+            }, 5000, 500);
         }
         localSession = localConnection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
 
@@ -469,6 +588,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
             session1 = localSession;
         } else {
             broker2 = localBroker;
+            session2 = localSession;
         }
     }
 
@@ -486,6 +606,7 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
 
         if (flow.equals(FLOW.FORWARD)) {
             broker2 = remoteBroker;
+            session2 = remoteSession;
         } else {
             broker1 = remoteBroker;
             session1 = remoteSession;
@@ -524,8 +645,10 @@ public class DurableSyncNetworkBridgeTest extends 
DynamicNetworkTestSupport {
         connector.setDuplex(true);
         connector.setStaticBridge(false);
         connector.setSyncDurableSubs(true);
+        connector.setStaticallyIncludedDestinations(
+                Lists.<ActiveMQDestination>newArrayList(new 
ActiveMQTopic(staticIncludeTopics + "?forceDurable=" + forceDurable)));
         connector.setDynamicallyIncludedDestinations(
-                Lists.<ActiveMQDestination>newArrayList(new 
ActiveMQTopic("include.test.>")));
+                Lists.<ActiveMQDestination>newArrayList(new 
ActiveMQTopic(includedTopics)));
         connector.setExcludedDestinations(
                 Lists.<ActiveMQDestination>newArrayList(new 
ActiveMQTopic(excludeTopicName)));
         return connector;

http://git-wip-us.apache.org/repos/asf/activemq/blob/a0386556/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
index 0b388cc..4b8942b 100644
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DynamicNetworkTestSupport.java
@@ -88,6 +88,16 @@ public abstract class DynamicNetworkTestSupport {
         }
     }
 
+
+    protected void assertBridgeStarted() throws Exception {
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 
localBroker.getNetworkConnectors().get(0).activeBridges().size() == 1;
+            }
+        }, 10000, 500));
+    }
+
     protected RemoveSubscriptionInfo getRemoveSubscriptionInfo(final 
ConnectionContext context,
             final BrokerService brokerService) throws Exception {
         RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
@@ -181,6 +191,7 @@ public abstract class DynamicNetworkTestSupport {
             destination = (Topic) d;
         }
 
+
         for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) 
{
             if 
(key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX))
 {
                 DurableTopicSubscription sub = 
destination.getDurableTopicSubs().get(key);
@@ -189,6 +200,7 @@ public abstract class DynamicNetworkTestSupport {
                 }
             }
         }
+
         return subs;
     }
 

Reply via email to