Author: ningjiang
Date: Tue Dec  2 18:54:15 2008
New Revision: 722726

URL: http://svn.apache.org/viewvc?rev=722726&view=rev
Log:
CAMEL-1129 Enhance ErrorHandler RedeliveryPolicy with a Timer manager to avoid 
blocking the current thread while sleeping

Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=722726&r1=722725&r2=722726&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
 Tue Dec  2 18:54:15 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.AsyncCallback;
@@ -50,6 +52,7 @@
     private static final transient Log LOG = 
LogFactory.getLog(DeadLetterChannel.class);
     private static final String FAILURE_HANDLED_PROPERTY = 
DeadLetterChannel.class.getName() + ".FAILURE_HANDLED";
     
+    private static Timer timer = new Timer();
     private Processor output;
     private Processor deadLetter;
     private AsyncProcessor outputAsync;
@@ -67,6 +70,40 @@
         RedeliveryPolicy currentRedeliveryPolicy = redeliveryPolicy;
         Processor failureProcessor = deadLetter;
     }
+    
+    private class RedeliverTimerTask extends TimerTask {
+        private final Exchange exchange;
+        private final AsyncCallback callback;
+        private final RedeliveryData data;
+        
+        public RedeliverTimerTask(Exchange exchange, AsyncCallback callback, 
RedeliveryData data) {
+            this.exchange = exchange;
+            this.callback = callback;
+            this.data = data;
+        }
+
+        @Override
+        public void run() {
+            //only handle the real AsyncProcess the exchange 
+            outputAsync.process(exchange, new AsyncCallback() {
+                public void done(boolean sync) {
+                    // Only handle the async case...
+                    if (sync) {
+                        return;
+                    }
+                    data.sync = false;
+                    // only process if the exchange hasn't failed
+                    // and it has not been handled by the error processor
+                    if (exchange.getException() != null && 
!isFailureHandled(exchange)) {
+                        // if we are redelivering then sleep before trying 
again
+                        asyncProcess(exchange, callback, data);
+                    } else {
+                        callback.done(sync);
+                    }
+                }
+            });                
+        } 
+    }
 
     public DeadLetterChannel(Processor output, Processor deadLetter) {
         this(output, deadLetter, new RedeliveryPolicy(), 
DeadLetterChannel.createDefaultLogger(),
@@ -126,49 +163,14 @@
 
             // did previous processing caused an exception?
             if (exchange.getException() != null) {
-                Throwable e = exchange.getException();
-                // set the original caused exception
-                exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e);
-
-                // find the error handler to use (if any)
-                ExceptionType exceptionPolicy = getExceptionPolicy(exchange, 
e);
-                if (exceptionPolicy != null) {
-                    data.currentRedeliveryPolicy = 
exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), 
data.currentRedeliveryPolicy);
-                    data.handledPredicate = exceptionPolicy.getHandledPolicy();
-                    data.retryUntilPredicate = 
exceptionPolicy.getRetryUntilPolicy();
-                    Processor processor = exceptionPolicy.getErrorHandler();
-                    if (processor != null) {
-                        data.failureProcessor = processor;
-                    }                    
-                }
-                
-                logFailedDelivery(true, exchange, "Failed delivery for 
exchangeId: " + exchange.getExchangeId()
-                        + ". On delivery attempt: " + data.redeliveryCounter + 
" caught: " + e, data, e);
-                data.redeliveryCounter = incrementRedeliveryCounter(exchange, 
e);
+                handleException(exchange, data);
             }
 
             // compute if we should redeliver or not
             boolean shouldRedeliver = shouldRedeliver(exchange, data);
             if (!shouldRedeliver) {
-                // we did not success with the redelivery so now we let the 
failure processor handle it
-                setFailureHandled(exchange);
-                // must decrement the redelivery counter as we didn't process 
the redelivery but is
-                // handling by the failure handler. So we must -1 to not let 
the counter be out-of-sync
-                decrementRedeliveryCounter(exchange);
-
-                AsyncProcessor afp = 
AsyncProcessorTypeConverter.convert(data.failureProcessor);
-                boolean sync = afp.process(exchange, new AsyncCallback() {
-                    public void done(boolean sync) {
-                        restoreExceptionOnExchange(exchange, 
data.handledPredicate);
-                        callback.done(data.sync);
-                    }
-                });
-
-                // The line below shouldn't be needed, it is invoked by the 
AsyncCallback above
-                //restoreExceptionOnExchange(exchange, data.handledPredicate);
-                logFailedDelivery(false, exchange, "Failed delivery for 
exchangeId: " + exchange.getExchangeId()
-                        + ". Handled by the failure processor: " + 
data.failureProcessor, data, null);
-                return sync;
+                return deliverToFaultProcessor(exchange, callback, data);
+                
             }
 
             // if we are redelivering then sleep before trying again
@@ -192,8 +194,9 @@
                     data.sync = false;
                     // only process if the exchange hasn't failed
                     // and it has not been handled by the error processor
-                    if (exchange.getException() != null && 
!isFailureHandled(exchange)) {
-                        process(exchange, callback, data);
+                    if (exchange.getException() != null && 
!isFailureHandled(exchange)) {                        
+                        //TODO Call the Timer for the asyncProcessor
+                        asyncProcess(exchange, callback, data);
                     } else {
                         callback.done(sync);
                     }
@@ -213,6 +216,97 @@
 
     }
 
