Author: davsclaus
Date: Sun Sep 25 07:33:36 2011
New Revision: 1175313
URL: http://svn.apache.org/viewvc?rev=1175313&view=rev
Log:
CAMEL-4484: Fixed recipient list EIP so that an exception thrown by a custom
expression is now handled by onException.
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EvaluateExpressionProcessorTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListThrowExceptionFromExpressionTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=1175313&r1=1175312&r2=1175313&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Sun Sep
25 07:33:36 2011
@@ -104,8 +104,9 @@ public interface Exchange {
String DISABLE_HTTP_STREAM_CACHE = "CamelDisableHttpStreamCache";
String DUPLICATE_MESSAGE = "CamelDuplicateMessage";
- String ERRORHANDLER_HANDLED = "CamelErrorHandlerHandled";
- String EXCEPTION_CAUGHT = "CamelExceptionCaught";
+ String EXCEPTION_CAUGHT = "CamelExceptionCaught";
+ String EVALUATE_EXPRESSION_RESULT = "CamelEvaluateExpressionResult";
+ String ERRORHANDLER_HANDLED = "CamelErrorHandlerHandled";
String FAILURE_HANDLED = "CamelFailureHandled";
String FAILURE_ENDPOINT = "CamelFailureEndpoint";
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1175313&r1=1175312&r2=1175313&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
Sun Sep 25 07:33:36 2011
@@ -261,8 +261,8 @@ public abstract class ProcessorDefinitio
// do not use error handler for recipient list as it offers fine
grained error handlers for its outputs
// however if share unit of work is enabled, we need to wrap an
error handler on the recipient list parent
RecipientListDefinition def = (RecipientListDefinition) defn;
- if (def.isShareUnitOfWork() && child == null) {
- // only wrap the parent (not the children of the multicast)
+ if (def.isShareUnitOfWork()) {
+ // note a recipient list cannot have children so no need for a
child == null check
wrapChannelInErrorHandler(channel, routeContext);
} else {
log.trace("{} is part of multicast/recipientList which have
special error handling so no error handler is applied", defn);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java?rev=1175313&r1=1175312&r2=1175313&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/model/RecipientListDefinition.java
Sun Sep 25 07:33:36 2011
@@ -16,6 +16,8 @@
*/
package org.apache.camel.model;
+import java.util.ArrayList;
+import java.util.List;
import java.util.concurrent.ExecutorService;
import javax.xml.bind.annotation.XmlAccessType;
@@ -27,6 +29,8 @@ import javax.xml.bind.annotation.XmlTran
import org.apache.camel.Expression;
import org.apache.camel.Processor;
import org.apache.camel.model.language.ExpressionDefinition;
+import org.apache.camel.processor.EvaluateExpressionProcessor;
+import org.apache.camel.processor.Pipeline;
import org.apache.camel.processor.RecipientList;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
@@ -97,7 +101,7 @@ public class RecipientListDefinition<Typ
@Override
public Processor createProcessor(RouteContext routeContext) throws
Exception {
- Expression expression = getExpression().createExpression(routeContext);
+ final Expression expression =
getExpression().createExpression(routeContext);
RecipientList answer;
if (delimiter != null) {
@@ -135,7 +139,29 @@ public class RecipientListDefinition<Typ
throw new IllegalArgumentException("Timeout is used but
ParallelProcessing has not been enabled.");
}
- return answer;
+ // create a pipeline with two processors
+ // the first is the eval processor which evaluates the expression to
use
+ // the second is the recipient list
+ List<Processor> pipe = new ArrayList<Processor>(2);
+
+ // the eval processor must be wrapped in error handler, so in case
there was an
+ // error during evaluation, the error handler can deal with it
+ // the recipient list is not wrapped in an error handler, as it has its own
special error handling
+ // when sending to the recipients individually
+ Processor evalProcessor = new EvaluateExpressionProcessor(expression);
+ evalProcessor = super.wrapInErrorHandler(routeContext, evalProcessor);
+
+ pipe.add(evalProcessor);
+ pipe.add(answer);
+
+ // wrap in nested pipeline so this appears as one processor
+ // (threads definition does this as well)
+ return new Pipeline(routeContext.getCamelContext(), pipe) {
+ @Override
+ public String toString() {
+ return "RecipientList[" + expression + "]";
+ }
+ };
}
private AggregationStrategy createAggregationStrategy(RouteContext
routeContext) {
Added:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java?rev=1175313&view=auto
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java
(added)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/EvaluateExpressionProcessor.java
Sun Sep 25 07:33:36 2011
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.Traceable;
+import org.apache.camel.util.AsyncProcessorHelper;
+
+/**
+ * A {@link org.apache.camel.Processor} which evaluates an {@link Expression}
+ * and stores the result as a property on the {@link Exchange} with the key
+ * {@link Exchange#EVALUATE_EXPRESSION_RESULT}.
+ * <p/>
+ * If evaluating the expression throws an exception, this processor sets that
exception
+ * on the {@link Exchange}.
+ */
+public class EvaluateExpressionProcessor implements AsyncProcessor, Traceable {
+
+ private final Expression expression;
+
+ public EvaluateExpressionProcessor(Expression expression) {
+ this.expression = expression;
+ }
+
+ @Override
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+ @Override
+ public boolean process(Exchange exchange, AsyncCallback callback) {
+ try {
+ Object result = expression.evaluate(exchange, Object.class);
+ exchange.setProperty(Exchange.EVALUATE_EXPRESSION_RESULT, result);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ } finally {
+ callback.done(true);
+ }
+ return true;
+ }
+
+ @Override
+ public String toString() {
+ return "EvalExpression[" + expression + "]";
+ }
+
+ public String getTraceLabel() {
+ return "eval[" + expression + "]";
+ }
+
+}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java?rev=1175313&r1=1175312&r2=1175313&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RecipientList.java
Sun Sep 25 07:33:36 2011
@@ -101,7 +101,13 @@ public class RecipientList extends Servi
throw new IllegalStateException("RecipientList has not been
started: " + this);
}
- Object recipientList = expression.evaluate(exchange, Object.class);
+ // use the evaluate expression result if exists
+ Object recipientList =
exchange.removeProperty(Exchange.EVALUATE_EXPRESSION_RESULT);
+ if (recipientList == null && expression != null) {
+ // fallback and evaluate the expression
+ recipientList = expression.evaluate(exchange, Object.class);
+ }
+
return sendToRecipientList(exchange, recipientList, callback);
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?rev=1175313&r1=1175312&r2=1175313&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
Sun Sep 25 07:33:36 2011
@@ -17,6 +17,7 @@
package org.apache.camel.builder;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import org.apache.camel.CamelContext;
@@ -32,6 +33,7 @@ import org.apache.camel.impl.DefaultCame
import org.apache.camel.impl.EventDrivenConsumerRoute;
import org.apache.camel.processor.ChoiceProcessor;
import org.apache.camel.processor.DeadLetterChannel;
+import org.apache.camel.processor.EvaluateExpressionProcessor;
import org.apache.camel.processor.FilterProcessor;
import org.apache.camel.processor.MulticastProcessor;
import org.apache.camel.processor.Pipeline;
@@ -367,7 +369,17 @@ public class RouteBuilderTest extends Te
EventDrivenConsumerRoute consumer =
assertIsInstanceOf(EventDrivenConsumerRoute.class, route);
Channel channel = unwrapChannel(consumer.getProcessor());
- assertIsInstanceOf(RecipientList.class,
channel.getNextProcessor());
+ Pipeline line = assertIsInstanceOf(Pipeline.class,
channel.getNextProcessor());
+ Iterator it = line.getProcessors().iterator();
+
+ // EvaluateExpressionProcessor should be wrapped in error handler
+ Object first = it.next();
+ first = assertIsInstanceOf(DeadLetterChannel.class,
first).getOutput();
+ assertIsInstanceOf(EvaluateExpressionProcessor.class, first);
+
+ // and the second should NOT be wrapped in error handler
+ Object second = it.next();
+ assertIsInstanceOf(RecipientList.class, second);
}
}
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EvaluateExpressionProcessorTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EvaluateExpressionProcessorTest.java?rev=1175313&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EvaluateExpressionProcessorTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/EvaluateExpressionProcessorTest.java
Sun Sep 25 07:33:36 2011
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for {@link EvaluateExpressionProcessor}.
+ */
+public class EvaluateExpressionProcessorTest extends ContextTestSupport {
+
+ public void testOk() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("World");
+ mock.expectedPropertyReceived(Exchange.EVALUATE_EXPRESSION_RESULT,
"Hello World");
+
+ template.sendBody("direct:start", "World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testFail() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(0);
+
+ try {
+ template.sendBody("direct:fail", "World");
+ fail("Should have thrown exception");
+ } catch (CamelExecutionException e) {
+ assertIsInstanceOf(IllegalArgumentException.class, e.getCause());
+ assertEquals("Forced", e.getCause().getMessage());
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .process(new
EvaluateExpressionProcessor(body().prepend("Hello ")))
+ .to("mock:result");
+
+ from("direct:fail")
+ .process(new EvaluateExpressionProcessor(new Expression() {
+ @Override
+ public <T> T evaluate(Exchange exchange, Class<T>
type) {
+ throw new IllegalArgumentException("Forced");
+ }
+ }))
+ .to("mock:result");
+ }
+ };
+ }
+}
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListThrowExceptionFromExpressionTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListThrowExceptionFromExpressionTest.java?rev=1175313&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListThrowExceptionFromExpressionTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RecipientListThrowExceptionFromExpressionTest.java
Sun Sep 25 07:33:36 2011
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.List;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExpressionEvaluationException;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * Verifies that an exception thrown by the recipient list expression is handled by onException.
+ */
+public class RecipientListThrowExceptionFromExpressionTest extends
ContextTestSupport {
+
+ public void testRecipientListAndVerifyException() throws Exception {
+ getMockEndpoint("mock:error").expectedMessageCount(1);
+ getMockEndpoint("mock:result").expectedMessageCount(0);
+
+ template.sendBody("direct:start", "Hello World");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ onException(ExpressionEvaluationException.class)
+ .handled(true)
+ .to("mock://error");
+
+ from("direct://start")
+ .to("log:foo")
+
.recipientList().method(RecipientListThrowExceptionFromExpressionTest.class,
"sendTo")
+ .to("mock://result")
+ .end();
+ }
+ };
+ }
+
+ public List<String> sendTo(Exchange exchange) throws
ExpressionEvaluationException {
+ throw new ExpressionEvaluationException(null, exchange, null);
+ }
+
+}