Author: davsclaus
Date: Mon Dec 19 12:26:56 2011
New Revision: 1220711
URL: http://svn.apache.org/viewvc?rev=1220711&view=rev
Log:
CAMEL-4795: Fixed an issue where the throttler/delayer, in async delayed mode,
could invoke done for the exchange twice under certain conditions.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java
- copied, changed from r1220644,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java?rev=1220711&r1=1220710&r2=1220711&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/DelayProcessorSupport.java
Mon Dec 19 12:26:56 2011
@@ -57,9 +57,18 @@ public abstract class DelayProcessorSupp
if (!isRunAllowed()) {
exchange.setException(new RejectedExecutionException("Run is
not allowed"));
}
- DelayProcessorSupport.super.process(exchange, callback);
- // signal callback we are done async
- callback.done(false);
+
+ // process the exchange now that we woke up
+ DelayProcessorSupport.super.process(exchange, new AsyncCallback() {
+ @Override
+ public void done(boolean doneSync) {
+ log.trace("Delayed task done for exchangeId: {}",
exchange.getExchangeId());
+ // we must done the callback from this async callback as
well, to ensure callback is done correctly
+ // must invoke done on callback with false, as that is
what the original caller would
+ // expect as we returned false in the process method
+ callback.done(false);
+ }
+ });
}
}
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java
(from r1220644,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java&r1=1220644&r2=1220711&rev=1220711&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointUoWTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointDelayUoWTest.java
Mon Dec 19 12:26:56 2011
@@ -27,7 +27,7 @@ import org.apache.camel.support.Synchron
/**
* @version
*/
-public class AsyncEndpointUoWTest extends ContextTestSupport {
+public class AsyncEndpointDelayUoWTest extends ContextTestSupport {
private static String beforeThreadName;
private static String afterThreadName;
@@ -56,8 +56,6 @@ public class AsyncEndpointUoWTest extend
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- context.addComponent("async", new MyAsyncComponent());
-
from("direct:start")
.process(new Processor() {
public void process(Exchange exchange) throws
Exception {
@@ -67,12 +65,13 @@ public class AsyncEndpointUoWTest extend
})
.to("mock:before")
.to("log:before")
- .to("async:bye:camel")
+ .delay(500).asyncDelayed()
.process(new Processor() {
public void process(Exchange exchange) throws
Exception {
afterThreadName =
Thread.currentThread().getName();
}
})
+ .transform().constant("Bye Camel")
.to("log:after")
.to("mock:after")
.to("mock:result");