Author: ningjiang
Date: Tue Nov 4 00:09:44 2008
New Revision: 711206
URL: http://svn.apache.org/viewvc?rev=711206&view=rev
Log:
CAMEL-1051 Get HandleFaultProcessor to work with AsyncProcessor
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java?rev=711206&r1=711205&r2=711206&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/HandleFaultProcessor.java Tue Nov 4 00:09:44 2008
@@ -16,15 +16,60 @@
*/
package org.apache.camel.processor;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
+import org.apache.camel.util.AsyncProcessorHelper;
-public class HandleFaultProcessor extends DelegateProcessor {
+public class HandleFaultProcessor extends DelegateProcessor implements
AsyncProcessor {
+
+    /**
+     * Describes this processor as {@code HandleFaultProcessor(<delegate>)},
+     * where the delegate is rendered through its own string form.
+     */
+    @Override
+    public String toString() {
+        final String delegate = String.valueOf(processor);
+        return "HandleFaultProcessor(" + delegate + ")";
+    }
@Override
public void process(Exchange exchange) throws Exception {
- super.process(exchange);
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
+ public boolean process(final Exchange exchange, final AsyncCallback
callback) {
+ if (processor == null) {
+ // no processor so nothing to process, so return
+ callback.done(true);
+ return true;
+ }
+
+ if (processor instanceof AsyncProcessor) {
+ return ((AsyncProcessor)processor).process(exchange, new
AsyncCallback() {
+
+                /**
+                 * Invoked by the wrapped asynchronous processor when it has finished.
+                 * Converts a fault body on the exchange into an exception and only
+                 * then forwards the completion signal to the original callback.
+                 *
+                 * @param doneSynchronously passed through unchanged to the original callback
+                 */
+                public void done(boolean doneSynchronously) {
+                    try {
+                        // Handle the fault BEFORE notifying the caller, matching the
+                        // synchronous path of the enclosing method, which converts the
+                        // fault and only afterwards calls callback.done(true).
+                        Message faultMessage = exchange.getFault(false);
+                        if (faultMessage != null) {
+                            final Object faultBody = faultMessage.getBody();
+                            if (faultBody != null) {
+                                // Reset it since we are handling it.
+                                faultMessage.setBody(null);
+                                if (faultBody instanceof Throwable) {
+                                    exchange.setException((Throwable)faultBody);
+                                } else {
+                                    exchange.setException(new CamelException("Message contains fault of type "
+                                        + faultBody.getClass().getName() + ":\n" + faultBody));
+                                }
+                            }
+                        }
+                    } finally {
+                        // Always forward the completion signal, even if the fault
+                        // conversion above throws.
+                        callback.done(doneSynchronously);
+                    }
+                }
+ });
+ }
+
+ try {
+ processor.process(exchange);
+ } catch (Throwable e) {
+ exchange.setException(e);
+ }
+
final Message faultMessage = exchange.getFault(false);
if (faultMessage != null) {
final Object faultBody = faultMessage.getBody();
@@ -38,5 +83,7 @@
}
}
}
+ callback.done(true);
+ return true;
}
}