Modified: websites/production/camel/content/aggregator2.html
==============================================================================
--- websites/production/camel/content/aggregator2.html (original)
+++ websites/production/camel/content/aggregator2.html Thu Aug 31 23:20:55 2017
@@ -75,190 +75,8 @@
        <tbody>
         <tr>
         <td valign="top" width="100%">
-<div class="wiki-content maincontent"><h3 
id="Aggregator2-Aggregator">Aggregator</h3><p><strong>This applies for Camel 
version 2.3 or newer. If you use an older version then use this <a shape="rect" 
href="aggregator.html">Aggregator</a> link instead.</strong></p><p>The <a 
shape="rect" class="external-link" 
href="http://www.enterpriseintegrationpatterns.com/Aggregator.html"; 
rel="nofollow">Aggregator</a> from the <a shape="rect" 
href="enterprise-integration-patterns.html">EIP patterns</a> allows you to 
combine a number of messages together into a single message.</p><p><span 
class="confluence-embedded-file-wrapper"><img class="confluence-embedded-image 
confluence-external-resource" 
src="http://www.enterpriseintegrationpatterns.com/img/Aggregator.gif"; 
data-image-src="http://www.enterpriseintegrationpatterns.com/img/Aggregator.gif";></span></p><p>A
 correlation <a shape="rect" href="expression.html">Expression</a> is used to 
determine the messages which should be aggregated together. If yo
 u want to aggregate all messages into a single message, just use a constant 
expression. An AggregationStrategy is used to combine all the message exchanges 
for a single correlation key into a single message exchange.</p><h3 
id="Aggregator2-Aggregatoroptions">Aggregator options</h3><p>The aggregator 
supports the following options:</p><parameter 
ac:name="class">confluenceTableSmall</parameter><rich-text-body><div 
class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1" 
rowspan="1" class="confluenceTh"><p>Option</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>correlationExpression</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>Mandatory <a shape="rect" 
href="expression.html">Expression</a> which evaluates the correlation key to u
 se for aggregation. The <a shape="rect" href="exchange.html">Exchange</a> 
which has the same correlation key is aggregated together. If the correlation 
key could not be evaluated an Exception is thrown. You can disable this by 
using the <code>ignoreBadCorrelationKeys</code> option.</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>aggregationStrategy</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>Mandatory <code>AggregationStrategy</code> 
which is used to <em>merge</em> the incoming <a shape="rect" 
href="exchange.html">Exchange</a> with the existing already merged exchanges. 
At first call the <code>oldExchange</code> parameter is <code>null</code>. On 
subsequent invocations the <code>oldExchange</code> contains the merged 
exchanges and <code>newExchange</code> is of course the new incoming Exchange. 
From <strong>Camel 2.9.2</strong> onwards the strategy can also be a 
<code>TimeoutAware
 AggregationStrategy</code> implementation, supporting the timeout callback, 
see further below for more details. From <strong>Camel 2.16</strong> onwards 
the strategy can also be a <code>PreCompletionAwareAggregationStrategy</code> 
implementation which then runs the completion check in pre-completion mode. See 
further below for more details.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>strategyRef</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>A reference to lookup the 
<code>AggregationStrategy</code> in the <a shape="rect" 
href="registry.html">Registry</a>. From <strong>Camel 2.12</strong> onwards you 
can also use a POJO as the <code>AggregationStrategy</code>, see further below 
for details.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>strategyMethodName</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" c
 lass="confluenceTd"><p><strong>Camel 2.12:</strong> This option can be used to 
explicit declare the method name to use, when using POJOs as the 
<code>AggregationStrategy</code>. See further below for more 
details.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>strategyMethodAllowNull</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><strong>Camel 2.12:</strong> If this option 
is <code>false</code> then the aggregate method is not used for the very first 
aggregation. If this option is <code>true</code> then <code>null</code> values 
is used as the <code>oldExchange</code> (at the very first aggregation), when 
using POJOs as the <code>AggregationStrategy</code>. See further below for more 
details.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>completionSize</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan
 ="1" class="confluenceTd"><p>Number of messages aggregated before the 
aggregation is complete. This option can be set as either a fixed value or 
using an <a shape="rect" href="expression.html">Expression</a> which allows you 
to evaluate a size dynamically - will use <code>Integer</code> as result. If 
both are set Camel will fallback to use the fixed value if the <a shape="rect" 
href="expression.html">Expression</a> result was <code>null</code> or 
<code>0</code>.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>completionTimeout</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>Time in millis that an aggregated exchange should be 
inactive before its complete. This option can be set as either a fixed value or 
using an <a shape="rect" href="expression.html">Expression</a> which allows you 
to evaluate a timeout dynamically - will use <code>Long</code> as result. If 
both are set Camel will fa
 llback to use the fixed value if the <a shape="rect" 
href="expression.html">Expression</a> result was <code>null</code> or 
<code>0</code>. You cannot use this option together with completionInterval, 
only one of the two can be used.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>completionInterval</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>A repeating period in millis by which the aggregator 
will complete all current aggregated exchanges. Camel has a background task 
which is triggered every period. You cannot use this option together with 
completionTimeout, only one of them can be used.</p></td></tr><tr><td 
colspan="1" rowspan="1" class="confluenceTd"><p>completionPredicate</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>A <a shape="rect" 
href="predicate.html">Predicate</a> to indicate when an aggrega
 ted exchange is complete. Starting in <strong>Camel 2.15</strong>, if this is 
not specified and the AggregationStrategy object implements Predicate, the 
aggregationStrategy object will be used as the 
completionPredicate.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>completionFromBatchConsumer</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>This option is if the exchanges are coming 
from a <a shape="rect" href="batch-consumer.html">Batch Consumer</a>. Then when 
enabled the <a shape="rect" href="aggregator2.html">Aggregator2</a> will use 
the batch size determined by the <a shape="rect" 
href="batch-consumer.html">Batch Consumer</a> in the message header 
<code>CamelBatchSize</code>. See more details at <a shape="rect" 
href="batch-consumer.html">Batch Consumer</a>. This can be used to aggregate 
all files consumed from a <a shape="rect" href="file2.html">File</a> endpoint 
in 
 that given poll.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>forceCompletionOnStop</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><strong>Camel 2.9</strong> Indicates to 
complete all current aggregated exchanges when the context is 
stopped</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd">completeAllOnStop</td><td colspan="1" rowspan="1" 
class="confluenceTd"><code>false</code></td><td colspan="1" rowspan="1" 
class="confluenceTd"><strong>Camel 2.16:</strong>&#160;Indicates to wait to 
complete all current and partial (pending) aggregated exchanges when the 
context is stopped. This also means that we will wait for all pending exchanges 
which are stored in the&#160;aggregation repository&#160;to complete so the 
repository is empty before we can stop. &#160;You may want to enable this when 
using the memory based aggregation repository that is memory based o
 nly,&#160;and do not store data on disk. When this option is enabled, then the 
