Author: davsclaus
Date: Thu Mar 15 07:57:53 2012
New Revision: 1300831

URL: http://svn.apache.org/viewvc?rev=1300831&view=rev
Log:
Failover EIP - Should use defensive copy of exchange before failover to avoid side effects

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java?rev=1300831&r1=1300830&r2=1300831&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/FailOverLoadBalancer.java Thu Mar 15 07:57:53 2012
@@ -26,6 +26,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Traceable;
 import org.apache.camel.util.AsyncProcessorConverterHelper;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 
 /**
@@ -86,6 +87,10 @@ public class FailOverLoadBalancer extend
      * @return <tt>true</tt> to failover
      */
     protected boolean shouldFailOver(Exchange exchange) {
+        if (exchange == null) {
+            return false;
+        }
+
         boolean answer = false;
 
         if (exchange.getException() != null) {
@@ -108,12 +113,15 @@ public class FailOverLoadBalancer extend
         return answer;
     }
 
-    public boolean process(Exchange exchange, AsyncCallback callback) {
+    public boolean process(final Exchange exchange, final AsyncCallback callback) {
         final List<Processor> processors = getProcessors();
 
         final AtomicInteger index = new AtomicInteger();
         final AtomicInteger attempts = new AtomicInteger();
         boolean first = true;
+        // use a copy of the original exchange before failover to avoid populating side effects
+        // directly into the original exchange
+        Exchange copy = null;
 
         // get the next processor
         if (isRoundRobin()) {
@@ -124,7 +132,7 @@ public class FailOverLoadBalancer extend
         }
         log.trace("Failover starting with endpoint index {}", index);
 
-        while (first || shouldFailOver(exchange)) {
+        while (first || shouldFailOver(copy)) {
             if (!first) {
                 attempts.incrementAndGet();
                 // are we exhausted by attempts?
@@ -153,12 +161,12 @@ public class FailOverLoadBalancer extend
                 }
             }
 
-            // try again but prepare exchange before we failover
-            prepareExchangeForFailover(exchange);
+            // try again but copy original exchange before we failover
+            copy = prepareExchangeForFailover(exchange);
             Processor processor = processors.get(index.get());
 
             // process the exchange
-            boolean sync = processExchange(processor, exchange, attempts, index, callback, processors);
+            boolean sync = processExchange(processor, exchange, copy, attempts, index, callback, processors);
 
             // continue as long its being processed synchronously
             if (!sync) {
@@ -171,8 +179,11 @@ public class FailOverLoadBalancer extend
             log.trace("Processing exchangeId: {} is continued being processed synchronously", exchange.getExchangeId());
         }
 
+        // and copy the current result to original so it will contain this result of this eip
+        if (copy != null) {
+            ExchangeHelper.copyResults(exchange, copy);
+        }
         log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-
         callback.done(true);
         return true;
     }
@@ -181,35 +192,23 @@ public class FailOverLoadBalancer extend
      * Prepares the exchange for failover
      *
      * @param exchange the exchange
+     * @return a copy of the exchange to use for failover
      */
-    protected void prepareExchangeForFailover(Exchange exchange) {
-        if (exchange.getException() != null) {
-            if (log.isDebugEnabled()) {
-                log.debug("Failover due {} for exchangeId: {}", exchange.getException().getMessage(), exchange.getExchangeId());
-            }
-
-            // clear exception so we can try failover
-            exchange.setException(null);
-        }
-
-        exchange.setProperty(Exchange.ERRORHANDLER_HANDLED, null);
-        exchange.setProperty(Exchange.FAILURE_HANDLED, null);
-        exchange.setProperty(Exchange.EXCEPTION_CAUGHT, null);
-        exchange.getIn().removeHeader(Exchange.REDELIVERED);
-        exchange.getIn().removeHeader(Exchange.REDELIVERY_COUNTER);
-        exchange.getIn().removeHeader(Exchange.REDELIVERY_MAX_COUNTER);
+    protected Exchange prepareExchangeForFailover(Exchange exchange) {
+        // use a copy of the exchange to avoid side effects on the original exchange
+        return ExchangeHelper.createCopy(exchange, true);
     }
 
-    private boolean processExchange(Processor processor, Exchange exchange,
+    private boolean processExchange(Processor processor, Exchange exchange, Exchange copy,
                                     AtomicInteger attempts, AtomicInteger index,
                                     AsyncCallback callback, List<Processor> processors) {
         if (processor == null) {
-            throw new IllegalStateException("No processors could be chosen to process " + exchange);
+            throw new IllegalStateException("No processors could be chosen to process " + copy);
         }
-        log.debug("Processing failover at attempt {} for {}", attempts, exchange);
+        log.debug("Processing failover at attempt {} for {}", attempts, copy);
 
         AsyncProcessor albp = AsyncProcessorConverterHelper.convert(processor);
-        return AsyncProcessorHelper.process(albp, exchange, new FailOverAsyncCallback(exchange, attempts, index, callback, processors));
+        return AsyncProcessorHelper.process(albp, copy, new FailOverAsyncCallback(exchange, copy, attempts, index, callback, processors));
     }
 
     /**
@@ -219,13 +218,15 @@ public class FailOverLoadBalancer extend
     private final class FailOverAsyncCallback implements AsyncCallback {
 
         private final Exchange exchange;
+        private Exchange copy;
         private final AtomicInteger attempts;
         private final AtomicInteger index;
         private final AsyncCallback callback;
         private final List<Processor> processors;
 
-        private FailOverAsyncCallback(Exchange exchange, AtomicInteger attempts, AtomicInteger index, AsyncCallback callback, List<Processor> processors) {
+        private FailOverAsyncCallback(Exchange exchange, Exchange copy, AtomicInteger attempts, AtomicInteger index, AsyncCallback callback, List<Processor> processors) {
             this.exchange = exchange;
+            this.copy = copy;
             this.attempts = attempts;
             this.index = index;
             this.callback = callback;
@@ -238,7 +239,7 @@ public class FailOverLoadBalancer extend
                 return;
             }
 
-            while (shouldFailOver(exchange)) {
+            while (shouldFailOver(copy)) {
                 attempts.incrementAndGet();
                 // are we exhausted by attempts?
                 if (maximumFailoverAttempts > -1 && attempts.get() > maximumFailoverAttempts) {
@@ -263,11 +264,11 @@ public class FailOverLoadBalancer extend
                 }
 
                 // try again but prepare exchange before we failover
-                prepareExchangeForFailover(exchange);
+                copy = prepareExchangeForFailover(exchange);
                 Processor processor = processors.get(index.get());
 
                 // try to failover using the next processor
-                doneSync = processExchange(processor, exchange, attempts, index, callback, processors);
+                doneSync = processExchange(processor, exchange, copy, attempts, index, callback, processors);
                 if (!doneSync) {
                     log.trace("Processing exchangeId: {} is continued being processed asynchronously", exchange.getExchangeId());
                     // the remainder of the failover will be completed async
@@ -276,8 +277,11 @@ public class FailOverLoadBalancer extend
                 }
             }
 
+            // and copy the current result to original so it will contain this result of this eip
+            if (copy != null) {
+                ExchangeHelper.copyResults(exchange, copy);
+            }
             log.debug("Failover complete for exchangeId: {} >>> {}", exchange.getExchangeId(), exchange);
-
             // signal callback we are done
             callback.done(false);
         };

Added: 
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java?rev=1300831&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/FailOverLoadBalancerSetFaultTest.java Thu Mar 15 07:57:53 2012
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.io.IOException;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ *
+ */
+public class FailOverLoadBalancerSetFaultTest extends ContextTestSupport {
+    
+    public void testFailOverSetFault() throws Exception {
+        getMockEndpoint("mock:failover1").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:failover2").expectedBodiesReceived("Hello World");
+
+        String out = template.requestBody("direct:start", "Hello World", String.class);
+        assertEquals("Bye Camel", out);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .loadBalance().failover(1, false, false, IOException.class)
+                        .to("seda:failover1", "seda:failover2")
+                    .end();
+
+                from("seda:failover1")
+                        .to("mock:failover1")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                // mutate the message
+                                exchange.getOut().setBody("Hi Camel");
+                                // and then set fault directly on OUT for example as camel-cxf would do
+                                exchange.getOut().setFault(true);
+                                exchange.setException(new IOException("Forced exception for test"));
+                            }
+                        });
+
+                from("seda:failover2")
+                        .to("mock:failover2")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                exchange.getOut().setBody("Bye Camel");
+                            }
+                        });
+            }
+        };
+    }
+}


Reply via email to