http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java new file mode 100644 index 0000000..213d4ae --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assume.assumeNotNull; + +import java.net.MalformedURLException; +import java.util.Set; + +import javax.management.ObjectInstance; +import javax.management.ObjectName; + +import org.apache.activemq.broker.BrokerService; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DuplexNetworkMBeanTest { + + protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class); + protected final int numRestarts = 3; + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("broker"); + broker.addConnector("tcp://localhost:61617?transport.reuseAddress=true"); + + return broker; + } + + protected BrokerService createNetworkedBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("networkedBroker"); + broker.addConnector("tcp://localhost:62617?transport.reuseAddress=true"); + NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:61617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false"); + networkConnector.setDuplex(true); + return broker; + } + + @Test + public void testMbeanPresenceOnNetworkBrokerRestart() throws Exception { + BrokerService broker = createBroker(); + try { + broker.start(); + assertEquals(1, countMbeans(broker, "connector", 30000)); + assertEquals(0, countMbeans(broker, "connectionName")); + BrokerService networkedBroker = null; + for (int i=0; i<numRestarts; i++) { + networkedBroker = createNetworkedBroker(); + try { + networkedBroker.start(); + assertEquals(1, countMbeans(networkedBroker, "networkBridge", 2000)); + assertEquals(1, countMbeans(broker, "networkBridge", 2000)); + assertEquals(2, countMbeans(broker, "connectionName")); + } finally { + networkedBroker.stop(); + networkedBroker.waitUntilStopped(); + } + assertEquals(0, countMbeans(networkedBroker, "stopped")); + assertEquals(0, countMbeans(broker, "networkBridge")); + } + + assertEquals(0, countMbeans(networkedBroker, "networkBridge")); + assertEquals(0, countMbeans(networkedBroker, "connector")); + assertEquals(0, countMbeans(networkedBroker, "connectionName")); + assertEquals(1, countMbeans(broker, "connector")); + } finally { + broker.stop(); + broker.waitUntilStopped(); + } + } + + @Test + public void testMbeanPresenceOnBrokerRestart() throws Exception { + + BrokerService networkedBroker = createNetworkedBroker(); + try { + networkedBroker.start(); + assertEquals(1, countMbeans(networkedBroker, "connector=networkConnectors", 30000)); + assertEquals(0, countMbeans(networkedBroker, "connectionName")); + + BrokerService broker = null; + for (int i=0; i<numRestarts; i++) { + broker = createBroker(); + try { + broker.start(); + assertEquals(1, countMbeans(networkedBroker, "networkBridge", 5000)); + assertEquals("restart number: " + i, 2, countMbeans(broker, "connectionName", 10000)); + } finally { + broker.stop(); + broker.waitUntilStopped(); + } + assertEquals(0, countMbeans(broker, "stopped")); + } + + assertEquals(1, countMbeans(networkedBroker, "connector=networkConnectors")); + assertEquals(0, countMbeans(networkedBroker, "connectionName")); + assertEquals(0, countMbeans(broker, "connectionName")); + } finally { + networkedBroker.stop(); + networkedBroker.waitUntilStopped(); + } + } + + private int countMbeans(BrokerService broker, String type) throws Exception { + return countMbeans(broker, type, 0); + } + + private int countMbeans(BrokerService broker, String type, int timeout) throws Exception { + final long expiryTime = System.currentTimeMillis() + timeout; + + if (!type.contains("=")) { + type = type + "=*"; + } + + final ObjectName beanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=" + + broker.getBrokerName() + "," + type +",*"); + Set<ObjectName> mbeans = null; + int count = 0; + do { + if (timeout > 0) { + Thread.sleep(100); + } + + LOG.info("Query name: " + beanName); + mbeans = broker.getManagementContext().queryNames(beanName, null); + if (mbeans != null) { + count = mbeans.size(); + } else { + logAllMbeans(broker); + } + } while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis()); + + // If port 1099 is in use when the Broker starts, starting the jmx connector + // will fail. So, if we have no mbsc to query, skip the test. + if (timeout > 0) { + assumeNotNull(mbeans); + } + + return count; + } + + private void logAllMbeans(BrokerService broker) throws MalformedURLException { + try { + // trace all existing MBeans + Set<?> all = broker.getManagementContext().queryNames(null, null); + LOG.info("Total MBean count=" + all.size()); + for (Object o : all) { + ObjectInstance bean = (ObjectInstance)o; + LOG.info(bean.getObjectName().toString()); + } + } catch (Exception ignored) { + LOG.warn("getMBeanServer ex: " + ignored); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java new file mode 100644 index 0000000..3afa7d1 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import javax.jms.MessageProducer; +import javax.jms.TemporaryQueue; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.util.Wait; +import org.junit.Test; + +public class DuplexNetworkTest extends SimpleNetworkTest { + + @Override + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/duplexLocalBroker.xml"; + } + + @Override + protected BrokerService createRemoteBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("remoteBroker"); + broker.addConnector("tcp://localhost:61617"); + return broker; + } + + @Test + public void testTempQueues() throws Exception { + TemporaryQueue temp = localSession.createTemporaryQueue(); + MessageProducer producer = localSession.createProducer(temp); + producer.send(localSession.createTextMessage("test")); + Thread.sleep(100); + assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length); + temp.delete(); + + assertTrue("Destination not deleted", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 0 == remoteBroker.getAdminView().getTemporaryQueues().length; + } + })); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java new file mode 100644 index 0000000..3dd8be6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +import java.lang.reflect.Field; +import java.util.concurrent.CopyOnWriteArrayList; + +import javax.jms.MessageProducer; +import javax.jms.TemporaryQueue; + +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnection; +import org.apache.activemq.broker.TransportConnector; +import org.junit.Test; + +/** + * @author <a href="http://www.christianposta.com/blog">Christian Posta</a> + */ +public class DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetworkTest { + + private static final int REMOTE_BROKER_TCP_PORT = 61617; + + @Override + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml"; + } + + @Override + protected BrokerService createRemoteBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName("remoteBroker"); + broker.addConnector("tcp://localhost:" + REMOTE_BROKER_TCP_PORT); + return broker; + } + + // we have to override this, because with dynamicallyIncludedDestinations working properly + // (see https://issues.apache.org/jira/browse/AMQ-4209) you can't get request/response + // with temps working (there is no wild card like there is for staticallyIncludedDest) + // + @Override + public void testRequestReply() throws Exception { + + } + + @Test + public void testTempQueues() throws Exception { + TemporaryQueue temp = localSession.createTemporaryQueue(); + MessageProducer producer = localSession.createProducer(temp); + producer.send(localSession.createTextMessage("test")); + Thread.sleep(100); + assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length); + temp.delete(); + Thread.sleep(100); + assertEquals("Destination not deleted", 0, remoteBroker.getAdminView().getTemporaryQueues().length); + } + + @Test + public void testDynamicallyIncludedDestinationsForDuplex() throws Exception{ + // Once the bridge is set up, we should see the filter used for the duplex end of the bridge + // only subscribe to the specific destinations included in the <dynamicallyIncludedDestinations> list + // so let's test that the filter is correct, let's also test the subscription on the localbroker + // is correct + + // the bridge on the remote broker has the correct filter + TransportConnection bridgeConnection = getDuplexBridgeConnectionFromRemote(); + assertNotNull(bridgeConnection); + DemandForwardingBridge duplexBridge = getDuplexBridgeFromConnection(bridgeConnection); + assertNotNull(duplexBridge); + NetworkBridgeConfiguration configuration = getConfigurationFromNetworkBridge(duplexBridge); + assertNotNull(configuration); + assertFalse("This destinationFilter does not include ONLY the destinations specified in dynamicallyIncludedDestinations", + configuration.getDestinationFilter().equals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">")); + assertEquals("There are other patterns in the destinationFilter that shouldn't be there", + "ActiveMQ.Advisory.Consumer.Queue.include.test.foo,ActiveMQ.Advisory.Consumer.Topic.include.test.bar", + configuration.getDestinationFilter()); + } + + private NetworkBridgeConfiguration getConfigurationFromNetworkBridge(DemandForwardingBridgeSupport duplexBridge) throws NoSuchFieldException, IllegalAccessException { + Field f = DemandForwardingBridgeSupport.class.getDeclaredField("configuration"); + f.setAccessible(true); + NetworkBridgeConfiguration configuration = (NetworkBridgeConfiguration) f.get(duplexBridge); + return configuration; + } + + private DemandForwardingBridge getDuplexBridgeFromConnection(TransportConnection bridgeConnection) throws NoSuchFieldException, IllegalAccessException { + Field f = TransportConnection.class.getDeclaredField("duplexBridge"); + f.setAccessible(true); + DemandForwardingBridge bridge = (DemandForwardingBridge) f.get(bridgeConnection); + return bridge; + } + + public TransportConnection getDuplexBridgeConnectionFromRemote() { + TransportConnector transportConnector = remoteBroker.getTransportConnectorByScheme("tcp"); + CopyOnWriteArrayList<TransportConnection> transportConnections = transportConnector.getConnections(); + TransportConnection duplexBridgeConnectionFromRemote = transportConnections.get(0); + return duplexBridgeConnectionFromRemote; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java new file mode 100644 index 0000000..4113f14 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java @@ -0,0 +1,467 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.Vector; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicBoolean; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Session; +import javax.management.ObjectName; +import javax.net.ssl.KeyManager; +import javax.net.ssl.TrustManager; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.SslContext; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.transport.tcp.SslBrokerServiceTest; +import org.apache.activemq.util.IntrospectionSupport; +import org.apache.activemq.util.JMXSupport; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FailoverStaticNetworkTest { + protected static final Logger LOG = LoggerFactory.getLogger(FailoverStaticNetworkTest.class); + + private final static String DESTINATION_NAME = "testQ"; + protected BrokerService brokerA; + protected BrokerService brokerA1; + protected BrokerService brokerB; + protected BrokerService brokerC; + + + private SslContext sslContext; + + protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception { + return createBroker(scheme, listenPort, networkToPorts, null); + } + + protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts, + HashMap<String, String> networkProps) throws Exception { + BrokerService broker = new BrokerService(); + broker.getManagementContext().setCreateConnector(false); + broker.setSslContext(sslContext); + broker.setDeleteAllMessagesOnStartup(true); + broker.setBrokerName("Broker_" + listenPort); + // lazy init listener on broker start + TransportConnector transportConnector = new TransportConnector(); + transportConnector.setUri(new URI(scheme + "://localhost:" + listenPort)); + List<TransportConnector> transportConnectors = new ArrayList<TransportConnector>(); + transportConnectors.add(transportConnector); + broker.setTransportConnectors(transportConnectors); + if (networkToPorts != null && networkToPorts.length > 0) { + StringBuilder builder = new StringBuilder("static:(failover:(" + scheme + "://localhost:"); + builder.append(networkToPorts[0]); + for (int i=1;i<networkToPorts.length; i++) { + builder.append("," + scheme + "://localhost:" + networkToPorts[i]); + } + // limit the reconnects in case of initial random connection to slave + // leaving randomize on verifies that this config is picked up + builder.append(")?maxReconnectAttempts=0)?useExponentialBackOff=false"); + NetworkConnector nc = broker.addNetworkConnector(builder.toString()); + if (networkProps != null) { + IntrospectionSupport.setProperties(nc, networkProps); + } + } + return broker; + } + + private BrokerService createBroker(String listenPort, String dataDir) throws Exception { + BrokerService broker = new BrokerService(); + broker.setUseJmx(false); + broker.getManagementContext().setCreateConnector(false); + broker.setBrokerName("Broker_Shared"); + // lazy create transport connector on start completion + TransportConnector connector = new TransportConnector(); + connector.setUri(new URI("tcp://localhost:" + listenPort)); + broker.addConnector(connector); + broker.setDataDirectory(dataDir); + return broker; + } + + @Before + public void setUp() throws Exception { + KeyManager[] km = SslBrokerServiceTest.getKeyManager(); + TrustManager[] tm = SslBrokerServiceTest.getTrustManager(); + sslContext = new SslContext(km, tm, null); + } + + @After + public void tearDown() throws Exception { + brokerB.stop(); + brokerB.waitUntilStopped(); + + brokerA.stop(); + brokerA.waitUntilStopped(); + + if (brokerA1 != null) { + brokerA1.stop(); + brokerA1.waitUntilStopped(); + } + + if (brokerC != null) { + brokerC.stop(); + brokerC.waitUntilStopped(); + } + } + + @Test + public void testSendReceiveAfterReconnect() throws Exception { + brokerA = createBroker("tcp", "61617", null); + brokerA.start(); + brokerB = createBroker("tcp", "62617", new String[]{"61617"}); + brokerB.start(); + doTestNetworkSendReceive(); + + LOG.info("stopping brokerA"); + brokerA.stop(); + brokerA.waitUntilStopped(); + + LOG.info("restarting brokerA"); + brokerA = createBroker("tcp", "61617", null); + brokerA.start(); + + doTestNetworkSendReceive(); + } + + @Test + public void testSendReceiveFailover() throws Exception { + brokerA = createBroker("tcp", "61617", null); + brokerA.start(); + brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}); + brokerB.start(); + doTestNetworkSendReceive(); + + // check mbean + Set<String> bridgeNames = getNetworkBridgeMBeanName(brokerB); + assertEquals("only one bridgeName: " + bridgeNames, 1, bridgeNames.size()); + + LOG.info("stopping brokerA"); + brokerA.stop(); + brokerA.waitUntilStopped(); + + LOG.info("restarting brokerA"); + brokerA = createBroker("tcp", "63617", null); + brokerA.start(); + + doTestNetworkSendReceive(); + + Set<String> otherBridgeNames = getNetworkBridgeMBeanName(brokerB); + assertEquals("only one bridgeName: " + otherBridgeNames, 1, otherBridgeNames.size()); + + assertTrue("there was an addition", bridgeNames.addAll(otherBridgeNames)); + } + + private Set<String> getNetworkBridgeMBeanName(BrokerService brokerB) throws Exception { + Set<String> names = new HashSet<String>(); + for (ObjectName objectName : brokerB.getManagementContext().queryNames(null, null)) { + if (objectName.getKeyProperty("networkBridge") != null) { + names.add(objectName.getKeyProperty("networkBridge")); + } + } + return names; + } + + @Test + public void testSendReceiveFailoverDuplex() throws Exception { + final Vector<Throwable> errors = new Vector<Throwable>(); + final String dataDir = "target/data/shared"; + brokerA = createBroker("61617", dataDir); + brokerA.start(); + + final BrokerService slave = createBroker("63617", dataDir); + brokerA1 = slave; + ExecutorService executor = Executors.newCachedThreadPool(); + executor.execute(new Runnable() { + @Override + public void run() { + try { + slave.start(); + } catch (Exception e) { + e.printStackTrace(); + errors.add(e); + } + } + }); + executor.shutdown(); + + HashMap<String, String> networkConnectorProps = new HashMap<String, String>(); + networkConnectorProps.put("duplex", "true"); + brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps); + brokerB.start(); + + doTestNetworkSendReceive(brokerA, brokerB); + doTestNetworkSendReceive(brokerB, brokerA); + + LOG.info("stopping brokerA (master shared_broker)"); + brokerA.stop(); + brokerA.waitUntilStopped(); + + // wait for slave to start + brokerA1.waitUntilStarted(); + + doTestNetworkSendReceive(brokerA1, brokerB); + doTestNetworkSendReceive(brokerB, brokerA1); + + assertTrue("No unexpected exceptions " + errors, errors.isEmpty()); + } + + @Test + // master slave piggy in the middle setup + public void testSendReceiveFailoverDuplexWithPIM() throws Exception { + final String dataDir = "target/data/shared/pim"; + brokerA = createBroker("61617", dataDir); + brokerA.start(); + + final BrokerService slave = createBroker("63617", dataDir); + brokerA1 = slave; + ExecutorService executor = Executors.newCachedThreadPool(); + executor.execute(new Runnable() { + @Override + public void run() { + try { + slave.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + executor.shutdown(); + + HashMap<String, String> networkConnectorProps = new HashMap<String, String>(); + networkConnectorProps.put("duplex", "true"); + networkConnectorProps.put("networkTTL", "2"); + + brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, networkConnectorProps); + brokerB.start(); + + assertTrue("all props applied", networkConnectorProps.isEmpty()); + networkConnectorProps.put("duplex", "true"); + networkConnectorProps.put("networkTTL", "2"); + + brokerC = createBroker("tcp", "64617", new String[]{"61617", "63617"}, networkConnectorProps); + brokerC.start(); + assertTrue("all props applied a second time", networkConnectorProps.isEmpty()); + + doTestNetworkSendReceive(brokerC, brokerB); + doTestNetworkSendReceive(brokerB, brokerC); + + LOG.info("stopping brokerA (master shared_broker)"); + brokerA.stop(); + brokerA.waitUntilStopped(); + + doTestNetworkSendReceive(brokerC, brokerB); + doTestNetworkSendReceive(brokerB, brokerC); + + brokerC.stop(); + brokerC.waitUntilStopped(); + } + + /** + * networked broker started after target so first connect attempt succeeds + * start order is important + */ + @Test + public void testSendReceive() throws Exception { + + brokerA = createBroker("tcp", "61617", null); + brokerA.start(); + brokerB = createBroker("tcp", "62617", new String[]{"61617","1111"}); + brokerB.start(); + + doTestNetworkSendReceive(); + } + + @Test + public void testSendReceiveSsl() throws Exception { + + brokerA = createBroker("ssl", "61617", null); + brokerA.start(); + brokerB = createBroker("ssl", "62617", new String[]{"61617", "1111"}); + brokerB.start(); + + doTestNetworkSendReceive(); + } + + @Test + public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws Exception { + doTestRepeatedSendReceiveWithMasterSlaveAlternate(null); + } + + @Test + public void testRepeatedSendReceiveWithMasterSlaveAlternateDuplex() throws Exception { + HashMap<String, String> networkConnectorProps = new HashMap<String, String>(); + networkConnectorProps.put("duplex", "true"); + + doTestRepeatedSendReceiveWithMasterSlaveAlternate(networkConnectorProps); + } + + public void doTestRepeatedSendReceiveWithMasterSlaveAlternate(HashMap<String, String> networkConnectorProps) throws Exception { + + brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"}, networkConnectorProps); + brokerB.start(); + + final AtomicBoolean done = new AtomicBoolean(false); + ExecutorService executorService = Executors.newCachedThreadPool(); + executorService.execute(new Runnable() { + @Override + public void run() { + try { + while (!done.get()) { + brokerA = createBroker("tcp", "61610", null); + brokerA.setBrokerName("Pair"); + brokerA.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + + JMXSupport.encodeObjectNamePart("A") + "," + "Type=Broker")); + ((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000); + brokerA.start(); + brokerA.waitUntilStopped(); + + // restart after peer taken over + brokerA1.waitUntilStarted(); + } + } catch (Exception ignored) { + LOG.info("A create/start, unexpected: " + ignored, ignored); + } + } + }); + + // start with brokerA as master + Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return brokerA != null && brokerA.waitUntilStarted(); + } + }); + + executorService.execute(new Runnable() { + @Override + public void run() { + try { + while (!done.get()) { + brokerA1 = createBroker("tcp", "61611", null); + brokerA1.setBrokerName("Pair"); + // so they can coexist in local jmx we set the object name b/c the brokername identifies the shared store + brokerA1.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName=" + + JMXSupport.encodeObjectNamePart("A1") + "," + "Type=Broker")); + ((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000); + brokerA1.start(); + brokerA1.waitUntilStopped(); + + // restart after peer taken over + brokerA.waitUntilStarted(); + } + } catch (Exception ignored) { + LOG.info("A1 create/start, unexpected: " + ignored, ignored); + } + } + }); + + for (int i=0; i<4; i++) { + BrokerService currentMaster = (i%2 == 0 ? brokerA : brokerA1); + LOG.info("iteration: " + i + ", using: " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName")); + currentMaster.waitUntilStarted(); + + doTestNetworkSendReceive(brokerB, currentMaster); + + LOG.info("Stopping " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName")); + currentMaster.stop(); + currentMaster.waitUntilStopped(); + } + + done.set(true); + LOG.info("all done"); + executorService.shutdownNow(); + } + + private void doTestNetworkSendReceive() throws Exception, JMSException { + doTestNetworkSendReceive(brokerB, brokerA); + } + + private void doTestNetworkSendReceive(final BrokerService to, final BrokerService from) throws Exception, JMSException { + + LOG.info("Creating Consumer on the networked broker ..." + from); + + SslContext.setCurrentSslContext(sslContext); + // Create a consumer on brokerA + ConnectionFactory consFactory = createConnectionFactory(from); + Connection consConn = consFactory.createConnection(); + consConn.start(); + Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME); + final MessageConsumer consumer = consSession.createConsumer(destination); + + LOG.info("publishing to " + to); + + sendMessageTo(destination, to); + + boolean gotMessage = Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + Message message = consumer.receive(5000); + LOG.info("from: " + from.getBrokerObjectName().getKeyProperty("BrokerName") + ", received: " + message); + return message != null; + } + }); + try { + consConn.close(); + } catch (JMSException ignored) { + } + assertTrue("consumer on A got message", gotMessage); + } + + private void sendMessageTo(ActiveMQDestination destination, BrokerService brokerService) throws Exception { + ConnectionFactory factory = createConnectionFactory(brokerService); + Connection conn = factory.createConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createProducer(destination).send(session.createTextMessage("Hi")); + conn.close(); + } + + protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception { + String url = broker.getTransportConnectors().get(0).getServer().getConnectURI().toString(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); + connectionFactory.setOptimizedMessageDispatch(true); + connectionFactory.setDispatchAsync(false); + connectionFactory.setUseAsyncSend(false); + connectionFactory.setOptimizeAcknowledge(false); + connectionFactory.setAlwaysSyncSend(true); + return connectionFactory; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java new file mode 100644 index 0000000..71006e3 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import javax.jms.DeliveryMode; + +import junit.framework.Test; + +import org.apache.activemq.broker.StubConnection; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQMessage; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; + +public class ForwardingBridgeTest extends NetworkTestSupport { + + public ActiveMQDestination destination; + public byte destinationType; + public int deliveryMode; + private ForwardingBridge bridge; + + public void initCombosForTestForwardMessageCompressed() { + addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT), + new Integer(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {new Byte(ActiveMQDestination.QUEUE_TYPE), + new Byte(ActiveMQDestination.TOPIC_TYPE)}); + } + + public void testForwardMessageCompressed() throws Exception { + + bridge.setUseCompression(true); + + // Start a producer on local broker + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ProducerInfo producerInfo = createProducerInfo(sessionInfo1); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(producerInfo); + + destination = createDestinationInfo(connection1, connectionInfo1, destinationType); + + // Start a consumer on a remote broker + StubConnection connection2 = createRemoteConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination); + connection2.send(consumerInfo); + Thread.sleep(1000); + // Give forwarding bridge a chance to finish setting up + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + ie.printStackTrace(); + } + + // Send the message to the local boker. + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + + // Make sure the message was delivered via the remote. + Message m = receiveMessage(connection2); + assertNotNull(m); + + // Make sure its compressed now + ActiveMQMessage message = (ActiveMQMessage) m; + assertTrue(message.isCompressed()); + } + + public void initCombosForTestAddConsumerThenSend() { + addCombinationValues("deliveryMode", new Object[] {new Integer(DeliveryMode.NON_PERSISTENT), + new Integer(DeliveryMode.PERSISTENT)}); + addCombinationValues("destinationType", new Object[] {new Byte(ActiveMQDestination.QUEUE_TYPE), + new Byte(ActiveMQDestination.TOPIC_TYPE)}); + } + + public void testAddConsumerThenSend() throws Exception { + // Start a producer on local broker + StubConnection connection1 = createConnection(); + ConnectionInfo connectionInfo1 = createConnectionInfo(); + SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1); + ProducerInfo producerInfo = createProducerInfo(sessionInfo1); + connection1.send(connectionInfo1); + connection1.send(sessionInfo1); + connection1.send(producerInfo); + + destination = createDestinationInfo(connection1, connectionInfo1, destinationType); + + // Start a consumer on a remote broker + StubConnection connection2 = createRemoteConnection(); + ConnectionInfo connectionInfo2 = createConnectionInfo(); + SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2); + connection2.send(connectionInfo2); + connection2.send(sessionInfo2); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination); + connection2.send(consumerInfo); + Thread.sleep(1000); + // Give forwarding bridge a chance to finish setting up + try { + Thread.sleep(1000); + } catch (InterruptedException ie) { + ie.printStackTrace(); + } + + // Send the message to the local boker. + connection1.send(createMessage(producerInfo, destination, deliveryMode)); + + // Make sure the message was delivered via the remote. + + Message m = receiveMessage(connection2); + assertNotNull(m); + } + + protected void setUp() throws Exception { + super.setUp(); + bridge = new ForwardingBridge(createTransport(), createRemoteTransport()); + bridge.setClientId("local-remote-bridge"); + bridge.setDispatchAsync(false); + bridge.start(); + } + + protected void tearDown() throws Exception { + bridge.stop(); + super.tearDown(); + } + + public static Test suite() { + return suite(ForwardingBridgeTest.class); + } + + public static void main(String[] args) { + junit.textui.TestRunner.run(suite()); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java new file mode 100644 index 0000000..928a7a6 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java @@ -0,0 +1,254 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean; +import org.apache.activemq.store.PersistenceAdapter; +import org.apache.commons.lang.ArrayUtils; +import org.fusesource.hawtdispatch.Dispatch; +import org.fusesource.mqtt.client.BlockingConnection; +import org.fusesource.mqtt.client.MQTT; +import org.fusesource.mqtt.client.QoS; +import org.fusesource.mqtt.client.Topic; +import org.fusesource.mqtt.client.Tracer; +import org.fusesource.mqtt.codec.MQTTFrame; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Created by ceposta + * <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>. + */ +public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(MQTTNetworkOfBrokersFailoverTest.class); + private int localBrokerMQTTPort = -1; + private int remoteBrokerMQTTPort = -1; + + @Override + protected void setUp() throws Exception { + useJmx=true; + super.setUp(); + + URI ncUri = new URI("static:(" + connector.getConnectUri().toString() + ")"); + NetworkConnector nc = new DiscoveryNetworkConnector(ncUri); + nc.setDuplex(true); + remoteBroker.addNetworkConnector(nc); + nc.start(); + + // mqtt port should have been assigned by now + assertFalse(localBrokerMQTTPort == -1); + assertFalse(remoteBrokerMQTTPort == -1); + } + + @Override + protected void tearDown() throws Exception { + if (remoteBroker.isStarted()) { + remoteBroker.stop(); + remoteBroker.waitUntilStopped(); + } + if (broker.isStarted()) { + broker.stop(); + broker.waitUntilStopped(); + } + super.tearDown(); + } + + @Test + public void testNoStaleSubscriptionAcrossNetwork() throws Exception { + + // before we get started, we want an async way to be able to know when + // the durable consumer has been networked so we can assert that it indeed + // would have a durable subscriber. for example, when we subscribe on remote broker, + // a network-sub would be created on local broker and we want to listen for when that + // even happens. we do that with advisory messages and a latch: + CountDownLatch consumerNetworked = listenForConsumersOn(broker); + + // create a subscription with Clean == 0 (durable sub for QoS==1 && QoS==2) + // on the remote broker. this sub should still be there after we disconnect + MQTT remoteMqtt = createMQTTTcpConnection("foo", false, remoteBrokerMQTTPort); + BlockingConnection remoteConn = remoteMqtt.blockingConnection(); + remoteConn.connect(); + remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}); + + assertTrue("No destination detected!", consumerNetworked.await(1, TimeUnit.SECONDS)); + assertQueueExistsOn(remoteBroker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar"); + assertQueueExistsOn(broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar"); + remoteConn.disconnect(); + + // now we reconnect the same sub on the local broker, again with clean==0 + MQTT localMqtt = createMQTTTcpConnection("foo", false, localBrokerMQTTPort); + BlockingConnection localConn = localMqtt.blockingConnection(); + localConn.connect(); + localConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}); + + // now let's connect back up to remote broker and send a message + remoteConn = remoteMqtt.blockingConnection(); + remoteConn.connect(); + remoteConn.publish("foo/bar", "Hello, World!".getBytes(), QoS.AT_LEAST_ONCE, false); + + // now we should see that message on the local broker because the subscription + // should have been properly networked... we'll give a sec of grace for the + // networking and forwarding to have happened properly + org.fusesource.mqtt.client.Message msg = localConn.receive(100, TimeUnit.SECONDS); + assertNotNull(msg); + msg.ack(); + String response = new String(msg.getPayload()); + assertEquals("Hello, World!", response); + assertEquals("foo/bar", msg.getTopic()); + + // Now... we SHOULD NOT see a message on the remote broker because we already + // consumed it on the local broker... having the same message on the remote broker + // would effectively give us duplicates in a distributed topic scenario: + remoteConn.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)}); + msg = remoteConn.receive(500, TimeUnit.MILLISECONDS); + assertNull("We have duplicate messages across the cluster for a distributed topic", msg); + } + + private CountDownLatch listenForConsumersOn(BrokerService broker) throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + + URI brokerUri = broker.getVmConnectorURI(); + + final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(brokerUri.toASCIIString()); + final Connection connection = cf.createConnection(); + connection.start(); + final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Destination dest = session.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar"); + MessageConsumer consumer = session.createConsumer(dest); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(Message message) { + latch.countDown(); + // shutdown this connection + Dispatch.getGlobalQueue().execute(new Runnable() { + @Override + public void run() { + try { + session.close(); + connection.close(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }); + } + }); + + return latch; + } + + private void assertQueueExistsOn(BrokerService broker, String queueName) throws Exception { + BrokerViewMBean brokerView = broker.getAdminView(); + ObjectName[] queueNames = brokerView.getQueues(); + assertEquals(1, queueNames.length); + + assertTrue(queueNames[0].toString().contains(queueName)); + } + + @SuppressWarnings("unused") + private void assertOneDurableSubOn(BrokerService broker, String subName) throws Exception { + BrokerViewMBean brokerView = broker.getAdminView(); + ObjectName[] activeDurableSubs = brokerView.getDurableTopicSubscribers(); + ObjectName[] inactiveDurableSubs = brokerView.getInactiveDurableTopicSubscribers(); + ObjectName[] allDurables = (ObjectName[]) ArrayUtils.addAll(activeDurableSubs, inactiveDurableSubs); + assertEquals(1, allDurables.length); + + // at this point our assertions should prove that we have only on durable sub + DurableSubscriptionViewMBean durableSubView = (DurableSubscriptionViewMBean) + broker.getManagementContext().newProxyInstance(allDurables[0], DurableSubscriptionViewMBean.class, true); + + assertEquals(subName, durableSubView.getClientId()); + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService broker = super.createBroker(); + broker.setPersistent(true); + broker.setBrokerName("local"); + broker.setDataDirectory("target/activemq-data"); + broker.setDeleteAllMessagesOnStartup(true); + TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri()); + localBrokerMQTTPort = tc.getConnectUri().getPort(); + return broker; + } + + @Override + protected BrokerService createRemoteBroker(PersistenceAdapter persistenceAdapter) throws Exception { + BrokerService broker = super.createRemoteBroker(persistenceAdapter); + broker.setPersistent(true); + broker.setDeleteAllMessagesOnStartup(true); + broker.setDataDirectory("target/activemq-data"); + TransportConnector tc = broker.addConnector(getDefaultMQTTTransportConnectorUri()); + remoteBrokerMQTTPort = tc.getConnectUri().getPort(); + return broker; + } + + private String getDefaultMQTTTransportConnectorUri(){ + return "mqtt://localhost:0?transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"; + } + + private MQTT createMQTTTcpConnection(String clientId, boolean clean, int port) throws Exception { + MQTT mqtt = new MQTT(); + mqtt.setConnectAttemptsMax(1); + mqtt.setReconnectAttemptsMax(0); + mqtt.setTracer(createTracer()); + if (clientId != null) { + mqtt.setClientId(clientId); + } + mqtt.setCleanSession(clean); + mqtt.setHost("localhost", port); + return mqtt; + } + + protected Tracer createTracer() { + return new Tracer() { + @Override + public void onReceive(MQTTFrame frame) { + LOG.info("Client Received:\n" + frame); + } + + @Override + public void onSend(MQTTFrame frame) { + LOG.info("Client Sent:\n" + frame); + } + + @Override + public void debug(String message, Object... args) { + LOG.info(String.format(message, args)); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java new file mode 100644 index 0000000..7813b07 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +/** + * + */ +public class MulticastNetworkTest extends SimpleNetworkTest { + + protected String getRemoteBrokerURI() { + return "org/apache/activemq/network/multicast/remoteBroker.xml"; + } + + protected String getLocalBrokerURI() { + return "org/apache/activemq/network/multicast/localBroker.xml"; + } + + // blocked out for multi cast because temp dest request reply isn't supported + // with dynamicallyAddedDestinations + @Override + public void testRequestReply() throws Exception { + + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java new file mode 100644 index 0000000..ceef43c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java @@ -0,0 +1,305 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.Message; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TopicSubscriber; +import javax.management.ObjectName; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQPrefetchPolicy; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.BrokerView; +import org.apache.activemq.broker.jmx.DestinationViewMBean; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.Wait; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class NetworkBrokerDetachTest { + + private final static String BROKER_NAME = "broker"; + private final static String REM_BROKER_NAME = "networkedBroker"; + private final static String DESTINATION_NAME = "testQ"; + private final static int NUM_CONSUMERS = 1; + + protected static final Logger LOG = LoggerFactory.getLogger(NetworkBrokerDetachTest.class); + protected final int numRestarts = 3; + protected final int networkTTL = 2; + protected final boolean dynamicOnly = false; + + protected BrokerService broker; + protected BrokerService networkedBroker; + + protected BrokerService createBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName(BROKER_NAME); + configureBroker(broker); + broker.addConnector("tcp://localhost:61617"); + NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false"); + configureNetworkConnector(networkConnector); + return broker; + } + + protected BrokerService createNetworkedBroker() throws Exception { + BrokerService broker = new BrokerService(); + broker.setBrokerName(REM_BROKER_NAME); + configureBroker(broker); + broker.getManagementContext().setCreateConnector(false); + broker.addConnector("tcp://localhost:62617"); + NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:61617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false"); + configureNetworkConnector(networkConnector); + return broker; + } + + private void configureNetworkConnector(NetworkConnector networkConnector) { + networkConnector.setDuplex(false); + networkConnector.setNetworkTTL(networkTTL); + networkConnector.setDynamicOnly(dynamicOnly); + } + + // variants for each store.... + protected void configureBroker(BrokerService broker) throws Exception { + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(new File("target/activemq-data/kahadb/" + broker.getBrokerName() + "NetworBrokerDetatchTest")); + broker.setPersistenceAdapter(persistenceAdapter); + } + + @Before + public void init() throws Exception { + broker = createBroker(); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + + networkedBroker = createNetworkedBroker(); + networkedBroker.setDeleteAllMessagesOnStartup(true); + networkedBroker.start(); + } + + @After + public void cleanup() throws Exception { + networkedBroker.stop(); + networkedBroker.waitUntilStopped(); + + broker.stop(); + broker.waitUntilStopped(); + } + + @Test + public void testNetworkedBrokerDetach() throws Exception { + LOG.info("Creating Consumer on the networked broker ..."); + // Create a consumer on the networked broker + ConnectionFactory consFactory = createConnectionFactory(networkedBroker); + Connection consConn = consFactory.createConnection(); + Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME); + for(int i=0; i<NUM_CONSUMERS; i++) { + consSession.createConsumer(destination); + } + + assertTrue("got expected consumer count from mbean within time limit", + verifyConsumerCount(1, destination, broker)); + + LOG.info("Stopping Consumer on the networked broker ..."); + // Closing the connection will also close the consumer + consConn.close(); + + // We should have 0 consumer for the queue on the local broker + assertTrue("got expected 0 count from mbean within time limit", verifyConsumerCount(0, destination, broker)); + } + + @Test + public void testNetworkedBrokerDurableSubAfterRestart() throws Exception { + + final AtomicInteger count = new AtomicInteger(0); + MessageListener counter = new MessageListener() { + @Override + public void onMessage(Message message) { + count.incrementAndGet(); + } + }; + + LOG.info("Creating durable consumer on each broker ..."); + ActiveMQTopic destination = registerDurableConsumer(networkedBroker, counter); + registerDurableConsumer(broker, counter); + + assertTrue("got expected consumer count from local broker mbean within time limit", + verifyConsumerCount(2, destination, broker)); + + assertTrue("got expected consumer count from network broker mbean within time limit", + verifyConsumerCount(2, destination, networkedBroker)); + + sendMessageTo(destination, broker); + + assertTrue("Got one message on each", verifyMessageCount(2, count)); + + LOG.info("Stopping brokerTwo..."); + networkedBroker.stop(); + networkedBroker.waitUntilStopped(); + + LOG.info("restarting broker Two..."); + networkedBroker = createNetworkedBroker(); + networkedBroker.start(); + + LOG.info("Recreating durable Consumer on the broker after restart..."); + registerDurableConsumer(networkedBroker, counter); + + // give advisories a chance to percolate + TimeUnit.SECONDS.sleep(5); + + sendMessageTo(destination, broker); + + // expect similar after restart + assertTrue("got expected consumer count from local broker mbean within time limit", + verifyConsumerCount(2, destination, broker)); + + // a durable sub is auto bridged on restart unless dynamicOnly=true + assertTrue("got expected consumer count from network broker mbean within time limit", + verifyConsumerCount(2, destination, networkedBroker)); + + assertTrue("got no inactive subs on broker", verifyDurableConsumerCount(0, broker)); + assertTrue("got no inactive subs on other broker", verifyDurableConsumerCount(0, networkedBroker)); + + assertTrue("Got two more messages after restart", verifyMessageCount(4, count)); + TimeUnit.SECONDS.sleep(1); + assertTrue("still Got just two more messages", verifyMessageCount(4, count)); + } + + private boolean verifyMessageCount(final int i, final AtomicInteger count) throws Exception { + return Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return i == count.get(); + } + }); + } + + private ActiveMQTopic registerDurableConsumer( + BrokerService brokerService, MessageListener listener) throws Exception { + ConnectionFactory factory = createConnectionFactory(brokerService); + Connection connection = factory.createConnection(); + connection.setClientID("DurableOne"); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + ActiveMQTopic destination = (ActiveMQTopic) session.createTopic(DESTINATION_NAME); + // unique to a broker + TopicSubscriber sub = session.createDurableSubscriber(destination, "SubOne" + brokerService.getBrokerName()); + sub.setMessageListener(listener); + return destination; + } + + private void sendMessageTo(ActiveMQTopic destination, BrokerService brokerService) throws Exception { + ConnectionFactory factory = createConnectionFactory(brokerService); + Connection conn = factory.createConnection(); + conn.start(); + Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createProducer(destination).send(session.createTextMessage("Hi")); + conn.close(); + } + + protected ConnectionFactory createConnectionFactory(final BrokerService broker) throws Exception { + String url = broker.getTransportConnectors().get(0).getServer().getConnectURI().toString(); + ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); + connectionFactory.setOptimizedMessageDispatch(true); + connectionFactory.setCopyMessageOnSend(false); + connectionFactory.setUseCompression(false); + connectionFactory.setDispatchAsync(false); + connectionFactory.setUseAsyncSend(false); + connectionFactory.setOptimizeAcknowledge(false); + connectionFactory.setWatchTopicAdvisories(true); + ActiveMQPrefetchPolicy qPrefetchPolicy= new ActiveMQPrefetchPolicy(); + qPrefetchPolicy.setQueuePrefetch(100); + qPrefetchPolicy.setTopicPrefetch(1000); + connectionFactory.setPrefetchPolicy(qPrefetchPolicy); + connectionFactory.setAlwaysSyncSend(true); + return connectionFactory; + } + + // JMX Helper Methods + private boolean verifyConsumerCount(final long expectedCount, final ActiveMQDestination destination, final BrokerService broker) throws Exception { + return Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + boolean result = false; + try { + + ObjectName[] destinations; + + if (destination.isQueue()) { + destinations = broker.getAdminView().getQueues(); + } else { + destinations = broker.getAdminView().getTopics(); + } + + // We should have 1 consumer for the queue on the local broker + for (ObjectName name : destinations) { + DestinationViewMBean view = (DestinationViewMBean) + broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true); + + if (view.getName().equals(destination.getPhysicalName())) { + LOG.info("Consumers for " + destination.getPhysicalName() + " on " + broker + " : " + view.getConsumerCount()); + LOG.info("Subs: " + Arrays.asList(view.getSubscriptions())); + if (expectedCount == view.getConsumerCount()) { + result = true; + } + } + } + + } catch (Exception ignoreAndRetry) { + } + return result; + } + }); + } + + private boolean verifyDurableConsumerCount(final long expectedCount, final BrokerService broker) throws Exception { + return Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + boolean result = false; + BrokerView view = broker.getAdminView(); + + if (view != null) { + ObjectName[] subs = broker.getAdminView().getInactiveDurableTopicSubscribers(); + if (subs != null) { + LOG.info("inactive durable subs on " + broker + " : " + Arrays.asList(subs)); + if (expectedCount == subs.length) { + result = true; + } + } + } + return result; + } + }); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java new file mode 100644 index 0000000..f91243e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java @@ -0,0 +1,245 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import junit.framework.TestCase; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.*; + +public class NetworkConnectionsTest extends TestCase { + private static final Logger LOG = LoggerFactory.getLogger(NetworkConnectionsTest.class); + + private static final String LOCAL_BROKER_TRANSPORT_URI = "tcp://localhost:61616"; + private static final String REMOTE_BROKER_TRANSPORT_URI = "tcp://localhost:61617"; + private static final String DESTINATION_NAME = "TEST.RECONNECT"; + + private BrokerService localBroker; + private BrokerService remoteBroker; + + @Test + public void testIsStarted() throws Exception { + LOG.info("testIsStarted is starting..."); + + LOG.info("Adding network connector..."); + NetworkConnector nc = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")"); + nc.setName("NC1"); + + LOG.info("Starting network connector..."); + nc.start(); + assertTrue(nc.isStarted()); + + LOG.info("Stopping network connector..."); + nc.stop(); + + while (nc.isStopping()) { + LOG.info("... still stopping ..."); + Thread.sleep(100); + } + + assertTrue(nc.isStopped()); + assertFalse(nc.isStarted()); + + LOG.info("Starting network connector..."); + nc.start(); + assertTrue(nc.isStarted()); + + LOG.info("Stopping network connector..."); + nc.stop(); + + while (nc.isStopping()) { + LOG.info("... still stopping ..."); + Thread.sleep(100); + } + + assertTrue(nc.isStopped()); + assertFalse(nc.isStarted()); + } + + @Test + public void testNetworkConnectionRestart() throws Exception { + LOG.info("testNetworkConnectionRestart is starting..."); + + LOG.info("Adding network connector..."); + NetworkConnector nc = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")"); + nc.setName("NC1"); + nc.start(); + assertTrue(nc.isStarted()); + + LOG.info("Setting up Message Producer and Consumer"); + ActiveMQQueue destination = new ActiveMQQueue(DESTINATION_NAME); + + ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(LOCAL_BROKER_TRANSPORT_URI); + Connection localConnection = localFactory.createConnection(); + localConnection.start(); + Session localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer localProducer = localSession.createProducer(destination); + + ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(REMOTE_BROKER_TRANSPORT_URI); + Connection remoteConnection = remoteFactory.createConnection(); + remoteConnection.start(); + Session remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer remoteConsumer = remoteSession.createConsumer(destination); + + Message message = localSession.createTextMessage("test"); + localProducer.send(message); + + LOG.info("Testing initial network connection..."); + message = remoteConsumer.receive(10000); + assertNotNull(message); + + LOG.info("Stopping network connection..."); + nc.stop(); + assertFalse(nc.isStarted()); + + LOG.info("Sending 2nd message..."); + message = localSession.createTextMessage("test stop"); + localProducer.send(message); + + message = remoteConsumer.receive(1000); + assertNull("Message should not have been delivered since NetworkConnector was stopped", message); + + LOG.info("(Re)starting network connection..."); + nc.start(); + assertTrue(nc.isStarted()); + + LOG.info("Wait for 2nd message to get forwarded and received..."); + message = remoteConsumer.receive(10000); + assertNotNull("Should have received 2nd message", message); + } + + @Test + public void testNetworkConnectionReAddURI() throws Exception { + LOG.info("testNetworkConnectionReAddURI is starting..."); + + LOG.info("Adding network connector 'NC1'..."); + NetworkConnector nc = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")"); + nc.setName("NC1"); + nc.start(); + assertTrue(nc.isStarted()); + + LOG.info("Looking up network connector by name..."); + NetworkConnector nc1 = localBroker.getNetworkConnectorByName("NC1"); + assertNotNull("Should find network connector 'NC1'", nc1); + assertTrue(nc1.isStarted()); + assertEquals(nc, nc1); + + LOG.info("Setting up producer and consumer..."); + ActiveMQQueue destination = new ActiveMQQueue(DESTINATION_NAME); + + ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(LOCAL_BROKER_TRANSPORT_URI); + Connection localConnection = localFactory.createConnection(); + localConnection.start(); + Session localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer localProducer = localSession.createProducer(destination); + + ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(REMOTE_BROKER_TRANSPORT_URI); + Connection remoteConnection = remoteFactory.createConnection(); + remoteConnection.start(); + Session remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer remoteConsumer = remoteSession.createConsumer(destination); + + Message message = localSession.createTextMessage("test"); + localProducer.send(message); + + LOG.info("Testing initial network connection..."); + message = remoteConsumer.receive(10000); + assertNotNull(message); + + LOG.info("Stopping network connector 'NC1'..."); + nc.stop(); + assertFalse(nc.isStarted()); + + LOG.info("Removing network connector..."); + assertTrue(localBroker.removeNetworkConnector(nc)); + + nc1 = localBroker.getNetworkConnectorByName("NC1"); + assertNull("Should not find network connector 'NC1'", nc1); + + LOG.info("Re-adding network connector 'NC2'..."); + nc = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")"); + nc.setName("NC2"); + nc.start(); + assertTrue(nc.isStarted()); + + LOG.info("Looking up network connector by name..."); + NetworkConnector nc2 = localBroker.getNetworkConnectorByName("NC2"); + assertNotNull(nc2); + assertTrue(nc2.isStarted()); + assertEquals(nc, nc2); + + LOG.info("Testing re-added network connection..."); + message = localSession.createTextMessage("test"); + localProducer.send(message); + + message = remoteConsumer.receive(10000); + assertNotNull(message); + + LOG.info("Stopping network connector..."); + nc.stop(); + assertFalse(nc.isStarted()); + + LOG.info("Removing network connection 'NC2'"); + assertTrue(localBroker.removeNetworkConnector(nc)); + + nc2 = localBroker.getNetworkConnectorByName("NC2"); + assertNull("Should not find network connector 'NC2'", nc2); + } + + @Override + protected void setUp() throws Exception { + LOG.info("Setting up LocalBroker"); + localBroker = new BrokerService(); + localBroker.setBrokerName("LocalBroker"); + localBroker.setUseJmx(false); + localBroker.setPersistent(false); + localBroker.setTransportConnectorURIs(new String[]{LOCAL_BROKER_TRANSPORT_URI}); + localBroker.start(); + localBroker.waitUntilStarted(); + + LOG.info("Setting up RemoteBroker"); + remoteBroker = new BrokerService(); + remoteBroker.setBrokerName("RemoteBroker"); + remoteBroker.setUseJmx(false); + remoteBroker.setPersistent(false); + remoteBroker.setTransportConnectorURIs(new String[]{REMOTE_BROKER_TRANSPORT_URI}); + remoteBroker.start(); + remoteBroker.waitUntilStarted(); + } + + @Override + protected void tearDown() throws Exception { + if (localBroker.isStarted()) { + LOG.info("Stopping LocalBroker"); + localBroker.stop(); + localBroker.waitUntilStopped(); + localBroker = null; + } + + if (remoteBroker.isStarted()) { + LOG.info("Stopping RemoteBroker"); + remoteBroker.stop(); + remoteBroker.waitUntilStopped(); + remoteBroker = null; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java new file mode 100644 index 0000000..80c5855 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.network; + +import junit.framework.TestCase; +import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTempQueue; +import org.apache.activemq.command.ActiveMQTopic; + +import java.util.ArrayList; +import java.util.List; + +public class NetworkDestinationFilterTest extends TestCase { + + public void testFilter() throws Exception { + NetworkBridgeConfiguration config = new NetworkBridgeConfiguration(); + assertEquals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">", config.getDestinationFilter()); + List<ActiveMQDestination> dests = new ArrayList<ActiveMQDestination>(); + config.setDynamicallyIncludedDestinations(dests); + assertEquals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">", config.getDestinationFilter()); + dests.add(new ActiveMQQueue("TEST.>")); + dests.add(new ActiveMQTopic("TEST.>")); + dests.add(new ActiveMQTempQueue("TEST.>")); + String prefix = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX; + assertEquals(prefix + "Queue.TEST.>," + prefix + "Topic.TEST.>", config.getDestinationFilter()); + } + + +}
