This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.25.x by this push:
new 20520d5 [CAMEL-7810] Fix JdbcAggregationRepository optimistic locking
feature (#3837)
20520d5 is described below
commit 20520d547c81499f6b81cd8765cd1041d4e3e484
Author: Federico Valeri <[email protected]>
AuthorDate: Thu May 21 08:44:24 2020 +0200
[CAMEL-7810] Fix JdbcAggregationRepository optimistic locking feature
(#3837)
When running JdbcAggregationRepository in a distributed environment (multiple
independent
threads or multiple JVMs), exchanges were silently dropped. This PR aims
at introducing
a version identifier to avoid the lost update problem (overwrites when
updating the
aggregate row), as stated by the OptimisticLockingAggregationRepository SPI.
Tested running 100 threads to aggregate 10k messages on MySQL 5.7 and
PostgreSQL 11.5.
Co-authored-by: Federico Valeri <fvaleri@localhost>
---
camel-core/src/main/docs/language-component.adoc | 3 +-
.../camel-sql/src/main/docs/sql-component.adoc | 19 +
.../jdbc/JdbcAggregationRepositoryNew.java | 626 +++++++++++++++++++++
.../jdbc/AbstractJdbcAggregationTestSupport.java | 17 +-
.../jdbc/JdbcAggregateLoadAndRecoverTest.java | 8 +-
.../aggregate/jdbc/JdbcAggregateNotLostTest.java | 6 +-
.../JdbcAggregateRecoverDeadLetterChannelTest.java | 2 +-
.../JdbcAggregationRepositoryAlotDataTest.java | 2 +-
.../JdbcAggregationRepositoryMultipleRepoTest.java | 3 +-
...bcAggregationRepositoryRecoverExistingTest.java | 5 +-
.../jdbc/JdbcAggregationRepositoryTest.java | 5 +-
.../jdbc/JdbcExchangeSerializationTest.java | 9 +-
.../aggregate/jdbc/JdbcGrowIssueTest.java | 6 +-
.../camel-sql/src/test/resources/sql/init.sql | 2 +-
.../camel-sql/src/test/resources/sql/init2.sql | 2 +-
.../camel-sql/src/test/resources/sql/init3.sql | 2 +-
16 files changed, 689 insertions(+), 28 deletions(-)
diff --git a/camel-core/src/main/docs/language-component.adoc
b/camel-core/src/main/docs/language-component.adoc
index 241b13d..a3a5620 100644
--- a/camel-core/src/main/docs/language-component.adoc
+++ b/camel-core/src/main/docs/language-component.adoc
@@ -62,12 +62,13 @@ with the following path and query parameters:
|===
-=== Query Parameters (6 parameters):
+=== Query Parameters (7 parameters):
[width="100%",cols="2,5,^1,2",options="header"]
|===
| Name | Description | Default | Type
+| *allowContextMapAll* (producer) | Sets whether the context map should allow
access to all details. By default only the message body and headers can be
accessed. This option can be enabled for full access to the current Exchange
and CamelContext. Doing so imposes a potential security risk as this opens
access to the full power of CamelContext API. | false | boolean
| *binary* (producer) | Whether the script is binary content or text content.
By default the script is read as text content (eg java.lang.String) | false |
boolean
| *cacheScript* (producer) | Whether to cache the compiled script and reuse
Notice reusing the script can cause side effects from processing one Camel
org.apache.camel.Exchange to the next org.apache.camel.Exchange. | false |
boolean
| *contentCache* (producer) | Sets whether to use resource content cache or
not. | false | boolean
diff --git a/components/camel-sql/src/main/docs/sql-component.adoc
b/components/camel-sql/src/main/docs/sql-component.adoc
index 66a0499..ab9763c 100644
--- a/components/camel-sql/src/main/docs/sql-component.adoc
+++ b/components/camel-sql/src/main/docs/sql-component.adoc
@@ -646,6 +646,25 @@ CREATE TABLE aggregation_completed (
);
-----
+**Note:** There is a new version called `JdbcAggregationRepositoryNew` which
also supports optimistic locking
+in a distributed environment (multiple JVMs or independent threads). This
new implementation is built on
+top of the old one and needs an additional `version` column:
+
+[source,sql]
+-----
+CREATE TABLE aggregation (
+ id varchar(255) NOT NULL,
+ exchange blob NOT NULL,
+ version BIGINT NOT NULL,
+ constraint aggregation_pk PRIMARY KEY (id)
+);
+CREATE TABLE aggregation_completed (
+ id varchar(255) NOT NULL,
+ exchange blob NOT NULL,
+ version BIGINT NOT NULL,
+ constraint aggregation_completed_pk PRIMARY KEY (id)
+);
+-----
== Storing body and headers as text
diff --git
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryNew.java
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryNew.java
new file mode 100644
index 0000000..d559af8
--- /dev/null
+++
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryNew.java
@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
+import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.Constants;
+import org.springframework.dao.EmptyResultDataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.RowMapper;
+import
org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
+import org.springframework.jdbc.support.lob.DefaultLobHandler;
+import org.springframework.jdbc.support.lob.LobCreator;
+import org.springframework.jdbc.support.lob.LobHandler;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.DefaultTransactionDefinition;
+import org.springframework.transaction.support.TransactionCallback;
+import
org.springframework.transaction.support.TransactionCallbackWithoutResult;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * JDBC based {@link org.apache.camel.spi.AggregationRepository}
+ * JdbcAggregationRepository will only preserve any Serializable compatible
+ * data types. If a data type is not such a type it is dropped and a WARN is
+ * logged. And it only persists the Message body and the Message headers.
+ * The Exchange properties are not persisted.
+ *
+ * Note: This new version is built from the old one and simply adds the version
+ * property to properly support the optimistic locking also on a distributed
+ * environment (multiple JVMs or independent threads).
+ */
+public class JdbcAggregationRepositoryNew extends ServiceSupport implements
RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(JdbcAggregationRepositoryNew.class);
+
+ protected static final String EXCHANGE = "exchange";
+ protected static final String ID = "id";
+ protected static final String BODY = "body";
+
+ // version identifier needed to avoid the lost update problem
+ private static final String VERSION = "version";
+ private static final String VERSION_PROPERTY =
"CamelOptimisticLockVersion";
+
+ private static final Constants PROPAGATION_CONSTANTS = new
Constants(TransactionDefinition.class);
+
+ private JdbcOptimisticLockingExceptionMapper
jdbcOptimisticLockingExceptionMapper = new
DefaultJdbcOptimisticLockingExceptionMapper();
+ private PlatformTransactionManager transactionManager;
+ private DataSource dataSource;
+ private TransactionTemplate transactionTemplate;
+ private TransactionTemplate transactionTemplateReadOnly;
+ private int propagationBehavior =
TransactionDefinition.PROPAGATION_REQUIRED;
+ private JdbcTemplate jdbcTemplate;
+ private LobHandler lobHandler = new DefaultLobHandler();
+ private String repositoryName;
+ private boolean returnOldExchange;
+ private JdbcCamelCodec codec = new JdbcCamelCodec();
+ private long recoveryInterval = 5000;
+ private boolean useRecovery = true;
+ private int maximumRedeliveries;
+ private String deadLetterUri;
+ private List<String> headersToStoreAsText;
+ private boolean storeBodyAsText;
+ private boolean allowSerializedHeaders;
+
+ /**
+ * Creates an aggregation repository
+ */
+ public JdbcAggregationRepositoryNew() {
+ }
+
+ /**
+ * Creates an aggregation repository with the three mandatory parameters
+ */
+ public JdbcAggregationRepositoryNew(PlatformTransactionManager
transactionManager, String repositoryName, DataSource dataSource) {
+ this.setRepositoryName(repositoryName);
+ this.setTransactionManager(transactionManager);
+ this.setDataSource(dataSource);
+ }
+
+ /**
+ * Sets the name of the repository
+ */
+ public final void setRepositoryName(String repositoryName) {
+ this.repositoryName = repositoryName;
+ }
+
+ public final void setTransactionManager(PlatformTransactionManager
transactionManager) {
+ this.transactionManager = transactionManager;
+ }
+
+ /**
+ * Sets the DataSource to use for accessing the database
+ */
+ public final void setDataSource(DataSource dataSource) {
+ this.dataSource = dataSource;
+
+ jdbcTemplate = new JdbcTemplate(dataSource);
+ }
+
+ @Override
+ public Exchange add(final CamelContext camelContext, final String
correlationId,
+ final Exchange oldExchange, final Exchange
newExchange) throws OptimisticLockingException {
+
+ try {
+ return add(camelContext, correlationId, newExchange);
+ } catch (Exception e) {
+ if (jdbcOptimisticLockingExceptionMapper != null &&
jdbcOptimisticLockingExceptionMapper.isOptimisticLocking(e)) {
+ throw new OptimisticLockingException();
+ } else {
+ throw ObjectHelper.wrapRuntimeCamelException(e);
+ }
+ }
+ }
+
+ @Override
+ public Exchange add(final CamelContext camelContext, final String
correlationId, final Exchange exchange) {
+ return transactionTemplate.execute(new TransactionCallback<Exchange>()
{
+
+ public Exchange doInTransaction(TransactionStatus status) {
+ Exchange result = null;
+ final String key = correlationId;
+
+ try {
+ LOG.debug("Adding exchange with key {}", key);
+
+ boolean present = jdbcTemplate.queryForObject(
+ "SELECT COUNT(1) FROM " + getRepositoryName() + "
WHERE " + ID + " = ?", Integer.class, key) != 0;
+
+ // Recover existing exchange with that ID
+ if (isReturnOldExchange() && present) {
+ result = get(key, getRepositoryName(), camelContext);
+ }
+
+ if (present) {
+ long version = exchange.getProperty(VERSION_PROPERTY,
Long.class);
+ LOG.debug("Updating record with key {} and version
{}", key, version);
+ update(camelContext, correlationId, exchange,
getRepositoryName(), version);
+ } else {
+ LOG.debug("Inserting record with key {}");
+ insert(camelContext, correlationId, exchange,
getRepositoryName(), 1L);
+ }
+
+ } catch (Exception e) {
+ throw new RuntimeException("Error adding to repository " +
repositoryName + " with key " + key, e);
+ }
+
+ return result;
+ }
+ });
+ }
+
+ /**
+ * Updates the current exchange details in the given repository table.
+ *
+ * @param camelContext Current CamelContext
+ * @param key Correlation key
+ * @param exchange Aggregated exchange
+ * @param repositoryName Table's name
+ * @param version Version identifier
+ */
+ protected void update(final CamelContext camelContext, final String key,
final Exchange exchange, String repositoryName, Long version) throws Exception {
+ StringBuilder queryBuilder = new StringBuilder()
+ .append("UPDATE ").append(repositoryName)
+ .append(" SET ")
+ .append(EXCHANGE).append(" = ?")
+ .append(", ")
+ .append(VERSION).append(" = ?");
+ if (storeBodyAsText) {
+ queryBuilder.append(", ").append(BODY).append(" = ?");
+ }
+
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : headersToStoreAsText) {
+ queryBuilder.append(", ").append(headerName).append(" = ?");
+ }
+ }
+
+ queryBuilder.append(" WHERE ")
+ .append(ID).append(" = ?")
+ .append(" AND ")
+ .append(VERSION).append(" = ?");
+
+ String sql = queryBuilder.toString();
+ updateHelper(camelContext, key, exchange, sql, version);
+ }
+
+ /**
+ * Inserts a new record into the given repository table.
+ * Note: the exchange properties are NOT persisted.
+ *
+ * @param camelContext Current CamelContext
+ * @param correlationId Correlation key
+ * @param exchange Aggregated exchange to insert
+ * @param repositoryName Table's name
+ * @param version Version identifier
+ */
+ protected void insert(final CamelContext camelContext, final String
correlationId, final Exchange exchange, String repositoryName, Long version)
throws Exception {
+ // The default totalParameterIndex is 3 for ID, Exchange and version.
Depending on logic this will be increased.
+ int totalParameterIndex = 3;
+ StringBuilder queryBuilder = new StringBuilder()
+ .append("INSERT INTO ").append(repositoryName)
+ .append('(').append(EXCHANGE)
+ .append(", ").append(ID)
+ .append(", ").append(VERSION);
+
+ if (storeBodyAsText) {
+ queryBuilder.append(", ").append(BODY);
+ totalParameterIndex++;
+ }
+
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : headersToStoreAsText) {
+ queryBuilder.append(", ").append(headerName);
+ totalParameterIndex++;
+ }
+ }
+
+ queryBuilder.append(") VALUES (");
+
+ for (int i = 0; i < totalParameterIndex - 1; i++) {
+ queryBuilder.append("?, ");
+ }
+ queryBuilder.append("?)");
+
+ String sql = queryBuilder.toString();
+
+ insertHelper(camelContext, correlationId, exchange, sql, version);
+ }
+
+ protected int insertHelper(final CamelContext camelContext, final String
key, final Exchange exchange, String sql, final Long version) throws Exception {
+ final byte[] data = codec.marshallExchange(camelContext, exchange,
allowSerializedHeaders);
+ Integer insertCount = jdbcTemplate.execute(sql,
+ new
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
+ @Override
+ protected void setValues(PreparedStatement ps, LobCreator
lobCreator) throws SQLException {
+ int totalParameterIndex = 0;
+ lobCreator.setBlobAsBytes(ps, ++totalParameterIndex,
data);
+ ps.setString(++totalParameterIndex, key);
+ ps.setLong(++totalParameterIndex, version);
+ if (storeBodyAsText) {
+ ps.setString(++totalParameterIndex,
exchange.getIn().getBody(String.class));
+ }
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : headersToStoreAsText) {
+ String headerValue =
exchange.getIn().getHeader(headerName, String.class);
+ ps.setString(++totalParameterIndex,
headerValue);
+ }
+ }
+ }
+ });
+ return insertCount == null ? 0 : insertCount;
+ }
+
+ protected int updateHelper(final CamelContext camelContext, final String
key, final Exchange exchange, String sql, final Long version) throws Exception {
+ final byte[] data = codec.marshallExchange(camelContext, exchange,
allowSerializedHeaders);
+ Integer updateCount = jdbcTemplate.execute(sql,
+ new
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
+ @Override
+ protected void setValues(PreparedStatement ps, LobCreator
lobCreator) throws SQLException {
+ int totalParameterIndex = 0;
+ lobCreator.setBlobAsBytes(ps, ++totalParameterIndex,
data);
+ ps.setLong(++totalParameterIndex, version + 1);
+ if (storeBodyAsText) {
+ ps.setString(++totalParameterIndex,
exchange.getIn().getBody(String.class));
+ }
+ if (hasHeadersToStoreAsText()) {
+ for (String headerName : headersToStoreAsText) {
+ String headerValue =
exchange.getIn().getHeader(headerName, String.class);
+ ps.setString(++totalParameterIndex,
headerValue);
+ }
+ }
+ ps.setString(++totalParameterIndex, key);
+ ps.setLong(++totalParameterIndex, version);
+ }
+ });
+ if (updateCount == 1) {
+ return updateCount;
+ } else {
+ throw new RuntimeException(String.format("Stale version: error
updating record with key %s and version %s", key, version));
+ }
+ }
+
+ @Override
+ public Exchange get(final CamelContext camelContext, final String
correlationId) {
+ final String key = correlationId;
+ Exchange result = get(key, getRepositoryName(), camelContext);
+ LOG.debug("Getting key {} -> {}", key, result);
+ return result;
+ }
+
+ private Exchange get(final String key, final String repositoryName, final
CamelContext camelContext) {
+ return transactionTemplateReadOnly.execute(new
TransactionCallback<Exchange>() {
+ public Exchange doInTransaction(TransactionStatus status) {
+ try {
+
+ Map<String, Object> columns = jdbcTemplate.queryForMap(
+ String.format("SELECT %1$s, %2$s FROM %3$s WHERE
%4$s=?", EXCHANGE, VERSION, repositoryName, ID),
+ new Object[]{key}, new int[]{Types.VARCHAR});
+
+ byte[] marshalledExchange = (byte[]) columns.get(EXCHANGE);
+ long version = (long) columns.get(VERSION);
+
+ Exchange result = codec.unmarshallExchange(camelContext,
marshalledExchange);
+ result.setProperty(VERSION_PROPERTY, version);
+ return result;
+
+ } catch (EmptyResultDataAccessException ex) {
+ return null;
+ } catch (IOException ex) {
+ // Rollback the transaction
+ throw new RuntimeException("Error getting key " + key + "
from repository " + repositoryName, ex);
+ } catch (ClassNotFoundException ex) {
+ // Rollback the transaction
+ throw new RuntimeException(ex);
+ }
+ }
+ });
+ }
+
+ @Override
+ public void remove(final CamelContext camelContext, final String
correlationId, final Exchange exchange) {
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus
status) {
+ final String key = correlationId;
+ final String confirmKey = exchange.getExchangeId();
+ final long version = exchange.getProperty(VERSION_PROPERTY,
Long.class);
+ try {
+ LOG.debug("Removing key {}", key);
+
+ jdbcTemplate.update("DELETE FROM " + getRepositoryName() +
" WHERE " + ID + " = ? AND " + VERSION + " = ?", key, version);
+
+ insert(camelContext, confirmKey, exchange,
getRepositoryNameCompleted(), version);
+
+ } catch (Exception e) {
+ throw new RuntimeException("Error removing key " + key + "
from repository " + repositoryName, e);
+ }
+ }
+ });
+ }
+
+ @Override
+ public void confirm(final CamelContext camelContext, final String
exchangeId) {
+ transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+ protected void doInTransactionWithoutResult(TransactionStatus
status) {
+ LOG.debug("Confirming exchangeId {}", exchangeId);
+ final String confirmKey = exchangeId;
+
+ jdbcTemplate.update("DELETE FROM " +
getRepositoryNameCompleted() + " WHERE " + ID + " = ?",
+ new Object[]{confirmKey});
+
+ }
+ });
+ }
+
+ @Override
+ public Set<String> getKeys() {
+ return getKeys(getRepositoryName());
+ }
+
+ @Override
+ public Set<String> scan(CamelContext camelContext) {
+ return getKeys(getRepositoryNameCompleted());
+ }
+
+ /**
+ * Returns the keys in the given repository
+ *
+ * @param repositoryName The name of the table
+ * @return Set of keys in the given repository name
+ */
+ protected Set<String> getKeys(final String repositoryName) {
+ return transactionTemplateReadOnly.execute(new
TransactionCallback<LinkedHashSet<String>>() {
+ public LinkedHashSet<String> doInTransaction(TransactionStatus
status) {
+ List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM
" + repositoryName,
+ new RowMapper<String>() {
+ public String mapRow(ResultSet rs, int rowNum)
throws SQLException {
+ String id = rs.getString(ID);
+ LOG.trace("getKey {}", id);
+ return id;
+ }
+ });
+ return new LinkedHashSet<>(keys);
+ }
+ });
+ }
+
+ @Override
+ public Exchange recover(CamelContext camelContext, String exchangeId) {
+ final String key = exchangeId;
+ Exchange answer = get(key, getRepositoryNameCompleted(), camelContext);
+ LOG.debug("Recovering exchangeId {} -> {}", key, answer);
+ return answer;
+ }
+
+ /**
+ * If recovery is enabled then a background task is run every x'th time
to scan for failed exchanges to recover
+ * and resubmit. By default this interval is 5000 millis.
+ */
+ @Override
+ public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
+ this.recoveryInterval = timeUnit.toMillis(interval);
+ }
+
+ @Override
+ public void setRecoveryInterval(long interval) {
+ this.recoveryInterval = interval;
+ }
+
+ @Override
+ public long getRecoveryIntervalInMillis() {
+ return recoveryInterval;
+ }
+
+ @Override
+ public boolean isUseRecovery() {
+ return useRecovery;
+ }
+
+ /**
+ * Whether or not recovery is enabled. This option is by default true.
When enabled the Camel
+ * Aggregator automatically recovers failed aggregated exchanges and has them
resubmitted.
+ */
+ @Override
+ public void setUseRecovery(boolean useRecovery) {
+ this.useRecovery = useRecovery;
+ }
+
+ @Override
+ public int getMaximumRedeliveries() {
+ return maximumRedeliveries;
+ }
+
+ @Override
+ public void setMaximumRedeliveries(int maximumRedeliveries) {
+ this.maximumRedeliveries = maximumRedeliveries;
+ }
+
+ @Override
+ public String getDeadLetterUri() {
+ return deadLetterUri;
+ }
+
+ /**
+ * An endpoint uri for a Dead Letter Channel where exhausted recovered
Exchanges will be
+ * moved. If this option is used then the maximumRedeliveries option must
also be provided.
+ * Important note: if the dead letter route throws an exception, the exchange will
be sent again to the DLQ until it succeeds!
+ */
+ public void setDeadLetterUri(String deadLetterUri) {
+ this.deadLetterUri = deadLetterUri;
+ }
+
+ public boolean isReturnOldExchange() {
+ return returnOldExchange;
+ }
+
+ /**
+ * Whether the get operation should return the old existing Exchange if
any existed.
+ * By default this option is false to optimize as we do not need the old
exchange when aggregating.
+ */
+ public void setReturnOldExchange(boolean returnOldExchange) {
+ this.returnOldExchange = returnOldExchange;
+ }
+
+ public void setJdbcCamelCodec(JdbcCamelCodec codec) {
+ this.codec = codec;
+ }
+
+ public boolean hasHeadersToStoreAsText() {
+ return this.headersToStoreAsText != null &&
!this.headersToStoreAsText.isEmpty();
+ }
+
+ public List<String> getHeadersToStoreAsText() {
+ return headersToStoreAsText;
+ }
+
+ /**
+ * Allows to store headers as String which is human readable. By default
this option is disabled,
+ * storing the headers in binary format.
+ *
+ * @param headersToStoreAsText the list of headers to store as String
+ */
+ public void setHeadersToStoreAsText(List<String> headersToStoreAsText) {
+ this.headersToStoreAsText = headersToStoreAsText;
+ }
+
+ public boolean isStoreBodyAsText() {
+ return storeBodyAsText;
+ }
+
+ /**
+ * Whether to store the message body as String which is human readable.
+ * By default this option is false storing the body in binary format.
+ */
+ public void setStoreBodyAsText(boolean storeBodyAsText) {
+ this.storeBodyAsText = storeBodyAsText;
+ }
+
+ public boolean isAllowSerializedHeaders() {
+ return allowSerializedHeaders;
+ }
+
+ public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
+ this.allowSerializedHeaders = allowSerializedHeaders;
+ }
+
+ public int getPropagationBehavior() {
+ return propagationBehavior;
+ }
+
+ /**
+ * Sets propagation behavior to use with spring transaction templates
which are used for database access.
+ * The default is TransactionDefinition.PROPAGATION_REQUIRED.
+ */
+ public void setPropagationBehavior(int propagationBehavior) {
+ this.propagationBehavior = propagationBehavior;
+ }
+
+ /**
+ * Sets propagation behavior to use with spring transaction templates
which are used for database access.
+ * The default is TransactionDefinition.PROPAGATION_REQUIRED. This setter
accepts names of the constants, like
+ * "PROPAGATION_REQUIRED".
+ * @param propagationBehaviorName
+ */
+ public void setPropagationBehaviorName(String propagationBehaviorName) {
+ if
(!propagationBehaviorName.startsWith(DefaultTransactionDefinition.PREFIX_PROPAGATION))
{
+ throw new IllegalArgumentException("Only propagation constants
allowed");
+ }
+
setPropagationBehavior(PROPAGATION_CONSTANTS.asNumber(propagationBehaviorName).intValue());
+ }
+
+ public LobHandler getLobHandler() {
+ return lobHandler;
+ }
+
+ /**
+ * Sets a custom LobHandler to use
+ */
+ public void setLobHandler(LobHandler lobHandler) {
+ this.lobHandler = lobHandler;
+ }
+
+ public JdbcOptimisticLockingExceptionMapper
getJdbcOptimisticLockingExceptionMapper() {
+ return jdbcOptimisticLockingExceptionMapper;
+ }
+
+ public void
setJdbcOptimisticLockingExceptionMapper(JdbcOptimisticLockingExceptionMapper
jdbcOptimisticLockingExceptionMapper) {
+ this.jdbcOptimisticLockingExceptionMapper =
jdbcOptimisticLockingExceptionMapper;
+ }
+
+ public String getRepositoryName() {
+ return repositoryName;
+ }
+
+ public String getRepositoryNameCompleted() {
+ return getRepositoryName() + "_completed";
+ }
+
+ @Override
+ protected void doStart() throws Exception {
+ ObjectHelper.notNull(repositoryName, "RepositoryName");
+ ObjectHelper.notNull(transactionManager, "TransactionManager");
+ ObjectHelper.notNull(dataSource, "DataSource");
+
+ transactionTemplate = new TransactionTemplate(transactionManager);
+ transactionTemplate.setPropagationBehavior(propagationBehavior);
+
+ transactionTemplateReadOnly = new
TransactionTemplate(transactionManager);
+
transactionTemplateReadOnly.setPropagationBehavior(propagationBehavior);
+ transactionTemplateReadOnly.setReadOnly(true);
+
+ // log number of existing exchanges
+ int current = getKeys().size();
+ int completed = scan(null).size();
+
+ if (current > 0) {
+ LOG.info("On startup there are " + current + " aggregate exchanges
(not completed) in repository: " + getRepositoryName());
+ } else {
+ LOG.info("On startup there are no existing aggregate exchanges
(not completed) in repository: {}", getRepositoryName());
+ }
+ if (completed > 0) {
+ LOG.warn("On startup there are " + completed + " completed
exchanges to be recovered in repository: " + getRepositoryNameCompleted());
+ } else {
+ LOG.info("On startup there are no completed exchanges to be
recovered in repository: {}", getRepositoryNameCompleted());
+ }
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ // noop
+ }
+
+}
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
index 94d4777..1f7229d 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
@@ -30,14 +30,14 @@ public abstract class AbstractJdbcAggregationTestSupport
extends CamelSpringTest
@Override
public void postProcessTest() throws Exception {
super.postProcessTest();
-
+
repo = applicationContext.getBean("repo1",
JdbcAggregationRepository.class);
configureJdbcAggregationRepository();
}
-
+
void configureJdbcAggregationRepository() {
}
-
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -54,16 +54,23 @@ public abstract class AbstractJdbcAggregationTestSupport
extends CamelSpringTest
// END SNIPPET: e1
};
}
-
+
long getCompletionInterval() {
return 5000;
}
-
+
@Override
protected AbstractApplicationContext createApplicationContext() {
return new
ClassPathXmlApplicationContext("org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml");
}
+ protected Exchange repoAddAndGet(String key, Exchange exchange) {
+ repo.add(context, key, exchange);
+ // recover the exchange with the new version to be able to add again
+ exchange = repo.get(context, key);
+ return exchange;
+ }
+
public static class MyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
index 840184e..b58812b 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
@@ -38,7 +38,7 @@ public class JdbcAggregateLoadAndRecoverTest extends
AbstractJdbcAggregationTest
public void testLoadAndRecoverJdbcAggregate() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(SIZE / 10);
- mock.setResultWaitTime(50 * 1000);
+ mock.setResultWaitTime(5_000);
LOG.info("Staring to send " + SIZE + " messages.");
@@ -73,13 +73,17 @@ public class JdbcAggregateLoadAndRecoverTest extends
AbstractJdbcAggregationTest
return new RouteBuilder() {
@Override
public void configure() throws Exception {
+ onException(IllegalStateException.class)
+ .maximumRedeliveries(3)
+ .redeliveryDelay(100L);
+
from("seda:start?size=" + SIZE)
.to("log:input?groupSize=500")
.aggregate(header("id"), new MyAggregationStrategy())
.aggregationRepository(repo)
.completionSize(10)
.to("log:output?showHeaders=true")
- // have every 10th exchange fail which should
then be recovered
+ // have every 10th exchange fail which should then be
recovered
.process(new Processor() {
public void process(Exchange exchange) throws
Exception {
//Avoid same message to be discarded twice
diff --git
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java
index 0c75a88..8722d8d 100644
---
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java
+++
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java
@@ -35,11 +35,11 @@ public class JdbcAggregateNotLostTest extends
AbstractJdbcAggregationTestSupport
template.sendBodyAndHeader("direct:start", "D", "id", 123);
template.sendBodyAndHeader("direct:start", "E", "id", 123);
- assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+ assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
String exchangeId =
getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId();
-
// the exchange should be in the completed repo where we should be
able to find it
+
Exchange completed = repo.recover(context, exchangeId);
// assert the exchange was not lost and we got all the information
still
assertNotNull(completed);
@@ -63,7 +63,7 @@ public class JdbcAggregateNotLostTest extends AbstractJdbcAggregationTestSupport
.completionSize(5).aggregationRepository(repo)
.log("aggregated exchange id ${exchangeId} with ${body}")
.to("mock:aggregated")
- // throw an exception to fail, which we then will loose this message
+ // throw an exception to fail, which we then will loose this message
.throwException(new IllegalArgumentException("Damn"))
.to("mock:result")
.end();
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java
index af26a56..39f6b2e 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java
@@ -69,4 +69,4 @@ public class JdbcAggregateRecoverDeadLetterChannelTest extends AbstractJdbcAggre
}
};
}
-}
\ No newline at end of file
+}
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java
index 5025c40..bd3af20 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java
@@ -67,4 +67,4 @@ public class JdbcAggregationRepositoryAlotDataTest extends AbstractJdbcAggregati
assertEquals("counter:" + i, actual.getIn().getBody());
}
}
-}
\ No newline at end of file
+}
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java
index be07b7b..e41ef7f 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java
@@ -53,8 +53,9 @@ public class JdbcAggregationRepositoryMultipleRepoTest extends CamelSpringTestSu
assertEquals("counter:1", actual.getIn().getBody());
assertEquals(null, repo2.get(context, "foo"));
- // Change it..
+ // Change it after reading the current exchange with version
Exchange exchange2 = new DefaultExchange(context);
+ exchange2 = repo1.get(context, "foo");
exchange2.getIn().setBody("counter:2");
actual = repo1.add(context, "foo", exchange2);
// the old one
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
index 1a2361f..f750f7d 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
@@ -28,7 +28,7 @@ public class JdbcAggregationRepositoryRecoverExistingTest extends AbstractJdbcAg
repo.setReturnOldExchange(true);
repo.setUseRecovery(true);
}
-
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -48,7 +48,8 @@ public class JdbcAggregationRepositoryRecoverExistingTest extends AbstractJdbcAg
Exchange actual = repo.add(context, "foo", exchange1);
assertEquals(null, actual);
- // Remove it, which makes it in the pre confirm stage
+ // Get and remove it, which makes it in the pre confirm stage
+ exchange1 = repo.get(context, "foo");
repo.remove(context, "foo", exchange1);
String id = exchange1.getExchangeId();
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java
index 69b377a..c2daf52 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java
@@ -25,7 +25,7 @@ public class JdbcAggregationRepositoryTest extends AbstractJdbcAggregationTestSu
void configureJdbcAggregationRepository() {
repo.setReturnOldExchange(true);
}
-
+
@Test
public void testOperations() {
// Can't get something we have not put in...
@@ -42,8 +42,9 @@ public class JdbcAggregationRepositoryTest extends AbstractJdbcAggregationTestSu
actual = repo.get(context, "foo");
assertEquals("counter:1", actual.getIn().getBody());
- // Change it..
+ // Change it after reading the current exchange with version
Exchange exchange2 = new DefaultExchange(context);
+ exchange2 = repo.get(context, "foo");
exchange2.getIn().setBody("counter:2");
actual = repo.add(context, "foo", exchange2);
// the old one
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java
index 7f16fe0..de25da9 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java
@@ -26,6 +26,7 @@ public class JdbcExchangeSerializationTest extends AbstractJdbcAggregationTestSu
@Test
public void testExchangeSerialization() {
+ final String key = "foo";
Exchange exchange = new DefaultExchange(context);
exchange.getIn().setBody("Hello World");
exchange.getIn().setHeader("name", "Olivier");
@@ -35,9 +36,9 @@ public class JdbcExchangeSerializationTest extends AbstractJdbcAggregationTestSu
Date now = new Date();
exchange.getIn().setHeader("date", now);
- repo.add(context, "foo", exchange);
+ exchange = repoAddAndGet(key, exchange);
- Exchange actual = repo.get(context, "foo");
+ Exchange actual = repo.get(context, key);
assertEquals("Hello World", actual.getIn().getBody());
assertEquals("Olivier", actual.getIn().getHeader("name"));
assertEquals(123, actual.getIn().getHeader("number"));
@@ -53,9 +54,9 @@ public class JdbcExchangeSerializationTest extends AbstractJdbcAggregationTestSu
exchange.getIn().setHeader("name", "Thomas");
exchange.getIn().removeHeader("date");
- repo.add(context, "foo", exchange);
+ exchange = repoAddAndGet(key, exchange);
- actual = repo.get(context, "foo");
+ actual = repo.get(context, key);
assertEquals("Bye World", actual.getIn().getBody());
assertEquals("Thomas", actual.getIn().getHeader("name"));
assertEquals(123, actual.getIn().getHeader("number"));
diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java
index 66ffd37..b919b25 100644
--- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java
+++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java
@@ -31,8 +31,8 @@ public class JdbcGrowIssueTest extends AbstractJdbcAggregationTestSupport {
for (int i = 0; i < SIZE; i++) {
sb.append("X");
}
- Exchange item = new DefaultExchange(context);
- item.getIn().setBody(sb.toString(), String.class);
+ Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setBody(sb.toString(), String.class);
// the key
final String key = "foo";
@@ -40,7 +40,7 @@ public class JdbcGrowIssueTest extends AbstractJdbcAggregationTestSupport {
// we update using the same key, which means we should be able to do this within the file size limit
for (int i = 0; i < SIZE; i++) {
log.debug("Updating " + i);
- repo.add(context, key, item);
+ exchange = repoAddAndGet(key, exchange);
}
// get the last
diff --git a/components/camel-sql/src/test/resources/sql/init.sql b/components/camel-sql/src/test/resources/sql/init.sql
index 5396248..271e72e 100644
--- a/components/camel-sql/src/test/resources/sql/init.sql
+++ b/components/camel-sql/src/test/resources/sql/init.sql
@@ -24,4 +24,4 @@ CREATE TABLE aggregationRepo1_completed (
id varchar(255) NOT NULL,
exchange blob NOT NULL,
constraint aggregationRepo1_completed_pk PRIMARY KEY (id)
-);
\ No newline at end of file
+);
diff --git a/components/camel-sql/src/test/resources/sql/init2.sql b/components/camel-sql/src/test/resources/sql/init2.sql
index 515179a..18f17c0 100644
--- a/components/camel-sql/src/test/resources/sql/init2.sql
+++ b/components/camel-sql/src/test/resources/sql/init2.sql
@@ -24,4 +24,4 @@ CREATE TABLE aggregationRepo2_completed (
id varchar(255) NOT NULL,
exchange blob NOT NULL,
constraint aggregationRepo2_completed_pk PRIMARY KEY (id)
-);
\ No newline at end of file
+);
diff --git a/components/camel-sql/src/test/resources/sql/init3.sql b/components/camel-sql/src/test/resources/sql/init3.sql
index 2bef291..69450d0 100644
--- a/components/camel-sql/src/test/resources/sql/init3.sql
+++ b/components/camel-sql/src/test/resources/sql/init3.sql
@@ -30,4 +30,4 @@ CREATE TABLE aggregationRepo3_completed (
companyName varchar(1000),
accountName varchar(1000),
constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
-);
\ No newline at end of file
+);