Author: davsclaus
Date: Thu Jan  8 11:47:16 2009
New Revision: 732808

URL: http://svn.apache.org/viewvc?rev=732808&view=rev
Log:
Merged revisions 732793 via svnmerge from 
https://svn.apache.org/repos/asf/activemq/camel/trunk

........
  r732793 | davsclaus | 2009-01-08 20:01:22 +0100 (Thu, 08 Jan 2009) | 1 line
  
  CAMEL-1234: Added onRedelivery option to DeadLetterChannel to allow 
processing a custom processor before every redelivery attempt.
........

Added:
    
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
      - copied, changed from r732793, 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
Modified:
    activemq/camel/branches/camel-1.x/   (props changed)
    
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
    
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    
activemq/camel/branches/camel-1.x/camel-core/src/test/resources/log4j.properties

Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan  8 11:47:16 2009
@@ -1 +1 @@
-/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619,724681,725040,725309-725320,725340,725351,725569-725572,725612,725652-725660,725715,725883,726339,726640-726645,726932,727113,727375,727377,727624,727713,727946,729401,729892,730069,730132,730154,730157,730275,730299,730504-730505,730508,730571,730599,730759,730903,730916,730923,730936,730992,731126,731168-731169,731488,731492,731799,731824,731836,731844,731860,732207,732210,732237,732246-732247,732378,7
 32589-732590,732625
+/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619,724681,725040,725309-725320,725340,725351,725569-725572,725612,725652-725660,725715,725883,726339,726640-726645,726932,727113,727375,727377,727624,727713,727946,729401,729892,730069,730132,730154,730157,730275,730299,730504-730505,730508,730571,730599,730759,730903,730916,730923,730936,730992,731126,731168-731169,731488,731492,731799,731824,731836,731844,731860,732207,732210,732237,732246-732247,732378,7
 32589-732590,732625,732793

Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: 
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=732808&r1=732807&r2=732808&view=diff
==============================================================================
--- 
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
 (original)
+++ 
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
 Thu Jan  8 11:47:16 2009
@@ -38,6 +38,7 @@
  */
 public class DeadLetterChannelBuilder extends ErrorHandlerBuilderSupport {
     private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+    private Processor onRedelivery;
     private ExceptionPolicyStrategy exceptionPolicyStrategy = 
ErrorHandlerSupport.createDefaultExceptionPolicyStrategy();
     private ProcessorFactory deadLetterFactory;
     private Processor defaultDeadLetterEndpoint;
@@ -63,8 +64,8 @@
     }
 
     public Processor createErrorHandler(RouteContext routeContext, Processor 
processor) throws Exception {
-        Processor deadLetter = getDeadLetterFactory().createProcessor();       
-        DeadLetterChannel answer = new DeadLetterChannel(processor, 
deadLetter, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy());
+        Processor deadLetter = getDeadLetterFactory().createProcessor();
+        DeadLetterChannel answer = new DeadLetterChannel(processor, 
deadLetter, onRedelivery, getRedeliveryPolicy(), getLogger(), 
getExceptionPolicyStrategy());
         configure(answer);
         return answer;
     }
@@ -106,6 +107,16 @@
         return this;
     }
 
+    public DeadLetterChannelBuilder retriesExhaustedLogLevel(LoggingLevel 
retriesExhaustedLogLevel) {
+        
getRedeliveryPolicy().setRetriesExhaustedLogLevel(retriesExhaustedLogLevel);
+        return this;
+    }
+
+    public DeadLetterChannelBuilder retryAttemptedLogLevel(LoggingLevel 
retryAttemptedLogLevel) {
+        
getRedeliveryPolicy().setRetryAttemptedLogLevel(retryAttemptedLogLevel);
+        return this;
+    }
+
     /**
      * Sets the logger used for caught exceptions
      */
@@ -152,6 +163,16 @@
         return this;
     }
 
+    /**
+     * Sets a processor that should be processed <b>before</b> a redelivery 
attempt.
+     * <p/>
+     * Can be used to change the {@link org.apache.camel.Exchange} 
<b>before</b> it is being redelivered.
+     */
+    public DeadLetterChannelBuilder onRedelivery(Processor processor) {
+        setOnRedelivery(processor);
+        return this;
+    }
+
     // Properties
     // 
