Author: davsclaus
Date: Mon Jun 29 05:40:04 2009
New Revision: 789216

URL: http://svn.apache.org/viewvc?rev=789216&view=rev
Log:
CAMEL-1750: added retry feature to scheduled poll consumer strategy.

Added:
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java
      - copied, changed from r789205, 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
Modified:
    
camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
    
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
 Mon Jun 29 05:40:04 2009
@@ -20,7 +20,7 @@
  * Strategy for a {...@link org.apache.camel.PollingConsumer} when polling an 
{...@link org.apache.camel.Endpoint}.
  * <p/>
  * This pluggable strategy allows to plugin different implementations what to 
do, most noticeable what to
- * do in case the polling goes wrong. This can be handled in the {...@link 
#rollback(Consumer, Endpoint, Exception) rollback}
+ * do in case the polling goes wrong. This can be handled in the {...@link 
#rollback(Consumer, Endpoint, int, Exception) rollback}
  * method.
  *
  * @version $Revision$
@@ -36,7 +36,7 @@
     void begin(Consumer consumer, Endpoint endpoint);
 
     /**
-     * Called when poll is completed sucesfully
+     * Called when poll is completed successfully
      *
      * @param consumer the consumer
      * @param endpoint the endpoint being consumed
@@ -48,9 +48,14 @@
      *
      * @param consumer the consumer
      * @param endpoint the endpoint being consumed
+     * @param retryCounter current retry attempt, starting from 0.
      * @param cause the caused exception
-     * @throws Exception can be used to rethrow the caused exception
+     * @throws Exception can be used to rethrow the caused exception. Notice 
that thrown an exception will
+     *         terminate the scheduler and thus Camel will not trigger again. 
So if you want to let the scheduler
+     *         to continue to run then do <b>not</b> throw any exception from 
this method.
+     * @return whether to retry immediately or not. Return <tt>false</tt> to 
ignore the problem,
+     *         <tt>true</tt> to try immediately again
      */
-    void rollback(Consumer consumer, Endpoint endpoint, Exception cause) 
throws Exception;
+    boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter, 
Exception cause) throws Exception;
 
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
 Mon Jun 29 05:40:04 2009
@@ -39,8 +39,10 @@
         // noop
     }
 
-    public void rollback(Consumer consumer, Endpoint endpoint, Exception e) 
throws Exception {
+    public boolean rollback(Consumer consumer, Endpoint endpoint, int 
retryCounter, Exception e) throws Exception {
         LOG.warn("Consumer " + consumer +  " could not poll endpoint: " + 
endpoint.getEndpointUri() + " caused by: " + e.getMessage(), e);
+        // we do not want to retry
+        return false;
     }
 
 }

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
 Mon Jun 29 05:40:04 2009
@@ -72,20 +72,41 @@
      * Invoked whenever we should be polled
      */
     public void run() {
-        try {
-            if (isRunAllowed()) {
-                if (LOG.isTraceEnabled()) {
-                    LOG.trace("Starting to poll: " + this.getEndpoint());
-                }
-                pollStrategy.begin(this, getEndpoint());
-                poll();
-                pollStrategy.commit(this, getEndpoint());
-            }
-        } catch (Exception e) {
+        int retryCounter = -1;
+        boolean done = false;
+
+        while (!done) {
             try {
-                pollStrategy.rollback(this, getEndpoint(), e);
-            } catch (Exception re) {
-                throw ObjectHelper.wrapRuntimeCamelException(re);
+                // eager assume we are done
+                done = true;
+                if (isRunAllowed()) {
+
+                    if (retryCounter == -1) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Starting to poll: " + 
this.getEndpoint());
+                        }
+                    } else {
+                        if (LOG.isDebugEnabled()) {
+                            if (LOG.isDebugEnabled()) {
+                                LOG.debug("Retrying attempt " + retryCounter + 
" to poll: " + this.getEndpoint());
+                            }
+                        }
+                    }
+
+                    pollStrategy.begin(this, getEndpoint());
+                    retryCounter++;
+                    poll();
+                    pollStrategy.commit(this, getEndpoint());
+                }
+            } catch (Exception e) {
+                try {
+                    boolean retry = pollStrategy.rollback(this, getEndpoint(), 
retryCounter, e);
+                    if (retry) {
+                        done = false;
+                    }
+                } catch (Exception re) {
+                    throw ObjectHelper.wrapRuntimeCamelException(re);
+                }
             }
         }
 

