Repository: activemq Updated Branches: refs/heads/master 7dad09a9c -> 96ce14b27
AMQ-6858 - Fix several durable subscription bridge propagation issues Durable network proxy subs will now be properly created across multiple bridges when 3 or more brokers are used. Demand will be properly synced and removed. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/96ce14b2 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/96ce14b2 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/96ce14b2 Branch: refs/heads/master Commit: 96ce14b278fef9e5f428f1c3c07ce5c09fd8f9a8 Parents: 7dad09a Author: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Authored: Mon Nov 6 08:42:03 2017 -0500 Committer: Christopher L. Shannon (cshannon) <christopher.l.shan...@gmail.com> Committed: Thu Nov 9 11:21:18 2017 -0500 ---------------------------------------------------------------------- .../apache/activemq/network/ConduitBridge.java | 16 +- .../network/DemandForwardingBridgeSupport.java | 49 +++- .../activemq/network/DurableConduitBridge.java | 19 +- .../network/NetworkBridgeConfiguration.java | 13 + .../DurableThreeBrokerNetworkBridgeTest.java | 241 +++++++++++++++++++ .../VerifyNetworkConsumersDisconnectTest.java | 68 +++++- 6 files changed, 384 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java index 3c0b85b..6ced896 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/ConduitBridge.java @@ -56,12 +56,16 @@ public class ConduitBridge extends DemandForwardingBridge { } protected boolean addToAlreadyInterestedConsumers(ConsumerInfo info, boolean isForcedDurable) { - // search through existing subscriptions and see if we have a match - if (info.isNetworkSubscription()) { + //If a network subscription and a queue check if isConduitNetworkQueueSubscriptions is true + //If true then we want to try and conduit + //For topics we always want to conduit regardless of network subscription or not + if (info.isNetworkSubscription() && info.getDestination().isQueue() && + !configuration.isConduitNetworkQueueSubscriptions()) { return false; } boolean matched = false; + // search through existing subscriptions and see if we have a match for (DemandSubscription ds : subscriptionMapByLocalId.values()) { DestinationFilter filter = DestinationFilter.parseFilter(ds.getLocalInfo().getDestination()); if (canConduit(ds) && filter.matches(info.getDestination())) { @@ -86,9 +90,13 @@ public class ConduitBridge extends DemandForwardingBridge { } // we want to conduit statically included consumers which are local networkSubs - // but we don't want to conduit remote network subs i.e. (proxy proxy) consumers + // but we don't want to conduit remote network queue subs i.e. (proxy proxy) consumers + // unless isConduitNetworkQueueSubscriptions is true + // We always want to conduit topic subscriptions private boolean canConduit(DemandSubscription ds) { - return ds.isStaticallyIncluded() || !ds.getRemoteInfo().isNetworkSubscription(); + return ds.isStaticallyIncluded() || ds.getRemoteInfo().getDestination().isTopic() || + !ds.getRemoteInfo().isNetworkSubscription() || + (ds.getRemoteInfo().getDestination().isQueue() && configuration.isConduitNetworkQueueSubscriptions()); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java index 879ab39..efdfa5a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java @@ -661,6 +661,16 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br } } + /** + * Checks whether or not this consumer is a direct bridge network subscription + * @param info + * @return + */ + protected boolean isBridgeNS(ConsumerInfo info) { + return (info.getSubscriptionName() != null && info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) && + (info.getClientId() == null || info.getClientId().startsWith(configuration.getName())); + } + protected void serviceRemoteCommand(Command command) { if (!disposed.get()) { try { @@ -694,7 +704,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br for (ConsumerInfo info : subInfo.getSubscriptionInfos()) { //re-add any process any non-NC consumers that match the //dynamicallyIncludedDestinations list - if((info.getSubscriptionName() == null || !info.getSubscriptionName().startsWith(DURABLE_SUB_PREFIX)) && + //Also re-add network consumers that are not part of this direct + //bridge (proxy of proxy bridges) + if((info.getSubscriptionName() == null || !isBridgeNS(info)) && NetworkBridgeUtils.matchesDestinations(dynamicallyIncludedDestinations, info.getDestination())) { serviceRemoteConsumerAdvisory(info); } @@ -986,7 +998,9 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br localBroker.oneway(sending); //remove subscriber from map - i.remove(); + if (i != null) { + i.remove(); + } } } @@ -1072,7 +1086,7 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId()); subscriptionMapByRemoteId.remove(sub.getRemoteInfo().getConsumerId()); - // continue removal in separate thread to free up this thread for outstanding responses + // continue removal in separate thread to free up tshis thread for outstanding responses // Serialize with removeDestination operations so that removeSubs are serialized with // removeDestinations such that all removeSub advisories are generated serialExecutor.execute(new Runnable() { @@ -1080,7 +1094,18 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br public void run() { sub.waitForCompletion(); try { - localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); + //If removing a network durable subscription that still has durable remote subs + //make sure we cleanup the durable subscription properly - necessary when using + //durable subscriptions and 3 or more brokers + if (configuration.isConduitSubscriptions() && + sub.getLocalInfo().getSubscriptionName() != null && + sub.getLocalInfo().getSubscriptionName().startsWith(DURABLE_SUB_PREFIX) && + sub.getDurableRemoteSubs().size() > 0) { + sub.getDurableRemoteSubs().clear(); + cleanupDurableSub(sub, null); + } else { + localBroker.oneway(sub.getLocalInfo().createRemoveCommand()); + } } catch (IOException e) { LOG.warn("failed to deliver remove command for local subscription, for remote {}", sub.getRemoteInfo().getConsumerId(), e); } @@ -1315,13 +1340,17 @@ public abstract class DemandForwardingBridgeSupport implements NetworkBridge, Br for (ActiveMQDestination dest : dests) { if (isPermissableDestination(dest)) { DemandSubscription sub = createDemandSubscription(dest, null); - sub.setStaticallyIncluded(true); - try { - addSubscription(sub); - } catch (IOException e) { - LOG.error("Failed to add static destination {}", dest, e); + if (sub != null) { + sub.setStaticallyIncluded(true); + try { + addSubscription(sub); + } catch (IOException e) { + LOG.error("Failed to add static destination {}", dest, e); + } + LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest); + } else { + LOG.info("{}, static destination excluded: {}, demand already exists", configuration.getBrokerName(), dest); } - LOG.trace("{}, bridging messages for static destination: {}", configuration.getBrokerName(), dest); } else { LOG.info("{}, static destination excluded: {}", configuration.getBrokerName(), dest); } http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java index fb2b6c9..42f30a4 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/DurableConduitBridge.java @@ -75,12 +75,15 @@ public class DurableConduitBridge extends ConduitBridge { String candidateSubName = getSubscriberName(dest); for (Subscription subscription : topicRegion.getDurableSubscriptions().values()) { String subName = subscription.getConsumerInfo().getSubscriptionName(); - if (subName != null && subName.equals(candidateSubName)) { + String clientId = subscription.getContext().getClientId(); + if (subName != null && subName.equals(candidateSubName) && clientId.startsWith(configuration.getName())) { DemandSubscription sub = createDemandSubscription(dest, subName); - sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest)); - sub.setStaticallyIncluded(true); - addSubscription(sub); - break; + if (sub != null) { + sub.getLocalInfo().setSubscriptionName(getSubscriberName(dest)); + sub.setStaticallyIncluded(true); + addSubscription(sub); + break; + } } } } @@ -139,8 +142,12 @@ public class DurableConduitBridge extends ConduitBridge { info.setSubscriptionName(getSubscriberName(info.getDestination())); // and override the consumerId with something unique so that it won't // be removed if the durable subscriber (at the other end) goes away - info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), + //Only do this for direct bridge consumers - proxy network consumers we don't + //want to replace the consumerId or cleanup won't happen properly + if (info.getBrokerPath().length == 1 || (info.getBrokerPath().length > 1 && info.getBrokerPath()[0] == remoteBrokerPath[0])) { + info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(), consumerIdGenerator.getNextSequenceId())); + } } info.setSelector(null); DemandSubscription demandSubscription = doCreateDemandSubscription(info); http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java index 9e596d4..b2ca78a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java +++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java @@ -29,6 +29,11 @@ import org.apache.activemq.command.ConsumerInfo; public class NetworkBridgeConfiguration { private boolean conduitSubscriptions = true; + /** + * Whether or not network subscriptions on queues are eligible to be conduit + * Default is false + */ + private boolean conduitNetworkQueueSubscriptions; private boolean useVirtualDestSubs; private boolean dynamicOnly; private boolean syncDurableSubs; @@ -85,6 +90,14 @@ public class NetworkBridgeConfiguration { this.conduitSubscriptions = conduitSubscriptions; } + public boolean isConduitNetworkQueueSubscriptions() { + return conduitNetworkQueueSubscriptions; + } + + public void setConduitNetworkQueueSubscriptions(boolean conduitNetworkQueueSubscriptions) { + this.conduitNetworkQueueSubscriptions = conduitNetworkQueueSubscriptions; + } + /** * @return the dynamicOnly */ http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java new file mode 100644 index 0000000..ff09a1c --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/DurableThreeBrokerNetworkBridgeTest.java @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.Session; + +import org.apache.activemq.JmsMultipleBrokersTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.DestinationFilter; +import org.apache.activemq.broker.region.DurableTopicSubscription; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.util.MessageIdList; +import org.apache.activemq.util.SubscriptionKey; +import org.apache.activemq.util.Wait; +import org.apache.activemq.util.Wait.Condition; + +import com.google.common.collect.Lists; + +import junit.framework.Test; + +/** + * Test to make sure durable subscriptions propagate properly throughout network bridges + * and that conduit subscriptions work properly + */ +public class DurableThreeBrokerNetworkBridgeTest extends JmsMultipleBrokersTestSupport { + + @Override + protected NetworkConnector bridgeBrokers(String localBrokerName, String remoteBrokerName) throws Exception { + NetworkConnector connector = super.bridgeBrokers(localBrokerName, remoteBrokerName); + connector.setDynamicallyIncludedDestinations( + Lists.<ActiveMQDestination> newArrayList(new ActiveMQTopic("TEST.FOO?forceDurable=true"))); + connector.setDuplex(true); + connector.setDecreaseNetworkConsumerPriority(false); + connector.setConduitSubscriptions(true); + connector.setSyncDurableSubs(true); + connector.setNetworkTTL(-1); + return connector; + } + + /** + * BrokerA -> BrokerB -> BrokerC + */ + public void testDurablePropagation() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerC"); + + startAllBrokers(); + + // Setup destination + ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); + + // Setup consumers + Session ses = createSession("BrokerA"); + MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); + MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB"); + + // let consumers propagate around the network + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + + sendMessages("BrokerC", dest, 1); + assertNotNull(clientA.receive(1000)); + assertNotNull(clientB.receive(1000)); + + //bring online a consumer on the other side + Session ses2 = createSession("BrokerC"); + MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC"); + //there will be 2 network durables, 1 for each direction of the bridge + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + + clientA.close(); + clientB.close(); + clientC.close(); + ses.unsubscribe("subA"); + ses.unsubscribe("subB"); + ses2.unsubscribe("subC"); + + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + + } + + public void testForceDurablePropagation() throws Exception { + // Setup broker networks + bridgeBrokers("BrokerA", "BrokerB"); + bridgeBrokers("BrokerB", "BrokerC"); + + startAllBrokers(); + + // Setup destination + ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); + + // Setup consumers + Session ses = createSession("BrokerA"); + MessageConsumer clientA = ses.createConsumer(dest); + + // let consumers propagate around the network + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + + sendMessages("BrokerC", dest, 1); + assertNotNull(clientA.receive(1000)); + + Session ses2 = createSession("BrokerC"); + MessageConsumer clientC = ses2.createConsumer(dest); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + + clientA.close(); + clientC.close(); + + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + } + + public void testDurablePropagationSync() throws Exception { + // Setup broker networks + NetworkConnector nc1 = bridgeBrokers("BrokerA", "BrokerB"); + NetworkConnector nc2 = bridgeBrokers("BrokerB", "BrokerC"); + + startAllBrokers(); + + nc1.stop(); + nc2.stop(); + + // Setup destination + ActiveMQTopic dest = (ActiveMQTopic) createDestination("TEST.FOO", true); + + // Setup consumers + Session ses = createSession("BrokerA"); + Session ses2 = createSession("BrokerC"); + MessageConsumer clientA = ses.createDurableSubscriber(dest, "subA"); + MessageConsumer clientB = ses.createDurableSubscriber(dest, "subB"); + MessageConsumer clientC = ses2.createDurableSubscriber(dest, "subC"); + + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 0); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 0); + + nc1.start(); + nc2.start(); + + //there will be 2 network durables, 1 for each direction of the bridge + assertNCDurableSubsCount(brokers.get("BrokerB").broker, dest, 2); + assertNCDurableSubsCount(brokers.get("BrokerC").broker, dest, 1); + assertNCDurableSubsCount(brokers.get("BrokerA").broker, dest, 1); + + clientA.close(); + clientB.close(); + clientC.close(); + } + + + protected void assertNCDurableSubsCount(final BrokerService brokerService, final ActiveMQTopic dest, + final int count) throws Exception { + assertTrue(Wait.waitFor(new Condition() { + @Override + public boolean isSatisified() throws Exception { + return count == getNCDurableSubs(brokerService, dest).size(); + } + }, 10000, 500)); + } + + protected List<DurableTopicSubscription> getNCDurableSubs(final BrokerService brokerService, + final ActiveMQTopic dest) throws Exception { + List<DurableTopicSubscription> subs = new ArrayList<>(); + Destination d = brokerService.getDestination(dest); + org.apache.activemq.broker.region.Topic destination = null; + if (d instanceof DestinationFilter) { + destination = ((DestinationFilter) d).getAdaptor(org.apache.activemq.broker.region.Topic.class); + } else { + destination = (org.apache.activemq.broker.region.Topic) d; + } + + for (SubscriptionKey key : destination.getDurableTopicSubs().keySet()) { + if (key.getSubscriptionName().startsWith(DemandForwardingBridge.DURABLE_SUB_PREFIX)) { + DurableTopicSubscription sub = destination.getDurableTopicSubs().get(key); + if (sub != null) { + subs.add(sub); + } + } + } + + return subs; + } + + @Override + public void setUp() throws Exception { + super.setAutoFail(true); + super.setUp(); + String options = new String("?persistent=false&useJmx=false"); + createBroker(new URI("broker:(tcp://localhost:61616)/BrokerA" + options)); + createBroker(new URI("broker:(tcp://localhost:61617)/BrokerB" + options)); + createBroker(new URI("broker:(tcp://localhost:61618)/BrokerC" + options)); + } + + @Override + protected void configureBroker(BrokerService broker) { + broker.setBrokerId(broker.getBrokerName()); + } + + protected Session createSession(String broker) throws Exception { + Connection con = createConnection(broker); + con.start(); + return con.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public static Test suite() { + return suite(DurableThreeBrokerNetworkBridgeTest.class); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/96ce14b2/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java index 9eeb28c..d04e1f5 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/VerifyNetworkConsumersDisconnectTest.java @@ -61,14 +61,19 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest unhandledExceptions.isEmpty()); } - public NetworkConnector bridge(String from, String to) throws Exception { + public NetworkConnector bridge(String from, String to, boolean conduitNetworkQueueSubscriptions) throws Exception { NetworkConnector networkConnector = bridgeBrokers(from, to, true, NETWORK_TTL, CONDUIT); networkConnector.setSuppressDuplicateQueueSubscriptions(true); networkConnector.setDecreaseNetworkConsumerPriority(true); networkConnector.setDuplex(DUPLEX); + networkConnector.setConduitNetworkQueueSubscriptions(conduitNetworkQueueSubscriptions); return networkConnector; } + public NetworkConnector bridge(String from, String to) throws Exception { + return bridge(from, to, false); + } + /*why conduit proxy proxy consumers gets us in a knot w.r.t removal DC-7 for CA-9, add DB-15, remove CA-9, add CB-8 CB-8 add DC-7 @@ -137,6 +142,63 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest } + public void testConsumerOnEachBrokerNetworkQueueConduitSubs() throws Exception { + bridge("Broker0", "Broker1", true); + if (!DUPLEX) bridge("Broker1", "Broker0", true); + + bridge("Broker1", "Broker2", true); + if (!DUPLEX) bridge("Broker2", "Broker1", true); + + startAllBrokers(); + waitForBridgeFormation(brokers.get("Broker0").broker, 1, 0); + waitForBridgeFormation(brokers.get("Broker2").broker, 1, 0); + waitForBridgeFormation(brokers.get("Broker1").broker, 1, 0); + waitForBridgeFormation(brokers.get("Broker1").broker, 1, 1); + + Destination dest = createDestination("TEST.FOO", false); + + // Setup consumers + for (int i = 0; i < BROKER_COUNT; i++) { + consumerMap.put("Consumer:" + i + ":0", createConsumer("Broker" + i, dest)); + } + + //Conduit network queue conduit subs is true so should only be 2 subs + assertExactConsumersConnect("Broker0", 2, 1, TIMEOUT); + assertExactConsumersConnect("Broker2", 2, 1, TIMEOUT); + // still should be 3 subs for the middle broker, 1 for each direction + assertExactConsumersConnect("Broker1", 3, 1, TIMEOUT); + + assertNoUnhandledExceptions(); + + LOG.info("Complete the mesh - 0->2"); + + // shorter route + NetworkConnector nc = bridge("Broker0", "Broker2"); + nc.setBrokerName("Broker0"); + nc.start(); + + + if (!DUPLEX) { + LOG.info("... complete the mesh - 2->0"); + nc = bridge("Broker2", "Broker0"); + nc.setBrokerName("Broker2"); + nc.start(); + } + + // reverse order close + consumerMap.get("Consumer:" + 2 + ":0").close(); + TimeUnit.SECONDS.sleep(1); + consumerMap.get("Consumer:" + 1 + ":0").close(); + TimeUnit.SECONDS.sleep(1); + consumerMap.get("Consumer:" + 0 + ":0").close(); + + LOG.info("Check for no consumers.."); + for (int i = 0; i < BROKER_COUNT; i++) { + assertExactConsumersConnect("Broker" + i, 0, 0, TIMEOUT); + } + + } + public void testXConsumerOnEachBroker() throws Exception { bridge("Broker0", "Broker1"); if (!DUPLEX) bridge("Broker1", "Broker0"); @@ -232,13 +294,14 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest }, timeout)); } + @Override public void setUp() throws Exception { super.setAutoFail(true); super.setUp(); unhandledExceptions.clear(); Thread.setDefaultUncaughtExceptionHandler(this); - + // Setup n brokers for (int i = 0; i < BROKER_COUNT; i++) { createBroker(new URI("broker:(tcp://localhost:6161" + i + ")/Broker" + i + "?persistent=false&useJmx=true&brokerId=Broker" + i)); @@ -256,6 +319,7 @@ public class VerifyNetworkConsumersDisconnectTest extends JmsMultipleBrokersTest brokerService.setDestinationPolicy(policyMap); } + @Override public void uncaughtException(Thread t, Throwable e) { synchronized(unhandledExceptions) { unhandledExceptions.put(t, e);