Author: davsclaus
Date: Tue Dec 20 07:42:17 2011
New Revision: 1221129
URL: http://svn.apache.org/viewvc?rev=1221129&view=rev
Log:
CAMEL-4800: request/reply over JMS now includes details about the correlationID
if a timeout occurs. Reduced noise in the log to only log a WARN and no stacktrace etc.
Removed:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UnknownReplyMessageException.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java
Tue Dec 20 07:42:17 2011
@@ -31,6 +31,11 @@ public class ExchangeTimedOutException e
this.timeout = timeout;
}
+ public ExchangeTimedOutException(Exchange exchange, long timeout, String
message) {
+ super("The OUT message was not received within: " + timeout + " millis
due " + message, exchange);
+ this.timeout = timeout;
+ }
+
/**
* Return the timeout which expired in milliseconds
*/
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
Tue Dec 20 07:42:17 2011
@@ -26,12 +26,9 @@ import org.apache.camel.Exchange;
*/
public class PersistentQueueReplyHandler extends TemporaryQueueReplyHandler {
- private MessageSelectorCreator dynamicMessageSelector;
-
public PersistentQueueReplyHandler(ReplyManager replyManager, Exchange
exchange, AsyncCallback callback,
- String originalCorrelationId, long
timeout, MessageSelectorCreator dynamicMessageSelector) {
- super(replyManager, exchange, callback, originalCorrelationId,
timeout);
- this.dynamicMessageSelector = dynamicMessageSelector;
+ String originalCorrelationId, String
correlationId, long timeout) {
+ super(replyManager, exchange, callback, originalCorrelationId,
correlationId, timeout);
}
}
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
Tue Dec 20 07:42:17 2011
@@ -44,7 +44,7 @@ public class PersistentQueueReplyManager
String originalCorrelationId, String
correlationId, long requestTimeout) {
// add to correlation map
PersistentQueueReplyHandler handler = new
PersistentQueueReplyHandler(replyManager, exchange, callback,
- originalCorrelationId, requestTimeout, dynamicMessageSelector);
+ originalCorrelationId, correlationId, requestTimeout);
correlation.put(correlationId, handler, requestTimeout);
return correlationId;
}
@@ -76,9 +76,9 @@ public class PersistentQueueReplyManager
} else {
// we could not correlate the received reply message to a matching
request and therefore
// we cannot continue routing the unknown message
- String text = "Reply received for unknown correlationID [" +
correlationID + "] -> " + message;
+ String text = "Reply received for unknown correlationID [" +
correlationID + "]. The message will be ignored: " + message;
+ // log a warn and then ignore the message
log.warn(text);
- throw new UnknownReplyMessageException(text, message,
correlationID);
}
}
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
Tue Dec 20 07:42:17 2011
@@ -33,23 +33,27 @@ public class ReplyHolder {
private final AsyncCallback callback;
private final Message message;
private final String originalCorrelationId;
+ private final String correlationId;
private long timeout;
/**
* Constructor to use when a reply message was received
*/
- public ReplyHolder(Exchange exchange, AsyncCallback callback, String
originalCorrelationId, Message message) {
+ public ReplyHolder(Exchange exchange, AsyncCallback callback, String
originalCorrelationId,
+ String correlationId, Message message) {
this.exchange = exchange;
this.callback = callback;
this.originalCorrelationId = originalCorrelationId;
+ this.correlationId = correlationId;
this.message = message;
}
/**
* Constructor to use when a timeout occurred
*/
- public ReplyHolder(Exchange exchange, AsyncCallback callback, String
originalCorrelationId, long timeout) {
- this(exchange, callback, originalCorrelationId, null);
+ public ReplyHolder(Exchange exchange, AsyncCallback callback, String
originalCorrelationId,
+ String correlationId, long timeout) {
+ this(exchange, callback, originalCorrelationId, correlationId, null);
this.timeout = timeout;
}
@@ -72,6 +76,15 @@ public class ReplyHolder {
}
/**
+ * Gets the correlation id
+ *
+ * @see #getOriginalCorrelationId()
+ */
+ public String getCorrelationId() {
+ return correlationId;
+ }
+
+ /**
* Gets the received message
*
* @return the received message, or <tt>null</tt> if timeout occurred and
no message has been received
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
Tue Dec 20 07:42:17 2011
@@ -100,7 +100,7 @@ public abstract class ReplyManagerSuppor
return;
}
- log.debug("Received reply message with correlationID: {} -> {}",
correlationID, message);
+ log.debug("Received reply message with correlationID [{}] -> {}",
correlationID, message);
// handle the reply message
handleReplyMessage(correlationID, message);
@@ -114,8 +114,14 @@ public abstract class ReplyManagerSuppor
boolean timeout = holder.isTimeout();
if (timeout) {
+ // timeout occurred do a WARN log so its easier to spot in
the logs
+ log.warn("Timeout occurred after {} millis waiting for
reply message with correlationID [{}]."
+ + " Setting ExchangeTimedOutException on
ExchangeId: {} and continue routing.",
+ new Object[]{holder.getRequestTimeout(),
holder.getCorrelationId(), exchange.getExchangeId()});
+
// no response, so lets set a timed out exception
- exchange.setException(new
ExchangeTimedOutException(exchange, holder.getRequestTimeout()));
+ String msg = "reply message with correlationID: " +
holder.getCorrelationId() + " not received";
+ exchange.setException(new
ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg));
} else {
JmsMessage response = new JmsMessage(message,
endpoint.getBinding());
Object body = response.getBody();
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
Tue Dec 20 07:42:17 2011
@@ -34,27 +34,29 @@ public class TemporaryQueueReplyHandler
protected final AsyncCallback callback;
// remember the original correlation id, in case the server returns back a
reply with a messed up correlation id
protected final String originalCorrelationId;
+ protected final String correlationId;
protected final long timeout;
public TemporaryQueueReplyHandler(ReplyManager replyManager, Exchange
exchange, AsyncCallback callback,
- String originalCorrelationId, long
timeout) {
+ String originalCorrelationId, String
correlationId, long timeout) {
this.replyManager = replyManager;
this.exchange = exchange;
this.originalCorrelationId = originalCorrelationId;
+ this.correlationId = correlationId;
this.callback = callback;
this.timeout = timeout;
}
public void onReply(String correlationId, Message reply) {
// create holder object with the the reply
- ReplyHolder holder = new ReplyHolder(exchange, callback,
originalCorrelationId, reply);
+ ReplyHolder holder = new ReplyHolder(exchange, callback,
originalCorrelationId, correlationId, reply);
// process the reply
replyManager.processReply(holder);
}
public void onTimeout(String correlationId) {
// create holder object without the reply which means a timeout
occurred
- ReplyHolder holder = new ReplyHolder(exchange, callback,
originalCorrelationId, timeout);
+ ReplyHolder holder = new ReplyHolder(exchange, callback,
originalCorrelationId, correlationId, timeout);
// process timeout
replyManager.processReply(holder);
}
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
Tue Dec 20 07:42:17 2011
@@ -38,7 +38,7 @@ public class TemporaryQueueReplyManager
public String registerReply(ReplyManager replyManager, Exchange exchange,
AsyncCallback callback,
String originalCorrelationId, String
correlationId, long requestTimeout) {
// add to correlation map
- TemporaryQueueReplyHandler handler = new
TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId,
requestTimeout);
+ TemporaryQueueReplyHandler handler = new
TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId,
correlationId, requestTimeout);
correlation.put(correlationId, handler, requestTimeout);
return correlationId;
}
@@ -66,9 +66,9 @@ public class TemporaryQueueReplyManager
} else {
// we could not correlate the received reply message to a matching
request and therefore
// we cannot continue routing the unknown message
- String text = "Reply received for unknown correlationID [" +
correlationID + "] -> " + message;
+ String text = "Reply received for unknown correlationID [" +
correlationID + "]. The message will be ignored: " + message;
+ // log a warn and then ignore the message
log.warn(text);
- throw new UnknownReplyMessageException(text, message,
correlationID);
}
}