Author: hadrian
Date: Tue Oct 21 21:34:22 2008
New Revision: 706857
URL: http://svn.apache.org/viewvc?rev=706857&view=rev
Log:
CAMEL-901. Patch applied with thanks to Claus!
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=706857&r1=706856&r2=706857&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Tue Oct 21 21:34:22 2008
@@ -176,7 +176,9 @@
return;
}
data.sync = false;
- if (exchange.getException() != null) {
+ // only process if the exchange has failed (an exception is set)
+ // and the failure has not already been handled by the error processor
+ if (exchange.getException() != null &&
!isFailureHandled(exchange)) {
process(exchange, callback, data);
} else {
callback.done(sync);
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=706857&r1=706856&r2=706857&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
Tue Oct 21 21:34:22 2008
@@ -62,16 +62,15 @@
Exchange nextExchange = original;
boolean first = true;
while (true) {
- boolean handledException = Boolean.TRUE.equals(
-
nextExchange.getProperty(Exchange.EXCEPTION_HANDLED_PROPERTY));
- if (nextExchange.isFailed() || handledException) {
+ boolean exceptionHandled = hasExceptionBeenHandled(nextExchange);
+ if (nextExchange.isFailed() || exceptionHandled) {
// The Exchange.EXCEPTION_HANDLED_PROPERTY property is only
set if satisfactory handling was done
// by the error handler. It's still an exception, the
exchange still failed.
if (LOG.isDebugEnabled()) {
LOG.debug("Message exchange has failed so breaking out of
pipeline: " + nextExchange
+ " exception: " + nextExchange.getException() +
" fault: "
+ nextExchange.getFault(false)
- + (handledException ? " handled by the error
handler" : ""));
+ + (exceptionHandled ? " handled by the error
handler" : ""));
}
break;
}
@@ -97,17 +96,25 @@
// If we get here then the pipeline was processed entirely
// synchronously.
+ if (LOG.isTraceEnabled()) {
+ // logging nextExchange as it contains the exchange that might
have altered the payload and since
+ // we are logging the completion it will be confusing if we log
the original instead
+ // we could also consider logging the original and the
nextExchange then we have *before* and *after* snapshots
+ LOG.trace("Processing compelete for exchangeId: " +
original.getExchangeId() + " >>> " + nextExchange);
+ }
ExchangeHelper.copyResults(original, nextExchange);
callback.done(true);
return true;
}
private boolean process(final Exchange original, final Exchange exchange,
final AsyncCallback callback, final Iterator<Processor> processors,
AsyncProcessor processor) {
+ if (LOG.isTraceEnabled()) {
+ // this does the actual processing so log at trace level
+ LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + "
>>> " + exchange);
+ }
return processor.process(exchange, new AsyncCallback() {
public void done(boolean sync) {
-
- // We only have to handle async completion of
- // the pipeline..
+ // We only have to handle async completion of the pipeline..
if (sync) {
return;
}
@@ -117,10 +124,15 @@
while (processors.hasNext()) {
AsyncProcessor processor =
AsyncProcessorTypeConverter.convert(processors.next());
- if (nextExchange.isFailed()) {
+ boolean exceptionHandled =
hasExceptionBeenHandled(nextExchange);
+ if (nextExchange.isFailed() || exceptionHandled) {
+ // The Exchange.EXCEPTION_HANDLED_PROPERTY property is
only set if satisfactory handling was done
+ // by the error handler. It's still an exception,
the exchange still failed.
if (LOG.isDebugEnabled()) {
- LOG.debug("Message exchange has failed so breaking
out of pipeline: " + nextExchange + " exception: " +
nextExchange.getException() + " fault: "
- + nextExchange.getFault(false));
+ LOG.debug("Message exchange has failed so breaking
out of pipeline: " + nextExchange
+ + " exception: " +
nextExchange.getException() + " fault: "
+ + nextExchange.getFault(false)
+ + (exceptionHandled ? " handled by the
error handler" : ""));
}
break;
}
@@ -138,8 +150,15 @@
});
}
+
+ private static boolean hasExceptionBeenHandled(Exchange nextExchange) {
+ return
Boolean.TRUE.equals(nextExchange.getProperty(Exchange.EXCEPTION_HANDLED_PROPERTY));
+ }
+
/**
* Strategy method to create the next exchange from the previous exchange.
+ * <p/>
+ * Remember to copy the original exchange id otherwise correlation of ids
in the log is a problem
*
* @param producer the producer used to send to the endpoint
* @param previousExchange the previous exchange
@@ -147,6 +166,12 @@
*/
protected Exchange createNextExchange(Processor producer, Exchange
previousExchange) {
Exchange answer = previousExchange.newInstance();
+ // we must use the same id as this is a snapshot strategy where Camel
copies a snapshot
+ // before processing the next step in the pipeline, so we have a
snapshot of the exchange
+ // just before. This snapshot is used if Camel should do redeliveries
(retry) using
+ // DeadLetterChannel. That is why it's important the id is the same,
as it is the *same*
+ // exchange being routed.
+ answer.setExchangeId(previousExchange.getExchangeId());
answer.getProperties().putAll(previousExchange.getProperties());
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java?rev=706857&r1=706856&r2=706857&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java
Tue Oct 21 21:34:22 2008
@@ -19,6 +19,7 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.commons.logging.Log;
@@ -45,14 +46,14 @@
public void testThreadErrorHandlerLogging() throws Exception {
MockEndpoint handled = getMockEndpoint("mock:handled");
-
- template.sendBody("seda:errorTest", msg1);
-
- handled.expectedMessageCount(1);
handled.expectedBodiesReceived(msg3);
- // TODO: Enable this when looking into this issue
- //Thread.sleep(3000);
+ try {
+ template.sendBody("direct:errorTest", msg1);
+ fail("Should have thrown a MyBelaException");
+ } catch (RuntimeCamelException e) {
+ assertTrue(e.getCause() instanceof MyBelaException);
+ }
assertMockEndpointsSatisfied();
@@ -64,18 +65,10 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- //getContext().addInterceptStrategy(new Tracer());
errorHandler(deadLetterChannel("mock:handled").maximumRedeliveries(redelivery));
- // using the onException and handled(true) works
-
//onException(Exception.class).maximumRedeliveries(redelivery).handled(true).to("mock:handled");
-
- from("seda:errorTest")
- // TODO: When using thread there is a multi threading /
concurreny issue in Camel
- // hard to debug as it tend only to surface when unit test
is running really fast
- // (no break points)
-
- //.thread(5).maxSize(5)
+ from("direct:errorTest")
+ .thread(5)
// Processor #1
.process(new Processor() {
public void process(Exchange exchange) throws
Exception {
@@ -95,14 +88,19 @@
// Processor #3
.process(new Processor() {
public void process(Exchange exchange) throws
Exception {
- //Thread.sleep(100);
callCounter3++;
LOG.debug("Processor #3 Received A " +
exchange.getIn().getBody());
- throw new Exception("Forced exception by unit
test");
+ throw new MyBelaException("Forced exception by
unit test");
}
});
}
};
}
+ public static class MyBelaException extends Exception {
+ public MyBelaException(String message) {
+ super(message);
+ }
+ }
+
}