Author: davsclaus
Date: Thu Mar 15 07:57:53 2012
New Revision: 1300831
URL: http://svn.apache.org/viewvc?rev=1300831&view=rev
Log:
Failover EIP - Should use defensive copy of exchange before failover to avoid
side effects
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=1300831&r1=1300830&r2=1300831&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
Thu Mar 15 07:57:53 2012
@@ -26,6 +26,7 @@ import org.apache.camel.Processor;
import org.apache.camel.Traceable;
import org.apache.camel.util.AsyncProcessorConverterHelper;
import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
/**
@@ -86,6 +87,10 @@ public class FailOverLoadBalancer extend
* @return <tt>true</tt> to failover
*/
protected boolean shouldFailOver(Exchange exchange) {
+ if (exchange == null) {
+ return false;
+ }
+
boolean answer = false;
if (exchange.getException() != null) {
@@ -108,12 +113,15 @@ public class FailOverLoadBalancer extend
return answer;
}
- public boolean process(Exchange exchange, AsyncCallback callback) {
+ public boolean process(final Exchange exchange, final AsyncCallback
callback) {
final List<Processor> processors = getProcessors();
final AtomicInteger index = new AtomicInteger();
final AtomicInteger attempts = new AtomicInteger();
boolean first = true;
+ // use a copy of the original exchange before failover to avoid
propagating side effects
+ // directly into the original exchange
+ Exchange copy = null;
// get the next processor
if (isRoundRobin()) {
@@ -124,7 +132,7 @@ public class FailOverLoadBalancer extend
}
log.trace("Failover starting with endpoint index {}", index);
- while (first || shouldFailOver(exchange)) {
+ while (first || shouldFailOver(copy)) {
if (!first) {
attempts.incrementAndGet();
// are we exhausted by attempts?
@@ -153,12 +161,12 @@ public class FailOverLoadBalancer extend
}
}
- // try again but prepare exchange before we failover
- prepareExchangeForFailover(exchange);
+ // try again but copy original exchange before we failover
+ copy = prepareExchangeForFailover(exchange);
Processor processor = processors.get(index.get());
// process the exchange
- boolean sync = processExchange(processor, exchange, attempts,
index, callback, processors);
+ boolean sync = processExchange(processor, exchange, copy,
attempts, index, callback, processors);
// continue as long its being processed synchronously
if (!sync) {
@@ -171,8 +179,11 @@ public class FailOverLoadBalancer extend
log.trace("Processing exchangeId: {} is continued being processed
synchronously", exchange.getExchangeId());
}
+ // and copy the current result to the original exchange so it contains the
result of this EIP
+ if (copy != null) {
+ ExchangeHelper.copyResults(exchange, copy);
+ }
log.debug("Failover complete for exchangeId: {} >>> {}",
exchange.getExchangeId(), exchange);
-
callback.done(true);
return true;
}
@@ -181,35 +192,23 @@ public class FailOverLoadBalancer extend
* Prepares the exchange for failover
*
* @param exchange the exchange
+ * @return a copy of the exchange to use for failover
*/
- protected void prepareExchangeForFailover(Exchange exchange) {
- if (exchange.getException() != null) {
- if (log.isDebugEnabled()) {
- log.debug("Failover due {} for exchangeId: {}",
exchange.getException().getMessage(), exchange.getExchangeId());
- }
-
- // clear exception so we can try failover
- exchange.setException(null);
- }
-
- exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, null);
- exchange.setProperty(Exchange.FAILURE_HANDLED, null);
- exchange.setProperty(Exchange.EXCEPTION_CAUGHT, null);
- exchange.getIn().removeHeader(Exchange.REDELIVERED);
- exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
- exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
+ protected Exchange prepareExchangeForFailover(Exchange exchange) {
+ // use a copy of the exchange to avoid side effects on the original
exchange
+ return ExchangeHelper.createCopy(exchange, true);
}
- private boolean processExchange(Processor processor, Exchange exchange,
+ private boolean processExchange(Processor processor, Exchange exchange,
Exchange copy,
AtomicInteger attempts, AtomicInteger
index,
AsyncCallback callback, List<Processor>
processors) {
if (processor == null) {
- throw new IllegalStateException("No processors could be chosen to
process " + exchange);
+ throw new IllegalStateException("No processors could be chosen to
process " + copy);
}
- log.debug("Processing failover at attempt {} for {}", attempts,
exchange);
+ log.debug("Processing failover at attempt {} for {}", attempts, copy);
AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
- return AsyncProcessorHelper.process(albp, exchange, new
FailOverAsyncCallback(exchange, attempts, index, callback, processors));
+ return AsyncProcessorHelper.process(albp, copy, new
FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors));
}
/**
@@ -219,13 +218,15 @@ public class FailOverLoadBalancer extend
private final class FailOverAsyncCallback implements AsyncCallback {
private final Exchange exchange;
+ private Exchange copy;
private final AtomicInteger attempts;
private final AtomicInteger index;
private final AsyncCallback callback;
private final List<Processor> processors;
- private FailOverAsyncCallback(Exchange exchange, AtomicInteger
attempts, AtomicInteger index, AsyncCallback callback, List<Processor>
processors) {
+ private FailOverAsyncCallback(Exchange exchange, Exchange copy,
AtomicInteger attempts, AtomicInteger index, AsyncCallback callback,
List<Processor> processors) {
this.exchange = exchange;
+ this.copy = copy;
this.attempts = attempts;
this.index = index;
this.callback = callback;
@@ -238,7 +239,7 @@ public class FailOverLoadBalancer extend
return;
}
- while (shouldFailOver(exchange)) {
+ while (shouldFailOver(copy)) {
attempts.incrementAndGet();
// are we exhausted by attempts?
if (maximumFailoverAttempts > -1 && attempts.get() >
maximumFailoverAttempts) {
@@ -263,11 +264,11 @@ public class FailOverLoadBalancer extend
}
// try again but prepare exchange before we failover
- prepareExchangeForFailover(exchange);
+ copy = prepareExchangeForFailover(exchange);
Processor processor = processors.get(index.get());
// try to failover using the next processor
- doneSync = processExchange(processor, exchange, attempts,
index, callback, processors);
+ doneSync = processExchange(processor, exchange, copy,
attempts, index, callback, processors);
if (!doneSync) {
log.trace("Processing exchangeId: {} is continued being
processed asynchronously", exchange.getExchangeId());
// the remainder of the failover will be completed async
@@ -276,8 +277,11 @@ public class FailOverLoadBalancer extend
}
}
+ // and copy the current result to the original exchange so it contains the
result of this EIP
+ if (copy != null) {
+ ExchangeHelper.copyResults(exchange, copy);
+ }
log.debug("Failover complete for exchangeId: {} >>> {}",
exchange.getExchangeId(), exchange);
-
// signal callback we are done
callback.done(false);
};
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java?rev=1300831&view=auto
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java
(added)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java
Thu Mar 15 07:57:53 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.IOException;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
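+/**
+ * Verifies that when the first endpoint mutates the OUT message, flags it as a fault and fails with an IOException, failover sends the original body to the second endpoint.
+ */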
+public class FailOverLoadBalancerSetFaultTest extends ContextTestSupport {
+
+ public void testFailOverSetFault() throws Exception {
+ getMockEndpoint("mock:failover1").expectedBodiesReceived("Hello
World");
+ getMockEndpoint("mock:failover2").expectedBodiesReceived("Hello
World");
+
+ String out = template.requestBody("direct:start", "Hello World",
String.class);
+ assertEquals("Bye Camel", out);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .loadBalance().failover(1, false, false, IOException.class)
+ .to("seda:failover1", "seda:failover2")
+ .end();
+
+ from("seda:failover1")
+ .to("mock:failover1")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ // mutate the message
+ exchange.getOut().setBody("Hi Camel");
+ // and then set fault directly on OUT for
example as camel-cxf would do
+ exchange.getOut().setFault(true);
+ exchange.setException(new IOException("Forced
exception for test"));
+ }
+ });
+
+ from("seda:failover2")
+ .to("mock:failover2")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws
Exception {
+ exchange.getOut().setBody("Bye Camel");
+ }
+ });
+ }
+ };
+ }
+}