This is an automated email from the ASF dual-hosted git repository.

dmvolod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new fddedbc  CAMEL-12654: RabbitMQ Headers - Headers with null value are 
skipped
fddedbc is described below

commit fddedbc44de60ccc214c0c40be420dad7e1d13da
Author: Dmitry Volodin <[email protected]>
AuthorDate: Tue Jul 24 11:08:45 2018 +0300

    CAMEL-12654: RabbitMQ Headers - Headers with null value are skipped
---
 .../src/main/docs/rabbitmq-component.adoc          |  8 ++-
 .../component/rabbitmq/RabbitMQComponent.java      | 17 ++++-
 .../camel/component/rabbitmq/RabbitMQEndpoint.java | 13 ++++
 .../rabbitmq/RabbitMQMessageConverter.java         | 13 +++-
 .../rabbitmq/reply/ReplyManagerSupport.java        |  1 +
 .../component/rabbitmq/RabbitMQComponentTest.java  |  3 +
 .../rabbitmq/RabbitMQProducerIntTest.java          | 75 ++++++++++++++++++++++
 .../springboot/RabbitMQComponentConfiguration.java | 14 +++-
 8 files changed, 138 insertions(+), 6 deletions(-)

diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc 
b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
index e6e330c..823f22b 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
@@ -47,14 +47,14 @@ exchange name determines which exchange the queue will bind 
to.
 === Options
 
 // component options: START
-The RabbitMQ component supports 49 options, which are listed below.
+The RabbitMQ component supports 50 options, which are listed below.
 
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *hostname* (common) | The hostname of the running rabbitmq instance or 
cluster. |  | String
+| *hostname* (common) | The hostname of the running RabbitMQ instance or 
cluster. |  | String
 | *portNumber* (common) | Port number for the host with the running rabbitmq 
instance or cluster. | 5672 | int
 | *username* (security) | Username in case of authenticated access | guest | 
String
 | *password* (security) | Password for authenticated access | guest | String
@@ -102,6 +102,7 @@ The RabbitMQ component supports 49 options, which are 
listed below.
 | *deadLetterQueue* (common) | The name of the dead letter queue |  | String
 | *deadLetterRoutingKey* (common) | The routing key for the dead letter 
exchange |  | String
 | *deadLetterExchangeType* (common) | The type of the dead letter exchange | 
direct | String
+| *allowNullHeaders* (producer) | Allow pass null values to header | false | 
boolean
 | *resolveProperty Placeholders* (advanced) | Whether the component should 
resolve property placeholders on itself when starting. Only properties which 
are of String type can use property placeholders. | true | boolean
 |===
 // component options: END
@@ -126,7 +127,7 @@ with the following path and query parameters:
 |===
 
 
