Author: hadrian
Date: Tue Oct 21 21:34:22 2008
New Revision: 706857

URL: http://svn.apache.org/viewvc?rev=706857&view=rev
Log:
CAMEL-901.  Patch applied with thanks to Claus!

Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=706857&r1=706856&r2=706857&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
 Tue Oct 21 21:34:22 2008
@@ -176,7 +176,9 @@
                         return;
                     }
                     data.sync = false;
-                    if (exchange.getException() != null) {
+                    // only process if the exchange hasn't failed
+                    // and it has not been handled by the error processor
+                    if (exchange.getException() != null && 
!isFailureHandled(exchange)) {
                         process(exchange, callback, data);
                     } else {
                         callback.done(sync);

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java?rev=706857&r1=706856&r2=706857&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Pipeline.java
 Tue Oct 21 21:34:22 2008
@@ -62,16 +62,15 @@
         Exchange nextExchange = original;
         boolean first = true;
         while (true) {
-            boolean handledException = Boolean.TRUE.equals(
-                    
nextExchange.getProperty(Exchange.EXCEPTION_HANDLED_PROPERTY));
-            if (nextExchange.isFailed() || handledException) {
+            boolean exceptionHandled = hasExceptionBeenHandled(nextExchange);
+            if (nextExchange.isFailed() || exceptionHandled) {
                 // The Exchange.EXCEPTION_HANDLED_PROPERTY property is only 
set if satisfactory handling was done 
                 //  by the error handler.  It's still an exception, the 
exchange still failed.
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Message exchange has failed so breaking out of 
pipeline: " + nextExchange
                               + " exception: " + nextExchange.getException() + 
" fault: "
                               + nextExchange.getFault(false)
-                              + (handledException ? " handled by the error 
handler" : ""));
+                              + (exceptionHandled ? " handled by the error 
handler" : ""));
                 }
                 break;
             }
@@ -97,17 +96,25 @@
 
         // If we get here then the pipeline was processed entirely
         // synchronously.
+        if (LOG.isTraceEnabled()) {
+            // logging nextExchange as it contains the exchange that might 
have altered the payload and since
+            // we are logging the completion if will be confusing if we log 
the original instead
+            // we could also consider logging the original and the 
nextExchange then we have *before* and *after* snapshots
+            LOG.trace("Processing compelete for exchangeId: " + 
original.getExchangeId() + " >>> " + nextExchange);
+        }
         ExchangeHelper.copyResults(original, nextExchange);
         callback.done(true);
         return true;
     }
 
     private boolean process(final Exchange original, final Exchange exchange, 
final AsyncCallback callback, final Iterator<Processor> processors, 
AsyncProcessor processor) {
+        if (LOG.isTraceEnabled()) {
+            // this does the actual processing so log at trace level
+            LOG.trace("Processing exchangeId: " + exchange.getExchangeId() + " 
>>> " + exchange);
+        }
         return processor.process(exchange, new AsyncCallback() {
             public void done(boolean sync) {
-
-                // We only have to handle async completion of
-                // the pipeline..
+                // We only have to handle async completion of the pipeline..
                 if (sync) {
                     return;
                 }
@@ -117,10 +124,15 @@
                 while (processors.hasNext()) {
                     AsyncProcessor processor = 
AsyncProcessorTypeConverter.convert(processors.next());
 
-                    if (nextExchange.isFailed()) {
+                    boolean exceptionHandled = 
hasExceptionBeenHandled(nextExchange);
+                    if (nextExchange.isFailed() || exceptionHandled) {
+                        // The Exchange.EXCEPTION_HANDLED_PROPERTY property is 
only set if satisfactory handling was done
+                        //  by the error handler.  It's still an exception, 
the exchange still failed.
                         if (LOG.isDebugEnabled()) {
-                            LOG.debug("Message exchange has failed so breaking 
out of pipeline: " + nextExchange + " exception: " + 
nextExchange.getException() + " fault: "
-                                    + nextExchange.getFault(false));
+                            LOG.debug("Message exchange has failed so breaking 
out of pipeline: " + nextExchange
+                                      + " exception: " + 
nextExchange.getException() + " fault: "
+                                      + nextExchange.getFault(false)
+                                      + (exceptionHandled ? " handled by the 
error handler" : ""));
                         }
                         break;
                     }
@@ -138,8 +150,15 @@
         });
     }
 
+
+    private static boolean hasExceptionBeenHandled(Exchange nextExchange) {
+        return 
Boolean.TRUE.equals(nextExchange.getProperty(Exchange.EXCEPTION_HANDLED_PROPERTY));
+    }
+
     /**
      * Strategy method to create the next exchange from the previous exchange.
+     * <p/>
+     * Remember to copy the original exchange id otherwise correlation of ids 
in the log is a problem
      *
      * @param producer         the producer used to send to the endpoint
      * @param previousExchange the previous exchange
@@ -147,6 +166,12 @@
      */
     protected Exchange createNextExchange(Processor producer, Exchange 
previousExchange) {
         Exchange answer = previousExchange.newInstance();
+        // we must use the same id as this is a snapshot strategy where Camel 
copies a snapshot
+        // before processing the next step in the pipeline, so we have a 
snapshot of the exchange
+        // just before. This snapshot is used if Camel should do redeliveries 
(re try) using
+        // DeadLetterChannel. That is why it's important the id is the same, 
as it is the *same*
+        // exchange being routed.
+        answer.setExchangeId(previousExchange.getExchangeId());
 
         answer.getProperties().putAll(previousExchange.getProperties());
 

Modified: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java?rev=706857&r1=706856&r2=706857&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/BelasThreadErrorHandlerIssue901Test.java
 Tue Oct 21 21:34:22 2008
@@ -19,6 +19,7 @@
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
+import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.commons.logging.Log;
@@ -45,14 +46,14 @@
 
     public void testThreadErrorHandlerLogging() throws Exception {
         MockEndpoint handled = getMockEndpoint("mock:handled");
-
-        template.sendBody("seda:errorTest", msg1);
-
-        handled.expectedMessageCount(1);
         handled.expectedBodiesReceived(msg3);
 
-        // TODO: Enable this when looking into this issue
-        //Thread.sleep(3000);
+        try {
+            template.sendBody("direct:errorTest", msg1);
+            fail("Should have thrown a MyBelaException");
+        } catch (RuntimeCamelException e) {
+            assertTrue(e.getCause() instanceof MyBelaException);
+        }
 
         assertMockEndpointsSatisfied();
 
@@ -64,18 +65,10 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                //getContext().addInterceptStrategy(new Tracer());
                 
errorHandler(deadLetterChannel("mock:handled").maximumRedeliveries(redelivery));
 
-                // using the onException and handled(true) works
-                
//onException(Exception.class).maximumRedeliveries(redelivery).handled(true).to("mock:handled");
-                
-                from("seda:errorTest")
-                    // TODO: When using thread there is a multi threading / 
concurreny issue in Camel
-                    // hard to debug as it tend only to surface when unit test 
is running really fast
-                    // (no break points)
-
-                    //.thread(5).maxSize(5)
+                from("direct:errorTest")
+                    .thread(5)
                     // Processor #1
                     .process(new Processor() {
                         public void process(Exchange exchange) throws 
Exception {
@@ -95,14 +88,19 @@
                     // Processor #3
                     .process(new Processor() {
                         public void process(Exchange exchange) throws 
Exception {
-                            //Thread.sleep(100);
                             callCounter3++;
                             LOG.debug("Processor #3 Received A " + 
exchange.getIn().getBody());
-                            throw new Exception("Forced exception by unit 
test");
+                            throw new MyBelaException("Forced exception by 
unit test");
                         }
                     });
             }
         };
     }
 
+    public static class MyBelaException extends Exception {
+        public MyBelaException(String message) {
+            super(message);
+        }
+    }
+
 }


Reply via email to