This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-2.25.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-2.25.x by this push:
     new 20520d5  [CAMEL-7810] Fix JdbcAggregationRepository optimistic locking 
feature (#3837)
20520d5 is described below

commit 20520d547c81499f6b81cd8765cd1041d4e3e484
Author: Federico Valeri <fval...@users.noreply.github.com>
AuthorDate: Thu May 21 08:44:24 2020 +0200

    [CAMEL-7810] Fix JdbcAggregationRepository optimistic locking feature 
(#3837)
    
    Running JdbcAggregationRepository on a distributed environment (multiple 
independent
    threads or multiple JVMs) we have silently dropped exchanges. This PR aims 
at introducing
    a version identifier to avoid the lost update problem (overwrites when 
updating the
    aggregate row) as stated by the OptimisticLockingAggregationRepository SPI.
    
    Tested running 100 threads to aggregate 10k messages on MySQL 5.7 and 
PostgreSQL 11.5.
    
    Co-authored-by: Federico Valeri <fvaleri@localhost>
---
 camel-core/src/main/docs/language-component.adoc   |   3 +-
 .../camel-sql/src/main/docs/sql-component.adoc     |  19 +
 .../jdbc/JdbcAggregationRepositoryNew.java         | 626 +++++++++++++++++++++
 .../jdbc/AbstractJdbcAggregationTestSupport.java   |  17 +-
 .../jdbc/JdbcAggregateLoadAndRecoverTest.java      |   8 +-
 .../aggregate/jdbc/JdbcAggregateNotLostTest.java   |   6 +-
 .../JdbcAggregateRecoverDeadLetterChannelTest.java |   2 +-
 .../JdbcAggregationRepositoryAlotDataTest.java     |   2 +-
 .../JdbcAggregationRepositoryMultipleRepoTest.java |   3 +-
 ...bcAggregationRepositoryRecoverExistingTest.java |   5 +-
 .../jdbc/JdbcAggregationRepositoryTest.java        |   5 +-
 .../jdbc/JdbcExchangeSerializationTest.java        |   9 +-
 .../aggregate/jdbc/JdbcGrowIssueTest.java          |   6 +-
 .../camel-sql/src/test/resources/sql/init.sql      |   2 +-
 .../camel-sql/src/test/resources/sql/init2.sql     |   2 +-
 .../camel-sql/src/test/resources/sql/init3.sql     |   2 +-
 16 files changed, 689 insertions(+), 28 deletions(-)

diff --git a/camel-core/src/main/docs/language-component.adoc 
b/camel-core/src/main/docs/language-component.adoc
index 241b13d..a3a5620 100644
--- a/camel-core/src/main/docs/language-component.adoc
+++ b/camel-core/src/main/docs/language-component.adoc
@@ -62,12 +62,13 @@ with the following path and query parameters:
 |===
 
 
-=== Query Parameters (6 parameters):
+=== Query Parameters (7 parameters):
 
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
+| *allowContextMapAll* (producer) | Sets whether the context map should allow 
access to all details. By default only the message body and headers can be 
accessed. This option can be enabled for full access to the current Exchange 
and CamelContext. Doing so impose a potential security risk as this opens 
access to the full power of CamelContext API. | false | boolean
 | *binary* (producer) | Whether the script is binary content or text content. 
By default the script is read as text content (eg java.lang.String) | false | 
boolean
 | *cacheScript* (producer) | Whether to cache the compiled script and reuse 
Notice reusing the script can cause side effects from processing one Camel 
org.apache.camel.Exchange to the next org.apache.camel.Exchange. | false | 
boolean
 | *contentCache* (producer) | Sets whether to use resource content cache or 
not. | false | boolean
diff --git a/components/camel-sql/src/main/docs/sql-component.adoc 
b/components/camel-sql/src/main/docs/sql-component.adoc
index 66a0499..ab9763c 100644
--- a/components/camel-sql/src/main/docs/sql-component.adoc
+++ b/components/camel-sql/src/main/docs/sql-component.adoc
@@ -646,6 +646,25 @@ CREATE TABLE aggregation_completed (
 );
 -----
 
+**Note:** There is a new version called `JdbcAggregationRepositoryNew` which 
supports the optimistic locking
+also on a dstributed environment (multiple JVMs or independent threads). This 
new implementation is built on
+top of the old and needs an additional `version` column:
+
+[source,sql]
+-----
+CREATE TABLE aggregation (
+ id varchar(255) NOT NULL,
+ exchange blob NOT NULL,
+ version BIGINT NOT NULL,
+ constraint aggregation_pk PRIMARY KEY (id)
+);
+CREATE TABLE aggregation_completed (
+ id varchar(255) NOT NULL,
+ exchange blob NOT NULL,
+ version BIGINT NOT NULL,
+ constraint aggregation_completed_pk PRIMARY KEY (id)
+);
+-----
 
 == Storing body and headers as text
 
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryNew.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryNew.java
new file mode 100644
index 0000000..d559af8
--- /dev/null
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryNew.java
@@ -0,0 +1,626 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import java.io.IOException;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import javax.sql.DataSource;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.spi.OptimisticLockingAggregationRepository;
+import org.apache.camel.spi.RecoverableAggregationRepository;
+import org.apache.camel.support.ServiceSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.Constants;
+import org.springframework.dao.EmptyResultDataAccessException;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.jdbc.core.RowMapper;
+import 
org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
+import org.springframework.jdbc.support.lob.DefaultLobHandler;
+import org.springframework.jdbc.support.lob.LobCreator;
+import org.springframework.jdbc.support.lob.LobHandler;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.DefaultTransactionDefinition;
+import org.springframework.transaction.support.TransactionCallback;
+import 
org.springframework.transaction.support.TransactionCallbackWithoutResult;
+import org.springframework.transaction.support.TransactionTemplate;
+
+/**
+ * JDBC based {@link org.apache.camel.spi.AggregationRepository}
+ * JdbcAggregationRepository will only preserve any Serializable compatible
+ * data types. If a data type is not such a type its dropped and a WARN is
+ * logged. And it only persists the Message body and the Message headers.
+ * The Exchange properties are not persisted.
+ *
+ * Note: This new version is built from the old one and simply adds the version
+ * property to properly support the optimistic locking also on a distributed
+ * environment (multiple JVMs or independent threads).
+ */
+public class JdbcAggregationRepositoryNew extends ServiceSupport implements 
RecoverableAggregationRepository, OptimisticLockingAggregationRepository {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JdbcAggregationRepositoryNew.class);
+
+    protected static final String EXCHANGE = "exchange";
+    protected static final String ID = "id";
+    protected static final String BODY = "body";
+
+    // version identifier needed to avoid the lost update problem
+    private static final String VERSION = "version";
+    private static final String VERSION_PROPERTY = 
"CamelOptimisticLockVersion";
+
+    private static final Constants PROPAGATION_CONSTANTS = new 
Constants(TransactionDefinition.class);
+
+    private JdbcOptimisticLockingExceptionMapper 
jdbcOptimisticLockingExceptionMapper = new 
DefaultJdbcOptimisticLockingExceptionMapper();
+    private PlatformTransactionManager transactionManager;
+    private DataSource dataSource;
+    private TransactionTemplate transactionTemplate;
+    private TransactionTemplate transactionTemplateReadOnly;
+    private int propagationBehavior = 
TransactionDefinition.PROPAGATION_REQUIRED;
+    private JdbcTemplate jdbcTemplate;
+    private LobHandler lobHandler = new DefaultLobHandler();
+    private String repositoryName;
+    private boolean returnOldExchange;
+    private JdbcCamelCodec codec = new JdbcCamelCodec();
+    private long recoveryInterval = 5000;
+    private boolean useRecovery = true;
+    private int maximumRedeliveries;
+    private String deadLetterUri;
+    private List<String> headersToStoreAsText;
+    private boolean storeBodyAsText;
+    private boolean allowSerializedHeaders;
+
+    /**
+     * Creates an aggregation repository
+     */
+    public JdbcAggregationRepositoryNew() {
+    }
+
+    /**
+     * Creates an aggregation repository with the three mandatory parameters
+     */
+    public JdbcAggregationRepositoryNew(PlatformTransactionManager 
transactionManager, String repositoryName, DataSource dataSource) {
+        this.setRepositoryName(repositoryName);
+        this.setTransactionManager(transactionManager);
+        this.setDataSource(dataSource);
+    }
+
+    /**
+     * Sets the name of the repository
+     */
+    public final void setRepositoryName(String repositoryName) {
+        this.repositoryName = repositoryName;
+    }
+
+    public final void setTransactionManager(PlatformTransactionManager 
transactionManager) {
+        this.transactionManager = transactionManager;
+    }
+
+    /**
+     * Sets the DataSource to use for accessing the database
+     */
+    public final void setDataSource(DataSource dataSource) {
+        this.dataSource = dataSource;
+
+        jdbcTemplate = new JdbcTemplate(dataSource);
+    }
+
+    @Override
+    public Exchange add(final CamelContext camelContext, final String 
correlationId,
+                        final Exchange oldExchange, final Exchange 
newExchange) throws OptimisticLockingException {
+
+        try {
+            return add(camelContext, correlationId, newExchange);
+        } catch (Exception e) {
+            if (jdbcOptimisticLockingExceptionMapper != null && 
jdbcOptimisticLockingExceptionMapper.isOptimisticLocking(e)) {
+                throw new OptimisticLockingException();
+            } else {
+                throw ObjectHelper.wrapRuntimeCamelException(e);
+            }
+        }
+    }
+
+    @Override
+    public Exchange add(final CamelContext camelContext, final String 
correlationId, final Exchange exchange) {
+        return transactionTemplate.execute(new TransactionCallback<Exchange>() 
{
+
+            public Exchange doInTransaction(TransactionStatus status) {
+                Exchange result = null;
+                final String key = correlationId;
+
+                try {
+                    LOG.debug("Adding exchange with key {}", key);
+
+                    boolean present = jdbcTemplate.queryForObject(
+                            "SELECT COUNT(1) FROM " + getRepositoryName() + " 
WHERE " + ID + " = ?", Integer.class, key) != 0;
+
+                    // Recover existing exchange with that ID
+                    if (isReturnOldExchange() && present) {
+                        result = get(key, getRepositoryName(), camelContext);
+                    }
+
+                    if (present) {
+                        long version = exchange.getProperty(VERSION_PROPERTY, 
Long.class);
+                        LOG.debug("Updating record with key {} and version 
{}", key, version);
+                        update(camelContext, correlationId, exchange, 
getRepositoryName(), version);
+                    } else {
+                        LOG.debug("Inserting record with key {}");
+                        insert(camelContext, correlationId, exchange, 
getRepositoryName(), 1L);
+                    }
+
+                } catch (Exception e) {
+                    throw new RuntimeException("Error adding to repository " + 
repositoryName + " with key " + key, e);
+                }
+
+                return result;
+            }
+        });
+    }
+
+    /**
+     * Updates the current exchange details in the given repository table.
+     *
+     * @param camelContext   Current CamelContext
+     * @param key            Correlation key
+     * @param exchange       Aggregated exchange
+     * @param repositoryName Table's name
+     * @param version        Version identifier
+     */
+    protected void update(final CamelContext camelContext, final String key, 
final Exchange exchange, String repositoryName, Long version) throws Exception {
+        StringBuilder queryBuilder = new StringBuilder()
+                .append("UPDATE ").append(repositoryName)
+                .append(" SET ")
+                .append(EXCHANGE).append(" = ?")
+                .append(", ")
+                .append(VERSION).append(" = ?");
+        if (storeBodyAsText) {
+            queryBuilder.append(", ").append(BODY).append(" = ?");
+        }
+
+        if (hasHeadersToStoreAsText()) {
+            for (String headerName : headersToStoreAsText) {
+                queryBuilder.append(", ").append(headerName).append(" = ?");
+            }
+        }
+
+        queryBuilder.append(" WHERE ")
+            .append(ID).append(" = ?")
+            .append(" AND ")
+            .append(VERSION).append(" = ?");
+
+        String sql = queryBuilder.toString();
+        updateHelper(camelContext, key, exchange, sql, version);
+    }
+
+    /**
+     * Inserts a new record into the given repository table.
+     * Note: the exchange properties are NOT persisted.
+     *
+     * @param camelContext   Current CamelContext
+     * @param correlationId  Correlation key
+     * @param exchange       Aggregated exchange to insert
+     * @param repositoryName Table's name
+     * @param version        Version identifier
+     */
+    protected void insert(final CamelContext camelContext, final String 
correlationId, final Exchange exchange, String repositoryName, Long version) 
throws Exception {
+        // The default totalParameterIndex is 3 for ID, Exchange and version. 
Depending on logic this will be increased.
+        int totalParameterIndex = 3;
+        StringBuilder queryBuilder = new StringBuilder()
+                .append("INSERT INTO ").append(repositoryName)
+                .append('(').append(EXCHANGE)
+                .append(", ").append(ID)
+                .append(", ").append(VERSION);
+
+        if (storeBodyAsText) {
+            queryBuilder.append(", ").append(BODY);
+            totalParameterIndex++;
+        }
+
+        if (hasHeadersToStoreAsText()) {
+            for (String headerName : headersToStoreAsText) {
+                queryBuilder.append(", ").append(headerName);
+                totalParameterIndex++;
+            }
+        }
+
+        queryBuilder.append(") VALUES (");
+
+        for (int i = 0; i < totalParameterIndex - 1; i++) {
+            queryBuilder.append("?, ");
+        }
+        queryBuilder.append("?)");
+
+        String sql = queryBuilder.toString();
+
+        insertHelper(camelContext, correlationId, exchange, sql, version);
+    }
+
+    protected int insertHelper(final CamelContext camelContext, final String 
key, final Exchange exchange, String sql, final Long version) throws Exception {
+        final byte[] data = codec.marshallExchange(camelContext, exchange, 
allowSerializedHeaders);
+        Integer insertCount = jdbcTemplate.execute(sql,
+                new 
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
+                    @Override
+                    protected void setValues(PreparedStatement ps, LobCreator 
lobCreator) throws SQLException {
+                        int totalParameterIndex = 0;
+                        lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, 
data);
+                        ps.setString(++totalParameterIndex, key);
+                        ps.setLong(++totalParameterIndex, version);
+                        if (storeBodyAsText) {
+                            ps.setString(++totalParameterIndex, 
exchange.getIn().getBody(String.class));
+                        }
+                        if (hasHeadersToStoreAsText()) {
+                            for (String headerName : headersToStoreAsText) {
+                                String headerValue = 
exchange.getIn().getHeader(headerName, String.class);
+                                ps.setString(++totalParameterIndex, 
headerValue);
+                            }
+                        }
+                    }
+                });
+        return insertCount == null ? 0 : insertCount;
+    }
+
+    protected int updateHelper(final CamelContext camelContext, final String 
key, final Exchange exchange, String sql, final Long version) throws Exception {
+        final byte[] data = codec.marshallExchange(camelContext, exchange, 
allowSerializedHeaders);
+        Integer updateCount = jdbcTemplate.execute(sql,
+                new 
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
+                    @Override
+                    protected void setValues(PreparedStatement ps, LobCreator 
lobCreator) throws SQLException {
+                        int totalParameterIndex = 0;
+                        lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, 
data);
+                        ps.setLong(++totalParameterIndex, version + 1);
+                        if (storeBodyAsText) {
+                            ps.setString(++totalParameterIndex, 
exchange.getIn().getBody(String.class));
+                        }
+                        if (hasHeadersToStoreAsText()) {
+                            for (String headerName : headersToStoreAsText) {
+                                String headerValue = 
exchange.getIn().getHeader(headerName, String.class);
+                                ps.setString(++totalParameterIndex, 
headerValue);
+                            }
+                        }
+                        ps.setString(++totalParameterIndex, key);
+                        ps.setLong(++totalParameterIndex, version);
+                    }
+                });
+        if (updateCount == 1) {
+            return updateCount;
+        } else {
+            throw new RuntimeException(String.format("Stale version: error 
updating record with key %s and version %s", key, version));
+        }
+    }
+
+    @Override
+    public Exchange get(final CamelContext camelContext, final String 
correlationId) {
+        final String key = correlationId;
+        Exchange result = get(key, getRepositoryName(), camelContext);
+        LOG.debug("Getting key {} -> {}", key, result);
+        return result;
+    }
+
+    private Exchange get(final String key, final String repositoryName, final 
CamelContext camelContext) {
+        return transactionTemplateReadOnly.execute(new 
TransactionCallback<Exchange>() {
+            public Exchange doInTransaction(TransactionStatus status) {
+                try {
+
+                    Map<String, Object> columns = jdbcTemplate.queryForMap(
+                        String.format("SELECT %1$s, %2$s FROM %3$s WHERE 
%4$s=?", EXCHANGE, VERSION, repositoryName, ID),
+                        new Object[]{key}, new int[]{Types.VARCHAR});
+
+                    byte[] marshalledExchange = (byte[]) columns.get(EXCHANGE);
+                    long version = (long) columns.get(VERSION);
+
+                    Exchange result = codec.unmarshallExchange(camelContext, 
marshalledExchange);
+                    result.setProperty(VERSION_PROPERTY, version);
+                    return result;
+
+                } catch (EmptyResultDataAccessException ex) {
+                    return null;
+                } catch (IOException ex) {
+                    // Rollback the transaction
+                    throw new RuntimeException("Error getting key " + key + " 
from repository " + repositoryName, ex);
+                } catch (ClassNotFoundException ex) {
+                    // Rollback the transaction
+                    throw new RuntimeException(ex);
+                }
+            }
+        });
+    }
+
+    @Override
+    public void remove(final CamelContext camelContext, final String 
correlationId, final Exchange exchange) {
+        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+            protected void doInTransactionWithoutResult(TransactionStatus 
status) {
+                final String key = correlationId;
+                final String confirmKey = exchange.getExchangeId();
+                final long version = exchange.getProperty(VERSION_PROPERTY, 
Long.class);
+                try {
+                    LOG.debug("Removing key {}", key);
+
+                    jdbcTemplate.update("DELETE FROM " + getRepositoryName() + 
" WHERE " + ID + " = ? AND " + VERSION + " = ?", key, version);
+
+                    insert(camelContext, confirmKey, exchange, 
getRepositoryNameCompleted(), version);
+
+                } catch (Exception e) {
+                    throw new RuntimeException("Error removing key " + key + " 
from repository " + repositoryName, e);
+                }
+            }
+        });
+    }
+
+    @Override
+    public void confirm(final CamelContext camelContext, final String 
exchangeId) {
+        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+            protected void doInTransactionWithoutResult(TransactionStatus 
status) {
+                LOG.debug("Confirming exchangeId {}", exchangeId);
+                final String confirmKey = exchangeId;
+
+                jdbcTemplate.update("DELETE FROM " + 
getRepositoryNameCompleted() + " WHERE " + ID + " = ?",
+                        new Object[]{confirmKey});
+
+            }
+        });
+    }
+
+    @Override
+    public Set<String> getKeys() {
+        return getKeys(getRepositoryName());
+    }
+
+    @Override
+    public Set<String> scan(CamelContext camelContext) {
+        return getKeys(getRepositoryNameCompleted());
+    }
+
+    /**
+     * Returns the keys in the given repository
+     *
+     * @param repositoryName The name of the table
+     * @return Set of keys in the given repository name
+     */
+    protected Set<String> getKeys(final String repositoryName) {
+        return transactionTemplateReadOnly.execute(new 
TransactionCallback<LinkedHashSet<String>>() {
+            public LinkedHashSet<String> doInTransaction(TransactionStatus 
status) {
+                List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM 
" + repositoryName,
+                        new RowMapper<String>() {
+                            public String mapRow(ResultSet rs, int rowNum) 
throws SQLException {
+                                String id = rs.getString(ID);
+                                LOG.trace("getKey {}", id);
+                                return id;
+                            }
+                        });
+                return new LinkedHashSet<>(keys);
+            }
+        });
+    }
+
+    @Override
+    public Exchange recover(CamelContext camelContext, String exchangeId) {
+        final String key = exchangeId;
+        Exchange answer = get(key, getRepositoryNameCompleted(), camelContext);
+        LOG.debug("Recovering exchangeId {} -> {}", key, answer);
+        return answer;
+    }
+
+    /**
+     *  If recovery is enabled then a background task is run every x'th time 
to scan for failed exchanges to recover
+     *  and resubmit. By default this interval is 5000 millis.
+     */
+    @Override
+    public void setRecoveryInterval(long interval, TimeUnit timeUnit) {
+        this.recoveryInterval = timeUnit.toMillis(interval);
+    }
+
+    @Override
+    public void setRecoveryInterval(long interval) {
+        this.recoveryInterval = interval;
+    }
+
+    @Override
+    public long getRecoveryIntervalInMillis() {
+        return recoveryInterval;
+    }
+
+    @Override
+    public boolean isUseRecovery() {
+        return useRecovery;
+    }
+
+    /**
+     * Whether or not recovery is enabled. This option is by default true. 
When enabled the Camel
+     * Aggregator automatic recover failed aggregated exchange and have them 
resubmitted.
+     */
+    @Override
+    public void setUseRecovery(boolean useRecovery) {
+        this.useRecovery = useRecovery;
+    }
+
+    @Override
+    public int getMaximumRedeliveries() {
+        return maximumRedeliveries;
+    }
+
+    @Override
+    public void setMaximumRedeliveries(int maximumRedeliveries) {
+        this.maximumRedeliveries = maximumRedeliveries;
+    }
+
+    @Override
+    public String getDeadLetterUri() {
+        return deadLetterUri;
+    }
+
+    /**
+     * An endpoint uri for a Dead Letter Channel where exhausted recovered 
Exchanges will be
+     * moved. If this option is used then the maximumRedeliveries option must 
also be provided.
+     * Important note : if the deadletter route throws an exception, it will 
be send again to DLQ until it succeed !
+     */
+    public void setDeadLetterUri(String deadLetterUri) {
+        this.deadLetterUri = deadLetterUri;
+    }
+
+    public boolean isReturnOldExchange() {
+        return returnOldExchange;
+    }
+
+    /**
+     * Whether the get operation should return the old existing Exchange if 
any existed.
+     * By default this option is false to optimize as we do not need the old 
exchange when aggregating.
+     */
+    public void setReturnOldExchange(boolean returnOldExchange) {
+        this.returnOldExchange = returnOldExchange;
+    }
+
+    public void setJdbcCamelCodec(JdbcCamelCodec codec) {
+        this.codec = codec;
+    }
+
+    public boolean hasHeadersToStoreAsText() {
+        return this.headersToStoreAsText != null && 
!this.headersToStoreAsText.isEmpty();
+    }
+
+    public List<String> getHeadersToStoreAsText() {
+        return headersToStoreAsText;
+    }
+
+    /**
+     * Allows to store headers as String which is human readable. By default 
this option is disabled,
+     * storing the headers in binary format.
+     *
+     * @param headersToStoreAsText the list of headers to store as String
+     */
+    public void setHeadersToStoreAsText(List<String> headersToStoreAsText) {
+        this.headersToStoreAsText = headersToStoreAsText;
+    }
+
+    public boolean isStoreBodyAsText() {
+        return storeBodyAsText;
+    }
+
+    /**
+     * Whether to store the message body as String which is human readable.
+     * By default this option is false storing the body in binary format.
+     */
+    public void setStoreBodyAsText(boolean storeBodyAsText) {
+        this.storeBodyAsText = storeBodyAsText;
+    }
+
+    public boolean isAllowSerializedHeaders() {
+        return allowSerializedHeaders;
+    }
+
+    public void setAllowSerializedHeaders(boolean allowSerializedHeaders) {
+        this.allowSerializedHeaders = allowSerializedHeaders;
+    }
+
+    public int getPropagationBehavior() {
+        return propagationBehavior;
+    }
+
+    /**
+     * Sets propagation behavior to use with spring transaction templates 
which are used for database access.
+     * The default is TransactionDefinition.PROPAGATION_REQUIRED.
+     */
+    public void setPropagationBehavior(int propagationBehavior) {
+        this.propagationBehavior = propagationBehavior;
+    }
+
+    /**
+     * Sets propagation behavior to use with spring transaction templates 
which are used for database access.
+     * The default is TransactionDefinition.PROPAGATION_REQUIRED. This setter 
accepts names of the constants, like
+     * "PROPAGATION_REQUIRED".
+     * @param propagationBehaviorName
+     */
+    public void setPropagationBehaviorName(String propagationBehaviorName) {
+        if 
(!propagationBehaviorName.startsWith(DefaultTransactionDefinition.PREFIX_PROPAGATION))
 {
+            throw new IllegalArgumentException("Only propagation constants 
allowed");
+        }
+        
setPropagationBehavior(PROPAGATION_CONSTANTS.asNumber(propagationBehaviorName).intValue());
+    }
+
+    public LobHandler getLobHandler() {
+        return lobHandler;
+    }
+
+    /**
+     * Sets a custom LobHandler to use
+     */
+    public void setLobHandler(LobHandler lobHandler) {
+        this.lobHandler = lobHandler;
+    }
+
+    public JdbcOptimisticLockingExceptionMapper 
getJdbcOptimisticLockingExceptionMapper() {
+        return jdbcOptimisticLockingExceptionMapper;
+    }
+
+    public void 
setJdbcOptimisticLockingExceptionMapper(JdbcOptimisticLockingExceptionMapper 
jdbcOptimisticLockingExceptionMapper) {
+        this.jdbcOptimisticLockingExceptionMapper = 
jdbcOptimisticLockingExceptionMapper;
+    }
+
+    public String getRepositoryName() {
+        return repositoryName;
+    }
+
+    public String getRepositoryNameCompleted() {
+        return getRepositoryName() + "_completed";
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        ObjectHelper.notNull(repositoryName, "RepositoryName");
+        ObjectHelper.notNull(transactionManager, "TransactionManager");
+        ObjectHelper.notNull(dataSource, "DataSource");
+
+        transactionTemplate = new TransactionTemplate(transactionManager);
+        transactionTemplate.setPropagationBehavior(propagationBehavior);
+
+        transactionTemplateReadOnly = new 
TransactionTemplate(transactionManager);
+        
transactionTemplateReadOnly.setPropagationBehavior(propagationBehavior);
+        transactionTemplateReadOnly.setReadOnly(true);
+
+        // log number of existing exchanges
+        int current = getKeys().size();
+        int completed = scan(null).size();
+
+        if (current > 0) {
+            LOG.info("On startup there are " + current + " aggregate exchanges 
(not completed) in repository: " + getRepositoryName());
+        } else {
+            LOG.info("On startup there are no existing aggregate exchanges 
(not completed) in repository: {}", getRepositoryName());
+        }
+        if (completed > 0) {
+            LOG.warn("On startup there are " + completed + " completed 
exchanges to be recovered in repository: " + getRepositoryNameCompleted());
+        } else {
+            LOG.info("On startup there are no completed exchanges to be 
recovered in repository: {}", getRepositoryNameCompleted());
+        }
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        // noop
+    }
+
+}
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
index 94d4777..1f7229d 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java
@@ -30,14 +30,14 @@ public abstract class AbstractJdbcAggregationTestSupport 
extends CamelSpringTest
     @Override
     public void postProcessTest() throws Exception {
         super.postProcessTest();
-        
+
         repo = applicationContext.getBean("repo1", 
JdbcAggregationRepository.class);
         configureJdbcAggregationRepository();
     }
-    
+
     void configureJdbcAggregationRepository() {
     }
-    
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -54,16 +54,23 @@ public abstract class AbstractJdbcAggregationTestSupport 
extends CamelSpringTest
             // END SNIPPET: e1
         };
     }
-    
+
     long getCompletionInterval() {
         return 5000;
     }
-    
+
     @Override
     protected AbstractApplicationContext createApplicationContext() {
         return new 
ClassPathXmlApplicationContext("org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml");
     }
 
+    protected Exchange repoAddAndGet(String key, Exchange exchange) {
+        repo.add(context, key, exchange);
+        // recover the exchange with the new version to be able to add again
+        exchange = repo.get(context, key);
+        return exchange;
+    }
+
     public static class MyAggregationStrategy implements AggregationStrategy {
 
         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
index 840184e..b58812b 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java
@@ -38,7 +38,7 @@ public class JdbcAggregateLoadAndRecoverTest extends 
AbstractJdbcAggregationTest
     public void testLoadAndRecoverJdbcAggregate() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(SIZE / 10);
-        mock.setResultWaitTime(50 * 1000);
+        mock.setResultWaitTime(5_000);
 
         LOG.info("Staring to send " + SIZE + " messages.");
 
@@ -73,13 +73,17 @@ public class JdbcAggregateLoadAndRecoverTest extends 
AbstractJdbcAggregationTest
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
+                onException(IllegalStateException.class)
+                        .maximumRedeliveries(3)
+                        .redeliveryDelay(100L);
+
                 from("seda:start?size=" + SIZE)
                         .to("log:input?groupSize=500")
                         .aggregate(header("id"), new MyAggregationStrategy())
                         .aggregationRepository(repo)
                         .completionSize(10)
                         .to("log:output?showHeaders=true")
-                                // have every 10th exchange fail which should 
then be recovered
+                        // have every 10th exchange fail which should then be 
recovered
                         .process(new Processor() {
                             public void process(Exchange exchange) throws 
Exception {
                                 //Avoid same message to be discarded twice
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java
index 0c75a88..8722d8d 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java
@@ -35,11 +35,11 @@ public class JdbcAggregateNotLostTest extends 
AbstractJdbcAggregationTestSupport
         template.sendBodyAndHeader("direct:start", "D", "id", 123);
         template.sendBodyAndHeader("direct:start", "E", "id", 123);
 
-        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+        assertMockEndpointsSatisfied(5, TimeUnit.SECONDS);
 
         String exchangeId = 
getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId();
-
         // the exchange should be in the completed repo where we should be 
able to find it
+
         Exchange completed = repo.recover(context, exchangeId);
         // assert the exchange was not lost and we got all the information 
still
         assertNotNull(completed);
@@ -63,7 +63,7 @@ public class JdbcAggregateNotLostTest extends 
AbstractJdbcAggregationTestSupport
                         .completionSize(5).aggregationRepository(repo)
                         .log("aggregated exchange id ${exchangeId} with 
${body}")
                         .to("mock:aggregated")
-                                // throw an exception to fail, which we then 
will loose this message
+                        // throw an exception to fail, which we then will 
loose this message
                         .throwException(new IllegalArgumentException("Damn"))
                         .to("mock:result")
                         .end();
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java
index af26a56..39f6b2e 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java
@@ -69,4 +69,4 @@ public class JdbcAggregateRecoverDeadLetterChannelTest 
extends AbstractJdbcAggre
             }
         };
     }
-}
\ No newline at end of file
+}
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java
index 5025c40..bd3af20 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java
@@ -67,4 +67,4 @@ public class JdbcAggregationRepositoryAlotDataTest extends 
AbstractJdbcAggregati
             assertEquals("counter:" + i, actual.getIn().getBody());
         }
     }