-------------------------------------------------------------------------
     public RedeliveryPolicy getRedeliveryPolicy() {
@@ -250,4 +271,16 @@
         this.exceptionPolicyStrategy = exceptionPolicyStrategy;
     }
 
+    public Processor getOnRedelivery() {
+        return onRedelivery;
+    }
+
+    public void setOnRedelivery(Processor onRedelivery) {
+        this.onRedelivery = onRedelivery;
+    }
+
+    @Override
+    public String toString() {
+        return "DeadLetterChannelBuilder(" + (deadLetterFactory != null ? 
deadLetterFactory : defaultDeadLetterEndpoint) + ")";
+    }
 }

Modified: 
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=732808&r1=732807&r2=732808&view=diff
==============================================================================
--- 
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
 (original)
+++ 
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
 Thu Jan  8 11:47:16 2009
@@ -58,6 +58,7 @@
     private AsyncProcessor outputAsync;
     private RedeliveryPolicy redeliveryPolicy;
     private Logger logger;
+    private Processor redeliveryProcessor;
 
     private class RedeliveryData {
         int redeliveryCounter;
@@ -104,16 +105,11 @@
         } 
     }
 
-    public DeadLetterChannel(Processor output, Processor deadLetter) {
-        this(output, deadLetter, new RedeliveryPolicy(), 
DeadLetterChannel.createDefaultLogger(),
-            ErrorHandlerSupport.createDefaultExceptionPolicyStrategy());
-    }
-
-    public DeadLetterChannel(Processor output, Processor deadLetter, 
RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy 
exceptionPolicyStrategy) {
-        this.deadLetter = deadLetter;
+    public DeadLetterChannel(Processor output, Processor deadLetter, Processor 
redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Logger logger, 
ExceptionPolicyStrategy exceptionPolicyStrategy) {
         this.output = output;
+        this.deadLetter = deadLetter;
+        this.redeliveryProcessor = redeliveryProcessor;
         this.outputAsync = AsyncProcessorTypeConverter.convert(output);
-
         this.redeliveryPolicy = redeliveryPolicy;
         this.logger = logger;
         setExceptionPolicy(exceptionPolicyStrategy);
@@ -128,11 +124,18 @@
         return "DeadLetterChannel[" + output + ", " + deadLetter + "]";
     }
 
+    public void process(Exchange exchange) throws Exception {
+        AsyncProcessorHelper.process(this, exchange);
+    }
+
     public boolean process(Exchange exchange, final AsyncCallback callback) {
         return process(exchange, callback, new RedeliveryData());
     }
 
-    public boolean process(final Exchange exchange, final AsyncCallback 
callback, final RedeliveryData data) {
+    /**
+     * Processes the exchange decorated with this dead letter channel.
+     */
+    protected boolean process(final Exchange exchange, final AsyncCallback 
callback, final RedeliveryData data) {
 
         while (true) {
             // we can't keep retrying if the route is being shutdown.
@@ -158,12 +161,13 @@
                 handleException(exchange, data);
             }
 
-            // should we redeliver or not?
-            if 
(!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+            // compute if we should redeliver or not
+            boolean shouldRedeliver = shouldRedeliver(exchange, data);
+            if (!shouldRedeliver) {
                 return deliverToFaultProcessor(exchange, callback, data);
             }
 
-            // should we redeliver
+            // if we are redelivering then sleep before trying again
             if (data.redeliveryCounter > 0) {
                 // okay we will give it another go so clear the exception so 
we can try again
                 if (exchange.getException() != null) {
@@ -172,6 +176,9 @@
 
                 // wait until we should redeliver
                 data.redeliveryDelay = 
data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
+
+                // give the onRedelivery processor a chance to process the exchange
+                deliverToRedeliveryProcessor(exchange, callback, data);
             }
 
             // process the exchange
@@ -206,20 +213,6 @@
 
     }
 
-    private void logFailedDelivery(String message, RedeliveryData data, 
Throwable e) {
-        LoggingLevel newLogLevel = null;
-        if 
(data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
-            newLogLevel = 
data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
-        } else {
-            newLogLevel = 
data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
-        }
-        if (e != null) {
-            logger.log(message, e, newLogLevel);
-        } else {
-            logger.log(message, newLogLevel);
-        }
-    }
-
     protected void asyncProcess(final Exchange exchange, final AsyncCallback 
callback, final RedeliveryData data) {
         // set the timer here
         if (!isRunAllowed()) {
@@ -242,9 +235,11 @@
         // did previous processing cause an exception?
         if (exchange.getException() != null) {
             handleException(exchange, data);
-        }        
+        }
         
-        if 
(!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+        // compute if we should redeliver or not
+        boolean shouldRedeliver = shouldRedeliver(exchange, data);
+        if (!shouldRedeliver) {
             deliverToFaultProcessor(exchange, callback, data);
             return;
         }
@@ -259,7 +254,10 @@
             // wait until we should redeliver
             data.redeliveryDelay = 
data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
             timer.schedule(new RedeliverTimerTask(exchange, callback, data), 
data.redeliveryDelay);
-        }        
+
+            // give the onRedelivery processor a chance to process the exchange
+            deliverToRedeliveryProcessor(exchange, callback, data);
+        }
     }
     
     private void handleException(Exchange exchange, RedeliveryData data) {
@@ -275,14 +273,39 @@
             Processor processor = exceptionPolicy.getErrorHandler();
             if (processor != null) {
                 data.failureProcessor = processor;
-            }                    
+            }
         }
-        
-        logFailedDelivery("Failed delivery for exchangeId: " + 
exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter + 
" caught: " + e, data, e);
+
+        String msg = "Failed delivery for exchangeId: " + 
exchange.getExchangeId()
+                + ". On delivery attempt: " + data.redeliveryCounter + " 
caught: " + e;
+        logFailedDelivery(true, exchange, msg, data, e);
+
         data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
-        
     }
-    
+
+    /**
+     * Gives an optionally configured redelivery processor a chance to process 
the Exchange
+     * before it is redelivered. This can be used to alter the Exchange.
+     */
+    private boolean deliverToRedeliveryProcessor(final Exchange exchange, 
final AsyncCallback callback,
+                                            final RedeliveryData data) {
+        if (redeliveryProcessor == null) {
+            return true;
+        }
+
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("RedeliveryProcessor " + redeliveryProcessor + " is 
processing Exchange before its redelivered");
+        }
+        AsyncProcessor afp = 
AsyncProcessorTypeConverter.convert(redeliveryProcessor);
+        boolean sync = afp.process(exchange, new AsyncCallback() {
+            public void done(boolean sync) {
+                callback.done(data.sync);
+            }
+        });
+
+        return sync;
+    }
+
     private boolean deliverToFaultProcessor(final Exchange exchange, final 
AsyncCallback callback,
                                             final RedeliveryData data) {
         // we did not succeed with the redelivery so now we let the failure 
processor handle it
@@ -299,45 +322,27 @@
             }
         });
 