aggregator is waiting to complete&#160;all those exchanges before its stopped, 
when stopping CamelContext or the route using it.</td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>eagerCheckCompletion</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>Whether or not to eager check 
for completion when a new incoming <a shape="rect" 
href="exchange.html">Exchange</a> has been received. This option influences the 
behavior of the <code>completionPredicate</code> option as the <a shape="rect" 
href="exchange.html">Exchange</a> being passed in changes accordingly. When 
<code>false</code> the <a shape="rect" href="exchange.html">Exchange</a> passed 
in the <a shape="rect" href="predicate.html">Predicate</a> is the 
<em>aggregated</em> Exchange which means any information you may store on the 
aggregated Exchange from
  the <code>AggregationStrategy</code> is available for the <a shape="rect" 
href="predicate.html">Predicate</a>. When <code>true</code> the <a shape="rect" 
href="exchange.html">Exchange</a> passed in the <a shape="rect" 
href="predicate.html">Predicate</a> is the <em>incoming</em> <a shape="rect" 
href="exchange.html">Exchange</a>, which means you can access data from the 
incoming Exchange.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>groupExchanges</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>false</code></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>If enabled then Camel will group all aggregated 
Exchanges into a single combined 
<code>org.apache.camel.impl.GroupedExchange</code> holder class that holds all 
the aggregated Exchanges. And as a result only one Exchange is being sent out 
from the aggregator. Can be used to combine many incoming Exchanges into a 
single output Exchange without coding a custom <code>AggregationStrate
 gy</code> yourself. <strong>Important:</strong> This option does 
<strong>not</strong> support persistent repository with the aggregator. See 
further below for an example and more details.</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p>ignoreInvalidCorrelationKeys</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>Whether or not to ignore 
correlation keys which could not be evaluated to a value. By default Camel will 
throw an Exception, but you can enable this option and ignore the situation 
instead.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>closeCorrelationKeyOnCompletion</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>Whether or not too <em>late</em> Exchanges should be 
accepted or not. You can enable this to indicate that if a correlation key has 
already been completed
 , then any new exchanges with the same correlation key be denied. Camel will 
then throw a <code>closedCorrelationKeyException</code> exception. When using 
this option you pass in a <code>integer</code> which is a number for a LRUCache 
which keeps that last X number of closed correlation keys. You can pass in 0 or 
a negative value to indicate a unbounded cache. By passing in a number you are 
ensured that cache won't grow too big if you use a log of different correlation 
keys.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>discardOnCompletionTimeout</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><strong>Camel 2.5:</strong> Whether or not 
exchanges which complete due to a timeout should be discarded. If enabled then 
when a timeout occurs the aggregated message will <strong>not</strong> be sent 
out but dropped (discarded).</p></td></tr><tr><td colspan="1" rowspan="1" 
class="
 confluenceTd"><p>aggregationRepository</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>Allows you to plugin you own implementation of 
<code>org.apache.camel.spi.AggregationRepository</code> which keeps track of 
the current inflight aggregated exchanges. Camel uses by default a memory based 
implementation.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>aggregationRepositoryRef</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>Reference to lookup a 
<code>aggregationRepository</code> in the <a shape="rect" 
href="registry.html">Registry</a>.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>parallelProcessing</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>false</code></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>When aggregated are completed they are being sen
 d out of the aggregator. This option indicates whether or not Camel should use 
a thread pool with multiple threads for concurrency. If no custom thread pool 
has been specified then Camel creates a default pool with 10 concurrent 
threads.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>executorService</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>If using <code>parallelProcessing</code> you can 
specify a custom thread pool to be used. In fact also if you are not using 
<code>parallelProcessing</code> this custom thread pool is used to send out 
aggregated exchanges as well.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>executorServiceRef</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>Reference to lookup a <code>executorService</code> in 
the <a shape="rect" href="registry.html">Registr
 y</a></p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>timeoutCheckerExecutorService</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.9:</strong> If using either of the 
<code>completionTimeout</code>, <code>completionTimeoutExpression</code>, or 
<code>completionInterval</code> options a background thread is created to check 
for the completion for every aggregator. Set this option to provide a custom 
thread pool to be used rather than creating a new thread for every 
aggregator.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>timeoutCheckerExecutorServiceRef</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><strong>Camel 2.9:</strong> Reference to 
lookup a <code>timeoutCheckerExecutorService</code> in the <a shape="rect" 
href="registry.html">Registry</a></p></td></tr><tr><td c
 olspan="1" rowspan="1" class="confluenceTd"><p>optimisticLocking</p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.11:</strong> 
Turns on using optimistic locking, which requires the 
<code>aggregationRepository</code> being used, is supporting this by 
implementing the 
<code>org.apache.camel.spi.OptimisticLockingAggregationRepository</code> 
interface.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p>optimisticLockRetryPolicy</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.11.1:</strong> Allows to configure 
retry settings when using optimistic 
locking.</p></td></tr></tbody></table></div></rich-text-body><h3 
id="Aggregator2-ExchangeProperties">Exchange Properties</h3><p>The following 
properties are set on each aggregated Exchange:</p><parameter 
ac:name="class">confluence
 TableSmall</parameter><rich-text-body><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh"><p>header</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>type</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>description</p></th></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p><code>CamelAggregatedSize</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>int</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>The total number of Exchanges aggregated 
into this combined Exchange.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>CamelAggregatedCompletedBy</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>String</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>Indicator how the aggregation was completed 
as a value of either: <code>predicate</code>, <code>size</code>, 
<code>strategy</code>, <code>consumer</code>, <code>time
 out</code>, <code>forceCompletion</code> or 
<code>interval</code>.</p></td></tr></tbody></table></div></rich-text-body><h3 
id="Aggregator2-AboutAggregationStrategy">About AggregationStrategy</h3><p>The 
<code>AggregationStrategy</code> is used for aggregating the old (lookup by its 
correlation id) and the new exchanges together into a single exchange. Possible 
implementations include performing some kind of combining or delta processing, 
such as adding line items together into an invoice or just using the newest 
exchange and removing old exchanges such as for state tracking or market data 
prices; where old values are of little use.</p><p>Notice the aggregation 
strategy is a mandatory option and must be provided to the 
aggregator.</p><p>Here are a few example AggregationStrategy implementations 
that should help you create your own custom 
strategy.</p><plain-text-body>//simply combines Exchange String body values 
using '+' as a delimiter
-class StringAggregationStrategy implements AggregationStrategy {
-
-    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-        if (oldExchange == null) {
-            return newExchange;
-        }
-
-        String oldBody = oldExchange.getIn().getBody(String.class);
-        String newBody = newExchange.getIn().getBody(String.class);
-        oldExchange.getIn().setBody(oldBody + "+" + newBody);
-        return oldExchange;
-    }
-}
-
-//simply combines Exchange body values into an ArrayList&lt;Object&gt;
-class ArrayListAggregationStrategy implements AggregationStrategy {
-
-    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-               Object newBody = newExchange.getIn().getBody();
-               ArrayList&lt;Object&gt; list = null;
-        if (oldExchange == null) {
-                       list = new ArrayList&lt;Object&gt;();
-                       list.add(newBody);
-                       newExchange.getIn().setBody(list);
-                       return newExchange;
-        } else {
-               list = oldExchange.getIn().getBody(ArrayList.class);
-                       list.add(newBody);
-                       return oldExchange;
-               }
-    }
-}
-</plain-text-body><h3 id="Aggregator2-Aboutcompletion">About 
completion</h3><p>When aggregation <a shape="rect" 
href="exchange.html">Exchange</a>s at some point you need to indicate that the 
aggregated exchanges is complete, so they can be send out of the aggregator. 
Camel allows you to indicate completion in various ways as follows:</p><ul 
class="alternate"><li>completionTimeout - Is an inactivity timeout in which is 
triggered if no new exchanges have been aggregated for that particular 
correlation key within the period.</li><li>completionInterval - Once every X 
period all the current aggregated exchanges are 
completed.</li><li>completionSize - Is a number indicating that after X 
aggregated exchanges it's complete.</li><li>completionPredicate - Runs a <a 
shape="rect" href="predicate.html">Predicate</a> when a new exchange is 
aggregated to determine if we are complete or not. Staring 
in&#160;<strong>Camel 2.15</strong>, the configured aggregationStrategy can 
implement the Predicate 
 interface and will be used as the completionPredicate if no 
