Modified: websites/production/camel/content/aggregator2.html
==============================================================================
--- websites/production/camel/content/aggregator2.html (original)
+++ websites/production/camel/content/aggregator2.html Fri Aug 25 08:22:01 2017
@@ -36,17 +36,6 @@
<![endif]-->
- <link href='//camel.apache.org/styles/highlighter/styles/shCoreCamel.css'
rel='stylesheet' type='text/css' />
- <link href='//camel.apache.org/styles/highlighter/styles/shThemeCamel.css'
rel='stylesheet' type='text/css' />
- <script src='//camel.apache.org/styles/highlighter/scripts/shCore.js'
type='text/javascript'></script>
- <script src='//camel.apache.org/styles/highlighter/scripts/shBrushJava.js'
type='text/javascript'></script>
- <script src='//camel.apache.org/styles/highlighter/scripts/shBrushXml.js'
type='text/javascript'></script>
- <script src='//camel.apache.org/styles/highlighter/scripts/shBrushPlain.js'
type='text/javascript'></script>
-
- <script type="text/javascript">
- SyntaxHighlighter.defaults['toolbar'] = false;
- SyntaxHighlighter.all();
- </script>
<title>
Apache Camel: Aggregator2
@@ -86,14 +75,7 @@
<tbody>
<tr>
<td valign="top" width="100%">
-<div class="wiki-content maincontent"><h3
id="Aggregator2-Aggregator">Aggregator</h3><p><strong>This applies for Camel
version 2.3 or newer. If you use an older version then use this <a shape="rect"
href="aggregator.html">Aggregator</a> link instead.</strong></p><p>The <a
shape="rect" class="external-link"
href="http://www.enterpriseintegrationpatterns.com/Aggregator.html"
rel="nofollow">Aggregator</a> from the <a shape="rect"
href="enterprise-integration-patterns.html">EIP patterns</a> allows you to
combine a number of messages together into a single message.</p><p><span
class="confluence-embedded-file-wrapper"><img class="confluence-embedded-image
confluence-external-resource"
src="http://www.enterpriseintegrationpatterns.com/img/Aggregator.gif"
data-image-src="http://www.enterpriseintegrationpatterns.com/img/Aggregator.gif"></span></p><p>A
correlation <a shape="rect" href="expression.html">Expression</a> is used to
determine the messages which should be aggregated together. If yo
u want to aggregate all messages into a single message, just use a constant
expression. An AggregationStrategy is used to combine all the message exchanges
for a single correlation key into a single message exchange.</p><h3
id="Aggregator2-Aggregatoroptions">Aggregator options</h3><p>The aggregator
supports the following options:</p><div class="confluenceTableSmall"><div
class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1"
rowspan="1" class="confluenceTh"><p>Option</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p>correlationExpression</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>Mandatory <a shape="rect"
href="expression.html">Expression</a> which evaluates the correlation key to
use for aggregation. The <a shape="rect" h
ref="exchange.html">Exchange</a> which has the same correlation key is
aggregated together. If the correlation key could not be evaluated an Exception
is thrown. You can disable this by using the
<code>ignoreBadCorrelationKeys</code> option.</p></td></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p>aggregationStrategy</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Mandatory <code>AggregationStrategy</code> which is
used to <em>merge</em> the incoming <a shape="rect"
href="exchange.html">Exchange</a> with the existing already merged exchanges.
At first call the <code>oldExchange</code> parameter is <code>null</code>. On
subsequent invocations the <code>oldExchange</code> contains the merged
exchanges and <code>newExchange</code> is of course the new incoming Exchange.
From <strong>Camel 2.9.2</strong> onwards the strategy can also be a
<code>TimeoutAwareAggregationStrategy</code> implementation
, supporting the timeout callback, see further below for more details. From
<strong>Camel 2.16</strong> onwards the strategy can also be a
<code>PreCompletionAwareAggregationStrategy</code> implementation which then
runs the completion check in pre-completion mode. See further below for more
details.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>strategyRef</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>A reference to lookup the
<code>AggregationStrategy</code> in the <a shape="rect"
href="registry.html">Registry</a>. From <strong>Camel 2.12</strong> onwards you
can also use a POJO as the <code>AggregationStrategy</code>, see further below
for details.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>strategyMethodName</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p><strong>Camel 2.12
:</strong> This option can be used to explicit declare the method name to use,
when using POJOs as the <code>AggregationStrategy</code>. See further below for
more details.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>strategyMethodAllowNull</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Camel 2.12:</strong> If this option
is <code>false</code> then the aggregate method is not used for the very first
aggregation. If this option is <code>true</code> then <code>null</code> values
is used as the <code>oldExchange</code> (at the very first aggregation), when
using POJOs as the <code>AggregationStrategy</code>. See further below for more
details.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>completionSize</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Number of me
ssages aggregated before the aggregation is complete. This option can be set
as either a fixed value or using an <a shape="rect"
href="expression.html">Expression</a> which allows you to evaluate a size
dynamically - will use <code>Integer</code> as result. If both are set Camel
will fallback to use the fixed value if the <a shape="rect"
href="expression.html">Expression</a> result was <code>null</code> or
<code>0</code>.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>completionTimeout</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Time in millis that an aggregated exchange should be
inactive before its complete. This option can be set as either a fixed value or
using an <a shape="rect" href="expression.html">Expression</a> which allows you
to evaluate a timeout dynamically - will use <code>Long</code> as result. If
both are set Camel will fallback to use the fixed value if the <a s
hape="rect" href="expression.html">Expression</a> result was <code>null</code>
or <code>0</code>. You cannot use this option together with completionInterval,
only one of the two can be used.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>completionInterval</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>A repeating period in millis by which the aggregator
will complete all current aggregated exchanges. Camel has a background task
which is triggered every period. You cannot use this option together with
completionTimeout, only one of them can be used.</p></td></tr><tr><td
colspan="1" rowspan="1" class="confluenceTd"><p>completionPredicate</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>A <a shape="rect"
href="predicate.html">Predicate</a> to indicate when an aggregated exchange is
complete. Starting in <st
rong>Camel 2.15</strong>, if this is not specified and the AggregationStrategy
object implements Predicate, the aggregationStrategy object will be used as the
completionPredicate.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>completionFromBatchConsumer</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>This option is if the exchanges are coming
from a <a shape="rect" href="batch-consumer.html">Batch Consumer</a>. Then when
enabled the <a shape="rect" href="aggregator2.html">Aggregator2</a> will use
the batch size determined by the <a shape="rect"
href="batch-consumer.html">Batch Consumer</a> in the message header
<code>CamelBatchSize</code>. See more details at <a shape="rect"
href="batch-consumer.html">Batch Consumer</a>. This can be used to aggregate
all files consumed from a <a shape="rect" href="file2.html">File</a> endpoint
in that given poll.</p></td></tr><tr><td col
span="1" rowspan="1" class="confluenceTd"><p>forceCompletionOnStop</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.9</strong>
Indicates to complete all current aggregated exchanges when the context is
stopped</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd">completeAllOnStop</td><td colspan="1" rowspan="1"
class="confluenceTd"><code>false</code></td><td colspan="1" rowspan="1"
class="confluenceTd"><strong>Camel 2.16:</strong> Indicates to wait to
complete all current and partial (pending) aggregated exchanges when the
context is stopped. This also means that we will wait for all pending exchanges
which are stored in the aggregation repository to complete so the
repository is empty before we can stop.  You may want to enable this when
using the memory based aggregation repository that is memory based
only, and do not store data on disk.
When this option is enabled, then the aggregator is waiting to
complete all those exchanges before its stopped, when stopping
CamelContext or the route using it.</td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>eagerCheckCompletion</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>Whether or not to eager check for
completion when a new incoming <a shape="rect"
href="exchange.html">Exchange</a> has been received. This option influences the
behavior of the <code>completionPredicate</code> option as the <a shape="rect"
href="exchange.html">Exchange</a> being passed in changes accordingly. When
<code>false</code> the <a shape="rect" href="exchange.html">Exchange</a> passed
in the <a shape="rect" href="predicate.html">Predicate</a> is the
<em>aggregated</em> Exchange which means any information you may store on the
aggregated Exchange from the <code>AggregationStrategy</code> is
available for the <a shape="rect" href="predicate.html">Predicate</a>. When
<code>true</code> the <a shape="rect" href="exchange.html">Exchange</a> passed
in the <a shape="rect" href="predicate.html">Predicate</a> is the
<em>incoming</em> <a shape="rect" href="exchange.html">Exchange</a>, which
means you can access data from the incoming Exchange.</p></td></tr><tr><td
colspan="1" rowspan="1" class="confluenceTd"><p>groupExchanges</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>If enabled then Camel will
group all aggregated Exchanges into a single combined
<code>org.apache.camel.impl.GroupedExchange</code> holder class that holds all
the aggregated Exchanges. And as a result only one Exchange is being sent out
from the aggregator. Can be used to combine many incoming Exchanges into a
single output Exchange without coding a custom <code>AggregationStrategy</code>
yourself. <strong>Important:</s
trong> This option does <strong>not</strong> support persistent repository
with the aggregator. See further below for an example and more
details.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>ignoreInvalidCorrelationKeys</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>Whether or not to ignore correlation keys
which could not be evaluated to a value. By default Camel will throw an
Exception, but you can enable this option and ignore the situation
instead.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>closeCorrelationKeyOnCompletion</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Whether or not too <em>late</em> Exchanges should be
accepted or not. You can enable this to indicate that if a correlation key has
already been completed, then any new exchanges with the same co
rrelation key be denied. Camel will then throw a
<code>closedCorrelationKeyException</code> exception. When using this option
you pass in a <code>integer</code> which is a number for a LRUCache which keeps
that last X number of closed correlation keys. You can pass in 0 or a negative
value to indicate a unbounded cache. By passing in a number you are ensured
that cache won't grow too big if you use a log of different correlation
keys.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>discardOnCompletionTimeout</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Camel 2.5:</strong> Whether or not
exchanges which complete due to a timeout should be discarded. If enabled then
when a timeout occurs the aggregated message will <strong>not</strong> be sent
out but dropped (discarded).</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>aggregationRepository</p
></td><td colspan="1" rowspan="1" class="confluenceTd"><p> </p></td><td
>colspan="1" rowspan="1" class="confluenceTd"><p>Allows you to plugin you own
>implementation of <code>org.apache.camel.spi.AggregationRepository</code>
>which keeps track of the current inflight aggregated exchanges. Camel uses by
>default a memory based implementation.</p></td></tr><tr><td colspan="1"
>rowspan="1" class="confluenceTd"><p>aggregationRepositoryRef</p></td><td
>colspan="1" rowspan="1" class="confluenceTd"><p> </p></td><td
>colspan="1" rowspan="1" class="confluenceTd"><p>Reference to lookup a
><code>aggregationRepository</code> in the <a shape="rect"
>href="registry.html">Registry</a>.</p></td></tr><tr><td colspan="1"
>rowspan="1" class="confluenceTd"><p>parallelProcessing</p></td><td
>colspan="1" rowspan="1"
>class="confluenceTd"><p><code>false</code></p></td><td colspan="1"
>rowspan="1" class="confluenceTd"><p>When aggregated are completed they are
>being send out of the aggregator. This option indi
cates whether or not Camel should use a thread pool with multiple threads for
concurrency. If no custom thread pool has been specified then Camel creates a
default pool with 10 concurrent threads.</p></td></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p>executorService</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>If using <code>parallelProcessing</code> you can
specify a custom thread pool to be used. In fact also if you are not using
<code>parallelProcessing</code> this custom thread pool is used to send out
aggregated exchanges as well.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>executorServiceRef</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Reference to lookup a <code>executorService</code> in
the <a shape="rect" href="registry.html">Registry</a></p></td></tr><tr><td
colspan="1" ro
wspan="1" class="confluenceTd"><p>timeoutCheckerExecutorService</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Camel 2.9:</strong> If using either
of the <code>completionTimeout</code>,
<code>completionTimeoutExpression</code>, or <code>completionInterval</code>
options a background thread is created to check for the completion for every
aggregator. Set this option to provide a custom thread pool to be used rather
than creating a new thread for every aggregator.</p></td></tr><tr><td
colspan="1" rowspan="1"
class="confluenceTd"><p>timeoutCheckerExecutorServiceRef</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Camel 2.9:</strong> Reference to
lookup a <code>timeoutCheckerExecutorService</code> in the <a shape="rect"
href="registry.html">Registry</a></p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceT
d"><p>optimisticLocking</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>false</code></p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p><strong>Camel 2.11:</strong> Turns on using optimistic
locking, which requires the <code>aggregationRepository</code> being used, is
supporting this by implementing the
<code>org.apache.camel.spi.OptimisticLockingAggregationRepository</code>
interface.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>optimisticLockRetryPolicy</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p><strong>Camel 2.11.1:</strong> Allows to configure
retry settings when using optimistic
locking.</p></td></tr></tbody></table></div></div>
-
-
-<h3 id="Aggregator2-ExchangeProperties">Exchange Properties</h3><p>The
following properties are set on each aggregated Exchange:</p><div
class="confluenceTableSmall"><div class="table-wrap"><table
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1"
class="confluenceTh"><p>header</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>type</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>description</p></th></tr><tr><td colspan="1"
rowspan="1"
class="confluenceTd"><p><code>CamelAggregatedSize</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>int</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>The total number of Exchanges aggregated
into this combined Exchange.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>CamelAggregatedCompletedBy</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>String</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>Indicator how the aggregation was com
pleted as a value of either: <code>predicate</code>, <code>size</code>,
<code>strategy</code>, <code>consumer</code>, <code>timeout</code>,
<code>forceCompletion</code> or
<code>interval</code>.</p></td></tr></tbody></table></div></div>
-
-
-<h3 id="Aggregator2-AboutAggregationStrategy">About
AggregationStrategy</h3><p>The <code>AggregationStrategy</code> is used for
aggregating the old (lookup by its correlation id) and the new exchanges
together into a single exchange. Possible implementations include performing
some kind of combining or delta processing, such as adding line items together
into an invoice or just using the newest exchange and removing old exchanges
such as for state tracking or market data prices; where old values are of
little use.</p><p>Notice the aggregation strategy is a mandatory option and
must be provided to the aggregator.</p><p>Here are a few example
AggregationStrategy implementations that should help you create your own custom
strategy.</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[//simply combines Exchange String body values
using '+' as a delimiter
+<div class="wiki-content maincontent"><h3
id="Aggregator2-Aggregator">Aggregator</h3><p><strong>This applies for Camel
version 2.3 or newer. If you use an older version then use this <a shape="rect"
href="aggregator.html">Aggregator</a> link instead.</strong></p><p>The <a
shape="rect" class="external-link"
href="http://www.enterpriseintegrationpatterns.com/Aggregator.html"
rel="nofollow">Aggregator</a> from the <a shape="rect"
href="enterprise-integration-patterns.html">EIP patterns</a> allows you to
combine a number of messages together into a single message.</p><p><span
class="confluence-embedded-file-wrapper"><img class="confluence-embedded-image
confluence-external-resource"
src="http://www.enterpriseintegrationpatterns.com/img/Aggregator.gif"
data-image-src="http://www.enterpriseintegrationpatterns.com/img/Aggregator.gif"></span></p><p>A
correlation <a shape="rect" href="expression.html">Expression</a> is used to
determine the messages which should be aggregated together. If yo
u want to aggregate all messages into a single message, just use a constant
expression. An AggregationStrategy is used to combine all the message exchanges
for a single correlation key into a single message exchange.</p><h3
id="Aggregator2-Aggregatoroptions">Aggregator options</h3><p>The aggregator
supports the following options:</p><parameter
ac:name="class">confluenceTableSmall</parameter><rich-text-body><div
class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1"
rowspan="1" class="confluenceTh"><p>Option</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>Default</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p>correlationExpression</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>Mandatory <a shape="rect"
href="expression.html">Expression</a> which evaluates the correlation key to u
se for aggregation. The <a shape="rect" href="exchange.html">Exchange</a>
which has the same correlation key is aggregated together. If the correlation
key could not be evaluated an Exception is thrown. You can disable this by
using the <code>ignoreBadCorrelationKeys</code> option.</p></td></tr><tr><td
colspan="1" rowspan="1" class="confluenceTd"><p>aggregationStrategy</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>Mandatory <code>AggregationStrategy</code>
which is used to <em>merge</em> the incoming <a shape="rect"
href="exchange.html">Exchange</a> with the existing already merged exchanges.
At first call the <code>oldExchange</code> parameter is <code>null</code>. On
subsequent invocations the <code>oldExchange</code> contains the merged
exchanges and <code>newExchange</code> is of course the new incoming Exchange.
From <strong>Camel 2.9.2</strong> onwards the strategy can also be a
<code>TimeoutAware
AggregationStrategy</code> implementation, supporting the timeout callback,
see further below for more details. From <strong>Camel 2.16</strong> onwards
the strategy can also be a <code>PreCompletionAwareAggregationStrategy</code>
implementation which then runs the completion check in pre-completion mode. See
further below for more details.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>strategyRef</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>A reference to lookup the
<code>AggregationStrategy</code> in the <a shape="rect"
href="registry.html">Registry</a>. From <strong>Camel 2.12</strong> onwards you
can also use a POJO as the <code>AggregationStrategy</code>, see further below
for details.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>strategyMethodName</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1" c
lass="confluenceTd"><p><strong>Camel 2.12:</strong> This option can be used to
explicit declare the method name to use, when using POJOs as the
<code>AggregationStrategy</code>. See further below for more
details.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>strategyMethodAllowNull</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Camel 2.12:</strong> If this option
is <code>false</code> then the aggregate method is not used for the very first
aggregation. If this option is <code>true</code> then <code>null</code> values
is used as the <code>oldExchange</code> (at the very first aggregation), when
using POJOs as the <code>AggregationStrategy</code>. See further below for more
details.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>completionSize</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan
="1" class="confluenceTd"><p>Number of messages aggregated before the
aggregation is complete. This option can be set as either a fixed value or
using an <a shape="rect" href="expression.html">Expression</a> which allows you
to evaluate a size dynamically - will use <code>Integer</code> as result. If
both are set Camel will fallback to use the fixed value if the <a shape="rect"
href="expression.html">Expression</a> result was <code>null</code> or
<code>0</code>.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>completionTimeout</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Time in millis that an aggregated exchange should be
inactive before its complete. This option can be set as either a fixed value or
using an <a shape="rect" href="expression.html">Expression</a> which allows you
to evaluate a timeout dynamically - will use <code>Long</code> as result. If
both are set Camel will fa
llback to use the fixed value if the <a shape="rect"
href="expression.html">Expression</a> result was <code>null</code> or
<code>0</code>. You cannot use this option together with completionInterval,
only one of the two can be used.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>completionInterval</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>A repeating period in millis by which the aggregator
will complete all current aggregated exchanges. Camel has a background task
which is triggered every period. You cannot use this option together with
completionTimeout, only one of them can be used.</p></td></tr><tr><td
colspan="1" rowspan="1" class="confluenceTd"><p>completionPredicate</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>A <a shape="rect"
href="predicate.html">Predicate</a> to indicate when an aggrega
ted exchange is complete. Starting in <strong>Camel 2.15</strong>, if this is
not specified and the AggregationStrategy object implements Predicate, the
aggregationStrategy object will be used as the
completionPredicate.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>completionFromBatchConsumer</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>This option is if the exchanges are coming
from a <a shape="rect" href="batch-consumer.html">Batch Consumer</a>. Then when
enabled the <a shape="rect" href="aggregator2.html">Aggregator2</a> will use
the batch size determined by the <a shape="rect"
href="batch-consumer.html">Batch Consumer</a> in the message header
<code>CamelBatchSize</code>. See more details at <a shape="rect"
href="batch-consumer.html">Batch Consumer</a>. This can be used to aggregate
all files consumed from a <a shape="rect" href="file2.html">File</a> endpoint
in
that given poll.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>forceCompletionOnStop</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Camel 2.9</strong> Indicates to
complete all current aggregated exchanges when the context is
stopped</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd">completeAllOnStop</td><td colspan="1" rowspan="1"
class="confluenceTd"><code>false</code></td><td colspan="1" rowspan="1"
class="confluenceTd"><strong>Camel 2.16:</strong> Indicates to wait to
complete all current and partial (pending) aggregated exchanges when the
context is stopped. This also means that we will wait for all pending exchanges
which are stored in the aggregation repository to complete so the
repository is empty before we can stop.  You may want to enable this when
using the memory based aggregation repository that is memory based o
nly, and do not store data on disk. When this option is enabled, then the
aggregator is waiting to complete all those exchanges before its stopped,
when stopping CamelContext or the route using it.</td></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p>eagerCheckCompletion</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>Whether or not to eager check
for completion when a new incoming <a shape="rect"
href="exchange.html">Exchange</a> has been received. This option influences the
behavior of the <code>completionPredicate</code> option as the <a shape="rect"
href="exchange.html">Exchange</a> being passed in changes accordingly. When
<code>false</code> the <a shape="rect" href="exchange.html">Exchange</a> passed
in the <a shape="rect" href="predicate.html">Predicate</a> is the
<em>aggregated</em> Exchange which means any information you may store on the
aggregated Exchange from
the <code>AggregationStrategy</code> is available for the <a shape="rect"
href="predicate.html">Predicate</a>. When <code>true</code> the <a shape="rect"
href="exchange.html">Exchange</a> passed in the <a shape="rect"
href="predicate.html">Predicate</a> is the <em>incoming</em> <a shape="rect"
href="exchange.html">Exchange</a>, which means you can access data from the
incoming Exchange.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>groupExchanges</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>false</code></p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>If enabled then Camel will group all aggregated
Exchanges into a single combined
<code>org.apache.camel.impl.GroupedExchange</code> holder class that holds all
the aggregated Exchanges. And as a result only one Exchange is being sent out
from the aggregator. Can be used to combine many incoming Exchanges into a
single output Exchange without coding a custom <code>AggregationStrate
gy</code> yourself. <strong>Important:</strong> This option does
<strong>not</strong> support persistent repository with the aggregator. See
further below for an example and more details.</p></td></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p>ignoreInvalidCorrelationKeys</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>Whether or not to ignore
correlation keys which could not be evaluated to a value. By default Camel will
throw an Exception, but you can enable this option and ignore the situation
instead.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>closeCorrelationKeyOnCompletion</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Whether or not too <em>late</em> Exchanges should be
accepted or not. You can enable this to indicate that if a correlation key has
already been completed
, then any new exchanges with the same correlation key be denied. Camel will
then throw a <code>closedCorrelationKeyException</code> exception. When using
this option you pass in a <code>integer</code> which is a number for a LRUCache
which keeps that last X number of closed correlation keys. You can pass in 0 or
a negative value to indicate a unbounded cache. By passing in a number you are
ensured that cache won't grow too big if you use a log of different correlation
keys.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>discardOnCompletionTimeout</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Camel 2.5:</strong> Whether or not
exchanges which complete due to a timeout should be discarded. If enabled then
when a timeout occurs the aggregated message will <strong>not</strong> be sent
out but dropped (discarded).</p></td></tr><tr><td colspan="1" rowspan="1"
class="
confluenceTd"><p>aggregationRepository</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Allows you to plugin you own implementation of
<code>org.apache.camel.spi.AggregationRepository</code> which keeps track of
the current inflight aggregated exchanges. Camel uses by default a memory based
implementation.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>aggregationRepositoryRef</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Reference to lookup a
<code>aggregationRepository</code> in the <a shape="rect"
href="registry.html">Registry</a>.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>parallelProcessing</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>false</code></p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>When aggregated are completed they are being sen
d out of the aggregator. This option indicates whether or not Camel should use
a thread pool with multiple threads for concurrency. If no custom thread pool
has been specified then Camel creates a default pool with 10 concurrent
threads.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>executorService</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>If using <code>parallelProcessing</code> you can
specify a custom thread pool to be used. In fact also if you are not using
<code>parallelProcessing</code> this custom thread pool is used to send out
aggregated exchanges as well.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>executorServiceRef</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Reference to lookup a <code>executorService</code> in
the <a shape="rect" href="registry.html">Registr
y</a></p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>timeoutCheckerExecutorService</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p><strong>Camel 2.9:</strong> If using either of the
<code>completionTimeout</code>, <code>completionTimeoutExpression</code>, or
<code>completionInterval</code> options a background thread is created to check
for the completion for every aggregator. Set this option to provide a custom
thread pool to be used rather than creating a new thread for every
aggregator.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>timeoutCheckerExecutorServiceRef</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Camel 2.9:</strong> Reference to
lookup a <code>timeoutCheckerExecutorService</code> in the <a shape="rect"
href="registry.html">Registry</a></p></td></tr><tr><td c
olspan="1" rowspan="1" class="confluenceTd"><p>optimisticLocking</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p><strong>Camel 2.11:</strong>
Turns on using optimistic locking, which requires the
<code>aggregationRepository</code> being used, is supporting this by
implementing the
<code>org.apache.camel.spi.OptimisticLockingAggregationRepository</code>
interface.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>optimisticLockRetryPolicy</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p><strong>Camel 2.11.1:</strong> Allows to configure
retry settings when using optimistic
locking.</p></td></tr></tbody></table></div></rich-text-body><h3
id="Aggregator2-ExchangeProperties">Exchange Properties</h3><p>The following
properties are set on each aggregated Exchange:</p><parameter
ac:name="class">confluence
TableSmall</parameter><rich-text-body><div class="table-wrap"><table
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1"
class="confluenceTh"><p>header</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>type</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>description</p></th></tr><tr><td colspan="1"
rowspan="1"
class="confluenceTd"><p><code>CamelAggregatedSize</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>int</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>The total number of Exchanges aggregated
into this combined Exchange.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>CamelAggregatedCompletedBy</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>String</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>Indicator how the aggregation was completed
as a value of either: <code>predicate</code>, <code>size</code>,
<code>strategy</code>, <code>consumer</code>, <code>time
out</code>, <code>forceCompletion</code> or
<code>interval</code>.</p></td></tr></tbody></table></div></rich-text-body><h3
id="Aggregator2-AboutAggregationStrategy">About AggregationStrategy</h3><p>The
<code>AggregationStrategy</code> is used for aggregating the old (lookup by its
correlation id) and the new exchanges together into a single exchange. Possible
implementations include performing some kind of combining or delta processing,
such as adding line items together into an invoice or just using the newest
exchange and removing old exchanges such as for state tracking or market data
prices; where old values are of little use.</p><p>Notice the aggregation
strategy is a mandatory option and must be provided to the
aggregator.</p><p>Here are a few example AggregationStrategy implementations
that should help you create your own custom
strategy.</p><plain-text-body>//simply combines Exchange String body values
using '+' as a delimiter
class StringAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
@@ -103,7 +85,7 @@ class StringAggregationStrategy implemen
String oldBody = oldExchange.getIn().getBody(String.class);
String newBody = newExchange.getIn().getBody(String.class);
- oldExchange.getIn().setBody(oldBody + "+" + newBody);
+ oldExchange.getIn().setBody(oldBody + "+" + newBody);
return oldExchange;
}
}
@@ -126,9 +108,7 @@ class ArrayListAggregationStrategy imple
}
}
}
-]]></script>
-</div></div><h3 id="Aggregator2-Aboutcompletion">About completion</h3><p>When
aggregation <a shape="rect" href="exchange.html">Exchange</a>s at some point
you need to indicate that the aggregated exchanges is complete, so they can be
send out of the aggregator. Camel allows you to indicate completion in various
ways as follows:</p><ul class="alternate"><li>completionTimeout - Is an
inactivity timeout in which is triggered if no new exchanges have been
aggregated for that particular correlation key within the
period.</li><li>completionInterval - Once every X period all the current
aggregated exchanges are completed.</li><li>completionSize - Is a number
indicating that after X aggregated exchanges it's
complete.</li><li>completionPredicate - Runs a <a shape="rect"
href="predicate.html">Predicate</a> when a new exchange is aggregated to
determine if we are complete or not. Staring in <strong>Camel
2.15</strong>, the configured aggregationStrategy can implement the Predicate
interf
ace and will be used as the completionPredicate if no completionPredicate is
configured. From <strong>Camel 2.16</strong>, the configured
aggregationStrategy can
implement <code>PreCompletionAwareAggregationStrategy</code> and will be
used as the completionPredicate in pre-complete check mode. See further below
for more details.</li><li>completionFromBatchConsumer - Special option for <a
shape="rect" href="batch-consumer.html">Batch Consumer</a> which allows you to
complete when all the messages from the batch has been
aggregated.</li><li>forceCompletionOnStop - <strong>Camel 2.9</strong>
Indicates to complete all current aggregated exchanges when the context is
stopped</li><li>Using a <code>AggregateController</code>
- <strong>Camel 2.16</strong> which allows to use an external source to
complete groups or all groups. This can be done using Java or JMX
API.</li></ul><p>Notice that all the completion ways are per correlation key.
And you can combine them in any w
ay you like. It's basically the first which triggers that wins. So you can use
a completion size together with a completion timeout. Only completionTimeout
and completionInterval cannot be used at the same time.</p><p>Notice the
completion is a mandatory option and must be provided to the aggregator. If not
provided Camel will thrown an Exception on startup.</p><div
class="confluence-information-macro confluence-information-macro-tip"><p
class="title">Callbacks</p><span class="aui-icon aui-icon-small
aui-iconfont-approve confluence-information-macro-icon"></span><div
class="confluence-information-macro-body"><p>See the
<code>TimeoutAwareAggregationStrategy</code> and
<code>CompletionAwareAggregationStrategy</code> extensions to
<code>AggregationStrategy</code> that has callbacks when the aggregated
Exchange was completed and if a timeout occurred.</p></div></div><h3
id="Aggregator2-Pre-completionmode">Pre-completion
mode</h3><p><strong>available as of Camel 2.16</strong></p><p>There
can be use-cases where you want the incoming <a shape="rect"
href="exchange.html">Exchange</a> to determine if the correlation group
should pre-complete, and then the incoming <a shape="rect"
href="exchange.html">Exchange</a> is starting a new group from scratch. To
determine this the <code>AggregationStrategy</code> can
implement <code>PreCompletionAwareAggregationStrategy</code> which has
a <code>preComplete</code> method:</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[ /**
+</plain-text-body><h3 id="Aggregator2-Aboutcompletion">About
completion</h3><p>When aggregation <a shape="rect"
href="exchange.html">Exchange</a>s at some point you need to indicate that the
aggregated exchanges is complete, so they can be send out of the aggregator.
Camel allows you to indicate completion in various ways as follows:</p><ul
class="alternate"><li>completionTimeout - Is an inactivity timeout in which is
triggered if no new exchanges have been aggregated for that particular
correlation key within the period.</li><li>completionInterval - Once every X
period all the current aggregated exchanges are
completed.</li><li>completionSize - Is a number indicating that after X
aggregated exchanges it's complete.</li><li>completionPredicate - Runs a <a
shape="rect" href="predicate.html">Predicate</a> when a new exchange is
aggregated to determine if we are complete or not. Staring
in <strong>Camel 2.15</strong>, the configured aggregationStrategy can
implement the Predicate
interface and will be used as the completionPredicate if no
completionPredicate is configured. From <strong>Camel 2.16</strong>, the
configured aggregationStrategy can
implement <code>PreCompletionAwareAggregationStrategy</code> and will be
used as the completionPredicate in pre-complete check mode. See further below
for more details.</li><li>completionFromBatchConsumer - Special option for <a
shape="rect" href="batch-consumer.html">Batch Consumer</a> which allows you to
complete when all the messages from the batch has been
aggregated.</li><li>forceCompletionOnStop - <strong>Camel 2.9</strong>
Indicates to complete all current aggregated exchanges when the context is
stopped</li><li>Using a <code>AggregateController</code>
- <strong>Camel 2.16</strong> which allows to use an external source to
complete groups or all groups. This can be done using Java or JMX
API.</li></ul><p>Notice that all the completion ways are per correlation key.
And you can combine them in
any way you like. It's basically the first which triggers that wins. So you
can use a completion size together with a completion timeout. Only
completionTimeout and completionInterval cannot be used at the same
time.</p><p>Notice the completion is a mandatory option and must be provided to
the aggregator. If not provided Camel will thrown an Exception on
startup.</p><parameter
ac:name="title">Callbacks</parameter><rich-text-body><p>See the
<code>TimeoutAwareAggregationStrategy</code> and
<code>CompletionAwareAggregationStrategy</code> extensions to
<code>AggregationStrategy</code> that has callbacks when the aggregated
Exchange was completed and if a timeout occurred.</p></rich-text-body><h3
id="Aggregator2-Pre-completionmode">Pre-completion
mode</h3><p><strong>available as of Camel 2.16</strong></p><p>There can be
use-cases where you want the incoming <a shape="rect"
href="exchange.html">Exchange</a> to determine if the correlation group
should pre-complete, and then the inco
ming <a shape="rect" href="exchange.html">Exchange</a> is starting a new
group from scratch. To determine this the <code>AggregationStrategy</code>
can implement <code>PreCompletionAwareAggregationStrategy</code> which has
a <code>preComplete</code> method:</p><plain-text-body> /**
* Determines if the aggregation should complete the current group, and
start a new group, or the aggregation
* should continue using the current group.
*
@@ -136,187 +116,13 @@ class ArrayListAggregationStrategy imple
* @param newExchange the newest exchange (can be
<tt>null</tt> if there was no data possible to acquire)
* @return <tt>true</tt> to complete current group and start a
new group, or <tt>false</tt> to keep using current
*/
- boolean preComplete(Exchange oldExchange, Exchange
newExchange);]]></script>
-</div></div><p>If the preComplete method returns true, then the existing
groups is completed (without aggregating the incoming exchange (newExchange).
And then the newExchange is used to start the correlation group from scratch so
the group would contain only that new incoming exchange. This is known as
pre-completion mode. And when the aggregation is in pre-completion mode, then
only the following completions are in use</p><ul style="list-style-type:
square;"><li>aggregationStrategy must
implement <code>PreCompletionAwareAggregationStrategy</code> xxx</li><li>completionTimeout
or completionInterval can also be used as fallback completions</li><li>any
other completion are not used (such as by size, from batch consumer
etc)</li><li>eagerCheckCompletion is implied as true, but the option has no
effect</li></ul><h3 id="Aggregator2-PersistentAggregationRepository">Persistent
AggregationRepository</h3><p>The aggregator provides a pluggable repository
which you can implement you
r own <code>org.apache.camel.spi.AggregationRepository</code>.<br
clear="none"> If you need persistent repository then you can use either Camel
<a shape="rect" href="hawtdb.html">HawtDB</a>, <a shape="rect"
href="leveldb.html">LevelDB</a>, or <a shape="rect"
href="sql-component.html">SQL Component</a> components.</p><h3
id="Aggregator2-Examples">Examples</h3><p>See some examples from the old <a
shape="rect" href="aggregator.html">Aggregator</a> which is somewhat similar to
this new aggregator.</p><div class="confluence-information-macro
confluence-information-macro-tip"><p class="title">Setting options in Spring
XML</p><span class="aui-icon aui-icon-small aui-iconfont-approve
confluence-information-macro-icon"></span><div
class="confluence-information-macro-body"><p>Many of the options are
configurable as attributes on the <code><aggregate></code> tag when using
Spring XML.</p></div></div><h4 id="Aggregator2-UsingcompletionTimeout">Using
completionTimeout</h4><p>In this exampl
e we want to aggregate all incoming messages and after 3 seconds of inactivity
we want the aggregation to complete. This is done using the
<code>completionTimeout</code> option as shown:</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-from("direct:start")
- // aggregate all exchanges correlated by the id header.
- // Aggregate them using the BodyInAggregatingStrategy strategy which
- // and after 0.1 second of inactivity them timeout and complete the
aggregation
- // and send it to mock:aggregated
- .aggregate(header("id"), new
BodyInAggregatingStrategy()).completionTimeout(100).completionTimeoutCheckerInterval(10)
- .to("mock:aggregated");
-]]></script>
-</div></div>And the same example using Spring XML:<div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-<camelContext xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="direct:start"/>
- <aggregate strategyRef="aggregatorStrategy"
completionTimeout="100"
completionTimeoutCheckerInterval="10">
- <correlationExpression>
- <simple>header.id</simple>
- </correlationExpression>
- <to uri="mock:aggregated"/>
- </aggregate>
- </route>
-</camelContext>
-
-<bean id="aggregatorStrategy"
class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
-]]></script>
-</div></div><h4 id="Aggregator2-UsingTimeoutAwareAggregationStrategy">Using
TimeoutAwareAggregationStrategy</h4><p><strong>Available as of Camel
2.9.2</strong></p><p>If your aggregation strategy implements
<code>TimeoutAwareAggregationStrategy</code>, then Camel will invoke the
<code>timeout</code> method when the timeout occurs. Notice that the values for
index and total parameters will be -1, and the timeout parameter will be
provided only if configured as a fixed value. You must <strong>not</strong>
throw any exceptions from the <code>timeout</code> method.</p><h4
id="Aggregator2-UsingCompletionAwareAggregationStrategy">Using
CompletionAwareAggregationStrategy</h4><p><strong>Available as of Camel
2.9.3</strong></p><p>If your aggregation strategy implements
<code>CompletionAwareAggregationStrategy</code>, then Camel will invoke the
<code>onComplete</code> method when the aggregated Exchange is completed. This
allows you to do any last minute custom logic such as to cleanup some re
sources, or additional work on the exchange as it's now completed.<br
clear="none"> You must <strong>not</strong> throw any exceptions from the
<code>onCompletion</code> method.</p><h4
id="Aggregator2-UsingcompletionSize">Using completionSize</h4><p>In this
example we want to aggregate all incoming messages and when we have 3 messages
aggregated (in the same correlation group) we want the aggregation to complete.
This is done using the <code>completionSize</code> option as shown:</p><div
class="code panel pdl" style="border-width: 1px;"><div class="codeContent
panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-from("direct:start")
- // aggregate all exchanges correlated by the id header.
- // Aggregate them using the BodyInAggregatingStrategy strategy which
- // and after 3 messages has been aggregated then complete the aggregation
- // and send it to mock:aggregated
- .aggregate(header("id"), new
BodyInAggregatingStrategy()).completionSize(3)
- .to("mock:aggregated");
-]]></script>
-</div></div>And the same example using Spring XML:<div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-<camelContext xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="direct:start"/>
- <aggregate strategyRef="aggregatorStrategy"
completionSize="3">
- <correlationExpression>
- <simple>header.id</simple>
- </correlationExpression>
- <to uri="mock:aggregated"/>
- </aggregate>
- </route>
-</camelContext>
-
-<bean id="aggregatorStrategy"
class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
-]]></script>
-</div></div><h4 id="Aggregator2-UsingcompletionPredicate">Using
completionPredicate</h4><p>In this example we want to aggregate all incoming
messages and use a <a shape="rect" href="predicate.html">Predicate</a> to
determine when we are complete. The <a shape="rect"
href="predicate.html">Predicate</a> can be evaluated using either the
aggregated exchange (default) or the incoming exchange. We will give an example
for both situations. We start with the default situation as shown:</p><div
class="code panel pdl" style="border-width: 1px;"><div class="codeContent
panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-from("direct:start")
- // aggregate all exchanges correlated by the id header.
- // Aggregate them using the BodyInAggregatingStrategy strategy which
- // and when the aggregated body contains A+B+C then complete the
aggregation
- // and send it to mock:aggregated
- .aggregate(header("id"), new
BodyInAggregatingStrategy()).completionPredicate(body().contains("A+B+C"))
- .to("mock:aggregated");
-]]></script>
-</div></div>And the same example using Spring XML:<div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-<camelContext xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="direct:start"/>
- <aggregate strategyRef="aggregatorStrategy">
- <correlationExpression>
- <simple>header.id</simple>
- </correlationExpression>
- <completionPredicate>
- <simple>${body} contains 'A+B+C'</simple>
- </completionPredicate>
- <to uri="mock:aggregated"/>
- </aggregate>
- </route>
-</camelContext>
-
-<bean id="aggregatorStrategy"
class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
-]]></script>
-</div></div>And the other situation where we use the
<code>eagerCheckCompletion</code> option to tell Camel to use the incoming
Exchange. Notice how we can just test in the completion predicate that the
incoming message is the <em>END</em> message:<div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-from("direct:start")
- // aggregate all exchanges correlated by the id header.
- // Aggregate them using the BodyInAggregatingStrategy strategy
- // do eager checking which means the completion predicate will use the
incoming exchange
- // which allows us to trigger completion when a certain exchange arrived
which is the
- // END message
- .aggregate(header("id"), new BodyInAggregatingStrategy())
-
.eagerCheckCompletion().completionPredicate(body().isEqualTo("END"))
- .to("mock:aggregated");
-]]></script>
-</div></div>And the same example using Spring XML:<div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-<camelContext xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="direct:start"/>
- <aggregate strategyRef="aggregatorStrategy"
eagerCheckCompletion="true">
- <correlationExpression>
- <simple>header.id</simple>
- </correlationExpression>
- <completionPredicate>
- <simple>${body} == 'END'</simple>
- </completionPredicate>
- <to uri="mock:aggregated"/>
- </aggregate>
- </route>
-</camelContext>
-
-<bean id="aggregatorStrategy"
class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
-]]></script>
-</div></div><h4 id="Aggregator2-UsingdynamiccompletionTimeout">Using dynamic
completionTimeout</h4><p>In this example we want to aggregate all incoming
messages and after a period of inactivity we want the aggregation to complete.
The period should be computed at runtime based on the <code>timeout</code>
header in the incoming messages. This is done using the
<code>completionTimeout</code> option as shown:</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-from("direct:start")
- // aggregate all exchanges correlated by the id header.
- // Aggregate them using the BodyInAggregatingStrategy strategy which
- // and the timeout header contains the timeout in millis of inactivity
them timeout and complete the aggregation
- // and send it to mock:aggregated
- .aggregate(header("id"), new
BodyInAggregatingStrategy()).completionTimeout(header("timeout")).completionTimeoutCheckerInterval(10)
- .to("mock:aggregated");
-]]></script>
-</div></div>And the same example using Spring XML:<div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-<camelContext xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="direct:start"/>
- <aggregate strategyRef="aggregatorStrategy"
completionTimeoutCheckerInterval="10">
- <correlationExpression>
- <simple>header.id</simple>
- </correlationExpression>
- <completionTimeout>
- <header>timeout</header>
- </completionTimeout>
- <to uri="mock:aggregated"/>
- </aggregate>
- </route>
-</camelContext>
-
-<bean id="aggregatorStrategy"
class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
-]]></script>
-</div></div><strong>Note:</strong> You can also add a fixed timeout value and
Camel will fallback to use this value if the dynamic value was
<code>null</code> or <code>0</code>.<h4
id="Aggregator2-UsingdynamiccompletionSize">Using dynamic
completionSize</h4><p>In this example we want to aggregate all incoming
messages based on a dynamic size per correlation key. The size is computed at
runtime based on the <code>mySize</code> header in the incoming messages. This
is done using the <code>completionSize</code> option as shown:</p><div
class="code panel pdl" style="border-width: 1px;"><div class="codeContent
panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-from("direct:start")
- // aggregate all exchanges correlated by the id header.
- // Aggregate them using the BodyInAggregatingStrategy strategy which
- // and the header mySize determines the number of aggregated messages
should trigger the completion
- // and send it to mock:aggregated
- .aggregate(header("id"), new
BodyInAggregatingStrategy()).completionSize(header("mySize"))
- .to("mock:aggregated");
-]]></script>
-</div></div>And the same example using Spring XML:<div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-<camelContext xmlns="http://camel.apache.org/schema/spring">
- <route>
- <from uri="direct:start"/>
- <aggregate strategyRef="aggregatorStrategy">
- <correlationExpression>
- <simple>header.id</simple>
- </correlationExpression>
- <completionSize>
- <header>mySize</header>
- </completionSize>
- <to uri="mock:aggregated"/>
- </aggregate>
- </route>
-</camelContext>
-
-<bean id="aggregatorStrategy"
class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
-]]></script>
-</div></div><strong>Note:</strong> You can also add a fixed size value and
Camel will fallback to use this value if the dynamic value was
<code>null</code> or <code>0</code>.<p></p><h4
id="Aggregator2-UsingThisPattern">Using This Pattern</h4>
-
-<p>If you would like to use this EIP Pattern then please read the <a
shape="rect" href="getting-started.html">Getting Started</a>, you may also find
the <a shape="rect" href="architecture.html">Architecture</a> useful
particularly the description of <a shape="rect"
href="endpoint.html">Endpoint</a> and <a shape="rect"
href="uris.html">URIs</a>. Then you could try out some of the <a shape="rect"
href="examples.html">Examples</a> first before trying this pattern out.</p><h4
id="Aggregator2-CompletingcurrentgroupdecidedfromtheAggregationStrategy">Completing
current group decided from the AggregationStrategy</h4><p><strong>Available as
of Camel 2.15</strong></p><p>The <code>AggregationStrategy</code> can now
included a property on the returned <code>Exchange</code> that contains a
boolean to indicate if the current group should be completed. This allows to
overrule any existing completion predicates / sizes / timeouts etc, and
complete the group.</p><p>For example the followin
g logic (from an unit test) will complete the group if the message body size
is larger than 5. This is done by setting the property <span
style="line-height: 1.4285715;">Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP to
true.</span></p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[ public final class MyCompletionStrategy
implements AggregationStrategy {
+ boolean preComplete(Exchange oldExchange, Exchange
newExchange);</plain-text-body><p>If the preComplete method returns true, then
the existing groups is completed (without aggregating the incoming exchange
(newExchange). And then the newExchange is used to start the correlation group
from scratch so the group would contain only that new incoming exchange. This
is known as pre-completion mode. And when the aggregation is in pre-completion
mode, then only the following completions are in use</p><ul
style="list-style-type: square;"><li>aggregationStrategy must
implement <code>PreCompletionAwareAggregationStrategy</code> xxx</li><li>completionTimeout
or completionInterval can also be used as fallback completions</li><li>any
other completion are not used (such as by size, from batch consumer
etc)</li><li>eagerCheckCompletion is implied as true, but the option has no
effect</li></ul><h3 id="Aggregator2-PersistentAggregationRepository">Persistent
AggregationRepository</h3><p>
The aggregator provides a pluggable repository which you can implement your
own <code>org.apache.camel.spi.AggregationRepository</code>.<br clear="none">
If you need persistent repository then you can use either Camel <a shape="rect"
href="hawtdb.html">HawtDB</a>, <a shape="rect" href="leveldb.html">LevelDB</a>,
or <a shape="rect" href="sql-component.html">SQL Component</a>
components.</p><h3 id="Aggregator2-Examples">Examples</h3><p>See some examples
from the old <a shape="rect" href="aggregator.html">Aggregator</a> which is
somewhat similar to this new aggregator.</p><parameter ac:name="title">Setting
options in Spring XML</parameter><rich-text-body><p>Many of the options are
configurable as attributes on the <code><aggregate></code> tag when using
Spring XML.</p></rich-text-body><h4
id="Aggregator2-UsingcompletionTimeout">Using completionTimeout</h4><p>In this
example we want to aggregate all incoming messages and after 3 seconds of
inactivity we want the aggregation to com
plete. This is done using the <code>completionTimeout</code> option as
shown:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleTimeoutTest.java}</plain-text-body>And
the same example using Spring
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml}</plain-text-body></p><h4
id="Aggregator2-UsingTimeoutAwareAggregationStrategy">Using
TimeoutAwareAggregationStrategy</h4><p><strong>Available as of Camel
2.9.2</strong></p><p>If your aggregation strategy implements
<code>TimeoutAwareAggregationStrategy</code>, then Camel will invoke the
<code>timeout</code> method when the timeout occurs. Notice that the values for
index and total parameters will be -1, and the timeout parameter will be
provided only if configured as a fixed value. You must <strong>not</strong>
throw any e
xceptions from the <code>timeout</code> method.</p><h4
id="Aggregator2-UsingCompletionAwareAggregationStrategy">Using
CompletionAwareAggregationStrategy</h4><p><strong>Available as of Camel
2.9.3</strong></p><p>If your aggregation strategy implements
<code>CompletionAwareAggregationStrategy</code>, then Camel will invoke the
<code>onComplete</code> method when the aggregated Exchange is completed. This
allows you to do any last minute custom logic such as to cleanup some
resources, or additional work on the exchange as it's now completed.<br
clear="none"> You must <strong>not</strong> throw any exceptions from the
<code>onCompletion</code> method.</p><h4
id="Aggregator2-UsingcompletionSize">Using completionSize</h4><p>In this
example we want to aggregate all incoming messages and when we have 3 messages
aggregated (in the same correlation group) we want the aggregation to complete.
This is done using the <code>completionSize</code> option as
shown:<plain-text-body>{snippet:id=e1|lan
g=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimpleSizeTest.java}</plain-text-body>And
the same example using Spring
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleSizeTest.xml}</plain-text-body></p><h4
id="Aggregator2-UsingcompletionPredicate">Using completionPredicate</h4><p>In
this example we want to aggregate all incoming messages and use a <a
shape="rect" href="predicate.html">Predicate</a> to determine when we are
complete. The <a shape="rect" href="predicate.html">Predicate</a> can be
evaluated using either the aggregated exchange (default) or the incoming
exchange. We will give an example for both situations. We start with the
default situation as
shown:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateTest.java
}</plain-text-body>And the same example using Spring
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimplePredicateTest.xml}</plain-text-body>And
the other situation where we use the <code>eagerCheckCompletion</code> option
to tell Camel to use the incoming Exchange. Notice how we can just test in the
completion predicate that the incoming message is the <em>END</em>
message:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateSimplePredicateEagerTest.java}</plain-text-body>And
the same example using Spring
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimplePredicateEagerTest.xml}</plain-text-body></p><h4
id="Aggregator2-UsingdynamiccompletionTimeout">Using dynamic completion
Timeout</h4><p>In this example we want to aggregate all incoming messages and
after a period of inactivity we want the aggregation to complete. The period
should be computed at runtime based on the <code>timeout</code> header in the
incoming messages. This is done using the <code>completionTimeout</code> option
as
shown:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionTimeoutTest.java}</plain-text-body>And
the same example using Spring
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionTimeoutTest.xml}</plain-text-body><strong>Note:</strong>
You can also add a fixed timeout value and Camel will fallback to use this
value if the dynamic value was <code>null</code> or <code>0</code>.</p><h4
id="Aggregator2-UsingdynamiccompletionSize">Using dynamic
completionSize</h4><p>In th
is example we want to aggregate all incoming messages based on a dynamic size
per correlation key. The size is computed at runtime based on the
<code>mySize</code> header in the incoming messages. This is done using the
<code>completionSize</code> option as
shown:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateExpressionSizeTest.java}</plain-text-body>And
the same example using Spring
XML:<plain-text-body>{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateExpressionSizeTest.xml}</plain-text-body><strong>Note:</strong>
You can also add a fixed size value and Camel will fallback to use this value
if the dynamic value was <code>null</code> or <code>0</code>.</p><p><parameter
ac:name=""><a shape="rect" href="using-this-pattern.html">Using This
Pattern</a></parameter></p><h4 id="Aggregator2-Completingcurrentgroupdeci
dedfromtheAggregationStrategy">Completing current group decided from the
AggregationStrategy</h4><p><strong>Available as of Camel
2.15</strong></p><p>The <code>AggregationStrategy</code> can now included
a property on the returned <code>Exchange</code> that contains a boolean
to indicate if the current group should be completed. This allows to overrule
any existing completion predicates / sizes / timeouts etc, and complete the
group.</p><p>For example the following logic (from an unit test) will complete
the group if the message body size is larger than 5. This is done by setting
the property <span style="line-height:
1.4285715;">Exchange.AGGREGATION_COMPLETE_CURRENT_GROUP to
true.</span></p><plain-text-body> public final class MyCompletionStrategy
implements AggregationStrategy {
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
if (oldExchange == null) {
return newExchange;
}
- String body = oldExchange.getIn().getBody(String.class) +
"+"
+ String body = oldExchange.getIn().getBody(String.class) + "+"
+ newExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(body);
if (body.length() >= 5) {
@@ -324,183 +130,125 @@ from("direct:start")
}
return oldExchange;
}
- }]]></script>
-</div></div><p> </p><h4
id="Aggregator2-ManuallyForcetheCompletionofAllAggregatedExchangesImmediately">Manually
Force the Completion of All Aggregated Exchanges
Immediately</h4><p><strong>Available as of Camel 2.9</strong><br clear="none">
You can manually trigger completion of all current aggregated exchanges by
sending a message containing the header
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS set to true. The message is considered
a signal message only, the message headers/contents will not be processed
otherwise.</p><p><strong>Available as of Camel 2.11</strong><br clear="none">
You can alternatively set the header
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE to true to trigger
completion of all groups after processing the current message.</p><h4
id="Aggregator2-UsingaList<V>inAggregationStrategy">Using a List<V>
in AggregationStrategy</h4><p><strong>Available as of Camel
2.11</strong></p><p>If you want to aggregate some value from the messages
<V> into
a List<V> then we have added a
<code>org.apache.camel.processor.aggregate.AbstractListAggregationStrategy</code>
abstract class in <strong>Camel 2.11</strong> that makes this easier. The
completed Exchange that is sent out of the aggregator will contain the
List<V> in the message body.</p><p>For example to aggregate a
List<Integer> you can extend this class as shown below, and implement the
<code>getValue</code> method:</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-/**
- * Our strategy just group a list of integers.
- */
-public final class MyListOfNumbersStrategy extends
AbstractListAggregationStrategy<Integer> {
-
- @Override
- public Integer getValue(Exchange exchange) {
- // the message body contains a number, so just return that as-is
- return exchange.getIn().getBody(Integer.class);
- }
-}
-]]></script>
-</div></div><h3 id="Aggregator2-UsingAggregateController">Using
AggregateController</h3><p><strong>Available as of Camel
2.16</strong></p><p>The <code>org.apache.camel.processor.aggregate.AggregateController</code>
allows you to control the aggregate at runtime using Java or JMX API. This can
be used to force completing groups of exchanges, or query its current runtime
statistics.</p><p>The aggregator provides a default implementation if no custom
have been configured, which can be accessed
using <code>getAggregateController()</code> method. Though it may be
easier to configure a controller in the route using aggregateController as
shown below:</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[private AggregateController controller = new
DefaultAggregateController();
-Â
-from("direct:start")
- .aggregate(header("id"), new
MyAggregationStrategy()).completionSize(10).id("myAggregator")
+ }</plain-text-body><p> </p><h4
id="Aggregator2-ManuallyForcetheCompletionofAllAggregatedExchangesImmediately">Manually
Force the Completion of All Aggregated Exchanges
Immediately</h4><p><strong>Available as of Camel 2.9</strong><br clear="none">
You can manually trigger completion of all current aggregated exchanges by
sending a message containing the header
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS set to true. The message is considered
a signal message only, the message headers/contents will not be processed
otherwise.</p><p><strong>Available as of Camel 2.11</strong><br clear="none">
You can alternatively set the header
Exchange.AGGREGATION_COMPLETE_ALL_GROUPS_INCLUSIVE to true to trigger
completion of all groups after processing the current message.</p><h4
id="Aggregator2-UsingaList<V>inAggregationStrategy">Using a List<V>
in AggregationStrategy</h4><p><strong>Available as of Camel
2.11</strong></p><p>If you want to aggregate some value from the messages <
;V> into a List<V> then we have added a
<code>org.apache.camel.processor.aggregate.AbstractListAggregationStrategy</code>
abstract class in <strong>Camel 2.11</strong> that makes this easier. The
completed Exchange that is sent out of the aggregator will contain the
List<V> in the message body.</p><p>For example to aggregate a
List<Integer> you can extend this class as shown below, and implement the
<code>getValue</code>
method:<plain-text-body>{snippet:id=e1|lang=java|url=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomListAggregationStrategyTest.java}</plain-text-body></p><h3
id="Aggregator2-UsingAggregateController">Using
AggregateController</h3><p><strong>Available as of Camel
2.16</strong></p><p>The <code>org.apache.camel.processor.aggregate.AggregateController</code>
allows you to control the aggregate at runtime using Java or JMX API. This can
be used to force completing groups of exchanges, or query its current runti
me statistics.</p><p>The aggregator provides a default implementation if no
custom have been configured, which can be accessed
using <code>getAggregateController()</code> method. Though it may be
easier to configure a controller in the route using aggregateController as
shown below:</p><plain-text-body>private AggregateController controller = new
DefaultAggregateController();
+ 
+from("direct:start")
+ .aggregate(header("id"), new
MyAggregationStrategy()).completionSize(10).id("myAggregator")
.aggregateController(controller)
- .to("mock:aggregated");]]></script>
-</div></div><p>Then there is API on AggregateController to force completion.
For example to complete a group with key foo</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[int groups =
controller.forceCompletionOfGroup("foo");]]></script>
-</div></div><p>The number return would be the number of groups completed. In
this case it would be 1 if the foo group existed and was completed. If foo does
not exists then 0 is returned.</p><p>There is also an api to complete all
groups</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[int groups =
controller.forceCompletionOfAllGroups();]]></script>
-</div></div><p> </p><p>To configure this from XML DSL</p><div class="code
panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[<bean id="myController"
class="org.apache.camel.processor.aggregate.DefaultAggregateController"/>
-Â
- <camelContext xmlns="http://camel.apache.org/schema/spring">
+ .to("mock:aggregated");</plain-text-body><p>Then there is API on
AggregateController to force completion. For example to complete a group with
key foo</p><plain-text-body>int groups =
controller.forceCompletionOfGroup("foo");</plain-text-body><p>The number return
would be the number of groups completed. In this case it would be 1 if the foo
group existed and was completed. If foo does not exists then 0 is
returned.</p><p>There is also an api to complete all
groups</p><plain-text-body>int groups =
controller.forceCompletionOfAllGroups();</plain-text-body><p> </p><p>To
configure this from XML DSL</p><plain-text-body><bean id="myController"
class="org.apache.camel.processor.aggregate.DefaultAggregateController"/>
+ 
+ <camelContext xmlns="http://camel.apache.org/schema/spring">
<route>
- <from uri="direct:start"/>
- <aggregate strategyRef="myAppender"
completionSize="10"
aggregateControllerRef="myController">
+ <from uri="direct:start"/>
+ <aggregate strategyRef="myAppender" completionSize="10"
aggregateControllerRef="myController">
<correlationExpression>
<header>id</header>
</correlationExpression>
- <to uri="mock:result"/>
+ <to uri="mock:result"/>
</aggregate>
</route>
- </camelContext>]]></script>
-</div></div><p> </p><p>There is also JMX API on the aggregator which is
available under the processors node in the Camel JMX tree.</p><p> </p><h3
id="Aggregator2-UsingGroupedExchanges">Using GroupedExchanges</h3><p>In the
route below we group all the exchanges together using
<code>groupExchanges()</code>:</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[ from("direct:start")
+ </camelContext></plain-text-body><p> </p><p>There is also JMX
API on the aggregator which is available under the processors node in the Camel
JMX tree.</p><p> </p><h3 id="Aggregator2-UsingGroupedExchanges">Using
GroupedExchanges</h3><p>In the route below we group all the exchanges together
using <code>groupExchanges()</code>:</p><plain-text-body>
from("direct:start")
// aggregate all using same expression
.aggregate(constant(true))
// wait for 0.5 seconds to aggregate
.completionTimeout(500L)
// group the exchanges so we get one single exchange
containing all the others
.groupExchanges()
- .to("mock:result");
-]]></script>
-</div></div><p>As a result we have one outgoing <a shape="rect"
href="exchange.html">Exchange</a> being routed the the "mock:result" endpoint.
The exchange is a holder containing all the incoming Exchanges.<br
clear="none"> To get access to these exchanges you need to access them from a
property on the outgoing exchange as shown:</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[List<Exchange> grouped =
out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
-]]></script>
-</div></div><p>From <strong>Camel 2.13</strong> onwards this behavior has
changed to store these exchanges directly on the message body which is more
intuitive:</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[List<Exchange> grouped =
exchange.getIn().getBody(List.class);
-]]></script>
-</div></div><div class="confluence-information-macro
confluence-information-macro-information"><span class="aui-icon aui-icon-small
aui-iconfont-info confluence-information-macro-icon"></span><div
class="confluence-information-macro-body"><p>Notice the old way using the
property is still present in <strong>Camel 2.13</strong> onwards, but its
considered deprecated and to be removed in Camel 3.0
onwards.</p></div></div><h3
id="Aggregator2-UsingPOJOsasAggregationStrategy">Using POJOs as
AggregationStrategy</h3><p><strong>Available as of Camel 2.12</strong></p><div
class="confluence-information-macro confluence-information-macro-tip"><span
class="aui-icon aui-icon-small aui-iconfont-approve
confluence-information-macro-icon"></span><div
class="confluence-information-macro-body"><p>You can use POJOs as
AggregationStrategy with the other <a shape="rect" href="eip.html">EIP</a>s
that supports aggregation, such as <a shape="rect"
href="splitter.html">Splitter</a>, <a shape="rect" href="rec
ipient-list.html">Recipient List</a>, etc.</p></div></div><p>To use the
<code>AggregationStrategy</code> you had to implement the
<code>org.apache.camel.processor.aggregate.AggregationStrategy</code>
interface, which means your logic would be tied to the Camel API. From
<strong>Camel 2.12</strong> onwards you can use a POJO for the logic and let
Camel adapt to your POJO. To use a POJO a convention must be followed:</p><ul
class="alternate"><li>there must be a public method to use</li><li>the method
must not be void</li><li>the method can be static or non-static</li><li>the
method must have 2 or more parameters</li><li>the parameters is paired so the
first 50% is applied to the <code>oldExchange</code> and the reminder 50% is
for the <code>newExchange</code></li><li>.. meaning that there must be an equal
number of parameters, eg 2, 4, 6 etc.</li></ul><p>The paired methods is
expected to be ordered as follows:</p><ul class="alternate"><li>the first
parameter is the message body</li><l
i>the 2nd parameter is a Map of the headers</li><li>the 3rd parameter is a Map
of the Exchange properties</li></ul><p>This convention is best explained with
some examples.</p><p>In the method below, we have only 2 parameters, so the 1st
parameter is the body of the <code>oldExchange</code>, and the 2nd is paired to
the body of the <code>newExchange</code>:</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[public String append(String existing, String
next) {
+ .to("mock:result");
+</plain-text-body><p>As a result we have one outgoing <a shape="rect"
href="exchange.html">Exchange</a> being routed the the "mock:result" endpoint.
The exchange is a holder containing all the incoming Exchanges.<br
clear="none"> To get access to these exchanges you need to access them from a
property on the outgoing exchange as
shown:</p><plain-text-body>List<Exchange> grouped =
out.getProperty(Exchange.GROUPED_EXCHANGE, List.class);
+</plain-text-body><p>From <strong>Camel 2.13</strong> onwards this behavior
has changed to store these exchanges directly on the message body which is more
intuitive:</p><plain-text-body>List<Exchange> grouped =
exchange.getIn().getBody(List.class);
[... 172 lines stripped ...]