Author: davsclaus
Date: Sat Apr 7 07:55:15 2012
New Revision: 1310690
URL: http://svn.apache.org/viewvc?rev=1310690&view=rev
Log:
CAMEL-5148: Aggregate EIP now supports TimeoutAwareAggregationStrategy.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java
- copied, changed from r1310679,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1310690&r1=1310689&r2=1310690&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Sat Apr 7 07:55:15 2012
@@ -375,6 +375,15 @@ public class AggregateProcessor extends
closedCorrelationKeys.put(key, key);
}
+ if (fromTimeout) {
+ // invoke timeout if its timeout aware aggregation strategy,
+ // to allow any custom processing before discarding the exchange
+ if (aggregationStrategy instanceof
TimeoutAwareAggregationStrategy) {
+ long timeout = getCompletionTimeout() > 0 ?
getCompletionTimeout() : -1;
+ ((TimeoutAwareAggregationStrategy)
aggregationStrategy).timeout(exchange, -1, -1, timeout);
+ }
+ }
+
if (fromTimeout && isDiscardOnCompletionTimeout()) {
// discard due timeout
LOG.debug("Aggregation for correlation key {} discarding
aggregated exchange: ()", key, exchange);
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java?rev=1310690&r1=1310689&r2=1310690&view=diff
==============================================================================
---
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
(original)
+++
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
Sat Apr 7 07:55:15 2012
@@ -32,9 +32,9 @@ public interface TimeoutAwareAggregation
*
* @param oldExchange the current aggregated exchange, or the original
{@link Exchange} if no aggregation
* has been done before the timeout occurred
- * @param index the index
- * @param total the total
- * @param timeout the timeout value in millis
+ * @param index the index, may be <tt>-1</tt> if not possible to
determine the index
+ * @param total the total, may be <tt>-1</tt> if not possible to
determine the total
+ * @param timeout the timeout value in millis, may be <tt>-1</tt> if
not possible to determine the timeout
*/
void timeout(Exchange oldExchange, int index, int total, long timeout);
}
Copied:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java
(from r1310679,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java)
URL:
http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java&r1=1310679&r2=1310690&rev=1310690&view=diff
==============================================================================
---
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateDiscardOnTimeoutTest.java
(original)
+++
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java
Sat Apr 7 07:55:15 2012
@@ -17,18 +17,22 @@
package org.apache.camel.processor.aggregator;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.BodyInAggregatingStrategy;
+import org.apache.camel.processor.aggregate.TimeoutAwareAggregationStrategy;
/**
* @version
*/
-public class AggregateDiscardOnTimeoutTest extends ContextTestSupport {
+public class AggregateTimeoutTest extends ContextTestSupport {
- public void testAggregateDiscardOnTimeout() throws Exception {
+ private static final AtomicInteger invoked = new AtomicInteger();
+
+ public void testAggregateTimeout() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:aggregated");
mock.expectedMessageCount(0);
@@ -40,6 +44,9 @@ public class AggregateDiscardOnTimeoutTe
mock.assertIsSatisfied();
+ // should invoke the timeout method
+ assertEquals(1, invoked.get());
+
// now send 3 which does not timeout
mock.reset();
mock.expectedBodiesReceived("C+D+E");
@@ -50,6 +57,9 @@ public class AggregateDiscardOnTimeoutTe
// should complete before timeout
mock.await(1500, TimeUnit.MILLISECONDS);
+
+ // should not invoke the timeout method
+ assertEquals(1, invoked.get());
}
@Override
@@ -57,17 +67,38 @@ public class AggregateDiscardOnTimeoutTe
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- // START SNIPPET: e1
from("direct:start")
- .aggregate(header("id"), new BodyInAggregatingStrategy())
+ .aggregate(header("id"), new MyAggregationStrategy())
+ .discardOnCompletionTimeout()
.completionSize(3)
- // use a 3 second timeout
+ // use a 2 second timeout
.completionTimeout(2000)
- // and if timeout occurred then just discard the
aggregated message
- .discardOnCompletionTimeout()
.to("mock:aggregated");
- // END SNIPPET: e1
}
};
}
+
+ private static class MyAggregationStrategy implements
TimeoutAwareAggregationStrategy {
+
+ public void timeout(Exchange oldExchange, int index, int total, long
timeout) {
+ invoked.incrementAndGet();
+
+ assertEquals(2000, timeout);
+ assertEquals(-1, total);
+ assertEquals(-1, index);
+ assertNotNull(oldExchange);
+ assertEquals("AB", oldExchange.getIn().getBody());
+ }
+
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String body = oldExchange.getIn().getBody(String.class);
+ oldExchange.getIn().setBody(body +
newExchange.getIn().getBody(String.class));
+ return oldExchange;
+ }
+ }
+
}