Repository: camel Updated Branches: refs/heads/master 43b79533d -> 8fe4288f2
http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java new file mode 100644 index 0000000..6cae778 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyManager.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq.reply; + +import java.io.IOException; +import java.util.Map.Entry; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AMQP.Queue.DeclareOk; +import com.rabbitmq.client.Channel; +import com.rabbitmq.client.Connection; +import com.rabbitmq.client.Envelope; + + +/** + * A {@link ReplyManager} when using temporary queues. 
+ * + * @version + */ +public class TemporaryQueueReplyManager extends ReplyManagerSupport { + + private RabbitConsumer consumer; + + public TemporaryQueueReplyManager(CamelContext camelContext) { + super(camelContext); + } + + protected ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, + String originalCorrelationId, String correlationId, long requestTimeout) { + return new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, correlationId, requestTimeout); + } + + public void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout) { + log.trace("Updated provisional correlationId [{}] to expected correlationId [{}]", correlationId, newCorrelationId); + + ReplyHandler handler = correlation.remove(correlationId); + if (handler != null) { + correlation.put(newCorrelationId, handler, requestTimeout); + } + } + + @Override + protected void handleReplyMessage(String correlationID, AMQP.BasicProperties properties, byte[] message) { + ReplyHandler handler = correlation.get(correlationID); + if (handler == null && endpoint.isUseMessageIDAsCorrelationID()) { + handler = waitForProvisionCorrelationToBeUpdated(correlationID, message); + } + if (handler != null) { + correlation.remove(correlationID); + handler.onReply(correlationID, properties, message); + } else { + // we could not correlate the received reply message to a matching request and therefore + // we cannot continue routing the unknown message + // log a warn and then ignore the message + log.warn("Reply received for unknown correlationID [{}]. 
The message will be ignored: {}", correlationID, message); + } + } + + @Override + protected Connection createListenerContainer() throws Exception { + + log.debug("Creating connection"); + Connection conn = endpoint.connect(executorService); + + log.debug("Creating channel"); + Channel channel = conn.createChannel(); + // setup the basicQos + if (endpoint.isPrefetchEnabled()) { + channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), + endpoint.isPrefetchGlobal()); + } + + //Let the server pick a random name for us + DeclareOk result = channel.queueDeclare(); + log.debug("Temporary queue name {}", result.getQueue()); + setReplyTo(result.getQueue()); + + //TODO check for the RabbitMQConstants.EXCHANGE_NAME header + channel.queueBind(getReplyTo(), endpoint.getExchangeName(), getReplyTo()); + + consumer = new RabbitConsumer(this, channel); + consumer.start(); + + return conn; + } + + @Override + protected void doStop() throws Exception { + super.doStop(); + consumer.stop(); + } + + //TODO combine with class in RabbitMQConsumer + class RabbitConsumer extends com.rabbitmq.client.DefaultConsumer { + + private final TemporaryQueueReplyManager consumer; + private final Channel channel; + private String tag; + + /** + * Constructs a new instance and records its association to the + * passed-in channel. 
+ * + * @param channel the channel to which this consumer is attached + */ + public RabbitConsumer(TemporaryQueueReplyManager consumer, Channel channel) { + super(channel); + this.consumer = consumer; + this.channel = channel; + } + + @Override + public void handleDelivery(String consumerTag, Envelope envelope, + AMQP.BasicProperties properties, byte[] body) throws IOException { + + consumer.onMessage(properties, body); + } + + /** + * Bind consumer to channel + */ + private void start() throws IOException { + tag = channel.basicConsume(getReplyTo(), endpoint.isAutoAck(), this); + } + + /** + * Unbind consumer from channel + */ + private void stop() throws IOException { + if (channel.isOpen()) { + if (tag != null) { + channel.basicCancel(tag); + } + channel.close(); + } + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java new file mode 100644 index 0000000..3521bec --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq.reply; + +import java.util.UUID; + +import com.rabbitmq.client.Connection; + + + +/** + * Callback to be used when using the option <tt>useMessageIDAsCorrelationID</tt>. + * <p/> + * This callback will keep the correlation registration in {@link ReplyManager} up-to-date with + * the <tt>JMSMessageID</tt> which was assigned and used when the message was sent. + * + * @version + */ +public class UseMessageIdAsCorrelationIdMessageSentCallback implements MessageSentCallback { + + private ReplyManager replyManager; + private String correlationId; + private long requestTimeout; + + public UseMessageIdAsCorrelationIdMessageSentCallback(ReplyManager replyManager, String correlationId, long requestTimeout) { + this.replyManager = replyManager; + this.correlationId = correlationId; + this.requestTimeout = requestTimeout; + } + + public void sent(Connection session, byte[] message, String destination) { + String newCorrelationID = UUID.randomUUID().toString(); + if (newCorrelationID != null) { + replyManager.updateCorrelationId(correlationId, newCorrelationID, requestTimeout); + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java 
b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java index 52e76c8..19c580f 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQEndpointTest.java @@ -18,6 +18,7 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; import java.math.BigDecimal; +import java.sql.Timestamp; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -25,17 +26,18 @@ import java.util.UUID; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; -import com.rabbitmq.client.AMQP; -import com.rabbitmq.client.Address; -import com.rabbitmq.client.ConnectionFactory; -import com.rabbitmq.client.Envelope; -import com.rabbitmq.client.impl.LongStringHelper; import org.apache.camel.Exchange; import org.apache.camel.impl.JndiRegistry; import org.apache.camel.test.junit4.CamelTestSupport; import org.junit.Test; import org.mockito.Mockito; +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Address; +import com.rabbitmq.client.ConnectionFactory; +import com.rabbitmq.client.Envelope; +import com.rabbitmq.client.impl.LongStringHelper; + public class RabbitMQEndpointTest extends CamelTestSupport { private Envelope envelope = Mockito.mock(Envelope.class); @@ -107,6 +109,10 @@ public class RabbitMQEndpointTest extends CamelTestSupport { customHeaders.put("dateHeader", new Date(0)); customHeaders.put("byteArrayHeader", "foo".getBytes()); customHeaders.put("longStringHeader", LongStringHelper.asLongString("Some really long string")); + customHeaders.put("timestampHeader", new Timestamp(4200)); + customHeaders.put("byteHeader", new Byte((byte) 0)); + customHeaders.put("floatHeader", new Float(42.4242)); + customHeaders.put("longHeader", new Long(420000000000000000L)); Mockito.when(properties.getHeaders()).thenReturn(customHeaders); 
byte[] body = new byte[20]; @@ -122,6 +128,10 @@ public class RabbitMQEndpointTest extends CamelTestSupport { assertEquals(new Date(0), exchange.getIn().getHeader("dateHeader")); assertArrayEquals("foo".getBytes(), (byte[]) exchange.getIn().getHeader("byteArrayHeader")); assertEquals("Some really long string", exchange.getIn().getHeader("longStringHeader")); + assertEquals(new Timestamp(4200), exchange.getIn().getHeader("timestampHeader")); + assertEquals(new Byte((byte) 0), exchange.getIn().getHeader("byteHeader")); + assertEquals(new Float(42.4242), exchange.getIn().getHeader("floatHeader")); + assertEquals(new Long(420000000000000000L), exchange.getIn().getHeader("longHeader")); assertEquals(body, exchange.getIn().getBody()); } @@ -219,4 +229,22 @@ public class RabbitMQEndpointTest extends CamelTestSupport { assertEquals(654, connectionFactory.getNetworkRecoveryInterval()); assertFalse(connectionFactory.isTopologyRecoveryEnabled()); } + + @Test + public void createEndpointWithTransferExceptionEnabled() throws Exception { + RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?transferException=true", RabbitMQEndpoint.class); + assertEquals(true, endpoint.isTransferException()); + } + + @Test + public void createEndpointWithReplyTimeout() throws Exception { + RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?requestTimeout=2000", RabbitMQEndpoint.class); + assertEquals(2000, endpoint.getRequestTimeout()); + } + + @Test + public void createEndpointWithRequestTimeoutCheckerInterval() throws Exception { + RabbitMQEndpoint endpoint = context.getEndpoint("rabbitmq:localhost/exchange?requestTimeoutCheckerInterval=1000", RabbitMQEndpoint.class); + assertEquals(1000, endpoint.getRequestTimeoutCheckerInterval()); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java 
---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java new file mode 100644 index 0000000..5c1223e --- /dev/null +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQInOutIntTest.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.component.rabbitmq; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.camel.CamelExecutionException; +import org.apache.camel.Endpoint; +import org.apache.camel.EndpointInject; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Produce; +import org.apache.camel.ProducerTemplate; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.rabbitmq.testbeans.TestSerializableObject; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.junit.Test; + +public class RabbitMQInOutIntTest extends CamelTestSupport { + + private static final String EXCHANGE = "ex5"; + public static final String ROUTING_KEY = "rk5"; + public static final long TIMEOUT_MS = 2000; + + @Produce(uri = "direct:start") + protected ProducerTemplate template; + + @Produce(uri = "direct:rabbitMQ") + protected ProducerTemplate directProducer; + + @EndpointInject(uri = "rabbitmq:localhost:5672/" + EXCHANGE + "?exchangeType=direct&username=cameltest&password=cameltest" + "&autoAck=true&queue=q4&routingKey=" + ROUTING_KEY + + "&transferException=true&requestTimeout=" + TIMEOUT_MS) + private Endpoint rabbitMQEndpoint; + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + + @Override + public void configure() throws Exception { + + from("direct:rabbitMQ").id("producingRoute").setHeader("routeHeader", simple("routeHeader")).inOut(rabbitMQEndpoint); + + from(rabbitMQEndpoint).id("consumingRoute").log("Receiving message").process(new Processor() { + public void process(Exchange exchange) throws Exception { + if (exchange.getIn().getBody(TestSerializableObject.class) != 
null) { + TestSerializableObject foo = exchange.getIn().getBody(TestSerializableObject.class); + foo.setDescription("foobar"); + } + + else if (exchange.getIn().getBody(String.class) != null) { + if (exchange.getIn().getBody(String.class).contains("header")) { + assertEquals(exchange.getIn().getHeader("String"), "String"); + assertEquals(exchange.getIn().getHeader("routeHeader"), "routeHeader"); + } + + if (exchange.getIn().getBody(String.class).contains("Exception")) { + throw new IllegalArgumentException("Boom"); + } + + if (exchange.getIn().getBody(String.class).contains("TimeOut")) { + Thread.sleep(TIMEOUT_MS * 2); + } + + exchange.getIn().setBody(exchange.getIn().getBody(String.class) + " response"); + } + + } + }); + } + }; + } + + @Test + public void inOutRaceConditionTest1() throws InterruptedException, IOException { + String reply = template.requestBodyAndHeader("direct:rabbitMQ", "test1", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class); + assertEquals("test1 response", reply); + } + + @Test + public void inOutRaceConditionTest2() throws InterruptedException, IOException { + String reply = template.requestBodyAndHeader("direct:rabbitMQ", "test2", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class); + assertEquals("test2 response", reply); + } + + @Test + public void headerTest() throws InterruptedException, IOException { + Map<String, Object> headers = new HashMap<>(); + + TestSerializableObject testObject = new TestSerializableObject(); + testObject.setName("header"); + + headers.put("String", "String"); + headers.put("Boolean", new Boolean(false)); + + // This will blow up the connection if not removed before sending the message + headers.put("TestObject1", testObject); + // This will blow up the connection if not removed before sending the message + headers.put("class", testObject.getClass()); + // This will mess up de-serialization if not removed before sending the message + headers.put("CamelSerialize", true); + + // populate a map and 
an arrayList + Map<Object, Object> tmpMap = new HashMap<>(); + List<String> tmpList = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + String name = "header" + i; + tmpList.add(name); + tmpMap.put(name, name); + } + // This will blow up the connection if not removed before sending the message + headers.put("arrayList", tmpList); + // This will blow up the connection if not removed before sending the message + headers.put("map", tmpMap); + + String reply = template.requestBodyAndHeaders("direct:rabbitMQ", "header", headers, String.class); + assertEquals("header response", reply); + } + + @Test + public void serializeTest() throws InterruptedException, IOException { + TestSerializableObject foo = new TestSerializableObject(); + foo.setName("foobar"); + + TestSerializableObject reply = template.requestBodyAndHeader("direct:rabbitMQ", foo, RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, TestSerializableObject.class); + assertEquals("foobar", reply.getName()); + assertEquals("foobar", reply.getDescription()); + } + + @Test + public void testSerializableObject() throws IOException { + TestSerializableObject foo = new TestSerializableObject(); + foo.setName("foobar"); + + byte[] body = null; + try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) { + o.writeObject(foo); + body = b.toByteArray(); + } + + TestSerializableObject newFoo = null; + try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b)) { + newFoo = (TestSerializableObject) o.readObject(); + } catch (IOException | ClassNotFoundException e) { + } + assertEquals(foo.getName(), newFoo.getName()); + } + + @Test + public void inOutExceptionTest() { + try { + template.requestBodyAndHeader("direct:rabbitMQ", "Exception", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class); + fail("This should have thrown an exception"); + } catch (CamelExecutionException e) { + assertEquals(e.getCause().getClass(), 
IllegalArgumentException.class); + } catch (Exception e) { + fail("This should have caught CamelExecutionException"); + } + } + + @Test + public void inOutTimeOutTest() { + try { + template.requestBodyAndHeader("direct:rabbitMQ", "TimeOut", RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, String.class); + fail("This should have thrown a timeOut exception"); + } catch (CamelExecutionException e) { + // expected + } catch (Exception e) { + fail("This should have caught CamelExecutionException"); + } + } + + @Test + public void inOutNullTest() { + template.requestBodyAndHeader("direct:rabbitMQ", null, RabbitMQConstants.EXCHANGE_NAME, EXCHANGE, Object.class); + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java index 7b2df60..cefece5 100644 --- a/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java +++ b/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/RabbitMQProducerTest.java @@ -51,6 +51,7 @@ public class RabbitMQProducerTest { Mockito.when(exchange.getIn()).thenReturn(message); Mockito.when(endpoint.connect(Matchers.any(ExecutorService.class))).thenReturn(conn); Mockito.when(conn.createChannel()).thenReturn(null); + Mockito.when(endpoint.getMessageConverter()).thenReturn(new RabbitMQMessageConverter()); } @Test http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/test/java/org/apache/camel/component/rabbitmq/testbeans/TestSerializableObject.java ---------------------------------------------------------------------- diff --git 
/**
 * Minimal serializable bean with a name and a description, used as a message body in tests.
 */
public class TestSerializableObject implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;
    private String description;

    /** @return the name, or {@code null} if none was set */
    public String getName() {
        return this.name;
    }

    /** @param name the name to store */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the description, or {@code null} if none was set */
    public String getDescription() {
        return this.description;
    }

    /** @param description the description to store */
    public void setDescription(String description) {
        this.description = description;
    }
}