This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 93dd1b6c4dd1351c3baf4f317fa72d7adae4c2c2
Author: Claus Ibsen <claus.ib...@gmail.com>
AuthorDate: Wed Mar 7 10:17:12 2018 +0100

    CAMEL-12329: camel-rabbitmq. Use a new header to override sending to 
another destination name in the producer to avoid issue with from->to sending 
to itself in from. This is now aligned with how eg the JMS component does it 
with a special OVERRIDE header for overrding the destination.
---
 components/camel-rabbitmq/readme.txt               | 13 +++
 .../camel/component/rabbitmq/RabbitMQEndpoint.java | 13 ---
 .../rabbitmq/RabbitMQMessagePublisher.java         | 13 ++-
 .../camel/component/rabbitmq/RabbitMQProducer.java |  2 +-
 .../rabbitmq/reply/CorrelationTimeoutMap.java      |  4 +-
 .../rabbitmq/reply/TemporaryQueueReplyHandler.java |  4 +-
 .../rabbitmq/reply/TemporaryQueueReplyManager.java |  6 +-
 .../component/rabbitmq/RabbitMQIntBasicTest.java   | 94 ++++++++++++++++++++++
 8 files changed, 125 insertions(+), 24 deletions(-)

diff --git a/components/camel-rabbitmq/readme.txt 
b/components/camel-rabbitmq/readme.txt
new file mode 100644
index 0000000..b95b015
--- /dev/null
+++ b/components/camel-rabbitmq/readme.txt
@@ -0,0 +1,13 @@
+Integration testing
+===================
+
+The camel-rabbitmq component has both unit tests and integration tests.
+The integration tests requires a running RabbitMQ broker. This broker can be 
run via Docker
+
+    docker run -d -p 5672:5672 -e RABBITMQ_DEFAULT_USER=cameltest -e 
RABBITMQ_DEFAULT_PASS=cameltest --hostname my-rabbit --name some-rabbit 
rabbitmq:3
+
+And then the integration tests can be run via Maven
+
+    mvn test -P itest
+
+
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
index 11abfe7..d3567b5 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java
@@ -223,19 +223,6 @@ public class RabbitMQEndpoint extends DefaultEndpoint 
implements AsyncEndpoint {
         new RabbitMQMessagePublisher(camelExchange, channel, routingKey, 
this).publish();
     }
 
