http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java new file mode 100644 index 0000000..4c19c7b --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueCorrectMemoryUsageTest.java @@ -0,0 +1,165 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.virtual.MirroredQueue; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.usage.MemoryUsage; +import org.apache.activemq.usage.StoreUsage; +import org.apache.activemq.usage.SystemUsage; +import org.apache.activemq.usage.TempUsage; +import org.apache.activemq.util.IOHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.util.Assert; + +/** + * This test will determine that the producer flow control does not kick in. + * The original MirroredQueue implementation was causing the queue to update + * the topic memory usage instead of the queue memory usage. + * The reason is that the message memory usage instance will not be updated + * unless it is null. This was the case when the message was initially sent + * to the topic but then it was non-null when it was being sent to the queue. + * When the region destination was set, the associated memory usage was not + * updated to the passed queue destination and thus the memory usage of the + * topic was being updated instead. + * + * @author Claudio Corsi + */ +public class MirroredQueueCorrectMemoryUsageTest extends EmbeddedBrokerTestSupport { + + private static final Logger logger = LoggerFactory.getLogger(MirroredQueueCorrectMemoryUsageTest.class); + + private static final long ONE_MB = 0x0100000; + private static final long TEN_MB = ONE_MB * 10; + private static final long TWENTY_MB = TEN_MB * 2; + + private static final String CREATED_STATIC_FOR_PERSISTENT = "created.static.for.persistent"; + + @Override + protected boolean isPersistent() { + return true; + } + + @Override + protected BrokerService createBroker() throws Exception { + // Create the broker service instance.... + BrokerService broker = super.createBroker(); + // Create and add the mirrored queue destination interceptor .... + DestinationInterceptor[] destinationInterceptors = new DestinationInterceptor[1]; + MirroredQueue mq = new MirroredQueue(); + mq.setCopyMessage(true); + mq.setPrefix(""); + mq.setPostfix(".qmirror"); + destinationInterceptors[0] = mq; + broker.setDestinationInterceptors(destinationInterceptors); + // Create the destination policy for the topics and queues + PolicyMap policyMap = new PolicyMap(); + List<PolicyEntry> entries = new LinkedList<PolicyEntry>(); + // Create Topic policy entry + PolicyEntry policyEntry = new PolicyEntry(); + super.useTopic = true; + ActiveMQDestination destination = super.createDestination(">"); + Assert.isTrue(destination.isTopic(), "Created destination was not a topic"); + policyEntry.setDestination(destination); + policyEntry.setProducerFlowControl(true); + policyEntry.setMemoryLimit(ONE_MB); // x10 + entries.add(policyEntry); + // Create Queue policy entry + policyEntry = new PolicyEntry(); + super.useTopic = false; + destination = super.createDestination(CREATED_STATIC_FOR_PERSISTENT); + Assert.isTrue(destination.isQueue(), "Created destination was not a queue"); + policyEntry.setDestination(destination); + policyEntry.setProducerFlowControl(true); + policyEntry.setMemoryLimit(TEN_MB); + entries.add(policyEntry); + policyMap.setPolicyEntries(entries); + broker.setDestinationPolicy(policyMap); + // Set destinations + broker.setDestinations(new ActiveMQDestination[] { destination }); + // Set system usage + SystemUsage memoryManager = new SystemUsage(); + MemoryUsage memoryUsage = new MemoryUsage(); + memoryUsage.setLimit(TEN_MB); + memoryManager.setMemoryUsage(memoryUsage); + StoreUsage storeUsage = new StoreUsage(); + storeUsage.setLimit(TWENTY_MB); + memoryManager.setStoreUsage(storeUsage); + TempUsage tempDiskUsage = new TempUsage(); + tempDiskUsage.setLimit(TEN_MB); + memoryManager.setTempUsage(tempDiskUsage); + broker.setSystemUsage(memoryManager); + // Set the persistent adapter + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + persistenceAdapter.setJournalMaxFileLength((int)TEN_MB); + // Delete all current messages... + IOHelper.deleteFile(persistenceAdapter.getDirectory()); + broker.setPersistenceAdapter(persistenceAdapter); + return broker; + } + + @Before + protected void setUp() throws Exception { + super.setUp(); + } + + @After + protected void tearDown() throws Exception { + super.tearDown(); + } + + @Test(timeout=40000) + public void testNoMemoryUsageIncreaseForTopic() throws Exception { + Connection connection = super.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + Destination destination = session.createQueue(CREATED_STATIC_FOR_PERSISTENT); + MessageProducer producer = session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + char[] m = new char[1024]; + Arrays.fill(m, 'x'); + // create some messages that have 1k each + for (int i = 1; i < 12000; i++) { + producer.send(session.createTextMessage(new String(m))); + logger.debug("Sent message: " + i); + } + producer.close(); + session.close(); + connection.stop(); + connection.close(); + } +}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java new file mode 100644 index 0000000..acbe3b9 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueTest.java @@ -0,0 +1,116 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TemporaryQueue; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.spring.ConsumerBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * + */ +public class MirroredQueueTest extends EmbeddedBrokerTestSupport { + private static final transient Logger LOG = LoggerFactory.getLogger(MirroredQueueTest.class); + private Connection connection; + + public void testSendingToQueueIsMirrored() throws Exception { + if (connection == null) { + connection = createConnection(); + } + connection.start(); + + ConsumerBean messageList = new ConsumerBean(); + messageList.setVerbose(true); + + Destination consumeDestination = createConsumeDestination(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + LOG.info("Consuming from: " + consumeDestination); + + MessageConsumer c1 = session.createConsumer(consumeDestination); + c1.setMessageListener(messageList); + + // create topic producer + ActiveMQQueue sendDestination = new ActiveMQQueue(getQueueName()); + LOG.info("Sending to: " + sendDestination); + + MessageProducer producer = session.createProducer(sendDestination); + assertNotNull(producer); + + int total = 10; + for (int i = 0; i < total; i++) { + producer.send(session.createTextMessage("message: " + i)); + } + + ///Thread.sleep(1000000); + + messageList.assertMessagesArrived(total); + + LOG.info("Received: " + messageList); + } + + public void testTempMirroredQueuesClearDown() throws Exception{ + if (connection == null) { + connection = createConnection(); + } + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + RegionBroker rb = (RegionBroker) broker.getBroker().getAdaptor( + RegionBroker.class); + assertTrue(rb.getDestinationMap().size()==5); + tempQueue.delete(); + assertTrue(rb.getDestinationMap().size()==4); + } + + protected Destination createConsumeDestination() { + return new ActiveMQTopic("VirtualTopic.Mirror." + getQueueName()); + } + + protected String getQueueName() { + return "My.Queue"; + } + + @Override + protected BrokerService createBroker() throws Exception { + BrokerService answer = new BrokerService(); + answer.setUseMirroredQueues(true); + answer.setPersistent(isPersistent()); + answer.addConnector(bindAddress); + return answer; + } + + @Override + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + super.tearDown(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java new file mode 100644 index 0000000..f8cccf4 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/MirroredQueueUsingVirtualTopicQueueTest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import javax.jms.Destination; + +import org.apache.activemq.command.ActiveMQQueue; + +/** + * + * + */ +public class MirroredQueueUsingVirtualTopicQueueTest extends MirroredQueueTest { + + @Override + protected Destination createConsumeDestination() { + String queueName = "Consumer.A.VirtualTopic.Mirror." + getQueueName(); + return new ActiveMQQueue(queueName); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java new file mode 100644 index 0000000..b822f5d --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualDestPerfTest.java @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + + +import java.util.ArrayList; +import java.util.Collection; +import java.util.LinkedHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.management.ObjectName; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.broker.region.virtual.CompositeTopic; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.command.ActiveMQBytesMessage; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.ByteSequence; +import org.junit.Ignore; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + + +public class VirtualDestPerfTest { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualDestPerfTest.class); + public int messageSize = 5*1024; + public int messageCount = 10000; + ActiveMQTopic target = new ActiveMQTopic("target"); + BrokerService brokerService; + ActiveMQConnectionFactory connectionFactory; + + @Test + @Ignore("comparison test - 'new' no wait on future with async send broker side is always on") + public void testAsyncSendBurstToFillCache() throws Exception { + startBroker(4, true, true); + connectionFactory.setUseAsyncSend(true); + + // a burst of messages to fill the cache + messageCount = 22000; + messageSize = 10*1024; + + LinkedHashMap<Integer, Long> results = new LinkedHashMap<Integer, Long>(); + + final ActiveMQQueue queue = new ActiveMQQueue("targetQ"); + for (Integer numThreads : new Integer[]{1, 2}) { + ExecutorService executor = Executors.newFixedThreadPool(numThreads); + final AtomicLong numMessagesToSend = new AtomicLong(messageCount); + purge(); + long startTime = System.currentTimeMillis(); + for (int i=0;i<numThreads;i++) { + executor.execute(new Runnable(){ + @Override + public void run() { + try { + produceMessages(numMessagesToSend, queue); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + } + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.MINUTES); + long endTime = System.currentTimeMillis(); + long seconds = (endTime - startTime) / 1000; + LOG.info("For numThreads {} duration {}", numThreads.intValue(), seconds); + results.put(numThreads, seconds); + LOG.info("Broker got {} messages", brokerService.getAdminView().getTotalEnqueueCount()); + } + + brokerService.stop(); + brokerService.waitUntilStopped(); + LOG.info("Results: {}", results); + } + + private void purge() throws Exception { + ObjectName[] queues = brokerService.getAdminView().getQueues(); + if (queues.length == 1) { + QueueViewMBean queueViewMBean = (QueueViewMBean) + brokerService.getManagementContext().newProxyInstance(queues[0], QueueViewMBean.class, false); + queueViewMBean.purge(); + } + } + + @Test + @Ignore("comparison test - takes too long and really needs a peek at the graph") + public void testPerf() throws Exception { + LinkedHashMap<Integer, Long> resultsT = new LinkedHashMap<Integer, Long>(); + LinkedHashMap<Integer, Long> resultsF = new LinkedHashMap<Integer, Long>(); + + for (int i=2;i<11;i++) { + for (Boolean concurrent : new Boolean[]{true, false}) { + startBroker(i, concurrent, false); + + long startTime = System.currentTimeMillis(); + produceMessages(new AtomicLong(messageCount), target); + long endTime = System.currentTimeMillis(); + long seconds = (endTime - startTime) / 1000; + LOG.info("For routes {} duration {}", i, seconds); + if (concurrent) { + resultsT.put(i, seconds); + } else { + resultsF.put(i, seconds); + } + brokerService.stop(); + brokerService.waitUntilStopped(); + } + } + LOG.info("results T{} F{}", resultsT, resultsF); + LOG.info("http://www.chartgo.com/samples.do?chart=line&border=1&show3d=0&width=600&height=500&roundedge=1&transparency=1&legend=1&title=Send:10k::Concurrent-v-Serial&xtitle=routes&ytitle=Duration(seconds)&chrtbkgndcolor=white&threshold=0.0&lang=en" + + "&xaxis1=" + toStr(resultsT.keySet()) + + "&yaxis1=" + toStr(resultsT.values()) + + "&group1=concurrent" + + "&xaxis2=" + toStr(resultsF.keySet()) + + "&yaxis2=" + toStr(resultsF.values()) + + "&group2=serial" + + "&from=linejsp"); + } + + private String toStr(Collection set) { + return set.toString().replace(",","%0D%0A").replace("[","").replace("]","").replace(" ", ""); + } + + protected void produceMessages(AtomicLong messageCount, ActiveMQDestination destination) throws Exception { + final ByteSequence payLoad = new ByteSequence(new byte[messageSize]); + Connection connection = connectionFactory.createConnection(); + MessageProducer messageProducer = connection.createSession(false, Session.AUTO_ACKNOWLEDGE).createProducer(destination); + messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); + ActiveMQBytesMessage message = new ActiveMQBytesMessage(); + message.setContent(payLoad); + while (messageCount.decrementAndGet() >= 0) { + messageProducer.send(message); + } + connection.close(); + } + + private void startBroker(int fanoutCount, boolean concurrentSend, boolean concurrentStoreAndDispatchQueues) throws Exception { + brokerService = new BrokerService(); + brokerService.setDeleteAllMessagesOnStartup(true); + brokerService.setUseVirtualTopics(true); + brokerService.addConnector("tcp://0.0.0.0:0"); + brokerService.setAdvisorySupport(false); + PolicyMap destPolicyMap = new PolicyMap(); + PolicyEntry defaultEntry = new PolicyEntry(); + defaultEntry.setExpireMessagesPeriod(0); + defaultEntry.setOptimizedDispatch(true); + defaultEntry.setCursorMemoryHighWaterMark(110); + destPolicyMap.setDefaultEntry(defaultEntry); + brokerService.setDestinationPolicy(destPolicyMap); + + CompositeTopic route = new CompositeTopic(); + route.setName("target"); + route.setForwardOnly(true); + route.setConcurrentSend(concurrentSend); + Collection<ActiveMQQueue> routes = new ArrayList<ActiveMQQueue>(); + for (int i=0; i<fanoutCount; i++) { + routes.add(new ActiveMQQueue("route." + i)); + } + route.setForwardTo(routes); + VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); + interceptor.setVirtualDestinations(new VirtualDestination[]{route}); + brokerService.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); + brokerService.start(); + + connectionFactory = new ActiveMQConnectionFactory(brokerService.getTransportConnectors().get(0).getPublishableConnectString()); + connectionFactory.setWatchTopicAdvisories(false); + if (brokerService.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { + + //with parallel sends and no consumers, concurrentStoreAnd dispatch, which uses a single thread by default + // will stop/impeed write batching. The num threads will need tweaking when consumers are in the mix but may introduce + // order issues + ((KahaDBPersistenceAdapter)brokerService.getPersistenceAdapter()).setConcurrentStoreAndDispatchQueues(concurrentStoreAndDispatchQueues); + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java new file mode 100644 index 0000000..7c853cf --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDLQTest.java @@ -0,0 +1,415 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.DeliveryMode; +import javax.jms.Destination; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; + +import junit.framework.TestCase; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQMessageProducer; +import org.apache.activemq.ActiveMQSession; +import org.apache.activemq.RedeliveryPolicy; +import org.apache.activemq.broker.BrokerFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.Queue; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Unit test for virtual topics and DLQ messaging. See individual test for more + * detail + * + */ +public class VirtualTopicDLQTest extends TestCase { + private static BrokerService broker; + + private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDLQTest.class); + + static final String jmsConnectionURI = "failover:(vm://localhost)"; + + // Virtual Topic that the test publishes 10 messages to + private static final String virtualTopicName = "VirtualTopic.Test"; + + // Queues that receive all the messages send to the virtual topic + private static final String consumer1Prefix = "Consumer.A."; + private static final String consumer2Prefix = "Consumer.B."; + private static final String consumer3Prefix = "Consumer.C."; + + // Expected Individual Dead Letter Queue names that are tied to the + // Subscriber Queues + private static final String dlqPrefix = "ActiveMQ.DLQ.Topic."; + + // Number of messages + private static final int numberMessages = 6; + + @Before + public void setUp() throws Exception { + try { + broker = BrokerFactory.createBroker("xbean:org/apache/activemq/broker/virtual/virtual-individual-dlq.xml", true); + broker.start(); + broker.waitUntilStarted(); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + } + + @After + public void tearDown() throws Exception { + try { + // Purge the DLQ's so counts are correct for next run + purgeDestination(dlqPrefix + consumer1Prefix + virtualTopicName); + purgeDestination(dlqPrefix + consumer2Prefix + virtualTopicName); + purgeDestination(dlqPrefix + consumer3Prefix + virtualTopicName); + } catch (Exception e) { + e.printStackTrace(); + } + + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + broker = null; + } + } + + /* + * This test verifies that all undelivered messages sent to a consumers + * listening on a queue associated with a virtual topic with be forwarded to + * separate DLQ's. + * + * Note that the broker config, deadLetterStrategy need to have the enable + * audit set to false so that duplicate message sent from a topic to + * individual consumers are forwarded to the DLQ + * + * <deadLetterStrategy> <bean + * xmlns="http://www.springframework.org/schema/beans" + * class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy" + * > <property name="useQueueForQueueMessages" value="true"></property> + * <property name="processNonPersistent" value="true"></property> <property + * name="processExpired" value="false"></property> <property + * name="enableAudit" value="false"></property> + * + * </bean> </deadLetterStrategy> + */ + @Test + public void testVirtualTopicSubscriberDeadLetterQueue() throws Exception { + + TestConsumer consumer1 = null; + TestConsumer consumer2 = null; + TestConsumer consumer3 = null; + TestConsumer dlqConsumer1 = null; + TestConsumer dlqConsumer2 = null; + TestConsumer dlqConsumer3 = null; + + try { + + // The first 2 consumers will rollback, ultimately causing messages + // to land on the DLQ + consumer1 = new TestConsumer(consumer1Prefix + virtualTopicName, false, numberMessages, true); + thread(consumer1, false); + + consumer2 = new TestConsumer(consumer2Prefix + virtualTopicName, false, numberMessages, true); + thread(consumer2, false); + + // TestConsumer that does not throw exceptions, messages should not + // land on DLQ + consumer3 = new TestConsumer(consumer3Prefix + virtualTopicName, false, numberMessages, false); + thread(consumer3, false); + + // TestConsumer to read the expected Dead Letter Queue + dlqConsumer1 = new TestConsumer(dlqPrefix + consumer1Prefix + virtualTopicName, false, numberMessages, false); + thread(dlqConsumer1, false); + + dlqConsumer2 = new TestConsumer(dlqPrefix + consumer2Prefix + virtualTopicName, false, numberMessages, false); + thread(dlqConsumer2, false); + + dlqConsumer3 = new TestConsumer(dlqPrefix + consumer3Prefix + virtualTopicName, false, numberMessages, false); + thread(dlqConsumer3, false); + + // Give the consumers a second to start + Thread.sleep(1000); + + // Start the producer + TestProducer producer = new TestProducer(virtualTopicName, true, numberMessages); + thread(producer, false); + + assertTrue("sent all producer messages in time, count is: " + producer.getLatch().getCount(), producer.getLatch().await(10, TimeUnit.SECONDS)); + LOG.info("producer successful, count = " + producer.getLatch().getCount()); + + assertTrue("remaining consumer1 count should be zero, is: " + consumer1.getLatch().getCount(), consumer1.getLatch().await(10, TimeUnit.SECONDS)); + LOG.info("consumer1 successful, count = " + consumer1.getLatch().getCount()); + + assertTrue("remaining consumer2 count should be zero, is: " + consumer2.getLatch().getCount(), consumer2.getLatch().await(10, TimeUnit.SECONDS)); + LOG.info("consumer2 successful, count = " + consumer2.getLatch().getCount()); + + assertTrue("remaining consumer3 count should be zero, is: " + consumer3.getLatch().getCount(), consumer3.getLatch().await(10, TimeUnit.SECONDS)); + LOG.info("consumer3 successful, count = " + consumer3.getLatch().getCount()); + + assertTrue("remaining dlqConsumer1 count should be zero, is: " + dlqConsumer1.getLatch().getCount(), + dlqConsumer1.getLatch().await(10, TimeUnit.SECONDS)); + LOG.info("dlqConsumer1 successful, count = " + dlqConsumer1.getLatch().getCount()); + + assertTrue("remaining dlqConsumer2 count should be zero, is: " + dlqConsumer2.getLatch().getCount(), + dlqConsumer2.getLatch().await(10, TimeUnit.SECONDS)); + LOG.info("dlqConsumer2 successful, count = " + dlqConsumer2.getLatch().getCount()); + + assertTrue("remaining dlqConsumer3 count should be " + numberMessages + ", is: " + dlqConsumer3.getLatch().getCount(), dlqConsumer3.getLatch() + .getCount() == numberMessages); + LOG.info("dlqConsumer2 successful, count = " + dlqConsumer2.getLatch().getCount()); + + } catch (Exception e) { + e.printStackTrace(); + throw e; + } finally { + // Tell consumers to stop (don't read any more messages after this) + if (consumer1 != null) + consumer1.setStop(true); + if (consumer2 != null) + consumer2.setStop(true); + if (consumer3 != null) + consumer3.setStop(true); + if (dlqConsumer1 != null) + dlqConsumer1.setStop(true); + if (dlqConsumer2 != null) + dlqConsumer2.setStop(true); + if (dlqConsumer3 != null) + dlqConsumer3.setStop(true); + } + } + + private static Thread thread(Runnable runnable, boolean daemon) { + Thread brokerThread = new Thread(runnable); + brokerThread.setDaemon(daemon); + brokerThread.start(); + return brokerThread; + } + + private class TestProducer implements Runnable { + private String destinationName = null; + private boolean isTopic = true; + private int numberMessages = 0; + private CountDownLatch latch = null; + + public TestProducer(String destinationName, boolean isTopic, int numberMessages) { + this.destinationName = destinationName; + this.isTopic = isTopic; + this.numberMessages = numberMessages; + latch = new CountDownLatch(numberMessages); + } + + public CountDownLatch getLatch() { + return latch; + } + + public void run() { + ActiveMQConnectionFactory connectionFactory = null; + ActiveMQConnection connection = null; + ActiveMQSession session = null; + Destination destination = null; + + try { + LOG.info("Started TestProducer for destination (" + destinationName + ")"); + + connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + if (isTopic) { + destination = session.createTopic(this.destinationName); + } else { + destination = session.createQueue(this.destinationName); + } + + // Create a MessageProducer from the Session to the Topic or + // Queue + ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(destination); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + for (int i = 0; i < numberMessages; i++) { + TextMessage message = (TextMessage) session.createTextMessage("I am a message :: " + String.valueOf(i)); + try { + producer.send(message); + + } catch (Exception deeperException) { + LOG.info("Producer for destination (" + destinationName + ") Caught: " + deeperException); + } + + latch.countDown(); + Thread.sleep(1000); + } + + LOG.info("Finished TestProducer for destination (" + destinationName + ")"); + + } catch (Exception e) { + LOG.error("Terminating TestProducer(" + destinationName + ")Caught: " + e); + e.printStackTrace(); + + } finally { + try { + // Clean up + if (session != null) + session.close(); + if (connection != null) + connection.close(); + + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e); + } + } + } + } + + private class TestConsumer implements Runnable, ExceptionListener, MessageListener { + private String destinationName = null; + private boolean isTopic = true; + private CountDownLatch latch = null; + private int maxRedeliveries = 0; + private int receivedMessageCounter = 0; + private boolean bFakeFail = false; + private boolean bStop = false; + + private ActiveMQConnectionFactory connectionFactory = null; + private ActiveMQConnection connection = null; + private Session session = null; + private MessageConsumer consumer = null; + + public TestConsumer(String destinationName, boolean isTopic, int expectedNumberMessages, boolean bFakeFail) { + this.destinationName = destinationName; + this.isTopic = isTopic; + latch = new CountDownLatch(expectedNumberMessages * (this.bFakeFail ? (maxRedeliveries + 1) : 1)); + this.bFakeFail = bFakeFail; + } + + public CountDownLatch getLatch() { + return latch; + } + + public void run() { + + try { + LOG.info("Started TestConsumer for destination (" + destinationName + ")"); + + connectionFactory = new ActiveMQConnectionFactory(jmsConnectionURI); + connection = (ActiveMQConnection) connectionFactory.createConnection(); + connection.start(); + session = connection.createSession(true, Session.SESSION_TRANSACTED); + + RedeliveryPolicy policy = connection.getRedeliveryPolicy(); + policy.setInitialRedeliveryDelay(1); + policy.setUseExponentialBackOff(false); + policy.setMaximumRedeliveries(maxRedeliveries); + + connection.setExceptionListener(this); + + Destination destination = null; + if (isTopic) { + destination = session.createTopic(destinationName); + } else { + destination = session.createQueue(destinationName); + } + + consumer = session.createConsumer(destination); + consumer.setMessageListener(this); + + while (!bStop) { + Thread.sleep(100); + } + + LOG.info("Finished TestConsumer for destination name (" + destinationName + ") remaining " + this.latch.getCount() + " messages " + + this.toString()); + + } catch (Exception e) { + LOG.error("Consumer (" + destinationName + ") Caught: " + e); + e.printStackTrace(); + } finally { + try { + // Clean up + if (consumer != null) + consumer.close(); + if (session != null) + session.close(); + if (connection != null) + connection.close(); + + } catch (Exception e) { + e.printStackTrace(); + LOG.error("Closing connection/session (" + destinationName + ")Caught: " + e); + } + } + } + + public synchronized void onException(JMSException ex) { + ex.printStackTrace(); + LOG.error("Consumer for destination, (" + destinationName + "), JMS Exception occured. Shutting down client."); + } + + public synchronized void setStop(boolean bStop) { + this.bStop = bStop; + } + + public synchronized void onMessage(Message message) { + receivedMessageCounter++; + latch.countDown(); + + LOG.info("Consumer for destination (" + destinationName + ") latch countdown: " + latch.getCount() + " :: Number messages received " + + this.receivedMessageCounter); + + try { + LOG.info("Consumer for destination (" + destinationName + ") Received message id :: " + message.getJMSMessageID()); + + if (!bFakeFail) { + LOG.info("Consumer on destination " + destinationName + " committing JMS Session for message: " + message.toString()); + session.commit(); + } else { + LOG.info("Consumer on destination " + destinationName + " rolling back JMS Session for message: " + message.toString()); + session.rollback(); // rolls back all the consumed messages + // on the session to + } + + } catch (JMSException ex) { + ex.printStackTrace(); + LOG.error("Error reading JMS Message from destination " + destinationName + "."); + } + } + } + + private static void purgeDestination(String destination) throws Exception { + final Queue dest = (Queue) ((RegionBroker) broker.getRegionBroker()).getQueueRegion().getDestinationMap().get(new ActiveMQQueue(destination)); + dest.purge(); + assertEquals(0, dest.getDestinationStatistics().getMessages().getCount()); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java new file mode 100644 index 0000000..8b95345 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicDisconnectSelectorTest.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import java.net.URI; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.spring.ConsumerBean; +import org.apache.activemq.xbean.XBeanBrokerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Test case for https://issues.apache.org/jira/browse/AMQ-3004 + */ + +public class VirtualTopicDisconnectSelectorTest extends EmbeddedBrokerTestSupport { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicDisconnectSelectorTest.class); + protected Connection connection; + + public void testVirtualTopicSelectorDisconnect() throws Exception { + testVirtualTopicDisconnect("odd = 'no'", 3000, 1500); + } + + public void testVirtualTopicNoSelectorDisconnect() throws Exception { + testVirtualTopicDisconnect(null, 3000, 3000); + } + + public void testVirtualTopicDisconnect(String messageSelector, int total , int expected) throws Exception { + if (connection == null) { + connection = createConnection(); + } + connection.start(); + + final ConsumerBean messageList = new ConsumerBean(); + + Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + + Destination producerDestination = getProducerDestination(); + Destination destination = getConsumerDsetination(); + + LOG.info("Sending to: " + producerDestination); + LOG.info("Consuming from: " + destination ); + + MessageConsumer consumer = createConsumer(session, destination, messageSelector); + + MessageListener listener = new MessageListener(){ + public void onMessage(Message message){ + messageList.onMessage(message); + try { + message.acknowledge(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + }; + + consumer.setMessageListener(listener); + + + // create topic producer + MessageProducer producer = session.createProducer(producerDestination); + assertNotNull(producer); + + int disconnectCount = total/3; + int reconnectCount = (total * 2)/3; + + for (int i = 0; i < total; i++) { + producer.send(createMessage(session, i)); + + if (i==disconnectCount){ + consumer.close(); + } + if (i==reconnectCount){ + consumer = createConsumer(session, destination, messageSelector); + consumer.setMessageListener(listener); + } + } + + assertMessagesArrived(messageList, expected ,10000); + } + + protected Destination getConsumerDsetination() { + return new ActiveMQQueue("Consumer.VirtualTopic.TEST"); + } + + + protected Destination getProducerDestination() { + return new ActiveMQTopic("VirtualTopic.TEST"); + } + + protected void setUp() throws Exception { + super.setUp(); + } + + protected MessageConsumer createConsumer(Session session, Destination destination, String messageSelector) throws JMSException { + if (messageSelector != null) { + return session.createConsumer(destination, messageSelector); + } else { + return session.createConsumer(destination); + } + } + + protected TextMessage createMessage(Session session, int i) throws JMSException { + TextMessage textMessage = session.createTextMessage("message: " + i); + if (i % 2 != 0) { + textMessage.setStringProperty("odd", "yes"); + } else { + textMessage.setStringProperty("odd", "no"); + } + textMessage.setIntProperty("i", i); + return textMessage; + } + + + + protected void assertMessagesArrived(ConsumerBean messageList, int expected, long timeout) { + messageList.assertMessagesArrived(expected,timeout); + + messageList.flushMessages(); + + + LOG.info("validate no other messages on queues"); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination destination1 = getConsumerDsetination(); + + MessageConsumer c1 = session.createConsumer(destination1, null); + c1.setMessageListener(messageList); + + + LOG.info("send one simple message that should go to both consumers"); + MessageProducer producer = session.createProducer(getProducerDestination()); + assertNotNull(producer); + + producer.send(session.createTextMessage("Last Message")); + + messageList.assertMessagesArrived(1); + + } catch (JMSException e) { + e.printStackTrace(); + fail("unexpeced ex while waiting for last messages: " + e); + } + } + + + protected String getBrokerConfigUri() { + return "org/apache/activemq/broker/virtual/disconnected-selector.xml"; + } + + protected BrokerService createBroker() throws Exception { + XBeanBrokerFactory factory = new XBeanBrokerFactory(); + BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri())); + return answer; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java new file mode 100644 index 0000000..42e6e60 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubTest.java @@ -0,0 +1,129 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import java.util.Vector; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import junit.framework.Test; + +import org.apache.activemq.EmbeddedBrokerTestSupport; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.spring.ConsumerBean; + +/** + * + * + */ +public class VirtualTopicPubSubTest extends EmbeddedBrokerTestSupport { + + private Vector<Connection> connections = new Vector<Connection>(); + public int ackMode = Session.AUTO_ACKNOWLEDGE; + + public static Test suite() { + return suite(VirtualTopicPubSubTest.class); + } + + public void initCombosForTestVirtualTopicCreation() { + addCombinationValues("ackMode", new Object[] {new Integer(Session.AUTO_ACKNOWLEDGE), new Integer(Session.CLIENT_ACKNOWLEDGE) }); + } + + private boolean doneTwice = false; + + public void testVirtualTopicCreation() throws Exception { + doTestVirtualTopicCreation(10); + } + + public void doTestVirtualTopicCreation(int total) throws Exception { + + ConsumerBean messageList = new ConsumerBean() { + public synchronized void onMessage(Message message) { + super.onMessage(message); + if (ackMode == Session.CLIENT_ACKNOWLEDGE) { + try { + message.acknowledge(); + } catch (JMSException e) { + e.printStackTrace(); + } + } + + } + }; + messageList.setVerbose(true); + + String queueAName = getVirtualTopicConsumerName(); + // create consumer 'cluster' + ActiveMQQueue queue1 = new ActiveMQQueue(queueAName); + ActiveMQQueue queue2 = new ActiveMQQueue(queueAName); + + Session session = createStartAndTrackConnection().createSession(false, ackMode); + MessageConsumer c1 = session.createConsumer(queue1); + + session = createStartAndTrackConnection().createSession(false, ackMode); + MessageConsumer c2 = session.createConsumer(queue2); + + c1.setMessageListener(messageList); + c2.setMessageListener(messageList); + + // create topic producer + Session producerSession = createStartAndTrackConnection().createSession(false, ackMode); + MessageProducer producer = producerSession.createProducer(new ActiveMQTopic(getVirtualTopicName())); + assertNotNull(producer); + + for (int i = 0; i < total; i++) { + producer.send(producerSession.createTextMessage("message: " + i)); + } + + messageList.assertMessagesArrived(total); + + // do twice so we confirm messages do not get redelivered after client acknowledgement + if( doneTwice == false ) { + doneTwice = true; + doTestVirtualTopicCreation(0); + } + } + + private Connection createStartAndTrackConnection() throws Exception { + Connection connection = createConnection(); + connection.start(); + connections.add(connection); + return connection; + } + + protected String getVirtualTopicName() { + return "VirtualTopic.TEST"; + } + + protected String getVirtualTopicConsumerName() { + return "Consumer.A.VirtualTopic.TEST"; + } + + + protected void tearDown() throws Exception { + for (Connection connection: connections) { + connection.close(); + } + super.tearDown(); + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java new file mode 100644 index 0000000..6952331 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicPubSubUsingXBeanTest.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import java.net.URI; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.xbean.XBeanBrokerFactory; + +/** + * + * + */ +public class VirtualTopicPubSubUsingXBeanTest extends VirtualTopicPubSubTest { + + protected String getVirtualTopicConsumerName() { + return "VirtualTopicConsumers.ConsumerNumberOne.FOO"; + } + + protected String getVirtualTopicName() { + return "FOO"; + } + + protected BrokerService createBroker() throws Exception { + XBeanBrokerFactory factory = new XBeanBrokerFactory(); + BrokerService answer = factory.createBroker(new URI(getBrokerConfigUri())); + + // lets disable persistence as we are a test + answer.setPersistent(false); + + return answer; + } + + protected String getBrokerConfigUri() { + return "org/apache/activemq/broker/virtual/global-virtual-topics.xml"; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java new file mode 100644 index 0000000..3287bab --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicSelectorTest.java @@ -0,0 +1,105 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import javax.jms.Destination; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.DestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualDestination; +import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor; +import org.apache.activemq.broker.region.virtual.VirtualTopic; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.spring.ConsumerBean; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class VirtualTopicSelectorTest extends CompositeTopicTest { + + private static final Logger LOG = LoggerFactory.getLogger(VirtualTopicSelectorTest.class); + + protected Destination getConsumer1Dsetination() { + return new ActiveMQQueue("Consumer.1.VirtualTopic.TEST"); + } + + protected Destination getConsumer2Dsetination() { + return new ActiveMQQueue("Consumer.2.VirtualTopic.TEST"); + } + + protected Destination getProducerDestination() { + return new ActiveMQTopic("VirtualTopic.TEST"); + } + + @Override + protected void assertMessagesArrived(ConsumerBean messageList1, ConsumerBean messageList2) { + messageList1.assertMessagesArrived(total/2); + messageList2.assertMessagesArrived(total/2); + + messageList1.flushMessages(); + messageList2.flushMessages(); + + LOG.info("validate no other messages on queues"); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + Destination destination1 = getConsumer1Dsetination(); + Destination destination2 = getConsumer2Dsetination(); + MessageConsumer c1 = session.createConsumer(destination1, null); + MessageConsumer c2 = session.createConsumer(destination2, null); + c1.setMessageListener(messageList1); + c2.setMessageListener(messageList2); + + + LOG.info("send one simple message that should go to both consumers"); + MessageProducer producer = session.createProducer(getProducerDestination()); + assertNotNull(producer); + + producer.send(session.createTextMessage("Last Message")); + + messageList1.assertMessagesArrived(1); + messageList2.assertMessagesArrived(1); + + } catch (JMSException e) { + e.printStackTrace(); + fail("unexpeced ex while waiting for last messages: " + e); + } + } + + @Override + protected BrokerService createBroker() throws Exception { + // use message selectors on consumers that need to propagate up to the virtual + // topic dispatch so that un matched messages do not linger on subscription queues + messageSelector1 = "odd = 'yes'"; + messageSelector2 = "odd = 'no'"; + + BrokerService broker = new BrokerService(); + broker.setPersistent(false); + + VirtualTopic virtualTopic = new VirtualTopic(); + // the new config that enables selectors on the intercepter + virtualTopic.setSelectorAware(true); + VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor(); + interceptor.setVirtualDestinations(new VirtualDestination[]{virtualTopic}); + broker.setDestinationInterceptors(new DestinationInterceptor[]{interceptor}); + return broker; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java new file mode 100644 index 0000000..d1e709f --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/VirtualTopicsAndDurableSubsTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.broker.virtual; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.activemq.broker.jmx.MBeanTest; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ActiveMQTopic; +import org.apache.activemq.spring.ConsumerBean; + +public class VirtualTopicsAndDurableSubsTest extends MBeanTest { + + private Connection connection; + + public void testVirtualTopicCreationAndDurableSubs() throws Exception { + if (connection == null) { + connection = createConnection(); + } + connection.setClientID(getAClientID()); + connection.start(); + + ConsumerBean messageList = new ConsumerBean(); + messageList.setVerbose(true); + + String queueAName = getVirtualTopicConsumerName(); + // create consumer 'cluster' + ActiveMQQueue queue1 = new ActiveMQQueue(queueAName); + ActiveMQQueue queue2 = new ActiveMQQueue(queueAName); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer c1 = session.createConsumer(queue1); + MessageConsumer c2 = session.createConsumer(queue2); + + c1.setMessageListener(messageList); + c2.setMessageListener(messageList); + + // create topic producer + MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName())); + assertNotNull(producer); + + int total = 10; + for (int i = 0; i < total; i++) { + producer.send(session.createTextMessage("message: " + i)); + } + messageList.assertMessagesArrived(total); + + //Add and remove durable subscriber after using VirtualTopics + assertCreateAndDestroyDurableSubscriptions(); + } + + protected String getAClientID(){ + return "VirtualTopicCreationAndDurableSubs"; + } + + protected String getVirtualTopicName() { + return "VirtualTopic.TEST"; + } + + + protected String getVirtualTopicConsumerName() { + return "Consumer.A.VirtualTopic.TEST"; + } + + protected String getDurableSubscriberName(){ + return "Sub1"; + } + + protected String getDurableSubscriberTopicName(){ + return "simple.topic"; + } + + protected void tearDown() throws Exception { + if (connection != null) { + connection.close(); + } + super.tearDown(); + } + + //Overrides test cases from MBeanTest to avoid having them run. + public void testMBeans() throws Exception {} + public void testMoveMessages() throws Exception {} + public void testRetryMessages() throws Exception {} + public void testMoveMessagesBySelector() throws Exception {} + public void testCopyMessagesBySelector() throws Exception {} +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml new file mode 100644 index 0000000..ed3bc73 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-queue.xml @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: xbean --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" /> + + <broker persistent="false" useJmx="false" xmlns="http://activemq.apache.org/schema/core"> + <destinationInterceptors> + <virtualDestinationInterceptor> + <virtualDestinations> + <compositeQueue name="MY.QUEUE"> + <forwardTo> + <queue physicalName="FOO" /> + <topic physicalName="BAR" /> + </forwardTo> + </compositeQueue> + </virtualDestinations> + </virtualDestinationInterceptor> + </destinationInterceptors> + + </broker> + +</beans> +<!-- END SNIPPET: xbean --> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml new file mode 100644 index 0000000..ded6471 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/composite-topic.xml @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: xbean --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" /> + + <broker xmlns="http://activemq.apache.org/schema/core"> + <destinationInterceptors> + <virtualDestinationInterceptor> + <virtualDestinations> + <compositeTopic name="MY.TOPIC"> + <forwardTo> + <queue physicalName="FOO" /> + <topic physicalName="BAR" /> + </forwardTo> + </compositeTopic> + </virtualDestinations> + </virtualDestinationInterceptor> + </destinationInterceptors> + + </broker> + +</beans> +<!-- END SNIPPET: xbean --> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml new file mode 100644 index 0000000..2772910 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/disconnected-selector.xml @@ -0,0 +1,43 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: xbean --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" /> + + <broker xmlns="http://activemq.apache.org/schema/core" persistent="false"> + <destinationInterceptors> + <virtualDestinationInterceptor> + <virtualDestinations> + <virtualTopic name="VirtualTopic.>" prefix="Consumer." selectorAware="true"/> + </virtualDestinations> + </virtualDestinationInterceptor> + </destinationInterceptors> + <plugins> + <virtualSelectorCacheBrokerPlugin persistFile = "target/selectorcache.data"/> + </plugins> + </broker> +</beans> +<!-- END SNIPPET: xbean --> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml new file mode 100644 index 0000000..d51f03c --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/filtered-queue.xml @@ -0,0 +1,47 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: xbean --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" /> + + <broker xmlns="http://activemq.apache.org/schema/core"> + <destinationInterceptors> + <virtualDestinationInterceptor> + <virtualDestinations> + <compositeQueue name="MY.QUEUE"> + <forwardTo> + <filteredDestination selector="odd = 'yes'" queue="FOO"/> + <filteredDestination selector="i = 5" topic="BAR"/> + </forwardTo> + </compositeQueue> + </virtualDestinations> + </virtualDestinationInterceptor> + </destinationInterceptors> + + </broker> + +</beans> +<!-- END SNIPPET: xbean --> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml new file mode 100644 index 0000000..ddd0667 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/global-virtual-topics.xml @@ -0,0 +1,42 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: xbean --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" /> + + <broker xmlns="http://activemq.apache.org/schema/core"> + <destinationInterceptors> + <virtualDestinationInterceptor> + <virtualDestinations> + <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/> + </virtualDestinations> + </virtualDestinationInterceptor> + </destinationInterceptors> + + </broker> + +</beans> +<!-- END SNIPPET: xbean --> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml new file mode 100644 index 0000000..d725436 --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-individual-dlq.xml @@ -0,0 +1,80 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<!-- START SNIPPET: example --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + + + <!-- + The <broker> element is used to configure the ActiveMQ broker. + --> + <broker xmlns="http://activemq.apache.org/schema/core" brokerName="bcBroker"> + + <destinationInterceptors> + <virtualDestinationInterceptor> + <virtualDestinations> + <virtualTopic name="VirtualTopic.>" prefix="Consumer.*." /> + </virtualDestinations> + </virtualDestinationInterceptor> + </destinationInterceptors> + + <destinationPolicy> + <policyMap> + <policyEntries> + <policyEntry queue=">" memoryLimit="128 mb" > + <deadLetterStrategy> + <bean xmlns="http://www.springframework.org/schema/beans" + class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy"> + <property name="useQueueForQueueMessages" value="true"></property> + <property name="processNonPersistent" value="true"></property> + <property name="processExpired" value="false"></property> + <property name="enableAudit" value="false"></property> + + </bean> + </deadLetterStrategy> + </policyEntry> + <policyEntry topic=">" memoryLimit="128 mb" > + <deadLetterStrategy> + <bean xmlns="http://www.springframework.org/schema/beans" + class="org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy"> + <property name="useQueueForQueueMessages" value="true"></property> + <property name="processNonPersistent" value="true"></property> + <property name="processExpired" value="false"></property> + <property name="enableAudit" value="false"></property> + + </bean> + </deadLetterStrategy> + </policyEntry> + </policyEntries> + </policyMap> + </destinationPolicy> + + <managementContext> + <managementContext createConnector="false"/> + </managementContext> + + </broker> + + + +</beans> +<!-- END SNIPPET: example --> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/60979268/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml ---------------------------------------------------------------------- diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml new file mode 100644 index 0000000..fcce72e --- /dev/null +++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/broker/virtual/virtual-topics-and-interceptor.xml @@ -0,0 +1,50 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<!-- this file can only be parsed using the xbean-spring library --> +<!-- START SNIPPET: xbean --> +<beans + xmlns="http://www.springframework.org/schema/beans" + xmlns:amq="http://activemq.apache.org/schema/core" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd + http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd"> + + <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer" /> + + <broker xmlns="http://activemq.apache.org/schema/core" persistent="false"> + + + <destinationInterceptors> + <!-- custom destination interceptor --> + <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.broker.virtual.DestinationInterceptorDurableSubTest$SimpleDestinationInterceptor" /> + + <virtualDestinationInterceptor> + <virtualDestinations> + <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/> + </virtualDestinations> + </virtualDestinationInterceptor> + </destinationInterceptors> + + <managementContext> + <managementContext createConnector="true" connectorPort="1299"/> + </managementContext> + </broker> + +</beans> +<!-- END SNIPPET: xbean -->
