Author: ningjiang
Date: Tue Nov  4 00:09:44 2008
New Revision: 711206

URL: http://svn.apache.org/viewvc?rev=711206&view=rev
Log:
CAMEL-1051 Get HandleFaultProcessor to work with AsyncProcessor

Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java?rev=711206&r1=711205&r2=711206&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java
 Tue Nov  4 00:09:44 2008
@@ -16,15 +16,60 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
+import org.apache.camel.util.AsyncProcessorHelper;
 
-public class HandleFaultProcessor extends DelegateProcessor {
+public class HandleFaultProcessor extends DelegateProcessor implements 
AsyncProcessor {
+    
+    @Override
+    public String toString() {
+        return "HandleFaultProcessor(" + processor + ")";
+    }
 
     @Override
     public void process(Exchange exchange) throws Exception {
-        super.process(exchange);
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
+    public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
+        if (processor == null) {
+            // no processor so nothing to process, so return
+            callback.done(true);
+            return true;
+        }      
+
+        if (processor instanceof AsyncProcessor) {
+            return ((AsyncProcessor)processor).process(exchange, new 
AsyncCallback() {
+                
+                public void done(boolean doneSynchronously) {
+                    callback.done(doneSynchronously);
+                    Message faultMessage = exchange.getFault(false);
+                    if (faultMessage != null) {
+                        final Object faultBody = faultMessage.getBody();
+                        if (faultBody != null) {
+                            faultMessage.setBody(null); // Reset it since we are handling it.
+                            if (faultBody instanceof Throwable) {
+                                exchange.setException((Throwable)faultBody);
+                            } else {
+                                exchange.setException(new 
CamelException("Message contains fault of type "
+                                    + faultBody.getClass().getName() + ":\n" + 
faultBody));
+                            }
+                        }
+                    }                    
+                }                
+            });
+        }
+        
+        try {
+            processor.process(exchange);
+        } catch (Throwable e) {
+            exchange.setException(e);
+        }
+        
         final Message faultMessage = exchange.getFault(false);
         if (faultMessage != null) {
             final Object faultBody = faultMessage.getBody();
@@ -38,5 +83,7 @@
                 }
             }
         }
+        callback.done(true);
+        return true;
     }
 }


Reply via email to