Author: davsclaus
Date: Sat Apr 7 07:58:18 2012
New Revision: 1310691
URL: http://svn.apache.org/viewvc?rev=1310691&view=rev
Log:
CAMEL-5148: Aggregate EIP now supports TimeoutAwareAggregationStrategy.
Added:
camel/branches/camel-2.9.x/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java
- copied unchanged from r1310690,
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutTest.java
Modified:
camel/branches/camel-2.9.x/ (props changed)
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
svn:mergeinfo = /camel/trunk:1310690
Propchange: camel/branches/camel-2.9.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified:
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=1310691&r1=1310690&r2=1310691&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
(original)
+++
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
Sat Apr 7 07:58:18 2012
@@ -371,6 +371,15 @@ public class AggregateProcessor extends
closedCorrelationKeys.put(key, key);
}
+ if (fromTimeout) {
+ // invoke timeout if its timeout aware aggregation strategy,
+ // to allow any custom processing before discarding the exchange
+ if (aggregationStrategy instanceof
TimeoutAwareAggregationStrategy) {
+ long timeout = getCompletionTimeout() > 0 ?
getCompletionTimeout() : -1;
+ ((TimeoutAwareAggregationStrategy)
aggregationStrategy).timeout(exchange, -1, -1, timeout);
+ }
+ }
+
if (fromTimeout && isDiscardOnCompletionTimeout()) {
// discard due timeout
LOG.debug("Aggregation for correlation key {} discarding
aggregated exchange: ()", key, exchange);
Modified:
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
URL:
http://svn.apache.org/viewvc/camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java?rev=1310691&r1=1310690&r2=1310691&view=diff
==============================================================================
---
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
(original)
+++
camel/branches/camel-2.9.x/camel-core/src/main/java/org/apache/camel/processor/aggregate/TimeoutAwareAggregationStrategy.java
Sat Apr 7 07:58:18 2012
@@ -32,9 +32,9 @@ public interface TimeoutAwareAggregation
*
* @param oldExchange the current aggregated exchange, or the original
{@link Exchange} if no aggregation
* has been done before the timeout occurred
- * @param index the index
- * @param total the total
- * @param timeout the timeout value in millis
+ * @param index the index, may be <tt>-1</tt> if not possible to
determine the index
+ * @param total the total, may be <tt>-1</tt> if not possible to
determine the total
+ * @param timeout the timeout value in millis, may be <tt>-1</tt> if
not possible to determine the timeout
*/
void timeout(Exchange oldExchange, int index, int total, long timeout);
}