Author: davsclaus
Date: Fri Apr 29 10:36:13 2011
New Revision: 1097761
URL: http://svn.apache.org/viewvc?rev=1097761&view=rev
Log:
CAMEL-3913: Fixed an issue where the JMS consumer may log a WARN with a
ClassCastException while processing a message. Fixed and improved the logic
for detecting whether the JMS consumer should send back a reply.
Added:
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
(with props)
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
(with props)
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL:
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=1097761&r1=1097760&r2=1097761&view=diff
==============================================================================
---
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
(original)
+++
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
Fri Apr 29 10:36:13 2011
@@ -70,22 +70,25 @@ public class EndpointMessageListener imp
LOG.debug(endpoint + " consumer received JMS message: " + message);
}
+ boolean sendReply;
RuntimeCamelException rce = null;
try {
Object replyDestination = getReplyToDestination(message);
+ // we can only send back a reply if there was a reply destination
configured
+ // and disableReplyTo hasn't been explicitly enabled
+ sendReply = replyDestination != null && !disableReplyTo;
+
final Exchange exchange = createExchange(message,
replyDestination);
if (eagerLoadingOfProperties) {
exchange.getIn().getHeaders();
}
-
- // process the exchange
- LOG.trace("onMessage.process START");
-
String correlationId = message.getJMSCorrelationID();
if (correlationId != null) {
LOG.debug("Received Message has JMSCorrelationID [" +
correlationId + "]");
}
+ // process the exchange
+ LOG.trace("onMessage.process START");
try {
processor.process(exchange);
} catch (Throwable e) {
@@ -93,39 +96,44 @@ public class EndpointMessageListener imp
}
LOG.trace("onMessage.process END");
- // get the correct jms message to send as reply
- JmsMessage body = null;
+ // now we evaluate the processing of the exchange and determine if
it was a success or failure
+ // we also grab information from the exchange to be used for
sending back a reply (if we are to do so)
+ // so the following logic seems a bit complicated at first glance
+
+ // if we send back a reply it can either be the message body or
transferring a caused exception
+ org.apache.camel.Message body = null;
Exception cause = null;
- boolean sendReply = false;
+
if (exchange.isFailed() || exchange.isRollbackOnly()) {
- if (exchange.getException() != null) {
+ if (exchange.isRollbackOnly()) {
+ // rollback only so wrap an exception so we can rethrow
the exception to cause rollback
+ rce = wrapRuntimeCamelException(new
RollbackExchangeException(exchange));
+ } else if (exchange.getException() != null) {
// an exception occurred while processing
if (endpoint.isTransferException()) {
- // send the exception as reply
+ // send the exception as reply, so null body and set
the exception as the cause
body = null;
cause = exchange.getException();
- sendReply = true;
} else {
// only throw exception if endpoint is not configured
to transfer exceptions back to caller
// do not send a reply but wrap and rethrow the
exception
rce =
wrapRuntimeCamelException(exchange.getException());
}
- } else if (exchange.isRollbackOnly()) {
- // rollback only so wrap an exception so we can rethrow
the exception to cause rollback
- rce = wrapRuntimeCamelException(new
RollbackExchangeException(exchange));
- } else if (exchange.getOut().getBody() != null) {
+ } else if (exchange.hasOut() && exchange.getOut().isFault()) {
// a fault occurred while processing
- body = (JmsMessage) exchange.getOut();
- sendReply = true;
+ body = exchange.getOut();
+ cause = null;
+ }
+ } else {
+ // process OK so get the reply body if we are InOut and have a
body
+ if (sendReply && exchange.getPattern().isOutCapable() &&
exchange.hasOut()) {
+ body = exchange.getOut();
+ cause = null;
}
- } else if (exchange.hasOut()) {
- // process OK so get the reply
- body = (JmsMessage) exchange.getOut();
- sendReply = true;
}
- // send the reply if we got a response and the exchange is out
capable
- if (rce == null && sendReply && !disableReplyTo &&
exchange.getPattern().isOutCapable()) {
+ // send back reply if there was no error and we are supposed to
send back a reply
+ if (rce == null && sendReply && (body != null || cause != null)) {
LOG.trace("onMessage.sendReply START");
if (replyDestination instanceof Destination) {
sendReply((Destination)replyDestination, message,
exchange, body, cause);
@@ -139,6 +147,7 @@ public class EndpointMessageListener imp
rce = wrapRuntimeCamelException(e);
}
+ // an exception occurred so rethrow to trigger rollback on JMS listener
if (rce != null) {
handleException(rce);
LOG.trace("onMessage END throwing exception: {}",
rce.getMessage());
@@ -264,7 +273,7 @@ public class EndpointMessageListener imp
}
protected void sendReply(Destination replyDestination, final Message
message, final Exchange exchange,
- final JmsMessage out, final Exception cause) {
+ final org.apache.camel.Message out, final
Exception cause) {
if (replyDestination == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot send reply message as there is no
replyDestination for: " + out);
@@ -286,7 +295,7 @@ public class EndpointMessageListener imp
}
protected void sendReply(String replyDestination, final Message message,
final Exchange exchange,
- final JmsMessage out, final Exception cause) {
+ final org.apache.camel.Message out, final
Exception cause) {
if (replyDestination == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Cannot send reply message as there is no
replyDestination for: " + out);
Added:
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java?rev=1097761&view=auto
==============================================================================
---
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
(added)
+++
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
Fri Apr 29 10:36:13 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.jms;
+
+import javax.naming.Context;
+
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.jndi.JndiContext;
+import org.junit.Test;
+
+import static org.apache.camel.Exchange.CONTENT_TYPE;
+import static org.apache.camel.Exchange.HTTP_METHOD;
+import static org.apache.camel.Exchange.HTTP_RESPONSE_CODE;
+
+/**
+ * Based on an issue reported on the user forum.
+ *
+ * @version
+ */
+public class JmsHttpPostIssueTest extends CamelTestSupport {
+
+ @Test
+ public void testJmsInOnlyHttpPostIssue() throws Exception {
+ NotifyBuilder notify = new
NotifyBuilder(context).whenCompleted(1).from("jms*").create();
+
+ template.sendBody("jms:queue:in", "Hello World");
+
+ assertTrue("Should complete the JMS route",
notify.matchesMockWaitTime());
+ }
+
+ @Test
+ public void testJmsInOutHttpPostIssue() throws Exception {
+ String out = template.requestBody("jms:queue:in", "Hello World",
String.class);
+ assertEquals("OK", out);
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("jms:queue:in")
+ .setBody().simple("name=${body}")
+
.setHeader(CONTENT_TYPE).constant("application/x-www-form-urlencoded")
+ .setHeader(HTTP_METHOD).constant("POST")
+ .to("http://localhost:9080/myservice");
+
+ from("jetty:http://0.0.0.0:9080/myservice")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws
Exception {
+ String body =
exchange.getIn().getBody(String.class);
+ assertEquals("name=Hello World", body);
+
+ exchange.getOut().setBody("OK");
+ exchange.getOut().setHeader(CONTENT_TYPE,
"text/plain");
+ exchange.getOut().setHeader(HTTP_RESPONSE_CODE,
200);
+ }
+ });
+ }
+ };
+ }
+
+ @Override
+ protected Context createJndiContext() throws Exception {
+ JndiContext answer = new JndiContext();
+
+ // add ActiveMQ with embedded broker
+ ActiveMQComponent amq =
ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
+ amq.setCamelContext(context);
+ answer.bind("jms", amq);
+ return answer;
+ }
+
+}
\ No newline at end of file
Propchange:
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added:
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java?rev=1097761&view=auto
==============================================================================
---
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
(added)
+++
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
Fri Apr 29 10:36:13 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.jms;
+
+import javax.naming.Context;
+
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.jndi.JndiContext;
+import org.junit.Test;
+
+import static org.apache.camel.Exchange.CONTENT_TYPE;
+import static org.apache.camel.Exchange.HTTP_METHOD;
+import static org.apache.camel.Exchange.HTTP_RESPONSE_CODE;
+
+/**
+ * Based on an issue reported on the user forum.
+ *
+ * @version
+ */
+public class JmsHttpPostIssueWithMockTest extends CamelTestSupport {
+
+ @Test
+ public void testJmsInOnlyHttpPostIssue() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ template.sendBody("jms:queue:in", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Test
+ public void testJmsInOutHttpPostIssue() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(1);
+
+ String out = template.requestBody("jms:queue:in", "Hello World",
String.class);
+ assertEquals("OK", out);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() {
+ from("jms:queue:in")
+ .setBody().simple("name=${body}")
+
.setHeader(CONTENT_TYPE).constant("application/x-www-form-urlencoded")
+ .setHeader(HTTP_METHOD).constant("POST")
+ .to("http://localhost:9080/myservice")
+ .to("mock:result");
+
+ from("jetty:http://0.0.0.0:9080/myservice")
+ .process(new Processor() {
+ @Override
+ public void process(Exchange exchange) throws
Exception {
+ String body =
exchange.getIn().getBody(String.class);
+ assertEquals("name=Hello World", body);
+
+ exchange.getOut().setBody("OK");
+ exchange.getOut().setHeader(CONTENT_TYPE,
"text/plain");
+ exchange.getOut().setHeader(HTTP_RESPONSE_CODE,
200);
+ }
+ });
+ }
+ };
+ }
+
+ @Override
+ protected Context createJndiContext() throws Exception {
+ JndiContext answer = new JndiContext();
+
+ // add ActiveMQ with embedded broker
+ ActiveMQComponent amq =
ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
+ amq.setCamelContext(context);
+ answer.bind("jms", amq);
+ return answer;
+ }
+
+}
\ No newline at end of file
Propchange:
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date