[CAMEL-6364] Improve processor wrapping for Try/Catch/Finally
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/66939043 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/66939043 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/66939043 Branch: refs/heads/master Commit: 66939043e3acc33f522440166c819cd62cf98c9b Parents: d824bce Author: Guillaume Nodet <[email protected]> Authored: Fri May 17 11:33:17 2013 +0200 Committer: Guillaume Nodet <[email protected]> Committed: Fri May 17 13:33:02 2013 +0200 ---------------------------------------------------------------------- .../src/main/java/org/apache/camel/Exchange.java | 1 + .../org/apache/camel/model/FinallyDefinition.java | 3 +- .../java/org/apache/camel/model/TryDefinition.java | 16 +- .../org/apache/camel/processor/AOPProcessor.java | 2 +- .../org/apache/camel/processor/CatchProcessor.java | 67 ++++- .../apache/camel/processor/FinallyProcessor.java | 83 +++++ .../org/apache/camel/processor/TryProcessor.java | 282 ++------------- 7 files changed, 187 insertions(+), 267 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/Exchange.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java index 3420e73..1c2e1bc 100644 --- a/camel-core/src/main/java/org/apache/camel/Exchange.java +++ b/camel-core/src/main/java/org/apache/camel/Exchange.java @@ -106,6 +106,7 @@ public interface Exchange { String DUPLICATE_MESSAGE = "CamelDuplicateMessage"; String EXCEPTION_CAUGHT = "CamelExceptionCaught"; + String EXCEPTION_HANDLED = "CamelExceptionHandled"; String EVALUATE_EXPRESSION_RESULT = "CamelEvaluateExpressionResult"; String ERRORHANDLER_HANDLED = "CamelErrorHandlerHandled"; String EXTERNAL_REDELIVERED = "CamelExternalRedelivered"; http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/model/FinallyDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/FinallyDefinition.java b/camel-core/src/main/java/org/apache/camel/model/FinallyDefinition.java index defa7e3..836ef62 100644 --- a/camel-core/src/main/java/org/apache/camel/model/FinallyDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/FinallyDefinition.java @@ -21,6 +21,7 @@ import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; import org.apache.camel.Processor; +import org.apache.camel.processor.FinallyProcessor; import org.apache.camel.spi.RouteContext; /** @@ -55,6 +56,6 @@ public class FinallyDefinition extends OutputDefinition<FinallyDefinition> { } // do finally does mandate a child processor - return this.createChildProcessor(routeContext, true); + return new FinallyProcessor(this.createChildProcessor(routeContext, false)); } } http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java b/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java index d64a7e6..6f824fe 100644 --- a/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java +++ b/camel-core/src/main/java/org/apache/camel/model/TryDefinition.java @@ -76,18 +76,20 @@ public class TryDefinition extends OutputDefinition<TryDefinition> { throw new IllegalArgumentException("Definition has no children on " + this); } - Processor finallyProcessor = null; - if (finallyClause != null) { - finallyProcessor = createProcessor(routeContext, finallyClause); - } - - List<CatchProcessor> catchProcessors = new ArrayList<CatchProcessor>(); + List<Processor> catchProcessors = new ArrayList<Processor>(); if (catchClauses != null) { for (CatchDefinition catchClause : catchClauses) { - catchProcessors.add(catchClause.createProcessor(routeContext)); + catchProcessors.add(createProcessor(routeContext, catchClause)); } } + FinallyDefinition finallyDefinition = finallyClause; + if (finallyDefinition == null) { + finallyDefinition = new FinallyDefinition(); + finallyDefinition.setParent(this); + } + Processor finallyProcessor = createProcessor(routeContext, finallyDefinition); + // must have either a catch or finally if (finallyClause == null && catchClauses == null) { throw new IllegalArgumentException("doTry must have one or more catch or finally blocks on " + this); http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/processor/AOPProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/AOPProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/AOPProcessor.java index ceaf333..2ac5ca3 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/AOPProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/AOPProcessor.java @@ -25,7 +25,7 @@ import org.apache.camel.Processor; */ public class AOPProcessor extends TryProcessor { - public AOPProcessor(Processor tryProcessor, List<CatchProcessor> catchClauses, Processor finallyProcessor) { + public AOPProcessor(Processor tryProcessor, List<Processor> catchClauses, Processor finallyProcessor) { super(tryProcessor, catchClauses, finallyProcessor); } http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java index b897ace..20fcb29 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CatchProcessor.java @@ -19,11 +19,15 @@ package org.apache.camel.processor; import java.util.Iterator; import java.util.List; +import org.apache.camel.AsyncCallback; import org.apache.camel.Exchange; import org.apache.camel.Predicate; import org.apache.camel.Processor; import org.apache.camel.Traceable; +import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A processor which catches exceptions. @@ -31,6 +35,8 @@ import org.apache.camel.util.ObjectHelper; * @version */ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable { + private static final transient Logger LOG = LoggerFactory.getLogger(CatchProcessor.class); + private final List<Class<? extends Throwable>> exceptions; private final Predicate onWhen; private final Predicate handled; @@ -51,9 +57,63 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable return "catch"; } + @Override + public boolean process(final Exchange exchange, final AsyncCallback callback) { + Exception e = exchange.getException(); + Throwable caught = catches(exchange, e); + // If a previous catch clause handled the exception or if this clause does not match, exit + if (exchange.getProperty(Exchange.EXCEPTION_HANDLED) != null || caught == null) { + callback.done(true); + return true; + } + if (LOG.isTraceEnabled()) { + LOG.trace("This CatchProcessor catches the exception: {} caused by: {}", caught.getClass().getName(), e.getMessage()); + } + + // store the last to endpoint as the failure endpoint + if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { + exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); + } + // give the rest of the pipeline another chance + exchange.setProperty(Exchange.EXCEPTION_HANDLED, true); + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); + exchange.setException(null); + // and we should not be regarded as exhausted as we are in a try .. catch block + exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + + // is the exception handled by the catch clause + final boolean handled = handles(exchange); + + if (LOG.isDebugEnabled()) { + LOG.debug("The exception is handled: {} for the exception: {} caused by: {}", + new Object[]{handled, e.getClass().getName(), e.getMessage()}); + } + + boolean sync = super.processNext(exchange, new AsyncCallback() { + public void done(boolean doneSync) { + if (!handled) { + if (exchange.getException() == null) { + exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); + } + } + // always clear redelivery exhausted in a catch clause + exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); + + if (!doneSync) { + // signal callback to continue routing async + ExchangeHelper.prepareOutToIn(exchange); + } + + callback.done(doneSync); + } + }); + + return sync; + } + /** * Returns with the exception that is caught by this processor. - * + * * This method traverses exception causes, so sometimes the exception * returned from this method might be one of causes of the parameter * passed. @@ -62,7 +122,7 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable * @param exception the thrown exception * @return Throwable that this processor catches. <tt>null</tt> if nothing matches. */ - public Throwable catches(Exchange exchange, Throwable exception) { + protected Throwable catches(Exchange exchange, Throwable exception) { // use the exception iterator to walk the caused by hierarchy Iterator<Throwable> it = ObjectHelper.createExceptionIterator(exception); while (it.hasNext()) { @@ -79,14 +139,13 @@ public class CatchProcessor extends DelegateAsyncProcessor implements Traceable return null; } - /** * Whether this catch processor handles the exception it have caught * * @param exchange the current exchange * @return <tt>true</tt> if this processor handles it, <tt>false</tt> otherwise. */ - public boolean handles(Exchange exchange) { + protected boolean handles(Exchange exchange) { if (handled == null) { // handle by default return true; http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java new file mode 100644 index 0000000..c88c7c9 --- /dev/null +++ b/camel-core/src/main/java/org/apache/camel/processor/FinallyProcessor.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor; + +import org.apache.camel.AsyncCallback; +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.Traceable; +import org.apache.camel.util.ExchangeHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Processor to handle do finally supporting asynchronous routing engine + * + * @version + */ +public class FinallyProcessor extends DelegateAsyncProcessor implements Traceable { + + private static final transient Logger LOG = LoggerFactory.getLogger(FinallyProcessor.class); + + public FinallyProcessor(Processor processor) { + super(processor); + } + + @Override + protected boolean processNext(final Exchange exchange, final AsyncCallback callback) { + // clear exception so finally block can be executed + final Exception e = exchange.getException(); + exchange.setException(null); + // but store the caught exception as a property + if (e != null) { + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); + } + // store the last to endpoint as the failure endpoint + if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { + exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); + } + + boolean sync = super.processNext(exchange, new AsyncCallback() { + public void done(boolean doneSync) { + if (e == null) { + exchange.removeProperty(Exchange.FAILURE_ENDPOINT); + } else { + // set exception back on exchange + exchange.setException(e); + exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); + } + + if (!doneSync) { + // signal callback to continue routing async + ExchangeHelper.prepareOutToIn(exchange); + LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); + } + callback.done(doneSync); + } + }); + return sync; + } + + @Override + public String toString() { + return "Finally{" + getProcessor() + "}"; + } + + public String getTraceLabel() { + return "finally"; + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/66939043/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java index 6cac402..bc8f98f 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/TryProcessor.java @@ -17,7 +17,6 @@ package org.apache.camel.processor; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -43,20 +42,20 @@ import org.slf4j.LoggerFactory; public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable { private static final transient Logger LOG = LoggerFactory.getLogger(TryProcessor.class); - protected final AsyncProcessor tryProcessor; - protected final DoCatchProcessor catchProcessor; - protected final DoFinallyProcessor finallyProcessor; - private List<AsyncProcessor> processors; + protected final Processor tryProcessor; + protected final List<Processor> catchClauses; + protected final Processor finallyProcessor; - public TryProcessor(Processor tryProcessor, List<CatchProcessor> catchClauses, Processor finallyProcessor) { - this.tryProcessor = AsyncProcessorConverterHelper.convert(tryProcessor); - this.catchProcessor = new DoCatchProcessor(catchClauses); - this.finallyProcessor = new DoFinallyProcessor(finallyProcessor); + public TryProcessor(Processor tryProcessor, List<Processor> catchClauses, Processor finallyProcessor) { + this.tryProcessor = tryProcessor; + this.catchClauses = catchClauses; + this.finallyProcessor = finallyProcessor; } public String toString() { + String catchText = catchClauses == null || catchClauses.isEmpty() ? "": " Catches {" + catchClauses + "}"; String finallyText = (finallyProcessor == null) ? "" : " Finally {" + finallyProcessor + "}"; - return "Try {" + tryProcessor + "} " + (catchProcessor != null ? catchProcessor : "") + finallyText; + return "Try {" + tryProcessor + "}" + catchText + finallyText; } public String getTraceLabel() { @@ -68,14 +67,18 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi } public boolean process(Exchange exchange, AsyncCallback callback) { - Iterator<AsyncProcessor> processors = getProcessors().iterator(); + Iterator<Processor> processors = next().iterator(); + + Object lastHandled = exchange.getProperty(Exchange.EXCEPTION_HANDLED); + exchange.setProperty(Exchange.EXCEPTION_HANDLED, null); while (continueRouting(processors, exchange)) { ExchangeHelper.prepareOutToIn(exchange); // process the next processor - AsyncProcessor processor = processors.next(); - boolean sync = process(exchange, callback, processor, processors); + Processor processor = processors.next(); + AsyncProcessor async = AsyncProcessorConverterHelper.convert(processor); + boolean sync = process(exchange, callback, processors, async, lastHandled); // continue as long its being processed synchronously if (!sync) { @@ -89,13 +92,15 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi } ExchangeHelper.prepareOutToIn(exchange); + exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled); LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); callback.done(true); return true; } protected boolean process(final Exchange exchange, final AsyncCallback callback, - final AsyncProcessor processor, final Iterator<AsyncProcessor> processors) { + final Iterator<Processor> processors, final AsyncProcessor processor, + final Object lastHandled) { // this does the actual processing so log at trace level LOG.trace("Processing exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); @@ -113,8 +118,8 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi ExchangeHelper.prepareOutToIn(exchange); // process the next processor - AsyncProcessor processor = processors.next(); - doneSync = process(exchange, callback, processor, processors); + AsyncProcessor processor = AsyncProcessorConverterHelper.convert(processors.next()); + doneSync = process(exchange, callback, processors, processor, lastHandled); if (!doneSync) { LOG.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId()); @@ -125,6 +130,7 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi } ExchangeHelper.prepareOutToIn(exchange); + exchange.setProperty(Exchange.EXCEPTION_HANDLED, lastHandled); LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); callback.done(false); } @@ -133,11 +139,7 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi return sync; } - protected Collection<AsyncProcessor> getProcessors() { - return processors; - } - - protected boolean continueRouting(Iterator<AsyncProcessor> it, Exchange exchange) { + protected boolean continueRouting(Iterator<Processor> it, Exchange exchange) { Object stop = exchange.getProperty(Exchange.ROUTE_STOP); if (stop != null) { boolean doStop = exchange.getContext().getTypeConverter().convertTo(Boolean.class, stop); @@ -152,16 +154,11 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi } protected void doStart() throws Exception { - processors = new ArrayList<AsyncProcessor>(); - processors.add(tryProcessor); - processors.add(catchProcessor); - processors.add(finallyProcessor); - ServiceHelper.startServices(tryProcessor, catchProcessor, finallyProcessor); + ServiceHelper.startServices(tryProcessor, catchClauses, finallyProcessor); } protected void doStop() throws Exception { - ServiceHelper.stopServices(finallyProcessor, catchProcessor, tryProcessor); - processors.clear(); + ServiceHelper.stopServices(tryProcessor, catchClauses, finallyProcessor); } public List<Processor> next() { @@ -172,8 +169,8 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi if (tryProcessor != null) { answer.add(tryProcessor); } - if (catchProcessor != null) { - answer.add(catchProcessor); + if (catchClauses != null) { + answer.addAll(catchClauses); } if (finallyProcessor != null) { answer.add(finallyProcessor); @@ -182,230 +179,7 @@ public class TryProcessor extends ServiceSupport implements AsyncProcessor, Navi } public boolean hasNext() { - return tryProcessor != null; - } - - /** - * Processor to handle do catch supporting asynchronous routing engine - */ - private final class DoCatchProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable { - - private final List<CatchProcessor> catchClauses; - - private DoCatchProcessor(List<CatchProcessor> catchClauses) { - this.catchClauses = catchClauses; - } - - public void process(Exchange exchange) throws Exception { - AsyncProcessorHelper.process(this, exchange); - } - - public boolean process(final Exchange exchange, final AsyncCallback callback) { - Exception e = exchange.getException(); - - if (catchClauses == null || e == null) { - return true; - } - - // find a catch clause to use - CatchProcessor processor = null; - for (CatchProcessor catchClause : catchClauses) { - Throwable caught = catchClause.catches(exchange, e); - if (caught != null) { - if (LOG.isTraceEnabled()) { - LOG.trace("This TryProcessor catches the exception: {} caused by: {}", caught.getClass().getName(), e.getMessage()); - } - processor = catchClause; - break; - } - } - - if (processor != null) { - // create the handle processor which performs the actual logic - // this processor just lookup the right catch clause to use and then let the - // HandleDoCatchProcessor do all the hard work (separate of concerns) - HandleDoCatchProcessor cool = new HandleDoCatchProcessor(processor); - return AsyncProcessorHelper.process(cool, exchange, callback); - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("This TryProcessor does not catch the exception: {} caused by: {}", e.getClass().getName(), e.getMessage()); - } - } - - return true; - } - - @Override - protected void doStart() throws Exception { - ServiceHelper.startService(catchClauses); - } - - @Override - protected void doStop() throws Exception { - ServiceHelper.stopServices(catchClauses); - } - - @Override - public String toString() { - return "Catches{" + catchClauses + "}"; - } - - public String getTraceLabel() { - return "doCatch"; - } - - public List<Processor> next() { - List<Processor> answer = new ArrayList<Processor>(); - if (catchProcessor != null) { - answer.addAll(catchClauses); - } - return answer; - } - - public boolean hasNext() { - return catchClauses != null && catchClauses.size() > 0; - } - } - - /** - * Processor to handle do finally supporting asynchronous routing engine - */ - private final class DoFinallyProcessor extends DelegateAsyncProcessor implements Traceable { - - private DoFinallyProcessor(Processor processor) { - super(processor); - } - - @Override - protected boolean processNext(final Exchange exchange, final AsyncCallback callback) { - // clear exception so finally block can be executed - final Exception e = exchange.getException(); - exchange.setException(null); - // but store the caught exception as a property - if (e != null) { - exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); - } - // store the last to endpoint as the failure endpoint - if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { - exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); - } - - boolean sync = super.processNext(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - // we only have to handle async completion of the pipeline - if (doneSync) { - return; - } - - if (e == null) { - exchange.removeProperty(Exchange.FAILURE_ENDPOINT); - } else { - // set exception back on exchange - exchange.setException(e); - exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); - } - - // signal callback to continue routing async - ExchangeHelper.prepareOutToIn(exchange); - LOG.trace("Processing complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange); - callback.done(false); - } - }); - - if (sync) { - if (e == null) { - exchange.removeProperty(Exchange.FAILURE_ENDPOINT); - } else { - // set exception back on exchange - exchange.setException(e); - exchange.setProperty(Exchange.EXCEPTION_CAUGHT, e); - } - } - - return sync; - } - - @Override - public String toString() { - return "Finally{" + getProcessor() + "}"; - } - - public String getTraceLabel() { - return "doFinally"; - } - } - - /** - * Processor to handle do catch supporting asynchronous routing engine - */ - private final class HandleDoCatchProcessor extends DelegateAsyncProcessor { - - private final CatchProcessor catchClause; - - private HandleDoCatchProcessor(CatchProcessor processor) { - super(processor); - this.catchClause = processor; - } - - @Override - protected boolean processNext(final Exchange exchange, final AsyncCallback callback) { - final Exception caught = exchange.getException(); - if (caught == null) { - return true; - } - - // store the last to endpoint as the failure endpoint - if (exchange.getProperty(Exchange.FAILURE_ENDPOINT) == null) { - exchange.setProperty(Exchange.FAILURE_ENDPOINT, exchange.getProperty(Exchange.TO_ENDPOINT)); - } - // give the rest of the pipeline another chance - exchange.setProperty(Exchange.EXCEPTION_CAUGHT, caught); - exchange.setException(null); - // and we should not be regarded as exhausted as we are in a try .. catch block - exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); - - // is the exception handled by the catch clause - final Boolean handled = catchClause.handles(exchange); - - if (LOG.isDebugEnabled()) { - LOG.debug("The exception is handled: {} for the exception: {} caused by: {}", - new Object[]{handled, caught.getClass().getName(), caught.getMessage()}); - } - - boolean sync = super.processNext(exchange, new AsyncCallback() { - public void done(boolean doneSync) { - // we only have to handle async completion of the pipeline - if (doneSync) { - return; - } - - if (!handled) { - if (exchange.getException() == null) { - exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); - } - } - // always clear redelivery exhausted in a catch clause - exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); - - // signal callback to continue routing async - ExchangeHelper.prepareOutToIn(exchange); - callback.done(false); - } - }); - - if (sync) { - // set exception back on exchange - if (!handled) { - if (exchange.getException() == null) { - exchange.setException(exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class)); - } - } - // always clear redelivery exhausted in a catch clause - exchange.removeProperty(Exchange.REDELIVERY_EXHAUSTED); - } - - return sync; - } + return tryProcessor != null || catchClauses != null && !catchClauses.isEmpty() || finallyProcessor != null; } }
