This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 972e12433931eeb4cbaf3621519dfaa9010bc0ed
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Mar 7 09:40:10 2018 +0100

    CAMEL-12329: camel-rabbitmq. Use a new header to override sending to 
another destination name in the producer to avoid issue with from->to sending 
to itself in from. This is now aligned with how eg the JMS component does it 
with a special OVERRIDE header for overriding the destination.
---
 .../src/main/docs/rabbitmq-component.adoc          | 24 ++++++++++++++++++----
 .../component/rabbitmq/RabbitMQConstants.java      |  3 +++
 .../rabbitmq/RabbitMQMessagePublisher.java         | 11 +++++-----
 .../camel/component/rabbitmq/RabbitMQProducer.java | 16 ++++++++++++---
 4 files changed, 42 insertions(+), 12 deletions(-)

diff --git a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc 
b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
index d781a04..2727679 100644
--- a/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
+++ b/components/camel-rabbitmq/src/main/docs/rabbitmq-component.adoc
@@ -173,6 +173,8 @@ that will be used when producing a message
 
 |`rabbitmq.DELIVERY_TAG` |The rabbitmq delivery tag of the received message
 
+|`rabbitmq.REDELIVERY_TAG` |Whether the message is being redelivered
+
 |`rabbitmq.REQUEUE` |*Camel 2.14.2:* This is used by the consumer to control 
rejection of the
 message. When the consumer is complete processing the exchange, and if
 the exchange failed, then the consumer is going to reject the message
@@ -191,7 +193,9 @@ camel exchange then they will be set on the RabbitMQ 
message.
 
 |`rabbitmq.ROUTING_KEY` |The routing key that will be used when sending the 
message
 
-|`rabbitmq.EXCHANGE_NAME` |The exchange the message was received from, or sent 
to
+|`rabbitmq.EXCHANGE_NAME` |The exchange the message was received from
+
+|`rabbitmq.EXCHANGE_OVERRIDE_NAME` | *Camel 2.21:* Used to force sending the 
message to this exchange instead of the exchange name configured on the producer endpoint
 
 |`rabbitmq.CONTENT_TYPE` |The contentType to set on the RabbitMQ message
 
@@ -276,7 +280,7 @@ public Map<String, Object> bindArgsBuilder() {
 }
 ----
 
-==== Routing between exchanges
+==== Issue when routing between exchanges (in Camel 2.20.x or older)
 
 If you for example want to route messages from one Rabbit exchange to another 
as shown
 in the example below with foo -> bar:
@@ -295,7 +299,7 @@ and instead send the message to `foo`.
 
 To avoid this you need to either:
 
-- remove the header
+- Remove the header:
 
 [source,java]
 ----
@@ -304,7 +308,7 @@ from("rabbitmq://localhost/foo")
   .to("rabbitmq://localhost/bar")
 ----
 
-- or turn on `bridgeEndpoint` mode on the producer.
+- Or turn on `bridgeEndpoint` mode on the producer:
 
 [source,java]
 ----
@@ -312,3 +316,15 @@ from("rabbitmq://localhost/foo")
   .to("rabbitmq://localhost/bar?bridgeEndpoint=true")
 ----
 
