AMQ-6858 - reworking durable subscription propagation fix

Significantly reworking previous fix so that the client id is properly
changed when tracking network proxy subscriptions. This makes it so
removal is done properly

(cherry picked from commit 41211c78d19b545a2352584d3598346aa3705be4)


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/97fe20a5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/97fe20a5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/97fe20a5

Branch: refs/heads/activemq-5.15.x
Commit: 97fe20a5721a39b70f841f303024fa30352d7336
Parents: d5a987b
Author: Christopher L. Shannon <christopher.l.shan...@gmail.com>
Authored: Sun Nov 12 15:37:40 2017 -0500
Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com>
Committed: Thu Nov 16 07:50:19 2017 -0500

----------------------------------------------------------------------
 .../apache/activemq/network/ConduitBridge.java  |   8 +-
 .../network/DemandForwardingBridgeSupport.java  |  86 ++-
 .../activemq/network/DemandSubscription.java    |   8 +
 .../activemq/network/DurableConduitBridge.java  |   6 +-
 .../DurableFiveBrokerNetworkBridgeTest.java     | 576 +++++++++++++++++++
 .../DurableThreeBrokerNetworkBridgeTest.java    | 241 --------
 6 files changed, 659 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java 
b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
index 6ced896..bc9d004 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java
@@ -20,6 +20,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.activemq.command.BrokerId;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
 import org.apache.activemq.command.SubscriptionInfo;
