Repository: activemq Updated Branches: refs/heads/activemq-5.11.x e756111d0 -> 256bcf334
AMQ-5903 - add patch that fixes the broker camel component to take all header values Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/256bcf33 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/256bcf33 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/256bcf33 Branch: refs/heads/activemq-5.11.x Commit: 256bcf3342ea8c31f4f06cfcab3330d396a845d6 Parents: e756111 Author: Heath Kesler <[email protected]> Authored: Wed Jul 29 17:48:16 2015 -0600 Committer: Daniel Kulp <[email protected]> Committed: Thu Jul 30 10:52:30 2015 -0400 ---------------------------------------------------------------------- .../camel/component/broker/BrokerProducer.java | 89 +++++++------------- .../broker/BrokerComponentXMLConfigTest.java | 55 ++++++++---- .../activemq/camel/component/broker/camel.xml | 45 +++++----- 3 files changed, 94 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/256bcf33/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java index fcf1256..82adad4 100644 --- a/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java +++ b/activemq-camel/src/main/java/org/apache/activemq/camel/component/broker/BrokerProducer.java @@ -16,17 +16,17 @@ */ package org.apache.activemq.camel.component.broker; -import java.util.Map; - import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.command.ActiveMQMessage; import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Message; import org.apache.camel.component.jms.JmsMessage; -import org.apache.camel.converter.ObjectConverter; import org.apache.camel.impl.DefaultAsyncProducer; +import javax.jms.JMSException; +import java.util.Map; + public class BrokerProducer extends DefaultAsyncProducer { private final BrokerEndpoint brokerEndpoint; @@ -53,6 +53,7 @@ public class BrokerProducer extends DefaultAsyncProducer { protected boolean processInOnly(final Exchange exchange, final AsyncCallback callback) { try { ActiveMQMessage message = getMessage(exchange); + if (message != null) { message.setDestination(brokerEndpoint.getDestination()); //if the ProducerBrokerExchange is null the broker will create it @@ -67,76 +68,48 @@ public class BrokerProducer extends DefaultAsyncProducer { return true; } - private ActiveMQMessage getMessage(Exchange exchange) throws Exception { - ActiveMQMessage result; - Message camelMessage; + private ActiveMQMessage getMessage(Exchange exchange) throws IllegalStateException, JMSException { + Message camelMessage = getMessageFromExchange(exchange); + checkOriginalMessage(camelMessage); + ActiveMQMessage result = (ActiveMQMessage) ((JmsMessage) camelMessage).getJmsMessage(); + applyNewHeaders(result, camelMessage.getHeaders()); + return result; + } + + private Message getMessageFromExchange(Exchange exchange) { if (exchange.hasOut()) { - camelMessage = exchange.getOut(); - } else { - camelMessage = exchange.getIn(); + return exchange.getOut(); } - Map<String, Object> headers = camelMessage.getHeaders(); + return exchange.getIn(); + } + private void checkOriginalMessage(Message camelMessage) throws IllegalStateException { /** * We purposely don't want to support injecting messages half-way through * broker processing - use the activemq camel component for that - but - * we will support changing message headers and destinations + * we will support changing message headers and destinations. */ - if (camelMessage instanceof JmsMessage) { - JmsMessage jmsMessage = (JmsMessage) camelMessage; - if (jmsMessage.getJmsMessage() instanceof ActiveMQMessage) { - result = (ActiveMQMessage) jmsMessage.getJmsMessage(); - //lets apply any new message headers - setJmsHeaders(result, headers); - } else { - throw new IllegalStateException("Not the original message from the broker " + jmsMessage.getJmsMessage()); - } - } else { + + if (!(camelMessage instanceof JmsMessage)) { throw new IllegalStateException("Not the original message from the broker " + camelMessage); } - return result; + javax.jms.Message message = ((JmsMessage) camelMessage).getJmsMessage(); + + if (!(message instanceof ActiveMQMessage)) { + throw new IllegalStateException("Not the original message from the broker " + message); + } } - private void setJmsHeaders(ActiveMQMessage message, Map<String, Object> headers) { - message.setReadOnlyProperties(false); + private void applyNewHeaders(ActiveMQMessage message, Map<String, Object> headers) throws JMSException { for (Map.Entry<String, Object> entry : headers.entrySet()) { - if (entry.getKey().equalsIgnoreCase("JMSDeliveryMode")) { - Object value = entry.getValue(); - if (value instanceof Number) { - Number number = (Number) value; - message.setJMSDeliveryMode(number.intValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JmsPriority")) { - Integer value = ObjectConverter.toInteger(entry.getValue()); - if (value != null) { - message.setJMSPriority(value.intValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JMSTimestamp")) { - Long value = ObjectConverter.toLong(entry.getValue()); - if (value != null) { - message.setJMSTimestamp(value.longValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JMSExpiration")) { - Long value = ObjectConverter.toLong(entry.getValue()); - if (value != null) { - message.setJMSExpiration(value.longValue()); - } - } - if (entry.getKey().equalsIgnoreCase("JMSRedelivered")) { - message.setJMSRedelivered(ObjectConverter.toBool(entry.getValue())); - } - if (entry.getKey().equalsIgnoreCase("JMSType")) { - Object value = entry.getValue(); - if (value != null) { - message.setJMSType(value.toString()); - } + String key = entry.getKey(); + Object value = entry.getValue(); + if(value == null) { + continue; } + message.setObjectProperty(key, value.toString(), false); } } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/256bcf33/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java index c2fc3f6..2773baa 100644 --- a/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java +++ b/activemq-camel/src/test/java/org/apache/activemq/camel/component/broker/BrokerComponentXMLConfigTest.java @@ -16,22 +16,10 @@ */ package org.apache.activemq.camel.component.broker; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import javax.jms.Connection; -import javax.jms.MessageConsumer; -import javax.jms.MessageListener; -import javax.jms.MessageProducer; -import javax.jms.Session; -import javax.jms.Topic; - import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerRegistry; import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQMessage; import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.xbean.BrokerFactoryBean; import org.junit.After; @@ -41,6 +29,14 @@ import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.FileSystemResource; import org.springframework.core.io.Resource; +import javax.jms.*; +import java.util.Enumeration; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class BrokerComponentXMLConfigTest { protected static final String CONF_ROOT = "src/test/resources/org/apache/activemq/camel/component/broker/"; @@ -70,7 +66,6 @@ public class BrokerComponentXMLConfigTest { producerConnection = factory.createConnection(); producerConnection.start(); consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); } @@ -133,7 +128,6 @@ public class BrokerComponentXMLConfigTest { latch.await(timeOutInSeconds, TimeUnit.SECONDS); assertEquals(0, latch.getCount()); - } @Test @@ -179,4 +173,35 @@ public class BrokerComponentXMLConfigTest { assertEquals(0, divertLatch.getCount()); } + @Test + public void testPreserveOriginalHeaders() throws Exception { + final ActiveMQQueue queue = new ActiveMQQueue(QUEUE_NAME); + + Topic topic = consumerSession.createTopic(TOPIC_NAME); + + final CountDownLatch latch = new CountDownLatch(messageCount); + MessageConsumer consumer = consumerSession.createConsumer(queue); + consumer.setMessageListener(new MessageListener() { + @Override + public void onMessage(javax.jms.Message message) { + try { + assertEquals("321", message.getStringProperty("JMSXGroupID")); + assertEquals("custom", message.getStringProperty("CustomHeader")); + latch.countDown(); + } catch (Throwable e) { + e.printStackTrace(); + } + } + }); + MessageProducer producer = producerSession.createProducer(topic); + + for (int i = 0; i < messageCount; i++) { + javax.jms.Message message = producerSession.createTextMessage("test: " + i); + message.setStringProperty("JMSXGroupID", "123"); + producer.send(message); + } + + latch.await(timeOutInSeconds, TimeUnit.SECONDS); + assertEquals(0, latch.getCount()); + } } http://git-wip-us.apache.org/repos/asf/activemq/blob/256bcf33/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml ---------------------------------------------------------------------- diff --git a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml index 750c134..b84350b 100644 --- a/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml +++ b/activemq-camel/src/test/resources/org/apache/activemq/camel/component/broker/camel.xml @@ -15,45 +15,46 @@ limitations under the License. --> -<beans - xmlns="http://www.springframework.org/schema/beans" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation=" - http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd - http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd + http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> <camelContext id="camel" trace="false" xmlns="http://camel.apache.org/schema/spring"> - - <!-- You can use Spring XML syntax to define the routes here using the <route> element --> <route id="brokerComponentTest"> <from uri="broker:topic:test.broker.>"/> <setHeader headerName="JMSPriority"> <constant>9</constant> </setHeader> + <setHeader headerName="JMSXGroupID"> + <constant>321</constant> + </setHeader> + <setHeader headerName="CustomHeader"> + <constant>custom</constant> + </setHeader> <to uri="broker:queue:test.broker.component.queue"/> </route> - <route id="brokerComponentDLQAboveLimitTest"> - <from uri="broker:queue:test.broker.component.route"/> - <choice> - <when> - <spel>#{@destinationView.enqueueCount >= 100}</spel> - <to uri="broker:queue:test.broker.component.ProcessLater"/> - </when> - <otherwise> - <to uri="broker:queue:test.broker.component.route"/> - </otherwise> - </choice> + <route id="brokerComponentDLQAboveLimitTest"> + <from uri="broker:queue:test.broker.component.route"/> + <choice> + <when> + <spel>#{@destinationView.enqueueCount >= 100}</spel> + <to uri="broker:queue:test.broker.component.ProcessLater"/> + </when> + <otherwise> + <to uri="broker:queue:test.broker.component.route"/> + </otherwise> + </choice> </route> - - </camelContext> + <bean id="brokerView" class="org.apache.activemq.broker.view.MessageBrokerView"> <constructor-arg value="testBroker"/> </bean> + <bean id="destinationView" factory-bean="brokerView" factory-method="getDestinationView"> <constructor-arg value="test.broker.component.route"/> - </bean> </beans>
