Author: davsclaus
Date: Mon Mar 23 13:00:43 2009
New Revision: 757395
URL: http://svn.apache.org/viewvc?rev=757395&view=rev
Log:
CAMEL-585: Added option transferException to send a cause exception back as
reply when using request-reply with Camel JMS.
Added:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java
- copied, changed from r757317,
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
Mon Mar 23 13:00:43 2009
@@ -60,11 +60,12 @@
}
public void onMessage(final Message message) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(endpoint + " consumer receiving JMS message: " +
message);
+ }
+
RuntimeCamelException rce = null;
try {
- if (LOG.isDebugEnabled()) {
- LOG.debug(endpoint + " consumer receiving JMS message: " +
message);
- }
Destination replyDestination = getReplyToDestination(message);
final JmsExchange exchange = createExchange(message,
replyDestination);
if (eagerLoadingOfProperties) {
@@ -76,28 +77,41 @@
// get the correct jms message to send as reply
JmsMessage body = null;
+ Exception cause = null;
+ boolean sendReply = false;
if (exchange.isFailed()) {
if (exchange.getException() != null) {
// an exception occurred while processing
- // TODO: Camel-585 somekind of flag to determine if we
should send the exchange back to the client
- // or do as now where we wrap as runtime exception to be
thrown back to spring so it can do rollback
- rce = wrapRuntimeCamelException(exchange.getException());
+ if (endpoint.isTransferException()) {
+ // send the exception as reply
+ body = null;
+ cause = exchange.getException();
+ sendReply = true;
+ } else {
+ // only throw exception if endpoint is not configured
to transfer exceptions
+ // back to caller
+ rce =
wrapRuntimeCamelException(exchange.getException());
+ }
} else if (exchange.getFault().getBody() != null) {
// a fault occurred while processing
body = exchange.getFault();
+ sendReply = true;
}
} else {
// process OK so get the reply
body = exchange.getOut(false);
+ sendReply = true;
}
// send the reply if we got a response and the exchange is out
capable
- if (rce == null && body != null && !disableReplyTo &&
exchange.getPattern().isOutCapable()) {
- sendReply(replyDestination, message, exchange, body);
+ if (sendReply && !disableReplyTo &&
exchange.getPattern().isOutCapable()) {
+ sendReply(replyDestination, message, exchange, body, cause);
}
+
} catch (Exception e) {
rce = wrapRuntimeCamelException(e);
}
+
if (rce != null) {
getExceptionHandler().handleException(rce);
throw rce;
@@ -194,7 +208,8 @@
// Implementation methods
//-------------------------------------------------------------------------
- protected void sendReply(Destination replyDestination, final Message
message, final JmsExchange exchange, final JmsMessage out) {
+ protected void sendReply(Destination replyDestination, final Message
message, final JmsExchange exchange,
+ final JmsMessage out, final Exception cause) {
if (replyDestination == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot send reply message as there is no
replyDestination for: " + out);
@@ -203,7 +218,7 @@
}
getTemplate().send(replyDestination, new MessageCreator() {
public Message createMessage(Session session) throws JMSException {
- Message reply = endpoint.getBinding().makeJmsMessage(exchange,
out, session);
+ Message reply = endpoint.getBinding().makeJmsMessage(exchange,
out, session, cause);
if
(endpoint.getConfiguration().isUseMessageIDAsCorrelationID()) {
String messageID =
exchange.getIn().getHeader("JMSMessageID", String.class);
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
Mon Mar 23 13:00:43 2009
@@ -176,24 +176,28 @@
/**
* Creates a JMS message from the Camel exchange and message
*
+ * @param exchange the current exchange
* @param session the JMS session used to create the message
* @return a newly created JMS Message instance containing the
* @throws JMSException if the message could not be created
*/
public Message makeJmsMessage(Exchange exchange, Session session) throws
JMSException {
- return makeJmsMessage(exchange, exchange.getIn(), session);
+ return makeJmsMessage(exchange, exchange.getIn(), session, null);
}
/**
* Creates a JMS message from the Camel exchange and message
*
+ * @param exchange the current exchange
+ * @param camelMessage the body to make a javax.jms.Message as
* @param session the JMS session used to create the message
+ * @param cause optional exception occurred that should be sent as reply
instead of a regular body
* @return a newly created JMS Message instance containing the
* @throws JMSException if the message could not be created
*/
- public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message
camelMessage, Session session)
- throws JMSException {
+ public Message makeJmsMessage(Exchange exchange, org.apache.camel.Message
camelMessage, Session session, Exception cause) throws JMSException {
Message answer = null;
+
boolean alwaysCopy = (endpoint != null) &&
endpoint.getConfiguration().isAlwaysCopyMessage();
if (!alwaysCopy && camelMessage instanceof JmsMessage) {
JmsMessage jmsMessage = (JmsMessage)camelMessage;
@@ -201,10 +205,22 @@
answer = jmsMessage.getJmsMessage();
}
}
+
if (answer == null) {
- answer = createJmsMessage(exchange, camelMessage.getBody(),
camelMessage.getHeaders(), session, exchange.getContext());
- appendJmsProperties(answer, exchange, camelMessage);
+ if (cause != null) {
+ // an exception occurred so send it as response
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Will create JmsMessage with caused exception: "
+ cause);
+ }
+ // create jms message containing the caused exception
+ answer = createJmsMessage(cause, session);
+ } else {
+ // create regular jms message using the camel message body
+ answer = createJmsMessage(exchange, camelMessage.getBody(),
camelMessage.getHeaders(), session, exchange.getContext());
+ appendJmsProperties(answer, exchange, camelMessage);
+ }
}
+
return answer;
}
@@ -218,8 +234,7 @@
/**
* Appends the JMS headers from the Camel {@link JmsMessage}
*/
- public void appendJmsProperties(Message jmsMessage, Exchange exchange,
org.apache.camel.Message in)
- throws JMSException {
+ public void appendJmsProperties(Message jmsMessage, Exchange exchange,
org.apache.camel.Message in) throws JMSException {
Set<Map.Entry<String, Object>> entries = in.getHeaders().entrySet();
for (Map.Entry<String, Object> entry : entries) {
String headerName = entry.getKey();
@@ -296,6 +311,13 @@
return null;
}
+ protected Message createJmsMessage(Exception cause, Session session)
throws JMSException {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Using JmsMessageType: " + Object);
+ }
+ return session.createObjectMessage(cause);
+ }
+
protected Message createJmsMessage(Exchange exchange, Object body,
Map<String, Object> headers, Session session, CamelContext context) throws
JMSException {
JmsMessageType type = null;
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
Mon Mar 23 13:00:43 2009
@@ -134,6 +134,7 @@
private JmsMessageType jmsMessageType;
private JmsKeyFormatStrategy jmsKeyFormatStrategy;
private boolean transferExchange;
+ private boolean transferException;
public JmsConfiguration() {
}
@@ -1174,4 +1175,12 @@
public void setTransferExchange(boolean transferExchange) {
this.transferExchange = transferExchange;
}
+
+ public boolean isTransferException() {
+ return transferException;
+ }
+
+ public void setTransferException(boolean transferException) {
+ this.transferException = transferException;
+ }
}
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsEndpoint.java
Mon Mar 23 13:00:43 2009
@@ -787,6 +787,14 @@
getConfiguration().setTransferExchange(transferExchange);
}
+ public boolean isTransferException() {
+ return getConfiguration().isTransferException();
+ }
+
+ public void setTransferException(boolean transferException) {
+ getConfiguration().setTransferException(transferException);
+ }
+
// Implementation methods
//-------------------------------------------------------------------------
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java?rev=757395&r1=757394&r2=757395&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsProducer.java
Mon Mar 23 13:00:43 2009
@@ -99,13 +99,11 @@
try {
JmsConfiguration c = endpoint.getConfiguration();
if (c.getReplyTo() != null) {
- requestor = new
PersistentReplyToRequestor(endpoint.getConfiguration(), endpoint
- .getExecutorService());
+ requestor = new
PersistentReplyToRequestor(endpoint.getConfiguration(),
endpoint.getExecutorService());
requestor.start();
} else {
if (affinity == RequestorAffinity.PER_PRODUCER) {
- requestor = new
Requestor(endpoint.getConfiguration(), endpoint
- .getExecutorService());
+ requestor = new
Requestor(endpoint.getConfiguration(), endpoint.getExecutorService());
requestor.start();
} else if (affinity == RequestorAffinity.PER_ENDPOINT)
{
requestor = endpoint.getRequestor();
@@ -175,11 +173,11 @@
final CamelJmsTemplate template =
(CamelJmsTemplate)getInOutTemplate();
MessageCreator messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws
JMSException {
- Message message =
endpoint.getBinding().makeJmsMessage(exchange, in, session);
+ Message message =
endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
message.setJMSReplyTo(replyTo);
requestor.setReplyToSelectorHeader(in, message);
- FutureTask future = null;
+ FutureTask future;
future = (!msgIdAsCorrId)
?
requestor.getReceiveFuture(message.getJMSCorrelationID(),
endpoint.getConfiguration().getRequestTimeout())
: requestor.getReceiveFuture(callback);
@@ -223,13 +221,32 @@
}
}
if (message != null) {
- exchange.setOut(new JmsMessage(message,
endpoint.getBinding()));
+ // the response can be an exception
+ JmsMessage response = new JmsMessage(message,
endpoint.getBinding());
+ Object body = response.getBody();
+
+ if (endpoint.isTransferException() && body instanceof
Exception) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reply recieved. Setting reply as
Exception: " + body);
+ }
+ // we got an exception back and endpoint was configured
to transfer exception
+ // therefore set response as exception
+ exchange.setException((Exception) body);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Reply recieved. Setting reply as OUT
message: " + body);
+ }
+ // regular response
+ exchange.setOut(response);
+ }
+
+ // correlation
if (correlationId != null) {
message.setJMSCorrelationID(correlationId);
exchange.getOut(false).setHeader("JMSCorrelationID",
correlationId);
}
} else {
- // lets set a timed out exception
+ // no response, so lets set a timed out exception
exchange.setException(new
ExchangeTimedOutException(exchange, requestTimeout));
}
} catch (Exception e) {
@@ -238,7 +255,7 @@
} else {
MessageCreator messageCreator = new MessageCreator() {
public Message createMessage(Session session) throws
JMSException {
- Message message =
endpoint.getBinding().makeJmsMessage(exchange, in, session);
+ Message message =
endpoint.getBinding().makeJmsMessage(exchange, in, session, null);
if (LOG.isDebugEnabled()) {
LOG.debug(endpoint + " sending JMS message: " +
message);
}
Copied:
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java
(from r757317,
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java?p2=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java&p1=camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java&r1=757317&r2=757395&rev=757395&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExchangeTest.java
(original)
+++
camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsTransferExceptionTest.java
Mon Mar 23 13:00:43 2009
@@ -23,53 +23,55 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
import static
org.apache.camel.component.jms.JmsComponent.jmsComponentClientAcknowledge;
+import org.apache.camel.component.mock.MockEndpoint;
/**
* @version $Revision$
*/
-public class JmsTransferExchangeTest extends ContextTestSupport {
-
- protected String getUri() {
- return "activemq:queue:foo?transferExchange=true";
- }
-
- public void testBodyOnly() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
+public class JmsTransferExceptionTest extends ContextTestSupport {
- template.sendBody("direct:start", "Hello World");
+ private static int counter;
- assertMockEndpointsSatisfied();
+ protected String getUri() {
+ return "activemq:queue:foo?transferException=true";
}
- public void testBodyAndHeaderOnly() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
- mock.expectedHeaderReceived("foo", "cheese");
-
- template.sendBodyAndHeader("direct:start", "Hello World", "foo",
"cheese");
+ @Override
+ protected void setUp() throws Exception {
+ counter = 0;
+ super.setUp();
+ }
+
+ public void testOk() throws Exception {
+ Object out = template.requestBody(getUri(), "Hello World");
+ assertEquals("Bye World", out);
+
+ assertEquals(1, counter);
+ }
+
+ public void testTransferExeption() throws Exception {
+ // should fail as we throw an exception
+ MockEndpoint dead = getMockEndpoint("mock:dead");
+ dead.expectedMessageCount(1);
+
+ // we send something that causes a remote exception
+ // then we expect our producer template to throw
+ // an exception with the remote exception as cause
+ try {
+ template.requestBody(getUri(), "Kabom");
+ fail("Should have thrown an exception");
+ } catch (RuntimeCamelException e) {
+ assertEquals("Boom", e.getCause().getMessage());
+ assertNotNull("Should contain a remote stacktrace",
e.getCause().getStackTrace());
+ }
assertMockEndpointsSatisfied();
- }
- public void testSendExchange() throws Exception {
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedBodiesReceived("Hello World");
- mock.expectedHeaderReceived("foo", "cheese");
- mock.expectedPropertyReceived("bar", 123);
-
- template.send("direct:start", new Processor() {
- public void process(Exchange exchange) throws Exception {
- exchange.getIn().setBody("Hello World");
- exchange.getIn().setHeader("foo", "cheese");
- exchange.setProperty("bar", 123);
- }
- });
-
- assertMockEndpointsSatisfied();
+ // we still try redeliver
+ assertEquals(3, counter);
}
protected CamelContext createCamelContext() throws Exception {
@@ -86,10 +88,22 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- from("direct:start").to(getUri());
- from(getUri()).to("mock:result");
+
errorHandler(deadLetterChannel("mock:dead").maximumRedeliveries(2).delay(0).logStackTrace(false));
+
+ from(getUri())
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ counter++;
+
+ String body =
exchange.getIn().getBody(String.class);
+ if (body.equals("Kabom")) {
+ throw new IllegalArgumentException("Boom");
+ }
+ exchange.getOut().setBody("Bye World");
+ }
+ });
}
};
}
-}
+}
\ No newline at end of file