-    /**
-     * Extracts name of the rabbitmq exchange
-     */
-    protected String getExchangeName(Message msg) {
-        String exchangeName = msg.getHeader(RabbitMQConstants.EXCHANGE_NAME, 
String.class);
-        // If it is BridgeEndpoint we should ignore the message header of
-        // EXCHANGE_NAME
-        if (exchangeName == null || isBridgeEndpoint()) {
-            exchangeName = getExchangeName();
-        }
-        return exchangeName;
-    }
-
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
         RabbitMQConsumer consumer = new RabbitMQConsumer(this, processor);
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
index 7e59eec..f5f3bb9 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessagePublisher.java
@@ -105,12 +105,19 @@ public class RabbitMQMessagePublisher {
     }
 
     private void publishToRabbitMQ(final AMQP.BasicProperties properties, 
final byte[] body) throws IOException {
-        String rabbitExchange = endpoint.getExchangeName(message);
+        // remove the OVERRIDE header so it does not propagate
+        String exchangeName = (String) 
message.removeHeader(RabbitMQConstants.EXCHANGE_OVERRIDE_NAME);
+        // If it is BridgeEndpoint we should ignore the message header of 
EXCHANGE_OVERRIDE_NAME
+        if (exchangeName == null || endpoint.isBridgeEndpoint()) {
+            exchangeName = endpoint.getExchangeName();
+        } else {
+            LOG.debug("Overriding header: {} detected sending message to 
exchange: {}", RabbitMQConstants.EXCHANGE_OVERRIDE_NAME, exchangeName);
+        }
 
         Boolean mandatory = 
camelExchange.getIn().getHeader(RabbitMQConstants.MANDATORY, 
endpoint.isMandatory(), Boolean.class);
         Boolean immediate = 
camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, 
endpoint.isImmediate(), Boolean.class);
 
-        LOG.debug("Sending message to exchange: {} with CorrelationId: {}", 
rabbitExchange, properties.getCorrelationId());
+        LOG.debug("Sending message to exchange: {} with CorrelationId: {}", 
exchangeName, properties.getCorrelationId());
 
         if (isPublisherAcknowledgements()) {
             channel.confirmSelect();
@@ -121,7 +128,7 @@ public class RabbitMQMessagePublisher {
         }
 
         try {
-            channel.basicPublish(rabbitExchange, routingKey, mandatory, 
immediate, properties, body);
+            channel.basicPublish(exchangeName, routingKey, mandatory, 
immediate, properties, body);
             if (isPublisherAcknowledgements()) {
                 waitForConfirmation();
             }
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
index a1cc075..996d352 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java
@@ -372,7 +372,7 @@ public class RabbitMQProducer extends DefaultAsyncProducer {
         String name = "RabbitMQReplyManagerTimeoutChecker[" + 
getEndpoint().getExchangeName() + "]";
         ScheduledExecutorService replyManagerExecutorService = 
getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name,
 name);
         replyManager.setScheduledExecutorService(replyManagerExecutorService);
-        log.info("Starting reply manager service " + name);
+        log.debug("Staring ReplyManager: {}", name);
         ServiceHelper.startService(replyManager);
 
         return replyManager;
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
index 8f052a0..7ed1089 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java
@@ -86,13 +86,13 @@ public class CorrelationTimeoutMap extends 
DefaultTimeoutMap<String, ReplyHandle
         } else {
             result = super.put(key, value, timeoutMillis);
         }
-        log.info("Added correlationID: {} to timeout after: {} millis", key, 
timeoutMillis);
+        log.debug("Added correlationID: {} to timeout after: {} millis", key, 
timeoutMillis);
         return result;
     }
 
     @Override
     public ReplyHandler putIfAbsent(String key, ReplyHandler value, long 
timeoutMillis) {
-        log.info("in putIfAbsent with key {}", key);
+        log.trace("putIfAbsent with key {}", key);
 
         try {
             if (listener != null) {
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
index 542e8c0..10234a4 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java
@@ -53,7 +53,7 @@ public class TemporaryQueueReplyHandler implements 
ReplyHandler {
 
     public void onReply(String correlationId, AMQP.BasicProperties properties, 
byte[] reply) {
         // create holder object with the the reply
-        log.info("in onReply with correlationId= {}", correlationId);
+        log.debug("onReply with correlationId: {}", correlationId);
         ReplyHolder holder = new ReplyHolder(exchange, callback, 
originalCorrelationId, correlationId, properties, reply);
         // process the reply
         replyManager.processReply(holder);
@@ -61,7 +61,7 @@ public class TemporaryQueueReplyHandler implements 
ReplyHandler {
 
     public void onTimeout(String correlationId) {
         // create holder object without the reply which means a timeout 
occurred
-        log.info("in onTimeout with correlationId= {}", correlationId);
+        log.debug("onTimeout with correlationId: {}", correlationId);
         ReplyHolder holder = new ReplyHolder(exchange, callback, 
originalCorrelationId, correlationId, timeout);
         // process timeout
         replyManager.processReply(holder);
diff --git 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
index 6f03017..842d43e 100644
--- 
a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
+++ 
b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java
@@ -76,10 +76,10 @@ public class TemporaryQueueReplyManager extends 
ReplyManagerSupport {
     @Override
     protected Connection createListenerContainer() throws Exception {
 
-        log.debug("Creating connection");
+        log.trace("Creating connection");
         Connection conn = endpoint.connect(executorService);
 
-        log.debug("Creating channel");
+        log.trace("Creating channel");
         Channel channel = conn.createChannel();
         // setup the basicQos
         if (endpoint.isPrefetchEnabled()) {
@@ -89,7 +89,7 @@ public class TemporaryQueueReplyManager extends 
ReplyManagerSupport {
 
         //Let the server pick a random name for us
         DeclareOk result = channel.queueDeclare();
-        log.info("Using temporary queue name: {}", result.getQueue());
+        log.debug("Using temporary queue name: {}", result.getQueue());
         setReplyTo(result.getQueue());
 
         //TODO check for the RabbitMQConstants.EXCHANGE_NAME header
diff --git 
a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQIntBasicTest.java
 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQIntBasicTest.java
new file mode 100644
index 0000000..209f940
--- /dev/null
+++ 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQIntBasicTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.rabbitmq;
+
+import org.apache.camel.Endpoint;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+
+public class RabbitMQIntBasicTest extends AbstractRabbitMQIntTest {
+
+    // Startup RabbitMQ via Docker
+    // docker run -d -it -p 5672:5672 -e RABBITMQ_DEFAULT_USER=cameltest -e 
RABBITMQ_DEFAULT_PASS=cameltest --hostname my-rabbit --name some-rabbit 
rabbitmq:3
+
+    @EndpointInject(uri = 
"rabbitmq:localhost:5672/foo?username=cameltest&password=cameltest")
+    private Endpoint foo;
+
+    @EndpointInject(uri = 
"rabbitmq:localhost:5672/bar?username=cameltest&password=cameltest")
+    private Endpoint bar;
+
+    @EndpointInject(uri = "mock:result")
+    private MockEndpoint mock;
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from(foo)
+                    .log("FOO received: ${body}")
+                    .to(bar);
+
+                from(bar)
+                    .log("BAR received: ${body}")
+                    .to(mock)
+                    .transform().simple("Bye ${body}");
+            }
+        };
+    }
+
+    @Test
+    public void sentBasicInOnly() throws Exception {
+        mock.expectedBodiesReceived("World");
+
+        log.info("Sending to FOO");
+        template.sendBody(foo, "World");
+        log.info("Sending to FOO done");
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void sentBasicInOut() throws Exception {
+        mock.expectedBodiesReceived("World");
+
+        log.info("Sending to FOO");
+        String out = template.requestBody(foo, "World", String.class);
+        assertEquals("Bye World", out);
+        log.info("Sending to FOO done");
+
+        mock.assertIsSatisfied();
+    }
+
+    @Test
+    public void sentBasicInOutTwo() throws Exception {
+        mock.expectedBodiesReceived("World", "Camel");
+
+        log.info("Sending to FOO");
+        String out = template.requestBody(foo, "World", String.class);
+        assertEquals("Bye World", out);
+        out = template.requestBody(foo, "Camel", String.class);
+        assertEquals("Bye Camel", out);
+        log.info("Sending to FOO done");
+
+        mock.assertIsSatisfied();
+    }
+
+}
+

-- 
To stop receiving notification emails like this one, please contact
davscl...@apache.org.

Reply via email to