Author: davsclaus
Date: Thu Jan 8 11:47:16 2009
New Revision: 732808
URL: http://svn.apache.org/viewvc?rev=732808&view=rev
Log:
Merged revisions 732793 via svnmerge from
https://svn.apache.org/repos/asf/activemq/camel/trunk
........
r732793 | davsclaus | 2009-01-08 20:01:22 +0100 (Thu, 08 Jan 2009) | 1 line
CAMEL-1234: Added onRedelivery option to DeadLetterChannel to allow
processing a custom processor before every redelivery attempt.
........
Added:
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
- copied, changed from r732793,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
Modified:
activemq/camel/branches/camel-1.x/ (props changed)
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/branches/camel-1.x/camel-core/src/test/resources/log4j.properties
Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Thu Jan 8 11:47:16 2009
@@ -1 +1 @@
-/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619,724681,725040,725309-725320,725340,725351,725569-725572,725612,725652-725660,725715,725883,726339,726640-726645,726932,727113,727375,727377,727624,727713,727946,729401,729892,730069,730132,730154,730157,730275,730299,730504-730505,730508,730571,730599,730759,730903,730916,730923,730936,730992,731126,731168-731169,731488,731492,731799,731824,731836,731844,731860,732207,732210,732237,732246-732247,732378,7
32589-732590,732625
+/activemq/camel/trunk:709850,711200,711206,711219-711220,711523,711531,711756,711784,711859,711874,711962,711971,712064,712119,712148,712662,712692,712925,713013,713107,713136,713273,713290,713292,713295,713314,713475,713625,713932,713944,714032,717965,717989,718242,718273,718312-718515,719163-719184,719334,719339,719524,719662,719848,719851,719855,719864,719978-719979,720207,720435-720437,720806,721272,721331,721333-721334,721360,721669,721764,721813,721985,722005,722070,722110,722415,722438,722726,722845,722878,723264,723314,723325-723327,723409,723835,723966,724122,724619,724681,725040,725309-725320,725340,725351,725569-725572,725612,725652-725660,725715,725883,726339,726640-726645,726932,727113,727375,727377,727624,727713,727946,729401,729892,730069,730132,730154,730157,730275,730299,730504-730505,730508,730571,730599,730759,730903,730916,730923,730936,730992,731126,731168-731169,731488,731492,731799,731824,731836,731844,731860,732207,732210,732237,732246-732247,732378,7
32589-732590,732625,732793
Propchange: activemq/camel/branches/camel-1.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=732808&r1=732807&r2=732808&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
Thu Jan 8 11:47:16 2009
@@ -38,6 +38,7 @@
*/
public class DeadLetterChannelBuilder extends ErrorHandlerBuilderSupport {
private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ private Processor onRedelivery;
private ExceptionPolicyStrategy exceptionPolicyStrategy =
ErrorHandlerSupport.createDefaultExceptionPolicyStrategy();
private ProcessorFactory deadLetterFactory;
private Processor defaultDeadLetterEndpoint;
@@ -63,8 +64,8 @@
}
public Processor createErrorHandler(RouteContext routeContext, Processor
processor) throws Exception {
- Processor deadLetter = getDeadLetterFactory().createProcessor();
- DeadLetterChannel answer = new DeadLetterChannel(processor,
deadLetter, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy());
+ Processor deadLetter = getDeadLetterFactory().createProcessor();
+ DeadLetterChannel answer = new DeadLetterChannel(processor,
deadLetter, onRedelivery, getRedeliveryPolicy(), getLogger(),
getExceptionPolicyStrategy());
configure(answer);
return answer;
}
@@ -106,6 +107,16 @@
return this;
}
+ public DeadLetterChannelBuilder retriesExhaustedLogLevel(LoggingLevel
retriesExhaustedLogLevel) {
+
getRedeliveryPolicy().setRetriesExhaustedLogLevel(retriesExhaustedLogLevel);
+ return this;
+ }
+
+ public DeadLetterChannelBuilder retryAttemptedLogLevel(LoggingLevel
retryAttemptedLogLevel) {
+
getRedeliveryPolicy().setRetryAttemptedLogLevel(retryAttemptedLogLevel);
+ return this;
+ }
+
/**
* Sets the logger used for caught exceptions
*/
@@ -152,6 +163,16 @@
return this;
}
+ /**
+ * Sets a processor that should be processed <b>before</b> a redelivey
attempt.
+ * <p/>
+ * Can be used to change the {...@link org.apache.camel.Exchange}
<b>before</b> its being redelivered.
+ */
+ public DeadLetterChannelBuilder onRedelivery(Processor processor) {
+ setOnRedelivery(processor);
+ return this;
+ }
+
// Properties
//
-------------------------------------------------------------------------
public RedeliveryPolicy getRedeliveryPolicy() {
@@ -250,4 +271,16 @@
this.exceptionPolicyStrategy = exceptionPolicyStrategy;
}
+ public Processor getOnRedelivery() {
+ return onRedelivery;
+ }
+
+ public void setOnRedelivery(Processor onRedelivery) {
+ this.onRedelivery = onRedelivery;
+ }
+
+ @Override
+ public String toString() {
+ return "DeadLetterChannelBuilder(" + (deadLetterFactory != null ?
deadLetterFactory : defaultDeadLetterEndpoint) + ")";
+ }
}
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=732808&r1=732807&r2=732808&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Thu Jan 8 11:47:16 2009
@@ -58,6 +58,7 @@
private AsyncProcessor outputAsync;
private RedeliveryPolicy redeliveryPolicy;
private Logger logger;
+ private Processor redeliveryProcessor;
private class RedeliveryData {
int redeliveryCounter;
@@ -104,16 +105,11 @@
}
}
- public DeadLetterChannel(Processor output, Processor deadLetter) {
- this(output, deadLetter, new RedeliveryPolicy(),
DeadLetterChannel.createDefaultLogger(),
- ErrorHandlerSupport.createDefaultExceptionPolicyStrategy());
- }
-
- public DeadLetterChannel(Processor output, Processor deadLetter,
RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy
exceptionPolicyStrategy) {
- this.deadLetter = deadLetter;
+ public DeadLetterChannel(Processor output, Processor deadLetter, Processor
redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Logger logger,
ExceptionPolicyStrategy exceptionPolicyStrategy) {
this.output = output;
+ this.deadLetter = deadLetter;
+ this.redeliveryProcessor = redeliveryProcessor;
this.outputAsync = AsyncProcessorTypeConverter.convert(output);
-
this.redeliveryPolicy = redeliveryPolicy;
this.logger = logger;
setExceptionPolicy(exceptionPolicyStrategy);
@@ -128,11 +124,18 @@
return "DeadLetterChannel[" + output + ", " + deadLetter + "]";
}
+ public void process(Exchange exchange) throws Exception {
+ AsyncProcessorHelper.process(this, exchange);
+ }
+
public boolean process(Exchange exchange, final AsyncCallback callback) {
return process(exchange, callback, new RedeliveryData());
}
- public boolean process(final Exchange exchange, final AsyncCallback
callback, final RedeliveryData data) {
+ /**
+ * Processes the exchange using decorated with this dead letter channel.
+ */
+ protected boolean process(final Exchange exchange, final AsyncCallback
callback, final RedeliveryData data) {
while (true) {
// we can't keep retrying if the route is being shutdown.
@@ -158,12 +161,13 @@
handleException(exchange, data);
}
- // should we redeliver or not?
- if
(!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+ // compute if we should redeliver or not
+ boolean shouldRedeliver = shouldRedeliver(exchange, data);
+ if (!shouldRedeliver) {
return deliverToFaultProcessor(exchange, callback, data);
}
- // should we redeliver
+ // if we are redelivering then sleep before trying again
if (data.redeliveryCounter > 0) {
// okay we will give it another go so clear the exception so
we can try again
if (exchange.getException() != null) {
@@ -172,6 +176,9 @@
// wait until we should redeliver
data.redeliveryDelay =
data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
+
+ // letting onRedeliver be executed
+ deliverToRedeliveryProcessor(exchange, callback, data);
}
// process the exchange
@@ -206,20 +213,6 @@
}
- private void logFailedDelivery(String message, RedeliveryData data,
Throwable e) {
- LoggingLevel newLogLevel = null;
- if
(data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
- newLogLevel =
data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
- } else {
- newLogLevel =
data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
- }
- if (e != null) {
- logger.log(message, e, newLogLevel);
- } else {
- logger.log(message, newLogLevel);
- }
- }
-
protected void asyncProcess(final Exchange exchange, final AsyncCallback
callback, final RedeliveryData data) {
// set the timer here
if (!isRunAllowed()) {
@@ -242,9 +235,11 @@
// did previous processing caused an exception?
if (exchange.getException() != null) {
handleException(exchange, data);
- }
+ }
- if
(!data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter)) {
+ // compute if we should redeliver or not
+ boolean shouldRedeliver = shouldRedeliver(exchange, data);
+ if (!shouldRedeliver) {
deliverToFaultProcessor(exchange, callback, data);
return;
}
@@ -259,7 +254,10 @@
// wait until we should redeliver
data.redeliveryDelay =
data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
timer.schedule(new RedeliverTimerTask(exchange, callback, data),
data.redeliveryDelay);
- }
+
+ // letting onRedeliver be executed
+ deliverToRedeliveryProcessor(exchange, callback, data);
+ }
}
private void handleException(Exchange exchange, RedeliveryData data) {
@@ -275,14 +273,39 @@
Processor processor = exceptionPolicy.getErrorHandler();
if (processor != null) {
data.failureProcessor = processor;
- }
+ }
}
-
- logFailedDelivery("Failed delivery for exchangeId: " +
exchange.getExchangeId() + ". On delivery attempt: " + data.redeliveryCounter +
" caught: " + e, data, e);
+
+ String msg = "Failed delivery for exchangeId: " +
exchange.getExchangeId()
+ + ". On delivery attempt: " + data.redeliveryCounter + "
caught: " + e;
+ logFailedDelivery(true, exchange, msg, data, e);
+
data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
-
}
-
+
+ /**
+ * Gives an optional configure redelivery processor a chance to process
before the Exchange
+ * will be redelivered. This can be used to alter the Exchange.
+ */
+ private boolean deliverToRedeliveryProcessor(final Exchange exchange,
final AsyncCallback callback,
+ final RedeliveryData data) {
+ if (redeliveryProcessor == null) {
+ return true;
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("RedeliveryProcessor " + redeliveryProcessor + " is
processing Exchange before its redelivered");
+ }
+ AsyncProcessor afp =
AsyncProcessorTypeConverter.convert(redeliveryProcessor);
+ boolean sync = afp.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ callback.done(data.sync);
+ }
+ });
+
+ return sync;
+ }
+
private boolean deliverToFaultProcessor(final Exchange exchange, final
AsyncCallback callback,
final RedeliveryData data) {
// we did not success with the redelivery so now we let the failure
processor handle it
@@ -299,45 +322,27 @@
}
});
- // The line below shouldn't be needed, it is invoked by the
AsyncCallback above
- //restoreExceptionOnExchange(exchange, data.handledPredicate);
- logFailedDelivery("Failed delivery for exchangeId: " +
exchange.getExchangeId() + ". Handled by the failure processor: " +
data.failureProcessor, data, null);
+ String msg = "Failed delivery for exchangeId: " +
exchange.getExchangeId()
+ + ". Handled by the failure processor: " +
data.failureProcessor;
+ logFailedDelivery(false, exchange, msg, data, null);
+
return sync;
}
+ // Properties
+ //
-------------------------------------------------------------------------
+
public static boolean isFailureHandled(Exchange exchange) {
- return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null
+ return exchange.getProperty(FAILURE_HANDLED_PROPERTY) != null
|| exchange.getIn().getHeader(CAUGHT_EXCEPTION_HEADER) != null;
}
public static void setFailureHandled(Exchange exchange) {
exchange.setProperty(FAILURE_HANDLED_PROPERTY,
exchange.getException());
- exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER,
exchange.getException());
+ exchange.getIn().setHeader(CAUGHT_EXCEPTION_HEADER,
exchange.getException());
exchange.setException(null);
}
- protected static void restoreExceptionOnExchange(Exchange exchange,
Predicate handledPredicate) {
- if (handledPredicate == null || !handledPredicate.matches(exchange)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("This exchange is not handled so its marked as
failed: " + exchange);
- }
- // exception not handled, put exception back in the exchange
-
exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY,
Throwable.class));
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("This exchange is handled so its marked as not
failed: " + exchange);
- }
- exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY,
Boolean.TRUE);
- }
- }
-
- public void process(Exchange exchange) throws Exception {
- AsyncProcessorHelper.process(this, exchange);
- }
-
- // Properties
- //
-------------------------------------------------------------------------
-
/**
* Returns the output processor
*/
@@ -379,6 +384,39 @@
// Implementation methods
//
-------------------------------------------------------------------------
+ protected static void restoreExceptionOnExchange(Exchange exchange,
Predicate handledPredicate) {
+ if (handledPredicate == null || !handledPredicate.matches(exchange)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("This exchange is not handled so its marked as
failed: " + exchange);
+ }
+ // exception not handled, put exception back in the exchange
+
exchange.setException(exchange.getProperty(FAILURE_HANDLED_PROPERTY,
Throwable.class));
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("This exchange is handled so its marked as not
failed: " + exchange);
+ }
+ exchange.setProperty(Exchange.EXCEPTION_HANDLED_PROPERTY,
Boolean.TRUE);
+ }
+ }
+
+ private void logFailedDelivery(boolean shouldRedeliver, Exchange exchange,
String message, RedeliveryData data, Throwable e) {
+ LoggingLevel newLogLevel;
+ if (shouldRedeliver) {
+ newLogLevel =
data.currentRedeliveryPolicy.getRetryAttemptedLogLevel();
+ } else {
+ newLogLevel =
data.currentRedeliveryPolicy.getRetriesExhaustedLogLevel();
+ }
+ if (e != null) {
+ logger.log(message, e, newLogLevel);
+ } else {
+ logger.log(message, newLogLevel);
+ }
+ }
+
+ private boolean shouldRedeliver(Exchange exchange, RedeliveryData data) {
+ return
data.currentRedeliveryPolicy.shouldRedeliver(data.redeliveryCounter);
+ }
+
/**
* Increments the redelivery counter and adds the redelivered flag if the
* message has been redelivered
Copied:
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
(from r732793,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java?p2=activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java&r1=732793&r2=732808&rev=732808&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
Thu Jan 8 11:47:16 2009
@@ -21,7 +21,6 @@
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.onexception.MyTechnicalException;
/**
* Unit test for testing possibility to modify exchange before redelivering
@@ -67,7 +66,7 @@
errorHandler(deadLetterChannel("mock:error")
.onRedelivery(new MyRedeliverPrcessor())
// setting delay to zero is just to make unit teting
faster
- .delay(0L));
+ .initialRedeliveryDelay(0L));
// END SNIPPET: e1
@@ -75,7 +74,7 @@
public void process(Exchange exchange) throws Exception {
// force some error so Camel will do redelivery
if (++counter <= 3) {
- throw new MyTechnicalException("Forced by unit
test");
+ throw new IllegalArgumentException("Forced by unit
test");
}
}
}).to("mock:result");
Modified:
activemq/camel/branches/camel-1.x/camel-core/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/activemq/camel/branches/camel-1.x/camel-core/src/test/resources/log4j.properties?rev=732808&r1=732807&r2=732808&view=diff
==============================================================================
---
activemq/camel/branches/camel-1.x/camel-core/src/test/resources/log4j.properties
(original)
+++
activemq/camel/branches/camel-1.x/camel-core/src/test/resources/log4j.properties
Thu Jan 8 11:47:16 2009
@@ -21,7 +21,7 @@
log4j.rootLogger=INFO, out
log4j.logger.org.apache.activemq.spring=WARN
-log4j.logger.org.apache.camel=DEBUG
+#log4j.logger.org.apache.camel=DEBUG
log4j.logger.org.apache.camel.impl.converter=WARN
# CONSOLE appender not used by default