Author: davsclaus
Date: Tue May 29 12:52:20 2012
New Revision: 1343704
URL: http://svn.apache.org/viewvc?rev=1343704&view=rev
Log:
CAMEL-5316: Failover EIP now detects shutdown in progress and breaks out from
failover loop.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java
- copied, changed from r1343673,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=1343704&r1=1343703&r2=1343704&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
Tue May 29 12:52:20 2012
@@ -17,10 +17,13 @@
package org.apache.camel.processor.loadbalancer;
import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Traceable;
@@ -36,9 +39,10 @@ import org.apache.camel.util.ObjectHelpe
* as the failover load balancer is a specialized pipeline. So the trick is to
keep doing the same as the
* pipeline to ensure it works the same and the async routing engine is
flawless.
*/
-public class FailOverLoadBalancer extends LoadBalancerSupport implements
Traceable {
+public class FailOverLoadBalancer extends LoadBalancerSupport implements
Traceable, CamelContextAware {
private final List<Class<?>> exceptions;
+ private CamelContext camelContext;
private boolean roundRobin;
private int maximumFailoverAttempts = -1;
@@ -60,6 +64,16 @@ public class FailOverLoadBalancer extend
}
}
+ @Override
+ public CamelContext getCamelContext() {
+ return camelContext;
+ }
+
+ @Override
+ public void setCamelContext(CamelContext camelContext) {
+ this.camelContext = camelContext;
+ }
+
public List<Class<?>> getExceptions() {
return exceptions;
}
@@ -113,6 +127,16 @@ public class FailOverLoadBalancer extend
return answer;
}
+ @Override
+ public boolean isRunAllowed() {
+ // determine if we can still run, or the camel context is forcing a
shutdown
+ boolean forceShutdown =
camelContext.getShutdownStrategy().forceShutdown(this);
+ if (forceShutdown) {
+ log.trace("Run not allowed as ShutdownStrategy is forcing shutting
down");
+ }
+ return !forceShutdown && super.isRunAllowed();
+ }
+
public boolean process(final Exchange exchange, final AsyncCallback
callback) {
final List<Processor> processors = getProcessors();
@@ -133,6 +157,18 @@ public class FailOverLoadBalancer extend
log.trace("Failover starting with endpoint index {}", index);
while (first || shouldFailOver(copy)) {
+
+ // can we still run
+ if (!isRunAllowed()) {
+ log.trace("Run not allowed, will reject executing exchange:
{}", exchange);
+ if (exchange.getException() == null) {
+ exchange.setException(new RejectedExecutionException());
+ }
+ // we cannot process so invoke callback
+ callback.done(true);
+ return true;
+ }
+
if (!first) {
attempts.incrementAndGet();
// are we exhausted by attempts?
@@ -240,6 +276,17 @@ public class FailOverLoadBalancer extend
}
while (shouldFailOver(copy)) {
+
+ // can we still run
+ if (!isRunAllowed()) {
+ log.trace("Run not allowed, will reject executing
exchange: {}", exchange);
+ if (exchange.getException() == null) {
+ exchange.setException(new
RejectedExecutionException());
+ }
+ // we cannot process so invoke callback
+ callback.done(false);
+ }
+
attempts.incrementAndGet();
// are we exhausted by attempts?
if (maximumFailoverAttempts > -1 && attempts.get() >
maximumFailoverAttempts) {
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java
(from r1343673,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java&r1=1343673&r2=1343704&rev=1343704&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/RedeliveryErrorHandlerBreakoutDuringShutdownTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailoverLoadBalancerBreakoutDuringShutdownTest.java
Tue May 29 12:52:20 2012
@@ -23,11 +23,11 @@ import org.apache.camel.builder.RouteBui
import org.apache.camel.util.StopWatch;
/**
- * Tests that the redelivery error handler will break out if CamelContext is
shutting down.
+ * Tests that the failover load balancer will break out if CamelContext is
shutting down.
*/
-public class RedeliveryErrorHandlerBreakoutDuringShutdownTest extends
ContextTestSupport {
+public class FailoverLoadBalancerBreakoutDuringShutdownTest extends
ContextTestSupport {
- public void testRedelivery() throws Exception {
+ public void testFailover() throws Exception {
getMockEndpoint("mock:before").expectedMessageCount(1);
getMockEndpoint("mock:after").expectedMessageCount(0);
@@ -44,7 +44,7 @@ public class RedeliveryErrorHandlerBreak
context.stop();
// should take less than 5 seconds
- assertTrue("Should take less than 5 seconds, was {}", watch.stop() <
5000);
+ assertTrue("Should take less than 5 seconds, was " + watch.taken(),
watch.stop() < 5000);
}
@Override
@@ -52,17 +52,29 @@ public class RedeliveryErrorHandlerBreak
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // just keep on redelivering
-
errorHandler(defaultErrorHandler().maximumRedeliveries(-1).redeliveryDelay(1000));
from("seda:start")
.to("mock:before")
+ // just keep on failover
+ .loadBalance().failover(-1, false, true)
+ .to("direct:a")
+ .to("direct:b")
+ .end()
+ .to("mock:after");
+
+ from("direct:a")
.process(new Processor() {
public void process(Exchange exchange) throws
Exception {
throw new IllegalArgumentException("Forced");
}
- })
- .to("mock:after");
+ });
+
+ from("direct:b")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ throw new IllegalArgumentException("Forced");
+ }
+ });
}
};
}