Author: davsclaus
Date: Thu Jan 8 11:01:22 2009
New Revision: 732793
URL: http://svn.apache.org/viewvc?rev=732793&view=rev
Log:
CAMEL-1234: Added onRedelivery option to DeadLetterChannel to allow processing
a custom processor before every redelivery attempt.
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
(contents, props changed)
- copied, changed from r732654,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java?rev=732793&r1=732792&r2=732793&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/DeadLetterChannelBuilder.java
Thu Jan 8 11:01:22 2009
@@ -38,6 +38,7 @@
*/
public class DeadLetterChannelBuilder extends ErrorHandlerBuilderSupport {
private RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
+ private Processor onRedelivery;
private ExceptionPolicyStrategy exceptionPolicyStrategy =
ErrorHandlerSupport.createDefaultExceptionPolicyStrategy();
private ProcessorFactory deadLetterFactory;
private Processor defaultDeadLetterEndpoint;
@@ -63,8 +64,8 @@
}
public Processor createErrorHandler(RouteContext routeContext, Processor
processor) throws Exception {
- Processor deadLetter = getDeadLetterFactory().createProcessor();
- DeadLetterChannel answer = new DeadLetterChannel(processor,
deadLetter, getRedeliveryPolicy(), getLogger(), getExceptionPolicyStrategy());
+ Processor deadLetter = getDeadLetterFactory().createProcessor();
+ DeadLetterChannel answer = new DeadLetterChannel(processor,
deadLetter, onRedelivery, getRedeliveryPolicy(), getLogger(),
getExceptionPolicyStrategy());
configure(answer);
return answer;
}
@@ -162,6 +163,16 @@
return this;
}
+ /**
+ * Sets a processor that should be processed <b>before</b> a redelivery
attempt.
+ * <p/>
+ * Can be used to change the {@link org.apache.camel.Exchange}
<b>before</b> it is being redelivered.
+ */
+ public DeadLetterChannelBuilder onRedelivery(Processor processor) {
+ setOnRedelivery(processor);
+ return this;
+ }
+
// Properties
//
-------------------------------------------------------------------------
public RedeliveryPolicy getRedeliveryPolicy() {
@@ -260,6 +271,14 @@
this.exceptionPolicyStrategy = exceptionPolicyStrategy;
}
+ public Processor getOnRedelivery() {
+ return onRedelivery;
+ }
+
+ public void setOnRedelivery(Processor onRedelivery) {
+ this.onRedelivery = onRedelivery;
+ }
+
@Override
public String toString() {
return "DeadLetterChannelBuilder(" + (deadLetterFactory != null ?
deadLetterFactory : defaultDeadLetterEndpoint) + ")";
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java?rev=732793&r1=732792&r2=732793&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DeadLetterChannel.java
Thu Jan 8 11:01:22 2009
@@ -58,6 +58,7 @@
private AsyncProcessor outputAsync;
private RedeliveryPolicy redeliveryPolicy;
private Logger logger;
+ private Processor redeliveryProcessor;
private class RedeliveryData {
int redeliveryCounter;
@@ -105,22 +106,17 @@
}
}
- public DeadLetterChannel(Processor output, Processor deadLetter) {
- this(output, deadLetter, new RedeliveryPolicy(),
DeadLetterChannel.createDefaultLogger(),
- ErrorHandlerSupport.createDefaultExceptionPolicyStrategy());
- }
-
- public DeadLetterChannel(Processor output, Processor deadLetter,
RedeliveryPolicy redeliveryPolicy, Logger logger, ExceptionPolicyStrategy
exceptionPolicyStrategy) {
- this.deadLetter = deadLetter;
+ public DeadLetterChannel(Processor output, Processor deadLetter, Processor
redeliveryProcessor, RedeliveryPolicy redeliveryPolicy, Logger logger,
ExceptionPolicyStrategy exceptionPolicyStrategy) {
this.output = output;
+ this.deadLetter = deadLetter;
+ this.redeliveryProcessor = redeliveryProcessor;
this.outputAsync = AsyncProcessorTypeConverter.convert(output);
-
this.redeliveryPolicy = redeliveryPolicy;
this.logger = logger;
setExceptionPolicy(exceptionPolicyStrategy);
}
- public static <E extends Exchange> Logger createDefaultLogger() {
+ public static Logger createDefaultLogger() {
return new Logger(LOG, LoggingLevel.ERROR);
}
@@ -170,7 +166,6 @@
boolean shouldRedeliver = shouldRedeliver(exchange, data);
if (!shouldRedeliver) {
return deliverToFaultProcessor(exchange, callback, data);
-
}
// if we are redelivering then sleep before trying again
@@ -182,6 +177,9 @@
// wait until we should redeliver
data.redeliveryDelay =
data.currentRedeliveryPolicy.sleep(data.redeliveryDelay);
+
+ // letting onRedelivery be executed
+ deliverToRedeliveryProcessor(exchange, callback, data);
}
// process the exchange
@@ -257,7 +255,10 @@
// wait until we should redeliver
data.redeliveryDelay =
data.currentRedeliveryPolicy.getRedeliveryDelay(data.redeliveryDelay);
timer.schedule(new RedeliverTimerTask(exchange, callback, data),
data.redeliveryDelay);
- }
+
+ // letting onRedelivery be executed
+ deliverToRedeliveryProcessor(exchange, callback, data);
+ }
}
private void handleException(Exchange exchange, RedeliveryData data) {
@@ -274,13 +275,37 @@
Processor processor = exceptionPolicy.getErrorHandler();
if (processor != null) {
data.failureProcessor = processor;
- }
+ }
}
-
- logFailedDelivery(true, exchange, "Failed delivery for exchangeId: " +
exchange.getExchangeId()
- + ". On delivery attempt: " + data.redeliveryCounter + "
caught: " + e, data, e);
+
+ String msg = "Failed delivery for exchangeId: " +
exchange.getExchangeId()
+ + ". On delivery attempt: " + data.redeliveryCounter + "
caught: " + e;
+ logFailedDelivery(true, exchange, msg, data, e);
+
data.redeliveryCounter = incrementRedeliveryCounter(exchange, e);
-
+ }
+
+ /**
+ * Gives an optionally configured redelivery processor a chance to process
before the Exchange
+ * will be redelivered. This can be used to alter the Exchange.
+ */
+ private boolean deliverToRedeliveryProcessor(final Exchange exchange,
final AsyncCallback callback,
+ final RedeliveryData data) {
+ if (redeliveryProcessor == null) {
+ return true;
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("RedeliveryProcessor " + redeliveryProcessor + " is
processing Exchange before its redelivered");
+ }
+ AsyncProcessor afp =
AsyncProcessorTypeConverter.convert(redeliveryProcessor);
+ boolean sync = afp.process(exchange, new AsyncCallback() {
+ public void done(boolean sync) {
+ callback.done(data.sync);
+ }
+ });
+
+ return sync;
}
private boolean deliverToFaultProcessor(final Exchange exchange, final
AsyncCallback callback,
@@ -299,11 +324,10 @@
}
});
- // The line below shouldn't be needed, it is invoked by the
AsyncCallback above
- // restoreExceptionOnExchange(exchange, data.handledPredicate);
- logFailedDelivery(false, exchange, "Failed delivery for exchangeId: "
+ exchange.getExchangeId()
- + ". Handled by the failure
processor: " + data.failureProcessor,
- data, null);
+ String msg = "Failed delivery for exchangeId: " +
exchange.getExchangeId()
+ + ". Handled by the failure processor: " +
data.failureProcessor;
+ logFailedDelivery(false, exchange, msg, data, null);
+
return sync;
}
Copied:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
(from r732654,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java&r1=732654&r2=732793&rev=732793&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
Thu Jan 8 11:01:22 2009
@@ -14,23 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.processor.interceptor;
+package org.apache.camel.processor;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.processor.onexception.MyTechnicalException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.onexception.MyTechnicalException;
/**
* Unit test for testing possibility to modify exchange before redelivering
*/
-public class InterceptAlterMessageBeforeRedeliveryTest extends
ContextTestSupport {
+public class DeadLetterChannelOnRedeliveryTest extends ContextTestSupport {
static int counter;
- public void testInterceptAlterMessageBeforeRedelivery() throws Exception {
+ public void testOnExceptionAlterMessageBeforeRedelivery() throws Exception
{
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Hello World123");
@@ -39,7 +39,7 @@
assertMockEndpointsSatisfied();
}
- public void testInterceptAlterMessageWithHeadersBeforeRedelivery() throws
Exception {
+ public void testOnExceptionAlterMessageWithHeadersBeforeRedelivery()
throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("Hello World123");
mock.expectedHeaderReceived("foo", "123");
@@ -60,25 +60,14 @@
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // to execute unit test much faster we dont use delay between
redeliveries
- errorHandler(deadLetterChannel("mock:error").delay(0L));
-
// START SNIPPET: e1
- // we configure an interceptor that is triggered when the
redelivery flag
- // has been set TRUE on an exchange
-
intercept().when(header("org.apache.camel.Redelivered").isEqualTo(Boolean.TRUE)).
- process(new Processor() {
- public void process(Exchange exchange) throws
Exception {
- // the message is being redelivered so we can
alter it
-
- // we just append the redelivery counter to
the body
- // you can of course do all kind of stuff
instead
- String body =
exchange.getIn().getBody(String.class);
- int count =
exchange.getIn().getHeader("org.apache.camel.RedeliveryCounter", Integer.class);
-
- exchange.getIn().setBody(body + count);
- }
- });
+ // we configure our Dead Letter Channel to invoke
+ // MyRedeliveryProcessor before a redelivery is
+ // attempted. This allows us to alter the message before it is redelivered
+ errorHandler(deadLetterChannel("mock:error")
+ .onRedelivery(new MyRedeliverPrcessor())
+ // setting delay to zero is just to make unit testing
faster
+ .delay(0L));
// END SNIPPET: e1
@@ -95,5 +84,23 @@
};
}
+ // START SNIPPET: e2
+ // This is our processor that is executed before every redelivery attempt
+ // here we can do what we want in the java code, such as altering the
message
+ public class MyRedeliverPrcessor implements Processor {
+
+ public void process(Exchange exchange) throws Exception {
+ // the message is being redelivered so we can alter it
+
+ // we just append the redelivery counter to the body
+ // you can of course do all kind of stuff instead
+ String body = exchange.getIn().getBody(String.class);
+ int count =
exchange.getIn().getHeader("org.apache.camel.RedeliveryCounter", Integer.class);
+
+ exchange.getIn().setBody(body + count);
+ }
+ }
+ // END SNIPPET: e2
+
}
\ No newline at end of file
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/DeadLetterChannelOnRedeliveryTest.java
------------------------------------------------------------------------------
svn:mergeinfo =
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java?rev=732793&r1=732792&r2=732793&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/interceptor/InterceptAlterMessageBeforeRedeliveryTest.java
Thu Jan 8 11:01:22 2009
@@ -19,9 +19,9 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.processor.onexception.MyTechnicalException;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.onexception.MyTechnicalException;
/**
* Unit test for testing possibility to modify exchange before redelivering