-        // The line below shouldn't be needed, it is invoked by the 
AsyncCallback above
-        //restoreExceptionOnExchange(exchange, data.handledPredicate);
-        logFailedDelivery("Failed delivery for exchangeId: " + 
exchange.getExchangeId() + ". Handled by the failure processor: " + 
data.failureProcessor, data, null);
+        String msg = "Failed delivery for exchangeId: " + 
exchange.getExchangeId()
+                + ". Handled by the failure processor: " + 
data.failureProcessor;
+        logFailedDelivery(false, exchange, msg, data, null);
+
         return sync;
     }
 
+    // Properties
+    // 
-------------------------------------------------------------------------
+
     public static boolean isFailureHandled(Exchange exchange) {
-        return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null 
+        return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null
             || exchange.getIn().getHeader(CAUGHT_EXCEPTION_HEADER) != null;
     }
 
     public static void setFailureHandled(Exchange exchange) {
         exchange.setProperty(FAILURE_HANDLED_PROPERTY, 
exchange.getException());
-        exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER, 
exchange.getException());        
+        exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER, 
exchange.getException());
         exchange.setException(null);
     }
 
-    protected static void restoreExceptionOnExchange(Exchange exchange, 
Predicate handledPredicate) {
-        if (handledPredicate == null || !handledPredicate.matches(exchange)) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("This exchange is not handled so its marked as 
failed: " + exchange);
-            }
-            // exception not handled, put exception back in the exchange
-            
exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, 
Throwable.class));
-        } else {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("This exchange is handled so its marked as not 
failed: " + exchange);
-            }
-            exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, 
Boolean.TRUE);
-        }
-    }
-
-    public void process(Exchange exchange) throws Exception {
-        AsyncProcessorHelper.process(this, exchange);
-    }
-
-    // Properties
-    // 
-------------------------------------------------------------------------
-
     /**
      * Returns the output processor
      */