-}
\ No newline at end of file
+}
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java
index be07b7b..e41ef7f 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java
@@ -53,8 +53,9 @@ public class JdbcAggregationRepositoryMultipleRepoTest 
extends CamelSpringTestSu
         assertEquals("counter:1", actual.getIn().getBody());
         assertEquals(null, repo2.get(context, "foo"));
 
-        // Change it..
+        // Change it after reading the current exchange with version
         Exchange exchange2 = new DefaultExchange(context);
+        exchange2 = repo1.get(context, "foo");
         exchange2.getIn().setBody("counter:2");
         actual = repo1.add(context, "foo", exchange2);
         // the old one
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
index 1a2361f..f750f7d 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java
@@ -28,7 +28,7 @@ public class JdbcAggregationRepositoryRecoverExistingTest 
extends AbstractJdbcAg
         repo.setReturnOldExchange(true);
         repo.setUseRecovery(true);
     }
-    
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -48,7 +48,8 @@ public class JdbcAggregationRepositoryRecoverExistingTest 
extends AbstractJdbcAg
         Exchange actual = repo.add(context, "foo", exchange1);
         assertEquals(null, actual);
 
-        // Remove it, which makes it in the pre confirm stage
+        // Get and remove it, which makes it in the pre confirm stage
+        exchange1 = repo.get(context, "foo");
         repo.remove(context, "foo", exchange1);
 
         String id = exchange1.getExchangeId();
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java
index 69b377a..c2daf52 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java
@@ -25,7 +25,7 @@ public class JdbcAggregationRepositoryTest extends 
AbstractJdbcAggregationTestSu
     void configureJdbcAggregationRepository() {
         repo.setReturnOldExchange(true);
     }