Modified: 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
 (original)
+++ 
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
 Mon Jun 29 05:40:04 2009
@@ -16,7 +16,6 @@
  */
 package org.apache.camel.processor.interceptor;
 
-import java.util.Arrays;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
@@ -102,16 +101,16 @@
 
         boolean shouldLog = shouldLogNode(node) && shouldLogExchange(exchange);
 
+        // whether we should trace it or not, some nodes should be skipped as 
they are abstract
+        // intermedidate steps for instance related to on completion
+        boolean trace = true;
+
         // okay this is a regular exchange being routed we might need to log 
and trace
         try {
             // before
             if (shouldLog) {
 
-                // whether we should trace it or not, some nodes should be 
skipped as they are abstract
-                // intermedidate steps for instance related to on completion
-                boolean trace = true;
-
-                // if traceable then register this as the previous node, now 
it has been logged
+                // register route path taken if TraceableUnitOfWork unit of 
work
                 if (exchange.getUnitOfWork() instanceof TraceableUnitOfWork) {
                     TraceableUnitOfWork tuow = (TraceableUnitOfWork) 
exchange.getUnitOfWork();
 
@@ -126,22 +125,22 @@
                         tuow.addTraced(new DefaultRouteNode(node, 
super.getProcessor()));
                     }
                 }
+            }
 
-                // log and trace the processor
-                if (trace) {
-                    logExchange(exchange);
-                    traceExchange(exchange);
-                }
-
-                // some nodes need extra work to trace it
-                if (exchange.getUnitOfWork() instanceof TraceableUnitOfWork) {
-                    TraceableUnitOfWork tuow = (TraceableUnitOfWork) 
exchange.getUnitOfWork();
+            // log and trace the processor
+            if (trace) {
+                logExchange(exchange);
+                traceExchange(exchange);
+            }
 
-                    if (node instanceof InterceptDefinition) {
-                        // special for intercept() as we would like to trace 
the processor that was intercepted
-                        // as well, otherwise we only see the intercepted 
route, but we need the both to be logged/traced
-                        afterIntercept((InterceptDefinition) node, tuow, 
exchange);
-                    }
+            // some nodes need extra work to trace it
+            if (exchange.getUnitOfWork() instanceof TraceableUnitOfWork) {
+                TraceableUnitOfWork tuow = (TraceableUnitOfWork) 
exchange.getUnitOfWork();
+
+                if (node instanceof InterceptDefinition) {
+                    // special for intercept() as we would like to trace the 
processor that was intercepted
+                    // as well, otherwise we only see the intercepted route, 
but we need the both to be logged/traced
+                    afterIntercept((InterceptDefinition) node, tuow, exchange);
                 }
             }
 

Copied: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java
 (from r789205, 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java)
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java&r1=789205&r2=789216&rev=789216&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java
 Mon Jun 29 05:40:04 2009
@@ -29,9 +29,8 @@
 /**
  * Unit test for poll strategy
  */
-public class FileConsumerPollStrategyStopOnRollbackTest extends 
ContextTestSupport {
+public class FileConsumerPollStrategyRollbackThrowExceptionTest extends 
ContextTestSupport {
 
-    private static int counter;
     private static String event = "";
 
     private String fileUrl = 
"file://target/pollstrategy/?pollStrategy=#myPoll";
@@ -50,12 +49,13 @@
         template.sendBodyAndHeader("file:target/pollstrategy/", "Hello World", 
Exchange.FILE_NAME, "hello.txt");
     }
 
-    public void testStopOnRollback() throws Exception {
+    public void testRollbackThrowException() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(0);
 
-        // let it run for a little while and since it fails first time we 
should never get a message
-        Thread.sleep(1000);
+        // let it run for a little while since we rethrow the excpetion the 
consumer
+        // will stop scheduling and not poll anymore
+        Thread.sleep(2000);
 
         assertMockEndpointsSatisfied();
 
@@ -81,22 +81,17 @@
                 ObjectHelper.wrapRuntimeCamelException(e);
             }
 
-            if (counter++ == 0) {
-                // simulate an error on first poll
-                throw new IllegalArgumentException("Damn I cannot do this");
-            }
+            // simulate an error on first poll
+            throw new IllegalArgumentException("Damn I cannot do this");
         }
 
         public void commit(Consumer consumer, Endpoint endpoint) {
             event += "commit";
         }
 
-        public void rollback(Consumer consumer, Endpoint endpoint, Exception 
cause) throws Exception {
-            if (cause.getMessage().equals("Damn I cannot do this")) {
-                event += "rollback";
-                // stop consumer as it does not work
-                consumer.stop();
-            }
+        public boolean rollback(Consumer consumer, Endpoint endpoint, int 
retryCounter, Exception cause) throws Exception {
+            event += "rollback";
+            throw cause;
         }
     }
 

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
 Mon Jun 29 05:40:04 2009
@@ -91,12 +91,13 @@
             event += "commit";
         }
 
