Author: davsclaus
Date: Mon Jun 29 05:40:04 2009
New Revision: 789216
URL: http://svn.apache.org/viewvc?rev=789216&view=rev
Log:
CAMEL-1750: added retry feature to scheduled poll consumer strategy.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java
- copied, changed from r789205,
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/PollingConsumerPollStrategy.java
Mon Jun 29 05:40:04 2009
@@ -20,7 +20,7 @@
* Strategy for a {@link org.apache.camel.PollingConsumer} when polling an
{@link org.apache.camel.Endpoint}.
* <p/>
* This pluggable strategy allows to plugin different implementations what to
do, most noticeable what to
- * do in case the polling goes wrong. This can be handled in the {@link
#rollback(Consumer, Endpoint, Exception) rollback}
+ * do in case the polling goes wrong. This can be handled in the {@link
#rollback(Consumer, Endpoint, int, Exception) rollback}
* method.
*
* @version $Revision$
@@ -36,7 +36,7 @@
void begin(Consumer consumer, Endpoint endpoint);
/**
- * Called when poll is completed sucesfully
+ * Called when poll is completed successfully
*
* @param consumer the consumer
* @param endpoint the endpoint being consumed
@@ -48,9 +48,14 @@
*
* @param consumer the consumer
* @param endpoint the endpoint being consumed
+ * @param retryCounter current retry attempt, starting from 0.
* @param cause the caused exception
- * @throws Exception can be used to rethrow the caused exception
+ * @throws Exception can be used to rethrow the caused exception. Notice
that throwing an exception will
+ * terminate the scheduler and thus Camel will not trigger again.
So if you want to let the scheduler
+ * continue to run then do <b>not</b> throw any exception from
this method.
+ * @return whether to retry immediately or not. Return <tt>false</tt> to
ignore the problem,
+ * <tt>true</tt> to try immediately again
*/
- void rollback(Consumer consumer, Endpoint endpoint, Exception cause)
throws Exception;
+ boolean rollback(Consumer consumer, Endpoint endpoint, int retryCounter,
Exception cause) throws Exception;
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultPollingConsumerPollStrategy.java
Mon Jun 29 05:40:04 2009
@@ -39,8 +39,10 @@
// noop
}
- public void rollback(Consumer consumer, Endpoint endpoint, Exception e)
throws Exception {
+ public boolean rollback(Consumer consumer, Endpoint endpoint, int
retryCounter, Exception e) throws Exception {
LOG.warn("Consumer " + consumer + " could not poll endpoint: " +
endpoint.getEndpointUri() + " caused by: " + e.getMessage(), e);
+ // we do not want to retry
+ return false;
}
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
Mon Jun 29 05:40:04 2009
@@ -72,20 +72,41 @@
* Invoked whenever we should be polled
*/
public void run() {
- try {
- if (isRunAllowed()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Starting to poll: " + this.getEndpoint());
- }
- pollStrategy.begin(this, getEndpoint());
- poll();
- pollStrategy.commit(this, getEndpoint());
- }
- } catch (Exception e) {
+ int retryCounter = -1;
+ boolean done = false;
+
+ while (!done) {
try {
- pollStrategy.rollback(this, getEndpoint(), e);
- } catch (Exception re) {
- throw ObjectHelper.wrapRuntimeCamelException(re);
+ // eager assume we are done
+ done = true;
+ if (isRunAllowed()) {
+
+ if (retryCounter == -1) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Starting to poll: " +
this.getEndpoint());
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrying attempt " + retryCounter +
" to poll: " + this.getEndpoint());
+ }
+ }
+ }
+
+ pollStrategy.begin(this, getEndpoint());
+ retryCounter++;
+ poll();
+ pollStrategy.commit(this, getEndpoint());
+ }
+ } catch (Exception e) {
+ try {
+ boolean retry = pollStrategy.rollback(this, getEndpoint(),
retryCounter, e);
+ if (retry) {
+ done = false;
+ }
+ } catch (Exception re) {
+ throw ObjectHelper.wrapRuntimeCamelException(re);
+ }
}
}
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/interceptor/TraceInterceptor.java
Mon Jun 29 05:40:04 2009
@@ -16,7 +16,6 @@
*/
package org.apache.camel.processor.interceptor;
-import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -102,16 +101,16 @@
boolean shouldLog = shouldLogNode(node) && shouldLogExchange(exchange);
+ // whether we should trace it or not, some nodes should be skipped as
they are abstract
+ // intermediate steps for instance related to on completion
+ boolean trace = true;
+
// okay this is a regular exchange being routed we might need to log
and trace
try {
// before
if (shouldLog) {
- // whether we should trace it or not, some nodes should be
skipped as they are abstract
- // intermedidate steps for instance related to on completion
- boolean trace = true;
-
- // if traceable then register this as the previous node, now
it has been logged
+ // register route path taken if TraceableUnitOfWork unit of
work
if (exchange.getUnitOfWork() instanceof TraceableUnitOfWork) {
TraceableUnitOfWork tuow = (TraceableUnitOfWork)
exchange.getUnitOfWork();
@@ -126,22 +125,22 @@
tuow.addTraced(new DefaultRouteNode(node,
super.getProcessor()));
}
}
+ }
- // log and trace the processor
- if (trace) {
- logExchange(exchange);
- traceExchange(exchange);
- }
-
- // some nodes need extra work to trace it
- if (exchange.getUnitOfWork() instanceof TraceableUnitOfWork) {
- TraceableUnitOfWork tuow = (TraceableUnitOfWork)
exchange.getUnitOfWork();
+ // log and trace the processor
+ if (trace) {
+ logExchange(exchange);
+ traceExchange(exchange);
+ }
- if (node instanceof InterceptDefinition) {
- // special for intercept() as we would like to trace
the processor that was intercepted
- // as well, otherwise we only see the intercepted
route, but we need the both to be logged/traced
- afterIntercept((InterceptDefinition) node, tuow,
exchange);
- }
+ // some nodes need extra work to trace it
+ if (exchange.getUnitOfWork() instanceof TraceableUnitOfWork) {
+ TraceableUnitOfWork tuow = (TraceableUnitOfWork)
exchange.getUnitOfWork();
+
+ if (node instanceof InterceptDefinition) {
+ // special for intercept() as we would like to trace the
processor that was intercepted
+ // as well, otherwise we only see the intercepted route,
but we need the both to be logged/traced
+ afterIntercept((InterceptDefinition) node, tuow, exchange);
}
}
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java
(from r789205,
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java&r1=789205&r2=789216&rev=789216&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyRollbackThrowExceptionTest.java
Mon Jun 29 05:40:04 2009
@@ -29,9 +29,8 @@
/**
* Unit test for poll strategy
*/
-public class FileConsumerPollStrategyStopOnRollbackTest extends
ContextTestSupport {
+public class FileConsumerPollStrategyRollbackThrowExceptionTest extends
ContextTestSupport {
- private static int counter;
private static String event = "";
private String fileUrl =
"file://target/pollstrategy/?pollStrategy=#myPoll";
@@ -50,12 +49,13 @@
template.sendBodyAndHeader("file:target/pollstrategy/", "Hello World",
Exchange.FILE_NAME, "hello.txt");
}
- public void testStopOnRollback() throws Exception {
+ public void testRollbackThrowException() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(0);
- // let it run for a little while and since it fails first time we
should never get a message
- Thread.sleep(1000);
+ // let it run for a little while; since we rethrow the exception the
consumer
+ // will stop scheduling and not poll anymore
+ Thread.sleep(2000);
assertMockEndpointsSatisfied();
@@ -81,22 +81,17 @@
ObjectHelper.wrapRuntimeCamelException(e);
}
- if (counter++ == 0) {
- // simulate an error on first poll
- throw new IllegalArgumentException("Damn I cannot do this");
- }
+ // simulate an error on first poll
+ throw new IllegalArgumentException("Damn I cannot do this");
}
public void commit(Consumer consumer, Endpoint endpoint) {
event += "commit";
}
- public void rollback(Consumer consumer, Endpoint endpoint, Exception
cause) throws Exception {
- if (cause.getMessage().equals("Damn I cannot do this")) {
- event += "rollback";
- // stop consumer as it does not work
- consumer.stop();
- }
+ public boolean rollback(Consumer consumer, Endpoint endpoint, int
retryCounter, Exception cause) throws Exception {
+ event += "rollback";
+ throw cause;
}
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyStopOnRollbackTest.java
Mon Jun 29 05:40:04 2009
@@ -91,12 +91,13 @@
event += "commit";
}
- public void rollback(Consumer consumer, Endpoint endpoint, Exception
cause) throws Exception {
+ public boolean rollback(Consumer consumer, Endpoint endpoint, int
retryCounter, Exception cause) throws Exception {
if (cause.getMessage().equals("Damn I cannot do this")) {
event += "rollback";
// stop consumer as it does not work
consumer.stop();
}
+ return false;
}
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerPollStrategyTest.java
Mon Jun 29 05:40:04 2009
@@ -82,10 +82,11 @@
event += "commit";
}
- public void rollback(Consumer consumer, Endpoint endpoint, Exception
cause) throws Exception {
+ public boolean rollback(Consumer consumer, Endpoint endpoint, int
retryCounter, Exception cause) throws Exception {
if (cause.getMessage().equals("Damn I cannot do this")) {
event += "rollback";
}
+ return false;
}
}
Modified:
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java?rev=789216&r1=789215&r2=789216&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/impl/ScheduledPollConsumerTest.java
Mon Jun 29 05:40:04 2009
@@ -24,7 +24,9 @@
public class ScheduledPollConsumerTest extends ContextTestSupport {
private static boolean rollback;
-
+ private static int counter;
+ private static String event = "";
+
public void testExceptionOnPollAndCanStartAgain() throws Exception {
final Exception expectedException = new Exception("Hello, I should be
thrown on shutdown only!");
@@ -37,11 +39,11 @@
public void commit(Consumer consumer, Endpoint endpoint) {
}
- public void rollback(Consumer consumer, Endpoint endpoint,
Exception e) throws Exception {
+ public boolean rollback(Consumer consumer, Endpoint endpoint, int
retryCounter, Exception e) throws Exception {
if (e == expectedException) {
rollback = true;
}
-
+ return false;
}
});
@@ -64,6 +66,43 @@
assertEquals("Should not have rollback", false, rollback);
}
+ public void testRetryAtMostThreeTimes() throws Exception {
+ counter = 0;
+ event = "";
+
+ final Exception expectedException = new Exception("Hello, I should be
thrown on shutdown only!");
+ MockScheduledPollConsumer consumer = new
MockScheduledPollConsumer(expectedException);
+
+ consumer.setPollStrategy(new PollingConsumerPollStrategy() {
+ public void begin(Consumer consumer, Endpoint endpoint) {
+ }
+
+ public void commit(Consumer consumer, Endpoint endpoint) {
+ event += "commit";
+ }
+
+ public boolean rollback(Consumer consumer, Endpoint endpoint, int
retryCounter, Exception e) throws Exception {
+ event += "rollback";
+ counter++;
+ if (retryCounter < 3) {
+ return true;
+ }
+ return false;
+ }
+ });
+
+ consumer.setUseFixedDelay(true);
+ consumer.setDelay(60000);
+ consumer.start();
+ // poll that throws an exception
+ consumer.run();
+ consumer.stop();
+
+ // 3 retries + 1 last failed attempt when we give up
+ assertEquals(4, counter);
+ assertEquals("rollbackrollbackrollbackrollback", event);
+ }
+
public void testNoExceptionOnPoll() throws Exception {
MockScheduledPollConsumer consumer = new
MockScheduledPollConsumer(null);
consumer.start();