Repository: activemq
Updated Branches:
  refs/heads/master a01578ad4 -> 43c3cae2c


http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java
new file mode 100644
index 0000000..db0b715
--- /dev/null
+++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.java;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.RuntimeConfigTestSupport;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+
+/**
+ * Exercises adding, updating and removing {@link DiscoveryNetworkConnector}s on a
+ * running broker through the {@link JavaRuntimeConfigurationBroker} API.
+ */
+public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport {
+
+    /** Pause, in seconds, used where a test gives the broker time to react (or to show it does not). */
+    public static final int SLEEP = 2; // seconds
+    private JavaRuntimeConfigurationBroker javaConfigBroker;
+
+    /**
+     * Starts the given broker, non-persistent and with the
+     * {@link JavaRuntimeConfigurationPlugin} installed, then looks up the
+     * {@link JavaRuntimeConfigurationBroker} that the tests drive.
+     *
+     * @param brokerService the broker to configure and start
+     * @throws Exception if the broker cannot be started
+     */
+    public void startBroker(BrokerService brokerService) throws Exception {
+        this.brokerService = brokerService;
+        brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()});
+        brokerService.setPersistent(false);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        javaConfigBroker = (JavaRuntimeConfigurationBroker)
+                brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
+    }
+
+    /**
+     * Adds a connector, checks that adding the same connector a second time changes
+     * nothing, and verifies the nested destination lists on the registered connector.
+     */
+    @Test
+    public void testNew() throws Exception {
+        final BrokerService brokerService = new BrokerService();
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+        assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
+
+        DiscoveryNetworkConnector nc = createNetworkConnector();
+
+        javaConfigBroker.addNetworkConnector(nc);
+
+        assertTrue("new network connectors", waitForConnectorCount(brokerService, 1));
+
+        NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0);
+
+        // apply again - the pause gives an unexpected second connector time to show up
+        javaConfigBroker.addNetworkConnector(nc);
+        TimeUnit.SECONDS.sleep(SLEEP);
+        assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
+        assertSame("same instance", networkConnector, brokerService.getNetworkConnectors().get(0));
+
+        // verify nested elements
+        assertEquals("has exclusions", 2, networkConnector.getExcludedDestinations().size());
+
+        assertEquals("one statically included", 1,
+                networkConnector.getStaticallyIncludedDestinations().size());
+        assertEquals("one dynamically included", 1,
+                networkConnector.getDynamicallyIncludedDestinations().size());
+        assertEquals("one durable", 1, networkConnector.getDurableDestinations().size());
+    }
+
+    /**
+     * Changes the network TTL of a registered connector via
+     * {@code updateNetworkConnector} and checks that re-applying the same update
+     * leaves the registered instance untouched.
+     */
+    @Test
+    public void testMod() throws Exception {
+        final BrokerService brokerService = new BrokerService();
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+        assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
+
+        DiscoveryNetworkConnector nc = createNetworkConnector();
+        javaConfigBroker.addNetworkConnector(nc);
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        assertEquals("one network connectors", 1, brokerService.getNetworkConnectors().size());
+
+        // track the original
+        NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0);
+        assertEquals("network ttl is default", 1, networkConnector.getNetworkTTL());
+
+        nc.setNetworkTTL(2);
+        javaConfigBroker.updateNetworkConnector(nc);
+        TimeUnit.SECONDS.sleep(SLEEP);
+        assertEquals("still one network connectors", 1, brokerService.getNetworkConnectors().size());
+
+        NetworkConnector modNetworkConnector = brokerService.getNetworkConnectors().get(0);
+        assertEquals("got ttl update", 2, modNetworkConnector.getNetworkTTL());
+
+        // apply again - ensure no change
+        javaConfigBroker.updateNetworkConnector(nc);
+        assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size());
+        assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0));
+    }
+
+    /**
+     * Registers two connectors, removes the second one and checks that the first
+     * one is the connector left behind.
+     */
+    @Test
+    public void testRemove() throws Exception {
+        final BrokerService brokerService = new BrokerService();
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+        assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size());
+
+        // Distinct names: with both connectors called "one" the final name check
+        // could not tell which of the two had actually been removed.
+        DiscoveryNetworkConnector nc1 = newStaticConnector("one");
+        DiscoveryNetworkConnector nc2 = newStaticConnector("two");
+
+        javaConfigBroker.addNetworkConnector(nc1);
+        javaConfigBroker.addNetworkConnector(nc2);
+
+        TimeUnit.SECONDS.sleep(SLEEP);
+        assertEquals("correct network connectors", 2, brokerService.getNetworkConnectors().size());
+
+        javaConfigBroker.removeNetworkConnector(nc2);
+
+        assertTrue("expected mod on time", waitForConnectorCount(brokerService, 1));
+
+        NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0);
+        assertEquals("name match", "one", remainingNetworkConnector.getName());
+    }
+
+    /**
+     * Waits, using the default {@link Wait#waitFor(Wait.Condition)} settings, until
+     * the broker reports exactly {@code expected} network connectors.
+     *
+     * @return {@code true} if the count was reached before the wait gave up
+     */
+    private static boolean waitForConnectorCount(final BrokerService broker, final int expected)
+            throws Exception {
+        return Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return expected == broker.getNetworkConnectors().size();
+            }
+        });
+    }
+
+    /**
+     * Builds a connector with the given name, a network TTL of 1 and the static
+     * discovery URI shared by all tests in this class.
+     */
+    private static DiscoveryNetworkConnector newStaticConnector(String name) throws Exception {
+        DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector();
+        nc.setUri(new URI("static:(tcp://localhost:5555)"));
+        nc.setNetworkTTL(1);
+        nc.setName(name);
+        return nc;
+    }
+
+    /**
+     * Builds the connector named "one" with two excluded destinations and one
+     * dynamically included, one statically included and one durable destination;
+     * {@link #testNew()} asserts exactly these counts.
+     */
+    private DiscoveryNetworkConnector createNetworkConnector() throws Exception {
+        DiscoveryNetworkConnector nc = newStaticConnector("one");
+        nc.setExcludedDestinations(Arrays.asList(new ActiveMQTopic("LAN.>"), new ActiveMQQueue("LAN.>")));
+        nc.setDynamicallyIncludedDestinations(
+                Arrays.<ActiveMQDestination>asList(new ActiveMQQueue("DynamicallyIncluded.*")));
+        nc.setStaticallyIncludedDestinations(
+                Arrays.<ActiveMQDestination>asList(new ActiveMQTopic("StaticallyIncluded.*")));
+        nc.setDurableDestinations(
+                new HashSet<>(Arrays.<ActiveMQDestination>asList(new ActiveMQTopic("durableDest"))));
+        return nc;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java
new file mode 100644
index 0000000..94a5496
--- /dev/null
+++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.java;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.RuntimeConfigTestSupport;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
+import org.junit.Test;
+
+/**
+ * Exercises modifying and adding destination {@link PolicyEntry}s on a running
+ * broker through the {@link JavaRuntimeConfigurationBroker} API, using the
+ * destinations' memory limit as the observable setting.
+ */
+public class JavaPolicyEntryTest extends RuntimeConfigTestSupport {
+
+    /** Pause, in seconds, given to the broker after each configuration change. */
+    public static final int SLEEP = 2; // seconds
+    private JavaRuntimeConfigurationBroker javaConfigBroker;
+
+    /**
+     * Starts the given broker, non-persistent and with the
+     * {@link JavaRuntimeConfigurationPlugin} installed, then looks up the
+     * {@link JavaRuntimeConfigurationBroker} that the tests drive.
+     *
+     * @param brokerService the broker to configure and start
+     * @throws Exception if the broker cannot be started
+     */
+    public void startBroker(BrokerService brokerService) throws Exception {
+        this.brokerService = brokerService;
+        brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()});
+        brokerService.setPersistent(false);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        javaConfigBroker = (JavaRuntimeConfigurationBroker)
+                brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
+    }
+
+    /**
+     * Raises the memory limit of the catch-all queue policy and checks that both a
+     * new queue and the queue created before the change report the new limit.
+     */
+    @Test
+    public void testMod() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setQueue(">");
+        entry.setMemoryLimit(1024);
+        policyMap.setPolicyEntries(Arrays.asList(entry));
+        brokerService.setDestinationPolicy(policyMap);
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        verifyQueueLimit("Before", 1024);
+
+        //Reapply new limit
+        entry.setMemoryLimit(4194304);
+        javaConfigBroker.modifyPolicyEntry(entry);
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        verifyQueueLimit("After", 4194304);
+
+        // change to existing dest
+        verifyQueueLimit("Before", 4194304);
+    }
+
+    /**
+     * Modifies the existing queue policy and adds a new topic policy, then checks
+     * the limits on new destinations and on the topic created before the change.
+     */
+    @Test
+    public void testAddNdMod() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry entry = new PolicyEntry();
+        entry.setQueue(">");
+        entry.setMemoryLimit(1024);
+        policyMap.setPolicyEntries(Arrays.asList(entry));
+        brokerService.setDestinationPolicy(policyMap);
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        verifyQueueLimit("Before", 1024);
+        // no topic policy is configured yet, so the topic is expected to report the system-wide limit
+        verifyTopicLimit("Before", brokerService.getSystemUsage().getMemoryUsage().getLimit());
+
+        entry.setMemoryLimit(2048);
+        javaConfigBroker.modifyPolicyEntry(entry);
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        PolicyEntry newEntry = new PolicyEntry();
+        newEntry.setTopic(">");
+        newEntry.setMemoryLimit(2048);
+        javaConfigBroker.addNewPolicyEntry(newEntry);
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        verifyTopicLimit("After", 2048L);
+        verifyQueueLimit("After", 2048L);
+
+        // change to existing dest
+        verifyTopicLimit("Before", 2048L);
+    }
+
+    /**
+     * Opens a VM connection, creates a consumer on the named queue and asserts the
+     * memory limit the broker reports for that queue.
+     *
+     * @param dest        physical name of the queue
+     * @param memoryLimit expected memory usage limit
+     * @throws Exception on any JMS or broker failure
+     */
+    private void verifyQueueLimit(String dest, long memoryLimit) throws Exception {
+        ActiveMQConnection connection =
+                (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            // NOTE(review): the consumer is presumably here so the destination exists before the lookup
+            session.createConsumer(session.createQueue(dest));
+
+            assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap()
+                    .get(new ActiveMQQueue(dest)).getMemoryUsage().getLimit());
+        } finally {
+            connection.close();
+        }
+    }
+
+    /**
+     * Opens a VM connection, creates a consumer on the named topic and asserts the
+     * memory limit the broker reports for that topic.
+     *
+     * @param dest        physical name of the topic
+     * @param memoryLimit expected memory usage limit
+     * @throws Exception on any JMS or broker failure
+     */
+    private void verifyTopicLimit(String dest, long memoryLimit) throws Exception {
+        ActiveMQConnection connection =
+                (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection();
+        try {
+            connection.start();
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            // NOTE(review): the consumer is presumably here so the destination exists before the lookup
+            session.createConsumer(session.createTopic(dest));
+
+            assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap()
+                    .get(new ActiveMQTopic(dest)).getMemoryUsage().getLimit());
+        } finally {
+            connection.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaVirtualDestTest.java
----------------------------------------------------------------------
diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaVirtualDestTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaVirtualDestTest.java
new file mode 100644
index 0000000..fd8d407
--- /dev/null
+++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaVirtualDestTest.java
@@ -0,0 +1,376 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.java;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.AbstractVirtualDestTest;
+import org.apache.activemq.broker.BrokerPlugin;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.DestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.CompositeQueue;
+import org.apache.activemq.broker.region.virtual.FilteredDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestination;
+import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
+import org.apache.activemq.broker.region.virtual.VirtualTopic;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker;
+import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin;
+import org.apache.activemq.util.Wait;
+import org.junit.Test;
+
+/**
+ * Exercises replacing the broker's virtual destinations at runtime through
+ * {@link JavaRuntimeConfigurationBroker#setVirtualDestinations}: virtual topics,
+ * composite queues and filtered composite queues, with and without interceptors
+ * configured up front.
+ */
+public class JavaVirtualDestTest extends AbstractVirtualDestTest {
+
+    /** Pause, in seconds, given to the broker after each configuration change. */
+    public static final int SLEEP = 2; // seconds
+    private JavaRuntimeConfigurationBroker javaConfigBroker;
+
+    /**
+     * Starts the given broker, non-persistent and with the
+     * {@link JavaRuntimeConfigurationPlugin} installed, then looks up the
+     * {@link JavaRuntimeConfigurationBroker} that the tests drive.
+     *
+     * @param brokerService the broker to configure and start
+     * @throws Exception if the broker cannot be started
+     */
+    public void startBroker(BrokerService brokerService) throws Exception {
+        this.brokerService = brokerService;
+        brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()});
+        brokerService.setPersistent(false);
+        brokerService.start();
+        brokerService.waitUntilStarted();
+
+        javaConfigBroker = (JavaRuntimeConfigurationBroker)
+                brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class);
+    }
+
+    /**
+     * Replaces the default {@code VirtualTopic.>} mapping with {@code A.>} and checks
+     * that re-applying the same configuration keeps the same interceptor instance.
+     */
+    @Test
+    public void testNew() throws Exception {
+        startBroker(new BrokerService());
+        assertTrue("broker alive", brokerService.isStarted());
+
+        // default config has support for VirtualTopic.>
+        DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
+        assertEquals("one interceptor", 1, interceptors.length);
+        assertTrue("it is virtual topic interceptor",
+                interceptors[0] instanceof VirtualDestinationInterceptor);
+
+        VirtualDestinationInterceptor defaultValue = (VirtualDestinationInterceptor) interceptors[0];
+        assertEquals("default names in place", "VirtualTopic.>",
+                defaultValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName());
+
+        exerciseVirtualTopic("VirtualTopic.Default");
+
+        javaConfigBroker.setVirtualDestinations(
+                new VirtualDestination[]{buildVirtualTopic("A.>", false)});
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        // Re-read the broker's current state: asserting on the array captured
+        // before the change would only re-check what was already verified above.
+        interceptors = brokerService.getDestinationInterceptors();
+        assertEquals("one interceptor", 1, interceptors.length);
+        assertTrue("it is virtual topic interceptor",
+                interceptors[0] instanceof VirtualDestinationInterceptor);
+
+        // update will happen on addDestination
+        exerciseVirtualTopic("A.Default");
+
+        VirtualDestinationInterceptor newValue =
+                (VirtualDestinationInterceptor) brokerService.getDestinationInterceptors()[0];
+        assertEquals("new names in place", "A.>",
+                newValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName());
+
+        // apply again - ensure no change
+        javaConfigBroker.setVirtualDestinations(
+                new VirtualDestination[]{buildVirtualTopic("A.>", false)});
+        TimeUnit.SECONDS.sleep(SLEEP);
+        assertSame("same instance", newValue, brokerService.getDestinationInterceptors()[0]);
+    }
+
+    /** Adds a forward-only composite queue to a default broker and exercises it. */
+    @Test
+    public void testNewComposite() throws Exception {
+        startBroker(new BrokerService());
+        assertTrue("broker alive", brokerService.isStarted());
+
+        CompositeQueue queue = buildCompositeQueue("VirtualDestination.CompositeQueue",
+                Arrays.asList(new ActiveMQQueue("VirtualDestination.QueueConsumer"),
+                new ActiveMQTopic("VirtualDestination.TopicConsumer")));
+
+        javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{queue});
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
+    }
+
+
+    /**
+     * Starts with a forward-only composite queue, switches it to
+     * {@code forwardOnly=false} at runtime and exercises both the forward target
+     * and the composite queue itself as consumer destinations.
+     */
+    @Test
+    public void testModComposite() throws Exception {
+        BrokerService brokerService = new BrokerService();
+
+        CompositeQueue queue = buildCompositeQueue("VirtualDestination.CompositeQueue",
+                Arrays.asList(new ActiveMQQueue("VirtualDestination.QueueConsumer"),
+                new ActiveMQTopic("VirtualDestination.TopicConsumer")));
+
+        brokerService.setDestinationInterceptors(new DestinationInterceptor[] {
+                buildInterceptor(new VirtualDestination[]{queue})});
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
+
+        //Apply updated config
+        CompositeQueue newConfig = buildCompositeQueue("VirtualDestination.CompositeQueue", false,
+                Arrays.asList(new ActiveMQQueue("VirtualDestination.QueueConsumer"),
+                new ActiveMQTopic("VirtualDestination.TopicConsumer")));
+        javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{newConfig});
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer");
+        exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.CompositeQueue");
+    }
+
+
+    /**
+     * Starts a broker with virtual topics disabled (no interceptors at all), applies
+     * a virtual topic configuration twice and checks that exactly one interceptor
+     * exists after each application.
+     */
+    @Test
+    public void testNewNoDefaultVirtualTopicSupport() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setUseVirtualTopics(false);
+        startBroker(brokerService);
+
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        assertTrue("broker alive", brokerService.isStarted());
+
+        DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
+        // message corrected: the expectation here is zero interceptors, not one
+        assertEquals("no interceptors", 0, interceptors.length);
+
+        //apply new config
+        javaConfigBroker.setVirtualDestinations(
+                new VirtualDestination[]{buildVirtualTopic("A.>", false)});
+        TimeUnit.SECONDS.sleep(SLEEP);
+        // update will happen on addDestination
+        exerciseVirtualTopic("A.Default");
+
+        interceptors = brokerService.getDestinationInterceptors();
+        assertEquals("one interceptor", 1, interceptors.length);
+        assertTrue("it is virtual topic interceptor",
+                interceptors[0] instanceof VirtualDestinationInterceptor);
+
+        //apply new config again, make sure still just 1 interceptor
+        javaConfigBroker.setVirtualDestinations(
+                new VirtualDestination[]{buildVirtualTopic("A.>", false)});
+        TimeUnit.SECONDS.sleep(SLEEP);
+        // update will happen on addDestination
+        exerciseVirtualTopic("A.Default");
+
+        interceptors = brokerService.getDestinationInterceptors();
+        assertEquals("one interceptor", 1, interceptors.length);
+        assertTrue("it is virtual topic interceptor",
+                interceptors[0] instanceof VirtualDestinationInterceptor);
+    }
+
+
+    /**
+     * With mirrored queues enabled the broker starts with two interceptors; applying
+     * a virtual topic configuration must keep that count and, when re-applied, the
+     * same virtual destination interceptor instance.
+     */
+    @Test
+    public void testNewWithMirrorQueueSupport() throws Exception {
+        BrokerService brokerService = new BrokerService();
+        brokerService.setUseMirroredQueues(true);
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        assertTrue("broker alive", brokerService.isStarted());
+
+        DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
+        assertEquals("expected interceptor", 2, interceptors.length);
+
+        //apply new config
+        javaConfigBroker.setVirtualDestinations(
+                new VirtualDestination[]{buildVirtualTopic("A.>", false)});
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        // update will happen on addDestination
+        exerciseVirtualTopic("A.Default");
+
+        interceptors = brokerService.getDestinationInterceptors();
+        assertEquals("expected interceptor", 2, interceptors.length);
+        assertTrue("it is virtual topic interceptor",
+                interceptors[0] instanceof VirtualDestinationInterceptor);
+
+        VirtualDestinationInterceptor newValue = (VirtualDestinationInterceptor) interceptors[0];
+
+        // apply again - ensure no change
+        javaConfigBroker.setVirtualDestinations(
+                new VirtualDestination[]{buildVirtualTopic("A.>", false)});
+        assertSame("same instance", newValue, brokerService.getDestinationInterceptors()[0]);
+    }
+
+    /**
+     * Applies an empty virtual destination array, waits until the interceptor
+     * reports no virtual destinations, then adds the {@code A.>} virtual topic back.
+     */
+    @Test
+    public void testRemove() throws Exception {
+        final BrokerService brokerService = new BrokerService();
+        brokerService.setDestinationInterceptors(new DestinationInterceptor[] {
+                buildInterceptor(new VirtualDestination[]{buildVirtualTopic("A.>", false)})});
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors();
+        assertEquals("one interceptor", 1, interceptors.length);
+        assertTrue("it is virtual topic interceptor",
+                interceptors[0] instanceof VirtualDestinationInterceptor);
+
+        exerciseVirtualTopic("A.Default");
+
+        VirtualDestinationInterceptor defaultValue = (VirtualDestinationInterceptor) interceptors[0];
+        assertEquals("configured names in place", "A.>",
+                defaultValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName());
+
+        exerciseVirtualTopic("A.Default");
+
+        //apply empty config - this removes all virtual destinations from the interceptor
+        javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{});
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        // update will happen on addDestination
+        forceAddDestination("AnyDest");
+
+        assertTrue("getVirtualDestinations empty on time", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() {
+                return 0 == ((VirtualDestinationInterceptor) brokerService.getDestinationInterceptors()[0])
+                        .getVirtualDestinations().length;
+            }
+        }));
+
+        // reverse the remove, add again
+        javaConfigBroker.setVirtualDestinations(
+                new VirtualDestination[]{buildVirtualTopic("A.>", false)});
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        // update will happen on addDestination
+        exerciseVirtualTopic("A.NewOne");
+
+        interceptors = brokerService.getDestinationInterceptors();
+        assertEquals("expected interceptor", 1, interceptors.length);
+        assertTrue("it is virtual topic interceptor",
+                interceptors[0] instanceof VirtualDestinationInterceptor);
+    }
+
+    /** Replaces the configured {@code A.>} virtual topic with {@code B.>} at runtime. */
+    @Test
+    public void testMod() throws Exception {
+        final BrokerService brokerService = new BrokerService();
+        brokerService.setDestinationInterceptors(new DestinationInterceptor[] {
+                buildInterceptor(new VirtualDestination[]{buildVirtualTopic("A.>", false)})});
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        assertEquals("one interceptor", 1, brokerService.getDestinationInterceptors().length);
+        exerciseVirtualTopic("A.Default");
+
+        //apply new config
+        javaConfigBroker.setVirtualDestinations(
+                new VirtualDestination[]{buildVirtualTopic("B.>", false)});
+        TimeUnit.SECONDS.sleep(SLEEP);
+        exerciseVirtualTopic("B.Default");
+
+        assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length);
+    }
+
+
+    /**
+     * Same scenario as {@link #testMod()}, with mirrored queues switched on in
+     * addition to the explicitly configured interceptor.
+     */
+    @Test
+    public void testModWithMirroredQueue() throws Exception {
+        final BrokerService brokerService = new BrokerService();
+        brokerService.setUseMirroredQueues(true);
+        brokerService.setDestinationInterceptors(new DestinationInterceptor[] {
+                buildInterceptor(new VirtualDestination[]{buildVirtualTopic("A.>", false)})});
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        assertEquals("one interceptor", 1, brokerService.getDestinationInterceptors().length);
+        exerciseVirtualTopic("A.Default");
+
+        //apply new config
+        javaConfigBroker.setVirtualDestinations(
+                new VirtualDestination[]{buildVirtualTopic("B.>", false)});
+        TimeUnit.SECONDS.sleep(SLEEP);
+        exerciseVirtualTopic("B.Default");
+
+        assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length);
+    }
+
+    /**
+     * Adds a composite queue forwarding to a selector-filtered destination
+     * ({@code odd = 'yes'}) and exercises it.
+     */
+    @Test
+    public void testNewFilteredComposite() throws Exception {
+        final BrokerService brokerService = new BrokerService();
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        FilteredDestination filteredDestination = new FilteredDestination();
+        filteredDestination.setSelector("odd = 'yes'");
+        filteredDestination.setQueue("VirtualDestination.QueueConsumer");
+        CompositeQueue queue = buildCompositeQueue("VirtualDestination.FilteredCompositeQueue",
+                Arrays.asList(filteredDestination));
+
+        //apply new config
+        javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{queue});
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue",
+                "VirtualDestination.QueueConsumer", "yes");
+    }
+
+    /**
+     * Starts with a filtered composite queue selecting {@code odd = 'yes'}, swaps
+     * the selector to {@code odd = 'no'} at runtime and exercises the new filter.
+     */
+    @Test
+    public void testModFilteredComposite() throws Exception {
+        final BrokerService brokerService = new BrokerService();
+        FilteredDestination filteredDestination = new FilteredDestination();
+        filteredDestination.setSelector("odd = 'yes'");
+        filteredDestination.setQueue("VirtualDestination.QueueConsumer");
+        CompositeQueue queue = buildCompositeQueue("VirtualDestination.FilteredCompositeQueue",
+                Arrays.asList(filteredDestination));
+
+        brokerService.setDestinationInterceptors(new DestinationInterceptor[] {
+                buildInterceptor(new VirtualDestination[]{queue})});
+
+        startBroker(brokerService);
+        assertTrue("broker alive", brokerService.isStarted());
+
+        exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue",
+                "VirtualDestination.QueueConsumer", "yes");
+
+        filteredDestination = new FilteredDestination();
+        filteredDestination.setSelector("odd = 'no'");
+        filteredDestination.setQueue("VirtualDestination.QueueConsumer");
+        queue = buildCompositeQueue("VirtualDestination.FilteredCompositeQueue",
+                Arrays.asList(filteredDestination));
+
+        //apply new config
+        javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{queue});
+        TimeUnit.SECONDS.sleep(SLEEP);
+
+        // NOTE(review): exercised twice with identical arguments - presumably the first
+        // pass triggers the update and the second runs against the updated state; confirm.
+        exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue",
+                "VirtualDestination.QueueConsumer", "no");
+        exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue",
+                "VirtualDestination.QueueConsumer", "no");
+    }
+
+
+    /**
+     * Builds a forward-only composite queue.
+     *
+     * @param name      name of the composite queue
+     * @param forwardTo destinations the composite queue forwards to
+     */
+    protected static CompositeQueue buildCompositeQueue(String name, Collection<?> forwardTo) {
+        return buildCompositeQueue(name, true, forwardTo);
+    }
+
+    /**
+     * Builds a composite queue.
+     *
+     * @param name        name of the composite queue
+     * @param forwardOnly value passed to {@link CompositeQueue#setForwardOnly}
+     * @param forwardTo   destinations the composite queue forwards to
+     */
+    protected static CompositeQueue buildCompositeQueue(String name, boolean forwardOnly,
+            Collection<?> forwardTo) {
+        CompositeQueue queue = new CompositeQueue();
+        queue.setForwardOnly(forwardOnly);
+        queue.setName(name);
+        queue.setForwardTo(forwardTo);
+        return queue;
+    }
+
+    /**
+     * Builds a virtual topic.
+     *
+     * @param name          name (may be a wildcard such as {@code A.>}) of the virtual topic
+     * @param selectorAware value passed to {@link VirtualTopic#setSelectorAware}
+     */
+    protected static VirtualTopic buildVirtualTopic(String name, boolean selectorAware) {
+        VirtualTopic virtualTopic = new VirtualTopic();
+        virtualTopic.setSelectorAware(selectorAware);
+        virtualTopic.setName(name);
+        return virtualTopic;
+    }
+
+    /**
+     * Wraps the given virtual destinations in a new {@link VirtualDestinationInterceptor}.
+     *
+     * @param virtualDestinations the destinations the interceptor should carry
+     */
+    protected static VirtualDestinationInterceptor buildInterceptor(
+            VirtualDestination[] virtualDestinations) {
+        VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor();
+        virtualDestinationInterceptor.setVirtualDestinations(virtualDestinations);
+        return virtualDestinationInterceptor;
+    }
+
+}

Reply via email to