Author: buildbot
Date: Fri Mar 18 10:20:48 2016
New Revision: 983097
Log:
Production update by buildbot for camel
Modified:
websites/production/camel/content/book-component-appendix.html
websites/production/camel/content/book-in-one-page.html
websites/production/camel/content/cache/main.pageCache
websites/production/camel/content/camel-2170-release.html
websites/production/camel/content/sql-component.html
Modified: websites/production/camel/content/book-component-appendix.html
==============================================================================
--- websites/production/camel/content/book-component-appendix.html (original)
+++ websites/production/camel/content/book-component-appendix.html Fri Mar 18
10:20:48 2016
@@ -1016,11 +1016,11 @@ template.send("direct:alias-verify&
]]></script>
</div></div><p></p><h3 id="BookComponentAppendix-SeeAlso.8">See Also</h3>
<ul><li><a shape="rect" href="configuring-camel.html">Configuring
Camel</a></li><li><a shape="rect"
href="component.html">Component</a></li><li><a shape="rect"
href="endpoint.html">Endpoint</a></li><li><a shape="rect"
href="getting-started.html">Getting Started</a></li></ul><ul><li><a
shape="rect" href="crypto.html">Crypto</a> Crypto is also available as a <a
shape="rect" href="data-format.html">Data Format</a></li></ul> <h2
id="BookComponentAppendix-CXFComponent">CXF Component</h2><div
class="confluence-information-macro confluence-information-macro-note"><span
class="aui-icon aui-icon-small aui-iconfont-warning
confluence-information-macro-icon"></span><div
class="confluence-information-macro-body"><p>When using CXF as a consumer, the
<a shape="rect" href="cxf-bean-component.html">CXF Bean Component</a> allows
you to factor out how message payloads are received from their processing as a
RESTful or SOAP web service. This has the potential of using a multitude of
transports to consume web services. The bean component's configuration is also simpler and
provides the fastest method to implement web services using Camel and
CXF.</p></div></div><div class="confluence-information-macro
confluence-information-macro-tip"><span class="aui-icon aui-icon-small
aui-iconfont-approve confluence-information-macro-icon"></span><div
class="confluence-information-macro-body"><p>When using CXF in streaming modes
(see DataFormat option), then also read about <a shape="rect"
href="stream-caching.html">Stream caching</a>.</p></div></div><p>The
<strong>cxf:</strong> component provides integration with <a shape="rect"
href="http://cxf.apache.org">Apache CXF</a> for connecting to JAX-WS services
hosted in CXF.</p><p><style type="text/css">/*<![CDATA[*/
-div.rbtoc1458073132389 {padding: 0px;}
-div.rbtoc1458073132389 ul {list-style: disc;margin-left: 0px;}
-div.rbtoc1458073132389 li {margin-left: 0px;padding-left: 0px;}
+div.rbtoc1458296278355 {padding: 0px;}
+div.rbtoc1458296278355 ul {list-style: disc;margin-left: 0px;}
+div.rbtoc1458296278355 li {margin-left: 0px;padding-left: 0px;}
-/*]]>*/</style></p><div class="toc-macro rbtoc1458073132389">
+/*]]>*/</style></p><div class="toc-macro rbtoc1458296278355">
<ul class="toc-indentation"><li><a shape="rect"
href="#BookComponentAppendix-CXFComponent">CXF Component</a>
<ul class="toc-indentation"><li><a shape="rect"
href="#BookComponentAppendix-URIformat">URI format</a></li><li><a shape="rect"
href="#BookComponentAppendix-Options">Options</a>
<ul class="toc-indentation"><li><a shape="rect"
href="#BookComponentAppendix-Thedescriptionsofthedataformats">The descriptions
of the dataformats</a>
@@ -10246,136 +10246,53 @@ assertEquals("Linux", row.get(
.setBody(constant("ASF"))
.setProperty("min", constant(123))
.to("sql:select * from projects where license = :#${body} and id >
:#${property.min} order by id")]]></script>
-</div></div><h3
id="BookComponentAppendix-UsingtheJDBCbasedidempotentrepository">Using the JDBC
based idempotent repository</h3><p><strong>Available as of Camel 2.7</strong>:
In this section we will use the JDBC based idempotent repository.</p><div
class="confluence-information-macro confluence-information-macro-tip"><p
class="title">Abstract class</p><span class="aui-icon aui-icon-small
aui-iconfont-approve confluence-information-macro-icon"></span><div
class="confluence-information-macro-body"><p>From Camel 2.9 onwards there is an
abstract class
<code>org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository</code>
you can extend to build a custom JDBC idempotent
repository.</p></div></div><p>First we have to create the database table which
will be used by the idempotent repository. For <strong>Camel 2.7</strong>, we
use the following schema:</p><div class="code panel pdl" style="border-width:
1px;"><div class="codeContent panelContent pdl">
-<script class="brush: sql; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[CREATE TABLE CAMEL_MESSAGEPROCESSED (
- processorName VARCHAR(255),
- messageId VARCHAR(100)
-)
-]]></script>
-</div></div><p>In <strong>Camel 2.8</strong>, we added the createdAt
column:</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
-<script class="brush: sql; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[CREATE TABLE CAMEL_MESSAGEPROCESSED (
- processorName VARCHAR(255),
- messageId VARCHAR(100),
- createdAt TIMESTAMP
-)
-]]></script>
-</div></div><div class="confluence-information-macro
confluence-information-macro-warning"><span class="aui-icon aui-icon-small
aui-iconfont-error confluence-information-macro-icon"></span><div
class="confluence-information-macro-body">The SQL
Server <strong>TIMESTAMP</strong> type is a fixed-length binary-string
type. It does not map to any of the JDBC time types: <strong>DATE</strong>,
<strong>TIME</strong>, or
<strong>TIMESTAMP</strong>.</div></div><p> </p><p>We recommend to have a
unique constraint on the columns processorName and messageId. Because the
syntax for this constraint differs from database to database, we do not show it
here.</p><p>Second, we need to set up a <code>javax.sql.DataSource</code> in the
spring XML file:</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-<jdbc:embedded-database id="dataSource" type="DERBY"
/>
-]]></script>
-</div></div>And finally we can create our JDBC idempotent repository in the
spring XML file as well:<div class="code panel pdl" style="border-width:
1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-<bean id="messageIdRepository"
class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
- <constructor-arg ref="dataSource" />
- <constructor-arg value="myProcessorName" />
-</bean>
-
-<camel:camelContext>
- <camel:errorHandler id="deadLetterChannel"
type="DeadLetterChannel" deadLetterUri="mock:error">
- <camel:redeliveryPolicy maximumRedeliveries="0"
maximumRedeliveryDelay="0" logStackTrace="false" />
- </camel:errorHandler>
-
- <camel:route id="JdbcMessageIdRepositoryTest"
errorHandlerRef="deadLetterChannel">
- <camel:from uri="direct:start" />
- <camel:idempotentConsumer
messageIdRepositoryRef="messageIdRepository">
- <camel:header>messageId</camel:header>
- <camel:to uri="mock:result" />
- </camel:idempotentConsumer>
- </camel:route>
-</camel:camelContext>
-]]></script>
-</div></div><h4
id="BookComponentAppendix-CustomizetheJdbcMessageIdRepository">Customize the
JdbcMessageIdRepository</h4><p>Starting with <strong>Camel 2.9.1</strong> you
have a few options to tune the
<code>org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository</code>
for your needs:</p><div class="table-wrap"><table
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1"
class="confluenceTh"><p>Parameter</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>Default Value</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p>createTableIfNotExists</p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>true</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>Defines whether or not Camel should try to
create the table if it doesn't exist.</p></td></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p>tableExistsString</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 =
0</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>This query is
used to figure out whether the table already exists or not. It must throw an
exception to indicate the table doesn't exist.</p></td></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p>createString</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>CREATE TABLE CAMEL_MESSAGEPROCESSED
(processorName VARCHAR(255), messageId VARCHAR(100), createdAt
TIMESTAMP)</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>The
statement which is used to create the table.</p></td></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p>queryString</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED
WHERE processorName = ? AND messageId = ?</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>The query which is used to figure out whether the
message already exists in the
repository (the result is not equal to '0'). It takes two parameters. The
first one is the processor name (<code>String</code>) and the second one is the
message id (<code>String</code>).</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>insertString</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>INSERT INTO CAMEL_MESSAGEPROCESSED (processorName,
messageId, createdAt) VALUES (?, ?, ?)</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>The statement which is used to add the entry into the
table. It takes three parameters. The first one is the processor name
(<code>String</code>), the second one is the message id (<code>String</code>)
and the third one is the timestamp (<code>java.sql.Timestamp</code>) when this
entry was added to the repository.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p>deleteString</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName
=
? AND messageId = ?</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>The statement which is used to delete the entry from
the database. It takes two parameters. The first one is the processor name
(<code>String</code>) and the second one is the message id
(<code>String</code>).</p></td></tr></tbody></table></div><p>A customized
<code>org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository</code>
could look like:</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[
-<bean id="messageIdRepository"
class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository">
- <constructor-arg ref="dataSource" />
- <constructor-arg value="myProcessorName" />
- <property name="tableExistsString" value="SELECT 1
FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE 1 = 0" />
- <property name="createString" value="CREATE TABLE
CUSTOMIZED_MESSAGE_REPOSITORY (processorName VARCHAR(255), messageId
VARCHAR(100), createdAt TIMESTAMP)" />
- <property name="queryString" value="SELECT COUNT(*)
FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId =
?" />
- <property name="insertString" value="INSERT INTO
CUSTOMIZED_MESSAGE_REPOSITORY (processorName, messageId, createdAt) VALUES (?,
?, ?)" />
- <property name="deleteString" value="DELETE FROM
CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?"
/>
-</bean>
-]]></script>
-</div></div><h3
id="BookComponentAppendix-UsingtheJDBCbasedaggregationrepository">Using the
JDBC based aggregation repository</h3><p><strong>Available as of Camel
2.6</strong></p><div class="confluence-information-macro
confluence-information-macro-information"><p class="title">Using
JdbcAggregationRepository in Camel 2.6</p><span class="aui-icon aui-icon-small
aui-iconfont-info confluence-information-macro-icon"></span><div
class="confluence-information-macro-body"><p>In Camel 2.6, the
JdbcAggregationRepository is provided in the <code>camel-jdbc-aggregator</code>
component. From Camel 2.7 onwards, the <code>JdbcAggregationRepository</code>
is provided in the <code>camel-sql</code>
component.</p></div></div><p><code>JdbcAggregationRepository</code> is an
<code>AggregationRepository</code> which on the fly persists the aggregated
messages. This ensures that you will not lose messages, as the default
aggregator uses an in-memory-only <code>AggregationRepository</code>.<br
clear="none"> The <code>JdbcAggregationRepository</code>, together with Camel,
provides persistent support for the <a shape="rect"
href="aggregator2.html">Aggregator</a>.</p><p>It has the following
options:</p><div class="table-wrap"><table
class="confluenceTable"><tbody><tr><th colspan="1" rowspan="1"
class="confluenceTh"><p>Option</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>Type</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>dataSource</code></p></td><td
colspan="1" rowspan="1"
class="confluenceTd"><p><code>DataSource</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Mandatory:</strong> The
<code>javax.sql.DataSource</code> to use for accessing the
database.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>repositoryName</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>String</code></p></td><td colspan="1" rowspan="1" class="confluenceTd"><p><strong>Mandatory:</strong>
The name of the repository.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>transactionManager</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>TransactionManager</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p><strong>Mandatory:</strong> The
<code>org.springframework.transaction.PlatformTransactionManager</code> to
manage transactions for the database. The TransactionManager must be able to
support databases.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>lobHandler</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>LobHandler</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>A
<code>org.springframework.jdbc.support.lob.LobHandler</code> to handle Lob
types in the database. Use this option to use a vendor specific LobHandler, for
example when using Oracle.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>returnOldExchange</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>boolean</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Whether the get operation should return the old
existing Exchange if any existed. By default this option is <code>false</code>
to optimize as we do not need the old exchange when
aggregating.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>useRecovery</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>boolean</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>Whether or not recovery is enabled. This option is by
default <code>true</code>. When enabled the Camel <a shape="rect"
href="aggregator2.html">Aggregator</a> automatically recovers failed aggregated
exchanges and resubmits them.</p></td></tr><tr><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>recoveryInterval</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>long</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>If recovery is enabled, a background task runs
at this interval to scan for failed exchanges to recover and resubmit. By
default this interval is 5000 milliseconds.</p></td></tr><tr><td colspan="1"
rowspan="1"
class="confluenceTd"><p><code>maximumRedeliveries</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>int</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>Allows you to limit the maximum number of
redelivery attempts for a recovered exchange. If enabled then the Exchange will
be moved to the dead letter channel if all redelivery attempts failed. By
default this option is disabled. If this option is used then the
<code>deadLetterUri</code> option must also be provided.</p></td></tr><tr><td
colspan="1" rowspan="1"
class="confluenceTd"><p><code>deadLetterUri</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>String</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p>An endpoint uri for a <a shape="rect"
href="dead-letter-channel.html">Dead Letter Channel</a> where exhausted
recovered Exchanges will be moved. If this option is used then the
<code>maximumRedeliveries</code> option must also be
provided.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>storeBodyAsText</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>boolean</p></td><td colspan="1" rowspan="1"
class="confluenceTd"><p><strong>Camel 2.11:</strong> Whether to store the
message body as String which is human readable. By default this option is
<code>false</code> storing the body in binary format.</p></td></tr><tr><td
colspan="1" rowspan="1"
class="confluenceTd"><p><code>headersToStoreAsText</code></p></td><td
colspan="1" rowspan="1"
class="confluenceTd"><p><code>List&lt;String&gt;</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Camel 2.11:</strong> Allows you to
store headers as String which is human
readable. By default this option is disabled, storing the headers in binary
format.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>optimisticLocking</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><code>false</code></p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Camel 2.12:</strong> To turn on
optimistic locking, which is often needed in clustered environments where
multiple Camel applications share the same JDBC based aggregation
repository.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>jdbcOptimisticLockingExceptionMapper</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p> </p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p><strong>Camel 2.12:</strong> Allows you to
plug in a custom
<code>org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper</code>
to map vendor-specific error codes to an optimistic locking error, so that Camel
can perform a retry. This requires <code>optimisticLocking</code> to be
enabled.</p></td></tr></tbody></table></div><h4
id="BookComponentAppendix-Whatispreservedwhenpersisting">What is preserved when
persisting</h4><p><code>JdbcAggregationRepository</code> will only preserve any
<code>Serializable</code> compatible data types. If a data type is not such a
type, it is dropped and a <code>WARN</code> is logged. It only persists the
<code>Message</code> body and the <code>Message</code> headers. The
<code>Exchange</code> properties are <strong>not</strong> persisted.</p><p>From
Camel 2.11 onwards you can store the message body and select(ed) headers as
String in separate columns.</p><h4
id="BookComponentAppendix-Recovery">Recovery</h4><p>The
<code>JdbcAggregationRepository</code> will by default recover any failed <a
shape="rect" href="exchange.html">Exchange</a>. It does this by having a
background task that scans for failed <a shape="rect"
href="exchange.html">Exchange</a>s in the persistent store. You can use the <code>recoveryInterval</code> option to set how
often this task runs. The recovery is transactional, which ensures that
Camel will try to recover and redeliver the failed <a shape="rect"
href="exchange.html">Exchange</a>. Any <a shape="rect"
href="exchange.html">Exchange</a> found to need recovery will be
restored from the persistent store, resubmitted, and sent out
again.</p><p>The following headers are set when an <a shape="rect"
href="exchange.html">Exchange</a> is being recovered/redelivered:</p><div
class="table-wrap"><table class="confluenceTable"><tbody><tr><th colspan="1"
rowspan="1" class="confluenceTh"><p>Header</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>Type</p></th><th colspan="1" rowspan="1"
class="confluenceTh"><p>Description</p></th></tr><tr><td colspan="1"
rowspan="1"
class="confluenceTd"><p><code>Exchange.REDELIVERED</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>Boolean</p></td><td colspan="1" rowspan="1" class="confluenceTd"><p>Is set to true to indicate the <a
shape="rect" href="exchange.html">Exchange</a> is being
redelivered.</p></td></tr><tr><td colspan="1" rowspan="1"
class="confluenceTd"><p><code>Exchange.REDELIVERY_COUNTER</code></p></td><td
colspan="1" rowspan="1" class="confluenceTd"><p>Integer</p></td><td colspan="1"
rowspan="1" class="confluenceTd"><p>The redelivery attempt, starting from
1.</p></td></tr></tbody></table></div><p>Only when an <a shape="rect"
href="exchange.html">Exchange</a> has been successfully processed will it be
marked as complete, which happens when the <code>confirm</code> method is
invoked on the <code>AggregationRepository</code>. This means that if the same <a
shape="rect" href="exchange.html">Exchange</a> fails again, it will be
retried until it succeeds.</p><p>You can use the option
<code>maximumRedeliveries</code> to limit the maximum number of redelivery
attempts for a given recovered <a shape="rect" href="exchange.html">Exchange</a>.
You must also set the <code>deadLetterUri</code> option so Camel knows
where to send the <a shape="rect" href="exchange.html">Exchange</a> when the
<code>maximumRedeliveries</code> limit is reached.</p><p>You can see some examples in
the unit tests of camel-sql, for example <a shape="rect" class="external-link"
href="https://svn.apache.org/repos/asf/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java">this
test</a>.</p><h4 id="BookComponentAppendix-Database">Database</h4><p>To be
operational, each aggregator uses two tables: the aggregation table and the completed table.
By convention the completed table has the same name as the aggregation table, suffixed
with <code>"_COMPLETED"</code>. The name must be configured in the Spring bean
with the <code>repositoryName</code> property. In the following example
the name <code>aggregation</code> will be used.</p><p>The structure of both tables
is identical: in both cases a String value is used as key (<strong>id</strong>), whereas a Blob contains the exchange
serialized as a byte array.<br clear="none"> However, one difference should be
remembered: the <strong>id</strong> field does not hold the same content
in each table.<br clear="none"> In the aggregation table,
<strong>id</strong> holds the correlation id used by the component to aggregate
the messages. In the completed table, <strong>id</strong> holds the id of the
exchange stored in the corresponding blob field.</p><p>Here is the SQL query
used to create the tables, just replace <code>"aggregation"</code> with your
aggregator repository name.</p><div class="code panel pdl" style="border-width:
1px;"><div class="codeContent panelContent pdl">
-<script class="brush: sql; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[CREATE TABLE aggregation (
- id varchar(255) NOT NULL,
- exchange blob NOT NULL,
- constraint aggregation_pk PRIMARY KEY (id)
-);
-CREATE TABLE aggregation_completed (
- id varchar(255) NOT NULL,
- exchange blob NOT NULL,
- constraint aggregation_completed_pk PRIMARY KEY (id)
-);
-]]></script>
-</div></div><h4 id="BookComponentAppendix-Storingbodyandheadersastext">Storing
body and headers as text</h4><p><strong>Available as of Camel
2.11</strong></p><p>You can configure the
<code>JdbcAggregationRepository</code> to store message body and select(ed)
headers as String in separate columns. For example to store the body, and the
following two headers <code>companyName</code> and <code>accountName</code> use
the following SQL:</p><div class="code panel pdl" style="border-width:
1px;"><div class="codeContent panelContent pdl">
-<script class="brush: sql; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[CREATE TABLE aggregationRepo3 (
- id varchar(255) NOT NULL,
- exchange blob NOT NULL,
- body varchar(1000),
- companyName varchar(1000),
- accountName varchar(1000),
- constraint aggregationRepo3_pk PRIMARY KEY (id)
-);
-CREATE TABLE aggregationRepo3_completed (
- id varchar(255) NOT NULL,
- exchange blob NOT NULL,
- body varchar(1000),
- companyName varchar(1000),
- accountName varchar(1000),
- constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
-);
-]]></script>
-</div></div><p>And then configure the repository to enable this behavior as
shown below:</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[ <bean id="repo3"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
- <property name="repositoryName"
value="aggregationRepo3"/>
- <property name="transactionManager"
ref="txManager3"/>
- <property name="dataSource"
ref="dataSource3"/>
- <!-- configure to store the message body and following headers as
text in the repo -->
- <property name="storeBodyAsText"
value="true"/>
- <property name="headersToStoreAsText">
- <list>
- <value>companyName</value>
- <value>accountName</value>
- </list>
- </property>
- </bean>
-]]></script>
-</div></div><h4 id="BookComponentAppendix-Codec(Serialization)">Codec
(Serialization)</h4><p>Since they can contain any type of payload, Exchanges
are not serializable by design. Each exchange is converted into a byte array to be stored
in a database BLOB field. All those conversions are handled by the
<code>JdbcCodec</code> class. One detail of the code requires your attention:
the <code>ClassLoadingAwareObjectInputStream</code>.</p><p>The
<code>ClassLoadingAwareObjectInputStream</code> has been reused from the <a
shape="rect" class="external-link" href="http://activemq.apache.org/">Apache
ActiveMQ</a> project. It wraps an <code>ObjectInputStream</code> and uses it
with the <code>ContextClassLoader</code> rather than the
<code>currentThread</code> one. The benefit is to be able to load classes
exposed by other bundles. This allows the exchange body and headers to
reference objects of custom types.</p><h4
id="BookComponentAppendix-Transaction">Transaction</h4><p>A Spring
<code>PlatformTransactionManager</code> is required to orchestrate transactions.</p><h4
id="BookComponentAppendix-Service(Start/Stop)">Service (Start/Stop)</h4><p>The
<code>start</code> method verifies the connection to the database and the
presence of the required tables. If anything is wrong it will fail during
starting.</p><h4 id="BookComponentAppendix-Aggregatorconfiguration">Aggregator
configuration</h4><p>Depending on the targeted environment, the aggregator
might need some configuration. As you already know, each aggregator should have
its own repository (with the corresponding pair of tables created in the
database) and a data source. If the default lobHandler is not adapted to your
database system, it can be injected with the <code>lobHandler</code>
property.</p><p>Here is the declaration for Oracle:</p><div class="code panel
pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[ <bean id="lobHandler"
class="org.springframework.jdbc.support.lob.OracleLobHandler">
- <property name="nativeJdbcExtractor"
ref="nativeJdbcExtractor"/>
- </bean>
+</div></div><h4
id="BookComponentAppendix-UsingINquerieswithdynamicvalues">Using IN queries
with dynamic values</h4><p><strong>Available as of Camel
2.17</strong></p><p>From Camel 2.17 onwards the SQL producer allows you to use SQL
queries with IN statements where the IN values are computed dynamically, for example
from the message body or a header.</p><p>To use IN you need
to:</p><ul><li><span style="line-height: 1.42857;">prefix the parameter name
with <code>in:</code></span></li><li><span style="line-height:
1.42857;">add <code>( )</code> around the
parameter</span></li></ul><p><span style="line-height: 1.42857;">An example
explains this better. The following query is used:</span></p><div class="code
panel pdl" style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[-- this is a comment
+select *
+from projects
+where project in (:#in:names)
+order by id]]></script>
+</div></div><p><span style="line-height: 1.42857;">In the following
route:</span></p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
+<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[from("direct:query")
+ .to("sql:classpath:sql/selectProjectsIn.sql")
+ .to("log:query")
+ .to("mock:query");]]></script>
+</div></div><p><span style="line-height: 1.42857;">Then the IN query can use a
header with the key <code>names</code> that holds the dynamic values, such as:</span></p><div
class="code panel pdl" style="border-width: 1px;"><div class="codeContent
panelContent pdl">
+<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[// use an array
+template.requestBodyAndHeader("direct:query", "Hi there!",
"names", new String[]{"Camel", "AMQ"});
- <bean id="nativeJdbcExtractor"
class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
- <bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
- <property name="transactionManager"
ref="transactionManager"/>
- <property name="repositoryName"
value="aggregation"/>
- <property name="dataSource"
ref="dataSource"/>
- <!-- Only with Oracle, else use default -->
- <property name="lobHandler"
ref="lobHandler"/>
- </bean>
-]]></script>
-</div></div><h3 id="BookComponentAppendix-Optimisticlocking">Optimistic
locking</h3><p>From <strong>Camel 2.12</strong> onwards you can turn on
<code>optimisticLocking</code> and use this JDBC based aggregation repository
in a clustered environment where multiple Camel applications share the same
database for the aggregation repository. If there is a race condition, the
JDBC driver will throw a vendor-specific exception which the
<code>JdbcAggregationRepository</code> can react to. To know which caused
exceptions from the JDBC driver are regarded as an optimistic locking error, we
need a mapper. Therefore there is a
<code>org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper</code>
that allows you to implement your custom logic if needed. There is a default
implementation
<code>org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper</code>
which works as follows:</p><p>The following check is done:</p><ul
class="alternate"><li>If the caused exception is an <code>SQLException</code>, then the
SQLState is checked to see whether it starts with 23.</li><li>If the caused exception is a
<code>DataIntegrityViolationException</code></li><li>If the caused exception
class name has "ConstraintViolation" in its name.</li><li>Optional checking for
FQN class name matches, if any class names have been configured</li></ul><p>You
can in addition add FQN class names, and if any of the caused exceptions (or any
nested exception) equals any of the FQN class names, then it is an optimistic locking
error.</p><p>Here is an example, where we define 2 extra FQN class names from
the JDBC vendor.</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
-<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[ <bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
- <property name="transactionManager"
ref="transactionManager"/>
- <property name="repositoryName"
value="aggregation"/>
- <property name="dataSource"
ref="dataSource"/>
- <property name="jdbcOptimisticLockingExceptionMapper"
ref="myExceptionMapper"/>
- </bean>
+// use a list
+List<String> names = new ArrayList<String>();
+names.add("Camel");
+names.add("AMQ");
- <!-- use the default mapper with extra FQN class names from our JDBC
driver -->
- <bean id="myExceptionMapper"
class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
- <property name="classNames">
- <util:set>
- <value>com.foo.sql.MyViolationException</value>
- <value>com.foo.sql.MyOtherViolationException</value>
- </util:set>
- </property>
- </bean>
-]]></script>
-</div></div><p></p><h3 id="BookComponentAppendix-SeeAlso.63">See Also</h3>
-<ul><li><a shape="rect" href="configuring-camel.html">Configuring
Camel</a></li><li><a shape="rect"
href="component.html">Component</a></li><li><a shape="rect"
href="endpoint.html">Endpoint</a></li><li><a shape="rect"
href="getting-started.html">Getting Started</a></li></ul><ul
class="alternate"><li><a shape="rect" href="sql-stored-procedure.html">SQL
Stored Procedure</a></li><li><a shape="rect"
href="jdbc.html">JDBC</a></li></ul> <h2
id="BookComponentAppendix-TestComponent">Test Component</h2><p><a shape="rect"
href="testing.html">Testing</a> of distributed and asynchronous processing is
notoriously difficult. The <a shape="rect" href="mock.html">Mock</a>, <a
shape="rect" href="test.html">Test</a> and <a shape="rect"
href="dataset.html">DataSet</a> endpoints work great with the <a shape="rect"
href="testing.html">Camel Testing Framework</a> to simplify your unit and
integration testing using <a shape="rect"
href="enterprise-integration-patterns.html">Enterprise Integration Patterns
</a> and Camel's large range of <a shape="rect"
href="components.html">Components</a> together with the powerful <a
shape="rect" href="bean-integration.html">Bean Integration</a>.</p><p>The
<strong>test</strong> component extends the <a shape="rect"
href="mock.html">Mock</a> component to support pulling messages from another
endpoint on startup to set the expected message bodies on the underlying <a
shape="rect" href="mock.html">Mock</a> endpoint. That is, you use the test
endpoint in a route and messages arriving on it will be implicitly compared to
some expected messages extracted from some other location.</p><p>So you can
use, for example, an expected set of message bodies as files. This will then
set up a properly configured <a shape="rect" href="mock.html">Mock</a>
endpoint, which is only valid if the received messages match the number of
expected messages and their message payloads are equal.</p><p>Maven users will
need to add the following dependency to their <code>pom.xml</code>
for this component when using <strong>Camel 2.8</strong> or
older:</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
+template.requestBodyAndHeader("direct:query", "Hi there!",
"names", names);
+
+
+// use a string separated values with comma
+template.requestBodyAndHeader("direct:query", "Hi there!",
"names", "Camel,AMQ");]]></script>
+</div></div><p>The query can also be specified in the endpoint instead of
being externalized (notice that externalizing makes maintaining the SQL queries
easier)</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
+<script class="brush: java; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[from("direct:query")
+ .to("sql:select * from projects where project in (:#in:names) order by id")
+ .to("log:query")
+ .to("mock:query");]]></script>
+</div></div><p> </p><h2
id="BookComponentAppendix-UsingtheJDBCbasedidempotentrepository">Using the JDBC
based idempotent repository</h2><p><strong>Available as of Camel 2.7</strong>:
In this section we will use the JDBC based idempotent repository.</p><div
class="confluence-information-macro confluence-information-macro-tip"><p
class="title">Abstract class</p><span class="aui-icon aui-icon-small
aui-iconfont-approve confluence-information-macro-icon"></span><div
class="confluence-information-macro-body"></div></div><p>From Camel 2.9 onwards
there is an abstract class
<code>org.apache.camel.processor.idempotent.jdbc.AbstractJdbcMessageIdRepository</code>
that you can extend to build a custom JDBC idempotent
repository.</p><p> </p><p>First we have to create the database table which
will be used by the idempotent repository. For <strong>Camel 2.7</strong>, we
use the following schema:</p><div class="code panel pdl" style="border-width:
1px;"><div class="codeContent panelContent pdl">
+<script class="brush: sql; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[CREATE TABLE CAMEL_MESSAGEPROCESSED (
+  processorName VARCHAR(255),
+  messageId VARCHAR(100)
+)]]></script>
+</div></div><p> </p><p>In <strong>Camel
2.8</strong>, we added the createdAt column:</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: sql; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[CREATE TABLE CAMEL_MESSAGEPROCESSED (
+  processorName VARCHAR(255),
+  messageId VARCHAR(100),
+  createdAt TIMESTAMP
+)]]></script>
+</div></div><p> </p><p> </p><div class="confluence-information-macro
confluence-information-macro-warning"><span class="aui-icon aui-icon-small
aui-iconfont-error confluence-information-macro-icon"></span><div
class="confluence-information-macro-body"></div></div><p
class="wysiwyg-macro-body">The SQL Server <strong>TIMESTAMP</strong> type
is a fixed-length binary-string type. It does not map to any of the JDBC time
types: <strong>DATE</strong>, <strong>TIME</strong>, or
<strong>TIMESTAMP</strong>.</p><p> </p><p> </p><p>We recommend having
a unique constraint on the columns processorName and messageId. Because
the syntax for this constraint differs from database to database, we do not show
it here.</p><p>Second we need to set up a <code>javax.sql.DataSource</code> in
the spring XML file:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml}</p><p> </p><p>And
finally we can create our JDBC idempotent repository in the spring XML file as
well:{snippet:id=e2|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/spring.xml}</p><p> </p><p>Customize
the JdbcMessageIdRepository</p><p>Starting with <strong>Camel 2.9.1</strong>
you have a few options to tune the
<code>org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository</code>
for your needs:</p><table class="confluenceTable"><tbody><tr><th
class="confluenceTh"><p>Parameter</p></th><th
class="confluenceTh"><p>Default Value</p></th><th
class="confluenceTh"><p>Description</p></th></tr><tr><td
class="confluenceTd"><p>createTableIfNotExists</p></td><td
class="confluenceTd"><p>true</p></td><td class="confluenceTd"><p>Defines
whether or not Camel should try to create the table if it doesn't
exist.</p></td></tr><tr><td
class="confluenceTd"><p>tableExistsString</p></td><td
class="confluenceTd"><p>SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 =
0</p></td><td class="confluenceTd"><p>This query is used to figure out whether
the table already exists or not. It must throw an exception to indicate the
table doesn't exist.</p></td></tr><tr><td
class="confluenceTd"><p>createString</p></td><td
class="confluenceTd"><p>CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName
VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)</p></td><td
class="confluenceTd"><p>The statement which is used to create the
table.</p></td></tr><tr><td
class="confluenceTd"><p>queryString</p></td><td
class="confluenceTd"><p>SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE
processorName = ? AND messageId = ?</p></td><td class="confluenceTd"><p>The
query which is used to figure out whether the message already exists in the
repository (the result is not equal to '0'). It takes two parameters. The
first one is the processor name (<code>String</code>) and the second one is the
message id (<code>String</code>).</p></td></tr><tr><td
class="confluenceTd"><p>insertString</p></td><td
class="confluenceTd"><p>INSERT INTO CAMEL_MESSAGEPROCESSED (processorName,
messageId, createdAt) VALUES (?, ?, ?)</p></td><td
class="confluenceTd"><p>The statement which is used to add the entry into the
table. It takes three parameters. The first one is the processor name
(<code>String</code>), the second one is the message id (<code>String</code>)
and the third one is the timestamp (<code>java.sql.Timestamp</code>) when this
entry was added to the repository.</p></td></tr><tr><td
class="confluenceTd"><p>deleteString</p></td><td
class="confluenceTd"><p>DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName
= ? AND messageId = ?</p></td><td class="confluenceTd"><p>The statement which
is used to delete the entry from the database. It takes two parameters. The
first one is the processor name (<code>String</code>) and the second one is the
message id (<code>String</code>).</p></td></tr></tbody></table><p>A customized
<code>org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository</code>
could look
like:{snippet:id=e1|lang=xml|url=camel/trunk/components/camel-sql/src/test/resources/org/apache/camel/processor/idempotent/jdbc/customized-spring.xml}</p><p> </p><p>Using
the JDBC based aggregation repository</p><p><strong>Available as of Camel
2.6</strong></p><div class="confluence-information-macro
confluence-information-macro-information"><p class="title">Using
JdbcAggregationRepository in Camel 2.6</p><span class="aui-icon aui-icon-small
aui-iconfont-info confluence-information-macro-icon"></span><div
class="confluence-information-macro-body"></div></div><p>In Camel 2.6, the
JdbcAggregationRepository is provided in the <code>camel-jdbc-aggregator</code>
component. From Camel 2.7 onwards, the <code>JdbcAggregationRepository</code>
is provided in the <code>camel-sql</code>
component.</p><p> </p><p><code>JdbcAggregationRepository</code> is an
<code>AggregationRepository</code> which persists the aggregated messages on
the fly. This ensures that you will not lose messages, as the default
aggregator uses an in-memory-only <code>AggregationRepository</code>.<br
clear="none"> Together with Camel, the <code>JdbcAggregationRepository</code>
provides persistent support for the <a shape="rect"
href="aggregator2.html">Aggregator</a>.</p><p>It has the following
options:</p><table class="confluenceTable"><tbody><tr><th
class="confluenceTh"><p>Option</p></th><th
class="confluenceTh"><p>Type</p></th><th
class="confluenceTh"><p>Description</p></th></tr><tr><td
class="confluenceTd"><p><code>dataSource</code></p></td><td
class="confluenceTd"><p><code>DataSource</code></p></td><td
class="confluenceTd"><p><strong>Mandatory:</strong> The
<code>javax.sql.DataSource</code> to use for accessing the
database.</p></td></tr><tr><td
class="confluenceTd"><p><code>repositoryName</code></p></td><td
class="confluenceTd"><p><code>String</code></p></td><td
class="confluenceTd"><p><strong>Mandatory:</strong> The name of the
repository.</p></td></tr><tr><td
class="confluenceTd"><p><code>transactionManager</code></p></td><td
class="confluenceTd"><p><code>TransactionManager</code></p></td><td
class="confluenceTd"><p><strong>Mandatory:</strong> The
<code>org.springframework.transaction.PlatformTransactionManager</code> to
manage transactions for the database. The TransactionManager must be able to
support databases.</p></td></tr><tr><td
class="confluenceTd"><p><code>lobHandler</code></p></td><td
class="confluenceTd"><p><code>LobHandler</code></p></td><td
class="confluenceTd"><p>A
<code>org.springframework.jdbc.support.lob.LobHandler</code> to handle Lob
types in the database. Use this option to use a vendor specific LobHandler, for
example when using Oracle.</p></td></tr><tr><td
class="confluenceTd"><p><code>returnOldExchange</code></p></td><td
class="confluenceTd"><p>boolean</p></td><td class="confluenceTd"><p>Whether the
get operation should return the old existing Exchange if any existed. By
default this option is <code>false</code> to optimize as we do not need the old
exchange when aggregating.</p></td></tr><tr><td
class="confluenceTd"><p><code>useRecovery</code></p></td><td
class="confluenceTd"><p>boolean</p></td><td class="confluenceTd"><p>Whether or
not recovery is enabled. This option is by default <code>true</code>. When
enabled the Camel <a shape="rect" href="aggregator2.html">Aggregator</a>
automatically recovers failed aggregated exchanges and has them
resubmitted.</p></td></tr><tr><td
class="confluenceTd"><p><code>recoveryInterval</code></p></td><td
class="confluenceTd"><p>long</p></td><td class="confluenceTd"><p>If recovery is
enabled then a background task is run every x'th time to scan for failed
exchanges to recover and resubmit. By default this interval is 5000
millis.</p></td></tr><tr><td
class="confluenceTd"><p><code>maximumRedeliveries</code></p></td><td
class="confluenceTd"><p>int</p></td><td class="confluenceTd"><p>Allows you to
limit the maximum number of redelivery attempts for a recovered exchange. If
enabled then the Exchange will be moved to the dead letter channel if all
redelivery attempts failed. By default this option is disabled. If this option
is used then the <code>deadLetterUri</code> option must also be
provided.</p></td></tr><tr><td
class="confluenceTd"><p><code>deadLetterUri</code></p></td><td
class="confluenceTd"><p>String</p></td><td class="confluenceTd"><p>An endpoint
uri for a <a shape="rect" href="dead-letter-channel.html">Dead Letter
Channel</a> where exhausted recovered Exchanges will be moved. If this option
is used then the <code>maximumRedeliveries</code> option must also be
provided.</p></td></tr><tr><td
class="confluenceTd"><p><code>storeBodyAsText</code></p></td><td
class="confluenceTd"><p>boolean</p></td><td
class="confluenceTd"><p><strong>Camel 2.11:</strong> Whether to store the
message body as String which is human readable. By default this option is
<code>false</code> storing the body in binary format.</p></td></tr><tr><td
class="confluenceTd"><p><code>headersToStoreAsText</code></p></td><td
class="confluenceTd"><p><code>List&lt;String&gt;</code></p></td><td
class="confluenceTd"><p><strong>Camel 2.11:</strong> Allows to store headers as
String which is human readable. By default this option is disabled, storing the
headers in binary format.</p></td></tr><tr><td
class="confluenceTd"><p><code>optimisticLocking</code></p></td><td
class="confluenceTd"><p><code>false</code></p></td><td
class="confluenceTd"><p><strong>Camel 2.12:</strong> To turn on optimistic
locking, which often would be needed in clustered environments where multiple
Camel applications share the same JDBC based aggregation
repository.</p></td></tr><tr><td
class="confluenceTd"><p><code>jdbcOptimisticLockingExceptionMapper</code></p></td><td
class="confluenceTd"><p> </p></td><td class="confluenceTd"><p><strong>Camel
2.12:</strong> Allows to plug in a custom
<code>org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper</code>
to map vendor specific error codes to an optimistic locking error, for Camel
to perform a retry. This requires <code>optimisticLocking</code> to be
enabled.</p></td></tr></tbody></table><p>What is preserved when
persisting</p><p><code>JdbcAggregationRepository</code> will only preserve
<code>Serializable</code> compatible data types. If a data type is not such a
type, it is dropped and a <code>WARN</code> is logged. It only persists the
<code>Message</code> body and
the <code>Message</code> headers. The <code>Exchange</code> properties are
<strong>not</strong> persisted.</p><p>From Camel 2.11 onwards you can store the
message body and selected headers as String in separate
columns.</p><p>Recovery</p><p>The <code>JdbcAggregationRepository</code> will
by default recover any failed <a shape="rect"
href="exchange.html">Exchange</a>. It does this by having a background task
that scans for failed <a shape="rect" href="exchange.html">Exchange</a>s in the
persistent store. You can use the <code>recoveryInterval</code> option to set how
often this task runs. The recovery is transactional, which ensures that
Camel will try to recover and redeliver the failed <a shape="rect"
href="exchange.html">Exchange</a>. Any <a shape="rect"
href="exchange.html">Exchange</a> which was found to be recovered will be
restored from the persistent store, resubmitted and sent out
again.</p><p>The following headers are set when an <a shape="rect"
href="exchange.html">
Exchange</a> is being recovered/redelivered:</p><table
class="confluenceTable"><tbody><tr><th
class="confluenceTh"><p>Header</p></th><th
class="confluenceTh"><p>Type</p></th><th
class="confluenceTh"><p>Description</p></th></tr><tr><td
class="confluenceTd"><p><code>Exchange.REDELIVERED</code></p></td><td
class="confluenceTd"><p>Boolean</p></td><td class="confluenceTd"><p>Is set to
true to indicate the <a shape="rect" href="exchange.html">Exchange</a> is being
redelivered.</p></td></tr><tr><td
class="confluenceTd"><p><code>Exchange.REDELIVERY_COUNTER</code></p></td><td
class="confluenceTd"><p>Integer</p></td><td class="confluenceTd"><p>The
redelivery attempt, starting from
1.</p></td></tr></tbody></table><p>Only when an <a shape="rect"
href="exchange.html">Exchange</a> has been successfully processed will it be
marked as complete, which happens when the <code>confirm</code> method is
invoked on the <code>AggregationRepository</code>. This means if the same
<a shape="rect" href="exchange.html">Exchange</a> fails again, it will
keep being retried until it succeeds.</p><p>You can use the option
<code>maximumRedeliveries</code> to limit the maximum number of redelivery
attempts for a given recovered <a shape="rect"
href="exchange.html">Exchange</a>. You must also set the
<code>deadLetterUri</code> option so Camel knows where to send the <a
shape="rect" href="exchange.html">Exchange</a> when the
<code>maximumRedeliveries</code> limit is hit.</p><p>You can see some examples in
the unit tests of camel-sql, for example <a shape="rect" class="external-link"
href="https://svn.apache.org/repos/asf/camel/trunk/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java">this
test</a>.</p><p>Database</p><p>To be operational, each aggregator uses two
tables: the aggregation table and the completed table. By convention the
completed table has the same name as the aggregation table, suffixed with
<code>"_COMPLETED"</code>. The name must be configured in the Spring bean with the
<code>repositoryName</code> property. In the following example,
<code>aggregation</code> will be used.</p><p>The table structure definitions of
both tables are identical: in both cases a String value is used as the key
(<strong>id</strong>), whereas a Blob contains the exchange serialized as a
byte array.<br clear="none"> However, one
difference should be remembered: the <strong>id</strong> field does not have
the same content in both tables.<br clear="none"> In the aggregation
table, <strong>id</strong> holds the correlation Id used by the component to
aggregate the messages. In the completed table, <strong>id</strong> holds the
id of the exchange stored in the corresponding blob field.</p><p>Here is the
SQL query used to create the tables; just replace <code>"aggregation"</code>
with your aggregator repository name.</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: sql; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[CREATE TABLE aggregation (
+  id varchar(255) NOT NULL,
+  exchange blob NOT NULL,
+  constraint aggregation_pk PRIMARY KEY (id)
+);
+CREATE TABLE aggregation_completed (
+  id varchar(255) NOT NULL,
+  exchange blob NOT NULL,
+  constraint aggregation_completed_pk PRIMARY KEY (id)
+);]]></script>
+</div></div><p> </p><p>Storing body and headers as text</p><p><strong>Available
as of Camel 2.11</strong></p><p>You can configure the
<code>JdbcAggregationRepository</code> to store message body and select(ed)
headers as String in separate columns. For example to store the body, and the
following two headers <code>companyName</code> and <code>accountName</code> use
the following SQL:</p><div class="code panel pdl" style="border-width:
1px;"><div class="codeContent panelContent pdl">
+<script class="brush: sql; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[CREATE TABLE aggregationRepo3 (
+  id varchar(255) NOT NULL,
+  exchange blob NOT NULL,
+  body varchar(1000),
+  companyName varchar(1000),
+  accountName varchar(1000),
+  constraint aggregationRepo3_pk PRIMARY KEY (id)
+);
+CREATE TABLE aggregationRepo3_completed (
+  id varchar(255) NOT NULL,
+  exchange blob NOT NULL,
+  body varchar(1000),
+  companyName varchar(1000),
+  accountName varchar(1000),
+  constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
+);]]></script>
+</div></div><p> </p><p>And then configure the repository to enable this behavior
as shown below:</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
+<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[<bean id="repo3"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
+  <property name="repositoryName" value="aggregationRepo3"/>
+  <property name="transactionManager" ref="txManager3"/>
+  <property name="dataSource" ref="dataSource3"/>
+  <!-- configure to store the message body and following headers as text in the repo -->
+  <property name="storeBodyAsText" value="true"/>
+  <property name="headersToStoreAsText">
+    <list>
+      <value>companyName</value>
+      <value>accountName</value>
+    </list>
+  </property>
+</bean>]]></script>
+</div></div><p> </p><p>Codec
(Serialization)</p><p>Since they can contain any type of payload, Exchanges are
not serializable by design. Each Exchange is therefore converted into a byte
array to be stored in a database BLOB field. All those conversions are handled by the
<code>JdbcCodec</code> class. One detail of the code requires your attention:
the <code>ClassLoadingAwareObjectInputStream</code>.</p><p>The
<code>ClassLoadingAwareObjectInputStream</code> has been reused
from the <a shape="rect" class="external-link"
href="http://activemq.apache.org/">Apache ActiveMQ</a> project. It wraps an
<code>ObjectInputStream</code> and uses it with the
<code>ContextClassLoader</code> rather than the <code>currentThread</code> one.
The benefit is to be able to load classes exposed by other bundles. This allows
the exchange body and headers to hold references to objects of custom
types.</p><p>Transaction</p><p>A Spring
<code>PlatformTransactionManager</code> is required to orchestrate
transactions.</p><p>Service (Start/Stop)</p><p>The <code>start</code> method
verifies the connection to the database and the presence of the required tables.
If anything is wrong, it will fail during startup.</p><p>Aggregator
configuration</p><p>Depending on the targeted environment, the aggregator might
need some configuration. As you already know, each aggregator should have its
own repository (with the corresponding pair of tables created in the database)
and a data source. If the default
lobHandler is not suited to your database system, a different one can be injected with the
<code>lobHandler</code> property.</p><p>Here is the declaration for
Oracle:</p><div class="code panel pdl" style="border-width: 1px;"><div
class="codeContent panelContent pdl">
+<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[<bean id="lobHandler"
class="org.springframework.jdbc.support.lob.OracleLobHandler">
+  <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
+</bean>
+<bean id="nativeJdbcExtractor"
class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
+<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
+  <property name="transactionManager" ref="transactionManager"/>
+  <property name="repositoryName" value="aggregation"/>
+  <property name="dataSource" ref="dataSource"/>
+  <!-- Only with Oracle, else use default -->
+  <property name="lobHandler" ref="lobHandler"/>
+</bean>]]></script>
+</div></div><p> </p><p>Optimistic locking</p><p>From <strong>Camel
2.12</strong> onwards you can turn on <code>optimisticLocking</code> and use
this JDBC based aggregation repository in a clustered environment where
multiple Camel applications share the same database for the aggregation
repository. If there is a race condition, the JDBC driver will throw a vendor
specific exception which the <code>JdbcAggregationRepository</code> can react
upon. To know which caused exceptions from the JDBC driver are regarded as an
optimistic locking error, we need a mapper. Therefore there is a
<code>org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper</code>
which allows you to implement your custom logic if needed. There is a default
implementation
<code>org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper</code>
which works as follows:</p><p>The following checks are done:</p><ul
class="alternate"><li>If the caused exception is an <code>SQLException</code>,
then the SQLState is checked to see whether it starts with 23.</li><li>If the
caused exception is a
<code>DataIntegrityViolationException</code>.</li><li>If the caused exception
class name has "ConstraintViolation" in its name.</li><li>Optional checking for
FQN class name matches, if any class names have been
configured.</li></ul><p>You can in addition add FQN class names, and if any of the
caused exceptions (or any nested) equals any of the FQN class names, then it is an
optimistic locking error.</p><p>Here is an example, where we define 2 extra
FQN class names from the JDBC vendor.</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
+<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[<bean id="repo"
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
+  <property name="transactionManager" ref="transactionManager"/>
+  <property name="repositoryName" value="aggregation"/>
+  <property name="dataSource" ref="dataSource"/>
+  <property name="jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/>
+</bean>
+<!-- use the default mapper with extra FQN class names from our JDBC driver -->
+<bean id="myExceptionMapper"
class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper">
+  <property name="classNames">
+    <util:set>
+      <value>com.foo.sql.MyViolationException</value>
+      <value>com.foo.sql.MyOtherViolationException</value>
+    </util:set>
+  </property>
+</bean>]]></script>
+</div></div><p> </p><p></p><h3
id="BookComponentAppendix-SeeAlso.63">See Also</h3>
+<ul><li><a shape="rect" href="configuring-camel.html">Configuring
Camel</a></li><li><a shape="rect"
href="component.html">Component</a></li><li><a shape="rect"
href="endpoint.html">Endpoint</a></li><li><a shape="rect"
href="getting-started.html">Getting Started</a></li></ul><p><a shape="rect"
href="sql-stored-procedure.html">SQL Stored Procedure</a></p><p><a shape="rect"
href="jdbc.html">JDBC</a></p> <h2 id="BookComponentAppendix-TestComponent">Test
Component</h2><p><a shape="rect" href="testing.html">Testing</a> of distributed
and asynchronous processing is notoriously difficult. The <a shape="rect"
href="mock.html">Mock</a>, <a shape="rect" href="test.html">Test</a> and <a
shape="rect" href="dataset.html">DataSet</a> endpoints work great with the <a
shape="rect" href="testing.html">Camel Testing Framework</a> to simplify your
unit and integration testing using <a shape="rect"
href="enterprise-integration-patterns.html">Enterprise Integration Patterns</a>
and Camel's large range of
<a shape="rect" href="components.html">Components</a> together with the
powerful <a shape="rect" href="bean-integration.html">Bean
Integration</a>.</p><p>The <strong>test</strong> component extends the <a
shape="rect" href="mock.html">Mock</a> component to support pulling messages
from another endpoint on startup to set the expected message bodies on the
underlying <a shape="rect" href="mock.html">Mock</a> endpoint. That is, you use
the test endpoint in a route and messages arriving on it will be implicitly
compared to some expected messages extracted from some other location.</p><p>So
you can use, for example, an expected set of message bodies as files. This will
then set up a properly configured <a shape="rect" href="mock.html">Mock</a>
endpoint, which is only valid if the received messages match the number of
expected messages and their message payloads are equal.</p><p>Maven users will
need to add the following dependency to their <code>pom.xml</code> for this
component when using <strong>Camel 2.8</strong> or older:</p><div class="code panel pdl"
style="border-width: 1px;"><div class="codeContent panelContent pdl">
<script class="brush: xml; gutter: false; theme: Default"
type="syntaxhighlighter"><![CDATA[<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring</artifactId>