@@ -379,6 +384,39 @@
     // Implementation methods
     // 
-------------------------------------------------------------------------
 
+    protected static void restoreExceptionOnExchange(Exchange exchange, 
Predicate handledPredicate) {
+        if (handledPredicate == null || !handledPredicate.matches(exchange)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("This exchange is not handled so its marked as 
failed: " + exchange);
+            }
+            // exception not handled, put exception back in the exchange
+            
exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY, 
Throwable.class));
+        } else {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("This exchange is handled so its marked as not 
failed: " + exchange);
+            }
+            exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY, 
Boolean.TRUE);
+        }
+    }
+
+    private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange, 
String message, RedeliveryData data, Throwable e) {
+        LoggingLevel newLogLevel;
+        if (shouldRedeliver) {
+            newLogLevel = 
data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
+        } else {
+            newLogLevel = 
data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
+        }
+        if (e != null) {
+            logger.log(message, e, newLogLevel);
+        } else {
+            logger.log(message, newLogLevel);
+        }
+    }
+
+    private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
+        return 
data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter);
+    }
+
     /**
      * Increments the redelivery counter and adds the redelivered flag if the
      * message has been redelivered

Copied: 
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
 (from r732793, 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java)
URL: 
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java?p2=activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java&r1=732793&r2=732808&rev=732808&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
 (original)
+++ 
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
 Thu Jan  8 11:47:16 2009
@@ -21,7 +21,6 @@
 import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.onexception.MyTechnicalException;
 
 /**
  * Unit test for testing possibility to modify exchange before redelivering
@@ -67,7 +66,7 @@
                 errorHandler(deadLetterChannel("mock:error")
                         .onRedelivery(new MyRedeliverPrcessor())
                         // setting delay to zero is just to make unit testing 
faster
-                        .delay(0L));
+                        .initialRedeliveryDelay(0L));
                 // END SNIPPET: e1
 
 
@@ -75,7 +74,7 @@
                     public void process(Exchange exchange) throws Exception {
                         // force some error so Camel will do redelivery
                         if (++counter <= 3) {
-                            throw new MyTechnicalException("Forced by unit 
test");
+                            throw new IllegalArgumentException("Forced by unit 
test");
                         }
                     }
                 }).to("mock:result");

Modified: 
activemq/camel/branches/camel-1.x/camel-core/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/test/resources/log4j.properties?rev=732808&r1=732807&r2=732808&view=diff
==============================================================================
--- 
activemq/camel/branches/camel-1.x/camel-core/src/test/resources/log4j.properties
 (original)
+++ 
activemq/camel/branches/camel-1.x/camel-core/src/test/resources/log4j.properties
 Thu Jan  8 11:47:16 2009
@@ -21,7 +21,7 @@
 log4j.rootLogger=INFO, out
 
 log4j.logger.org.apache.activemq.spring=WARN
-log4j.logger.org.apache.camel=DEBUG
+#log4j.logger.org.apache.camel=DEBUG
 log4j.logger.org.apache.camel.impl.converter=WARN
 
 # CONSOLE appender not used by default


Reply via email to