Author: davsclaus
Date: Mon Jun 20 17:03:45 2011
New Revision: 1137705

URL: http://svn.apache.org/viewvc?rev=1137705&view=rev
Log:
CAMEL-1817: Optimized to only do defensive copy of Exchange if redelivery has 
been enabled.

Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java?rev=1137705&r1=1137704&r2=1137705&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ErrorHandlerSupport.java
 Mon Jun 20 17:03:45 2011
@@ -41,8 +41,8 @@ public abstract class ErrorHandlerSuppor
 
     protected final transient Logger log = LoggerFactory.getLogger(getClass());
 
-    private final Map<ExceptionPolicyKey, OnExceptionDefinition> 
exceptionPolicies = new LinkedHashMap<ExceptionPolicyKey, 
OnExceptionDefinition>();
-    private ExceptionPolicyStrategy exceptionPolicy = 
createDefaultExceptionPolicyStrategy();
+    protected final Map<ExceptionPolicyKey, OnExceptionDefinition> 
exceptionPolicies = new LinkedHashMap<ExceptionPolicyKey, 
OnExceptionDefinition>();
+    protected ExceptionPolicyStrategy exceptionPolicy = 
createDefaultExceptionPolicyStrategy();
 
     public void addExceptionPolicy(OnExceptionDefinition exceptionType) {
         Processor processor = exceptionType.getErrorHandler();

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java?rev=1137705&r1=1137704&r2=1137705&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/RedeliveryErrorHandler.java
 Mon Jun 20 17:03:45 2011
@@ -33,6 +33,7 @@ import org.apache.camel.impl.converter.A
 import org.apache.camel.model.OnExceptionDefinition;
 import org.apache.camel.spi.SubUnitOfWorkCallback;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.MessageHelper;
@@ -46,7 +47,7 @@ import org.apache.camel.util.ServiceHelp
  * This implementation should contain all the error handling logic and the sub 
classes
  * should only configure it according to what they support.
  *
- * @version 
+ * @version
  */
 public abstract class RedeliveryErrorHandler extends ErrorHandlerSupport 
implements AsyncProcessor {
 
@@ -62,6 +63,7 @@ public abstract class RedeliveryErrorHan
     protected final Predicate retryWhilePolicy;
     protected final CamelLogger logger;
     protected final boolean useOriginalMessagePolicy;
+    protected boolean redeliveryEnabled;
 
     /**
      * Contains the current redelivery data
@@ -88,7 +90,7 @@ public abstract class RedeliveryErrorHan
     /**
      * Tasks which performs asynchronous redelivery attempts, and being 
triggered by a
      * {@link java.util.concurrent.ScheduledExecutorService} to avoid having 
any threads blocking if a task
-     * has to be delayed before a redelivery attempt is performed. 
+     * has to be delayed before a redelivery attempt is performed.
      */
     private class AsyncRedeliveryTask implements Callable<Boolean> {
 
@@ -171,8 +173,8 @@ public abstract class RedeliveryErrorHan
         }
     }
 
-    public RedeliveryErrorHandler(CamelContext camelContext, Processor output, 
CamelLogger logger, 
-            Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, 
Processor deadLetter, 
+    public RedeliveryErrorHandler(CamelContext camelContext, Processor output, 
CamelLogger logger,
+            Processor redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, 
Processor deadLetter,
             String deadLetterUri, boolean useOriginalMessagePolicy, Predicate 
retryWhile, String executorServiceRef) {
 
         ObjectHelper.notNull(camelContext, "CamelContext", this);
@@ -207,11 +209,6 @@ public abstract class RedeliveryErrorHan
         return processErrorHandler(exchange, callback, new RedeliveryData());
     }
 
-    protected Exchange defensiveCopyExchange(Exchange exchange) {
-        // TODO: Optimize to only copy if redelivery is possible/enabled
-        return ExchangeHelper.createCopy(exchange, true);
-    }
-
     /**
      * Process the exchange using redelivery error handling.
      */
@@ -219,7 +216,7 @@ public abstract class RedeliveryErrorHan
 
         // do a defensive copy of the original Exchange, which is needed for 
redelivery so we can ensure the
         // original Exchange is being redelivered, and not a mutated Exchange
-        data.original = defensiveCopyExchange(exchange);
+        data.original = defensiveCopyExchangeIfNeeded(exchange);
 
         // use looping to have redelivery attempts
         while (true) {
@@ -429,6 +426,21 @@ public abstract class RedeliveryErrorHan
     }
 
     /**
+     * Performs a defensive copy of the exchange if needed
+     *
+     * @param exchange the exchange
+     * @return the defensive copy, or <tt>null</tt> if not needed (redelivery 
is not enabled).
+     */
+    protected Exchange defensiveCopyExchangeIfNeeded(Exchange exchange) {
+        // only do a defensive copy if redelivery is enabled
+        if (redeliveryEnabled) {
+            return ExchangeHelper.createCopy(exchange, true);
+        } else {
+            return null;
+        }
+    }
+
+    /**
      * Strategy whether the exchange has an exception that we should try to 
handle.
      * <p/>
      * Standard implementations should just look for an exception.
@@ -536,6 +548,9 @@ public abstract class RedeliveryErrorHan
     }
 
     protected void prepareExchangeForRedelivery(Exchange exchange, 
RedeliveryData data) {
+        // there must be a defensive copy of the exchange
+        ObjectHelper.notNull(data.original, "Defensive copy of Exchange is 
null", this);
+
         // okay we will give it another go so clear the exception so we can 
try again
         exchange.setException(null);
 
@@ -669,7 +684,7 @@ public abstract class RedeliveryErrorHan
                     exchange.setOut(null);
                 }
             }
-            
+
             // reset cached streams so they can be read again
             MessageHelper.resetStreamCache(exchange.getIn());
 
@@ -913,6 +928,55 @@ public abstract class RedeliveryErrorHan
         }
     }
 
+    /**
+     * Determines if redelivery is enabled by checking if any of the 
redelivery policy
+     * settings may allow redeliveries.
+     *
+     * @return <tt>true</tt> if redelivery is possible, <tt>false</tt> 
otherwise
+     * @throws Exception can be thrown
+     */
+    private boolean determineIfRedeliveryIsEnabled() throws Exception {
+        // determine if redeliver is enabled either on error handler
+        if (getRedeliveryPolicy().getMaximumRedeliveries() != 0) {
+            // must check for != 0 as (-1 means redeliver forever)
+            return true;
+        }
+        if (retryWhilePolicy != null) {
+            return true;
+        }
+
+        // or on the exception policies
+        if (!exceptionPolicies.isEmpty()) {
+            // walk them to see if any of them have a maximum redeliveries > 0 
or retry until set
+            for (OnExceptionDefinition def : exceptionPolicies.values()) {
+
+                if (def.getRedeliveryPolicy() != null) {
+                    String ref = def.getRedeliveryPolicyRef();
+                    if (ref != null) {
+                        // lookup in registry if ref provided
+                        RedeliveryPolicy policy = 
CamelContextHelper.mandatoryLookup(camelContext, ref, RedeliveryPolicy.class);
+                        if (policy.getMaximumRedeliveries() != 0) {
+                            // must check for != 0 as (-1 means redeliver 
forever)
+                            return true;
+                        }
+                    } else {
+                        Integer max = 
CamelContextHelper.parseInteger(camelContext, 
def.getRedeliveryPolicy().getMaximumRedeliveries());
+                        if (max != null && max != 0) {
+                            // must check for != 0 as (-1 means redeliver 
forever)
+                            return true;
+                        }
+                    }
+                }
+
+                if (def.getRetryWhilePolicy() != null || def.getRetryWhile() 
!= null) {
+                    return true;
+                }
+            }
+        }
+
+        return false;
+    }
+
     @Override
     protected void doStart() throws Exception {
         ServiceHelper.startServices(output, outputAsync, deadLetter);
@@ -928,6 +992,12 @@ public abstract class RedeliveryErrorHan
                 executorService = 
camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, 
"ErrorHandlerRedeliveryTask");
             }
         }
+
+        // determine if redeliver is enabled or not
+        redeliveryEnabled = determineIfRedeliveryIsEnabled();
+        if (log.isDebugEnabled()) {
+            log.debug("Redelivery enabled: {} on error handler: {}", 
redeliveryEnabled, this);
+        }
     }
 
     @Override


Reply via email to