Updated Branches: refs/heads/camel-2.12.x 43c0d79ea -> 609998bb3 refs/heads/master 8d9950ee6 -> 2f3e58688
CAMEL-6821: Added support for custom headers being transferred in camel-rabbitmq. Thanks to David Keen for the patch. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2f3e5868 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2f3e5868 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2f3e5868 Branch: refs/heads/master Commit: 2f3e58688d91115264418bd5de9d03edbdded3be Parents: 8d9950e Author: Claus Ibsen <[email protected]> Authored: Mon Oct 28 12:46:12 2013 +0100 Committer: Claus Ibsen <[email protected]> Committed: Mon Oct 28 12:46:12 2013 +0100 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQConsumer.java | 2 +- .../component/rabbitmq/RabbitMQEndpoint.java | 19 +++++++- .../component/rabbitmq/RabbitMQProducer.java | 45 +++++++++++++++++ .../rabbitmq/RabbitMQEndpointTest.java | 51 +++++++++++++++++++- .../rabbitmq/RabbitMQProducerTest.java | 33 +++++++++++++ 5 files changed, 146 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/2f3e5868/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index 4f13045..f2fa128 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -118,7 +118,7 @@ public class RabbitMQConsumer extends DefaultConsumer { public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { - Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, body); + Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body); mergeAmqpProperties(exchange, properties); log.trace("Created exchange [exchange={}]", exchange); http://git-wip-us.apache.org/repos/asf/camel/blob/2f3e5868/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index ffb8515..4423721 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -18,13 +18,17 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; import java.net.URISyntaxException; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.LongString; + import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; @@ -56,7 +60,7 @@ public class RabbitMQEndpoint extends DefaultEndpoint { super(endpointUri, component); } - public Exchange createRabbitExchange(Envelope envelope, byte[] body) { + public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern()); Message message = new DefaultMessage(); @@ -65,6 +69,19 @@ public class RabbitMQEndpoint extends DefaultEndpoint { message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey()); message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange()); message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag()); + + Map<String, Object> headers = properties.getHeaders(); + if (headers != null) { + for (Map.Entry<String, Object> entry : headers.entrySet()) { + // Convert LongStrings to String. + if (entry.getValue() instanceof LongString) { + message.setHeader(entry.getKey(), entry.getValue().toString()); + } else { + message.setHeader(entry.getKey(), entry.getValue()); + } + } + } + message.setBody(body); return exchange; http://git-wip-us.apache.org/repos/asf/camel/blob/2f3e5868/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 1336de9..3bebb3f 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -17,7 +17,10 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; +import java.math.BigDecimal; import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Executors; import com.rabbitmq.client.AMQP; @@ -132,6 +135,48 @@ public class RabbitMQProducer extends DefaultProducer { properties.timestamp(new Date(Long.parseLong(timestamp.toString()))); } + final Map<String, Object> headers = exchange.getIn().getHeaders(); + Map<String, Object> filteredHeaders = new HashMap<String, Object>(); + + // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader + for (Map.Entry<String, Object> header : headers.entrySet()) { + + // filter header values. + Object value = getValidRabbitMQHeaderValue(header.getValue()); + if (value != null) { + filteredHeaders.put(header.getKey(), header.getValue()); + } else if (log.isDebugEnabled()) { + log.debug("Ignoring header: {} of class: {} with value: {}", + new Object[]{header.getKey(), header.getValue().getClass().getName(), header.getValue()}); + } + } + + properties.headers(filteredHeaders); + return properties; } + + /** + * Strategy to test if the given header is valid + * + * @param headerValue the header value + * @return the value to use, <tt>null</tt> to ignore this header + * @see com.rabbitmq.client.impl.Frame#fieldValueSize + */ + private Object getValidRabbitMQHeaderValue(Object headerValue) { + if (headerValue instanceof String) { + return headerValue; + } else if (headerValue instanceof BigDecimal) { + return headerValue; + } else if (headerValue instanceof Number) { + return headerValue; + } else if (headerValue instanceof Boolean) { + return headerValue; + } else if (headerValue instanceof Date) { + return headerValue; + } else if (headerValue instanceof byte[]) { + return headerValue; + } + return null; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/2f3e5868/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index df29279..2f30177 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ -16,10 +16,16 @@ */ package org.apache.camel.component.rabbitmq; +import java.math.BigDecimal; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadPoolExecutor; +import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.impl.LongStringHelper; import org.apache.camel.Exchange; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; @@ -28,9 +34,10 @@ import org.mockito.Mockito; public class RabbitMQEndpointTest extends CamelTestSupport { private Envelope envelope = Mockito.mock(Envelope.class); + private AMQP.BasicProperties properties = Mockito.mock(AMQP.BasicProperties.class); @Test - public void testCreatingRabbitExchangeSetsHeaders() throws Exception { + public void testCreatingRabbitExchangeSetsStandardHeaders() throws Exception { RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class); String routingKey = UUID.randomUUID().toString(); @@ -40,9 +47,10 @@ public class RabbitMQEndpointTest extends CamelTestSupport { Mockito.when(envelope.getRoutingKey()).thenReturn(routingKey); Mockito.when(envelope.getExchange()).thenReturn(exchangeName); Mockito.when(envelope.getDeliveryTag()).thenReturn(tag); + Mockito.when(properties.getHeaders()).thenReturn(null); byte[] body = new byte[20]; - Exchange exchange = endpoint.createRabbitExchange(envelope, body); + Exchange exchange = endpoint.createRabbitExchange(envelope, properties, body); assertEquals(exchangeName, exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME)); assertEquals(routingKey, exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY)); assertEquals(tag, exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG)); @@ -50,6 +58,45 @@ public class RabbitMQEndpointTest extends CamelTestSupport { } @Test + public void testCreatingRabbitExchangeSetsCustomHeaders() throws Exception { + RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange", RabbitMQEndpoint.class); + + String routingKey = UUID.randomUUID().toString(); + String exchangeName = UUID.randomUUID().toString(); + long tag = UUID.randomUUID().toString().hashCode(); + + Mockito.when(envelope.getRoutingKey()).thenReturn(routingKey); + Mockito.when(envelope.getExchange()).thenReturn(exchangeName); + Mockito.when(envelope.getDeliveryTag()).thenReturn(tag); + + Map<String, Object> customHeaders = new HashMap<String, Object>(); + customHeaders.put("stringHeader", "A string"); + customHeaders.put("bigDecimalHeader", new BigDecimal("12.34")); + customHeaders.put("integerHeader", 42); + customHeaders.put("doubleHeader", 42.24); + customHeaders.put("booleanHeader", true); + customHeaders.put("dateHeader", new Date(0)); + customHeaders.put("byteArrayHeader", "foo".getBytes()); + customHeaders.put("longStringHeader", LongStringHelper.asLongString("Some really long string")); + Mockito.when(properties.getHeaders()).thenReturn(customHeaders); + + byte[] body = new byte[20]; + Exchange exchange = endpoint.createRabbitExchange(envelope, properties, body); + assertEquals(exchangeName, exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME)); + assertEquals(routingKey, exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY)); + assertEquals(tag, exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_TAG)); + assertEquals("A string", exchange.getIn().getHeader("stringHeader")); + assertEquals(new BigDecimal("12.34"), exchange.getIn().getHeader("bigDecimalHeader")); + assertEquals(42, exchange.getIn().getHeader("integerHeader")); + assertEquals(42.24, exchange.getIn().getHeader("doubleHeader")); + assertEquals(true, exchange.getIn().getHeader("booleanHeader")); + assertEquals(new Date(0), exchange.getIn().getHeader("dateHeader")); + assertArrayEquals("foo".getBytes(), (byte[]) exchange.getIn().getHeader("byteArrayHeader")); + assertEquals("Some really long string", exchange.getIn().getHeader("longStringHeader")); + assertEquals(body, exchange.getIn().getBody()); + } + + @Test public void creatingExecutorUsesThreadPoolSettings() throws Exception { RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?threadPoolSize=20", RabbitMQEndpoint.class); assertEquals(20, endpoint.getThreadPoolSize()); http://git-wip-us.apache.org/repos/asf/camel/blob/2f3e5868/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java index 188d540..7b2df60 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java @@ -18,6 +18,10 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; +import java.math.BigDecimal; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.ExecutorService; import com.rabbitmq.client.AMQP; @@ -31,7 +35,9 @@ import org.junit.Test; import org.mockito.Matchers; import org.mockito.Mockito; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; public class RabbitMQProducerTest { @@ -150,4 +156,31 @@ public class RabbitMQProducerTest { AMQP.BasicProperties props = producer.buildProperties(exchange).build(); assertEquals(12345123, props.getTimestamp().getTime()); } + + @Test + public void testPropertiesUsesCustomHeaders() throws IOException { + RabbitMQProducer producer = new RabbitMQProducer(endpoint); + Map<String, Object> customHeaders = new HashMap<String, Object>(); + customHeaders.put("stringHeader", "A string"); + customHeaders.put("bigDecimalHeader", new BigDecimal("12.34")); + customHeaders.put("integerHeader", 42); + customHeaders.put("doubleHeader", 42.24); + customHeaders.put("booleanHeader", true); + customHeaders.put("dateHeader", new Date(0)); + customHeaders.put("byteArrayHeader", "foo".getBytes()); + customHeaders.put("invalidHeader", new Something()); + message.setHeaders(customHeaders); + AMQP.BasicProperties props = producer.buildProperties(exchange).build(); + assertEquals("A string", props.getHeaders().get("stringHeader")); + assertEquals(new BigDecimal("12.34"), props.getHeaders().get("bigDecimalHeader")); + assertEquals(42, props.getHeaders().get("integerHeader")); + assertEquals(42.24, props.getHeaders().get("doubleHeader")); + assertEquals(true, props.getHeaders().get("booleanHeader")); + assertEquals(new Date(0), props.getHeaders().get("dateHeader")); + assertArrayEquals("foo".getBytes(), (byte[]) props.getHeaders().get("byteArrayHeader")); + assertNull(props.getHeaders().get("invalidHeader")); + } + + private static class Something { + } }