completionPredicate is configured. From&#160;<strong>Camel 2.16</strong>, the 
configured aggregationStrategy can 
implement&#160;<code>PreCompletionAwareAggregationStrategy</code> and will be 
used as the completionPredicate in pre-complete check mode. See further below 
for more details.</li><li>completionFromBatchConsumer - Special option for <a 
shape="rect" href="batch-consumer.html">Batch Consumer</a> which allows you to 
complete when all the messages from the batch has been 
aggregated.</li><li>forceCompletionOnStop - <strong>Camel 2.9</strong> 
Indicates to complete all current aggregated exchanges when the context is 
stopped</li><li>Using a&#160;<code>AggregateController</code> 
-&#160;<strong>Camel 2.16</strong> which allows to use an external source to 
complete groups or all groups. This can be done using Java or JMX 
API.</li></ul><p>Notice that all the completion ways are per correlation key. 
And you can combine them in
  any way you like. It's basically the first which triggers that wins. So you 
can use a completion size together with a completion timeout. Only 
completionTimeout and completionInterval cannot be used at the same 
time.</p><p>Notice the completion is a mandatory option and must be provided to 
the aggregator. If not provided Camel will thrown an Exception on 
startup.</p><parameter 
ac:name="title">Callbacks</parameter><rich-text-body><p>See the 
<code>TimeoutAwareAggregationStrategy</code> and 
<code>CompletionAwareAggregationStrategy</code> extensions to 
<code>AggregationStrategy</code> that has callbacks when the aggregated 
Exchange was completed and if a timeout occurred.</p></rich-text-body><h3 
id="Aggregator2-Pre-completionmode">Pre-completion 
mode</h3><p><strong>available as of Camel 2.16</strong></p><p>There can be 
use-cases where you want the incoming <a shape="rect" 
href="exchange.html">Exchange</a>&#160;to determine if the correlation group 
should pre-complete, and then the inco
 ming <a shape="rect" href="exchange.html">Exchange</a>&#160;is starting a new 
group from scratch. To determine this the&#160;<code>AggregationStrategy</code> 
can implement&#160;<code>PreCompletionAwareAggregationStrategy</code> which has 
a&#160;<code>preComplete</code> method:</p><plain-text-body>    /**
-     * Determines if the aggregation should complete the current group, and 
start a new group, or the aggregation
-     * should continue using the current group.
-     *
-     * @param oldExchange the oldest exchange (is &lt;tt&gt;null&lt;/tt&gt; on 
first aggregation as we only have the new exchange)
-     * @param newExchange the newest exchange (can be 
&lt;tt&gt;null&lt;/tt&gt; if there was no data possible to acquire)
-     * @return &lt;tt&gt;true&lt;/tt&gt; to complete current group and start a 
new group, or &lt;tt&gt;false&lt;/tt&gt; to keep using current
-     */
-    boolean preComplete(Exchange oldExchange, Exchange 
newExchange);</plain-text-body><p>If the preComplete method returns true, then 
the existing groups is completed (without aggregating the incoming exchange 
(newExchange). And then the newExchange is used to start the correlation group 
from scratch so the group would contain only that new incoming exchange. This 
is known as pre-completion mode. And when the aggregation is in pre-completion 
mode, then only the following completions are in use</p><ul 
style="list-style-type: square;"><li>aggregationStrategy must 
implement&#160;<code>PreCompletionAwareAggregationStrategy</code>&#160;xxx</li><li>completionTimeout
 or completionInterval can also be used as fallback completions</li><li>any 
other completion are not used (such as by size, from batch consumer 
etc)</li><li>eagerCheckCompletion is implied as true, but the option has no 
effect</li></ul><h3 id="Aggregator2-PersistentAggregationRepository">Persistent 
AggregationRepository</h3><p>
 The aggregator provides a pluggable repository which you can implement your 
own <code>org.apache.camel.spi.AggregationRepository</code>.<br clear="none"> 
If you need persistent repository then you can use either Camel <a shape="rect" 
href="hawtdb.html">HawtDB</a>, <a shape="rect" href="leveldb.html">LevelDB</a>, 
or <a shape="rect" href="sql-component.html">SQL Component</a> 
components.</p><h3 id="Aggregator2-Examples">Examples</h3><p>See some examples 
from the old <a shape="rect" href="aggregator.html">Aggregator</a> which is 
somewhat similar to this new aggregator.</p><parameter ac:name="title">Setting 
options in Spring XML</parameter><rich-text-body><p>Many of the options are 
configurable as attributes on the <code>&lt;aggregate&gt;</code> tag when using 
Spring XML.</p></rich-text-body><h4 
id="Aggregator2-UsingcompletionTimeout">Using completionTimeout</h4><p>In this 
example we want to aggregate all incoming messages and after 3 seconds of 
inactivity we want the aggregation to com
 plete. This is done using the <code>completionTimeout</code> option as 
shown:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java}</plain-text-body>And
 the same example using Spring 
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml}</plain-text-body></p><h4
 id="Aggregator2-UsingTimeoutAwareAggregationStrategy">Using 
TimeoutAwareAggregationStrategy</h4><p><strong>Available as of Camel 
2.9.2</strong></p><p>If your aggregation strategy implements 
<code>TimeoutAwareAggregationStrategy</code>, then Camel will invoke the 
<code>timeout</code> method when the timeout occurs. Notice that the values for 
index and total parameters will be -1, and the timeout parameter will be 
provided only if configured as a fixed value. You must <strong>not</strong> 
throw any e
 xceptions from the <code>timeout</code> method.</p><h4 
id="Aggregator2-UsingCompletionAwareAggregationStrategy">Using 
CompletionAwareAggregationStrategy</h4><p><strong>Available as of Camel 
2.9.3</strong></p><p>If your aggregation strategy implements 
<code>CompletionAwareAggregationStrategy</code>, then Camel will invoke the 
<code>onComplete</code> method when the aggregated Exchange is completed. This 
allows you to do any last minute custom logic such as to cleanup some 
resources, or additional work on the exchange as it's now completed.<br 
clear="none"> You must <strong>not</strong> throw any exceptions from the 
<code>onCompletion</code> method.</p><h4 
id="Aggregator2-UsingcompletionSize">Using completionSize</h4><p>In this 
example we want to aggregate all incoming messages and when we have 3 messages 
aggregated (in the same correlation group) we want the aggregation to complete. 
This is done using the <code>completionSize</code> option as 
shown:<plain-text-body>{snippet:id=e1|lan
 
g=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java}</plain-text-body>And
 the same example using Spring 
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleSizeTest.xml}</plain-text-body></p><h4
 id="Aggregator2-UsingcompletionPredicate">Using completionPredicate</h4><p>In 