+    protected void asyncProcess(final Exchange exchange, final AsyncCallback 
callback, final RedeliveryData data) {
+        // set the timer here
+        if (!isRunAllowed()) {
+            if (exchange.getException() == null) {
+                exchange.setException(new RejectedExecutionException());
+            }
+            callback.done(data.sync);
+            return;
+        }
+
+        // if the exchange is transacted then let the underlying system handle 
the redelivery etc.
+        // this DeadLetterChannel is only for non transacted exchanges
+        if (exchange.isTransacted() && exchange.getException() != null) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("This is a transacted exchange, bypassing this 
DeadLetterChannel: " + this + " for exchange: " + exchange);
+            }
+            return;
+        }
+        
+        // did previous processing caused an exception?
+        if (exchange.getException() != null) {
+            handleException(exchange, data);
+        }
+        
+        // compute if we should redeliver or not
+        boolean shouldRedeliver = shouldRedeliver(exchange, data);
+        if (!shouldRedeliver) {
+            deliverToFaultProcessor(exchange, callback, data);
+            return;
+        }
+        
+        // process the next try
+        // if we are redelivering then sleep before trying again
+        if (data.redeliveryCounter > 0) {
+            // okay we will give it another go so clear the exception so we 
can try again
+            if (exchange.getException() != null) {
+                exchange.setException(null);
+            }
+            // wait until we should redeliver
+            data.redeliveryDelay = 
data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
+            timer.schedule(new RedeliverTimerTask(exchange, callback, data), 
data.redeliveryDelay);
+        }        
+    }
+    
+    private void handleException(Exchange exchange, RedeliveryData data) {
+        Throwable e = exchange.getException();
+        // set the original caused exception
+        exchange.setProperty(EXCEPTION_CAUSE_PROPERTY, e);
+
+        // find the error handler to use (if any)
+        ExceptionType exceptionPolicy = getExceptionPolicy(exchange, e);
+        if (exceptionPolicy != null) {
+            data.currentRedeliveryPolicy = 
exceptionPolicy.createRedeliveryPolicy(exchange.getContext(), 
data.currentRedeliveryPolicy);
+            data.handledPredicate = exceptionPolicy.getHandledPolicy();
+            data.retryUntilPredicate = exceptionPolicy.getRetryUntilPolicy();
+            Processor processor = exceptionPolicy.getErrorHandler();
+            if (processor != null) {
+                data.failureProcessor = processor;
+            }                    
+        }
+        
+        logFailedDelivery(true, exchange, "Failed delivery for exchangeId: " + 
exchange.getExchangeId()
+                + ". On delivery attempt: " + data.redeliveryCounter + " 
caught: " + e, data, e);
+        data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
+        
+    }
+    
+    private boolean deliverToFaultProcessor(final Exchange exchange, final 
AsyncCallback callback,
+                                            final RedeliveryData data) {
+        // we did not success with the redelivery so now we let the failure 
processor handle it
+        setFailureHandled(exchange);
+        // must decrement the redelivery counter as we didn't process the 
redelivery but is
+        // handling by the failure handler. So we must -1 to not let the 
counter be out-of-sync
+        decrementRedeliveryCounter(exchange);
+
+        AsyncProcessor afp = 
AsyncProcessorTypeConverter.convert(data.failureProcessor);
+        boolean sync = afp.process(exchange, new AsyncCallback() {
+            public void done(boolean sync) {
+                restoreExceptionOnExchange(exchange, data.handledPredicate);
+                callback.done(data.sync);
+            }
+        });
+
+        // The line below shouldn't be needed, it is invoked by the 
AsyncCallback above
+        // restoreExceptionOnExchange(exchange, data.handledPredicate);
+        logFailedDelivery(false, exchange, "Failed delivery for exchangeId: " 
+ exchange.getExchangeId()
+                                           + ". Handled by the failure 
processor: " + data.failureProcessor,
+                          data, null);
+        return sync;
+    }
+
     // Properties
     // 
-------------------------------------------------------------------------
 
@@ -344,5 +438,6 @@
     @Override
     protected void doStop() throws Exception {
         ServiceHelper.stopServices(deadLetter, output);
-    }
+    }    
+    
 }

Modified: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java?rev=722726&r1=722725&r2=722726&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelTest.java
 Tue Dec  2 18:54:15 2008
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.processor;
 
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -76,7 +78,7 @@
     }
 
     protected RouteBuilder createRouteBuilder() {
-        final Processor processor = new Processor() {
+        final Processor processor = new AsyncProcessor() {
             public void process(Exchange exchange) {
                 Integer counter = 
exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
                                                              Integer.class);
@@ -86,13 +88,30 @@
                                                + " being less than: " + 
failUntilAttempt);
                 }
             }
+
+            public boolean process(Exchange exchange, AsyncCallback callback) 
{                
+                Integer counter = 
exchange.getIn().getHeader(DeadLetterChannel.REDELIVERY_COUNTER,
+                                                             Integer.class);
+                int attempt = (counter == null) ? 1 : counter + 1;
+                if (attempt > 1) {
+                    assertEquals("Now we should use TimerThread to call the 
process", Thread.currentThread().getName(), "Timer-0");
+                }
+                
+                if (attempt < failUntilAttempt) {
+                    // we can't throw the exception here , or the callback 
will not be invoked.
+                    exchange.setException(new RuntimeException("Failed to 
process due to attempt: " + attempt
+                                               + " being less than: " + 
failUntilAttempt));
+                }
+                callback.done(false);
+                return false;
+            }
         };
 
         return new RouteBuilder() {
             public void configure() {
                 from("direct:start").errorHandler(
                     deadLetterChannel("mock:failed").maximumRedeliveries(2)
-                        .delay(1)
+                        .delay(1000)
                         .loggingLevel(LoggingLevel.DEBUG)
 
                 ).process(processor).to("mock:success");


Reply via email to