-    
+
     @Test
     public void testOperations() {
         // Can't get something we have not put in...
@@ -42,8 +42,9 @@ public class JdbcAggregationRepositoryTest extends 
AbstractJdbcAggregationTestSu
         actual = repo.get(context, "foo");
         assertEquals("counter:1", actual.getIn().getBody());
 
-        // Change it..
+        // Change it after reading the current exchange with version
         Exchange exchange2 = new DefaultExchange(context);
+        exchange2 = repo.get(context, "foo");
         exchange2.getIn().setBody("counter:2");
         actual = repo.add(context, "foo", exchange2);
         // the old one
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java
index 7f16fe0..de25da9 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java
@@ -26,6 +26,7 @@ public class JdbcExchangeSerializationTest extends 
AbstractJdbcAggregationTestSu
 
     @Test
     public void testExchangeSerialization() {
+        final String key = "foo";
         Exchange exchange = new DefaultExchange(context);
         exchange.getIn().setBody("Hello World");
         exchange.getIn().setHeader("name", "Olivier");
@@ -35,9 +36,9 @@ public class JdbcExchangeSerializationTest extends 
AbstractJdbcAggregationTestSu
         Date now = new Date();
         exchange.getIn().setHeader("date", now);
 
-        repo.add(context, "foo", exchange);
+        exchange = repoAddAndGet(key, exchange);
 
-        Exchange actual = repo.get(context, "foo");
+        Exchange actual = repo.get(context, key);
         assertEquals("Hello World", actual.getIn().getBody());
         assertEquals("Olivier", actual.getIn().getHeader("name"));
         assertEquals(123, actual.getIn().getHeader("number"));
@@ -53,9 +54,9 @@ public class JdbcExchangeSerializationTest extends 
AbstractJdbcAggregationTestSu
         exchange.getIn().setHeader("name", "Thomas");
         exchange.getIn().removeHeader("date");
 
-        repo.add(context, "foo", exchange);
+        exchange = repoAddAndGet(key, exchange);
 
-        actual = repo.get(context, "foo");
+        actual = repo.get(context, key);
         assertEquals("Bye World", actual.getIn().getBody());
         assertEquals("Thomas", actual.getIn().getHeader("name"));
         assertEquals(123, actual.getIn().getHeader("number"));
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java
index 66ffd37..b919b25 100644
--- 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java
@@ -31,8 +31,8 @@ public class JdbcGrowIssueTest extends 
AbstractJdbcAggregationTestSupport {
         for (int i = 0; i < SIZE; i++) {
             sb.append("X");
         }
-        Exchange item = new DefaultExchange(context);
-        item.getIn().setBody(sb.toString(), String.class);
+        Exchange exchange = new DefaultExchange(context);
+        exchange.getIn().setBody(sb.toString(), String.class);
 
         // the key
         final String key = "foo";
@@ -40,7 +40,7 @@ public class JdbcGrowIssueTest extends 
AbstractJdbcAggregationTestSupport {
         // we update using the same key, which means we should be able to do 
this within the file size limit
         for (int i = 0; i < SIZE; i++) {
             log.debug("Updating " + i);
-            repo.add(context, key, item);
+            exchange = repoAddAndGet(key, exchange);
         }
 
         // get the last
diff --git a/components/camel-sql/src/test/resources/sql/init.sql 
b/components/camel-sql/src/test/resources/sql/init.sql
index 5396248..271e72e 100644
--- a/components/camel-sql/src/test/resources/sql/init.sql
+++ b/components/camel-sql/src/test/resources/sql/init.sql
@@ -24,4 +24,4 @@ CREATE TABLE aggregationRepo1_completed (
     id varchar(255) NOT NULL,
     exchange blob NOT NULL,
     constraint aggregationRepo1_completed_pk PRIMARY KEY (id)
-);
\ No newline at end of file
+);
diff --git a/components/camel-sql/src/test/resources/sql/init2.sql 
b/components/camel-sql/src/test/resources/sql/init2.sql
index 515179a..18f17c0 100644
--- a/components/camel-sql/src/test/resources/sql/init2.sql
+++ b/components/camel-sql/src/test/resources/sql/init2.sql
@@ -24,4 +24,4 @@ CREATE TABLE aggregationRepo2_completed (
     id varchar(255) NOT NULL,
     exchange blob NOT NULL,
     constraint aggregationRepo2_completed_pk PRIMARY KEY (id)
-);
\ No newline at end of file
+);
diff --git a/components/camel-sql/src/test/resources/sql/init3.sql 
b/components/camel-sql/src/test/resources/sql/init3.sql
index 2bef291..69450d0 100644
--- a/components/camel-sql/src/test/resources/sql/init3.sql
+++ b/components/camel-sql/src/test/resources/sql/init3.sql
@@ -30,4 +30,4 @@ CREATE TABLE aggregationRepo3_completed (
     companyName varchar(1000),
     accountName varchar(1000),
     constraint aggregationRepo3_completed_pk PRIMARY KEY (id)
-);
\ No newline at end of file
+);

Reply via email to