CAMEL-8538 Add inOut support to the camel-rabitmq component
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8fe4288f Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8fe4288f Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8fe4288f Branch: refs/heads/master Commit: 8fe4288f2a19d1b8894f65b04a7ac871983b8938 Parents: 43b7953 Author: Brad Reitmeyer <brrei...@cisco.com> Authored: Tue Apr 28 13:57:39 2015 -0500 Committer: Brad Reitmeyer <git...@bradreitmeyer.com> Committed: Mon Jun 8 16:24:39 2015 -0500 ---------------------------------------------------------------------- .../component/rabbitmq/RabbitMQComponent.java | 4 +- .../component/rabbitmq/RabbitMQConstants.java | 1 + .../component/rabbitmq/RabbitMQConsumer.java | 71 +---- .../component/rabbitmq/RabbitMQEndpoint.java | 228 +++++++++++++- .../rabbitmq/RabbitMQMessageConverter.java | 219 ++++++++++++++ .../component/rabbitmq/RabbitMQProducer.java | 296 ++++++++++--------- .../camel/component/rabbitmq/ReplyToType.java | 26 ++ .../rabbitmq/reply/CorrelationListener.java | 44 +++ .../rabbitmq/reply/CorrelationTimeoutMap.java | 120 ++++++++ .../rabbitmq/reply/MessageSentCallback.java | 38 +++ .../rabbitmq/reply/QueueReplyHandler.java | 34 +++ .../component/rabbitmq/reply/ReplyHandler.java | 43 +++ .../component/rabbitmq/reply/ReplyHolder.java | 123 ++++++++ .../component/rabbitmq/reply/ReplyManager.java | 76 +++++ .../rabbitmq/reply/ReplyManagerSupport.java | 238 +++++++++++++++ .../reply/TemporaryQueueReplyHandler.java | 70 +++++ .../reply/TemporaryQueueReplyManager.java | 156 ++++++++++ ...ageIdAsCorrelationIdMessageSentCallback.java | 51 ++++ .../rabbitmq/RabbitMQEndpointTest.java | 38 ++- .../rabbitmq/RabbitMQInOutIntTest.java | 200 +++++++++++++ .../rabbitmq/RabbitMQProducerTest.java | 1 + .../testbeans/TestSerializableObject.java | 26 ++ 22 files changed, 1898 insertions(+), 205 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java index 54dd2bf..c125421 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQComponent.java @@ -18,14 +18,16 @@ package org.apache.camel.component.rabbitmq; import java.net.URI; import java.util.Map; + import javax.net.ssl.TrustManager; -import com.rabbitmq.client.ConnectionFactory; import org.apache.camel.CamelContext; import org.apache.camel.impl.UriEndpointComponent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.rabbitmq.client.ConnectionFactory; + public class RabbitMQComponent extends UriEndpointComponent { private static final Logger LOG = LoggerFactory.getLogger(RabbitMQComponent.class); http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java index f2e5568..cf4ab45 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConstants.java @@ -28,6 +28,7 @@ public final class RabbitMQConstants { public static final String DELIVERY_MODE = "rabbitmq.DELIVERY_MODE"; public static final String USERID = "rabbitmq.USERID"; public static final String CLUSTERID = "rabbitmq.CLUSTERID"; + public static final String REQUEST_TIMEOUT = "rabbitmq.REQUEST_TIMEOUT"; public static final String REPLY_TO = "rabbitmq.REPLY_TO"; public static final String CONTENT_ENCODING = "rabbitmq.CONTENT_ENCODING"; public static final String TYPE = "rabbitmq.TYPE"; http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java index a4a6362..bf142ce 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java @@ -27,15 +27,17 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.Envelope; + import org.apache.camel.Exchange; import org.apache.camel.ExchangePattern; import org.apache.camel.Message; import org.apache.camel.Processor; import org.apache.camel.impl.DefaultConsumer; + public class RabbitMQConsumer extends DefaultConsumer { - ExecutorService executor; - Connection conn; + private ExecutorService executor; + private Connection conn; private int closeTimeout = 30 * 1000; private final RabbitMQEndpoint endpoint; @@ -55,7 +57,6 @@ public class RabbitMQConsumer extends DefaultConsumer { } @Override - public RabbitMQEndpoint getEndpoint() { return (RabbitMQEndpoint) super.getEndpoint(); } @@ -79,7 +80,7 @@ public class RabbitMQConsumer extends DefaultConsumer { // setup the basicQos if (endpoint.isPrefetchEnabled()) { channel.basicQos(endpoint.getPrefetchSize(), endpoint.getPrefetchCount(), - endpoint.isPrefetchGlobal()); + endpoint.isPrefetchGlobal()); } return channel; } @@ -182,10 +183,11 @@ public class RabbitMQConsumer extends DefaultConsumer { AMQP.BasicProperties properties, byte[] body) throws IOException { Exchange exchange = consumer.endpoint.createRabbitExchange(envelope, properties, body); - mergeAmqpProperties(exchange, properties); + endpoint.getMessageConverter().mergeAmqpProperties(exchange, properties); boolean sendReply = properties.getReplyTo() != null; if (sendReply && !exchange.getPattern().isOutCapable()) { + log.debug("In an inOut capable route"); exchange.setPattern(ExchangePattern.InOut); } @@ -208,17 +210,20 @@ public class RabbitMQConsumer extends DefaultConsumer { if (!exchange.isFailed()) { // processing success if (sendReply && exchange.getPattern().isOutCapable()) { - AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder() - .headers(msg.getHeaders()) - .correlationId(properties.getCorrelationId()) - .build(); - channel.basicPublish("", properties.getReplyTo(), replyProps, msg.getBody(byte[].class)); + endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo()); } if (!consumer.endpoint.isAutoAck()) { log.trace("Acknowledging receipt [delivery_tag={}]", deliveryTag); channel.basicAck(deliveryTag, false); } - } else { + } + else if (endpoint.isTransferException() && exchange.getPattern().isOutCapable()) { + // the inOut exchange failed so put the exception in the body and send back + msg.setBody(exchange.getException()); + exchange.setOut(msg); + endpoint.publishExchangeToChannel(exchange, channel, properties.getReplyTo()); + } + else { boolean isRequeueHeaderSet = msg.getHeader(RabbitMQConstants.REQUEUE, false, boolean.class); // processing failed, then reject and handle the exception if (deliveryTag != 0 && !consumer.endpoint.isAutoAck()) { @@ -236,49 +241,6 @@ public class RabbitMQConsumer extends DefaultConsumer { } /** - * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()} - */ - private void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) { - - if (properties.getType() != null) { - exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType()); - } - if (properties.getAppId() != null) { - exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId()); - } - if (properties.getClusterId() != null) { - exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId()); - } - if (properties.getContentEncoding() != null) { - exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding()); - } - if (properties.getContentType() != null) { - exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType()); - } - if (properties.getCorrelationId() != null) { - exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId()); - } - if (properties.getExpiration() != null) { - exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration()); - } - if (properties.getMessageId() != null) { - exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId()); - } - if (properties.getPriority() != null) { - exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority()); - } - if (properties.getReplyTo() != null) { - exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo()); - } - if (properties.getTimestamp() != null) { - exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp()); - } - if (properties.getUserId() != null) { - exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId()); - } - } - - /** * Bind consumer to channel */ public void start() throws IOException { @@ -333,5 +295,4 @@ public class RabbitMQConsumer extends DefaultConsumer { return null; } } - } http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java index 7638682..6cd0bca 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQEndpoint.java @@ -16,7 +16,13 @@ */ package org.apache.camel.component.rabbitmq; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.io.Serializable; import java.net.URISyntaxException; import java.security.KeyManagementException; import java.security.NoSuchAlgorithmException; @@ -25,6 +31,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + import javax.net.ssl.TrustManager; import com.rabbitmq.client.AMQP; @@ -34,11 +41,14 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Envelope; import com.rabbitmq.client.LongString; + import org.apache.camel.Consumer; import org.apache.camel.Exchange; import org.apache.camel.Message; +import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.Processor; import org.apache.camel.Producer; +import org.apache.camel.TypeConversionException; import org.apache.camel.impl.DefaultEndpoint; import org.apache.camel.impl.DefaultExchange; import org.apache.camel.impl.DefaultMessage; @@ -46,9 +56,12 @@ import org.apache.camel.spi.Metadata; import org.apache.camel.spi.UriEndpoint; import org.apache.camel.spi.UriParam; import org.apache.camel.spi.UriPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; @UriEndpoint(scheme = "rabbitmq", title = "RabbitMQ", syntax = "rabbitmq:hostname:portNumber/exchangeName", consumerClass = RabbitMQConsumer.class, label = "messaging") public class RabbitMQEndpoint extends DefaultEndpoint { + private static final Logger LOG = LoggerFactory.getLogger(RabbitMQEndpoint.class); @UriPath @Metadata(required = "true") private String hostname; @@ -135,6 +148,24 @@ public class RabbitMQEndpoint extends DefaultEndpoint { @UriParam private ArgsConfigurer exchangeArgsConfigurer; + @UriParam + private long requestTimeout = 20000; + @UriParam + private long requestTimeoutCheckerInterval = 1000; + @UriParam + private boolean transferException = false; + // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq + private boolean useMessageIDAsCorrelationID = true; + // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq + private String replyToType = ReplyToType.Temporary.name(); + // camel-jms supports this setting but it is not currently configurable in camel-rabbitmq + private String replyTo = null; + + private RabbitMQMessageConverter messageConverter = new RabbitMQMessageConverter(); + + // header to indicate that the message body needs to be de-serialized + private static final String SERIALIZE_HEADER = "CamelSerialize"; + public RabbitMQEndpoint() { } @@ -150,12 +181,34 @@ public class RabbitMQEndpoint extends DefaultEndpoint { public Exchange createRabbitExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { Exchange exchange = new DefaultExchange(getCamelContext(), getExchangePattern()); - Message message = new DefaultMessage(); - exchange.setIn(message); + setRabbitExchange(exchange, envelope, properties, body); + return exchange; + } - message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey()); - message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange()); - message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag()); + /** + * Gets the message converter to convert between rabbit and camel + * @return + */ + protected RabbitMQMessageConverter getMessageConverter() { + return messageConverter; + } + + public void setRabbitExchange(Exchange camelExchange, Envelope envelope, AMQP.BasicProperties properties, byte[] body) { + Message message; + if (camelExchange.getIn() != null) { + // Use the existing message so we keep the headers + message = camelExchange.getIn(); + } + else { + message = new DefaultMessage(); + camelExchange.setIn(message); + } + + if (envelope != null) { + message.setHeader(RabbitMQConstants.ROUTING_KEY, envelope.getRoutingKey()); + message.setHeader(RabbitMQConstants.EXCHANGE_NAME, envelope.getExchange()); + message.setHeader(RabbitMQConstants.DELIVERY_TAG, envelope.getDeliveryTag()); + } Map<String, Object> headers = properties.getHeaders(); if (headers != null) { @@ -169,9 +222,109 @@ public class RabbitMQEndpoint extends DefaultEndpoint { } } - message.setBody(body); + if (hasSerializeHeader(properties)) { + Object messageBody = null; + try (InputStream b = new ByteArrayInputStream(body); ObjectInputStream o = new ObjectInputStream(b)) { + messageBody = o.readObject(); + } catch (IOException | ClassNotFoundException e) { + LOG.warn("Could not deserialize the object"); + } + if (messageBody instanceof Throwable) { + LOG.debug("Reply was an Exception. Setting the Exception on the Exchange"); + camelExchange.setException((Throwable) messageBody); + } else { + message.setBody(messageBody); + } + } else { + // Set the body as a byte[] and let the type converter deal with it + message.setBody(body); + } - return exchange; + } + + private boolean hasSerializeHeader(AMQP.BasicProperties properties) { + if (properties == null || properties.getHeaders() == null) { + return false; + } + if (properties.getHeaders().containsKey(SERIALIZE_HEADER) && properties.getHeaders().get(SERIALIZE_HEADER).equals(true)) { + return true; + } + return false; + } + + /** + * Sends the body that is on the exchange + * @param camelExchange + * @param channel + * @param properties + * @throws IOException + */ + public void publishExchangeToChannel(Exchange camelExchange, Channel channel, String routingKey) throws IOException { + Message msg; + if (camelExchange.hasOut()) { + msg = camelExchange.getOut(); + } else { + msg = camelExchange.getIn(); + } + + // Remove the SERIALIZE_HEADER in case it was previously set + if (msg.getHeaders() != null && msg.getHeaders().containsKey(SERIALIZE_HEADER)) { + LOG.debug("Removing the {} header", SERIALIZE_HEADER); + msg.getHeaders().remove(SERIALIZE_HEADER); + } + + AMQP.BasicProperties properties; + byte[] body; + try { + // To maintain backwards compatibility try the TypeConverter (The DefaultTypeConverter seems to only work on Strings) + body = camelExchange.getContext().getTypeConverter().mandatoryConvertTo(byte[].class, camelExchange, msg.getBody()); + + properties = getMessageConverter().buildProperties(camelExchange).build(); + } catch (NoTypeConversionAvailableException | TypeConversionException e) { + if (msg.getBody() instanceof Serializable) { + // Add the header so the reply processor knows to de-serialize it + msg.getHeaders().put(SERIALIZE_HEADER, true); + + properties = getMessageConverter().buildProperties(camelExchange).build(); + + try (ByteArrayOutputStream b = new ByteArrayOutputStream(); ObjectOutputStream o = new ObjectOutputStream(b)) { + o.writeObject(msg.getBody()); + body = b.toByteArray(); + } + } + else if (msg.getBody() == null) { + properties = getMessageConverter().buildProperties(camelExchange).build(); + body = null; + } + else { + LOG.warn("Could not convert {} to byte[]", msg.getBody()); + throw new IOException(e); + } + } + String rabbitExchange = getExchangeName(msg); + + Boolean mandatory = camelExchange.getIn().getHeader(RabbitMQConstants.MANDATORY, isMandatory(), Boolean.class); + Boolean immediate = camelExchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, isImmediate(), Boolean.class); + + + LOG.debug("Sending message to exchange: {} with CorrelationId = {}", rabbitExchange, properties.getCorrelationId()); + + channel.basicPublish(rabbitExchange, routingKey, mandatory, immediate, properties, body); + } + + /** + * Extracts name of the rabbitmq exchange + * + * @param msg + * @return + */ + protected String getExchangeName(Message msg) { + String exchangeName = msg.getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class); + // If it is BridgeEndpoint we should ignore the message header of EXCHANGE_NAME + if (exchangeName == null || isBridgeEndpoint()) { + exchangeName = getExchangeName(); + } + return exchangeName; } @Override @@ -712,9 +865,6 @@ public class RabbitMQEndpoint extends DefaultEndpoint { return channelPoolMaxSize; } - /** - * Set maximum number of opened channel in pool - */ public void setChannelPoolMaxSize(int channelPoolMaxSize) { this.channelPoolMaxSize = channelPoolMaxSize; } @@ -763,14 +913,14 @@ public class RabbitMQEndpoint extends DefaultEndpoint { public ArgsConfigurer getQueueArgsConfigurer() { return queueArgsConfigurer; } - + /** * Set the configurer for setting the queue args in Channel.queueDeclare */ public void setQueueArgsConfigurer(ArgsConfigurer queueArgsConfigurer) { this.queueArgsConfigurer = queueArgsConfigurer; } - + public ArgsConfigurer getExchangeArgsConfigurer() { return exchangeArgsConfigurer; } @@ -781,4 +931,58 @@ public class RabbitMQEndpoint extends DefaultEndpoint { public void setExchangeArgsConfigurer(ArgsConfigurer exchangeArgsConfigurer) { this.exchangeArgsConfigurer = exchangeArgsConfigurer; } + + /** + * Set timeout for waiting for a reply when using the InOut Exchange Pattern (in milliseconds) + */ + public void setRequestTimeout(long requestTimeout) { + this.requestTimeout = requestTimeout; + } + + public long getRequestTimeout() { + return requestTimeout; + } + + /** + * Set requestTimeoutCheckerInterval for inOut exchange + */ + public void setRequestTimeoutCheckerInterval(long requestTimeoutCheckerInterval) { + this.requestTimeoutCheckerInterval = requestTimeoutCheckerInterval; + } + + public long getRequestTimeoutCheckerInterval() { + return requestTimeoutCheckerInterval; + } + + /** + * Get useMessageIDAsCorrelationID for inOut exchange + */ + public boolean isUseMessageIDAsCorrelationID() { + return useMessageIDAsCorrelationID; + } + + /** + * When true and an inOut Exchange failed on the consumer side send the caused Exception back in the response + */ + public void setTransferException(boolean transferException) { + this.transferException = transferException; + } + + public boolean isTransferException() { + return transferException; + } + + /** + * Get replyToType for inOut exchange + */ + public String getReplyToType() { + return replyToType; + } + + /** + * Gets the Queue to reply to if you dont want to use temporary reply queues + */ + public String getReplyTo() { + return replyTo; + } } http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java new file mode 100644 index 0000000..95abe81 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQMessageConverter.java @@ -0,0 +1,219 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq; + +import java.math.BigDecimal; +import java.sql.Timestamp; +import java.util.Date; +import java.util.HashMap; +import java.util.Map; + +import org.apache.camel.Exchange; +import org.apache.camel.Message; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.LongString; + +public class RabbitMQMessageConverter { + protected static final Logger LOG = LoggerFactory.getLogger(RabbitMQMessageConverter.class); + + /** + * Will take an {@link Exchange} and add header values back to the {@link Exchange#getIn()} + */ + public void mergeAmqpProperties(Exchange exchange, AMQP.BasicProperties properties) { + + if (properties.getType() != null) { + exchange.getIn().setHeader(RabbitMQConstants.TYPE, properties.getType()); + } + if (properties.getAppId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.APP_ID, properties.getAppId()); + } + if (properties.getClusterId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CLUSTERID, properties.getClusterId()); + } + if (properties.getContentEncoding() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CONTENT_ENCODING, properties.getContentEncoding()); + } + if (properties.getContentType() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CONTENT_TYPE, properties.getContentType()); + } + if (properties.getCorrelationId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, properties.getCorrelationId()); + } + if (properties.getExpiration() != null) { + exchange.getIn().setHeader(RabbitMQConstants.EXPIRATION, properties.getExpiration()); + } + if (properties.getMessageId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.MESSAGE_ID, properties.getMessageId()); + } + if (properties.getPriority() != null) { + exchange.getIn().setHeader(RabbitMQConstants.PRIORITY, properties.getPriority()); + } + if (properties.getReplyTo() != null) { + exchange.getIn().setHeader(RabbitMQConstants.REPLY_TO, properties.getReplyTo()); + } + if (properties.getTimestamp() != null) { + exchange.getIn().setHeader(RabbitMQConstants.TIMESTAMP, properties.getTimestamp()); + } + if (properties.getUserId() != null) { + exchange.getIn().setHeader(RabbitMQConstants.USERID, properties.getUserId()); + } + } + + public AMQP.BasicProperties.Builder buildProperties(Exchange exchange) { + AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); + + Message msg; + if (exchange.hasOut()) { + msg = exchange.getOut(); + } else { + msg = exchange.getIn(); + } + + final Object contentType = msg.getHeader(RabbitMQConstants.CONTENT_TYPE); + if (contentType != null) { + properties.contentType(contentType.toString()); + } + + final Object priority = msg.getHeader(RabbitMQConstants.PRIORITY); + if (priority != null) { + properties.priority(Integer.parseInt(priority.toString())); + } + + final Object messageId = msg.getHeader(RabbitMQConstants.MESSAGE_ID); + if (messageId != null) { + properties.messageId(messageId.toString()); + } + + final Object clusterId = msg.getHeader(RabbitMQConstants.CLUSTERID); + if (clusterId != null) { + properties.clusterId(clusterId.toString()); + } + + final Object replyTo = msg.getHeader(RabbitMQConstants.REPLY_TO); + if (replyTo != null) { + properties.replyTo(replyTo.toString()); + } + + final Object correlationId = msg.getHeader(RabbitMQConstants.CORRELATIONID); + if (correlationId != null) { + properties.correlationId(correlationId.toString()); + } + + final Object deliveryMode = msg.getHeader(RabbitMQConstants.DELIVERY_MODE); + if (deliveryMode != null) { + properties.deliveryMode(Integer.parseInt(deliveryMode.toString())); + } + + final Object userId = msg.getHeader(RabbitMQConstants.USERID); + if (userId != null) { + properties.userId(userId.toString()); + } + + final Object type = msg.getHeader(RabbitMQConstants.TYPE); + if (type != null) { + properties.type(type.toString()); + } + + final Object contentEncoding = msg.getHeader(RabbitMQConstants.CONTENT_ENCODING); + if (contentEncoding != null) { + properties.contentEncoding(contentEncoding.toString()); + } + + final Object expiration = msg.getHeader(RabbitMQConstants.EXPIRATION); + if (expiration != null) { + properties.expiration(expiration.toString()); + } + + final Object appId = msg.getHeader(RabbitMQConstants.APP_ID); + if (appId != null) { + properties.appId(appId.toString()); + } + + final Object timestamp = msg.getHeader(RabbitMQConstants.TIMESTAMP); + if (timestamp != null) { + properties.timestamp(new Date(Long.parseLong(timestamp.toString()))); + } + + final Map<String, Object> headers = msg.getHeaders(); + Map<String, Object> filteredHeaders = new HashMap<String, Object>(); + + // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader + for (Map.Entry<String, Object> header : headers.entrySet()) { + + // filter header values. + Object value = getValidRabbitMQHeaderValue(header.getValue()); + if (value != null) { + filteredHeaders.put(header.getKey(), header.getValue()); + } else if (LOG.isDebugEnabled()) { + if (header.getValue() == null) { + LOG.debug("Ignoring header: {} with null value", header.getKey()); + } else { + LOG.debug("Ignoring header: {} of class: {} with value: {}", + new Object[] { header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue() }); + } + } + } + + properties.headers(filteredHeaders); + + return properties; + } + + /** + * Strategy to test if the given header is valid. Without this, the + * com.rabbitmq.client.impl.Frame.java class will throw an + * IllegalArgumentException (invalid value in table) and close the + * connection. + * + * @param headerValue the header value + * @return the value to use, <tt>null</tt> to ignore this header + * @see com.rabbitmq.client.impl.Frame#fieldValueSize + */ + private Object getValidRabbitMQHeaderValue(Object headerValue) { + if (headerValue instanceof String) { + return headerValue; + } else if (headerValue instanceof BigDecimal) { + return headerValue; + } else if (headerValue instanceof Number) { + return headerValue; + } else if (headerValue instanceof Boolean) { + return headerValue; + } else if (headerValue instanceof Date) { + return headerValue; + } else if (headerValue instanceof byte[]) { + return headerValue; + } else if (headerValue instanceof LongString) { + return headerValue; + } else if (headerValue instanceof Timestamp) { + return headerValue; + } else if (headerValue instanceof Byte) { + return headerValue; + } else if (headerValue instanceof Double) { + return headerValue; + } else if (headerValue instanceof Float) { + return headerValue; + } else if (headerValue instanceof Long) { + return headerValue; + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java index 28858a6..b8c8ba2 100644 --- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQProducer.java @@ -17,28 +17,38 @@ package org.apache.camel.component.rabbitmq; import java.io.IOException; -import java.math.BigDecimal; -import java.util.Date; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.ExecutorService; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; + import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; + +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; +import org.apache.camel.FailedToCreateProducerException; import org.apache.camel.component.rabbitmq.pool.PoolableChannelFactory; -import org.apache.camel.impl.DefaultProducer; +import org.apache.camel.component.rabbitmq.reply.ReplyManager; +import org.apache.camel.component.rabbitmq.reply.TemporaryQueueReplyManager; +import org.apache.camel.impl.DefaultAsyncProducer; import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; import org.apache.commons.pool.ObjectPool; import org.apache.commons.pool.impl.GenericObjectPool; -public class RabbitMQProducer extends DefaultProducer { +public class RabbitMQProducer extends DefaultAsyncProducer { private Connection conn; private ObjectPool<Channel> channelPool; private ExecutorService executorService; private int closeTimeout = 30 * 1000; + private final AtomicBoolean started = new AtomicBoolean(false); + + private static final String GENERATED_CORRELATION_ID_PREFIX = "Camel-"; + private ReplyManager replyManager; public RabbitMQProducer(RabbitMQEndpoint endpoint) throws IOException { super(endpoint); @@ -115,6 +125,7 @@ public class RabbitMQProducer extends DefaultProducer { @Override protected void doStop() throws Exception { + unInitReplyManager(); closeConnectionAndChannel(); if (executorService != null) { getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executorService); @@ -122,13 +133,76 @@ public class RabbitMQProducer extends DefaultProducer { } } - @Override - public void process(Exchange exchange) throws Exception { - String exchangeName = exchange.getIn().getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class); + public boolean process(Exchange exchange, AsyncCallback callback) { + // deny processing if we are not started + if (!isRunAllowed()) { + if (exchange.getException() == null) { + exchange.setException(new RejectedExecutionException()); + } + // we cannot process so invoke callback + callback.done(true); + return true; + } + + try { + if (exchange.getPattern().isOutCapable()) { + // in out requires a bit more work than in only + return processInOut(exchange, callback); + } else { + // in only + return processInOnly(exchange, callback); + } + } catch (Throwable e) { + // must catch exception to ensure callback is invoked as expected + // to let Camel error handling deal with this + exchange.setException(e); + callback.done(true); + return true; + } + } + + protected boolean processInOut(final Exchange exchange, final AsyncCallback callback) throws Exception { + final org.apache.camel.Message in = exchange.getIn(); + + initReplyManager(); + + // the request timeout can be overruled by a header otherwise the endpoint configured value is used + final long timeout = exchange.getIn().getHeader(RabbitMQConstants.REQUEST_TIMEOUT, getEndpoint().getRequestTimeout(), long.class); + + final String originalCorrelationId = in.getHeader(RabbitMQConstants.CORRELATIONID, String.class); + + // we append the 'Camel-' prefix to know it was generated by us + String correlationId = GENERATED_CORRELATION_ID_PREFIX + getEndpoint().getCamelContext().getUuidGenerator().generateUuid(); + in.setHeader(RabbitMQConstants.CORRELATIONID, correlationId); + + in.setHeader(RabbitMQConstants.REPLY_TO, replyManager.getReplyTo()); + + String exchangeName = in.getHeader(RabbitMQConstants.EXCHANGE_NAME, String.class); // If it is BridgeEndpoint we should ignore the message header of EXCHANGE_NAME if (exchangeName == null || getEndpoint().isBridgeEndpoint()) { exchangeName = getEndpoint().getExchangeName(); } + + String key = in.getHeader(RabbitMQConstants.ROUTING_KEY, null, String.class); + // we just need to make sure RoutingKey option take effect if it is not BridgeEndpoint + if (key == null || getEndpoint().isBridgeEndpoint()) { + key = getEndpoint().getRoutingKey() == null ? "" : getEndpoint().getRoutingKey(); + } + if (ObjectHelper.isEmpty(key) && ObjectHelper.isEmpty(exchangeName)) { + throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + getEndpoint()); + } + log.debug("Registering reply for {}", correlationId); + + replyManager.registerReply(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout); + + basicPublish(exchange, exchangeName, key); + // continue routing asynchronously (reply will be processed async when its received) + return false; + } + + private boolean processInOnly(Exchange exchange, AsyncCallback callback) throws Exception { + String exchangeName = getEndpoint().getExchangeName(exchange.getIn()); + String key = exchange.getIn().getHeader(RabbitMQConstants.ROUTING_KEY, null, String.class); // we just need to make sure RoutingKey option take effect if it is not BridgeEndpoint if (key == null || getEndpoint().isBridgeEndpoint()) { @@ -137,12 +211,13 @@ public class RabbitMQProducer extends DefaultProducer { if (ObjectHelper.isEmpty(key) && ObjectHelper.isEmpty(exchangeName)) { throw new IllegalArgumentException("ExchangeName and RoutingKey is not provided in the endpoint: " + getEndpoint()); } - byte[] messageBodyBytes = exchange.getIn().getMandatoryBody(byte[].class); - AMQP.BasicProperties properties = buildProperties(exchange).build(); - Boolean mandatory = exchange.getIn().getHeader(RabbitMQConstants.MANDATORY, getEndpoint().isMandatory(), Boolean.class); - Boolean immediate = exchange.getIn().getHeader(RabbitMQConstants.IMMEDIATE, getEndpoint().isImmediate(), Boolean.class); - - basicPublish(exchangeName, key, mandatory, immediate, properties, messageBodyBytes); + + basicPublish(exchange, exchangeName, key); + if (callback != null) { + // we are synchronous so return true + callback.done(true); + } + return true; } /** @@ -150,13 +225,10 @@ public class RabbitMQProducer extends DefaultProducer { * * @param exchange Target exchange * @param routingKey Routing key - * @param mandatory This flag tells the server how to react if the message cannot be routed to a queue. - * @param immediate This flag tells the server how to react if the message cannot be routed to a queue consumer immediately. * @param properties Header properties * @param body Body content */ - private void basicPublish(final String exchange, final String routingKey, final boolean mandatory, final boolean immediate, - final AMQP.BasicProperties properties, final byte[] body) throws Exception { + private void basicPublish(final Exchange camelExchange, final String rabbitExchange, final String routingKey) throws Exception { if (channelPool == null) { // Open connection and channel lazily openConnectionAndChannelPool(); @@ -164,135 +236,95 @@ public class RabbitMQProducer extends DefaultProducer { execute(new ChannelCallback<Void>() { @Override public Void doWithChannel(Channel channel) throws Exception { - channel.basicPublish(exchange, routingKey, mandatory, immediate, properties, body); + getEndpoint().publishExchangeToChannel(camelExchange, channel, routingKey); return null; } }); } AMQP.BasicProperties.Builder buildProperties(Exchange exchange) { - AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder(); - - final Object contentType = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_TYPE); - if (contentType != null) { - properties.contentType(contentType.toString()); - } - - final Object priority = exchange.getIn().getHeader(RabbitMQConstants.PRIORITY); - if (priority != null) { - properties.priority(Integer.parseInt(priority.toString())); - } - - final Object messageId = exchange.getIn().getHeader(RabbitMQConstants.MESSAGE_ID); - if (messageId != null) { - properties.messageId(messageId.toString()); - } - - final Object clusterId = exchange.getIn().getHeader(RabbitMQConstants.CLUSTERID); - if (clusterId != null) { - properties.clusterId(clusterId.toString()); - } - - final Object replyTo = exchange.getIn().getHeader(RabbitMQConstants.REPLY_TO); - if (replyTo != null) { - properties.replyTo(replyTo.toString()); - } - - final Object correlationId = exchange.getIn().getHeader(RabbitMQConstants.CORRELATIONID); - if (correlationId != null) { - properties.correlationId(correlationId.toString()); - } - - final Object deliveryMode = exchange.getIn().getHeader(RabbitMQConstants.DELIVERY_MODE); - if (deliveryMode != null) { - properties.deliveryMode(Integer.parseInt(deliveryMode.toString())); - } - - final Object userId = exchange.getIn().getHeader(RabbitMQConstants.USERID); - if (userId != null) { - properties.userId(userId.toString()); - } - - final Object type = exchange.getIn().getHeader(RabbitMQConstants.TYPE); - if (type != null) { - properties.type(type.toString()); - } - - final Object contentEncoding = exchange.getIn().getHeader(RabbitMQConstants.CONTENT_ENCODING); - if (contentEncoding != null) { - properties.contentEncoding(contentEncoding.toString()); - } - - final Object expiration = exchange.getIn().getHeader(RabbitMQConstants.EXPIRATION); - if (expiration != null) { - properties.expiration(expiration.toString()); - } + return getEndpoint().getMessageConverter().buildProperties(exchange); + } - final Object appId = exchange.getIn().getHeader(RabbitMQConstants.APP_ID); - if (appId != null) { - properties.appId(appId.toString()); - } + public int getCloseTimeout() { + return closeTimeout; + } - final Object timestamp = exchange.getIn().getHeader(RabbitMQConstants.TIMESTAMP); - if (timestamp != null) { - properties.timestamp(new Date(Long.parseLong(timestamp.toString()))); - } + public void setCloseTimeout(int closeTimeout) { + this.closeTimeout = closeTimeout; + } - final Map<String, Object> headers = exchange.getIn().getHeaders(); - Map<String, Object> filteredHeaders = new HashMap<String, Object>(); - - // TODO: Add support for a HeaderFilterStrategy. See: org.apache.camel.component.jms.JmsBinding#shouldOutputHeader - for (Map.Entry<String, Object> header : headers.entrySet()) { - - // filter header values. - Object value = getValidRabbitMQHeaderValue(header.getValue()); - if (value != null) { - filteredHeaders.put(header.getKey(), header.getValue()); - } else if (log.isDebugEnabled()) { - if (header.getValue() == null) { - log.debug("Ignoring header: {} with null value", header.getKey()); - } else { - log.debug("Ignoring header: {} of class: {} with value: {}", - new Object[]{header.getKey(), ObjectHelper.classCanonicalName(header.getValue()), header.getValue()}); + protected void initReplyManager() { + if (!started.get()) { + synchronized (this) { + if (started.get()) { + return; } + log.debug("Starting reply manager"); + // must use the classloader from the application context when creating reply manager, + // as it should inherit the classloader from app context and not the current which may be + // a different classloader + ClassLoader current = Thread.currentThread().getContextClassLoader(); + ClassLoader ac = getEndpoint().getCamelContext().getApplicationContextClassLoader(); + try { + if (ac != null) { + Thread.currentThread().setContextClassLoader(ac); + } + // validate that replyToType and replyTo is configured accordingly + if (getEndpoint().getReplyToType() != null) { + // setting temporary with a fixed replyTo is not supported + if (getEndpoint().getReplyTo() != null && getEndpoint().getReplyToType().equals(ReplyToType.Temporary.name())) { + throw new IllegalArgumentException("ReplyToType " + ReplyToType.Temporary + + " is not supported when replyTo " + getEndpoint().getReplyTo() + " is also configured."); + } + } + + if (getEndpoint().getReplyTo() != null) { + // specifying reply queues is not currently supported + throw new IllegalArgumentException("Specifying replyTo " + getEndpoint().getReplyTo() + " is currently not supported."); + } else { + replyManager = createReplyManager(); + log.debug("Using RabbitMQReplyManager: {} to process replies from temporary queue", replyManager); + } + } catch (Exception e) { + throw new FailedToCreateProducerException(getEndpoint(), e); + } finally { + if (ac != null) { + Thread.currentThread().setContextClassLoader(current); + } + } + started.set(true); } } - - properties.headers(filteredHeaders); - - return properties; } - /** - * Strategy to test if the given header is valid - * - * @param headerValue the header value - * @return the value to use, <tt>null</tt> to ignore this header - * @see com.rabbitmq.client.impl.Frame#fieldValueSize - */ - private Object getValidRabbitMQHeaderValue(Object headerValue) { - if (headerValue instanceof String) { - return headerValue; - } else if (headerValue instanceof BigDecimal) { - return headerValue; - } else if (headerValue instanceof Number) { - return headerValue; - } else if (headerValue instanceof Boolean) { - return headerValue; - } else if (headerValue instanceof Date) { - return headerValue; - } else if (headerValue instanceof byte[]) { - return headerValue; + protected void unInitReplyManager() { + try { + if (replyManager != null) { + if (log.isDebugEnabled()) { + log.debug("Stopping JmsReplyManager: {} from processing replies from: {}", replyManager, + getEndpoint().getReplyTo() != null ? getEndpoint().getReplyTo() : "temporary queue"); + } + ServiceHelper.stopService(replyManager); + } + } catch (Exception e) { + throw ObjectHelper.wrapRuntimeCamelException(e); + } finally { + started.set(false); } - return null; } - public int getCloseTimeout() { - return closeTimeout; - } + protected ReplyManager createReplyManager() throws Exception { + // use a temporary queue + ReplyManager replyManager = new TemporaryQueueReplyManager(getEndpoint().getCamelContext()); + replyManager.setEndpoint(getEndpoint()); - public void setCloseTimeout(int closeTimeout) { - this.closeTimeout = closeTimeout; - } + String name = "RabbitMQReplyManagerTimeoutChecker[" + getEndpoint().getExchangeName() + "]"; + ScheduledExecutorService replyManagerExecutorService = getEndpoint().getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(name, name); + replyManager.setScheduledExecutorService(replyManagerExecutorService); + log.info("Starting reply manager service " + name); + ServiceHelper.startService(replyManager); -} + return replyManager; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java new file mode 100644 index 0000000..dc805d5 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/ReplyToType.java @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq; + +/** + * Types for replyTo queues + * + * @version + */ +public enum ReplyToType { + Temporary, Shared, Exclusive +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java new file mode 100644 index 0000000..c87381f --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationListener.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq.reply; + +/** + * Listener for events when correlation id's changes. + */ +public interface CorrelationListener { + + /** + * Callback when a new correlation id is added + * + * @param key the correlation id + */ + void onPut(String key); + + /** + * Callback when a correlation id is removed + * + * @param key the correlation id + */ + void onRemove(String key); + + /** + * Callback when a correlation id is evicted due timeout + * + * @param key the correlation id + */ + void onEviction(String key); +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java new file mode 100644 index 0000000..fad4fc0 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/CorrelationTimeoutMap.java @@ -0,0 +1,120 @@ +package org.apache.camel.component.rabbitmq.reply; + +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.camel.support.DefaultTimeoutMap; + +/** + * A {@link org.apache.camel.TimeoutMap} which is used to track reply messages which + * has been timed out, and thus should trigger the waiting {@link org.apache.camel.Exchange} to + * timeout as well. + * + * @version + */ +public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> { + + private CorrelationListener listener; + + public CorrelationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) { + super(executor, requestMapPollTimeMillis); + } + + public void setListener(CorrelationListener listener) { + // there is only one listener needed + this.listener = listener; + } + + public boolean onEviction(String key, ReplyHandler value) { + try { + if (listener != null) { + listener.onEviction(key); + } + } catch (Throwable e) { + // ignore + } + + // trigger timeout + try { + value.onTimeout(key); + } catch (Throwable e) { + // must ignore so we ensure we evict the element + log.warn("Error processing onTimeout for correlationID: " + key + " due: " + e.getMessage() + ". This exception is ignored.", e); + } + + // return true to remove the element + log.trace("Evicted correlationID: {}", key); + return true; + } + + @Override + public ReplyHandler get(String key) { + ReplyHandler answer = super.get(key); + log.trace("Get correlationID: {} -> {}", key, answer != null); + return answer; + } + + @Override + public ReplyHandler put(String key, ReplyHandler value, long timeoutMillis) { + try { + if (listener != null) { + listener.onPut(key); + } + } catch (Throwable e) { + // ignore + } + + ReplyHandler result; + if (timeoutMillis <= 0) { + // no timeout (must use Integer.MAX_VALUE) + result = super.put(key, value, Integer.MAX_VALUE); + } else { + result = super.put(key, value, timeoutMillis); + } + log.info("Added correlationID: {} to timeout after: {} millis", key, timeoutMillis); + return result; + } + + @Override + public ReplyHandler putIfAbsent(String key, ReplyHandler value, long timeoutMillis) { + log.info("in putIfAbsent with key {}", key); + + try { + if (listener != null) { + listener.onPut(key); + } + } catch (Throwable e) { + // ignore + } + + ReplyHandler result; + if (timeoutMillis <= 0) { + // no timeout (must use Integer.MAX_VALUE) + result = super.putIfAbsent(key, value, Integer.MAX_VALUE); + } else { + result = super.putIfAbsent(key, value, timeoutMillis); + } + if (result == null) { + log.trace("Added correlationID: {} to timeout after: {} millis", key, timeoutMillis); + } else { + log.trace("Duplicate correlationID: {} detected", key); + } + return result; + } + + @Override + public ReplyHandler remove(String key) { + try { + if (listener != null) { + listener.onRemove(key); + } + } catch (Throwable e) { + // ignore + } + + ReplyHandler answer = super.remove(key); + log.trace("Removed correlationID: {} -> {}", key, answer != null); + return answer; + } + +} + http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java new file mode 100644 index 0000000..09c2130 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/MessageSentCallback.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq.reply; + +import com.rabbitmq.client.Connection; + + + +/** + * Callback when a {@link Message} has been sent. + * + * @version + */ +public interface MessageSentCallback { + + /** + * Callback when the message has been sent. + * + * @param session the session + * @param message the message + * @param destination the destination + */ + void sent(Connection session, byte[] message, String destination); +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java new file mode 100644 index 0000000..b4f5804 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/QueueReplyHandler.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq.reply; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; + +/** + * {@link ReplyHandler} to handle processing replies when using regular queues. + * + * @version + */ +public class QueueReplyHandler extends TemporaryQueueReplyHandler { + + public QueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, + String originalCorrelationId, String correlationId, long timeout) { + super(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout); + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java new file mode 100644 index 0000000..a8fe319 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHandler.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq.reply; + +import com.rabbitmq.client.AMQP; + + +/** + * Handles a reply. + * + * @version + */ +public interface ReplyHandler { + + /** + * The reply message was received + * + * @param correlationId the correlation id + * @param reply the reply message + */ + void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply); + + /** + * The reply message was not received and a timeout triggered + * + * @param correlationId the correlation id + */ + void onTimeout(String correlationId); +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java new file mode 100644 index 0000000..d2890d0 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyHolder.java @@ -0,0 +1,123 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq.reply; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; + +import com.rabbitmq.client.AMQP; + +/** + * Holder which contains the {@link Exchange} and {@link org.apache.camel.AsyncCallback} to be used + * when the reply arrives, so we can set the reply on the {@link Exchange} and continue routing using the callback. + * + * @version + */ +public class ReplyHolder { + + private final Exchange exchange; + private final AsyncCallback callback; + private final byte[] message; + private final String originalCorrelationId; + private final String correlationId; + private long timeout; + private AMQP.BasicProperties properties; + + /** + * Constructor to use when a reply message was received + */ + public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, + String correlationId,AMQP.BasicProperties properties, byte[] message) { + this.exchange = exchange; + this.callback = callback; + this.originalCorrelationId = originalCorrelationId; + this.correlationId = correlationId; + this.properties = properties; + this.message = message; + } + + /** + * Constructor to use when a timeout occurred + */ + public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, + String correlationId, long timeout) { + this(exchange, callback, originalCorrelationId, correlationId, null, null); + this.timeout = timeout; + } + + public Exchange getExchange() { + return exchange; + } + + public AsyncCallback getCallback() { + return callback; + } + + /** + * Gets the original correlation id, if one was set when sending the message. + * <p/> + * Some JMS brokers will mess with the correlation id and send back a different/empty correlation id. + * So we need to remember it so we can restore the correlation id. + */ + public String getOriginalCorrelationId() { + return originalCorrelationId; + } + + /** + * Gets the correlation id + * + * @see #getOriginalCorrelationId() + */ + public String getCorrelationId() { + return correlationId; + } + + /** + * Gets the received message + * + * @return the received message, or <tt>null</tt> if timeout occurred and no message has been received + * @see #isTimeout() + */ + public byte[] getMessage() { + return message; + } + + /** + * Whether timeout triggered or not. + * <p/> + * A timeout is triggered if <tt>requestTimeout</tt> option has been configured, and a reply message has <b>not</b> been + * received within that time frame. + */ + public boolean isTimeout() { + return message == null; + } + + /** + * The timeout value + */ + public long getRequestTimeout() { + return timeout; + } + + /** + * The message properties + * @return + */ + public AMQP.BasicProperties getProperties(){ + return properties; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java new file mode 100644 index 0000000..7c1c015 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManager.java @@ -0,0 +1,76 @@ +package org.apache.camel.component.rabbitmq.reply; + +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.component.rabbitmq.RabbitMQEndpoint; + + +/** + * The {@link ReplyManager} is responsible for handling <a href="http://camel.apache.org/request-reply.html">request-reply</a> + * over RabbitMQ. + * + * @version + */ +public interface ReplyManager { + + /** + * Sets the belonging {@link org.apache.camel.component.jms.JmsEndpoint}. + */ + void setEndpoint(RabbitMQEndpoint endpoint); + + /** + * Sets the reply to queue the manager should listen for replies. + * <p/> + * The queue is either a temporary or a persistent queue. + */ + void setReplyTo(String replyTo); + + /** + * Gets the reply to queue being used + */ + String getReplyTo(); + + /** + * Register a reply + * + * @param replyManager the reply manager being used + * @param exchange the exchange + * @param callback the callback + * @param originalCorrelationId an optional original correlation id + * @param correlationId the correlation id to expect being used + * @param requestTimeout the timeout + * @return the correlation id used + */ + String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, + String originalCorrelationId, String correlationId, long requestTimeout); + + /** + * Sets the scheduled to use when checking for timeouts (no reply received within a given time period) + */ + void setScheduledExecutorService(ScheduledExecutorService executorService); + + + /** + * Updates the correlation id to the new correlation id. + * <p/> + * This is only used when <tt>useMessageIDasCorrelationID</tt> option is used, which means a + * provisional correlation id is first used, then after the message has been sent, the real + * correlation id is known. This allows us then to update the internal mapping to expect the + * real correlation id. + * + * @param correlationId the provisional correlation id + * @param newCorrelationId the real correlation id + * @param requestTimeout the timeout + */ + void updateCorrelationId(String correlationId, String newCorrelationId, long requestTimeout); + + + /** + * Process the reply + * + * @param holder containing needed data to process the reply and continue routing + */ + void processReply(ReplyHolder holder); +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java new file mode 100644 index 0000000..6e41f99 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/ReplyManagerSupport.java @@ -0,0 +1,238 @@ +package org.apache.camel.component.rabbitmq.reply; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.ExchangeTimedOutException; +import org.apache.camel.component.rabbitmq.RabbitMQConstants; +import org.apache.camel.component.rabbitmq.RabbitMQEndpoint; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ExchangeHelper; +import org.apache.camel.util.ObjectHelper; +import org.apache.camel.util.ServiceHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.Connection; + + +public abstract class ReplyManagerSupport extends ServiceSupport implements ReplyManager{ + + protected final Logger log = LoggerFactory.getLogger(ReplyManagerSupport.class); + protected final CamelContext camelContext; + + protected ScheduledExecutorService executorService; + protected RabbitMQEndpoint endpoint; + protected String replyTo; + protected Connection listenerContainer; + private int closeTimeout = 30 * 1000; + protected final CountDownLatch replyToLatch = new CountDownLatch(1); + protected final long replyToTimeout = 1000; + protected CorrelationTimeoutMap correlation; + + public ReplyManagerSupport(CamelContext camelContext) { + this.camelContext = camelContext; + } + + public void setScheduledExecutorService(ScheduledExecutorService executorService) { + this.executorService = executorService; + } + + public void setEndpoint(RabbitMQEndpoint endpoint) { + this.endpoint = endpoint; + } + + public void setReplyTo(String replyTo) { + log.debug("ReplyTo destination: {}", replyTo); + this.replyTo = replyTo; + // trigger latch as the reply to has been resolved and set + replyToLatch.countDown(); + } + + public String getReplyTo() { + if (replyTo != null) { + return replyTo; + } + try { + // the reply to destination has to be resolved using a DestinationResolver using + // the MessageListenerContainer which occurs asynchronously so we have to wait + // for that to happen before we can retrieve the reply to destination to be used + log.trace("Waiting for replyTo to be set"); + boolean done = replyToLatch.await(replyToTimeout, TimeUnit.MILLISECONDS); + if (!done) { + log.warn("ReplyTo destination was not set and timeout occurred"); + } else { + log.trace("Waiting for replyTo to be set done"); + } + } catch (InterruptedException e) { + // ignore + } + return replyTo; + } + + public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, + String originalCorrelationId, String correlationId, long requestTimeout) { + log.debug("in registerReply"); + // add to correlation map + QueueReplyHandler handler = new QueueReplyHandler(replyManager, exchange, callback, + originalCorrelationId, correlationId, requestTimeout); + // Just make sure we don't override the old value of the correlationId + ReplyHandler result = correlation.putIfAbsent(correlationId, handler, requestTimeout); + if (result != null) { + String logMessage = String.format("The correlationId [%s] is not unique.", correlationId); + throw new IllegalArgumentException(logMessage); + } + return correlationId; + } + + + protected abstract ReplyHandler createReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, + String originalCorrelationId, String correlationId, long requestTimeout); + + public void onMessage(AMQP.BasicProperties properties, byte[] message) { + String correlationID = properties.getCorrelationId(); + + if (correlationID == null) { + log.warn("Ignoring message with no correlationID: {}", message); + return; + } + + log.debug("Received reply message with correlationID [{}] -> {}", correlationID, message); + + // handle the reply message + handleReplyMessage(correlationID, properties, message); + } + + public void processReply(ReplyHolder holder) { + log.info("in processReply"); + if (holder != null && isRunAllowed()) { + try { + Exchange exchange = holder.getExchange(); + + boolean timeout = holder.isTimeout(); + if (timeout) { + // timeout occurred do a WARN log so its easier to spot in the logs + if (log.isWarnEnabled()) { + log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}] on destination {}." + + " Setting ExchangeTimedOutException on {} and continue routing.", + new Object[]{holder.getRequestTimeout(), holder.getCorrelationId(), replyTo, ExchangeHelper.logIds(exchange)}); + } + + // no response, so lets set a timed out exception + String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received on destination: " + replyTo; + exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg)); + } else { + + endpoint.setRabbitExchange(exchange, null, holder.getProperties(), holder.getMessage()); + + // restore correlation id in case the remote server messed with it + if (holder.getOriginalCorrelationId() != null) { + if(exchange.getOut() != null){ + exchange.getOut().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId()); + } + else{ + exchange.getIn().setHeader(RabbitMQConstants.CORRELATIONID, holder.getOriginalCorrelationId()); + } + } + + } + } finally { + // notify callback + AsyncCallback callback = holder.getCallback(); + callback.done(false); + } + } + } + + + + protected abstract void handleReplyMessage(String correlationID, AMQP.BasicProperties properties, byte[] message); + + protected abstract Connection createListenerContainer() throws Exception; + + /** + * <b>IMPORTANT:</b> This logic is only being used due to high performance in-memory only + * testing using InOut over JMS. Its unlikely to happen in a real life situation with communication + * to a remote broker, which always will be slower to send back reply, before Camel had a chance + * to update it's internal correlation map. + */ + protected ReplyHandler waitForProvisionCorrelationToBeUpdated(String correlationID, byte[] message) { + // race condition, when using messageID as correlationID then we store a provisional correlation id + // at first, which gets updated with the JMSMessageID after the message has been sent. And in the unlikely + // event that the reply comes back really really fast, and the correlation map hasn't yet been updated + // from the provisional id to the JMSMessageID. If so we have to wait a bit and lookup again. + if (log.isWarnEnabled()) { + log.warn("Early reply received with correlationID [{}] -> {}", correlationID, message); + } + + ReplyHandler answer = null; + + // wait up till 5 seconds + boolean done = false; + int counter = 0; + while (!done && counter++ < 50) { + log.trace("Early reply not found handler at attempt {}. Waiting a bit longer.", counter); + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // ignore + } + + // try again + answer = correlation.get(correlationID); + done = answer != null; + + if (answer != null) { + if (log.isTraceEnabled()) { + log.trace("Early reply with correlationID [{}] has been matched after {} attempts and can be processed using handler: {}", + new Object[]{correlationID, counter, answer}); + } + } + } + + return answer; + } + + @Override + protected void doStart() throws Exception { + ObjectHelper.notNull(executorService, "executorService", this); + ObjectHelper.notNull(endpoint, "endpoint", this); + + // timeout map to use for purging messages which have timed out, while waiting for an expected reply + // when doing request/reply over JMS + log.debug("Using timeout checker interval with {} millis", endpoint.getRequestTimeoutCheckerInterval()); + correlation = new CorrelationTimeoutMap(executorService, endpoint.getRequestTimeoutCheckerInterval()); + ServiceHelper.startService(correlation); + + // create JMS listener and start it + + listenerContainer = createListenerContainer(); + + log.debug("Using executor {}", executorService); + + } + + + @Override + protected void doStop() throws Exception { + ServiceHelper.stopService(correlation); + + if (listenerContainer != null) { + log.debug("Closing connection: {} with timeout: {} ms.", listenerContainer, closeTimeout); + listenerContainer.close(closeTimeout); + listenerContainer = null; + } + + // must also stop executor service + if (executorService != null) { + camelContext.getExecutorServiceManager().shutdownGraceful(executorService); + executorService = null; + } + } + +} http://git-wip-us.apache.org/repos/asf/camel/blob/8fe4288f/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java ---------------------------------------------------------------------- diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java new file mode 100644 index 0000000..c7fcf41 --- /dev/null +++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/reply/TemporaryQueueReplyHandler.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.rabbitmq.reply; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.component.rabbitmq.RabbitMQConsumer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.rabbitmq.client.AMQP; + +/** + * {@link ReplyHandler} to handle processing replies when using temporary queues. + * + * @version + */ +public class TemporaryQueueReplyHandler implements ReplyHandler { + + protected final Logger log = LoggerFactory.getLogger(TemporaryQueueReplyHandler.class); + + // task queue to add the holder so we can process the reply + protected final ReplyManager replyManager; + protected final Exchange exchange; + protected final AsyncCallback callback; + // remember the original correlation id, in case the server returns back a reply with a messed up correlation id + protected final String originalCorrelationId; + protected final String correlationId; + protected final long timeout; + + public TemporaryQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback, + String originalCorrelationId, String correlationId, long timeout) { + this.replyManager = replyManager; + this.exchange = exchange; + this.originalCorrelationId = originalCorrelationId; + this.correlationId = correlationId; + this.callback = callback; + this.timeout = timeout; + } + + public void onReply(String correlationId, AMQP.BasicProperties properties, byte[] reply) { + // create holder object with the the reply + log.info("in onReply with correlationId= {}", correlationId); + ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, properties, reply); + // process the reply + replyManager.processReply(holder); + } + + public void onTimeout(String correlationId) { + // create holder object without the reply which means a timeout occurred + log.info("in onTimeout with correlationId= {}", correlationId); + ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, timeout); + // process timeout + replyManager.processReply(holder); + } +}