JDBC Persistent Aggregator
--------------------------
Key: CAMEL-3502
URL: https://issues.apache.org/jira/browse/CAMEL-3502
Project: Camel
Issue Type: New Feature
Affects Versions: 2.6.0
Environment: JDBC Aggregator Database
Reporter: Olivier Roger
The patch provided is an implementation of a Aggregator for JDBC.
It implements *org.apache.camel.spi.AggregationRepository* (2.4+) and has been
tested on Camel 2.4 and 2.6.
Some details have been disscussed on the [mailling
list|http://camel.465427.n5.nabble.com/Aggregator-Persistence-td2800301.html]
with Claus.
The implementation is very similar to the one for HawtDB.
Unit tests have been included and use the H2 in-memory database.
About that, the Test *JdbcAggregateLoadAndRecoverTest* has been adapted to
exclude the redeliveries from the failling messages. It is possible for a
message to be sent due to a _scan()_ background operation evn before it is send
by the _onCompletion()_ method ?
Here is a small documentation
{quote}
h2. Database
To be operational, each aggregator uses two table: the aggregation and
completed one. By convention the completed has the same name as the aggregation
one suffixed with "_COMPLETED". The name must be configured in the Spring bean
with the _RepositoryName_ property. In the following example aggregation will
be used.
The table structure definition of both table are identical: in both case a
String value is used as key (*id*) whereas a Blob contains the exchange
serialized in byte array.
However one difference should be remembered: the *id* field does not have the
same content depending on the table.
In the aggregation table *id* holds the correlation Id used by the component to
aggregate the messages. In the completed table, *id* holds the id of the
exchange stored in corresponding the blob field.
Here is the SQL query used to create the tables, just replace "aggregation"
with your aggregator repository name.
{code}
CREATE TABLE aggregation (
id varchar(255) NOT NULL,
exchange blob NOT NULL,
constraint aggregation_pk PRIMARY KEY (id)
);
CREATE TABLE aggregation_completed (
id varchar(255) NOT NULL,
exchange blob NOT NULL,
constraint aggregation_completed_pk PRIMARY KEY (id)
);
{code}
h3. Codec (Serialization)
Since they can contain any type of payload, Exchanges are not serializable by
design. It is converted into a byte array to be stored in a database BLOB field.
All those conversions are handled by the *JdbcCodec* class. One detail of the
code requires your attention: the *ClassLoadingAwareObjectInputStream*.
The *ClassLoadingAwareObjectInputStream* has been reused from the ActiveMQ
project. It wraps an *ObjectInputStream* and use it with the ContextClassLoader
rather than the currentThread one. The benefit is to be able to load classes
exposed by other bundles.
This allows the exchange body and headers to have custom types object
references.
h2. Transactional
TransactionTemplate is use to wrap all the calls to the database.
Therefore a Transaction Manager is required (see bean-declaration)
h2. Service (Start/Stop)
The *JdbcAggregationRepository* extends ServiceSupport. This includes the
components in Camel component life cycle.
The _start()_ method verify the connection of the database and the presence of
the required tables.
h2. Aggregator configuration
Depending on the targeted environment, the aggregator might need some
configuration. As you already know, each aggregator should have its own
repository (with the corresponding pair of table created in the database) and a
data source. If the default lobHandler is not adapted to your database system,
it can be injected with the _lobHandler_ property.
Here is the declaration for Oracle :
{code}
<bean id="lobHandler"
class="org.springframework.jdbc.support.lob.OracleLobHandler">
<property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/>
</bean>
<bean id="nativeJdbcExtractor"
class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/>
<bean id="repo"
class="org.apache.camel.component.jdbc.aggregationRepository.JdbcAggregationRepository">
<constructor-arg name="transactionManager" ref="transactionManager"/>
<constructor-arg name="repositoryName" value="aggregation"/>
<constructor-arg name="dataSource" ref="dataSource"/>
<!-- Only with Oracle, else use default -->
<property name="lobHandler" ref="lobHandler"/>
</bean>
{code}
h2. Feature
A feature has been created : *camel-jdbc-aggregator*.
{quote}
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.