[
https://issues.apache.org/jira/browse/CAMEL-12654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16553955#comment-16553955
]
ASF GitHub Bot commented on CAMEL-12654:
----------------------------------------
asfgit closed pull request #2436: CAMEL-12654: RabbitMQ Headers - Headers with null value are skipped
URL: https://github.com/apache/camel/pull/2436
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
index e6e330c384e..823f22b7fbc 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
@@ -47,14 +47,14 @@ exchange name determines which exchange the queue will bind
to.
=== Options
// component options: START
-The RabbitMQ component supports 49 options, which are listed below.
+The RabbitMQ component supports 50 options, which are listed below.
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
-| *hostname* (common) | The hostname of the running rabbitmq instance or
cluster. | | String
+| *hostname* (common) | The hostname of the running RabbitMQ instance or
cluster. | | String
| *portNumber* (common) | Port number for the host with the running rabbitmq
instance or cluster. | 5672 | int
| *username* (security) | Username in case of authenticated access | guest |
String
| *password* (security) | Password for authenticated access | guest | String
@@ -102,6 +102,7 @@ The RabbitMQ component supports 49 options, which are
listed below.
| *deadLetterQueue* (common) | The name of the dead letter queue | | String
| *deadLetterRoutingKey* (common) | The routing key for the dead letter
exchange | | String
| *deadLetterExchangeType* (common) | The type of the dead letter exchange |
direct | String
+| *allowNullHeaders* (producer) | Allow pass null values to header | false |
boolean
| *resolveProperty Placeholders* (advanced) | Whether the component should
resolve property placeholders on itself when starting. Only properties which
are of String type can use property placeholders. | true | boolean
|===
// component options: END
@@ -126,7 +127,7 @@ with the following path and query parameters:
|===
-==== Query Parameters (61 parameters):
+==== Query Parameters (62 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
@@ -163,6 +164,7 @@ with the following path and query parameters:
| *exceptionHandler* (consumer) | To let the consumer use a custom
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this
options is not in use. By default the consumer will deal with exceptions, that
will be logged at WARN or ERROR level and ignored. | | ExceptionHandler
| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer
creates an exchange. | | ExchangePattern
| *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with
a fixed number of threads. This setting allows you to set that number of
threads. | 10 | int
+| *allowNullHeaders* (producer) | Allow pass null values to header | false |
boolean
| *bridgeEndpoint* (producer) | If the bridgeEndpoint is true, the producer
will ignore the message header of rabbitmq.EXCHANGE_NAME and
rabbitmq.ROUTING_KEY | false | boolean
| *channelPoolMaxSize* (producer) | Get maximum number of opened channel in
pool | 10 | int
| *channelPoolMaxWait* (producer) | Set the maximum number of milliseconds to
wait for a channel from the pool | 1000 | long
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 96c1ac8dfac..69a3e8ac34d 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -81,6 +81,8 @@
private String deadLetterQueue;
@Metadata(label = "common", defaultValue = "direct", enums =
"direct,fanout,headers,topic")
private String deadLetterExchangeType = "direct";
+ @Metadata(label = "producer")
+ private boolean allowNullHeaders;
@Metadata(label = "security")
private String sslProtocol;
@Metadata(label = "security")
@@ -241,6 +243,7 @@ protected RabbitMQEndpoint createEndpoint(String uri,
endpoint.setDeadLetterExchangeType(getDeadLetterExchangeType());
endpoint.setDeadLetterQueue(getDeadLetterQueue());
endpoint.setDeadLetterRoutingKey(getDeadLetterRoutingKey());
+ endpoint.setAllowNullHeaders(isAllowNullHeaders());
setProperties(endpoint, params);
if (LOG.isDebugEnabled()) {
@@ -262,6 +265,8 @@ protected RabbitMQEndpoint createEndpoint(String uri,
endpoint.getExchangeArgs().putAll(IntrospectionSupport.extractProperties(argsCopy,
EXCHANGE_ARG_PREFIX));
endpoint.getQueueArgs().putAll(IntrospectionSupport.extractProperties(argsCopy,
QUEUE_ARG_PREFIX));
endpoint.getBindingArgs().putAll(IntrospectionSupport.extractProperties(argsCopy,
BINDING_ARG_PREFIX));
+ // Change null headers processing for message converter
+
endpoint.getMessageConverter().setAllowNullHeaders(endpoint.isAllowNullHeaders());
return endpoint;
}
@@ -271,7 +276,7 @@ public String getHostname() {
}
/**
- * The hostname of the running rabbitmq instance or cluster.
+ * The hostname of the running RabbitMQ instance or cluster.
*/
public void setHostname(String hostname) {
this.hostname = hostname;
@@ -859,4 +864,14 @@ public void setDeadLetterExchangeType(String
deadLetterExchangeType) {
this.deadLetterExchangeType = deadLetterExchangeType;
}
+ /**
+ * Allow pass null values to header
+ */
+ public boolean isAllowNullHeaders() {
+ return allowNullHeaders;
+ }
+
+ public void setAllowNullHeaders(boolean allowNullHeaders) {
+ this.allowNullHeaders = allowNullHeaders;
+ }
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 837ca59def6..9cdbf341b98 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -175,6 +175,8 @@
private long publisherAcknowledgementsTimeout;
@UriParam(label = "producer")
private boolean guaranteedDeliveries;
+ @UriParam(label = "producer")
+ private boolean allowNullHeaders;
// camel-jms supports this setting but it is not currently configurable in
camel-rabbitmq
private boolean useMessageIDAsCorrelationID = true;
// camel-jms supports this setting but it is not currently configurable in
camel-rabbitmq
@@ -999,6 +1001,17 @@ public void setExclusiveConsumer(boolean
exclusiveConsumer) {
this.exclusiveConsumer = exclusiveConsumer;
}
+ /**
+ * Allow pass null values to header
+ */
+ public boolean isAllowNullHeaders() {
+ return allowNullHeaders;
+ }
+
+ public void setAllowNullHeaders(boolean allowNullHeaders) {
+ this.allowNullHeaders = allowNullHeaders;
+ }
+
public boolean isPassive() {
return passive;
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
index 6cb535e988e..07902ca14ee 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -37,6 +37,8 @@
public class RabbitMQMessageConverter {
protected static final Logger LOG =
LoggerFactory.getLogger(RabbitMQMessageConverter.class);
+ private boolean allowNullHeaders;
+
/**
* Will take an {@link Exchange} and add header values back to the {@link
Exchange#getIn()}
*/
@@ -165,7 +167,8 @@ public void mergeAmqpProperties(Exchange exchange,
AMQP.BasicProperties properti
for (Map.Entry<String, Object> header : headers.entrySet()) {
// filter header values.
Object value = getValidRabbitMQHeaderValue(header.getValue());
- if (value != null) {
+
+ if (value != null || isAllowNullHeaders()) {
filteredHeaders.put(header.getKey(), header.getValue());
} else if (LOG.isDebugEnabled()) {
if (header.getValue() == null) {
@@ -305,4 +308,12 @@ private boolean hasHeaders(final AMQP.BasicProperties
properties) {
private Object isSerializeHeaderEnabled(final AMQP.BasicProperties
properties) {
return properties.getHeaders().get(RabbitMQEndpoint.SERIALIZE_HEADER);
}
+
+ public boolean isAllowNullHeaders() {
+ return allowNullHeaders;
+ }
+
+ public void setAllowNullHeaders(boolean allowNullHeaders) {
+ this.allowNullHeaders = allowNullHeaders;
+ }
}
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index 15b990a7c2c..1ddf056385e 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -224,6 +224,7 @@ protected void doStart() throws Exception {
ObjectHelper.notNull(executorService, "executorService", this);
ObjectHelper.notNull(endpoint, "endpoint", this);
+ messageConverter.setAllowNullHeaders(endpoint.isAllowNullHeaders());
// timeout map to use for purging messages which have timed out, while
waiting for an expected reply
// when doing request/reply over JMS
log.debug("Using timeout checker interval with {} millis",
endpoint.getRequestTimeoutCheckerInterval());
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
index 4b594432529..f732e6e38f5 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
@@ -45,6 +45,7 @@ public void testDefaultProperties() throws Exception {
assertEquals(true, endpoint.isAutoDelete());
assertEquals(true, endpoint.isDurable());
assertEquals(false, endpoint.isExclusiveConsumer());
+ assertEquals(false, endpoint.isAllowNullHeaders());
assertEquals("direct", endpoint.getExchangeType());
assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT,
endpoint.getConnectionTimeout());
assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX,
endpoint.getRequestedChannelMax());
@@ -70,6 +71,7 @@ public void testPropertiesSet() throws Exception {
params.put("requestedFrameMax", 789);
params.put("requestedHeartbeat", 321);
params.put("exclusiveConsumer", true);
+ params.put("allowNullHeaders", true);
RabbitMQEndpoint endpoint = createEndpoint(params);
@@ -89,6 +91,7 @@ public void testPropertiesSet() throws Exception {
assertEquals(789, endpoint.getRequestedFrameMax());
assertEquals(321, endpoint.getRequestedHeartbeat());
assertEquals(true, endpoint.isExclusiveConsumer());
+ assertEquals(true, endpoint.isAllowNullHeaders());
}
private RabbitMQEndpoint createEndpoint(Map<String, Object> params) throws
Exception {
diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
index 77bb1c1b321..d91bcc4bf73 100644
--- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
+++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
@@ -18,7 +18,9 @@
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeoutException;
import com.rabbitmq.client.AMQP;
@@ -30,15 +32,22 @@
import org.apache.camel.ProducerTemplate;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.ObjectHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RabbitMQProducerIntTest.class);
+
private static final String EXCHANGE = "ex1";
private static final String ROUTE = "route1";
+ private static final String CUSTOM_HEADER = "CustomHeader";
private static final String BASIC_URI_FORMAT =
"rabbitmq:localhost:5672/%s?routingKey=%s&username=cameltest&password=cameltest&skipQueueDeclare=true";
private static final String BASIC_URI = String.format(BASIC_URI_FORMAT,
EXCHANGE, ROUTE);
+ private static final String ALLOW_NULL_HEADERS = BASIC_URI +
"&allowNullHeaders=true";
private static final String PUBLISHER_ACKNOWLEDGES_URI = BASIC_URI +
"&mandatory=true&publisherAcknowledgements=true";
private static final String PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI =
String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") +
"&publisherAcknowledgements=true";
private static final String GUARANTEED_DELIVERY_URI = BASIC_URI +
"&mandatory=true&guaranteedDeliveries=true";
@@ -47,6 +56,9 @@
@Produce(uri = "direct:start")
protected ProducerTemplate template;
+
+ @Produce(uri = "direct:start-allow-null-headers")
+ protected ProducerTemplate templateAllowNullHeaders;
@Produce(uri = "direct:start-with-confirms")
protected ProducerTemplate templateWithConfirms;
@@ -73,6 +85,7 @@ protected RouteBuilder createRouteBuilder() throws Exception {
@Override
public void configure() throws Exception {
from("direct:start").to(BASIC_URI);
+ from("direct:start-allow-null-headers").to(ALLOW_NULL_HEADERS);
from("direct:start-with-confirms").to(PUBLISHER_ACKNOWLEDGES_URI);
from("direct:start-with-confirms-bad-route").to(PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI);
from("direct:start-with-guaranteed-delivery").to(GUARANTEED_DELIVERY_URI);
@@ -105,6 +118,38 @@ public void producedMessageIsReceived() throws
InterruptedException, IOException
assertThatBodiesReceivedIn(received, "new message");
}
+
+ @Test
+ public void producedMessageWithNotNullHeaders() throws
InterruptedException, IOException, TimeoutException {
+ final List<String> received = new ArrayList<>();
+ final Map<String, Object> receivedHeaders = new HashMap<String,
Object>();
+ Map<String, Object> headers = new HashMap<String, Object>();
+
+ headers.put(RabbitMQConstants.EXCHANGE_NAME, EXCHANGE);
+ headers.put(CUSTOM_HEADER, CUSTOM_HEADER.toLowerCase());
+
+ channel.basicConsume("sammyq", true, new
ArrayPopulatingConsumer(received, receivedHeaders));
+
+ template.sendBodyAndHeaders("new message", headers);
+
+ assertThatBodiesAndHeadersReceivedIn(receivedHeaders, headers,
received, "new message");
+ }
+
+ @Test
+ public void producedMessageAllowNullHeaders() throws InterruptedException,
IOException, TimeoutException {
+ final List<String> received = new ArrayList<>();
+ final Map<String, Object> receivedHeaders = new HashMap<String,
Object>();
+ Map<String, Object> headers = new HashMap<String, Object>();
+
+ headers.put(RabbitMQConstants.EXCHANGE_NAME, null);
+ headers.put(CUSTOM_HEADER, null);
+
+ channel.basicConsume("sammyq", true, new
ArrayPopulatingConsumer(received, receivedHeaders));
+
+ templateAllowNullHeaders.sendBodyAndHeaders("new message", headers);
+
+ assertThatBodiesAndHeadersReceivedIn(receivedHeaders, headers,
received, "new message");
+ }
private void assertThatBodiesReceivedIn(final List<String> received, final
String... expected) throws InterruptedException {
Thread.sleep(500);
@@ -114,6 +159,25 @@ private void assertThatBodiesReceivedIn(final List<String>
received, final Strin
assertEquals(body, received.get(0));
}
}
+
+ private void assertThatBodiesAndHeadersReceivedIn(Map<String, Object>
receivedHeaders, Map<String, Object> expectedHeaders,
+ final List<String>
received, final String... expected) throws InterruptedException {
+ Thread.sleep(500);
+
+ assertListSize(received, expected.length);
+ for (String body : expected) {
+ assertEquals(body, received.get(0));
+ }
+
+ for (Map.Entry<String, Object> headers : expectedHeaders.entrySet()) {
+ Object receivedValue = receivedHeaders.get(headers.getKey());
+ Object expectedValue = headers.getValue();
+
+ assertTrue("Header key " + headers.getKey() + " not found",
receivedHeaders.containsKey(headers.getKey()));
+ assertEquals(0, ObjectHelper.compare(receivedValue == null ? "" :
receivedValue.toString(), expectedValue == null ? "" :
expectedValue.toString()));
+ }
+
+ }
@Test
public void
producedMessageIsReceivedWhenPublisherAcknowledgementsAreEnabled() throws
InterruptedException, IOException, TimeoutException {
@@ -162,10 +226,18 @@ public void
shouldSuccessfullyProduceMessageWhenGuaranteedDeliveryIsActivatedOnA
private class ArrayPopulatingConsumer extends DefaultConsumer {
private final List<String> received;
+ private final Map<String, Object> receivedHeaders;
ArrayPopulatingConsumer(final List<String> received) {
super(RabbitMQProducerIntTest.this.channel);
this.received = received;
+ receivedHeaders = new HashMap<String, Object>();
+ }
+
+ ArrayPopulatingConsumer(final List<String> received, Map<String,
Object> receivedHeaders) {
+ super(RabbitMQProducerIntTest.this.channel);
+ this.received = received;
+ this.receivedHeaders = receivedHeaders;
}
@Override
@@ -173,6 +245,9 @@ public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException {
+ LOGGER.info("AMQP.BasicProperties: {}", properties);
+
+ receivedHeaders.putAll(properties.getHeaders());
received.add(new String(body));
}
}
diff --git a/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
index b2a40b24c3f..d9da3f6dde9 100644
--- a/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
+++ b/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
@@ -38,7 +38,7 @@
*/
private Boolean enabled;
/**
- * The hostname of the running rabbitmq instance or cluster.
+ * The hostname of the running RabbitMQ instance or cluster.
*/
private String hostname;
/**
@@ -281,6 +281,10 @@
* The type of the dead letter exchange
*/
private String deadLetterExchangeType = "direct";
+ /**
+ * Allow pass null values to header
+ */
+ private Boolean allowNullHeaders = false;
/**
* Whether the component should resolve property placeholders on itself
when
* starting. Only properties which are of String type can use property
@@ -675,6 +679,14 @@ public void setDeadLetterExchangeType(String
deadLetterExchangeType) {
this.deadLetterExchangeType = deadLetterExchangeType;
}
+ public Boolean getAllowNullHeaders() {
+ return allowNullHeaders;
+ }
+
+ public void setAllowNullHeaders(Boolean allowNullHeaders) {
+ this.allowNullHeaders = allowNullHeaders;
+ }
+
public Boolean getResolvePropertyPlaceholders() {
return resolvePropertyPlaceholders;
}
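
The heart of the change to RabbitMQMessageConverter in the diff above is the condition `value != null || isAllowNullHeaders()`. A minimal sketch of that behavior follows, assuming a plain map of headers. The class `NullHeaderFilterSketch` and the method `filterHeaders` are hypothetical names used only for illustration; this is not the Camel class, and it models only the null check, not the header-type validation done by `getValidRabbitMQHeaderValue`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of the header filtering touched by the patch.
// Only the null handling is modeled; value-type validation is left out.
final class NullHeaderFilterSketch {

    static Map<String, Object> filterHeaders(Map<String, Object> headers, boolean allowNullHeaders) {
        Map<String, Object> filtered = new LinkedHashMap<>();
        for (Map.Entry<String, Object> header : headers.entrySet()) {
            Object value = header.getValue();
            // Same shape as the patched condition: non-null values are always kept,
            // null values are kept only when the flag is set.
            if (value != null || allowNullHeaders) {
                filtered.put(header.getKey(), value);
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("CustomHeader", null);
        headers.put("Other", "value");

        // Default (flag off): the null-valued header is dropped.
        System.out.println(filterHeaders(headers, false).containsKey("CustomHeader")); // prints false
        // Flag on: the null-valued header is passed through.
        System.out.println(filterHeaders(headers, true).containsKey("CustomHeader"));  // prints true
    }
}
```

With the flag left at its default of false, existing routes behave as before, which appears to be why the option is opt-in.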
----------------------------------------------------------------
> RabbitMQ Headers - Headers with null value are skipped.
> -------------------------------------------------------
>
> Key: CAMEL-12654
> URL: https://issues.apache.org/jira/browse/CAMEL-12654
> Project: Camel
> Issue Type: Bug
> Components: camel-rabbitmq
> Affects Versions: 2.21.1
> Reporter: Prakhar
> Priority: Minor
> Labels: headers, rabbitmq
>
> Reference: Conversation with Claus Ibsen on
> [Stack Overflow|https://stackoverflow.com/questions/50583749/apache-camel-how-to-setheader-value-as-null]
> org.apache.camel.component.rabbitmq.RabbitMQMessageConverter.buildProperties()
> skips headers whose value is null. The RabbitMQ Java client itself has no
> such check: it accepts a plain Map<String,Object>, which can hold null
> values. See the definition of the headers() method in the
> [Reference|https://rabbitmq.github.io/rabbitmq-java-client/api/current/com/rabbitmq/client/AMQP.BasicProperties.Builder.html].
> *The business scenario where we use it*: On RabbitMQ, we accept messages
> from multiple sources. Depending on whether a specific header is null or not,
> we route these messages downstream in our processing pipeline.
> If required, I could provide a working example using the RabbitMQ Java client
> to demonstrate that headers with a null value are not skipped.
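
The routing scenario described in the issue can be sketched as follows. This is only an illustration of why a skipped null header loses information: the header name `CustomHeader` and the destination names are hypothetical, and no RabbitMQ or Camel API is used, just a `Map<String, Object>` standing in for the received headers.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical downstream router: the decision depends on whether a header
// is present with a null value, present with a real value, or absent.
final class NullHeaderRoutingSketch {

    static final String ROUTING_HEADER = "CustomHeader"; // assumed header name

    static String chooseDestination(Map<String, Object> headers) {
        if (!headers.containsKey(ROUTING_HEADER)) {
            // This is what the consumer sees when the producer skipped the null header.
            return "header-missing";
        }
        return headers.get(ROUTING_HEADER) == null ? "null-branch" : "value-branch";
    }

    public static void main(String[] args) {
        Map<String, Object> withNull = new HashMap<>();
        withNull.put(ROUTING_HEADER, null); // a HashMap accepts null values

        Map<String, Object> skipped = new HashMap<>(); // header dropped on the producer side

        System.out.println(chooseDestination(withNull)); // prints null-branch
        System.out.println(chooseDestination(skipped));  // prints header-missing
    }
}
```

When the producer drops the null-valued header, the consumer cannot tell "explicitly null" apart from "never set", which is the distinction the reporter's pipeline relies on.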