Author: davsclaus
Date: Fri Apr 29 10:36:13 2011
New Revision: 1097761

URL: http://svn.apache.org/viewvc?rev=1097761&view=rev
Log:
CAMEL-3913: Fixed an issue where the JMS consumer may log a WARN with a 
ClassCastException while processing a message. Fixed and improved the logic 
for detecting whether the JMS consumer should send back a reply or not.

Added:
    
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
   (with props)
    
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
   (with props)
Modified:
    
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java

Modified: 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java?rev=1097761&r1=1097760&r2=1097761&view=diff
==============================================================================
--- 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
 (original)
+++ 
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/EndpointMessageListener.java
 Fri Apr 29 10:36:13 2011
@@ -70,22 +70,25 @@ public class EndpointMessageListener imp
             LOG.debug(endpoint + " consumer received JMS message: " + message);
         }
 
+        boolean sendReply;
         RuntimeCamelException rce = null;
         try {
             Object replyDestination = getReplyToDestination(message);
+            // we can only send back a reply if there was a reply destination 
configured
+            // and disableReplyTo hasn't been explicitly enabled
+            sendReply = replyDestination != null && !disableReplyTo;
+
             final Exchange exchange = createExchange(message, 
replyDestination);
             if (eagerLoadingOfProperties) {
                 exchange.getIn().getHeaders();
             }
-
-            // process the exchange
-            LOG.trace("onMessage.process START");
-
             String correlationId = message.getJMSCorrelationID();
             if (correlationId != null) {
                 LOG.debug("Received Message has JMSCorrelationID [" + 
correlationId + "]");
             }
 
+            // process the exchange
+            LOG.trace("onMessage.process START");
             try {
                 processor.process(exchange);
             } catch (Throwable e) {
@@ -93,39 +96,44 @@ public class EndpointMessageListener imp
             }
             LOG.trace("onMessage.process END");
 
-            // get the correct jms message to send as reply
-            JmsMessage body = null;
+            // now we evaluate the processing of the exchange and determine if 
it was a success or failure
+            // we also grab information from the exchange to be used for 
sending back a reply (if we are to do so)
+            // so the following logic seems a bit complicated at first glance
+
+            // if we send back a reply it can either be the message body or 
transferring a caused exception
+            org.apache.camel.Message body = null;
             Exception cause = null;
-            boolean sendReply = false;
+
             if (exchange.isFailed() || exchange.isRollbackOnly()) {
-                if (exchange.getException() != null) {
+                if (exchange.isRollbackOnly()) {
+                    // rollback only so wrap an exception so we can rethrow 
the exception to cause rollback
+                    rce = wrapRuntimeCamelException(new 
RollbackExchangeException(exchange));
+                } else if (exchange.getException() != null) {
                     // an exception occurred while processing
                     if (endpoint.isTransferException()) {
-                        // send the exception as reply
+                        // send the exception as reply, so null body and set 
the exception as the cause
                         body = null;
                         cause = exchange.getException();
-                        sendReply = true;
                     } else {
                         // only throw exception if endpoint is not configured 
to transfer exceptions back to caller
                         // do not send a reply but wrap and rethrow the 
exception
                         rce = 
wrapRuntimeCamelException(exchange.getException());
                     }
-                } else if (exchange.isRollbackOnly()) {
-                    // rollback only so wrap an exception so we can rethrow 
the exception to cause rollback
-                    rce = wrapRuntimeCamelException(new 
RollbackExchangeException(exchange));
-                } else if (exchange.getOut().getBody() != null) {
+                } else if (exchange.hasOut() && exchange.getOut().isFault()) {
                     // a fault occurred while processing
-                    body = (JmsMessage) exchange.getOut();
-                    sendReply = true;
+                    body = exchange.getOut();
+                    cause = null;
+                }
+            } else {
+                // process OK so get the reply body if we are InOut and has a 
body
+                if (sendReply && exchange.getPattern().isOutCapable() && 
exchange.hasOut()) {
+                    body = exchange.getOut();
+                    cause = null;
                 }
-            } else if (exchange.hasOut()) {
-                // process OK so get the reply
-                body = (JmsMessage) exchange.getOut();
-                sendReply = true;
             }
 
-            // send the reply if we got a response and the exchange is out 
capable
-            if (rce == null && sendReply && !disableReplyTo && 
exchange.getPattern().isOutCapable()) {
+            // send back reply if there was no error and we are supposed to 
send back a reply
+            if (rce == null && sendReply && (body != null || cause != null)) {
                 LOG.trace("onMessage.sendReply START");
                 if (replyDestination instanceof Destination) {
                     sendReply((Destination)replyDestination, message, 
exchange, body, cause);
@@ -139,6 +147,7 @@ public class EndpointMessageListener imp
             rce = wrapRuntimeCamelException(e);
         }
 
+        // an exception occurred so rethrow to trigger rollback on JMS listener
         if (rce != null) {
             handleException(rce);
             LOG.trace("onMessage END throwing exception: {}", 
rce.getMessage());
@@ -264,7 +273,7 @@ public class EndpointMessageListener imp
     }
 
     protected void sendReply(Destination replyDestination, final Message 
message, final Exchange exchange,
-                             final JmsMessage out, final Exception cause) {
+                             final org.apache.camel.Message out, final 
Exception cause) {
         if (replyDestination == null) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Cannot send reply message as there is no 
replyDestination for: " + out);
@@ -286,7 +295,7 @@ public class EndpointMessageListener imp
     }
 
     protected void sendReply(String replyDestination, final Message message, 
final Exchange exchange,
-                             final JmsMessage out, final Exception cause) {
+                             final org.apache.camel.Message out, final 
Exception cause) {
         if (replyDestination == null) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Cannot send reply message as there is no 
replyDestination for: " + out);

Added: 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java?rev=1097761&view=auto
==============================================================================
--- 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
 (added)
+++ 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
 Fri Apr 29 10:36:13 2011
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.jms;
+
+import javax.naming.Context;
+
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.jndi.JndiContext;
+import org.junit.Test;
+
+import static org.apache.camel.Exchange.CONTENT_TYPE;
+import static org.apache.camel.Exchange.HTTP_METHOD;
+import static org.apache.camel.Exchange.HTTP_RESPONSE_CODE;
+
+/**
+ * Based on user forum.
+ *
+ * @version 
+ */
+public class JmsHttpPostIssueTest extends CamelTestSupport {
+
+    @Test
+    public void testJmsInOnlyHttpPostIssue() throws Exception {
+        NotifyBuilder notify = new 
NotifyBuilder(context).whenCompleted(1).from("jms*").create();
+
+        template.sendBody("jms:queue:in", "Hello World");
+
+        assertTrue("Should complete the JMS route", 
notify.matchesMockWaitTime());
+    }
+
+    @Test
+    public void testJmsInOutHttpPostIssue() throws Exception {
+        String out = template.requestBody("jms:queue:in", "Hello World", 
String.class);
+        assertEquals("OK", out);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("jms:queue:in")
+                    .setBody().simple("name=${body}")
+                    
.setHeader(CONTENT_TYPE).constant("application/x-www-form-urlencoded")
+                    .setHeader(HTTP_METHOD).constant("POST")
+                    .to("http://localhost:9080/myservice");
+
+                from("jetty:http://0.0.0.0:9080/myservice")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws 
Exception {
+                            String body = 
exchange.getIn().getBody(String.class);
+                            assertEquals("name=Hello World", body);
+
+                            exchange.getOut().setBody("OK");
+                            exchange.getOut().setHeader(CONTENT_TYPE, 
"text/plain");
+                            exchange.getOut().setHeader(HTTP_RESPONSE_CODE, 
200);
+                        }
+                    });
+            }
+        };
+    }
+
+    @Override
+    protected Context createJndiContext() throws Exception {
+        JndiContext answer = new JndiContext();
+
+        // add ActiveMQ with embedded broker
+        ActiveMQComponent amq = 
ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
+        amq.setCamelContext(context);
+        answer.bind("jms", amq);
+        return answer;
+    }
+
+}
\ No newline at end of file

Propchange: 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java?rev=1097761&view=auto
==============================================================================
--- 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
 (added)
+++ 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
 Fri Apr 29 10:36:13 2011
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.jms;
+
+import javax.naming.Context;
+
+import org.apache.activemq.camel.component.ActiveMQComponent;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.apache.camel.util.jndi.JndiContext;
+import org.junit.Test;
+
+import static org.apache.camel.Exchange.CONTENT_TYPE;
+import static org.apache.camel.Exchange.HTTP_METHOD;
+import static org.apache.camel.Exchange.HTTP_RESPONSE_CODE;
+
+/**
+ * Based on user forum.
+ *
+ * @version 
+ */
+public class JmsHttpPostIssueWithMockTest extends CamelTestSupport {
+
+    @Test
+    public void testJmsInOnlyHttpPostIssue() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        template.sendBody("jms:queue:in", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Test
+    public void testJmsInOutHttpPostIssue() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(1);
+
+        String out = template.requestBody("jms:queue:in", "Hello World", 
String.class);
+        assertEquals("OK", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() {
+                from("jms:queue:in")
+                    .setBody().simple("name=${body}")
+                    
.setHeader(CONTENT_TYPE).constant("application/x-www-form-urlencoded")
+                    .setHeader(HTTP_METHOD).constant("POST")
+                    .to("http://localhost:9080/myservice")
+                    .to("mock:result");
+
+                from("jetty:http://0.0.0.0:9080/myservice")
+                    .process(new Processor() {
+                        @Override
+                        public void process(Exchange exchange) throws 
Exception {
+                            String body = 
exchange.getIn().getBody(String.class);
+                            assertEquals("name=Hello World", body);
+
+                            exchange.getOut().setBody("OK");
+                            exchange.getOut().setHeader(CONTENT_TYPE, 
"text/plain");
+                            exchange.getOut().setHeader(HTTP_RESPONSE_CODE, 
200);
+                        }
+                    });
+            }
+        };
+    }
+
+    @Override
+    protected Context createJndiContext() throws Exception {
+        JndiContext answer = new JndiContext();
+
+        // add ActiveMQ with embedded broker
+        ActiveMQComponent amq = 
ActiveMQComponent.activeMQComponent("vm://localhost?broker.persistent=false");
+        amq.setCamelContext(context);
+        answer.bind("jms", amq);
+        return answer;
+    }
+
+}
\ No newline at end of file

Propchange: 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
camel/trunk/tests/camel-itest/src/test/java/org/apache/camel/itest/jms/JmsHttpPostIssueWithMockTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date


Reply via email to