This is an automated email from the ASF dual-hosted git repository.
dmvolod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new fddedbc CAMEL-12654: RabbitMQ Headers - Headers with null value are
skipped
fddedbc is described below
commit fddedbc44de60ccc214c0c40be420dad7e1d13da
Author: Dmitry Volodin <[email protected]>
AuthorDate: Tue Jul 24 11:08:45 2018 +0300
CAMEL-12654: RabbitMQ Headers - Headers with null value are skipped
---
.../src/main/docs/rabbitmq-component.adoc | 8 ++-
.../component/rabbitmq/RabbitMQComponent.java | 17 ++++-
.../camel/component/rabbitmq/RabbitMQEndpoint.java | 13 ++++
.../rabbitmq/RabbitMQMessageConverter.java | 13 +++-
.../rabbitmq/reply/ReplyManagerSupport.java | 1 +
.../component/rabbitmq/RabbitMQComponentTest.java | 3 +
.../rabbitmq/RabbitMQProducerIntTest.java | 75 ++++++++++++++++++++++
.../springboot/RabbitMQComponentConfiguration.java | 14 +++-
8 files changed, 138 insertions(+), 6 deletions(-)
diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
index e6e330c..823f22b 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
@@ -47,14 +47,14 @@ exchange name determines which exchange the queue will bind
to.
=== Options
// component options: START
-The RabbitMQ component supports 49 options, which are listed below.
+The RabbitMQ component supports 50 options, which are listed below.
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
-| *hostname* (common) | The hostname of the running rabbitmq instance or
cluster. | | String
+| *hostname* (common) | The hostname of the running RabbitMQ instance or
cluster. | | String
| *portNumber* (common) | Port number for the host with the running rabbitmq
instance or cluster. | 5672 | int
| *username* (security) | Username in case of authenticated access | guest |
String
| *password* (security) | Password for authenticated access | guest | String
@@ -102,6 +102,7 @@ The RabbitMQ component supports 49 options, which are
listed below.
| *deadLetterQueue* (common) | The name of the dead letter queue | | String
| *deadLetterRoutingKey* (common) | The routing key for the dead letter
exchange | | String
| *deadLetterExchangeType* (common) | The type of the dead letter exchange |
direct | String
+| *allowNullHeaders* (producer) | Allow pass null values to header | false |
boolean
| *resolveProperty Placeholders* (advanced) | Whether the component should
resolve property placeholders on itself when starting. Only properties which
are of String type can use property placeholders. | true | boolean
|===
// component options: END
@@ -126,7 +127,7 @@ with the following path and query parameters:
|===
-==== Query Parameters (61 parameters):
+==== Query Parameters (62 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -163,6 +164,7 @@ with the following path and query parameters:
| *exceptionHandler* (consumer) | To let the consumer use a custom
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this
options is not in use. By default the consumer will deal with exceptions, that
will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer
creates an exchange. | | ExchangePattern
| *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with
a fixed number of threads. This setting allows you to set that number of
threads. | 10 | int
+| *allowNullHeaders* (producer) | Allow pass null values to header | false |
boolean
| *bridgeEndpoint* (producer) | If the bridgeEndpoint is true, the producer
will ignore the message header of rabbitmq.EXCHANGE_NAME and
rabbitmq.ROUTING_KEY | false | boolean
| *channelPoolMaxSize* (producer) | Get maximum number of opened channel in
pool | 10 | int
| *channelPoolMaxWait* (producer) | Set the maximum number of milliseconds to
wait for a channel from the pool | 1000 | long
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 96c1ac8..69a3e8a 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -81,6 +81,8 @@ public class RabbitMQComponent extends UriEndpointComponent {
private String deadLetterQueue;
@Metadata(label = "common", defaultValue = "direct", enums =
"direct,fanout,headers,topic")
private String deadLetterExchangeType = "direct";
+ @Metadata(label = "producer")
+ private boolean allowNullHeaders;
@Metadata(label = "security")
private String sslProtocol;
@Metadata(label = "security")
@@ -241,6 +243,7 @@ public class RabbitMQComponent extends UriEndpointComponent
{
endpoint.setDeadLetterExchangeType(getDeadLetterExchangeType());
endpoint.setDeadLetterQueue(getDeadLetterQueue());
endpoint.setDeadLetterRoutingKey(getDeadLetterRoutingKey());
+ endpoint.setAllowNullHeaders(isAllowNullHeaders());
setProperties(endpoint, params);
if (LOG.isDebugEnabled()) {
@@ -262,6 +265,8 @@ public class RabbitMQComponent extends UriEndpointComponent
{
endpoint.getExchangeArgs().putAll(IntrospectionSupport.extractProperties(argsCopy,
EXCHANGE_ARG_PREFIX));
endpoint.getQueueArgs().putAll(IntrospectionSupport.extractProperties(argsCopy,
QUEUE_ARG_PREFIX));
endpoint.getBindingArgs().putAll(IntrospectionSupport.extractProperties(argsCopy,
BINDING_ARG_PREFIX));
+ // Change null headers processing for message converter
+
endpoint.getMessageConverter().setAllowNullHeaders(endpoint.isAllowNullHeaders());
return endpoint;
}
@@ -271,7 +276,7 @@ public class RabbitMQComponent extends UriEndpointComponent
{
}
/**
- * The hostname of the running rabbitmq instance or cluster.
+ * The hostname of the running RabbitMQ instance or cluster.
*/
public void setHostname(String hostname) {
this.hostname = hostname;
@@ -859,4 +864,14 @@ public class RabbitMQComponent extends
UriEndpointComponent {
this.deadLetterExchangeType = deadLetterExchangeType;
}
+ /**
+ * Allow pass null values to header
+ */
+ public boolean isAllowNullHeaders() {
+ return allowNullHeaders;
+ }
+
+ public void setAllowNullHeaders(boolean allowNullHeaders) {
+ this.allowNullHeaders = allowNullHeaders;
+ }
}
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 837ca59..9cdbf34 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -175,6 +175,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint
implements AsyncEndpoint {
private long publisherAcknowledgementsTimeout;
@UriParam(label = "producer")
private boolean guaranteedDeliveries;
+ @UriParam(label = "producer")
+ private boolean allowNullHeaders;
// camel-jms supports this setting but it is not currently configurable in
camel-rabbitmq
private boolean useMessageIDAsCorrelationID = true;
// camel-jms supports this setting but it is not currently configurable in
camel-rabbitmq
@@ -999,6 +1001,17 @@ public class RabbitMQEndpoint extends DefaultEndpoint
implements AsyncEndpoint {
this.exclusiveConsumer = exclusiveConsumer;
}
+ /**
+ * Allow pass null values to header
+ */
+ public boolean isAllowNullHeaders() {
+ return allowNullHeaders;
+ }
+
+ public void setAllowNullHeaders(boolean allowNullHeaders) {
+ this.allowNullHeaders = allowNullHeaders;
+ }
+
public boolean isPassive() {
return passive;
}
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
index 6cb535e..07902ca 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
public class RabbitMQMessageConverter {
protected static final Logger LOG =
LoggerFactory.getLogger(RabbitMQMessageConverter.class);
+ private boolean allowNullHeaders;
+
/**
* Will take an {@link Exchange} and add header values back to the {@link
Exchange#getIn()}
*/
@@ -165,7 +167,8 @@ public class RabbitMQMessageConverter {
for (Map.Entry<String, Object> header : headers.entrySet()) {
// filter header values.
Object value = getValidRabbitMQHeaderValue(header.getValue());
- if (value != null) {
+
+ if (value != null || isAllowNullHeaders()) {
filteredHeaders.put(header.getKey(), header.getValue());
} else if (LOG.isDebugEnabled()) {
if (header.getValue() == null) {
@@ -305,4 +308,12 @@ public class RabbitMQMessageConverter {
private Object isSerializeHeaderEnabled(final AMQP.BasicProperties
properties) {
return properties.getHeaders().get(RabbitMQEndpoint.SERIALIZE_HEADER);
}
+
+ public boolean isAllowNullHeaders() {
+ return allowNullHeaders;
+ }
+
+ public void setAllowNullHeaders(boolean allowNullHeaders) {
+ this.allowNullHeaders = allowNullHeaders;
+ }
}
diff --git
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index 15b990a..1ddf056 100644
---
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -224,6 +224,7 @@ public abstract class ReplyManagerSupport extends
ServiceSupport implements Repl
ObjectHelper.notNull(executorService, "executorService", this);
ObjectHelper.notNull(endpoint, "endpoint", this);
+ messageConverter.setAllowNullHeaders(endpoint.isAllowNullHeaders());
// timeout map to use for purging messages which have timed out, while
waiting for an expected reply
// when doing request/reply over JMS
log.debug("Using timeout checker interval with {} millis",
endpoint.getRequestTimeoutCheckerInterval());
diff --git
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
index 4b59443..f732e6e 100644
---
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
+++
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
@@ -45,6 +45,7 @@ public class RabbitMQComponentTest {
assertEquals(true, endpoint.isAutoDelete());
assertEquals(true, endpoint.isDurable());
assertEquals(false, endpoint.isExclusiveConsumer());
+ assertEquals(false, endpoint.isAllowNullHeaders());
assertEquals("direct", endpoint.getExchangeType());
assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT,
endpoint.getConnectionTimeout());
assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX,
endpoint.getRequestedChannelMax());
@@ -70,6 +71,7 @@ public class RabbitMQComponentTest {
params.put("requestedFrameMax", 789);
params.put("requestedHeartbeat", 321);
params.put("exclusiveConsumer", true);
+ params.put("allowNullHeaders", true);
RabbitMQEndpoint endpoint = createEndpoint(params);
@@ -89,6 +91,7 @@ public class RabbitMQComponentTest {
assertEquals(789, endpoint.getRequestedFrameMax());
assertEquals(321, endpoint.getRequestedHeartbeat());
assertEquals(true, endpoint.isExclusiveConsumer());
+ assertEquals(true, endpoint.isAllowNullHeaders());
}
private RabbitMQEndpoint createEndpoint(Map<String, Object> params) throws
Exception {
diff --git
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
index 77bb1c1..d91bcc4 100644
---
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
+++
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
@@ -18,7 +18,9 @@ package org.apache.camel.component.rabbitmq;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
@@ -30,15 +32,22 @@ import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.ObjectHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitMQProducerIntTest.class);
+
private static final String EXCHANGE = "ex1";
private static final String ROUTE = "route1";
+ private static final String CUSTOM_HEADER = "CustomHeader";
private static final String BASIC_URI_FORMAT =
"rabbitmq:localhost:5672/%s?routingKey=%s&username=cameltest&password=cameltest&skipQueueDeclare=true";
private static final String BASIC_URI = String.format(BASIC_URI_FORMAT,
EXCHANGE, ROUTE);
+ private static final String ALLOW_NULL_HEADERS = BASIC_URI +
"&allowNullHeaders=true";
private static final String PUBLISHER_ACKNOWLEDGES_URI = BASIC_URI +
"&mandatory=true&publisherAcknowledgements=true";
private static final String PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI =
String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") +
"&publisherAcknowledgements=true";
private static final String GUARANTEED_DELIVERY_URI = BASIC_URI +
"&mandatory=true&guaranteedDeliveries=true";
@@ -47,6 +56,9 @@ public class RabbitMQProducerIntTest extends
AbstractRabbitMQIntTest {
@Produce(uri = "direct:start")
protected ProducerTemplate template;
+
+ @Produce(uri = "direct:start-allow-null-headers")
+ protected ProducerTemplate templateAllowNullHeaders;
@Produce(uri = "direct:start-with-confirms")
protected ProducerTemplate templateWithConfirms;
@@ -73,6 +85,7 @@ public class RabbitMQProducerIntTest extends
AbstractRabbitMQIntTest {
@Override
public void configure() throws Exception {
from("direct:start").to(BASIC_URI);
+ from("direct:start-allow-null-headers").to(ALLOW_NULL_HEADERS);
from("direct:start-with-confirms").to(PUBLISHER_ACKNOWLEDGES_URI);
from("direct:start-with-confirms-bad-route").to(PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI);
from("direct:start-with-guaranteed-delivery").to(GUARANTEED_DELIVERY_URI);
@@ -105,6 +118,38 @@ public class RabbitMQProducerIntTest extends
AbstractRabbitMQIntTest {
assertThatBodiesReceivedIn(received, "new message");
}
+
+ @Test
+ public void producedMessageWithNotNullHeaders() throws
InterruptedException, IOException, TimeoutException {
+ final List<String> received = new ArrayList<>();
+ final Map<String, Object> receivedHeaders = new HashMap<String,
Object>();
+ Map<String, Object> headers = new HashMap<String, Object>();
+
+ headers.put(RabbitMQConstants.EXCHANGE_NAME, EXCHANGE);
+ headers.put(CUSTOM_HEADER, CUSTOM_HEADER.toLowerCase());
+
+ channel.basicConsume("sammyq", true, new
ArrayPopulatingConsumer(received, receivedHeaders));
+
+ template.sendBodyAndHeaders("new message", headers);
+
+ assertThatBodiesAndHeadersReceivedIn(receivedHeaders, headers,
received, "new message");
+ }
+
+ @Test
+ public void producedMessageAllowNullHeaders() throws InterruptedException,
IOException, TimeoutException {
+ final List<String> received = new ArrayList<>();
+ final Map<String, Object> receivedHeaders = new HashMap<String,
Object>();
+ Map<String, Object> headers = new HashMap<String, Object>();
+
+ headers.put(RabbitMQConstants.EXCHANGE_NAME, null);
+ headers.put(CUSTOM_HEADER, null);
+
+ channel.basicConsume("sammyq", true, new
ArrayPopulatingConsumer(received, receivedHeaders));
+
+ templateAllowNullHeaders.sendBodyAndHeaders("new message", headers);
+
+ assertThatBodiesAndHeadersReceivedIn(receivedHeaders, headers,
received, "new message");
+ }
private void assertThatBodiesReceivedIn(final List<String> received, final
String... expected) throws InterruptedException {
Thread.sleep(500);
@@ -114,6 +159,25 @@ public class RabbitMQProducerIntTest extends
AbstractRabbitMQIntTest {
assertEquals(body, received.get(0));
}
}
+
+ private void assertThatBodiesAndHeadersReceivedIn(Map<String, Object>
receivedHeaders, Map<String, Object> expectedHeaders,
+ final List<String>
received, final String... expected) throws InterruptedException {
+ Thread.sleep(500);
+
+ assertListSize(received, expected.length);
+ for (String body : expected) {
+ assertEquals(body, received.get(0));
+ }
+
+ for (Map.Entry<String, Object> headers : expectedHeaders.entrySet()) {
+ Object receivedValue = receivedHeaders.get(headers.getKey());
+ Object expectedValue = headers.getValue();
+
+ assertTrue("Header key " + headers.getKey() + " not found",
receivedHeaders.containsKey(headers.getKey()));
+ assertEquals(0, ObjectHelper.compare(receivedValue == null ? "" :
receivedValue.toString(), expectedValue == null ? "" :
expectedValue.toString()));
+ }
+
+ }
@Test
public void
producedMessageIsReceivedWhenPublisherAcknowledgementsAreEnabled() throws
InterruptedException, IOException, TimeoutException {
@@ -162,10 +226,18 @@ public class RabbitMQProducerIntTest extends
AbstractRabbitMQIntTest {
private class ArrayPopulatingConsumer extends DefaultConsumer {
private final List<String> received;
+ private final Map<String, Object> receivedHeaders;
ArrayPopulatingConsumer(final List<String> received) {
super(RabbitMQProducerIntTest.this.channel);
this.received = received;
+ receivedHeaders = new HashMap<String, Object>();
+ }
+
+ ArrayPopulatingConsumer(final List<String> received, Map<String,
Object> receivedHeaders) {
+ super(RabbitMQProducerIntTest.this.channel);
+ this.received = received;
+ this.receivedHeaders = receivedHeaders;
}
@Override
@@ -173,6 +245,9 @@ public class RabbitMQProducerIntTest extends
AbstractRabbitMQIntTest {
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
+ LOGGER.info("AMQP.BasicProperties: {}", properties);
+
+ receivedHeaders.putAll(properties.getHeaders());
received.add(new String(body));
}
}
diff --git
a/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
b/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
index b2a40b2..d9da3f6 100644
---
a/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
+++
b/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
@@ -38,7 +38,7 @@ public class RabbitMQComponentConfiguration
*/
private Boolean enabled;
/**
- * The hostname of the running rabbitmq instance or cluster.
+ * The hostname of the running RabbitMQ instance or cluster.
*/
private String hostname;
/**
@@ -282,6 +282,10 @@ public class RabbitMQComponentConfiguration
*/
private String deadLetterExchangeType = "direct";
/**
+ * Allow pass null values to header
+ */
+ private Boolean allowNullHeaders = false;
+ /**
* Whether the component should resolve property placeholders on itself
when
* starting. Only properties which are of String type can use property
* placeholders.
@@ -675,6 +679,14 @@ public class RabbitMQComponentConfiguration
this.deadLetterExchangeType = deadLetterExchangeType;
}
+ public Boolean getAllowNullHeaders() {
+ return allowNullHeaders;
+ }
+
+ public void setAllowNullHeaders(Boolean allowNullHeaders) {
+ this.allowNullHeaders = allowNullHeaders;
+ }
+
public Boolean getResolvePropertyPlaceholders() {
return resolvePropertyPlaceholders;
}