Repository: activemq Updated Branches: refs/heads/master a01578ad4 -> 43c3cae2c
http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java new file mode 100644 index 0000000..db0b715 --- /dev/null +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaNetworkConnectorTest.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.java; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.net.URI; +import java.util.Arrays; +import java.util.HashSet; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.RuntimeConfigTestSupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.network.DiscoveryNetworkConnector; +import org.apache.activemq.network.NetworkConnector; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin; +import org.apache.activemq.util.Wait; +import org.junit.Test; + +public class JavaNetworkConnectorTest extends RuntimeConfigTestSupport { + + public static final int SLEEP = 2; // seconds + private JavaRuntimeConfigurationBroker javaConfigBroker; + + public void startBroker(BrokerService brokerService) throws Exception { + this.brokerService = brokerService; + brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()}); + brokerService.setPersistent(false); + brokerService.start(); + brokerService.waitUntilStarted(); + + javaConfigBroker = + (JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class); + + } + + @Test + public void testNew() throws Exception { + final BrokerService brokerService = new BrokerService(); + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size()); + + DiscoveryNetworkConnector nc = createNetworkConnector(); + + javaConfigBroker.addNetworkConnector(nc); + + assertTrue("new network connectors", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 1 == brokerService.getNetworkConnectors().size(); + } + })); + + NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0); + javaConfigBroker.addNetworkConnector(nc); + TimeUnit.SECONDS.sleep(SLEEP); + assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size()); + assertSame("same instance", networkConnector, brokerService.getNetworkConnectors().get(0)); + + // verify nested elements + assertEquals("has exclusions", 2, networkConnector.getExcludedDestinations().size()); + + assertEquals("one statically included", 1, networkConnector.getStaticallyIncludedDestinations().size()); + assertEquals("one dynamically included", 1, networkConnector.getDynamicallyIncludedDestinations().size()); + assertEquals("one durable", 1, networkConnector.getDurableDestinations().size()); + + } + + + @Test + public void testMod() throws Exception { + final BrokerService brokerService = new BrokerService(); + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size()); + + DiscoveryNetworkConnector nc = createNetworkConnector(); + javaConfigBroker.addNetworkConnector(nc); + TimeUnit.SECONDS.sleep(SLEEP); + + assertEquals("one network connectors", 1, brokerService.getNetworkConnectors().size()); + + // track the original + NetworkConnector networkConnector = brokerService.getNetworkConnectors().get(0); + assertEquals("network ttl is default", 1, networkConnector.getNetworkTTL()); + + nc.setNetworkTTL(2); + javaConfigBroker.updateNetworkConnector(nc); + TimeUnit.SECONDS.sleep(SLEEP); + assertEquals("still one network connectors", 1, brokerService.getNetworkConnectors().size()); + + NetworkConnector modNetworkConnector = brokerService.getNetworkConnectors().get(0); + assertEquals("got ttl update", 2, modNetworkConnector.getNetworkTTL()); + + // apply again - ensure no change + javaConfigBroker.updateNetworkConnector(nc); + assertEquals("no new network connectors", 1, brokerService.getNetworkConnectors().size()); + assertSame("same instance", modNetworkConnector, brokerService.getNetworkConnectors().get(0)); + } + + @Test + public void testRemove() throws Exception { + final BrokerService brokerService = new BrokerService(); + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + assertEquals("no network connectors", 0, brokerService.getNetworkConnectors().size()); + + DiscoveryNetworkConnector nc1 = new DiscoveryNetworkConnector(); + nc1.setUri(new URI("static:(tcp://localhost:5555)")); + nc1.setNetworkTTL(1); + nc1.setName("one"); + + DiscoveryNetworkConnector nc2 = new DiscoveryNetworkConnector(); + nc2.setUri(new URI("static:(tcp://localhost:5555)")); + nc2.setNetworkTTL(1); + nc2.setName("one"); + + javaConfigBroker.addNetworkConnector(nc1); + javaConfigBroker.addNetworkConnector(nc2); + + TimeUnit.SECONDS.sleep(SLEEP); + assertEquals("correct network connectors", 2, brokerService.getNetworkConnectors().size()); + + javaConfigBroker.removeNetworkConnector(nc2); + + assertTrue("expected mod on time", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return 1 == brokerService.getNetworkConnectors().size(); + } + })); + + NetworkConnector remainingNetworkConnector = brokerService.getNetworkConnectors().get(0); + assertEquals("name match", "one", remainingNetworkConnector.getName()); + } + + private DiscoveryNetworkConnector createNetworkConnector() throws Exception { + DiscoveryNetworkConnector nc = new DiscoveryNetworkConnector(); + nc.setUri(new URI("static:(tcp://localhost:5555)")); + nc.setNetworkTTL(1); + nc.setName("one"); + nc.setExcludedDestinations(Arrays.asList(new ActiveMQTopic("LAN.>"), new ActiveMQQueue("LAN.>"))); + nc.setDynamicallyIncludedDestinations(Arrays.<ActiveMQDestination>asList(new ActiveMQQueue("DynamicallyIncluded.*"))); + nc.setStaticallyIncludedDestinations(Arrays.<ActiveMQDestination>asList(new ActiveMQTopic("StaticallyIncluded.*"))); + nc.setDurableDestinations(new HashSet<>(Arrays.<ActiveMQDestination>asList(new ActiveMQTopic("durableDest")))); + return nc; + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java new file mode 100644 index 0000000..94a5496 --- /dev/null +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaPolicyEntryTest.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.java; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.RuntimeConfigTestSupport; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin; +import org.junit.Test; + +public class JavaPolicyEntryTest extends RuntimeConfigTestSupport { + + public static final int SLEEP = 2; // seconds + private JavaRuntimeConfigurationBroker javaConfigBroker; + + public void startBroker(BrokerService brokerService) throws Exception { + this.brokerService = brokerService; + brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()}); + brokerService.setPersistent(false); + brokerService.start(); + brokerService.waitUntilStarted(); + + javaConfigBroker = + (JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class); + } + + @Test + public void testMod() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024); + policyMap.setPolicyEntries(Arrays.asList(entry)); + brokerService.setDestinationPolicy(policyMap); + + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + verifyQueueLimit("Before", 1024); + + //Reapply new limit + entry.setMemoryLimit(4194304); + javaConfigBroker.modifyPolicyEntry(entry); + TimeUnit.SECONDS.sleep(SLEEP); + + verifyQueueLimit("After", 4194304); + + // change to existing dest + verifyQueueLimit("Before", 4194304); + } + + @Test + public void testAddNdMod() throws Exception { + BrokerService brokerService = new BrokerService(); + PolicyMap policyMap = new PolicyMap(); + PolicyEntry entry = new PolicyEntry(); + entry.setQueue(">"); + entry.setMemoryLimit(1024); + policyMap.setPolicyEntries(Arrays.asList(entry)); + brokerService.setDestinationPolicy(policyMap); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + verifyQueueLimit("Before", 1024); + verifyTopicLimit("Before", brokerService.getSystemUsage().getMemoryUsage().getLimit()); + + entry.setMemoryLimit(2048); + javaConfigBroker.modifyPolicyEntry(entry); + TimeUnit.SECONDS.sleep(SLEEP); + + PolicyEntry newEntry = new PolicyEntry(); + newEntry.setTopic(">"); + newEntry.setMemoryLimit(2048); + javaConfigBroker.addNewPolicyEntry(newEntry); + TimeUnit.SECONDS.sleep(SLEEP); + + verifyTopicLimit("After", 2048l); + verifyQueueLimit("After", 2048); + + // change to existing dest + verifyTopicLimit("Before", 2048l); + } + + private void verifyQueueLimit(String dest, int memoryLimit) throws Exception { + ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection(); + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createConsumer(session.createQueue(dest)); + + assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQQueue(dest)).getMemoryUsage().getLimit()); + } finally { + connection.close(); + } + } + + private void verifyTopicLimit(String dest, long memoryLimit) throws Exception { + ActiveMQConnection connection = (ActiveMQConnection) new ActiveMQConnectionFactory("vm://localhost").createConnection(); + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + session.createConsumer(session.createTopic(dest)); + + assertEquals(memoryLimit, brokerService.getRegionBroker().getDestinationMap().get(new ActiveMQTopic(dest)).getMemoryUsage().getLimit()); + } finally { + connection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/43c3cae2/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaVirtualDestTest.java ---------------------------------------------------------------------- diff --git a/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaVirtualDestTest.java b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaVirtualDestTest.java new file mode 100644 index 0000000..fd8d407 --- /dev/null +++ b/activemq-runtime-config/src/test/java/org/apache/activemq/java/JavaVirtualDestTest.java @@ -0,0 +1,376 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.java; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.Collection; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.AbstractVirtualDestTest; +import org.apache.activemq.broker.BrokerPlugin; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.virtual.CompositeQueue; +import org.apache.activemq.broker.region.virtual.FilteredDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationBroker; +import org.apache.activemq.plugin.java.JavaRuntimeConfigurationPlugin; +import org.apache.activemq.util.Wait; +import org.junit.Test; + +public class JavaVirtualDestTest extends AbstractVirtualDestTest { + + public static final int SLEEP = 2; // seconds + private JavaRuntimeConfigurationBroker javaConfigBroker; + + public void startBroker(BrokerService brokerService) throws Exception { + this.brokerService = brokerService; + brokerService.setPlugins(new BrokerPlugin[]{new JavaRuntimeConfigurationPlugin()}); + brokerService.setPersistent(false); + brokerService.start(); + brokerService.waitUntilStarted(); + + javaConfigBroker = + (JavaRuntimeConfigurationBroker) brokerService.getBroker().getAdaptor(JavaRuntimeConfigurationBroker.class); + } + + @Test + public void testNew() throws Exception { + startBroker(new BrokerService()); + assertTrue("broker alive", brokerService.isStarted()); + + // default config has support for VirtualTopic.> + DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors(); + assertEquals("one interceptor", 1, interceptors.length); + assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor); + + VirtualDestinationInterceptor defaultValue = (VirtualDestinationInterceptor) interceptors[0]; + assertEquals("default names in place", "VirtualTopic.>", + defaultValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName()); + + exerciseVirtualTopic("VirtualTopic.Default"); + + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)}); + TimeUnit.SECONDS.sleep(SLEEP); + + assertEquals("one interceptor", 1, interceptors.length); + assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor); + + // update will happen on addDestination + exerciseVirtualTopic("A.Default"); + + VirtualDestinationInterceptor newValue = (VirtualDestinationInterceptor) interceptors[0]; + assertEquals("new names in place", "A.>", + defaultValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName()); + + // apply again - ensure no change + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)}); + TimeUnit.SECONDS.sleep(SLEEP); + assertSame("same instance", newValue, brokerService.getDestinationInterceptors()[0]); + } + @Test + public void testNewComposite() throws Exception { + startBroker(new BrokerService()); + assertTrue("broker alive", brokerService.isStarted()); + + CompositeQueue queue = buildCompositeQueue("VirtualDestination.CompositeQueue", + Arrays.asList(new ActiveMQQueue("VirtualDestination.QueueConsumer"), + new ActiveMQTopic("VirtualDestination.TopicConsumer"))); + + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{queue}); + TimeUnit.SECONDS.sleep(SLEEP); + + exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer"); + } + + + @Test + public void testModComposite() throws Exception { + BrokerService brokerService = new BrokerService(); + + CompositeQueue queue = buildCompositeQueue("VirtualDestination.CompositeQueue", + Arrays.asList(new ActiveMQQueue("VirtualDestination.QueueConsumer"), + new ActiveMQTopic("VirtualDestination.TopicConsumer"))); + + brokerService.setDestinationInterceptors(new DestinationInterceptor[] { + buildInterceptor(new VirtualDestination[]{queue})}); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + + exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer"); + + //Apply updated config + CompositeQueue newConfig = buildCompositeQueue("VirtualDestination.CompositeQueue", false, + Arrays.asList(new ActiveMQQueue("VirtualDestination.QueueConsumer"), + new ActiveMQTopic("VirtualDestination.TopicConsumer"))); + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{newConfig}); + TimeUnit.SECONDS.sleep(SLEEP); + + + exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.QueueConsumer"); + exerciseCompositeQueue("VirtualDestination.CompositeQueue", "VirtualDestination.CompositeQueue"); + } + + + @Test + public void testNewNoDefaultVirtualTopicSupport() throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setUseVirtualTopics(false); + startBroker(brokerService); + + TimeUnit.SECONDS.sleep(SLEEP); + + assertTrue("broker alive", brokerService.isStarted()); + + DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors(); + assertEquals("one interceptor", 0, interceptors.length); + + //apply new config + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)}); + TimeUnit.SECONDS.sleep(SLEEP); + // update will happen on addDestination + exerciseVirtualTopic("A.Default"); + + interceptors = brokerService.getDestinationInterceptors(); + assertEquals("one interceptor", 1, interceptors.length); + assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor); + + //apply new config again, make sure still just 1 interceptor + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)}); + TimeUnit.SECONDS.sleep(SLEEP); + // update will happen on addDestination + exerciseVirtualTopic("A.Default"); + + interceptors = brokerService.getDestinationInterceptors(); + assertEquals("one interceptor", 1, interceptors.length); + assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor); + + } + + + @Test + public void testNewWithMirrorQueueSupport() throws Exception { + BrokerService brokerService = new BrokerService(); + brokerService.setUseMirroredQueues(true); + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + TimeUnit.SECONDS.sleep(SLEEP); + + assertTrue("broker alive", brokerService.isStarted()); + + DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors(); + assertEquals("expected interceptor", 2, interceptors.length); + + //apply new config + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)}); + TimeUnit.SECONDS.sleep(SLEEP); + + // update will happen on addDestination + exerciseVirtualTopic("A.Default"); + + interceptors = brokerService.getDestinationInterceptors(); + assertEquals("expected interceptor", 2, interceptors.length); + assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor); + + VirtualDestinationInterceptor newValue = (VirtualDestinationInterceptor) interceptors[0]; + + // apply again - ensure no change + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)}); + assertSame("same instance", newValue, brokerService.getDestinationInterceptors()[0]); + } + + @Test + public void testRemove() throws Exception { + final BrokerService brokerService = new BrokerService(); + brokerService.setDestinationInterceptors(new DestinationInterceptor[] { + buildInterceptor(new VirtualDestination[]{buildVirtualTopic("A.>", false)})}); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + DestinationInterceptor[] interceptors = brokerService.getDestinationInterceptors(); + assertEquals("one interceptor", 1, interceptors.length); + assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor); + + exerciseVirtualTopic("A.Default"); + + VirtualDestinationInterceptor defaultValue = (VirtualDestinationInterceptor) interceptors[0]; + assertEquals("configured names in place", "A.>", + defaultValue.getVirtualDestinations()[0].getVirtualDestination().getPhysicalName()); + + exerciseVirtualTopic("A.Default"); + + //apply empty config - this removes all virtual destinations from the interceptor + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{}); + TimeUnit.SECONDS.sleep(SLEEP); + + // update will happen on addDestination + forceAddDestination("AnyDest"); + + assertTrue("getVirtualDestinations empty on time", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() { + return 0 == ((VirtualDestinationInterceptor)brokerService.getDestinationInterceptors()[0]). + getVirtualDestinations().length; + } + })); + + // reverse the remove, add again + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("A.>", false)}); + TimeUnit.SECONDS.sleep(SLEEP); + + // update will happen on addDestination + exerciseVirtualTopic("A.NewOne"); + + interceptors = brokerService.getDestinationInterceptors(); + assertEquals("expected interceptor", 1, interceptors.length); + assertTrue("it is virtual topic interceptor", interceptors[0] instanceof VirtualDestinationInterceptor); + } + + @Test + public void testMod() throws Exception { + final BrokerService brokerService = new BrokerService(); + brokerService.setDestinationInterceptors(new DestinationInterceptor[] { + buildInterceptor(new VirtualDestination[]{buildVirtualTopic("A.>", false)})}); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + assertEquals("one interceptor", 1, brokerService.getDestinationInterceptors().length); + exerciseVirtualTopic("A.Default"); + + //apply new config + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("B.>", false)}); + TimeUnit.SECONDS.sleep(SLEEP); + exerciseVirtualTopic("B.Default"); + + assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length); + } + + + @Test + public void testModWithMirroredQueue() throws Exception { + final BrokerService brokerService = new BrokerService(); + brokerService.setUseMirroredQueues(true); + brokerService.setDestinationInterceptors(new DestinationInterceptor[] { + buildInterceptor(new VirtualDestination[]{buildVirtualTopic("A.>", false)})}); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + TimeUnit.SECONDS.sleep(SLEEP); + + assertEquals("one interceptor", 1, brokerService.getDestinationInterceptors().length); + exerciseVirtualTopic("A.Default"); + + //apply new config + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{buildVirtualTopic("B.>", false)}); + TimeUnit.SECONDS.sleep(SLEEP); + exerciseVirtualTopic("B.Default"); + + assertEquals("still one interceptor", 1, brokerService.getDestinationInterceptors().length); + } + + @Test + public void testNewFilteredComposite() throws Exception { + final BrokerService brokerService = new BrokerService(); + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + FilteredDestination filteredDestination = new FilteredDestination(); + filteredDestination.setSelector("odd = 'yes'"); + filteredDestination.setQueue("VirtualDestination.QueueConsumer"); + CompositeQueue queue = buildCompositeQueue("VirtualDestination.FilteredCompositeQueue", + Arrays.asList(filteredDestination)); + + //apply new config + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{queue}); + TimeUnit.SECONDS.sleep(SLEEP); + + exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "yes"); + } + + @Test + public void testModFilteredComposite() throws Exception { + final BrokerService brokerService = new BrokerService(); + FilteredDestination filteredDestination = new FilteredDestination(); + filteredDestination.setSelector("odd = 'yes'"); + filteredDestination.setQueue("VirtualDestination.QueueConsumer"); + CompositeQueue queue = buildCompositeQueue("VirtualDestination.FilteredCompositeQueue", + Arrays.asList(filteredDestination)); + + brokerService.setDestinationInterceptors(new DestinationInterceptor[] { + buildInterceptor(new VirtualDestination[]{queue})}); + + startBroker(brokerService); + assertTrue("broker alive", brokerService.isStarted()); + + exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "yes"); + + filteredDestination = new FilteredDestination(); + filteredDestination.setSelector("odd = 'no'"); + filteredDestination.setQueue("VirtualDestination.QueueConsumer"); + queue = buildCompositeQueue("VirtualDestination.FilteredCompositeQueue", + Arrays.asList(filteredDestination)); + + //apply new config + javaConfigBroker.setVirtualDestinations(new VirtualDestination[]{queue}); + TimeUnit.SECONDS.sleep(SLEEP); + + exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no"); + exerciseFilteredCompositeQueue("VirtualDestination.FilteredCompositeQueue", "VirtualDestination.QueueConsumer", "no"); + } + + + protected static CompositeQueue buildCompositeQueue(String name, Collection<?> forwardTo) { + return buildCompositeQueue(name, true, forwardTo); + } + + protected static CompositeQueue buildCompositeQueue(String name, boolean forwardOnly, + Collection<?> forwardTo) { + CompositeQueue queue = new CompositeQueue(); + queue.setForwardOnly(forwardOnly); + queue.setName(name); + queue.setForwardTo(forwardTo); + return queue; + } + + protected static VirtualTopic buildVirtualTopic(String name, boolean selectorAware) { + VirtualTopic virtualTopic = new VirtualTopic(); + virtualTopic.setSelectorAware(selectorAware); + virtualTopic.setName(name); + return virtualTopic; + } + + protected static VirtualDestinationInterceptor buildInterceptor(VirtualDestination[] virtualDestinations) { + VirtualDestinationInterceptor virtualDestinationInterceptor = new VirtualDestinationInterceptor(); + virtualDestinationInterceptor.setVirtualDestinations(virtualDestinations); + return virtualDestinationInterceptor; + } + +}