+From Camel 2.21 onwards this has been improved so you can easily route between 
exchanges.
+The header `rabbitmq.EXCHANGE_NAME` is no longer used by the producer to 
override the destination exchange.
+Instead a new header `rabbitmq.EXCHANGE_OVERRIDE_NAME` can be used to send to 
a different exchange.
+For example, to send to the `cheese` exchange you can do:
+
+[source,java]
+----
+from("rabbitmq://localhost/foo")
+  .setHeader("rabbitmq.EXCHANGE_OVERRIDE_NAME", constant("cheese"))
+  .to("rabbitmq://localhost/bar")
+----
+
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
index 8f2ef49..d796a58 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java
@@ -17,8 +17,10 @@
 package org.apache.camel.component.rabbitmq;
 
 public final class RabbitMQConstants {
+
     // TODO need to change the constant which is start with camel
     public static final String ROUTING_KEY = "rabbitmq.ROUTING_KEY";
+    public static final String EXCHANGE_OVERRIDE_NAME = 
"rabbitmq.EXCHANGE_OVERRIDE_NAME";
     public static final String EXCHANGE_NAME = "rabbitmq.EXCHANGE_NAME";
     public static final String CONTENT_TYPE = "rabbitmq.CONTENT_TYPE";
     public static final String PRIORITY = "rabbitmq.PRIORITY";
@@ -47,6 +49,7 @@ public final class RabbitMQConstants {
     public static final String RABBITMQ_QUEUE_MESSAGE_TTL_KEY = 
"x-message-ttl";
     public static final String RABBITMQ_QUEUE_TTL_KEY = "x-expires";
     
+
     private RabbitMQConstants() {
         //Constants class
     }
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index 85e657f..7e59eec 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -49,7 +49,7 @@ public class RabbitMQMessagePublisher {
     private final ReturnListener guaranteedDeliveryReturnListener = new 
ReturnListener() {
         @Override
         public void handleReturn(int replyCode, String replyText, String 
exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) 
throws IOException {
-            LOG.warn("Delivery failed for exchange {} and routing key {}; 
replyCode = {}; replyText = {}", exchange, routingKey, replyCode, replyText);
+            LOG.warn("Delivery failed for exchange: {} and routing key: {}; 
replyCode: {}; replyText: {}", exchange, routingKey, replyCode, replyText);
             basicReturnReceived = true;
         }
     };
@@ -67,11 +67,12 @@ public class RabbitMQMessagePublisher {
 
         // Remove the SERIALIZE_HEADER in case it was previously set
         if (message.getHeaders() != null && 
message.getHeaders().containsKey(RabbitMQEndpoint.SERIALIZE_HEADER)) {
-            LOG.debug("Removing the {} header", 
RabbitMQEndpoint.SERIALIZE_HEADER);
+            LOG.trace("Removing header: {}", 
RabbitMQEndpoint.SERIALIZE_HEADER);
             message.getHeaders().remove(RabbitMQEndpoint.SERIALIZE_HEADER);
         }
         if (routingKey != null && 
routingKey.startsWith(RabbitMQConstants.RABBITMQ_DIRECT_REPLY_ROUTING_KEY)) {
             message.setHeader(RabbitMQConstants.EXCHANGE_NAME, 
RabbitMQConstants.RABBITMQ_DIRECT_REPLY_EXCHANGE); // use default exchange for 
reply-to messages
+            message.setHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME, 
RabbitMQConstants.RABBITMQ_DIRECT_REPLY_EXCHANGE); // use default exchange for 
reply-to messages
         }
 
         return message;
@@ -95,7 +96,7 @@ public class RabbitMQMessagePublisher {
                 properties = 
endpoint.getMessageConverter().buildProperties(camelExchange).build();
                 body = null;
             } else {
-                LOG.warn("Could not convert {} to byte[]", message.getBody());
+                LOG.warn("Cannot convert {} to byte[]", message.getBody());
                 throw new RuntimeCamelException(e);
             }
         }
@@ -109,7 +110,7 @@ public class RabbitMQMessagePublisher {
         Boolean mandatory = 
camelExchange.getIn().getHeader(RabbitMQConstants.MANDATORY, 
endpoint.isMandatory(), Boolean.class);
         Boolean immediate = 
camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, 
endpoint.isImmediate(), Boolean.class);
 
-        LOG.debug("Sending message to exchange: {} with CorrelationId = {}", 
rabbitExchange, properties.getCorrelationId());
+        LOG.debug("Sending message to exchange: {} with CorrelationId: {}", 
rabbitExchange, properties.getCorrelationId());
 
         if (isPublisherAcknowledgements()) {
             channel.confirmSelect();
@@ -153,7 +154,7 @@ public class RabbitMQMessagePublisher {
             o.writeObject(msg.getBody());
             return b.toByteArray();
         } catch (NotSerializableException nse) {
-            LOG.warn("Can not send object " + msg.getBody().getClass() + " via 
RabbitMQ because it contains non-serializable objects.");
+            LOG.warn("Cannot send object {} via RabbitMQ because it contains 
non-serializable objects.", msg.getBody().getClass());
             throw new RuntimeCamelException(nse);
         }
     }
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index 759039d..a1cc075 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -220,10 +220,13 @@ public class RabbitMQProducer extends 
DefaultAsyncProducer {
 
         in.setHeader(RabbitMQConstants.REPLY_TO, replyManager.getReplyTo());
 
-        String exchangeName = in.getHeader(RabbitMQConstants.EXCHANGE_NAME, 
String.class);
-        // If it is BridgeEndpoint we should ignore the message header of 
EXCHANGE_NAME
+        // remove the OVERRIDE header so it does not propagate
+        String exchangeName = (String) 
exchange.getIn().removeHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME);
+        // If it is BridgeEndpoint we should ignore the message header of 
EXCHANGE_OVERRIDE_NAME
         if (exchangeName == null || getEndpoint().isBridgeEndpoint()) {
             exchangeName = getEndpoint().getExchangeName();
+        } else {
+            log.debug("Overriding header: {} detected sending message to 
exchange: {}", RabbitMQConstants.EXCHANGE_OVERRIDE_NAME, exchangeName);
         }
 
         String key = in.getHeader(RabbitMQConstants.ROUTING_KEY, String.class);
@@ -249,7 +252,14 @@ public class RabbitMQProducer extends DefaultAsyncProducer 
{
     }
 
     private boolean processInOnly(Exchange exchange, AsyncCallback callback) 
throws Exception {
-        String exchangeName = getEndpoint().getExchangeName(exchange.getIn());
+        // remove the OVERRIDE header so it does not propagate
+        String exchangeName = (String) 
exchange.getIn().removeHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME);
+        // If it is BridgeEndpoint we should ignore the message header of 
EXCHANGE_OVERRIDE_NAME
+        if (exchangeName == null || getEndpoint().isBridgeEndpoint()) {
+            exchangeName = getEndpoint().getExchangeName();
+        } else {
+            log.debug("Overriding header: {} detected sending message to 
exchange: {}", RabbitMQConstants.EXCHANGE_OVERRIDE_NAME, exchangeName);
+        }
 
         String key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY, 
String.class);
         // we just need to make sure RoutingKey option take effect if it is 
not BridgeEndpoint

-- 
To stop receiving notification emails like this one, please contact
davscl...@apache.org.

Reply via email to