Author: davsclaus
Date: Sun Sep 25 07:33:36 2011
New Revision: 1175313

URL: http://svn.apache.org/viewvc?rev=1175313&view=rev
Log:
CAMEL-4484: Fixed recipient list EIP using custom expression which throws 
exception, not being triggered by onException.

Added:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EvaluateExpressionProcessorTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListThrowExceptionFromExpressionTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1175313&r1=1175312&r2=1175313&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java 
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Sun Sep 
25 07:33:36 2011
@@ -104,8 +104,9 @@ public interface Exchange {
     String DISABLE_HTTP_STREAM_CACHE = "CamelDisableHttpStreamCache";
     String DUPLICATE_MESSAGE         = "CamelDuplicateMessage";
 
-    String ERRORHANDLER_HANDLED = "CamelErrorHandlerHandled";
-    String EXCEPTION_CAUGHT     = "CamelExceptionCaught";
+    String EXCEPTION_CAUGHT           = "CamelExceptionCaught";
+    String EVALUATE_EXPRESSION_RESULT = "CamelEvaluateExpressionResult";
+    String ERRORHANDLER_HANDLED       = "CamelErrorHandlerHandled";
 
     String FAILURE_HANDLED      = "CamelFailureHandled";
     String FAILURE_ENDPOINT     = "CamelFailureEndpoint";

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1175313&r1=1175312&r2=1175313&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
 Sun Sep 25 07:33:36 2011
@@ -261,8 +261,8 @@ public abstract class ProcessorDefinitio
             // do not use error handler for recipient list as it offers fine 
grained error handlers for its outputs
             // however if share unit of work is enabled, we need to wrap an 
error handler on the recipient list parent
             RecipientListDefinition def = (RecipientListDefinition) defn;
-            if (def.isShareUnitOfWork() && child == null) {
-                // only wrap the parent (not the children of the multicast)
+            if (def.isShareUnitOfWork()) {
+                // note a recipient list cannot have children so no need for a 
child == null check
                 wrapChannelInErrorHandler(channel, routeContext);
             } else {
                 log.trace("{} is part of multicast/recipientList which have 
special error handling so no error handler is applied", defn);

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=1175313&r1=1175312&r2=1175313&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
 Sun Sep 25 07:33:36 2011
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.model;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.ExecutorService;
 
 import javax.xml.bind.annotation.XmlAccessType;
@@ -27,6 +29,8 @@ import javax.xml.bind.annotation.XmlTran
 import org.apache.camel.Expression;
 import org.apache.camel.Processor;
 import org.apache.camel.model.language.ExpressionDefinition;
+import org.apache.camel.processor.EvaluateExpressionProcessor;
+import org.apache.camel.processor.Pipeline;
 import org.apache.camel.processor.RecipientList;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
@@ -97,7 +101,7 @@ public class RecipientListDefinition<Typ
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws 
Exception {
-        Expression expression = getExpression().createExpression(routeContext);
+        final Expression expression = 
getExpression().createExpression(routeContext);
 
         RecipientList answer;
         if (delimiter != null) {
@@ -135,7 +139,29 @@ public class RecipientListDefinition<Typ
             throw new IllegalArgumentException("Timeout is used but 
ParallelProcessing has not been enabled.");
         }
 
-        return answer;
+        // create a pipeline with two processors
+        // the first is the eval processor which evaluates the expression to 
use
+        // the second is the recipient list
+        List<Processor> pipe = new ArrayList<Processor>(2);
+
+        // the eval processor must be wrapped in error handler, so in case 
there was an
+        // error during evaluation, the error handler can deal with it
+        // the recipient list is not in error handler, as its has its own 
special error handling
+        // when sending to the recipients individually
+        Processor evalProcessor = new EvaluateExpressionProcessor(expression);
+        evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor);
+
+        pipe.add(evalProcessor);
+        pipe.add(answer);
+
+        // wrap in nested pipeline so this appears as one processor
+        // (threads definition does this as well)
+        return new Pipeline(routeContext.getCamelContext(), pipe) {
+            @Override
+            public String toString() {
+                return "RecipientList[" + expression + "]";
+            }
+        };
     }
     
     private AggregationStrategy createAggregationStrategy(RouteContext 
routeContext) {

Added: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java?rev=1175313&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java
 (added)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java
 Sun Sep 25 07:33:36 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Traceable;
+import org.apache.camel.util.AsyncProcessorHelper;
+
+/**
+ * A {@link org.apache.camel.Processor} which evaluates an {@link Expression}
+ * and stores the result as a property on the {@link Exchange} with the key
+ * {@link Exchange#EVALUATE_EXPRESSION_RESULT}.
+ * <p/>
+ * This processor will in case of evaluation exceptions, set the caused 
exception
+ * on the {@link Exchange}.
+ */
+public class EvaluateExpressionProcessor implements AsyncProcessor, Traceable {
+
+    private final Expression expression;
+
+    public EvaluateExpressionProcessor(Expression expression) {
+        this.expression = expression;
+    }
+
+    @Override
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    @Override
+    public boolean process(Exchange exchange, AsyncCallback callback) {
+        try {
+            Object result = expression.evaluate(exchange, Object.class);
+            exchange.setProperty(Exchange.EVALUATE_EXPRESSION_RESULT, result);
+        } catch (Throwable e) {
+            exchange.setException(e);
+        } finally {
+            callback.done(true);
+        }
+        return true;
+    }
+
+    @Override
+    public String toString() {
+        return "EvalExpression[" + expression + "]";
+    }
+
+    public String getTraceLabel() {
+        return "eval[" + expression + "]";
+    }
+
+}

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=1175313&r1=1175312&r2=1175313&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
 Sun Sep 25 07:33:36 2011
@@ -101,7 +101,13 @@ public class RecipientList extends Servi
             throw new IllegalStateException("RecipientList has not been 
started: " + this);
         }
 
-        Object recipientList = expression.evaluate(exchange, Object.class);
+        // use the evaluate expression result if exists
+        Object recipientList = 
exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT);
+        if (recipientList == null && expression != null) {
+            // fallback and evaluate the expression
+            recipientList = expression.evaluate(exchange, Object.class);
+        }
+
         return sendToRecipientList(exchange, recipientList, callback);
     }
 

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?rev=1175313&r1=1175312&r2=1175313&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
 Sun Sep 25 07:33:36 2011
@@ -17,6 +17,7 @@
 package org.apache.camel.builder;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.camel.CamelContext;
@@ -32,6 +33,7 @@ import org.apache.camel.impl.DefaultCame
 import org.apache.camel.impl.EventDrivenConsumerRoute;
 import org.apache.camel.processor.ChoiceProcessor;
 import org.apache.camel.processor.DeadLetterChannel;
+import org.apache.camel.processor.EvaluateExpressionProcessor;
 import org.apache.camel.processor.FilterProcessor;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.processor.Pipeline;
@@ -367,7 +369,17 @@ public class RouteBuilderTest extends Te
             EventDrivenConsumerRoute consumer = 
assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
             Channel channel = unwrapChannel(consumer.getProcessor());
 
-            assertIsInstanceOf(RecipientList.class, 
channel.getNextProcessor());
+            Pipeline line = assertIsInstanceOf(Pipeline.class, 
channel.getNextProcessor());
+            Iterator it = line.getProcessors().iterator();
+
+            // EvaluateExpressionProcessor should be wrapped in error handler
+            Object first = it.next();
+            first = assertIsInstanceOf(DeadLetterChannel.class, 
first).getOutput();
+            assertIsInstanceOf(EvaluateExpressionProcessor.class, first);
+
+            // and the second should NOT be wrapped in error handler
+            Object second = it.next();
+            assertIsInstanceOf(RecipientList.class, second);
         }
     }
 

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EvaluateExpressionProcessorTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EvaluateExpressionProcessorTest.java?rev=1175313&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EvaluateExpressionProcessorTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EvaluateExpressionProcessorTest.java
 Sun Sep 25 07:33:36 2011
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ *
+ */
+public class EvaluateExpressionProcessorTest extends ContextTestSupport {
+
+    public void testOk() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("World");
+        mock.expectedPropertyReceived(Exchange.EVALUATE_EXPRESSION_RESULT, 
"Hello World");
+
+        template.sendBody("direct:start", "World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testFail() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+
+        try {
+            template.sendBody("direct:fail", "World");
+            fail("Should have thrown exception");
+        } catch (CamelExecutionException e) {
+            assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+            assertEquals("Forced", e.getCause().getMessage());
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .process(new 
EvaluateExpressionProcessor(body().prepend("Hello ")))
+                    .to("mock:result");
+
+                from("direct:fail")
+                    .process(new EvaluateExpressionProcessor(new Expression() {
+                        @Override
+                        public <T> T evaluate(Exchange exchange, Class<T> 
type) {
+                            throw new IllegalArgumentException("Forced");
+                        }
+                    }))
+                    .to("mock:result");
+            }
+        };
+    }
+}

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListThrowExceptionFromExpressionTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListThrowExceptionFromExpressionTest.java?rev=1175313&view=auto
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListThrowExceptionFromExpressionTest.java
 (added)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListThrowExceptionFromExpressionTest.java
 Sun Sep 25 07:33:36 2011
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExpressionEvaluationException;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class RecipientListThrowExceptionFromExpressionTest extends 
ContextTestSupport {
+
+    public void testRecipientListAndVerifyException() throws Exception {
+        getMockEndpoint("mock:error").expectedMessageCount(1);
+        getMockEndpoint("mock:result").expectedMessageCount(0);
+
+        template.sendBody("direct:start", "Hello World");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                onException(ExpressionEvaluationException.class)
+                    .handled(true)
+                    .to("mock://error");
+
+                from("direct://start")
+                    .to("log:foo")
+                    
.recipientList().method(RecipientListThrowExceptionFromExpressionTest.class, 
"sendTo")
+                        .to("mock://result")
+                    .end();
+            }
+        };
+    }
+
+    public List<String> sendTo(Exchange exchange) throws 
ExpressionEvaluationException {
+        throw new ExpressionEvaluationException(null, exchange, null);
+    }
+
+}


Reply via email to