Author: davsclaus
Date: Tue May 29 12:52:20 2012
New Revision: 1343704

URL: http://svn.apache.org/viewvc?rev=1343704&view=rev
Log:
CAMEL-5316: Failover EIP now detects shutdown in progress and breaks out from 
failover loop.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java
      - copied, changed from r1343673, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=1343704&r1=1343703&r2=1343704&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
 Tue May 29 12:52:20 2012
@@ -17,10 +17,13 @@
 package org.apache.camel.processor.loadbalancer;
 
 import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
 import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
@@ -36,9 +39,10 @@ import org.apache.camel.util.ObjectHelpe
  * as the failover load balancer is a specialized pipeline. So the trick is to 
keep doing the same as the
  * pipeline to ensure it works the same and the async routing engine is 
flawless.
  */
-public class FailOverLoadBalancer extends LoadBalancerSupport implements 
Traceable {
+public class FailOverLoadBalancer extends LoadBalancerSupport implements 
Traceable, CamelContextAware {
 
     private final List<Class<?>> exceptions;
+    private CamelContext camelContext;
     private boolean roundRobin;
     private int maximumFailoverAttempts = -1;
 
@@ -60,6 +64,16 @@ public class FailOverLoadBalancer extend
         }
     }
 
+    @Override
+    public CamelContext getCamelContext() {
+        return camelContext;
+    }
+
+    @Override
+    public void setCamelContext(CamelContext camelContext) {
+        this.camelContext = camelContext;
+    }
+
     public List<Class<?>> getExceptions() {
         return exceptions;
     }
@@ -113,6 +127,16 @@ public class FailOverLoadBalancer extend
         return answer;
     }
 
+    @Override
+    public boolean isRunAllowed() {
+        // determine if we can still run, or the camel context is forcing a 
shutdown
+        boolean forceShutdown = 
camelContext.getShutdownStrategy().forceShutdown(this);
+        if (forceShutdown) {
+            log.trace("Run not allowed as ShutdownStrategy is forcing shutting 
down");
+        }
+        return !forceShutdown && super.isRunAllowed();
+    }
+
     public boolean process(final Exchange exchange, final AsyncCallback 
callback) {
         final List<Processor> processors = getProcessors();
 
@@ -133,6 +157,18 @@ public class FailOverLoadBalancer extend
         log.trace("Failover starting with endpoint index {}", index);
 
         while (first || shouldFailOver(copy)) {
+
+            // can we still run
+            if (!isRunAllowed()) {
+                log.trace("Run not allowed, will reject executing exchange: 
{}", exchange);
+                if (exchange.getException() == null) {
+                    exchange.setException(new RejectedExecutionException());
+                }
+                // we cannot process so invoke callback
+                callback.done(true);
+                return true;
+            }
+
             if (!first) {
                 attempts.incrementAndGet();
                 // are we exhausted by attempts?
@@ -240,6 +276,17 @@ public class FailOverLoadBalancer extend
             }
 
             while (shouldFailOver(copy)) {
+
+                // can we still run
+                if (!isRunAllowed()) {
+                    log.trace("Run not allowed, will reject executing 
exchange: {}", exchange);
+                    if (exchange.getException() == null) {
+                        exchange.setException(new 
RejectedExecutionException());
+                    }
+                    // we cannot process so invoke callback
+                    callback.done(false);
+                }
+
                 attempts.incrementAndGet();
                 // are we exhausted by attempts?
                 if (maximumFailoverAttempts > -1 && attempts.get() > 
maximumFailoverAttempts) {

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java
 (from r1343673, 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java&r1=1343673&r2=1343704&rev=1343704&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java
 Tue May 29 12:52:20 2012
@@ -23,11 +23,11 @@ import org.apache.camel.builder.RouteBui
 import org.apache.camel.util.StopWatch;
 
 /**
- * Tests that the redelivery error handler will break out if CamelContext is 
shutting down.
+ * Tests that the failover load balancer will break out if CamelContext is 
shutting down.
  */
-public class RedeliveryErrorHandlerBreakoutDuringShutdownTest extends 
ContextTestSupport {
+public class FailoverLoadBalancerBreakoutDuringShutdownTest extends 
ContextTestSupport {
 
-    public void testRedelivery() throws Exception {
+    public void testFailover() throws Exception {
 
         getMockEndpoint("mock:before").expectedMessageCount(1);
         getMockEndpoint("mock:after").expectedMessageCount(0);
@@ -44,7 +44,7 @@ public class RedeliveryErrorHandlerBreak
         context.stop();
 
         // should take less than 5 seconds
-        assertTrue("Should take less than 5 seconds, was {}", watch.stop() < 
5000);
+        assertTrue("Should take less than 5 seconds, was " + watch.taken(), 
watch.stop() < 5000);
     }
 
     @Override
@@ -52,17 +52,29 @@ public class RedeliveryErrorHandlerBreak
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                // just keep on redelivering
-                
errorHandler(defaultErrorHandler().maximumRedeliveries(-1).redeliveryDelay(1000));
 
                 from("seda:start")
                     .to("mock:before")
+                    // just keep on failover
+                    .loadBalance().failover(-1, false, true)
+                        .to("direct:a")
+                        .to("direct:b")
+                    .end()
+                    .to("mock:after");
+
+                from("direct:a")
                     .process(new Processor() {
                         public void process(Exchange exchange) throws 
Exception {
                             throw new IllegalArgumentException("Forced");
                         }
-                    })
-                    .to("mock:after");
+                    });
+
+                from("direct:b")
+                    .process(new Processor() {
+                        public void process(Exchange exchange) throws 
Exception {
+                            throw new IllegalArgumentException("Forced");
+                        }
+                    });
             }
         };
     }


Reply via email to