@@ -80,7 +81,12 @@ public class ConduitBridge extends DemandForwardingBridge {
                         ds.addForcedDurableConsumer(info.getConsumerId());
                     }
                 } else {
-                    ds.getDurableRemoteSubs().add(new 
SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
+                       if (isProxyNSConsumer(info)) {
+                           final BrokerId[] path = info.getBrokerPath();
+                           addProxyNetworkSubscription(ds, path, 
info.getSubscriptionName());
+                       } else {
+                               ds.getDurableRemoteSubs().add(new 
SubscriptionInfo(info.getClientId(), info.getSubscriptionName()));
+                       }
                 }
                 matched = true;
                 // continue - we want interest to any existing 
DemandSubscriptions

http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
index efdfa5a..03e79e4 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
@@ -36,6 +36,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
@@ -94,7 +95,6 @@ import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportDisposedIOException;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.failover.FailoverTransport;
-import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -666,11 +666,52 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
      * @param info
      * @return
      */
-    protected boolean isBridgeNS(ConsumerInfo info) {
+    protected boolean isDirectBridgeConsumer(ConsumerInfo info) {
         return (info.getSubscriptionName() != null && 
info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) &&
                 (info.getClientId() == null || 
info.getClientId().startsWith(configuration.getName()));
     }
 
+    private boolean isProxyBridgeSubscription(SubscriptionInfo info) {
+        if (info.getSubcriptionName() != null && info.getClientId() != null) {
+            if (info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)
+                    && 
!info.getClientId().startsWith(configuration.getName())) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    protected void addProxyNetworkSubscription(final DemandSubscription sub, 
final BrokerId[] path, String subName) {
+        if (sub != null && path.length > 1 && subName != null) {
+            String b1 = path[path.length-1].toString();
+            String b2 = path[path.length-2].toString();
+            final SubscriptionInfo newSubInfo = new SubscriptionInfo(b2 + 
"_inbound_" + b1, subName);
+            sub.getDurableRemoteSubs().add(newSubInfo);
+            sub.getNetworkDemandConsumerMap().computeIfAbsent(newSubInfo, v -> 
new AtomicInteger()).incrementAndGet();
+            LOG.debug("Adding proxy network subscription {} to demand 
subscription", newSubInfo);
+        } else {
+            LOG.debug("Skipping addProxyNetworkSubscription");
+        }
+    }
+
+    private String getProxyBridgeClientId(SubscriptionInfo info) {
+        String[] clientIdTokens = info.getClientId().split("_");
+        String newClientId = "";
+        if (clientIdTokens.length > 2) {
+            for (int j = clientIdTokens.length - 3; j < clientIdTokens.length; 
j++) {
+                newClientId += clientIdTokens[j];
+                if (j < clientIdTokens.length -1) {
+                    newClientId += "_";
+                }
+            }
+        }
+        return newClientId;
+    }
+
+    protected boolean isProxyNSConsumer(ConsumerInfo info) {
+        return info.getBrokerPath() != null && info.getBrokerPath().length > 1;
+    }
+
     protected void serviceRemoteCommand(Command command) {
         if (!disposed.get()) {
             try {
@@ -706,7 +747,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                                                     
//dynamicallyIncludedDestinations list
                                                     //Also re-add network 
consumers that are not part of this direct
                                                     //bridge (proxy of proxy 
bridges)
-                                                    
if((info.getSubscriptionName() == null || !isBridgeNS(info)) &&
+                                                    
if((info.getSubscriptionName() == null || !isDirectBridgeConsumer(info)) &&
                                                             
NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, 
info.getDestination())) {
                                                         
serviceRemoteConsumerAdvisory(info);
                                                     }
@@ -975,8 +1016,22 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
             for (Iterator<DemandSubscription> i = 
subscriptionMapByLocalId.values().iterator(); i.hasNext(); ) {
                 DemandSubscription ds = i.next();
                 boolean removed = 
ds.getDurableRemoteSubs().remove(subscriptionInfo);
+
                 if (removed) {
                     cleanupDurableSub(ds, i);
+                //If this is a proxy bridge subscription we need to try 
changing the clientId
+                } else if (!removed && 
isProxyBridgeSubscription(subscriptionInfo)){
+                    
subscriptionInfo.setClientId(getProxyBridgeClientId(subscriptionInfo));
+                    if (ds.getDurableRemoteSubs().contains(subscriptionInfo)) {
+                        AtomicInteger count = 
ds.getNetworkDemandConsumerMap().computeIfAbsent(subscriptionInfo, v -> new 
AtomicInteger());
+                        count.decrementAndGet();
+                        //Only remove the durable remote sub if the count <= 0
+                        if (count.get() <= 0) {
+                            ds.getDurableRemoteSubs().remove(subscriptionInfo);
+                            
ds.getNetworkDemandConsumerMap().remove(subscriptionInfo);
+                            cleanupDurableSub(ds, i);
+                        }
+                    }
                 }
             }
         }
@@ -984,6 +1039,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
 
     private void cleanupDurableSub(final DemandSubscription ds,
             Iterator<DemandSubscription> i) throws IOException {
+
         if (ds != null && ds.getLocalDurableSubscriber() != null && 
ds.getDurableRemoteSubs().isEmpty()
                 && ds.getForcedDurableConsumersSize() == 0) {
             // deactivate subscriber
@@ -998,9 +1054,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
             localBroker.oneway(sending);
 
             //remove subscriber from map
-            if (i != null) {
-                i.remove();
-            }
+            i.remove();
         }
     }
 
@@ -1094,18 +1148,7 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                 public void run() {
                     sub.waitForCompletion();
                     try {
-                        //If removing a network durable subscription that 
still has durable remote subs
-                        //make sure we cleanup the durable subscription 
properly - necessary when using
-                        //durable subscriptions and 3 or more brokers
-                        if (configuration.isConduitSubscriptions() &&
-                                sub.getLocalInfo().getSubscriptionName() != 
null &&
-                                
sub.getLocalInfo().getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) &&
-                                sub.getDurableRemoteSubs().size() > 0) {
-                            sub.getDurableRemoteSubs().clear();
-                            cleanupDurableSub(sub, null);
-                        } else {
-                            
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
-                        }
+                        
localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
                     } catch (IOException e) {
                         LOG.warn("failed to deliver remove command for local 
subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e);
                     }
@@ -1367,7 +1410,12 @@ public abstract class DemandForwardingBridgeSupport 
implements NetworkBridge, Br
                 undoMapRegistration(sub);
             } else {
                 if (consumerInfo.isDurable()) {
-                    sub.getDurableRemoteSubs().add(new 
SubscriptionInfo(sub.getRemoteInfo().getClientId(), 
consumerInfo.getSubscriptionName()));
+                       if (isProxyNSConsumer(sub.getRemoteInfo())) {
+                               BrokerId[] path = 
sub.getRemoteInfo().getBrokerPath();
+                               addProxyNetworkSubscription(sub, path, 
consumerInfo.getSubscriptionName());
+                       } else {
+                               sub.getDurableRemoteSubs().add(new 
SubscriptionInfo(sub.getRemoteInfo().getClientId(), 
consumerInfo.getSubscriptionName()));
+                       }
                 }
                 addSubscription(sub);
                 LOG.debug("{} new demand subscription: {}", 
configuration.getBrokerName(), sub);

http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
index 371df0a..96a9baf 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DemandSubscription.java
@@ -16,7 +16,9 @@
  */
 package org.apache.activemq.network;
 
+import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -42,6 +44,8 @@ public class DemandSubscription {
     private final AtomicBoolean activeWaiter = new AtomicBoolean();
     private final Set<SubscriptionInfo> durableRemoteSubs = new 
CopyOnWriteArraySet<SubscriptionInfo>();
     private final Set<ConsumerId> forcedDurableConsumers = new 
CopyOnWriteArraySet<ConsumerId>();
+    //Used for proxy network consumers
+    private final Map<SubscriptionInfo, AtomicInteger> 
networkDemandConsumerMap = new ConcurrentHashMap<>();
     private SubscriptionInfo localDurableSubscriber;
 
     private NetworkBridgeFilter networkBridgeFilter;
@@ -83,6 +87,10 @@ public class DemandSubscription {
         return durableRemoteSubs;
     }
 
+    public Map<SubscriptionInfo, AtomicInteger> getNetworkDemandConsumerMap() {
+        return networkDemandConsumerMap;
+    }
+
     /**
      * @return true if there are no interested consumers
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
----------------------------------------------------------------------
diff --git 
a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
 
b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
index 42f30a4..8d14f74 100644
--- 
a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
+++ 
b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
@@ -142,12 +142,8 @@ public class DurableConduitBridge extends ConduitBridge {
             info.setSubscriptionName(getSubscriberName(info.getDestination()));
             // and override the consumerId with something unique so that it 
won't
             // be removed if the durable subscriber (at the other end) goes 
away
-            //Only do this for direct bridge consumers - proxy network 
consumers we don't
-            //want to replace the consumerId or cleanup won't happen properly
-            if (info.getBrokerPath().length == 1 || 
(info.getBrokerPath().length > 1 && info.getBrokerPath()[0] == 
remoteBrokerPath[0])) {
-                info.setConsumerId(new 
ConsumerId(localSessionInfo.getSessionId(),
+           info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),
                                consumerIdGenerator.getNextSequenceId()));
-            }
         }
         info.setSelector(null);
         DemandSubscription demandSubscription = 
doCreateDemandSubscription(info);

http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
new file mode 100644
index 0000000..4a63553
--- /dev/null
+++ 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableFiveBrokerNetworkBridgeTest.java
@@ -0,0 +1,576 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.broker.region.DestinationFilter;
+import org.apache.activemq.broker.region.DurableTopicSubscription;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.util.SubscriptionKey;
+import org.apache.activemq.util.Wait;
+import org.apache.activemq.util.Wait.Condition;
+
+import com.google.common.collect.Lists;
+
+import junit.framework.Test;
+
+/**
+ * Test to make sure durable subscriptions propagate properly throughout 
network bridges
+ * and that conduit subscriptions work properly
+ */
+public class DurableFiveBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSupport {
+
+    private boolean duplex = true;
+
+    @Override
+    protected NetworkConnector bridgeBrokers(String localBrokerName, String 
remoteBrokerName) throws Exception {
+        NetworkConnector connector = super.bridgeBrokers(localBrokerName, 
remoteBrokerName);
+        connector.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination> newArrayList(new 
ActiveMQTopic("TEST.FOO?forceDurable=true")));
+        connector.setDuplex(duplex);
+        connector.setDecreaseNetworkConsumerPriority(false);
+        connector.setConduitSubscriptions(true);
+        connector.setSyncDurableSubs(true);
+        connector.setNetworkTTL(-1);
+        return connector;
+    }
+
+    public void testDurablePropagationDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagation();
+    }
+
+    public void testDurablePropagationOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagation();
+    }
+
+    /**
+     * BrokerA -> BrokerB -> BrokerC
+     */
+    protected void testDurablePropagation() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientA.receive(1000));
+        assertNotNull(clientB.receive(1000));
+
+        //bring online a consumer on the other side
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+        ses.unsubscribe("subA");
+        ses.unsubscribe("subB");
+        ses2.unsubscribe("subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+    }
+
+    public void testDurablePropagationConsumerAllBrokersDuplex() throws 
Exception {
+        duplex = true;
+        testDurablePropagationConsumerAllBrokers();
+    }
+
+    public void testDurablePropagationConsumerAllBrokersOneWay() throws 
Exception {
+        duplex = false;
+        testDurablePropagationConsumerAllBrokers();
+    }
+
+    protected void testDurablePropagationConsumerAllBrokers() throws Exception 
{
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        //bring online a consumer on the other side
+        Session ses2 = createSession("BrokerB");
+        MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        Session ses3 = createSession("BrokerC");
+        MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+        ses.unsubscribe("subA");
+        ses2.unsubscribe("subB");
+        ses3.unsubscribe("subC");
+
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+    }
+
+    public void testDurablePropagation5BrokerDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagation5Broker();
+    }
+
+    public void testDurablePropagation5BrokerOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagation5Broker();
+    }
+
+    protected void testDurablePropagation5Broker() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerC", "BrokerD");
+        bridgeBrokers("BrokerD", "BrokerE");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+            bridgeBrokers("BrokerD", "BrokerC");
+            bridgeBrokers("BrokerE", "BrokerD");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        Thread.sleep(1000);
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerE", dest, 1);
+        assertNotNull(clientA.receive(1000));
+
+        //bring online a consumer on the other side
+        Session ses2 = createSession("BrokerE");
+        MessageConsumer clientE = ses2.createDurableSubscriber(dest, "subE");
+        Thread.sleep(1000);
+
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientE.close();
+        ses.unsubscribe("subA");
+        ses2.unsubscribe("subE");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerE").broker, dest, 0);
+
+    }
+
+    public void testDurablePropagationSpokeDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagationSpoke();
+    }
+
+    public void testDurablePropagationSpokeOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagationSpoke();
+    }
+
+    protected void testDurablePropagationSpoke() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        bridgeBrokers("BrokerB", "BrokerD");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+            bridgeBrokers("BrokerD", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        Session ses2 = createSession("BrokerB");
+        Session ses3 = createSession("BrokerC");
+        Session ses4 = createSession("BrokerD");
+
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientAB = ses.createDurableSubscriber(dest, "subAB");
+        Thread.sleep(1000);
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        MessageConsumer clientD = ses4.createDurableSubscriber(dest, "subD");
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        sendMessages("BrokerA", dest, 1);
+        assertNotNull(clientD.receive(1000));
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientD.receive(1000));
+
+        MessageConsumer clientB = ses2.createDurableSubscriber(dest, "subB");
+        MessageConsumer clientC = ses3.createDurableSubscriber(dest, "subC");
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 3);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientAB.close();
+        clientB.close();
+        clientC.close();
+        clientD.close();
+
+        ses.unsubscribe("subA");
+        ses.unsubscribe("subAB");
+        ses2.unsubscribe("subB");
+        ses3.unsubscribe("subC");
+        ses4.unsubscribe("subD");
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerD").broker, dest, 0);
+    }
+
+    public void testForceDurablePropagationDuplex() throws Exception {
+        duplex = true;
+        testForceDurablePropagation();
+    }
+
+    public void testForceDurablePropagationOneWay() throws Exception {
+        duplex = false;
+        testForceDurablePropagation();
+    }
+
+    protected void testForceDurablePropagation() throws Exception {
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+        if (!duplex) {
+            bridgeBrokers("BrokerB", "BrokerA");
+            bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        MessageConsumer clientA = ses.createConsumer(dest);
+        Thread.sleep(1000);
+
+        // let consumers propagate around the network
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+
+        sendMessages("BrokerC", dest, 1);
+        assertNotNull(clientA.receive(1000));
+
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientC = ses2.createConsumer(dest);
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientC.close();
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+    }
+
+    public void testDurablePropagationSyncDuplex() throws Exception {
+        duplex = true;
+        testDurablePropagationSync();
+    }
+
+    public void testDurablePropagationSyncOneWay() throws Exception {
+        duplex = false;
+        testDurablePropagationSync();
+    }
+
+    protected void testDurablePropagationSync() throws Exception {
+        // Setup broker networks
+        NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
+        NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
+
+        NetworkConnector nc3 = null;
+        NetworkConnector nc4 = null;
+        if (!duplex) {
+            nc3 = bridgeBrokers("BrokerB", "BrokerA");
+            nc4 = bridgeBrokers("BrokerC", "BrokerB");
+        }
+
+        startAllBrokers();
+
+        nc1.stop();
+        nc2.stop();
+
+        if (!duplex) {
+            nc3.stop();
+            nc4.stop();
+        }
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        Thread.sleep(1000);
+
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+
+        nc1.start();
+        nc2.start();
+        if (!duplex) {
+            nc3.start();
+            nc4.start();
+        }
+
+        //there will be 2 network durables, 1 for each direction of the bridge
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+
+        clientA.close();
+        clientB.close();
+        clientC.close();
+    }
+
+    public void testDurablePropagationMultipleBridgesDifferentDestinations() 
throws Exception {
+        duplex = true;
+
+        // Setup broker networks
+        bridgeBrokers("BrokerA", "BrokerB");
+        bridgeBrokers("BrokerB", "BrokerC");
+
+        //Duplicate the bridges with different included destinations - valid 
use case
+        NetworkConnector nc3 = bridgeBrokers("BrokerA", "BrokerB");
+        NetworkConnector nc4 = bridgeBrokers("BrokerB", "BrokerC");
+        nc3.setName("nc3");
+        nc4.setName("nc4");
+        nc3.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination> newArrayList(new 
ActiveMQTopic("TEST.FOO2?forceDurable=true")));
+        nc4.setDynamicallyIncludedDestinations(
+                Lists.<ActiveMQDestination> newArrayList(new 
ActiveMQTopic("TEST.FOO2?forceDurable=true")));
+
+        startAllBrokers();
+
+        // Setup destination
+        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
+        ActiveMQTopic dest2 = (ActiveMQTopic) createDestination("TEST.FOO2", 
true);
+
+        // Setup consumers
+        Session ses = createSession("BrokerA");
+        Session ses2 = createSession("BrokerC");
+        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
+        MessageConsumer clientAa = ses.createDurableSubscriber(dest2, "subAa");
+        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
+        MessageConsumer clientCc = ses2.createDurableSubscriber(dest2, 
"subCc");
+        Thread.sleep(1000);
+
+        //make sure network durables are online
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
+
+        clientA.close();
+        clientC.close();
+        ses.unsubscribe("subA");
+        ses2.unsubscribe("subC");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 2);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 1);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 1);
+
+        clientAa.close();
+        clientCc.close();
+        ses.unsubscribe("subAa");
+        ses2.unsubscribe("subCc");
+
+        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest2, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest2, 0);
+        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest2, 0);
+    }
+
+    protected void assertNCDurableSubsCount(final BrokerService brokerService, 
final ActiveMQTopic dest,
+            final int count) throws Exception {
+        assertTrue(Wait.waitFor(new Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return count == getNCDurableSubs(brokerService, dest).size();
+            }
+        }, 10000, 500));
+    }
+
+    protected List<DurableTopicSubscription> getNCDurableSubs(final 
BrokerService brokerService,
+            final ActiveMQTopic dest) throws Exception {
+        List<DurableTopicSubscription> subs = new ArrayList<>();
+        Destination d = brokerService.getDestination(dest);
+        org.apache.activemq.broker.region.Topic destination = null;
+        if (d instanceof DestinationFilter) {
+            destination = ((DestinationFilter) 
d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
+        } else {
+            destination = (org.apache.activemq.broker.region.Topic) d;
+        }
+
+        for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) 
{
+            if 
(key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX))
 {
+                DurableTopicSubscription sub = 
destination.getDurableTopicSubs().get(key);
+                if (sub != null) {
+                    subs.add(sub);
+                }
+            }
+        }
+
+        return subs;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+        String options = new String("?persistent=false&useJmx=false");
+        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:61619)/BrokerD" + 
options));
+        createBroker(new URI("broker:(tcp://localhost:61620)/BrokerE" + 
options));
+    }
+
+    @Override
+    protected void configureBroker(BrokerService broker) {
+        broker.setBrokerId(broker.getBrokerName());
+    }
+
+    protected Session createSession(String broker) throws Exception {
+        Connection con = createConnection(broker);
+        con.start();
+        return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    public static Test suite() {
+        return suite(DurableFiveBrokerNetworkBridgeTest.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/97fe20a5/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
----------------------------------------------------------------------
diff --git 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
 
b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
deleted file mode 100644
index ff09a1c..0000000
--- 
a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.network;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.List;
-
-import javax.jms.Connection;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-
-import org.apache.activemq.JmsMultipleBrokersTestSupport;
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFilter;
-import org.apache.activemq.broker.region.DurableTopicSubscription;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.util.MessageIdList;
-import org.apache.activemq.util.SubscriptionKey;
-import org.apache.activemq.util.Wait;
-import org.apache.activemq.util.Wait.Condition;
-
-import com.google.common.collect.Lists;
-
-import junit.framework.Test;
-
-/**
- * Test to make sure durable subscriptions propagate properly throughout 
network bridges
- * and that conduit subscriptions work properly
- */
-public class DurableThreeBrokerNetworkBridgeTest extends 
JmsMultipleBrokersTestSupport {
-
-    @Override
-    protected NetworkConnector bridgeBrokers(String localBrokerName, String 
remoteBrokerName) throws Exception {
-        NetworkConnector connector = super.bridgeBrokers(localBrokerName, 
remoteBrokerName);
-        connector.setDynamicallyIncludedDestinations(
-                Lists.<ActiveMQDestination> newArrayList(new 
ActiveMQTopic("TEST.FOO?forceDurable=true")));
-        connector.setDuplex(true);
-        connector.setDecreaseNetworkConsumerPriority(false);
-        connector.setConduitSubscriptions(true);
-        connector.setSyncDurableSubs(true);
-        connector.setNetworkTTL(-1);
-        return connector;
-    }
-
-    /**
-     * BrokerA -> BrokerB -> BrokerC
-     */
-    public void testDurablePropagation() throws Exception {
-        // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
-
-        startAllBrokers();
-
-        // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
-
-        // Setup consumers
-        Session ses = createSession("BrokerA");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
-
-        // let consumers propagate around the network
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-
-        sendMessages("BrokerC", dest, 1);
-        assertNotNull(clientA.receive(1000));
-        assertNotNull(clientB.receive(1000));
-
-        //bring online a consumer on the other side
-        Session ses2 = createSession("BrokerC");
-        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
-        //there will be 2 network durables, 1 for each direction of the bridge
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
-
-        clientA.close();
-        clientB.close();
-        clientC.close();
-        ses.unsubscribe("subA");
-        ses.unsubscribe("subB");
-        ses2.unsubscribe("subC");
-
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-
-    }
-
-    public void testForceDurablePropagation() throws Exception {
-        // Setup broker networks
-        bridgeBrokers("BrokerA", "BrokerB");
-        bridgeBrokers("BrokerB", "BrokerC");
-
-        startAllBrokers();
-
-        // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
-
-        // Setup consumers
-        Session ses = createSession("BrokerA");
-        MessageConsumer clientA = ses.createConsumer(dest);
-
-        // let consumers propagate around the network
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-
-        sendMessages("BrokerC", dest, 1);
-        assertNotNull(clientA.receive(1000));
-
-        Session ses2 = createSession("BrokerC");
-        MessageConsumer clientC = ses2.createConsumer(dest);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
-
-        clientA.close();
-        clientC.close();
-
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-    }
-
-    public void testDurablePropagationSync() throws Exception {
-        // Setup broker networks
-        NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB");
-        NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC");
-
-        startAllBrokers();
-
-        nc1.stop();
-        nc2.stop();
-
-        // Setup destination
-        ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", 
true);
-
-        // Setup consumers
-        Session ses = createSession("BrokerA");
-        Session ses2 = createSession("BrokerC");
-        MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA");
-        MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB");
-        MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC");
-
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0);
-
-        nc1.start();
-        nc2.start();
-
-        //there will be 2 network durables, 1 for each direction of the bridge
-        assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2);
-        assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1);
-        assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1);
-
-        clientA.close();
-        clientB.close();
-        clientC.close();
-    }
-
-
-    protected void assertNCDurableSubsCount(final BrokerService brokerService, 
final ActiveMQTopic dest,
-            final int count) throws Exception {
-        assertTrue(Wait.waitFor(new Condition() {
-            @Override
-            public boolean isSatisified() throws Exception {
-                return count == getNCDurableSubs(brokerService, dest).size();
-            }
-        }, 10000, 500));
-    }
-
-    protected List<DurableTopicSubscription> getNCDurableSubs(final 
BrokerService brokerService,
-            final ActiveMQTopic dest) throws Exception {
-        List<DurableTopicSubscription> subs = new ArrayList<>();
-        Destination d = brokerService.getDestination(dest);
-        org.apache.activemq.broker.region.Topic destination = null;
-        if (d instanceof DestinationFilter) {
-            destination = ((DestinationFilter) 
d).getAdaptor(org.apache.activemq.broker.region.Topic.class);
-        } else {
-            destination = (org.apache.activemq.broker.region.Topic) d;
-        }
-
-        for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) 
{
-            if 
(key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX))
 {
-                DurableTopicSubscription sub = 
destination.getDurableTopicSubs().get(key);
-                if (sub != null) {
-                    subs.add(sub);
-                }
-            }
-        }
-
-        return subs;
-    }
-
-    @Override
-    public void setUp() throws Exception {
-        super.setAutoFail(true);
-        super.setUp();
-        String options = new String("?persistent=false&useJmx=false");
-        createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + 
options));
-        createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + 
options));
-        createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + 
options));
-    }
-
-    @Override
-    protected void configureBroker(BrokerService broker) {
-        broker.setBrokerId(broker.getBrokerName());
-    }
-
-    protected Session createSession(String broker) throws Exception {
-        Connection con = createConnection(broker);
-        con.start();
-        return con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    }
-
-    public static Test suite() {
-        return suite(DurableThreeBrokerNetworkBridgeTest.class);
-    }
-}

Reply via email to