-==== Query Parameters (61 parameters):
+==== Query Parameters (62 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
@@ -163,6 +164,7 @@ with the following path and query parameters:
 | *exceptionHandler* (consumer) | To let the consumer use a custom 
ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this 
options is not in use. By default the consumer will deal with exceptions, that 
will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
 | *exchangePattern* (consumer) | Sets the exchange pattern when the consumer 
creates an exchange. |  | ExchangePattern
 | *threadPoolSize* (consumer) | The consumer uses a Thread Pool Executor with 
a fixed number of threads. This setting allows you to set that number of 
threads. | 10 | int
+| *allowNullHeaders* (producer) | Allow pass null values to header | false | 
boolean
 | *bridgeEndpoint* (producer) | If the bridgeEndpoint is true, the producer 
will ignore the message header of rabbitmq.EXCHANGE_NAME and 
rabbitmq.ROUTING_KEY | false | boolean
 | *channelPoolMaxSize* (producer) | Get maximum number of opened channel in 
pool | 10 | int
 | *channelPoolMaxWait* (producer) | Set the maximum number of milliseconds to 
wait for a channel from the pool | 1000 | long
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
index 96c1ac8..69a3e8a 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java
@@ -81,6 +81,8 @@ public class RabbitMQComponent extends UriEndpointComponent {
     private String deadLetterQueue;
     @Metadata(label = "common", defaultValue = "direct", enums = 
"direct,fanout,headers,topic")
     private String deadLetterExchangeType = "direct";
+    @Metadata(label = "producer")
+    private boolean allowNullHeaders;
     @Metadata(label = "security")
     private String sslProtocol;
     @Metadata(label = "security")
@@ -241,6 +243,7 @@ public class RabbitMQComponent extends UriEndpointComponent 
{
         endpoint.setDeadLetterExchangeType(getDeadLetterExchangeType());
         endpoint.setDeadLetterQueue(getDeadLetterQueue());
         endpoint.setDeadLetterRoutingKey(getDeadLetterRoutingKey());
+        endpoint.setAllowNullHeaders(isAllowNullHeaders());
         setProperties(endpoint, params);
 
         if (LOG.isDebugEnabled()) {
@@ -262,6 +265,8 @@ public class RabbitMQComponent extends UriEndpointComponent 
{
         
endpoint.getExchangeArgs().putAll(IntrospectionSupport.extractProperties(argsCopy,
 EXCHANGE_ARG_PREFIX));
         
endpoint.getQueueArgs().putAll(IntrospectionSupport.extractProperties(argsCopy, 
QUEUE_ARG_PREFIX));
         
endpoint.getBindingArgs().putAll(IntrospectionSupport.extractProperties(argsCopy,
 BINDING_ARG_PREFIX));
+        // Change null headers processing for message converter
+        
endpoint.getMessageConverter().setAllowNullHeaders(endpoint.isAllowNullHeaders());
 
         return endpoint;
     }
@@ -271,7 +276,7 @@ public class RabbitMQComponent extends UriEndpointComponent 
{
     }
 
     /**
-     * The hostname of the running rabbitmq instance or cluster.
+     * The hostname of the running RabbitMQ instance or cluster.
      */
     public void setHostname(String hostname) {
         this.hostname = hostname;
@@ -859,4 +864,14 @@ public class RabbitMQComponent extends 
UriEndpointComponent {
         this.deadLetterExchangeType = deadLetterExchangeType;
     }
 
+    /**
+     * Allow pass null values to header
+     */
+    public boolean isAllowNullHeaders() {
+        return allowNullHeaders;
+    }
+
+    public void setAllowNullHeaders(boolean allowNullHeaders) {
+        this.allowNullHeaders = allowNullHeaders;
+    }
 }
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 837ca59..9cdbf34 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -175,6 +175,8 @@ public class RabbitMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint {
     private long publisherAcknowledgementsTimeout;
     @UriParam(label = "producer")
     private boolean guaranteedDeliveries;
+    @UriParam(label = "producer")
+    private boolean allowNullHeaders;
     // camel-jms supports this setting but it is not currently configurable in 
camel-rabbitmq
     private boolean useMessageIDAsCorrelationID = true;
     // camel-jms supports this setting but it is not currently configurable in 
camel-rabbitmq
@@ -999,6 +1001,17 @@ public class RabbitMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint {
         this.exclusiveConsumer = exclusiveConsumer;
     }
 
+    /**
+     * Allow pass null values to header
+     */
+    public boolean isAllowNullHeaders() {
+        return allowNullHeaders;
+    }
+
+    public void setAllowNullHeaders(boolean allowNullHeaders) {
+        this.allowNullHeaders = allowNullHeaders;
+    }
+
     public boolean isPassive() {
         return passive;
     }
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
index 6cb535e..07902ca 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java
@@ -37,6 +37,8 @@ import org.slf4j.LoggerFactory;
 public class RabbitMQMessageConverter {
     protected static final Logger LOG = 
LoggerFactory.getLogger(RabbitMQMessageConverter.class);
 
+    private boolean allowNullHeaders;
+    
     /**
      * Will take an {@link Exchange} and add header values back to the {@link 
Exchange#getIn()}
      */
@@ -165,7 +167,8 @@ public class RabbitMQMessageConverter {
         for (Map.Entry<String, Object> header : headers.entrySet()) {
             // filter header values.
             Object value = getValidRabbitMQHeaderValue(header.getValue());
-            if (value != null) {
+            
+            if (value != null || isAllowNullHeaders()) {
                 filteredHeaders.put(header.getKey(), header.getValue());
             } else if (LOG.isDebugEnabled()) {
                 if (header.getValue() == null) {
@@ -305,4 +308,12 @@ public class RabbitMQMessageConverter {
     private Object isSerializeHeaderEnabled(final AMQP.BasicProperties 
properties) {
         return properties.getHeaders().get(RabbitMQEndpoint.SERIALIZE_HEADER);
     }
+
+    public boolean isAllowNullHeaders() {
+        return allowNullHeaders;
+    }
+
+    public void setAllowNullHeaders(boolean allowNullHeaders) {
+        this.allowNullHeaders = allowNullHeaders;
+    }
 }
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
index 15b990a..1ddf056 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java
@@ -224,6 +224,7 @@ public abstract class ReplyManagerSupport extends 
ServiceSupport implements Repl
         ObjectHelper.notNull(executorService, "executorService", this);
         ObjectHelper.notNull(endpoint, "endpoint", this);
 
+        messageConverter.setAllowNullHeaders(endpoint.isAllowNullHeaders());
         // timeout map to use for purging messages which have timed out, while 
waiting for an expected reply
         // when doing request/reply over JMS
         log.debug("Using timeout checker interval with {} millis", 
endpoint.getRequestTimeoutCheckerInterval());
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
index 4b59443..f732e6e 100644
--- 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQComponentTest.java
@@ -45,6 +45,7 @@ public class RabbitMQComponentTest {
         assertEquals(true, endpoint.isAutoDelete());
         assertEquals(true, endpoint.isDurable());
         assertEquals(false, endpoint.isExclusiveConsumer());
+        assertEquals(false, endpoint.isAllowNullHeaders());
         assertEquals("direct", endpoint.getExchangeType());
         assertEquals(ConnectionFactory.DEFAULT_CONNECTION_TIMEOUT, 
endpoint.getConnectionTimeout());
         assertEquals(ConnectionFactory.DEFAULT_CHANNEL_MAX, 
endpoint.getRequestedChannelMax());
@@ -70,6 +71,7 @@ public class RabbitMQComponentTest {
         params.put("requestedFrameMax", 789);
         params.put("requestedHeartbeat", 321);
         params.put("exclusiveConsumer", true);
+        params.put("allowNullHeaders", true);
 
         RabbitMQEndpoint endpoint = createEndpoint(params);
 
@@ -89,6 +91,7 @@ public class RabbitMQComponentTest {
         assertEquals(789, endpoint.getRequestedFrameMax());
         assertEquals(321, endpoint.getRequestedHeartbeat());
         assertEquals(true, endpoint.isExclusiveConsumer());
+        assertEquals(true, endpoint.isAllowNullHeaders());
     }
 
     private RabbitMQEndpoint createEndpoint(Map<String, Object> params) throws 
Exception {
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
index 77bb1c1..d91bcc4 100644
--- 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerIntTest.java
@@ -18,7 +18,9 @@ package org.apache.camel.component.rabbitmq;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
 import com.rabbitmq.client.AMQP;
@@ -30,15 +32,22 @@ import org.apache.camel.Produce;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.util.ObjectHelper;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RabbitMQProducerIntTest extends AbstractRabbitMQIntTest {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(RabbitMQProducerIntTest.class);
+    
     private static final String EXCHANGE = "ex1";
     private static final String ROUTE = "route1";
+    private static final String CUSTOM_HEADER = "CustomHeader";
     private static final String BASIC_URI_FORMAT = 
"rabbitmq:localhost:5672/%s?routingKey=%s&username=cameltest&password=cameltest&skipQueueDeclare=true";
     private static final String BASIC_URI = String.format(BASIC_URI_FORMAT, 
EXCHANGE, ROUTE);
+    private static final String ALLOW_NULL_HEADERS = BASIC_URI + 
"&allowNullHeaders=true";
     private static final String PUBLISHER_ACKNOWLEDGES_URI = BASIC_URI + 
"&mandatory=true&publisherAcknowledgements=true";
     private static final String PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI = 
String.format(BASIC_URI_FORMAT, EXCHANGE, "route2") + 
"&publisherAcknowledgements=true";
     private static final String GUARANTEED_DELIVERY_URI = BASIC_URI + 
"&mandatory=true&guaranteedDeliveries=true";
@@ -47,6 +56,9 @@ public class RabbitMQProducerIntTest extends 
AbstractRabbitMQIntTest {
 
     @Produce(uri = "direct:start")
     protected ProducerTemplate template;
+    
+    @Produce(uri = "direct:start-allow-null-headers")
+    protected ProducerTemplate templateAllowNullHeaders;
 
     @Produce(uri = "direct:start-with-confirms")
     protected ProducerTemplate templateWithConfirms;
@@ -73,6 +85,7 @@ public class RabbitMQProducerIntTest extends 
AbstractRabbitMQIntTest {
             @Override
             public void configure() throws Exception {
                 from("direct:start").to(BASIC_URI);
+                from("direct:start-allow-null-headers").to(ALLOW_NULL_HEADERS);
                 
from("direct:start-with-confirms").to(PUBLISHER_ACKNOWLEDGES_URI);
                 
from("direct:start-with-confirms-bad-route").to(PUBLISHER_ACKNOWLEDGES_BAD_ROUTE_URI);
                 
from("direct:start-with-guaranteed-delivery").to(GUARANTEED_DELIVERY_URI);
@@ -105,6 +118,38 @@ public class RabbitMQProducerIntTest extends 
AbstractRabbitMQIntTest {
 
         assertThatBodiesReceivedIn(received, "new message");
     }
+    
+    @Test
+    public void producedMessageWithNotNullHeaders() throws 
InterruptedException, IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        final Map<String, Object> receivedHeaders = new HashMap<String, 
Object>();
+        Map<String, Object> headers = new HashMap<String, Object>();
+        
+        headers.put(RabbitMQConstants.EXCHANGE_NAME, EXCHANGE);
+        headers.put(CUSTOM_HEADER, CUSTOM_HEADER.toLowerCase());
+        
+        channel.basicConsume("sammyq", true, new 
ArrayPopulatingConsumer(received, receivedHeaders));
+
+        template.sendBodyAndHeaders("new message", headers);
+
+        assertThatBodiesAndHeadersReceivedIn(receivedHeaders, headers, 
received, "new message");
+    }
+    
+    @Test
+    public void producedMessageAllowNullHeaders() throws InterruptedException, 
IOException, TimeoutException {
+        final List<String> received = new ArrayList<>();
+        final Map<String, Object> receivedHeaders = new HashMap<String, 
Object>();
+        Map<String, Object> headers = new HashMap<String, Object>();
+        
+        headers.put(RabbitMQConstants.EXCHANGE_NAME, null);
+        headers.put(CUSTOM_HEADER, null);
+        
+        channel.basicConsume("sammyq", true, new 
ArrayPopulatingConsumer(received, receivedHeaders));
+
+        templateAllowNullHeaders.sendBodyAndHeaders("new message", headers);
+
+        assertThatBodiesAndHeadersReceivedIn(receivedHeaders, headers, 
received, "new message");
+    }
 
     private void assertThatBodiesReceivedIn(final List<String> received, final 
String... expected) throws InterruptedException {
         Thread.sleep(500);
@@ -114,6 +159,25 @@ public class RabbitMQProducerIntTest extends 
AbstractRabbitMQIntTest {
             assertEquals(body, received.get(0));
         }
     }
+    
+    private void assertThatBodiesAndHeadersReceivedIn(Map<String, Object> 
receivedHeaders, Map<String, Object> expectedHeaders,
+                                                      final List<String> 
received, final String... expected) throws InterruptedException {
+        Thread.sleep(500);
+
+        assertListSize(received, expected.length);
+        for (String body : expected) {
+            assertEquals(body, received.get(0));
+        }
+        
+        for (Map.Entry<String, Object> headers : expectedHeaders.entrySet()) {
+            Object receivedValue = receivedHeaders.get(headers.getKey());
+            Object expectedValue = headers.getValue();
+            
+            assertTrue("Header key " + headers.getKey() + " not found", 
receivedHeaders.containsKey(headers.getKey()));
+            assertEquals(0, ObjectHelper.compare(receivedValue == null ? "" : 
receivedValue.toString(), expectedValue == null ? "" : 
expectedValue.toString()));
+        }
+        
+    }
 
     @Test
     public void 
producedMessageIsReceivedWhenPublisherAcknowledgementsAreEnabled() throws 
InterruptedException, IOException, TimeoutException {
@@ -162,10 +226,18 @@ public class RabbitMQProducerIntTest extends 
AbstractRabbitMQIntTest {
 
     private class ArrayPopulatingConsumer extends DefaultConsumer {
         private final List<String> received;
+        private final Map<String, Object> receivedHeaders;
 
         ArrayPopulatingConsumer(final List<String> received) {
             super(RabbitMQProducerIntTest.this.channel);
             this.received = received;
+            receivedHeaders = new HashMap<String, Object>();
+        }
+        
+        ArrayPopulatingConsumer(final List<String> received, Map<String, 
Object> receivedHeaders) {
+            super(RabbitMQProducerIntTest.this.channel);
+            this.received = received;
+            this.receivedHeaders = receivedHeaders;
         }
 
         @Override
@@ -173,6 +245,9 @@ public class RabbitMQProducerIntTest extends 
AbstractRabbitMQIntTest {
                                    Envelope envelope,
                                    AMQP.BasicProperties properties,
                                    byte[] body) throws IOException {
+            LOGGER.info("AMQP.BasicProperties: {}", properties);
+            
+            receivedHeaders.putAll(properties.getHeaders());
             received.add(new String(body));
         }
     }
diff --git 
a/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
 
b/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
index b2a40b2..d9da3f6 100644
--- 
a/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
+++ 
b/platforms/spring-boot/components-starter/camel-rabbitmq-starter/src/main/java/org/apache/camel/component/rabbitmq/springboot/RabbitMQComponentConfiguration.java
@@ -38,7 +38,7 @@ public class RabbitMQComponentConfiguration
      */
     private Boolean enabled;
     /**
-     * The hostname of the running rabbitmq instance or cluster.
+     * The hostname of the running RabbitMQ instance or cluster.
      */
     private String hostname;
     /**
@@ -282,6 +282,10 @@ public class RabbitMQComponentConfiguration
      */
     private String deadLetterExchangeType = "direct";
     /**
+     * Allow pass null values to header
+     */
+    private Boolean allowNullHeaders = false;
+    /**
      * Whether the component should resolve property placeholders on itself 
when
      * starting. Only properties which are of String type can use property
      * placeholders.
@@ -675,6 +679,14 @@ public class RabbitMQComponentConfiguration
         this.deadLetterExchangeType = deadLetterExchangeType;
     }
 
+    public Boolean getAllowNullHeaders() {
+        return allowNullHeaders;
+    }
+
+    public void setAllowNullHeaders(Boolean allowNullHeaders) {
+        this.allowNullHeaders = allowNullHeaders;
+    }
+
     public Boolean getResolvePropertyPlaceholders() {
         return resolvePropertyPlaceholders;
     }

Reply via email to