this example we want to aggregate all incoming messages and use a <a 
shape="rect" href="predicate.html">Predicate</a> to determine when we are 
complete. The <a shape="rect" href="predicate.html">Predicate</a> can be 
evaluated using either the aggregated exchange (default) or the incoming 
exchange. We will give an example for both situations. We start with the 
default situation as 
shown:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateTest.java
 }</plain-text-body>And the same example using Spring 
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimplePredicateTest.xml}</plain-text-body>And
 the other situation where we use the <code>eagerCheckCompletion</code> option 
to tell Camel to use the incoming Exchange. Notice how we can just test in the 
completion predicate that the incoming message is the <em>END</em> 
message:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateEagerTest.java}</plain-text-body>And
 the same example using Spring 
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimplePredicateEagerTest.xml}</plain-text-body></p><h4
 id="Aggregator2-UsingdynamiccompletionTimeout">Using dynamic completion
 Timeout</h4><p>In this example we want to aggregate all incoming messages and 
after a period of inactivity we want the aggregation to complete. The period 
should be computed at runtime based on the <code>timeout</code> header in the 
incoming messages. This is done using the <code>completionTimeout</code> option 
as 
shown:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java}</plain-text-body>And
 the same example using Spring 
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.xml}</plain-text-body><strong>Note:</strong>
 You can also add a fixed timeout value and Camel will fallback to use this 
value if the dynamic value was <code>null</code> or <code>0</code>.</p><h4 
id="Aggregator2-UsingdynamiccompletionSize">Using dynamic 
completionSize</h4><p>In th
 is example we want to aggregate all incoming messages based on a dynamic size 
per correlation key. The size is computed at runtime based on the 
<code>mySize</code> header in the incoming messages. This is done using the 
<code>completionSize</code> option as 
shown:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeTest.java}</plain-text-body>And
 the same example using Spring 
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.xml}</plain-text-body><strong>Note:</strong>
 You can also add a fixed size value and Camel will fallback to use this value 
if the dynamic value was <code>null</code> or <code>0</code>.</p><p><parameter 
ac:name=""><a shape="rect" href="using-this-pattern.html">Using This 
Pattern</a></parameter></p><h4 id="Aggregator2-Completingcurrentgroupdeci
 dedfromtheAggregationStrategy">Completing current group decided from the 
AggregationStrategy</h4><p><strong>Available as of Camel 
2.15</strong></p><p>The&#160;<code>AggregationStrategy</code> can now included 
a property on the returned&#160;<code>Exchange</code> that contains a boolean 
to indicate if the current group should be completed. This allows to overrule 
any existing completion predicates / sizes / timeouts etc, and complete the 
group.</p><p>For example the following logic (from an unit test) will complete 
the group if the message body size is larger than 5. This is done by setting 
the property&#160;<span style="line-height: 
1.4285715;">Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP to 
true.</span></p><plain-text-body>    public final class MyCompletionStrategy 
implements AggregationStrategy {
-        @Override
-        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-            if (oldExchange == null) {
-                return newExchange;
-            }
-            String body = oldExchange.getIn().getBody(String.class) + "+" 
-                + newExchange.getIn().getBody(String.class);
-            oldExchange.getIn().setBody(body);
-            if (body.length() &gt;= 5) {
-                
oldExchange.setProperty(Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP, true);
-            }
-            return oldExchange;
-        }
-    }</plain-text-body><p>&#160;</p><h4 
id="Aggregator2-ManuallyForcetheCompletionofAllAggregatedExchangesImmediately">Manually
 Force the Completion of All Aggregated Exchanges 
Immediately</h4><p><strong>Available as of Camel 2.9</strong><br clear="none"> 
You can manually trigger completion of all current aggregated exchanges by 
sending a message containing the header 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS set to true. The message is considered 
a signal message only, the message headers/contents will not be processed 
otherwise.</p><p><strong>Available as of Camel 2.11</strong><br clear="none"> 
You can alternatively set the header 
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE to true to trigger 
completion of all groups after processing the current message.</p><h4 
id="Aggregator2-UsingaList&lt;V&gt;inAggregationStrategy">Using a List&lt;V&gt; 
in AggregationStrategy</h4><p><strong>Available as of Camel 
2.11</strong></p><p>If you want to aggregate some value from the messages &lt
 ;V&gt; into a List&lt;V&gt; then we have added a 
<code>org.apache.camel.processor.aggregate.AbstractListAggregationStrategy</code>
 abstract class in <strong>Camel 2.11</strong> that makes this easier. The 
completed Exchange that is sent out of the aggregator will contain the 
List&lt;V&gt; in the message body.</p><p>For example to aggregate a 
List&lt;Integer&gt; you can extend this class as shown below, and implement the 
<code>getValue</code> 
method:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java}</plain-text-body></p><h3
 id="Aggregator2-UsingAggregateController">Using 
AggregateController</h3><p><strong>Available as of Camel 
2.16</strong></p><p>The&#160;<code>org.apache.camel.processor.aggregate.AggregateController</code>
 allows you to control the aggregate at runtime using Java or JMX API. This can 
be used to force completing groups of exchanges, or query its current runti
 me statistics.</p><p>The aggregator provides a default implementation if no 
