Updated Branches: refs/heads/trunk 2eb0203f0 -> 8d31e44e8
Fix for https://issues.apache.org/jira/browse/AMQ-4714 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/0a5b1438 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/0a5b1438 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/0a5b1438 Branch: refs/heads/trunk Commit: 0a5b14386fee99ca9d435145dcaaa189728c19e6 Parents: 2eb0203 Author: rajdavies <[email protected]> Authored: Fri Sep 6 13:28:53 2013 +0100 Committer: rajdavies <[email protected]> Committed: Fri Sep 6 13:46:43 2013 +0100 ---------------------------------------------------------------------- .../inteceptor/MessageInterceptorRegistry.java | 30 ++++++++++++- .../activemq/broker/view/MessageBrokerView.java | 16 +++++++ .../interceptor/MessageInterceptorTest.java | 45 +++++++++++++++++--- 3 files changed, 85 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/0a5b1438/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java b/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java index ec05b9e..6dbb8aa 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/inteceptor/MessageInterceptorRegistry.java @@ -16,7 +16,11 @@ */ package org.apache.activemq.broker.inteceptor; +import java.util.HashMap; +import java.util.Map; + import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.MutableBrokerFilter; import org.apache.activemq.broker.ProducerBrokerExchange; @@ -27,11 +31,35 @@ import org.slf4j.LoggerFactory; public class MessageInterceptorRegistry { private static final Logger LOG = LoggerFactory.getLogger(MessageInterceptorRegistry.class); + private static final MessageInterceptorRegistry INSTANCE = new MessageInterceptorRegistry(); private final BrokerService brokerService; private MessageInterceptorFilter filter; + private final Map<BrokerService, MessageInterceptorRegistry> messageInterceptorRegistryMap = new HashMap<BrokerService, MessageInterceptorRegistry>(); + + + public static MessageInterceptorRegistry getInstance() { + return INSTANCE; + } + + public MessageInterceptorRegistry get(String brokerName){ + BrokerService brokerService = BrokerRegistry.getInstance().lookup(brokerName); + return get(brokerService); + } + public synchronized MessageInterceptorRegistry get(BrokerService brokerService){ + MessageInterceptorRegistry result = messageInterceptorRegistryMap.get(brokerService); + if (result == null){ + result = new MessageInterceptorRegistry(brokerService); + messageInterceptorRegistryMap.put(brokerService,result); + } + return result; + } + + private MessageInterceptorRegistry(){ + this.brokerService=null; + } - public MessageInterceptorRegistry(BrokerService brokerService) { + private MessageInterceptorRegistry(BrokerService brokerService) { this.brokerService = brokerService; } http://git-wip-us.apache.org/repos/asf/activemq/blob/0a5b1438/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java b/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java index 570fc5a..316157a 100644 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/view/MessageBrokerView.java @@ -21,6 +21,7 @@ import java.util.HashSet; import java.util.Map; import java.util.Set; +import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.command.ActiveMQDestination; @@ -55,6 +56,21 @@ public class MessageBrokerView { } } + /** + * Create a view of a running Broker + * @param brokerName + */ + public MessageBrokerView(String brokerName){ + this.brokerService = BrokerRegistry.getInstance().lookup(brokerName); + if (brokerService == null){ + throw new NullPointerException("BrokerService is null"); + } + if (!brokerService.isStarted()){ + throw new IllegalStateException("BrokerService " + brokerService.getBrokerName() + " is not started"); + } + } + + /** * @return the brokerName http://git-wip-us.apache.org/repos/asf/activemq/blob/0a5b1438/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java b/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java index 379bd5a..2e593d8 100644 --- a/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java +++ b/activemq-broker/src/test/java/org/apache/activemq/broker/interceptor/MessageInterceptorTest.java @@ -81,7 +81,9 @@ public class MessageInterceptorTest extends TestCase { } } - public void testNormalOperation() throws Exception { + + + public void testNoIntercept() throws Exception { final CountDownLatch latch = new CountDownLatch(messageCount); consumer.setMessageListener(new MessageListener() { @@ -101,8 +103,41 @@ public class MessageInterceptorTest extends TestCase { } + public void testNoStackOverFlow() throws Exception { + + + final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst()); + registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() { + @Override + public void intercept(ProducerBrokerExchange producerExchange, Message message) { + + try { + registry.injectMessage(producerExchange, message); + } catch (Exception e) { + e.printStackTrace(); + } + } + }); + + final CountDownLatch latch = new CountDownLatch(messageCount); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(javax.jms.Message message) { + latch.countDown(); + + } + }); + for (int i = 0; i < messageCount; i++){ + javax.jms.Message message = producerSession.createTextMessage("test: " + i); + producer.send(message); + } + + latch.await(timeOutInSeconds, TimeUnit.SECONDS); + assertEquals(0,latch.getCount()); + } + public void testInterceptorAll() throws Exception { - MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst()); + MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst()); registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() { @Override public void intercept(ProducerBrokerExchange producerExchange, Message message) { @@ -132,7 +167,7 @@ public class MessageInterceptorTest extends TestCase { public void testReRouteAll() throws Exception { final ActiveMQQueue queue = new ActiveMQQueue("Reroute.From."+topic.getTopicName()); - final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst()); + final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst()); registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() { @Override public void intercept(ProducerBrokerExchange producerExchange, Message message) { @@ -167,7 +202,7 @@ public class MessageInterceptorTest extends TestCase { public void testReRouteAllWithNullProducerExchange() throws Exception { final ActiveMQQueue queue = new ActiveMQQueue("Reroute.From."+topic.getTopicName()); - final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst()); + final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst()); registry.addMessageInterceptorForTopic(topic.getTopicName(), new MessageInterceptor() { @Override public void intercept(ProducerBrokerExchange producerExchange, Message message) { @@ -203,7 +238,7 @@ public class MessageInterceptorTest extends TestCase { final ActiveMQQueue testQueue = new ActiveMQQueue("testQueueFor."+getName()); - final MessageInterceptorRegistry registry = new MessageInterceptorRegistry(BrokerRegistry.getInstance().findFirst()); + final MessageInterceptorRegistry registry = MessageInterceptorRegistry.getInstance().get(BrokerRegistry.getInstance().findFirst()); registry.addMessageInterceptorForTopic(">", new MessageInterceptor() { @Override public void intercept(ProducerBrokerExchange producerExchange, Message message) {
