http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
new file mode 100644
index 0000000..213d4ae
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeNotNull;
+
+import java.net.MalformedURLException;
+import java.util.Set;
+
+import javax.management.ObjectInstance;
+import javax.management.ObjectName;
+
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Checks which JMX MBeans are registered for two brokers joined by a duplex
+ * network connector, and that the bridge and connection MBeans go away again
+ * when either side is stopped and restarted.
+ */
+public class DuplexNetworkMBeanTest {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DuplexNetworkMBeanTest.class);
+
+    /** Number of stop/start cycles applied to the broker that is being restarted. */
+    protected final int numRestarts = 3;
+
+    /**
+     * Builds, without starting, the broker named "broker" that listens on tcp port 61617.
+     *
+     * @return the configured, not yet started broker
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("broker");
+        broker.addConnector("tcp://localhost:61617?transport.reuseAddress=true");
+
+        return broker;
+    }
+
+    /**
+     * Builds, without starting, the broker named "networkedBroker". It listens on tcp
+     * port 62617 and carries a duplex network connector pointing at port 61617, the
+     * port used by {@link #createBroker()}.
+     *
+     * @return the configured, not yet started broker
+     */
+    protected BrokerService createNetworkedBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("networkedBroker");
+        broker.addConnector("tcp://localhost:62617?transport.reuseAddress=true");
+        NetworkConnector networkConnector = broker.addNetworkConnector("static:(tcp://localhost:61617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
+        networkConnector.setDuplex(true);
+        return broker;
+    }
+
+    /**
+     * Keeps "broker" running while "networkedBroker" is started and stopped
+     * {@link #numRestarts} times, checking the MBean counts on both sides after
+     * every start and every stop.
+     */
+    @Test
+    public void testMbeanPresenceOnNetworkBrokerRestart() throws Exception {
+        BrokerService broker = createBroker();
+        try {
+            broker.start();
+            assertEquals(1, countMbeans(broker, "connector", 30000));
+            assertEquals(0, countMbeans(broker, "connectionName"));
+            BrokerService networkedBroker = null;
+            for (int i=0; i<numRestarts; i++) {
+                networkedBroker = createNetworkedBroker();
+                try {
+                    networkedBroker.start();
+                    assertEquals(1, countMbeans(networkedBroker, "networkBridge", 2000));
+                    assertEquals(1, countMbeans(broker, "networkBridge", 2000));
+                    assertEquals(2, countMbeans(broker, "connectionName"));
+                } finally {
+                    networkedBroker.stop();
+                    networkedBroker.waitUntilStopped();
+                }
+                // once the networked side is gone, nothing of the bridge may remain
+                assertEquals(0, countMbeans(networkedBroker, "stopped"));
+                assertEquals(0, countMbeans(broker, "networkBridge"));
+            }
+
+            assertEquals(0, countMbeans(networkedBroker, "networkBridge"));
+            assertEquals(0, countMbeans(networkedBroker, "connector"));
+            assertEquals(0, countMbeans(networkedBroker, "connectionName"));
+            assertEquals(1, countMbeans(broker, "connector"));
+        } finally {
+            broker.stop();
+            broker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Mirror image of {@link #testMbeanPresenceOnNetworkBrokerRestart()}: here
+     * "networkedBroker" stays up while "broker" is started and stopped
+     * {@link #numRestarts} times.
+     */
+    @Test
+    public void testMbeanPresenceOnBrokerRestart() throws Exception {
+
+        BrokerService networkedBroker = createNetworkedBroker();
+        try {
+            networkedBroker.start();
+            assertEquals(1, countMbeans(networkedBroker, "connector=networkConnectors", 30000));
+            assertEquals(0, countMbeans(networkedBroker, "connectionName"));
+
+            BrokerService broker = null;
+            for (int i=0; i<numRestarts; i++) {
+                broker = createBroker();
+                try {
+                    broker.start();
+                    assertEquals(1, countMbeans(networkedBroker, "networkBridge", 5000));
+                    assertEquals("restart number: " + i, 2, countMbeans(broker, "connectionName", 10000));
+                } finally {
+                    broker.stop();
+                    broker.waitUntilStopped();
+                }
+                assertEquals(0, countMbeans(broker, "stopped"));
+            }
+
+            assertEquals(1, countMbeans(networkedBroker, "connector=networkConnectors"));
+            assertEquals(0, countMbeans(networkedBroker, "connectionName"));
+            assertEquals(0, countMbeans(broker, "connectionName"));
+        } finally {
+            networkedBroker.stop();
+            networkedBroker.waitUntilStopped();
+        }
+    }
+
+    /**
+     * Single-shot variant of {@link #countMbeans(BrokerService, String, int)}: queries
+     * once, without sleeping or retrying.
+     */
+    private int countMbeans(BrokerService broker, String type) throws Exception {
+        return countMbeans(broker, type, 0);
+    }
+
+    /**
+     * Counts the MBeans registered under the broker's name that match {@code type}.
+     *
+     * @param broker  broker whose management context is queried
+     * @param type    either a bare key, which is turned into the pattern {@code key=*},
+     *                or a full {@code key=value} pair
+     * @param timeout milliseconds to keep polling (every 100 ms) while the result is
+     *                null or empty; {@code 0} means query exactly once
+     * @return size of the last non-null query result, or 0 if no query returned a result
+     */
+    private int countMbeans(BrokerService broker, String type, int timeout) throws Exception {
+        final long expiryTime = System.currentTimeMillis() + timeout;
+
+        if (!type.contains("=")) {
+            type = type + "=*";
+        }
+
+        final ObjectName beanName = new ObjectName("org.apache.activemq:type=Broker,brokerName="
+                + broker.getBrokerName() + "," + type +",*");
+        Set<ObjectName> mbeans = null;
+        int count = 0;
+        do {
+            if (timeout > 0) {
+                Thread.sleep(100);
+            }
+
+            LOG.info("Query name: " + beanName);
+            mbeans = broker.getManagementContext().queryNames(beanName, null);
+            if (mbeans != null) {
+                count = mbeans.size();
+            } else {
+                logAllMbeans(broker);
+            }
+        } while ((mbeans == null || mbeans.isEmpty()) && expiryTime > System.currentTimeMillis());
+
+        // If port 1099 is in use when the Broker starts, starting the jmx connector
+        // will fail.  So, if we have no mbsc to query, skip the test.
+        if (timeout > 0) {
+            assumeNotNull(mbeans);
+        }
+
+        return count;
+    }
+
+    /**
+     * Logs the name of every MBean the broker's management context can see. Used as a
+     * diagnostic when a targeted query returned null; failures here are logged and
+     * never propagated.
+     */
+    private void logAllMbeans(BrokerService broker) throws MalformedURLException {
+        try {
+            // trace all existing MBeans
+            // queryNames yields ObjectName elements (see countMbeans), so iterate them as
+            // such; casting to ObjectInstance would throw and suppress the whole dump.
+            Set<ObjectName> all = broker.getManagementContext().queryNames(null, null);
+            if (all == null) {
+                LOG.info("Total MBean count=0 (query returned null)");
+                return;
+            }
+            LOG.info("Total MBean count=" + all.size());
+            for (ObjectName name : all) {
+                LOG.info(name.toString());
+            }
+        } catch (Exception ignored) {
+            // best effort only: this is diagnostics for an already failing query
+            LOG.warn("getMBeanServer ex: " + ignored, ignored);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
new file mode 100644
index 0000000..3afa7d1
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DuplexNetworkTest.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import javax.jms.MessageProducer;
+import javax.jms.TemporaryQueue;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+
+/**
+ * Runs the {@link SimpleNetworkTest} scenarios against a local broker configured from
+ * {@code duplexLocalBroker.xml}, and adds a check that temporary queues created and
+ * deleted locally show up and disappear on the remote broker.
+ */
+public class DuplexNetworkTest extends SimpleNetworkTest {
+
+    /** @return classpath location of the local broker's XML configuration */
+    @Override
+    protected String getLocalBrokerURI() {
+        return "org/apache/activemq/network/duplexLocalBroker.xml";
+    }
+
+    /**
+     * Builds, without starting, the broker named "remoteBroker" listening on tcp port 61617.
+     */
+    @Override
+    protected BrokerService createRemoteBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("remoteBroker");
+        broker.addConnector("tcp://localhost:61617");
+        return broker;
+    }
+
+    /**
+     * Sends one message to a temporary queue on the local session and expects the
+     * remote broker to report exactly one temporary queue, then none after the queue
+     * is deleted.
+     */
+    @Test
+    public void testTempQueues() throws Exception {
+        TemporaryQueue temp = localSession.createTemporaryQueue();
+        MessageProducer producer = localSession.createProducer(temp);
+        producer.send(localSession.createTextMessage("test"));
+
+        // Poll instead of sleeping a fixed 100 ms: propagation across the bridge is
+        // asynchronous, and the deletion check below already polls the same way.
+        assertTrue("Destination not created", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 1 == remoteBroker.getAdminView().getTemporaryQueues().length;
+            }
+        }));
+        temp.delete();
+
+        assertTrue("Destination not deleted", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return 0 == remoteBroker.getAdminView().getTemporaryQueues().length;
+            }
+        }));
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
new file mode 100644
index 0000000..3dd8be6
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/DynamicallyIncludedDestinationsDuplexNetworkTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.jms.MessageProducer;
+import javax.jms.TemporaryQueue;
+
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnection;
+import org.apache.activemq.broker.TransportConnector;
+import org.junit.Test;
+
+/**
+ * @author <a href="http://www.christianposta.com/blog">Christian Posta</a>
+ */
+public class DynamicallyIncludedDestinationsDuplexNetworkTest extends SimpleNetworkTest {
+
+    /** TCP port the remote broker listens on. */
+    private static final int REMOTE_BROKER_TCP_PORT = 61617;
+
+    /** @return classpath location of the local broker's XML configuration */
+    @Override
+    protected String getLocalBrokerURI() {
+        return "org/apache/activemq/network/duplexDynamicIncludedDestLocalBroker.xml";
+    }
+
+    /**
+     * Builds, without starting, the broker named "remoteBroker" listening on
+     * {@link #REMOTE_BROKER_TCP_PORT}.
+     */
+    @Override
+    protected BrokerService createRemoteBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName("remoteBroker");
+        broker.addConnector("tcp://localhost:" + REMOTE_BROKER_TCP_PORT);
+        return broker;
+    }
+
+    // we have to override this, because with dynamicallyIncludedDestinations working properly
+    // (see https://issues.apache.org/jira/browse/AMQ-4209) you can't get request/response
+    // with temps working (there is no wild card like there is for staticallyIncludedDest)
+    //
+    @Override
+    public void testRequestReply() throws Exception {
+        // intentionally empty: disables the inherited scenario for this configuration
+    }
+
+    /**
+     * Sends one message to a temporary queue on the local session and expects the
+     * remote broker to report exactly one temporary queue, then none after deletion.
+     */
+    @Test
+    public void testTempQueues() throws Exception {
+        TemporaryQueue temp = localSession.createTemporaryQueue();
+        MessageProducer producer = localSession.createProducer(temp);
+        producer.send(localSession.createTextMessage("test"));
+        // NOTE(review): the fixed 100 ms sleeps make both checks timing sensitive
+        Thread.sleep(100);
+        assertEquals("Destination not created", 1, remoteBroker.getAdminView().getTemporaryQueues().length);
+        temp.delete();
+        Thread.sleep(100);
+        assertEquals("Destination not deleted", 0, remoteBroker.getAdminView().getTemporaryQueues().length);
+    }
+
+    @Test
+    public void testDynamicallyIncludedDestinationsForDuplex()  throws Exception{
+        // Once the bridge is set up, we should see the filter used for the duplex end of the bridge
+        // only subscribe to the specific destinations included in the <dynamicallyIncludedDestinations> list
+        // so let's test that the filter is correct, let's also test the subscription on the localbroker
+        // is correct
+
+        // the bridge on the remote broker has the correct filter
+        TransportConnection bridgeConnection = getDuplexBridgeConnectionFromRemote();
+        assertNotNull(bridgeConnection);
+        DemandForwardingBridge duplexBridge = getDuplexBridgeFromConnection(bridgeConnection);
+        assertNotNull(duplexBridge);
+        NetworkBridgeConfiguration configuration = getConfigurationFromNetworkBridge(duplexBridge);
+        assertNotNull(configuration);
+        assertFalse("This destinationFilter does not include ONLY the destinations specified in dynamicallyIncludedDestinations",
+                configuration.getDestinationFilter().equals(AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX + ">"));
+        assertEquals("There are other patterns in the destinationFilter that shouldn't be there",
+                "ActiveMQ.Advisory.Consumer.Queue.include.test.foo,ActiveMQ.Advisory.Consumer.Topic.include.test.bar",
+                configuration.getDestinationFilter());
+    }
+
+    /**
+     * Reads the non-public {@code configuration} field of a bridge via reflection.
+     *
+     * @throws NoSuchFieldException   if {@link DemandForwardingBridgeSupport} declares no such field
+     * @throws IllegalAccessException if the field cannot be read even after {@code setAccessible(true)}
+     */
+    private NetworkBridgeConfiguration getConfigurationFromNetworkBridge(DemandForwardingBridgeSupport duplexBridge) throws NoSuchFieldException, IllegalAccessException {
+        Field f = DemandForwardingBridgeSupport.class.getDeclaredField("configuration");
+        f.setAccessible(true);
+        return (NetworkBridgeConfiguration) f.get(duplexBridge);
+    }
+
+    /**
+     * Reads the non-public {@code duplexBridge} field of a transport connection via reflection.
+     *
+     * @throws NoSuchFieldException   if {@link TransportConnection} declares no such field
+     * @throws IllegalAccessException if the field cannot be read even after {@code setAccessible(true)}
+     */
+    private DemandForwardingBridge getDuplexBridgeFromConnection(TransportConnection bridgeConnection) throws NoSuchFieldException, IllegalAccessException {
+        Field f = TransportConnection.class.getDeclaredField("duplexBridge");
+        f.setAccessible(true);
+        return (DemandForwardingBridge) f.get(bridgeConnection);
+    }
+
+    /**
+     * Returns the first connection on the remote broker's tcp transport connector.
+     *
+     * @return that connection, or {@code null} when the connector is missing or has no
+     *         connections, so that the caller's {@code assertNotNull} reports the problem
+     *         instead of an {@code IndexOutOfBoundsException} or {@code NullPointerException}
+     */
+    public TransportConnection getDuplexBridgeConnectionFromRemote() {
+        TransportConnector transportConnector = remoteBroker.getTransportConnectorByScheme("tcp");
+        if (transportConnector == null) {
+            return null;
+        }
+        CopyOnWriteArrayList<TransportConnection> transportConnections = transportConnector.getConnections();
+        if (transportConnections == null || transportConnections.isEmpty()) {
+            return null;
+        }
+        return transportConnections.get(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
new file mode 100644
index 0000000..4113f14
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/FailoverStaticNetworkTest.java
@@ -0,0 +1,467 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.management.ObjectName;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.TrustManager;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.SslContext;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.transport.tcp.SslBrokerServiceTest;
+import org.apache.activemq.util.IntrospectionSupport;
+import org.apache.activemq.util.JMXSupport;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class FailoverStaticNetworkTest {
+    protected static final Logger LOG = 
LoggerFactory.getLogger(FailoverStaticNetworkTest.class);
+
+    private final static String DESTINATION_NAME = "testQ";
+    protected BrokerService brokerA;
+    protected BrokerService brokerA1;
+    protected BrokerService brokerB;
+    protected BrokerService brokerC;
+
+
+    private SslContext sslContext;
+
+    /**
+     * Convenience overload: builds a broker exactly like the four-argument variant,
+     * passing {@code null} for the network connector properties.
+     */
+    protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts) throws Exception {
+        final HashMap<String, String> noNetworkProps = null;
+        return createBroker(scheme, listenPort, networkToPorts, noNetworkProps);
+    }
+
+    /**
+     * Builds, without starting, a broker named {@code "Broker_" + listenPort} that
+     * listens on {@code scheme://localhost:listenPort}.
+     *
+     * @param scheme         transport scheme used for the listener and for every network target
+     * @param listenPort     port of the broker's own transport connector
+     * @param networkToPorts localhost ports to bridge to through one static/failover network
+     *                       connector; {@code null} or empty means no network connector
+     * @param networkProps   properties applied to the network connector through
+     *                       {@code IntrospectionSupport.setProperties}; may be {@code null}
+     * @return the configured broker
+     */
+    protected BrokerService createBroker(String scheme, String listenPort, String[] networkToPorts,
+                                         HashMap<String, String> networkProps) throws Exception {
+        final BrokerService service = new BrokerService();
+        service.getManagementContext().setCreateConnector(false);
+        service.setSslContext(sslContext);
+        service.setDeleteAllMessagesOnStartup(true);
+        service.setBrokerName("Broker_" + listenPort);
+
+        // lazy init listener on broker start
+        final TransportConnector listener = new TransportConnector();
+        listener.setUri(new URI(scheme + "://localhost:" + listenPort));
+        final List<TransportConnector> listeners = new ArrayList<TransportConnector>();
+        listeners.add(listener);
+        service.setTransportConnectors(listeners);
+
+        final boolean networked = networkToPorts != null && networkToPorts.length > 0;
+        if (!networked) {
+            return service;
+        }
+
+        final StringBuilder networkUri = new StringBuilder("static:(failover:(");
+        for (int i = 0; i < networkToPorts.length; i++) {
+            if (i > 0) {
+                networkUri.append(",");
+            }
+            networkUri.append(scheme).append("://localhost:").append(networkToPorts[i]);
+        }
+        // limit the reconnects in case of initial random connection to slave
+        // leaving randomize on verifies that this config is picked up
+        networkUri.append(")?maxReconnectAttempts=0)?useExponentialBackOff=false");
+
+        final NetworkConnector bridgeConnector = service.addNetworkConnector(networkUri.toString());
+        if (networkProps != null) {
+            IntrospectionSupport.setProperties(bridgeConnector, networkProps);
+        }
+        return service;
+    }
+
+    /**
+     * Builds, without starting, a JMX-less broker named "Broker_Shared" that listens on
+     * {@code tcp://localhost:listenPort} and keeps its data under {@code dataDir}.
+     *
+     * @param listenPort port of the broker's tcp transport connector
+     * @param dataDir    data directory handed to the broker
+     * @return the configured broker
+     */
+    private BrokerService createBroker(String listenPort, String dataDir) throws Exception {
+        final BrokerService shared = new BrokerService();
+        shared.setUseJmx(false);
+        shared.getManagementContext().setCreateConnector(false);
+        shared.setBrokerName("Broker_Shared");
+
+        // lazy create transport connector on start completion
+        final URI listenUri = new URI("tcp://localhost:" + listenPort);
+        final TransportConnector listener = new TransportConnector();
+        listener.setUri(listenUri);
+        shared.addConnector(listener);
+
+        shared.setDataDirectory(dataDir);
+        return shared;
+    }
+
+    /**
+     * Prepares the SSL context used by every broker built in this test, taking the key
+     * and trust managers from {@code SslBrokerServiceTest}.
+     */
+    @Before
+    public void setUp() throws Exception {
+        sslContext = new SslContext(
+                SslBrokerServiceTest.getKeyManager(),
+                SslBrokerServiceTest.getTrustManager(),
+                null);
+    }
+
+    /**
+     * Stops every broker a test may have created, in the order B, A, A1, C.
+     *
+     * Brokers that were never assigned are skipped, so a test that fails early is not
+     * masked by a {@code NullPointerException} here. A failure while stopping one broker
+     * does not prevent the remaining ones from being stopped; the first such failure is
+     * rethrown once all brokers have been handled.
+     *
+     * @throws Exception the first exception raised while stopping a broker
+     */
+    @After
+    public void tearDown() throws Exception {
+        Exception firstFailure = null;
+        for (BrokerService broker : new BrokerService[]{brokerB, brokerA, brokerA1, brokerC}) {
+            if (broker == null) {
+                continue;
+            }
+            try {
+                broker.stop();
+                broker.waitUntilStopped();
+            } catch (Exception e) {
+                LOG.warn("failed to stop broker " + broker.getBrokerName(), e);
+                if (firstFailure == null) {
+                    firstFailure = e;
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+    /**
+     * Bridges brokerB (port 62617) to brokerA (port 61617), then stops brokerA and
+     * starts a fresh broker on the same port, running the send/receive check before
+     * and after the restart.
+     */
+    @Test
+    public void testSendReceiveAfterReconnect() throws Exception {
+        // target first, then the broker that networks to it
+        brokerA = createBroker("tcp", "61617", null);
+        brokerA.start();
+        brokerB = createBroker("tcp", "62617", new String[]{"61617"});
+        brokerB.start();
+        doTestNetworkSendReceive();
+
+        LOG.info("stopping brokerA");
+        brokerA.stop();
+        brokerA.waitUntilStopped();
+
+        // a new BrokerService instance on the same port replaces the stopped one
+        LOG.info("restarting brokerA");
+        brokerA = createBroker("tcp", "61617", null);
+        brokerA.start();
+
+        doTestNetworkSendReceive();
+    }
+
+    /**
+     * brokerB's network connector lists ports 61617 and 63617. brokerA first listens on
+     * 61617; it is then stopped and replaced by a broker on 63617. The send/receive
+     * check runs against both, and brokerB must expose exactly one networkBridge MBean
+     * name each time, with the second name differing from the first.
+     */
+    @Test
+    public void testSendReceiveFailover() throws Exception {
+        brokerA = createBroker("tcp", "61617", null);
+        brokerA.start();
+        brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"});
+        brokerB.start();
+        doTestNetworkSendReceive();
+
+        // check mbean
+        Set<String> bridgeNames = getNetworkBridgeMBeanName(brokerB);
+        assertEquals("only one bridgeName: " + bridgeNames, 1, 
bridgeNames.size());
+
+        LOG.info("stopping brokerA");
+        brokerA.stop();
+        brokerA.waitUntilStopped();
+
+        // the replacement listens on the second port of brokerB's failover list
+        LOG.info("restarting brokerA");
+        brokerA = createBroker("tcp", "63617", null);
+        brokerA.start();
+
+        doTestNetworkSendReceive();
+
+        Set<String> otherBridgeNames = getNetworkBridgeMBeanName(brokerB);
+        assertEquals("only one bridgeName: " + otherBridgeNames, 1, 
otherBridgeNames.size());
+
+        // addAll returns true only if the set changed, i.e. the new bridge name is different
+        assertTrue("there was an addition", 
bridgeNames.addAll(otherBridgeNames));
+    }
+
+    /**
+     * Collects the value of the {@code networkBridge} key property from every MBean
+     * name known to the given broker's management context.
+     *
+     * @param brokerB broker whose MBeans are inspected
+     * @return the distinct {@code networkBridge} values; empty if no MBean has that key
+     */
+    private Set<String> getNetworkBridgeMBeanName(BrokerService brokerB) throws Exception {
+        final Set<String> bridgeNames = new HashSet<String>();
+        final Set<ObjectName> registered = brokerB.getManagementContext().queryNames(null, null);
+        for (ObjectName candidate : registered) {
+            final String bridge = candidate.getKeyProperty("networkBridge");
+            if (bridge == null) {
+                continue;
+            }
+            bridgeNames.add(bridge);
+        }
+        return bridgeNames;
+    }
+
+    /**
+     * Two brokers are created on the same data directory: brokerA (port 61617) is
+     * started directly, the second (brokerA1, port 63617) on a background thread.
+     * brokerB networks to both ports with a duplex connector. Send/receive is checked
+     * in both directions against brokerA, and again against brokerA1 after brokerA stops.
+     */
+    @Test
+    public void testSendReceiveFailoverDuplex() throws Exception {
+        // collects failures from the background start; Vector because it is written from another thread
+        final Vector<Throwable> errors = new Vector<Throwable>();
+        final String dataDir = "target/data/shared";
+        brokerA = createBroker("61617", dataDir);
+        brokerA.start();
+
+        final BrokerService slave = createBroker("63617", dataDir);
+        brokerA1 = slave;
+        // NOTE(review): presumably start() blocks while brokerA holds the shared store - confirm
+        ExecutorService executor = Executors.newCachedThreadPool();
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    slave.start();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    errors.add(e);
+                }
+            }
+        });
+        // no further tasks; the submitted start keeps running
+        executor.shutdown();
+
+        HashMap<String, String> networkConnectorProps = new HashMap<String, 
String>();
+        networkConnectorProps.put("duplex", "true");
+        brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, 
networkConnectorProps);
+        brokerB.start();
+
+        // duplex: exercise both directions
+        doTestNetworkSendReceive(brokerA, brokerB);
+        doTestNetworkSendReceive(brokerB, brokerA);
+
+        LOG.info("stopping brokerA (master shared_broker)");
+        brokerA.stop();
+        brokerA.waitUntilStopped();
+
+        // wait for slave to start
+        brokerA1.waitUntilStarted();
+
+        doTestNetworkSendReceive(brokerA1, brokerB);
+        doTestNetworkSendReceive(brokerB, brokerA1);
+
+        assertTrue("No unexpected exceptions " + errors, errors.isEmpty());
+    }
+
+    /**
+     * brokerB and brokerC each hold a duplex network connector (networkTTL 2) to the
+     * brokerA/brokerA1 pair on ports 61617 and 63617, which share one data directory.
+     * Send/receive between B and C is checked in both directions, before and after
+     * brokerA is stopped.
+     */
+    @Test
+    // master slave piggy in the middle setup
+    public void testSendReceiveFailoverDuplexWithPIM() throws Exception {
+        final String dataDir = "target/data/shared/pim";
+        brokerA = createBroker("61617", dataDir);
+        brokerA.start();
+
+        final BrokerService slave = createBroker("63617", dataDir);
+        brokerA1 = slave;
+        ExecutorService executor = Executors.newCachedThreadPool();
+        executor.execute(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    slave.start();
+                } catch (Exception e) {
+                    // NOTE(review): only printed, never asserted on, unlike testSendReceiveFailoverDuplex
+                    e.printStackTrace();
+                }
+            }
+        });
+        executor.shutdown();
+
+        HashMap<String, String> networkConnectorProps = new HashMap<String, 
String>();
+        networkConnectorProps.put("duplex", "true");
+        networkConnectorProps.put("networkTTL", "2");
+
+        brokerB = createBroker("tcp", "62617", new String[]{"61617", "63617"}, 
networkConnectorProps);
+        brokerB.start();
+
+        // presumably IntrospectionSupport.setProperties removes each entry it applies,
+        // so an empty map means every property was accepted - verify against that helper
+        assertTrue("all props applied", networkConnectorProps.isEmpty());
+        // refill the map for the second broker
+        networkConnectorProps.put("duplex", "true");
+        networkConnectorProps.put("networkTTL", "2");
+
+        brokerC = createBroker("tcp", "64617", new String[]{"61617", "63617"}, 
networkConnectorProps);
+        brokerC.start();
+        assertTrue("all props applied a second time", 
networkConnectorProps.isEmpty());
+
+        doTestNetworkSendReceive(brokerC, brokerB);
+        doTestNetworkSendReceive(brokerB, brokerC);
+
+        LOG.info("stopping brokerA (master shared_broker)");
+        brokerA.stop();
+        brokerA.waitUntilStopped();
+
+        doTestNetworkSendReceive(brokerC, brokerB);
+        doTestNetworkSendReceive(brokerB, brokerC);
+
+        // brokerC is stopped here explicitly; tearDown() stops it again if non-null
+        brokerC.stop();
+        brokerC.waitUntilStopped();
+    }
+
+    /**
+     * networked broker started after target so first connect attempt succeeds
+     * start order is important
+     * <p>
+     * brokerB's failover list names port 61617, where brokerA listens, and port 1111,
+     * on which this test starts no broker.
+     */
+    @Test
+    public void testSendReceive() throws Exception {
+
+        // target broker first
+        brokerA = createBroker("tcp", "61617", null);
+        brokerA.start();
+        brokerB = createBroker("tcp", "62617", new String[]{"61617","1111"});
+        brokerB.start();
+
+        doTestNetworkSendReceive();
+    }
+
+    /**
+     * Same layout as {@code testSendReceive}, but both brokers use the "ssl" scheme;
+     * createBroker hands each of them the SSL context prepared in setUp().
+     */
+    @Test
+    public void testSendReceiveSsl() throws Exception {
+
+        // target broker first, as in the tcp variant
+        brokerA = createBroker("ssl", "61617", null);
+        brokerA.start();
+        brokerB = createBroker("ssl", "62617", new String[]{"61617", "1111"});
+        brokerB.start();
+
+        doTestNetworkSendReceive();
+    }
+
+    /**
+     * Runs the alternating master/slave scenario without any extra network connector
+     * properties.
+     */
+    @Test
+    public void testRepeatedSendReceiveWithMasterSlaveAlternate() throws Exception {
+        final HashMap<String, String> noNetworkProps = null;
+        doTestRepeatedSendReceiveWithMasterSlaveAlternate(noNetworkProps);
+    }
+
+    /**
+     * Runs the alternating master/slave scenario with the network connector property
+     * {@code duplex} set to {@code "true"}.
+     */
+    @Test
+    public void testRepeatedSendReceiveWithMasterSlaveAlternateDuplex() throws Exception {
+        final HashMap<String, String> duplexProps = new HashMap<String, String>();
+        duplexProps.put("duplex", "true");
+        doTestRepeatedSendReceiveWithMasterSlaveAlternate(duplexProps);
+    }
+
+    /**
+     * Alternates four times between two brokers that share the broker name "Pair"
+     * ({@code brokerA} on port 61610 and {@code brokerA1} on port 61611) and, on every
+     * iteration, checks that a message published to {@code brokerB} reaches a consumer
+     * on the broker of the pair that is currently started.
+     * <p>
+     * Each broker of the pair is driven by its own background task: the task creates
+     * and starts the broker, blocks until it is stopped, waits for the peer to be
+     * started and then loops. The tasks are always told to finish and the executor is
+     * always shut down, even when a send/receive round fails, so that a failing run
+     * does not leave restart loops running behind it.
+     *
+     * @param networkConnectorProps handed to {@code createBroker} when building brokerB;
+     *                              may be {@code null}
+     * @throws Exception if brokerB cannot be created or started, or a send/receive round fails
+     */
+    public void doTestRepeatedSendReceiveWithMasterSlaveAlternate(HashMap<String, String> networkConnectorProps) throws Exception {
+
+        brokerB = createBroker("tcp", "62617", new String[]{"61610","61611"}, networkConnectorProps);
+        brokerB.start();
+
+        final AtomicBoolean done = new AtomicBoolean(false);
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        try {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        while (!done.get()) {
+                            brokerA = createBroker("tcp", "61610", null);
+                            brokerA.setBrokerName("Pair");
+                            brokerA.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+                                    + JMXSupport.encodeObjectNamePart("A") + "," + "Type=Broker"));
+                            ((KahaDBPersistenceAdapter)brokerA.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000);
+                            brokerA.start();
+                            brokerA.waitUntilStopped();
+
+                            // restart after peer taken over
+                            brokerA1.waitUntilStarted();
+                        }
+                    } catch (Exception ignored) {
+                        LOG.info("A create/start, unexpected: " + ignored, ignored);
+                    }
+                }
+            });
+
+            // start with brokerA as master
+            Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return brokerA != null && brokerA.waitUntilStarted();
+                }
+            });
+
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        while (!done.get()) {
+                            brokerA1 = createBroker("tcp", "61611", null);
+                            brokerA1.setBrokerName("Pair");
+                            // so they can coexist in local jmx we set the object name b/c the brokername identifies the shared store
+                            brokerA1.setBrokerObjectName(new ObjectName(brokerA.getManagementContext().getJmxDomainName() + ":" + "BrokerName="
+                                + JMXSupport.encodeObjectNamePart("A1") + "," + "Type=Broker"));
+                            ((KahaDBPersistenceAdapter)brokerA1.getPersistenceAdapter()).getLocker().setLockAcquireSleepInterval(1000);
+                            brokerA1.start();
+                            brokerA1.waitUntilStopped();
+
+                            // restart after peer taken over
+                            brokerA.waitUntilStarted();
+                        }
+                    } catch (Exception ignored) {
+                        LOG.info("A1 create/start, unexpected: " + ignored, ignored);
+                    }
+                }
+            });
+
+            // even iterations use brokerA, odd iterations use brokerA1
+            for (int i=0; i<4; i++) {
+                BrokerService currentMaster =  (i%2 == 0 ? brokerA : brokerA1);
+                LOG.info("iteration: " + i + ", using: " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName"));
+                currentMaster.waitUntilStarted();
+
+                doTestNetworkSendReceive(brokerB, currentMaster);
+
+                LOG.info("Stopping " + currentMaster.getBrokerObjectName().getKeyProperty("BrokerName"));
+                currentMaster.stop();
+                currentMaster.waitUntilStopped();
+            }
+
+            LOG.info("all done");
+        } finally {
+            // Always end the restart loops and release the pool, including on failure.
+            done.set(true);
+            executorService.shutdownNow();
+        }
+    }
+
+    private void doTestNetworkSendReceive() throws Exception, JMSException {
+        doTestNetworkSendReceive(brokerB, brokerA);
+    }
+
+    /**
+     * Creates a queue consumer on {@code from}, publishes one message to {@code to}
+     * and asserts that the consumer receives it.
+     * <p>
+     * The consumer connection is closed in a {@code finally} block so that a failure
+     * while publishing or waiting does not leave the connection open.
+     *
+     * @param to   broker the message is published to
+     * @param from broker the consumer is attached to
+     * @throws Exception if the connection, session or consumer cannot be set up,
+     *                   publishing fails, or the wait condition throws
+     */
+    private void doTestNetworkSendReceive(final BrokerService to, final BrokerService from) throws Exception, JMSException {
+
+        LOG.info("Creating Consumer on the networked broker ..." + from);
+
+        SslContext.setCurrentSslContext(sslContext);
+        // Create a consumer on the broker we expect the message to arrive at
+        ConnectionFactory consFactory = createConnectionFactory(from);
+        Connection consConn = consFactory.createConnection();
+        boolean gotMessage;
+        try {
+            consConn.start();
+            Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
+            final MessageConsumer consumer = consSession.createConsumer(destination);
+
+            LOG.info("publishing to " + to);
+
+            sendMessageTo(destination, to);
+
+            gotMessage = Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    Message message = consumer.receive(5000);
+                    LOG.info("from:  " + from.getBrokerObjectName().getKeyProperty("BrokerName") +  ", received: " + message);
+                    return message != null;
+                }
+            });
+        } finally {
+            try {
+                consConn.close();
+            } catch (JMSException ignored) {
+                // best effort: a failure to close must not hide the test outcome
+            }
+        }
+        assertTrue("consumer on A got message", gotMessage);
+    }
+
+    /**
+     * Opens a connection to {@code brokerService}, sends a single text message
+     * ("Hi") to {@code destination} and closes the connection again.
+     * <p>
+     * The connection is closed in a {@code finally} block so it is released even
+     * when starting the connection or sending the message fails.
+     *
+     * @param destination   destination the message is sent to
+     * @param brokerService broker to connect to
+     * @throws Exception if connecting, sending or closing fails
+     */
+    private void sendMessageTo(ActiveMQDestination destination, BrokerService brokerService) throws Exception {
+        ConnectionFactory factory = createConnectionFactory(brokerService);
+        Connection conn = factory.createConnection();
+        try {
+            conn.start();
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createProducer(destination).send(session.createTextMessage("Hi"));
+        } finally {
+            conn.close();
+        }
+    }
+
+    protected ConnectionFactory createConnectionFactory(final BrokerService 
broker) throws Exception {
+        String url = 
broker.getTransportConnectors().get(0).getServer().getConnectURI().toString();
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(url);
+        connectionFactory.setOptimizedMessageDispatch(true);
+        connectionFactory.setDispatchAsync(false);
+        connectionFactory.setUseAsyncSend(false);
+        connectionFactory.setOptimizeAcknowledge(false);
+        connectionFactory.setAlwaysSyncSend(true);
+        return connectionFactory;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java
new file mode 100644
index 0000000..71006e3
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/ForwardingBridgeTest.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import javax.jms.DeliveryMode;
+
+import junit.framework.Test;
+
+import org.apache.activemq.broker.StubConnection;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.SessionInfo;
+
+/**
+ * Checks that a {@link ForwardingBridge} set up between the local and the remote
+ * transport delivers a message sent on the local side to a consumer registered on
+ * the remote side, with and without compression enabled on the bridge.
+ * <p>
+ * The public fields are populated through {@code addCombinationValues}, so each
+ * test is exercised for every delivery mode / destination type pair registered
+ * by its {@code initCombosFor...} method.
+ */
+public class ForwardingBridgeTest extends NetworkTestSupport {
+
+    public ActiveMQDestination destination;
+    public byte destinationType;
+    public int deliveryMode;
+    private ForwardingBridge bridge;
+
+    public void initCombosForTestForwardMessageCompressed() {
+        addDeliveryModeAndDestinationTypeCombos();
+    }
+
+    /**
+     * With compression enabled on the bridge, the forwarded message must arrive
+     * and must be flagged as compressed.
+     */
+    public void testForwardMessageCompressed() throws Exception {
+
+        bridge.setUseCompression(true);
+
+        // Make sure the message was delivered via the remote.
+        Message m = forwardOneMessage();
+        assertNotNull(m);
+
+        // Make sure it is compressed now
+        ActiveMQMessage message = (ActiveMQMessage) m;
+        assertTrue(message.isCompressed());
+    }
+
+    public void initCombosForTestAddConsumerThenSend() {
+        addDeliveryModeAndDestinationTypeCombos();
+    }
+
+    /**
+     * A consumer is registered on the remote side first, then a message is sent
+     * on the local side; the message must arrive at the remote consumer.
+     */
+    public void testAddConsumerThenSend() throws Exception {
+        // Make sure the message was delivered via the remote.
+        Message m = forwardOneMessage();
+        assertNotNull(m);
+    }
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        bridge = new ForwardingBridge(createTransport(), createRemoteTransport());
+        bridge.setClientId("local-remote-bridge");
+        bridge.setDispatchAsync(false);
+        bridge.start();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (bridge != null) {
+                bridge.stop();
+            }
+        } finally {
+            // Always let the superclass clean up, even if stopping the bridge failed.
+            super.tearDown();
+        }
+    }
+
+    public static Test suite() {
+        return suite(ForwardingBridgeTest.class);
+    }
+
+    public static void main(String[] args) {
+        junit.textui.TestRunner.run(suite());
+    }
+
+    /** Registers both delivery modes and both queue/topic destination types as combination values. */
+    private void addDeliveryModeAndDestinationTypeCombos() {
+        addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
+                                                           Integer.valueOf(DeliveryMode.PERSISTENT)});
+        addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE),
+                                                              Byte.valueOf(ActiveMQDestination.TOPIC_TYPE)});
+    }
+
+    /**
+     * Registers a producer on the local side and a consumer on the remote side,
+     * sends one message locally and returns what the remote consumer receives.
+     *
+     * @return the value returned by {@code receiveMessage} for the remote connection
+     * @throws Exception if any command cannot be sent or the wait is interrupted
+     */
+    private Message forwardOneMessage() throws Exception {
+        // Start a producer on local broker
+        StubConnection connection1 = createConnection();
+        ConnectionInfo connectionInfo1 = createConnectionInfo();
+        SessionInfo sessionInfo1 = createSessionInfo(connectionInfo1);
+        ProducerInfo producerInfo = createProducerInfo(sessionInfo1);
+        connection1.send(connectionInfo1);
+        connection1.send(sessionInfo1);
+        connection1.send(producerInfo);
+
+        destination = createDestinationInfo(connection1, connectionInfo1, destinationType);
+
+        // Start a consumer on a remote broker
+        StubConnection connection2 = createRemoteConnection();
+        ConnectionInfo connectionInfo2 = createConnectionInfo();
+        SessionInfo sessionInfo2 = createSessionInfo(connectionInfo2);
+        connection2.send(connectionInfo2);
+        connection2.send(sessionInfo2);
+        ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo2, destination);
+        connection2.send(consumerInfo);
+
+        // Give forwarding bridge a chance to finish setting up (two seconds in total);
+        // an interrupt is propagated instead of being printed and ignored.
+        Thread.sleep(2000);
+
+        // Send the message to the local broker.
+        connection1.send(createMessage(producerInfo, destination, deliveryMode));
+
+        return receiveMessage(connection2);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
new file mode 100644
index 0000000..928a7a6
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MQTTNetworkOfBrokersFailoverTest.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.apache.activemq.broker.jmx.DurableSubscriptionViewMBean;
+import org.apache.activemq.store.PersistenceAdapter;
+import org.apache.commons.lang.ArrayUtils;
+import org.fusesource.hawtdispatch.Dispatch;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+import org.fusesource.mqtt.client.Topic;
+import org.fusesource.mqtt.client.Tracer;
+import org.fusesource.mqtt.codec.MQTTFrame;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Created by ceposta
+ * <a href="http://christianposta.com/blog">http://christianposta.com/blog</a>.
+ */
+public class MQTTNetworkOfBrokersFailoverTest extends NetworkTestSupport {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(MQTTNetworkOfBrokersFailoverTest.class);
+    private int localBrokerMQTTPort = -1;
+    private int remoteBrokerMQTTPort = -1;
+
+    @Override
+    protected void setUp() throws Exception {
+        useJmx=true;
+        super.setUp();
+
+        URI ncUri = new URI("static:(" + connector.getConnectUri().toString() 
+ ")");
+        NetworkConnector nc = new DiscoveryNetworkConnector(ncUri);
+        nc.setDuplex(true);
+        remoteBroker.addNetworkConnector(nc);
+        nc.start();
+
+        // mqtt port should have been assigned by now
+        assertFalse(localBrokerMQTTPort == -1);
+        assertFalse(remoteBrokerMQTTPort == -1);
+    }
+
+    /**
+     * Stops the remote broker and then the local broker (each only if started),
+     * and finally delegates to the superclass.
+     * <p>
+     * The steps are chained with {@code try/finally} so that a failure while
+     * stopping one broker does not skip stopping the other or the superclass
+     * clean-up.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (remoteBroker.isStarted()) {
+                remoteBroker.stop();
+                remoteBroker.waitUntilStopped();
+            }
+        } finally {
+            try {
+                if (broker.isStarted()) {
+                    broker.stop();
+                    broker.waitUntilStopped();
+                }
+            } finally {
+                super.tearDown();
+            }
+        }
+    }
+
+    /**
+     * Subscribes client "foo" on the remote broker, disconnects it, subscribes
+     * the same client on the local broker and publishes from the remote side.
+     * The message must be received once on the local broker and must not show
+     * up again when the client re-subscribes on the remote broker.
+     */
+    @Test
+    public void testNoStaleSubscriptionAcrossNetwork() throws Exception {
+
+        // Before starting, register an asynchronous hook that tells us when the
+        // consumer has been networked: subscribing on the remote broker creates a
+        // network subscription on the local broker, and we listen for that event
+        // through advisory messages and a latch.
+        final CountDownLatch networkedLatch = listenForConsumersOn(broker);
+
+        // Create a subscription with Clean == 0 (durable sub for QoS==1 && QoS==2)
+        // on the remote broker. This sub should still be there after we disconnect.
+        final MQTT remoteClient = createMQTTTcpConnection("foo", false, remoteBrokerMQTTPort);
+        BlockingConnection remoteSide = remoteClient.blockingConnection();
+        remoteSide.connect();
+        remoteSide.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+
+        assertTrue("No destination detected!", networkedLatch.await(1, TimeUnit.SECONDS));
+        assertQueueExistsOn(remoteBroker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
+        assertQueueExistsOn(broker, "Consumer.foo_AT_LEAST_ONCE.VirtualTopic.foo.bar");
+        remoteSide.disconnect();
+
+        // Reconnect the same sub on the local broker, again with clean==0.
+        final MQTT localClient = createMQTTTcpConnection("foo", false, localBrokerMQTTPort);
+        final BlockingConnection localSide = localClient.blockingConnection();
+        localSide.connect();
+        localSide.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+
+        // Connect back up to the remote broker and send a message.
+        remoteSide = remoteClient.blockingConnection();
+        remoteSide.connect();
+        remoteSide.publish("foo/bar", "Hello, World!".getBytes(), QoS.AT_LEAST_ONCE, false);
+
+        // The message should arrive on the local broker because the subscription
+        // should have been networked; wait up to 100 seconds for the networking
+        // and forwarding to happen.
+        org.fusesource.mqtt.client.Message received = localSide.receive(100, TimeUnit.SECONDS);
+        assertNotNull(received);
+        received.ack();
+        final String payloadText = new String(received.getPayload());
+        assertEquals("Hello, World!", payloadText);
+        assertEquals("foo/bar", received.getTopic());
+
+        // We must NOT see the message on the remote broker, because it was already
+        // consumed on the local broker; seeing it again would mean duplicates in a
+        // distributed topic scenario.
+        remoteSide.subscribe(new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)});
+        received = remoteSide.receive(500, TimeUnit.MILLISECONDS);
+        assertNull("We have duplicate messages across the cluster for a distributed topic", received);
+    }
+
+    /**
+     * Subscribes, over the VM connector of {@code broker}, to the consumer
+     * advisory topic for the queue backing the "foo" subscription and returns a
+     * latch that is counted down when the first advisory message arrives. The
+     * listener then closes its own session and connection on the global
+     * dispatch queue.
+     *
+     * @param broker broker to listen on
+     * @return latch released by the first advisory message
+     * @throws Exception if the connection, session or consumer cannot be created
+     */
+    private CountDownLatch listenForConsumersOn(BrokerService broker) throws Exception {
+        final CountDownLatch advisorySeen = new CountDownLatch(1);
+
+        final String vmUri = broker.getVmConnectorURI().toASCIIString();
+        final ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(vmUri);
+        final Connection advisoryConnection = factory.createConnection();
+        advisoryConnection.start();
+        final Session advisorySession = advisoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        final Destination advisoryTopic = advisorySession.createTopic("ActiveMQ.Advisory.Consumer.Queue.Consumer.foo:AT_LEAST_ONCE.VirtualTopic.foo.bar");
+        final MessageConsumer advisoryConsumer = advisorySession.createConsumer(advisoryTopic);
+
+        // Closes the advisory session and connection; run off the listener thread.
+        final Runnable closeAdvisoryResources = new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    advisorySession.close();
+                    advisoryConnection.close();
+                } catch (JMSException e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        advisoryConsumer.setMessageListener(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                advisorySeen.countDown();
+                // shutdown this connection
+                Dispatch.getGlobalQueue().execute(closeAdvisoryResources);
+            }
+        });
+
+        return advisorySeen;
+    }
+
+    private void assertQueueExistsOn(BrokerService broker, String queueName) 
throws Exception {
+        BrokerViewMBean brokerView = broker.getAdminView();
+        ObjectName[] queueNames = brokerView.getQueues();
+        assertEquals(1, queueNames.length);
+
+        assertTrue(queueNames[0].toString().contains(queueName));
+    }
+
+    @SuppressWarnings("unused")
+    private void assertOneDurableSubOn(BrokerService broker, String subName) 
throws Exception {
+        BrokerViewMBean brokerView = broker.getAdminView();
+        ObjectName[] activeDurableSubs = 
brokerView.getDurableTopicSubscribers();
+        ObjectName[] inactiveDurableSubs = 
brokerView.getInactiveDurableTopicSubscribers();
+        ObjectName[] allDurables = (ObjectName[]) 
ArrayUtils.addAll(activeDurableSubs, inactiveDurableSubs);
+        assertEquals(1, allDurables.length);
+
+        // at this point our assertions should prove that we have only on 
durable sub
+        DurableSubscriptionViewMBean durableSubView = 
(DurableSubscriptionViewMBean)
+                broker.getManagementContext().newProxyInstance(allDurables[0], 
DurableSubscriptionViewMBean.class, true);
+
+        assertEquals(subName, durableSubView.getClientId());
+    }
+
+    @Override
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker =  super.createBroker();
+        broker.setPersistent(true);
+        broker.setBrokerName("local");
+        broker.setDataDirectory("target/activemq-data");
+        broker.setDeleteAllMessagesOnStartup(true);
+        TransportConnector tc = 
broker.addConnector(getDefaultMQTTTransportConnectorUri());
+        localBrokerMQTTPort = tc.getConnectUri().getPort();
+        return broker;
+    }
+
+    @Override
+    protected BrokerService createRemoteBroker(PersistenceAdapter 
persistenceAdapter) throws Exception {
+        BrokerService broker = super.createRemoteBroker(persistenceAdapter);
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.setDataDirectory("target/activemq-data");
+        TransportConnector tc = 
broker.addConnector(getDefaultMQTTTransportConnectorUri());
+        remoteBrokerMQTTPort = tc.getConnectUri().getPort();
+        return broker;
+    }
+
+    private String getDefaultMQTTTransportConnectorUri(){
+        return 
"mqtt://localhost:0?transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions";
+    }
+
+    private MQTT createMQTTTcpConnection(String clientId, boolean clean, int 
port) throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setConnectAttemptsMax(1);
+        mqtt.setReconnectAttemptsMax(0);
+        mqtt.setTracer(createTracer());
+        if (clientId != null) {
+            mqtt.setClientId(clientId);
+        }
+        mqtt.setCleanSession(clean);
+        mqtt.setHost("localhost", port);
+        return mqtt;
+    }
+
+    protected Tracer createTracer() {
+        return new Tracer() {
+            @Override
+            public void onReceive(MQTTFrame frame) {
+                LOG.info("Client Received:\n" + frame);
+            }
+
+            @Override
+            public void onSend(MQTTFrame frame) {
+                LOG.info("Client Sent:\n" + frame);
+            }
+
+            @Override
+            public void debug(String message, Object... args) {
+                LOG.info(String.format(message, args));
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java
new file mode 100644
index 0000000..7813b07
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/MulticastNetworkTest.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+/**
+ * Variant of {@link SimpleNetworkTest} that points the local and remote broker
+ * at the multicast XML configurations.
+ */
+public class MulticastNetworkTest extends SimpleNetworkTest {
+
+    private static final String REMOTE_BROKER_CONFIG = "org/apache/activemq/network/multicast/remoteBroker.xml";
+    private static final String LOCAL_BROKER_CONFIG = "org/apache/activemq/network/multicast/localBroker.xml";
+
+    protected String getRemoteBrokerURI() {
+        return REMOTE_BROKER_CONFIG;
+    }
+
+    protected String getLocalBrokerURI() {
+        return LOCAL_BROKER_CONFIG;
+    }
+
+    /**
+     * Intentionally empty: blocked out for multicast because temp dest request
+     * reply isn't supported with dynamicallyAddedDestinations.
+     */
+    @Override
+    public void testRequestReply() throws Exception {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
new file mode 100644
index 0000000..ceef43c
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkBrokerDetachTest.java
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.ActiveMQPrefetchPolicy;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerView;
+import org.apache.activemq.broker.jmx.DestinationViewMBean;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.Wait;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NetworkBrokerDetachTest {
+
+    // Broker names assigned in createBroker() and createNetworkedBroker().
+    private final static String BROKER_NAME = "broker";
+    private final static String REM_BROKER_NAME = "networkedBroker";
+    // Physical name used for both the queue and the topic the tests create.
+    private final static String DESTINATION_NAME = "testQ";
+    // Number of consumers testNetworkedBrokerDetach() opens on the networked broker.
+    private final static int NUM_CONSUMERS = 1;
+
+    protected static final Logger LOG = 
LoggerFactory.getLogger(NetworkBrokerDetachTest.class);
+    // NOTE(review): numRestarts is not referenced by any method of this class.
+    protected final int numRestarts = 3;
+    // Values applied to every network connector by configureNetworkConnector().
+    protected final int networkTTL = 2;
+    protected final boolean dynamicOnly = false;
+
+    // Created and started in init(), stopped in cleanup().
+    protected BrokerService broker;
+    protected BrokerService networkedBroker;
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(BROKER_NAME);
+        configureBroker(broker);
+        broker.addConnector("tcp://localhost:61617");
+        NetworkConnector networkConnector = 
broker.addNetworkConnector("static:(tcp://localhost:62617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
+        configureNetworkConnector(networkConnector);
+        return broker;
+    }
+
+    protected BrokerService createNetworkedBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setBrokerName(REM_BROKER_NAME);
+        configureBroker(broker);
+        broker.getManagementContext().setCreateConnector(false);
+        broker.addConnector("tcp://localhost:62617");
+        NetworkConnector networkConnector = 
broker.addNetworkConnector("static:(tcp://localhost:61617?wireFormat.maxInactivityDuration=500)?useExponentialBackOff=false");
+        configureNetworkConnector(networkConnector);
+        return broker;
+    }
+
+    /**
+     * Applies the settings shared by both brokers' network connectors:
+     * non-duplex, plus this test's {@code networkTTL} and {@code dynamicOnly}.
+     */
+    private void configureNetworkConnector(NetworkConnector networkConnector) {
+        networkConnector.setDynamicOnly(dynamicOnly);
+        networkConnector.setNetworkTTL(networkTTL);
+        networkConnector.setDuplex(false);
+    }
+
+    // variants for each store....
+    protected void configureBroker(BrokerService broker) throws Exception {
+        KahaDBPersistenceAdapter persistenceAdapter = new 
KahaDBPersistenceAdapter();
+        persistenceAdapter.setDirectory(new 
File("target/activemq-data/kahadb/" + broker.getBrokerName() + 
"NetworBrokerDetatchTest"));
+        broker.setPersistenceAdapter(persistenceAdapter);
+    }
+
+    /**
+     * Creates and starts both brokers before each test, the local broker
+     * first. Each broker is told to delete all messages on startup.
+     */
+    @Before
+    public void init() throws Exception {
+        broker = createBroker();
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.start();
+
+        networkedBroker = createNetworkedBroker();
+        networkedBroker.setDeleteAllMessagesOnStartup(true);
+        networkedBroker.start();
+    }
+
+    /**
+     * Stops both brokers after each test, the networked broker first.
+     * <p>
+     * The local broker is stopped in a {@code finally} block so that a failure
+     * while stopping the networked broker cannot leave it running (and holding
+     * its port) for later tests. A broker field that is still {@code null}
+     * because {@link #init()} failed part-way is skipped instead of causing a
+     * {@link NullPointerException}.
+     */
+    @After
+    public void cleanup() throws Exception {
+        try {
+            if (networkedBroker != null) {
+                networkedBroker.stop();
+                networkedBroker.waitUntilStopped();
+            }
+        } finally {
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+            }
+        }
+    }
+
+    /**
+     * Creates queue consumers on the networked broker, waits until the local
+     * broker's MBean reports one consumer for the queue, then closes the
+     * consumer connection and waits until the local broker reports zero.
+     * <p>
+     * The connection is also closed in a {@code finally} block so it does not
+     * leak when an assertion fails before the explicit close.
+     */
+    @Test
+    public void testNetworkedBrokerDetach() throws Exception {
+        LOG.info("Creating Consumer on the networked broker ...");
+        // Create a consumer on the networked broker
+        ConnectionFactory consFactory = createConnectionFactory(networkedBroker);
+        Connection consConn = consFactory.createConnection();
+        try {
+            Session consSession = consConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            ActiveMQDestination destination = (ActiveMQDestination) consSession.createQueue(DESTINATION_NAME);
+            for (int i = 0; i < NUM_CONSUMERS; i++) {
+                consSession.createConsumer(destination);
+            }
+
+            assertTrue("got expected consumer count from mbean within time limit",
+                       verifyConsumerCount(1, destination, broker));
+
+            LOG.info("Stopping Consumer on the networked broker ...");
+            // Closing the connection will also close the consumer
+            consConn.close();
+
+            // We should have 0 consumer for the queue on the local broker
+            assertTrue("got expected 0 count from mbean within time limit", verifyConsumerCount(0, destination, broker));
+        } finally {
+            // Closing an already closed JMS connection is a no-op, so this only
+            // matters when the test failed before the explicit close above.
+            consConn.close();
+        }
+    }
+
+    /**
+     * Registers a durable topic subscriber on each broker, checks consumer
+     * counts and message delivery, then stops and recreates the networked
+     * broker and checks that the same counts and delivery are seen again.
+     */
+    @Test
+    public void testNetworkedBrokerDurableSubAfterRestart() throws Exception {
+
+        // Both durable subscribers share one listener, so this counts every delivery.
+        final AtomicInteger received = new AtomicInteger(0);
+        final MessageListener countingListener = new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                received.incrementAndGet();
+            }
+        };
+
+        LOG.info("Creating durable consumer on each broker ...");
+        final ActiveMQTopic topic = registerDurableConsumer(networkedBroker, countingListener);
+        registerDurableConsumer(broker, countingListener);
+
+        assertTrue("got expected consumer count from local broker mbean within time limit",
+                verifyConsumerCount(2, topic, broker));
+
+        assertTrue("got expected consumer count from network broker mbean within time limit",
+                verifyConsumerCount(2, topic, networkedBroker));
+
+        sendMessageTo(topic, broker);
+
+        assertTrue("Got one message on each", verifyMessageCount(2, received));
+
+        // --- bounce the networked broker ---
+        LOG.info("Stopping brokerTwo...");
+        networkedBroker.stop();
+        networkedBroker.waitUntilStopped();
+
+        LOG.info("restarting  broker Two...");
+        networkedBroker = createNetworkedBroker();
+        networkedBroker.start();
+
+        LOG.info("Recreating durable Consumer on the broker after restart...");
+        registerDurableConsumer(networkedBroker, countingListener);
+
+        // give advisories a chance to percolate
+        TimeUnit.SECONDS.sleep(5);
+
+        sendMessageTo(topic, broker);
+
+        // --- the picture after the restart should match the one before it ---
+        assertTrue("got expected consumer count from local broker mbean within time limit",
+                verifyConsumerCount(2, topic, broker));
+
+        // a durable sub is auto bridged on restart unless dynamicOnly=true
+        assertTrue("got expected consumer count from network broker mbean within time limit",
+                verifyConsumerCount(2, topic, networkedBroker));
+
+        assertTrue("got no inactive subs on broker", verifyDurableConsumerCount(0, broker));
+        assertTrue("got no inactive subs on other broker", verifyDurableConsumerCount(0, networkedBroker));
+
+        assertTrue("Got two more messages after restart", verifyMessageCount(4, received));
+        // wait a little longer and re-check that no extra copies arrive
+        TimeUnit.SECONDS.sleep(1);
+        assertTrue("still Got just two more messages", verifyMessageCount(4, received));
+    }
+
+    private boolean verifyMessageCount(final int i, final AtomicInteger count) 
throws Exception {
+        return Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return i == count.get();
+            }
+        });
+    }
+
+    private ActiveMQTopic registerDurableConsumer(
+            BrokerService brokerService, MessageListener listener) throws 
Exception {
+        ConnectionFactory factory = createConnectionFactory(brokerService);
+        Connection connection = factory.createConnection();
+        connection.setClientID("DurableOne");
+        connection.start();
+        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+        ActiveMQTopic destination = (ActiveMQTopic) 
session.createTopic(DESTINATION_NAME);
+        // unique to a broker
+        TopicSubscriber sub = session.createDurableSubscriber(destination, 
"SubOne" + brokerService.getBrokerName());
+        sub.setMessageListener(listener);
+        return destination;
+    }
+
+    /**
+     * Sends a single text message ("Hi") to the destination through a
+     * short-lived connection to the given broker.
+     * <p>
+     * The connection is closed in a {@code finally} block so it is released
+     * even when creating the session or sending the message fails.
+     *
+     * @param destination   the topic to publish to
+     * @param brokerService the broker to connect to
+     */
+    private void sendMessageTo(ActiveMQTopic destination, BrokerService brokerService) throws Exception {
+        ConnectionFactory factory = createConnectionFactory(brokerService);
+        Connection conn = factory.createConnection();
+        try {
+            conn.start();
+            Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            session.createProducer(destination).send(session.createTextMessage("Hi"));
+        } finally {
+            conn.close();
+        }
+    }
+
+    protected ConnectionFactory createConnectionFactory(final BrokerService 
broker) throws Exception {
+        String url = 
broker.getTransportConnectors().get(0).getServer().getConnectURI().toString();
+        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(url);
+        connectionFactory.setOptimizedMessageDispatch(true);
+        connectionFactory.setCopyMessageOnSend(false);
+        connectionFactory.setUseCompression(false);
+        connectionFactory.setDispatchAsync(false);
+        connectionFactory.setUseAsyncSend(false);
+        connectionFactory.setOptimizeAcknowledge(false);
+        connectionFactory.setWatchTopicAdvisories(true);
+        ActiveMQPrefetchPolicy qPrefetchPolicy= new ActiveMQPrefetchPolicy();
+        qPrefetchPolicy.setQueuePrefetch(100);
+        qPrefetchPolicy.setTopicPrefetch(1000);
+        connectionFactory.setPrefetchPolicy(qPrefetchPolicy);
+        connectionFactory.setAlwaysSyncSend(true);
+        return connectionFactory;
+    }
+
+    // JMX Helper Methods
+
+    /**
+     * Waits, via {@code Wait.waitFor}, until the broker's destination MBean
+     * for the given destination reports the expected consumer count.
+     * <p>
+     * Errors while reading the MBeans are treated as "not yet" so that the
+     * condition is retried, but they are logged at debug level so a failure
+     * that never clears can still be diagnosed.
+     *
+     * @param expectedCount the consumer count to wait for
+     * @param destination   the queue or topic whose MBean is inspected
+     * @param broker        the broker whose admin view is queried
+     * @return whatever {@code Wait.waitFor} reports for the condition
+     */
+    private boolean verifyConsumerCount(final long expectedCount, final ActiveMQDestination destination, final BrokerService broker) throws Exception {
+        return Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                boolean result = false;
+                try {
+
+                    ObjectName[] destinations;
+
+                    if (destination.isQueue()) {
+                        destinations = broker.getAdminView().getQueues();
+                    } else {
+                        destinations = broker.getAdminView().getTopics();
+                    }
+
+                    // Find the MBean for our destination and compare its
+                    // consumer count with the expected one
+                    for (ObjectName name : destinations) {
+                        DestinationViewMBean view = (DestinationViewMBean)
+                            broker.getManagementContext().newProxyInstance(name, DestinationViewMBean.class, true);
+
+                        if (view.getName().equals(destination.getPhysicalName())) {
+                            LOG.info("Consumers for " + destination.getPhysicalName() + " on " + broker + " : " + view.getConsumerCount());
+                            LOG.info("Subs: " + Arrays.asList(view.getSubscriptions()));
+                            if (expectedCount == view.getConsumerCount()) {
+                                result = true;
+                            }
+                        }
+                    }
+
+                } catch (Exception ignoreAndRetry) {
+                    // Best effort: report "not satisfied" so the check is
+                    // retried, but keep a trace of what went wrong.
+                    LOG.debug("Could not read consumer count for " + destination.getPhysicalName() + ", will retry", ignoreAndRetry);
+                }
+                return result;
+            }
+        });
+    }
+
+    /**
+     * Waits, via {@code Wait.waitFor}, until the broker reports the expected
+     * number of inactive durable topic subscribers.
+     * <p>
+     * The admin view is fetched once per poll, so the null check applies to
+     * the same object that is then queried.
+     *
+     * @param expectedCount the number of inactive durable subscribers to wait for
+     * @param broker        the broker whose admin view is queried
+     * @return whatever {@code Wait.waitFor} reports for the condition
+     */
+    private boolean verifyDurableConsumerCount(final long expectedCount, final BrokerService broker) throws Exception {
+        return Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                final BrokerView view = broker.getAdminView();
+                if (view == null) {
+                    return false;
+                }
+
+                final ObjectName[] subs = view.getInactiveDurableTopicSubscribers();
+                if (subs == null) {
+                    return false;
+                }
+
+                LOG.info("inactive durable subs on " + broker + " : " + Arrays.asList(subs));
+                return expectedCount == subs.length;
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java
new file mode 100644
index 0000000..f91243e
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkConnectionsTest.java
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.*;
+
+public class NetworkConnectionsTest extends TestCase {
+    private static final Logger LOG = 
LoggerFactory.getLogger(NetworkConnectionsTest.class);
+
+    // Transport URIs the two brokers listen on; clients connect to the same URIs.
+    private static final String LOCAL_BROKER_TRANSPORT_URI = 
"tcp://localhost:61616";
+    private static final String REMOTE_BROKER_TRANSPORT_URI = 
"tcp://localhost:61617";
+    // Queue name used by the message-forwarding tests.
+    private static final String DESTINATION_NAME = "TEST.RECONNECT";
+
+    // Created and started in setUp(), stopped in tearDown().
+    private BrokerService localBroker;
+    private BrokerService remoteBroker;
+
+    /**
+     * Adds a network connector to the local broker and runs it through two
+     * identical start/stop cycles, checking the reported state after each
+     * transition.
+     */
+    @Test
+    public void testIsStarted() throws Exception {
+        LOG.info("testIsStarted is starting...");
+
+        LOG.info("Adding network connector...");
+        final NetworkConnector connector = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")");
+        connector.setName("NC1");
+
+        for (int cycle = 0; cycle < 2; cycle++) {
+            LOG.info("Starting network connector...");
+            connector.start();
+            assertTrue(connector.isStarted());
+
+            LOG.info("Stopping network connector...");
+            connector.stop();
+
+            // poll until the connector has left the "stopping" state
+            while (connector.isStopping()) {
+                LOG.info("... still stopping ...");
+                Thread.sleep(100);
+            }
+
+            assertTrue(connector.isStopped());
+            assertFalse(connector.isStarted());
+        }
+    }
+
+    /**
+     * Checks that a message sent to the local broker reaches a consumer on the
+     * remote broker while the network connector runs, is not delivered while
+     * the connector is stopped, and is delivered once the connector is started
+     * again.
+     * <p>
+     * Both client connections are closed in {@code finally} blocks so they are
+     * released whether the test passes or fails.
+     */
+    @Test
+    public void testNetworkConnectionRestart() throws Exception {
+        LOG.info("testNetworkConnectionRestart is starting...");
+
+        LOG.info("Adding network connector...");
+        NetworkConnector nc = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")");
+        nc.setName("NC1");
+        nc.start();
+        assertTrue(nc.isStarted());
+
+        LOG.info("Setting up Message Producer and Consumer");
+        ActiveMQQueue destination = new ActiveMQQueue(DESTINATION_NAME);
+
+        ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(LOCAL_BROKER_TRANSPORT_URI);
+        Connection localConnection = localFactory.createConnection();
+        try {
+            localConnection.start();
+            Session localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer localProducer = localSession.createProducer(destination);
+
+            ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(REMOTE_BROKER_TRANSPORT_URI);
+            Connection remoteConnection = remoteFactory.createConnection();
+            try {
+                remoteConnection.start();
+                Session remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer remoteConsumer = remoteSession.createConsumer(destination);
+
+                Message message = localSession.createTextMessage("test");
+                localProducer.send(message);
+
+                LOG.info("Testing initial network connection...");
+                message = remoteConsumer.receive(10000);
+                assertNotNull(message);
+
+                LOG.info("Stopping network connection...");
+                nc.stop();
+                assertFalse(nc.isStarted());
+
+                LOG.info("Sending 2nd message...");
+                message = localSession.createTextMessage("test stop");
+                localProducer.send(message);
+
+                message = remoteConsumer.receive(1000);
+                assertNull("Message should not have been delivered since NetworkConnector was stopped", message);
+
+                LOG.info("(Re)starting network connection...");
+                nc.start();
+                assertTrue(nc.isStarted());
+
+                LOG.info("Wait for 2nd message to get forwarded and received...");
+                message = remoteConsumer.receive(10000);
+                assertNotNull("Should have received 2nd message", message);
+            } finally {
+                remoteConnection.close();
+            }
+        } finally {
+            localConnection.close();
+        }
+    }
+
+    /**
+     * Checks that a network connector can be looked up by name, removed from
+     * the broker, and that a new connector ("NC2") for the same remote URI can
+     * then be added and forwards messages again.
+     * <p>
+     * Both client connections are closed in {@code finally} blocks so they are
+     * released whether the test passes or fails.
+     */
+    @Test
+    public void testNetworkConnectionReAddURI() throws Exception {
+        LOG.info("testNetworkConnectionReAddURI is starting...");
+
+        LOG.info("Adding network connector 'NC1'...");
+        NetworkConnector nc = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")");
+        nc.setName("NC1");
+        nc.start();
+        assertTrue(nc.isStarted());
+
+        LOG.info("Looking up network connector by name...");
+        NetworkConnector nc1 = localBroker.getNetworkConnectorByName("NC1");
+        assertNotNull("Should find network connector 'NC1'", nc1);
+        assertTrue(nc1.isStarted());
+        assertEquals(nc, nc1);
+
+        LOG.info("Setting up producer and consumer...");
+        ActiveMQQueue destination = new ActiveMQQueue(DESTINATION_NAME);
+
+        ActiveMQConnectionFactory localFactory = new ActiveMQConnectionFactory(LOCAL_BROKER_TRANSPORT_URI);
+        Connection localConnection = localFactory.createConnection();
+        try {
+            localConnection.start();
+            Session localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageProducer localProducer = localSession.createProducer(destination);
+
+            ActiveMQConnectionFactory remoteFactory = new ActiveMQConnectionFactory(REMOTE_BROKER_TRANSPORT_URI);
+            Connection remoteConnection = remoteFactory.createConnection();
+            try {
+                remoteConnection.start();
+                Session remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                MessageConsumer remoteConsumer = remoteSession.createConsumer(destination);
+
+                Message message = localSession.createTextMessage("test");
+                localProducer.send(message);
+
+                LOG.info("Testing initial network connection...");
+                message = remoteConsumer.receive(10000);
+                assertNotNull(message);
+
+                LOG.info("Stopping network connector 'NC1'...");
+                nc.stop();
+                assertFalse(nc.isStarted());
+
+                LOG.info("Removing network connector...");
+                assertTrue(localBroker.removeNetworkConnector(nc));
+
+                nc1 = localBroker.getNetworkConnectorByName("NC1");
+                assertNull("Should not find network connector 'NC1'", nc1);
+
+                LOG.info("Re-adding network connector 'NC2'...");
+                nc = localBroker.addNetworkConnector("static:(" + REMOTE_BROKER_TRANSPORT_URI + ")");
+                nc.setName("NC2");
+                nc.start();
+                assertTrue(nc.isStarted());
+
+                LOG.info("Looking up network connector by name...");
+                NetworkConnector nc2 = localBroker.getNetworkConnectorByName("NC2");
+                assertNotNull(nc2);
+                assertTrue(nc2.isStarted());
+                assertEquals(nc, nc2);
+
+                LOG.info("Testing re-added network connection...");
+                message = localSession.createTextMessage("test");
+                localProducer.send(message);
+
+                message = remoteConsumer.receive(10000);
+                assertNotNull(message);
+
+                LOG.info("Stopping network connector...");
+                nc.stop();
+                assertFalse(nc.isStarted());
+
+                LOG.info("Removing network connection 'NC2'");
+                assertTrue(localBroker.removeNetworkConnector(nc));
+
+                nc2 = localBroker.getNetworkConnectorByName("NC2");
+                assertNull("Should not find network connector 'NC2'", nc2);
+            } finally {
+                remoteConnection.close();
+            }
+        } finally {
+            localConnection.close();
+        }
+    }
+
+    /**
+     * Starts the local broker and then the remote broker, each non-persistent,
+     * without JMX, and listening on its own transport URI.
+     */
+    @Override
+    protected void setUp() throws Exception {
+        LOG.info("Setting up LocalBroker");
+        localBroker = new BrokerService();
+        launch(localBroker, "LocalBroker", LOCAL_BROKER_TRANSPORT_URI);
+
+        LOG.info("Setting up RemoteBroker");
+        remoteBroker = new BrokerService();
+        launch(remoteBroker, "RemoteBroker", REMOTE_BROKER_TRANSPORT_URI);
+    }
+
+    /** Applies the configuration shared by both brokers, starts the broker and waits until it is started. */
+    private static void launch(BrokerService service, String name, String transportUri) throws Exception {
+        service.setBrokerName(name);
+        service.setUseJmx(false);
+        service.setPersistent(false);
+        service.setTransportConnectorURIs(new String[]{transportUri});
+        service.start();
+        service.waitUntilStarted();
+    }
+
+    /**
+     * Stops whichever brokers are still running and clears both fields.
+     * <p>
+     * The remote broker is handled in a {@code finally} block so that a
+     * failure while stopping the local broker cannot leave the remote one
+     * running, and a broker that was never created is skipped rather than
+     * causing a {@link NullPointerException}.
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (localBroker != null && localBroker.isStarted()) {
+                LOG.info("Stopping LocalBroker");
+                localBroker.stop();
+                localBroker.waitUntilStopped();
+            }
+        } finally {
+            localBroker = null;
+            try {
+                if (remoteBroker != null && remoteBroker.isStarted()) {
+                    LOG.info("Stopping RemoteBroker");
+                    remoteBroker.stop();
+                    remoteBroker.waitUntilStopped();
+                }
+            } finally {
+                remoteBroker = null;
+            }
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java
----------------------------------------------------------------------
diff --git 
a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java
 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java
new file mode 100644
index 0000000..80c5855
--- /dev/null
+++ 
b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/network/NetworkDestinationFilterTest.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import junit.framework.TestCase;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTempQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Checks the destination filter string produced by
+ * {@link NetworkBridgeConfiguration#getDestinationFilter()}.
+ */
+public class NetworkDestinationFilterTest extends TestCase {
+
+    /**
+     * With no (or an empty list of) dynamically included destinations the
+     * filter is the consumer advisory prefix followed by {@code >}. Once a
+     * queue, a topic and a temp queue named {@code TEST.>} are included, the
+     * expected filter lists only the queue and topic entries.
+     */
+    public void testFilter() throws Exception {
+        final String prefix = AdvisorySupport.CONSUMER_ADVISORY_TOPIC_PREFIX;
+        final String matchEverything = prefix + ">";
+
+        final NetworkBridgeConfiguration bridgeConfig = new NetworkBridgeConfiguration();
+        assertEquals(matchEverything, bridgeConfig.getDestinationFilter());
+
+        final List<ActiveMQDestination> included = new ArrayList<ActiveMQDestination>();
+        bridgeConfig.setDynamicallyIncludedDestinations(included);
+        assertEquals(matchEverything, bridgeConfig.getDestinationFilter());
+
+        included.add(new ActiveMQQueue("TEST.>"));
+        included.add(new ActiveMQTopic("TEST.>"));
+        included.add(new ActiveMQTempQueue("TEST.>"));
+
+        final String expectedFilter = prefix + "Queue.TEST.>," + prefix + "Topic.TEST.>";
+        assertEquals(expectedFilter, bridgeConfig.getDestinationFilter());
+    }
+
+
+}

Reply via email to