custom have been configured, which can be accessed 
using&#160;<code>getAggregateController()</code>&#160;method. Though it may be 
easier to configure a controller in the route using aggregateController as 
shown below:</p><plain-text-body>private AggregateController controller = new 
DefaultAggregateController();
-&#160;
-from("direct:start")
-   .aggregate(header("id"), new 
MyAggregationStrategy()).completionSize(10).id("myAggregator")
-      .aggregateController(controller)
-      .to("mock:aggregated");</plain-text-body><p>Then there is API on 
AggregateController to force completion. For example to complete a group with 
key foo</p><plain-text-body>int groups = 
controller.forceCompletionOfGroup("foo");</plain-text-body><p>The number return 
would be the number of groups completed. In this case it would be 1 if the foo 
group existed and was completed. If foo does not exists then 0 is 
returned.</p><p>There is also an api to complete all 
groups</p><plain-text-body>int groups = 
controller.forceCompletionOfAllGroups();</plain-text-body><p>&#160;</p><p>To 
configure this from XML DSL</p><plain-text-body>&lt;bean id="myController" 
class="org.apache.camel.processor.aggregate.DefaultAggregateController"/&gt;
-&#160;
-  &lt;camelContext xmlns="http://camel.apache.org/schema/spring"&gt;
-        &lt;route&gt;
-            &lt;from uri="direct:start"/&gt;
-            &lt;aggregate strategyRef="myAppender" completionSize="10" 
aggregateControllerRef="myController"&gt;
-                &lt;correlationExpression&gt;
-                    &lt;header&gt;id&lt;/header&gt;
-                &lt;/correlationExpression&gt;
-                &lt;to uri="mock:result"/&gt;
-            &lt;/aggregate&gt;
-        &lt;/route&gt;
-    &lt;/camelContext&gt;</plain-text-body><p>&#160;</p><p>There is also JMX 
API on the aggregator which is available under the processors node in the Camel 
JMX tree.</p><p>&#160;</p><h3 id="Aggregator2-UsingGroupedExchanges">Using 
GroupedExchanges</h3><p>In the route below we group all the exchanges together 
using <code>groupExchanges()</code>:</p><plain-text-body>                
from("direct:start")
-                    // aggregate all using same expression
-                    .aggregate(constant(true))
-                    // wait for 0.5 seconds to aggregate
-                    .completionTimeout(500L)
-                    // group the exchanges so we get one single exchange 
containing all the others
-                    .groupExchanges()
-                    .to("mock:result");
-</plain-text-body><p>As a result we have one outgoing <a shape="rect" 
href="exchange.html">Exchange</a> being routed the the "mock:result" endpoint. 
The exchange is a holder containing all the incoming Exchanges.<br 
clear="none"> To get access to these exchanges you need to access them from a 
property on the outgoing exchange as 
shown:</p><plain-text-body>List&lt;Exchange&gt; grouped = 
out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
-</plain-text-body><p>From <strong>Camel 2.13</strong> onwards this behavior 
has changed to store these exchanges directly on the message body which is more 
intuitive:</p><plain-text-body>List&lt;Exchange&gt; grouped = 
exchange.getIn().getBody(List.class);
-</plain-text-body><rich-text-body><p>Notice the old way using the property is 
still present in <strong>Camel 2.13</strong> onwards, but its considered 
deprecated and to be removed in Camel 3.0 onwards.</p></rich-text-body><h3 
id="Aggregator2-UsingPOJOsasAggregationStrategy">Using POJOs as 
AggregationStrategy</h3><p><strong>Available as of Camel 
2.12</strong></p><rich-text-body><p>You can use POJOs as AggregationStrategy 
with the other <a shape="rect" href="eip.html">EIP</a>s that supports 
aggregation, such as <a shape="rect" href="splitter.html">Splitter</a>, <a 
shape="rect" href="recipient-list.html">Recipient List</a>, 
etc.</p></rich-text-body><p>To use the <code>AggregationStrategy</code> you had 
to implement the 
<code>org.apache.camel.processor.aggregate.AggregationStrategy</code> 
interface, which means your logic would be tied to the Camel API. From 
<strong>Camel 2.12</strong> onwards you can use a POJO for the logic and let 
Camel adapt to your POJO. To use a POJO a convention 
 must be followed:</p><ul class="alternate"><li>there must be a public method 