-        public void rollback(Consumer consumer, Endpoint endpoint, Exception 
cause) throws Exception {
+        public boolean rollback(Consumer consumer, Endpoint endpoint, int 
retryCounter, Exception cause) throws Exception {
             if (cause.getMessage().equals("Damn I cannot do this")) {
                 event += "rollback";
                 // stop consumer as it does not work
                 consumer.stop();
             }
+            return false;
         }
     }
 

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
 Mon Jun 29 05:40:04 2009
@@ -82,10 +82,11 @@
             event += "commit";
         }
 
-        public void rollback(Consumer consumer, Endpoint endpoint, Exception 
cause) throws Exception {
+        public boolean rollback(Consumer consumer, Endpoint endpoint, int 
retryCounter, Exception cause) throws Exception {
             if (cause.getMessage().equals("Damn I cannot do this")) {
                 event += "rollback";
             }
+            return false;
         }
     }
 

Modified: 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
URL: 
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
--- 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
 (original)
+++ 
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
 Mon Jun 29 05:40:04 2009
@@ -24,7 +24,9 @@
 public class ScheduledPollConsumerTest extends ContextTestSupport {
 
     private static boolean rollback;
-    
+    private static int counter;
+    private static String event = "";
+
     public void testExceptionOnPollAndCanStartAgain() throws Exception {
 
         final Exception expectedException = new Exception("Hello, I should be 
thrown on shutdown only!");
@@ -37,11 +39,11 @@
             public void commit(Consumer consumer, Endpoint endpoint) {
             }
 
-            public void rollback(Consumer consumer, Endpoint endpoint, 
Exception e) throws Exception {
+            public boolean rollback(Consumer consumer, Endpoint endpoint, int 
retryCounter, Exception e) throws Exception {
                 if (e == expectedException) {
                     rollback = true;
                 }
-
+                return false;
             }
         });
 
@@ -64,6 +66,43 @@
         assertEquals("Should not have rollback", false, rollback);
     }
     
+    public void testRetryAtMostThreeTimes() throws Exception {
+        counter = 0;
+        event = "";
+
+        final Exception expectedException = new Exception("Hello, I should be 
thrown on shutdown only!");
+        MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(expectedException);
+
+        consumer.setPollStrategy(new PollingConsumerPollStrategy() {
+            public void begin(Consumer consumer, Endpoint endpoint) {
+            }
+
+            public void commit(Consumer consumer, Endpoint endpoint) {
+                event += "commit";
+            }
+
+            public boolean rollback(Consumer consumer, Endpoint endpoint, int 
retryCounter, Exception e) throws Exception {
+                event += "rollback";
+                counter++;
+                if (retryCounter < 3) {
+                    return true;
+                }
+                return false;
+            }
+        });
+
+        consumer.setUseFixedDelay(true);
+        consumer.setDelay(60000);
+        consumer.start();
+        // poll that throws an exception
+        consumer.run();
+        consumer.stop();
+
+        // 3 retries + 1 last failed attempt when we give up
+        assertEquals(4, counter);
+        assertEquals("rollbackrollbackrollbackrollback", event);
+    }
+
     public void testNoExceptionOnPoll() throws Exception {
         MockScheduledPollConsumer consumer = new 
MockScheduledPollConsumer(null);
         consumer.start();


Reply via email to