fix for https://issues.apache.org/jira/browse/AMQ-4714
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/8d31e44e Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/8d31e44e Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/8d31e44e Branch: refs/heads/trunk Commit: 8d31e44e8d0615632d428f0a6778ce36ba5e02ee Parents: 0a5b143 Author: rajdavies <[email protected]> Authored: Fri Sep 6 13:45:37 2013 +0100 Committer: rajdavies <[email protected]> Committed: Fri Sep 6 13:46:44 2013 +0100 ---------------------------------------------------------------------- .../camel/component/broker/BrokerEndpoint.java | 19 ++--- .../broker/BrokerComponentXMLConfigTest.java | 63 ++++++++++++++--- .../activemq/camel/component/broker/camel.xml | 73 +++++++++++++------- 3 files changed, 114 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/8d31e44e/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java index e0d896e..e327669 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerEndpoint.java @@ -22,8 +22,6 @@ import java.util.concurrent.CopyOnWriteArrayList; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.inteceptor.MessageInterceptor; import org.apache.activemq.broker.inteceptor.MessageInterceptorRegistry; -import org.apache.activemq.broker.view.MessageBrokerView; -import org.apache.activemq.broker.view.MessageBrokerViewRegistry; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.Message; import org.apache.camel.Consumer; @@ -46,7 +44,6 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers @UriParam private final BrokerConfiguration configuration; - private MessageBrokerView messageBrokerView; private MessageInterceptorRegistry messageInterceptorRegistry; @@ -92,8 +89,7 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers @Override protected void doStart() throws Exception { super.doStart(); - messageBrokerView = MessageBrokerViewRegistry.getInstance().lookup(configuration.getBrokerName()); - messageInterceptorRegistry = new MessageInterceptorRegistry(messageBrokerView.getBrokerService()); + messageInterceptorRegistry = MessageInterceptorRegistry.getInstance().get(configuration.getBrokerName()); for (MessageInterceptor messageInterceptor : messageInterceptorList) { addMessageInterceptor(messageInterceptor); } @@ -119,11 +115,18 @@ public class BrokerEndpoint extends DefaultEndpoint implements MultipleConsumers } protected void inject(ProducerBrokerExchange producerBrokerExchange, Message message) throws Exception { + ProducerBrokerExchange pbe = producerBrokerExchange; if (message != null) { - if (message.getDestination() == null) { - message.setDestination(destination); + message.setDestination(destination); + if (producerBrokerExchange != null && producerBrokerExchange.getRegionDestination() != null){ + if (!producerBrokerExchange.getRegionDestination().getActiveMQDestination().equals(destination)){ + //The message broker will create a new ProducerBrokerExchange with the + //correct region broker set + pbe = null; + } } - messageInterceptorRegistry.injectMessage(producerBrokerExchange, message); + + messageInterceptorRegistry.injectMessage(pbe, message); } } } http://git-wip-us.apache.org/repos/asf/activemq/blob/8d31e44e/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java index 0696170..e3b7227 100644 --- a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java @@ -47,16 +47,18 @@ public class BrokerComponentXMLConfigTest { private static final Logger LOG = LoggerFactory.getLogger(BrokerComponentXMLConfigTest.class); protected static final String TOPIC_NAME = "test.broker.component.topic"; protected static final String QUEUE_NAME = "test.broker.component.queue"; + protected static final String ROUTE_QUEUE_NAME = "test.broker.component.route"; + protected static final String DIVERTED_QUEUE_NAME = "test.broker.component.ProcessLater"; + protected static final int DIVERT_COUNT = 100; + protected BrokerService brokerService; protected ActiveMQConnectionFactory factory; protected Connection producerConnection; protected Connection consumerConnection; protected Session consumerSession; protected Session producerSession; - protected MessageConsumer consumer; - protected MessageProducer producer; - protected Topic topic; - protected int messageCount = 5000; + + protected int messageCount = 1000; protected int timeOutInSeconds = 10; @Before @@ -69,10 +71,9 @@ public class BrokerComponentXMLConfigTest { producerConnection = factory.createConnection(); producerConnection.start(); consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - topic = consumerSession.createTopic(TOPIC_NAME); + producerSession = producerConnection.createSession(false,Session.AUTO_ACKNOWLEDGE); - consumer = consumerSession.createConsumer(topic); - producer = producerSession.createProducer(topic); + } protected BrokerService createBroker(String resource) throws Exception { @@ -110,9 +111,10 @@ public class BrokerComponentXMLConfigTest { public void testReRouteAll() throws Exception { final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME); + Topic topic = consumerSession.createTopic(TOPIC_NAME); final CountDownLatch latch = new CountDownLatch(messageCount); - consumer = consumerSession.createConsumer(queue); + MessageConsumer consumer = consumerSession.createConsumer(queue); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(javax.jms.Message message) { @@ -124,6 +126,8 @@ public class BrokerComponentXMLConfigTest { } } }); + MessageProducer producer = producerSession.createProducer(topic); + for (int i = 0; i < messageCount; i++){ javax.jms.Message message = producerSession.createTextMessage("test: " + i); producer.send(message); @@ -134,7 +138,50 @@ public class BrokerComponentXMLConfigTest { } + @Test + public void testRouteWithDestinationLimit() throws Exception { + final ActiveMQQueue routeQueue = new ActiveMQQueue(ROUTE_QUEUE_NAME); + + final CountDownLatch routeLatch = new CountDownLatch(DIVERT_COUNT); + MessageConsumer messageConsumer = consumerSession.createConsumer(routeQueue); + messageConsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(javax.jms.Message message) { + try { + routeLatch.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + + final CountDownLatch divertLatch = new CountDownLatch(messageCount-DIVERT_COUNT); + MessageConsumer divertConsumer = consumerSession.createConsumer(new ActiveMQQueue(DIVERTED_QUEUE_NAME)); + divertConsumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(javax.jms.Message message) { + try { + divertLatch.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + + + MessageProducer producer = producerSession.createProducer(routeQueue); + + for (int i = 0; i < messageCount; i++){ + javax.jms.Message message = producerSession.createTextMessage("test: " + i); + producer.send(message); + } + + routeLatch.await(timeOutInSeconds, TimeUnit.SECONDS); + divertLatch.await(timeOutInSeconds,TimeUnit.SECONDS); + assertEquals(0,routeLatch.getCount()); + assertEquals(0,divertLatch.getCount()); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/8d31e44e/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml index b23a281..750c134 100644 --- a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml +++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml @@ -1,36 +1,59 @@ - <!-- - Licensed to the Apache Software Foundation (ASF) under one or more - contributor license agreements. See the NOTICE file distributed with - this work for additional information regarding copyright ownership. - The ASF licenses this file to You under the Apache License, Version 2.0 - (the "License"); you may not use this file except in compliance with - the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. --> <beans - xmlns="http://www.springframework.org/schema/beans" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation=" + xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation=" http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> - - <camelContext id="camel" trace="false" xmlns="http://camel.apache.org/schema/spring"> + + <camelContext id="camel" trace="false" xmlns="http://camel.apache.org/schema/spring"> <!-- You can use Spring XML syntax to define the routes here using the <route> element --> - <route id ="brokerComponentTest"> + <route id="brokerComponentTest"> <from uri="broker:topic:test.broker.>"/> - <setHeader headerName="JMSPriority"> - <constant>9</constant> - </setHeader> + <setHeader headerName="JMSPriority"> + <constant>9</constant> + </setHeader> <to uri="broker:queue:test.broker.component.queue"/> </route> + + <route id="brokerComponentDLQAboveLimitTest"> + <from uri="broker:queue:test.broker.component.route"/> + <choice> + <when> + <spel>#{@destinationView.enqueueCount >= 100}</spel> + <to uri="broker:queue:test.broker.component.ProcessLater"/> + </when> + <otherwise> + <to uri="broker:queue:test.broker.component.route"/> + </otherwise> + </choice> + </route> + + </camelContext> -</beans> \ No newline at end of file + <bean id="brokerView" class="org.apache.activemq.broker.view.MessageBrokerView"> + <constructor-arg value="testBroker"/> + </bean> + <bean id="destinationView" factory-bean="brokerView" factory-method="getDestinationView"> + <constructor-arg value="test.broker.component.route"/> + + </bean> +</beans> +