to use</li><li>the method must not be void</li><li>the method can be static or 
non-static</li><li>the method must have 2 or more parameters</li><li>the 
parameters is paired so the first 50% is applied to the 
<code>oldExchange</code> and the reminder 50% is for the 
<code>newExchange</code></li><li>.. meaning that there must be an equal number 
of parameters, eg 2, 4, 6 etc.</li></ul><p>The paired methods is expected to be 
ordered as follows:</p><ul class="alternate"><li>the first parameter is the 
message body</li><li>the 2nd parameter is a Map of the headers</li><li>the 3rd 
parameter is a Map of the Exchange properties</li></ul><p>This convention is 
best explained with some examples.</p><p>In the method below, we have only 2 
parameters, so the 1st parameter is the body of the <code>oldExchange</code>, 
and the 2nd is paired to the body of the 
<code>newExchange</code>:</p><plain-text-body>public String append(S
 tring existing, String next) {
-  return existing + next;
-}
-</plain-text-body><p>In the method below, we have only 4 parameters, so the 
1st parameter is the body of the <code>oldExchange</code>, and the 2nd is the 
Map of the <code>oldExchange} headers, and the 3rd is paired to the body of the 
{{newExchange</code>, and the 4th parameter is the Map of the 
<code>newExchange</code> headers:</p><plain-text-body>public String 
append(String existing, Map existingHeaders, String next, Map nextHeaders) {
-  return existing + next;
-}
-</plain-text-body><p>And finally if we have 6 parameters the we also have the 
properties of the <a shape="rect" 
href="exchange.html">Exchange</a>s:</p><plain-text-body>public String 
append(String existing, Map existingHeaders, Map existingProperties, String 
next, Map nextHeaders, Map nextProperties) {
-  return existing + next;
-}
-</plain-text-body><p>To use this with the <a shape="rect" 
href="aggregator2.html">Aggregate</a> EIP we can use a POJO with the aggregate 
logic as follows:</p><plain-text-body>public class MyBodyAppender {
-
-    public String append(String existing, String next) {
-        return next + existing;
-    }
-
-}
-</plain-text-body><p>And then in the Camel route we create an instance of our 
bean, and then refer to the bean in the route using <code>bean</code> method 
from <code>org.apache.camel.util.toolbox.AggregationStrategies</code> as 
shown:</p><plain-text-body>    private MyBodyAppender appender = new 
MyBodyAppender();
-
-    public void configure() throws Exception {
-        from("direct:start")
-            .aggregate(constant(true), AggregationStrategies.bean(appender, 
"append"))
-                .completionSize(3)
-                .to("mock:result");
-    }
-</plain-text-body><p>We can also provide the bean type 
directly:</p><plain-text-body>    public void configure() throws Exception {
-        from("direct:start")
-            .aggregate(constant(true), 
AggregationStrategies.bean(MyBodyAppender.class, "append"))
-                .completionSize(3)
-                .to("mock:result");
-    }
-</plain-text-body><p>And if the bean has only one method we do not need to 
specify the name of the method:</p><plain-text-body>    public void configure() 
throws Exception {
-        from("direct:start")
-            .aggregate(constant(true), 
AggregationStrategies.bean(MyBodyAppender.class))
-                .completionSize(3)
-                .to("mock:result");
-    }
-</plain-text-body><p>And the <code>append</code> method could be 
static:</p><plain-text-body>public class MyBodyAppender {
-
-    public static String append(String existing, String next) {
-        return next + existing;
-    }
-
-}
-</plain-text-body><p>If you are using XML DSL then we need to declare a 
&lt;bean&gt; with the POJO:</p><parameter 
ac:name="">xml</parameter><plain-text-body>    &lt;bean id="myAppender" 
class="com.foo.MyBodyAppender"/&gt;
-</plain-text-body><p>And in the Camel route we use <code>strategyRef</code> to 
refer to the bean by its id, and the <code>strategyMethodName</code> can be 
used to define the method name to call:</p><parameter 
ac:name="">xml</parameter><plain-text-body>    &lt;camelContext 
xmlns="http://camel.apache.org/schema/spring"&gt;
-        &lt;route&gt;
-            &lt;from uri="direct:start"/&gt;
-            &lt;aggregate strategyRef="myAppender" strategyMethodName="append" 
completionSize="3"&gt;
-                &lt;correlationExpression&gt;
-                    &lt;constant&gt;true&lt;/constant&gt;
-                &lt;/correlationExpression&gt;
-                &lt;to uri="mock:result"/&gt;
-            &lt;/aggregate&gt;
-        &lt;/route&gt;
-    &lt;/camelContext&gt;
-</plain-text-body><p>When using XML DSL you must define the POJO as a 
&lt;bean&gt;.</p><h4 id="Aggregator2-Aggregatingwhennodata">Aggregating when no 
data</h4><p>By default when using POJOs as AggregationStrategy, then the method 
is <strong>only</strong> invoked when there is data to be aggregated (by 
default). You can use the option <code>strategyMethodAllowNull</code> to 
configure this. Where as without using POJOs then you may have 
<code>null</code> as <code>oldExchange</code> or <code>newExchange</code> 
parameters. For example the <a shape="rect" 
href="aggregator2.html">Aggregate</a> EIP will invoke the 
<code>AggregationStrategy</code> with <code>oldExchange</code> as null, for the 
first <a shape="rect" href="exchange.html">Exchange</a> incoming to the 
aggregator. And then for subsequent <a shape="rect" 
href="exchange.html">Exchange</a>s then <code>oldExchange</code> and 
<code>newExchange</code> parameters are both not null.</p><h5 
id="Aggregator2-Examplewithandnodata">Example w
 ith <a shape="rect" href="content-enricher.html">Content Enricher</a> and no 
data</h5><p>Though with POJOs as AggregationStrategy we made this simpler and 
only call the method when <code>oldExchange</code> and <code>newExchange</code> 
is not null, as that would be the most common use-case. If you need to allow 
<code>oldExchange</code> or <code>newExchange</code> to be null, then you can 
configure this with the POJO using the 
<code>AggregationStrategyBeanAdapter</code> as shown below. On the bean adapter 
we call <code>setAllowNullNewExchange</code> to allow the new exchange to be 
null.</p><plain-text-body>    public void configure() throws Exception {
-        AggregationStrategyBeanAdapter myStrategy = new 
AggregationStrategyBeanAdapter(appender, "append");
-        myStrategy.setAllowNullOldExchange(true);
-        myStrategy.setAllowNullNewExchange(true);
-
-        from("direct:start")
-            .pollEnrich("seda:foo", 1000, myStrategy)
-                .to("mock:result");
-    }
-</plain-text-body><p>This can be configured a bit easier using the 
<code>beanAllowNull</code> method from <code>AggregationStrategies</code> as 
shown:</p><plain-text-body>    public void configure() throws Exception {
-        from("direct:start")
-            .pollEnrich("seda:foo", 1000, 
AggregationStrategies.beanAllowNull(appender, "append"))
-                .to("mock:result");
-    }
-</plain-text-body><p>Then the <code>append</code> method in the POJO would 
need to deal with the situation that <code>newExchange</code> can be 
null:</p><plain-text-body>    public class MyBodyAppender {
-
-        public String append(String existing, String next) {
-            if (next == null) {
-                return "NewWasNull" + existing;
-            } else {
-                return existing + next;
-            }
-        }
-
-    }
-</plain-text-body><p>In the example above we use the <a shape="rect" 
href="content-enricher.html">Content Enricher</a> EIP using 
<code>pollEnrich</code>. The <code>newExchange</code> will be null in the 
situation we could not get any data from the "seda:foo" endpoint, and therefore 
the timeout was hit after 1 second. So if we need to do some special merge 
logic we would need to set <code>setAllowNullNewExchange=true</code>, so the 
<code>append</code> method will be invoked. If we do not do that then when the 
timeout was hit, then the append method would normally not be invoked, meaning 
the <a shape="rect" href="content-enricher.html">Content Enricher</a> did not 
merge/change the message.</p><p>In XML DSL you would configure the 
<code>strategyMethodAllowNull</code> option and set it to true as shown 
below:</p><parameter ac:name="">xml</parameter><plain-text-body>    
&lt;camelContext xmlns="http://camel.apache.org/schema/spring"&gt;
-        &lt;route&gt;
-            &lt;from uri="direct:start"/&gt;
-            &lt;aggregate strategyRef="myAppender" strategyMethodName="append" 
strategyMethodAllowNull="true" completionSize="3"&gt;
-                &lt;correlationExpression&gt;
-                    &lt;constant&gt;true&lt;/constant&gt;
-                &lt;/correlationExpression&gt;
-                &lt;to uri="mock:result"/&gt;
-            &lt;/aggregate&gt;
-        &lt;/route&gt;
-    &lt;/camelContext&gt;
-</plain-text-body><h5 id="Aggregator2-Differentbodytypes">Different body 
types</h5><p>When for example using <code>strategyMethodAllowNull</code> as 
true, then the parameter types of the message bodies does not have to be the 
same. For example suppose we want to aggregate from a <code>com.foo.User</code> 
type to a <code>List&lt;String&gt;</code> that contains the user name. We could 
code a POJO doing this as follows:</p><plain-text-body>    public static final 
class MyUserAppender {
-
-        public List addUsers(List names, User user) {
-            if (names == null) {
-                names = new ArrayList();
-            }
-            names.add(user.getName());
-            return names;
-        }
-    }
-</plain-text-body><p>Notice that the return type is a List which we want to 
contain the user names. The 1st parameter is the list of names, and then notice 
the 2nd parameter is the incoming <code>com.foo.User</code> type.</p><h3 
id="Aggregator2-Seealso">See also</h3><ul class="alternate"><li>The <a 
shape="rect" href="loan-broker-example.html">Loan Broker Example</a> which uses 
an aggregator</li><li><a shape="rect" class="external-link" 
href="http://tmielke.blogspot.com/2009/01/using-camel-aggregator-correctly.html";
 rel="nofollow">Blog post by Torsten Mielke</a> about using the aggregator 
correctly.</li><li>The old <a shape="rect" 
href="aggregator.html">Aggregator</a></li><li><a shape="rect" 
href="hawtdb.html">HawtDB</a>, <a shape="rect" href="leveldb.html">LevelDB</a> 
or <a shape="rect" href="sql-component.html">SQL Component</a> for persistence 
support</li><li><a shape="rect" href="aggregate-example.html">Aggregate 
Example</a> for an example application</li></ul></div>
+<div class="wiki-content maincontent"><h3 
id="Aggregator2-Aggregator">Aggregator</h3><p><strong>This applies for Camel 
version 2.3 or newer. If you use an older version then use this <a shape="rect" 
href="aggregator.html">Aggregator</a> link instead.</strong></p><p>The <a 
shape="rect" class="external-link" 
href="http://www.enterpriseintegrationpatterns.com/Aggregator.html"; 
rel="nofollow">Aggregator</a> from the <a shape="rect" 
href="enterprise-integration-patterns.html">EIP patterns</a> allows you to 
combine a number of messages together into a single message.</p><p><span 
class="confluence-embedded-file-wrapper"><img class="confluence-embedded-image 
confluence-external-resource" 
src="http://www.enterpriseintegrationpatterns.com/img/Aggregator.gif"; 
data-image-src="http://www.enterpriseintegrationpatterns.com/img/Aggregator.gif";></span></p><p>A
 correlation <a shape="rect" href="expression.html">Expression</a> is used to 
determine the messages which should be aggregated together. If yo
 u want to aggregate all messages into a single message, just use a constant 
expression. An&#160;<strong><code>AggregationStrategy</code></strong> is used 
to combine all the message exchanges for a single correlation key into a single 
message exchange.</p><h3 id="Aggregator2-Aggregatoroptions">Aggregator 
options</h3><p>The aggregator supports the following 
options:</p><p>confluenceTableSmall</p><div class="table-wrap"><table 
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Option</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1" 
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" 
rowspan="1" 
class="confluenceTd"><p><code>correlationExpression</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>Mandatory <a shape="rect" 
href="expression.html">Expression</a> which evaluates the correlation key to u
 se for aggregation. The <a shape="rect" href="exchange.html">Exchange</a> 
which has the same correlation key is aggregated together. If the correlation 
key could not be evaluated an Exception is thrown. You can disable this by 
using the <strong><code>ignoreBadCorrelationKeys</code></strong> 
option.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>aggregationStrategy</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>Mandatory 
<strong><code>AggregationStrategy</code></strong> which is used to 
<em>merge</em> the incoming <a shape="rect" href="exchange.html">Exchange</a> 
with the existing already merged exchanges. At first call the 
<strong><code>oldExchange</code></strong> parameter is 
<strong><code>null</code></strong>. On subsequent invocations the 
<strong><code>oldExchange</code></strong> contains the merged exchanges and 
<strong><code>newExchange</code></strong> is of course
  the new incoming Exchange.</p><p>From <strong>Camel 2.9.2</strong> onwards 
the strategy can also be a 
<strong><code>TimeoutAwareAggregationStrategy</code></strong> implementation, 
supporting the timeout callback, see further below for more details.</p><p>From 
<strong>Camel 2.16</strong>: the strategy can also be a 
<strong><code>PreCompletionAwareAggregationStrategy</code></strong> 
implementation which then runs the completion check in pre-completion mode. See 
further below for more details.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>strategyRef</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>A reference to lookup the 
<strong><code>AggregationStrategy</code></strong> in the <a shape="rect" 
href="registry.html">Registry</a>. From <strong>Camel 2.12</strong> onwards you 
can also use a POJO as the <strong><code>AggregationStrategy</code></strong>, 
see further below for 
 details.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>strategyMethodName</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><strong>Camel 2.12:</strong> This option can be used to 
explicit declare the method name to use, when using POJOs as the 
<strong><code>AggregationStrategy</code></strong>. See further below for more 
details.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>strategyMethodAllowNull</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.12:</strong> If 
this option is <strong><code>false</code></strong> then the aggregate method is 
not used for the very first aggregation. If this option is 
<strong><code>true</code></strong> then <strong><code>null</code></strong> 
values is used as the <strong><code>oldExchange</code></stron
 g> (at the very first aggregation), when using POJOs as the 
<strong><code>AggregationStrategy</code></strong>. See further below for more 
details.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>completionSize</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>Number of messages aggregated before the aggregation is 
complete. This option can be set as either a fixed value or using an <a 
shape="rect" href="expression.html">Expression</a> which allows you to evaluate 
a size dynamically - will use <strong><code>Integer</code></strong> as result. 
If both are set Camel will fallback to use the fixed value if the <a 
shape="rect" href="expression.html">Expression</a> result was 
<strong><code>null</code></strong> or 
<strong><code>0</code></strong>.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>completionTimeout</code></p></td><td colspan="1" 
rowspan="1" c
 lass="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>Time in millis that an aggregated exchange should be 
inactive before its complete. This option can be set as either a fixed value or 
using an <a shape="rect" href="expression.html">Expression</a> which allows you 
to evaluate a timeout dynamically - will use <strong><code>Long</code></strong> 
as result. If both are set Camel will fallback to use the fixed value if the <a 
shape="rect" href="expression.html">Expression</a> result was 
<strong><code>null</code></strong> or <strong><code>0</code></strong>. You 
cannot use this option together with 
<strong><code>completionInterval</code></strong>, only one of the two can be 
used.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>completionInterval</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>A repeating period in millis by which the aggrega
 tor will complete all current aggregated exchanges. Camel has a background 
task which is triggered every period. You cannot use this option together with 
<strong><code>completionTimeout</code></strong>, only one of them can be 
used.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>completionPredicate</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>A <a shape="rect" 
href="predicate.html">Predicate</a> to indicate when an aggregated exchange is 
complete.</p><p>From <strong>Camel 2.15</strong>: if this is not specified and 
the&#160;<strong><code>AggregationStrategy</code></strong> object implements 
Predicate, the&#160;<strong><code>aggregationStrategy</code></strong> object 
will be used as the 
<strong><code>completionPredicate</code></strong>.</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p><code>completionFromBatchConsumer</code></p></td><td 
colspan="1" 
 rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>This option is if the exchanges are coming 
from a <a shape="rect" href="batch-consumer.html">Batch Consumer</a>. Then when 
enabled the <a shape="rect" href="aggregator2.html">Aggregator2</a> will use 
the batch size determined by the <a shape="rect" 
href="batch-consumer.html">Batch Consumer</a> in the message header 
<strong><code>CamelBatchSize</code></strong>. See more details at <a 
shape="rect" href="batch-consumer.html">Batch Consumer</a>. This can be used to 
aggregate all files consumed from a <a shape="rect" href="file2.html">File</a> 
endpoint in that given poll.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>forceCompletionOnStop</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.9</strong> 
Indicates to complete all current aggr
 egated exchanges when the context is stopped</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><code>completeAllOnStop</code></td><td 
colspan="1" rowspan="1" class="confluenceTd"><code>false</code></td><td 
colspan="1" rowspan="1" class="confluenceTd"><strong>Camel 
2.16:</strong>&#160;Indicates to wait to complete all current and partial 
(pending) aggregated exchanges when the context is stopped. This also means 
that we will wait for all pending exchanges which are stored in 
the&#160;aggregation repository&#160;to complete so the repository is empty 
before we can stop. &#160;You may want to enable this when using the memory 
based aggregation repository that is memory based only,&#160;and do not store 
data on disk. When this option is enabled, then the aggregator is waiting to 
complete&#160;all those exchanges before its stopped, when stopping 
CamelContext or the route using it.</td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>eagerCheckCompletion
 </code></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>false</code></p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>Whether or not to eager check for completion when a new 
incoming <a shape="rect" href="exchange.html">Exchange</a> has been received. 
This option influences the behavior of the 
<strong><code>completionPredicate</code></strong> option as the <a shape="rect" 
href="exchange.html">Exchange</a> being passed in changes accordingly. When 
<strong><code>false</code></strong> the <a shape="rect" 
href="exchange.html">Exchange</a> passed in the <a shape="rect" 
href="predicate.html">Predicate</a> is the <em>aggregated</em> Exchange which 
means any information you may store on the aggregated Exchange from 
the&#160;<strong><code>AggregationStrategy</code></strong> is available for the 
<a shape="rect" href="predicate.html">Predicate</a>. When <code>true</code> the 
<a shape="rect" href="exchange.html">Exchange</a> passed in the <a shape="rect" 
href="predi
 cate.html">Predicate</a> is the <em>incoming</em> <a shape="rect" 
href="exchange.html">Exchange</a>, which means you can access data from the 
incoming Exchange.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>groupExchanges</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>If enabled then Camel will group all 
aggregated Exchanges into a single combined 
<strong><code>org.apache.camel.impl.GroupedExchange</code></strong> holder 
class that holds all the aggregated Exchanges. And as a result only one 
Exchange is being sent out from the aggregator. Can be used to combine many 
incoming Exchanges into a single output Exchange without coding a 
custom&#160;<strong><code>AggregationStrategy</code></strong> yourself. 
</p><p><strong>Note:</strong> this option does <strong>not</strong> support 
persistent repository with the aggregator. See further below for an example and 
mo
 re details.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>ignoreInvalidCorrelationKeys</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>Whether or not to ignore 
correlation keys which could not be evaluated to a value. By default Camel will 
throw an Exception, but you can enable this option and ignore the situation 
instead.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>closeCorrelationKeyOnCompletion</code></p></td><td
 colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>Whether or not too <em>late</em> Exchanges 
should be accepted or not. You can enable this to indicate that if a 
correlation key has already been completed, then any new exchanges with the 
same correlation key be denied. Camel will then throw a 
<strong><code>closedCorrelationKeyException</code></strong> 
 exception. When using this option you pass in a 
<strong><code>integer</code></strong> which is a number for a LRUCache which 
keeps that last X number of closed correlation keys. You can pass in 0 or a 
negative value to indicate a unbounded cache. By passing in a number you are 
ensured that cache won't grow too big if you use a log of different correlation 
keys.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>discardOnCompletionTimeout</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.5:</strong> 
Whether or not exchanges which complete due to a timeout should be discarded. 
If enabled then when a timeout occurs the aggregated message will 
<strong>not</strong> be sent out but dropped (discarded).</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p><code>aggregationRepository</code></p></td><td 
colspan="1" rowspan="1" class="confluen
 ceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>Allows you to plugin you own implementation of 
<strong><code>org.apache.camel.spi.AggregationRepository</code></strong> which 
keeps track of the current inflight aggregated exchanges. Camel uses by default 
a memory based implementation.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>aggregationRepositoryRef</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>Reference to lookup a 
<strong><code>aggregationRepository</code></strong> in the <a shape="rect" 
href="registry.html">Registry</a>.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>parallelProcessing</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>When aggregated are completed they are 
being send out of the aggregator. This 
 option indicates whether or not Camel should use a thread pool with multiple 
threads for concurrency. If no custom thread pool has been specified then Camel 
creates a default pool with 10 concurrent threads.</p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p><code>executorService</code></p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" rowspan="1" 
class="confluenceTd"><p>If using 
<strong><code>parallelProcessing</code></strong> you can specify a custom 
thread pool to be used. In fact also if you are not using 
<strong><code>parallelProcessing</code></strong> this custom thread pool is 
used to send out aggregated exchanges as well.</p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p><code>executorServiceRef</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p>Reference to lookup a 
<strong><code>executorService</code></strong> in t
 he <a shape="rect" href="registry.html">Registry</a></p></td></tr><tr><td 
colspan="1" rowspan="1" 
class="confluenceTd"><p><code>timeoutCheckerExecutorService</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><strong>Camel 2.9:</strong> If using either 
of the <strong><code>completionTimeout</code></strong>, 
<strong><code>completionTimeoutExpression</code></strong>, or 
<strong><code>completionInterval</code></strong> options a background thread is 
created to check for the completion for every aggregator. Set this option to 
provide a custom thread pool to be used rather than creating a new thread for 
every aggregator.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>timeoutCheckerExecutorServiceRef</code></p></td><td
 colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><strong>Camel 2.9:</strong> Reference to 
lookup 
 a <strong><code>timeoutCheckerExecutorService</code></strong> in the <a 
shape="rect" href="registry.html">Registry</a></p></td></tr><tr><td colspan="1" 
rowspan="1" class="confluenceTd"><p><code>optimisticLocking</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.11:</strong> 
Turns on using optimistic locking, which requires the 
<strong><code>aggregationRepository</code></strong> being used, is supporting 
this by implementing the 
<strong><code>org.apache.camel.spi.OptimisticLockingAggregationRepository</code></strong>
 interface.</p></td></tr><tr><td colspan="1" rowspan="1" 
class="confluenceTd"><p><code>optimisticLockRetryPolicy</code></p></td><td 
colspan="1" rowspan="1" class="confluenceTd"><p>&#160;</p></td><td colspan="1" 
rowspan="1" class="confluenceTd"><p><strong>Camel 2.11.1:</strong> Allows to 
configure retry settings when using optimistic locking.</p></td></tr></tbody
 ></table></div><h3 id="Aggregator2-ExchangeProperties">Exchange 
 >Properties</h3><p>The following properties are set on each aggregated 
 >Exchange:</p><p>confluenceTableSmall</p><div class="table-wrap"><table 
 >class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1" 
 >class="confluenceTh"><p>Header</p></th><th colspan="1" rowspan="1" 
 >class="confluenceTh"><p>Type</p></th><th colspan="1" rowspan="1" 
 >class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1" 
 >rowspan="1" 
 >class="confluenceTd"><p><code>CamelAggregatedSize</code></p></td><td 
 >colspan="1" rowspan="1" class="confluenceTd"><p><code>int</code></p></td><td 
 >colspan="1" rowspan="1" class="confluenceTd"><p>The total number of Exchanges 
 >aggregated into this combined Exchange.</p></td></tr><tr><td colspan="1" 
 >rowspan="1" 
 >class="confluenceTd"><p><code>CamelAggregatedCompletedBy</code></p></td><td 
 >colspan="1" rowspan="1" 
 >class="confluenceTd"><p><code>String</code></p></td><td colspan="1" 
 >rowspan="1" class="confluenceTd"><p>Ind
 icator how the aggregation was completed as a value of either: 
<strong><code>predicate</code></strong>, <strong><code>size</code></strong>, 
<strong><code>strategy</code></strong>, <strong><code>consumer</code></strong>, 
<strong><code>timeout</code></strong>, 
<strong><code>forceCompletion</code></strong> or 
<strong><code>interval</code></strong>.</p></td></tr></tbody></table></div><h3 
id="Aggregator2-AboutAggregationStrategy">About AggregationStrategy</h3><p>The 
<strong><code>AggregationStrategy</code></strong> is used for aggregating the 
old (lookup by its correlation id) and the new exchanges together into a single 
exchange. Possible implementations include performing some kind of combining or 
delta processing, such as adding line items together into an invoice or just 
using the newest exchange and removing old exchanges such as for state tracking 
or market data prices; where old values are of little use.</p><p>Notice the 
aggregation strategy is a mandatory option and must be provi
 ded to the aggregator.</p><p>Here are a few 
example&#160;<strong><code>AggregationStrategy</code></strong> implementations 
that should help you create your own custom strategy.</p>
+ class ArrayListAggregationStrategy implements AggregationStrategy { public 
Exchange aggregate(Exchange oldExchange, Exchange newExchange) { Object newBody 
= newExchange.getIn().getBody(); ArrayList</div>
         </td>
         <td valign="top">
           <div class="navigation">


Reply via email to