This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch camel-2.25.x in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-2.25.x by this push: new 20520d5 [CAMEL-7810] Fix JdbcAggregationRepository optimistic locking feature (#3837) 20520d5 is described below commit 20520d547c81499f6b81cd8765cd1041d4e3e484 Author: Federico Valeri <fval...@users.noreply.github.com> AuthorDate: Thu May 21 08:44:24 2020 +0200 [CAMEL-7810] Fix JdbcAggregationRepository optimistic locking feature (#3837) Running JdbcAggregationRepository on a distributed environment (multiple independent threads or multiple JVMs) we have silently dropped exchanges. This PR aims at introducing a version identifier to avoid the lost update problem (overwrites when updating the aggregate row) as stated by the OptimisticLockingAggregationRepository SPI. Tested running 100 threads to aggregate 10k messages on MySQL 5.7 and PostgreSQL 11.5. Co-authored-by: Federico Valeri <fvaleri@localhost> --- camel-core/src/main/docs/language-component.adoc | 3 +- .../camel-sql/src/main/docs/sql-component.adoc | 19 + .../jdbc/JdbcAggregationRepositoryNew.java | 626 +++++++++++++++++++++ .../jdbc/AbstractJdbcAggregationTestSupport.java | 17 +- .../jdbc/JdbcAggregateLoadAndRecoverTest.java | 8 +- .../aggregate/jdbc/JdbcAggregateNotLostTest.java | 6 +- .../JdbcAggregateRecoverDeadLetterChannelTest.java | 2 +- .../JdbcAggregationRepositoryAlotDataTest.java | 2 +- .../JdbcAggregationRepositoryMultipleRepoTest.java | 3 +- ...bcAggregationRepositoryRecoverExistingTest.java | 5 +- .../jdbc/JdbcAggregationRepositoryTest.java | 5 +- .../jdbc/JdbcExchangeSerializationTest.java | 9 +- .../aggregate/jdbc/JdbcGrowIssueTest.java | 6 +- .../camel-sql/src/test/resources/sql/init.sql | 2 +- .../camel-sql/src/test/resources/sql/init2.sql | 2 +- .../camel-sql/src/test/resources/sql/init3.sql | 2 +- 16 files changed, 689 insertions(+), 28 deletions(-) diff --git a/camel-core/src/main/docs/language-component.adoc b/camel-core/src/main/docs/language-component.adoc index 241b13d..a3a5620 100644 --- a/camel-core/src/main/docs/language-component.adoc +++ b/camel-core/src/main/docs/language-component.adoc @@ -62,12 +62,13 @@ with the following path and query parameters: |=== -=== Query Parameters (6 parameters): +=== Query Parameters (7 parameters): [width="100%",cols="2,5,^1,2",options="header"] |=== | Name | Description | Default | Type +| *allowContextMapAll* (producer) | Sets whether the context map should allow access to all details. By default only the message body and headers can be accessed. This option can be enabled for full access to the current Exchange and CamelContext. Doing so impose a potential security risk as this opens access to the full power of CamelContext API. | false | boolean | *binary* (producer) | Whether the script is binary content or text content. By default the script is read as text content (eg java.lang.String) | false | boolean | *cacheScript* (producer) | Whether to cache the compiled script and reuse Notice reusing the script can cause side effects from processing one Camel org.apache.camel.Exchange to the next org.apache.camel.Exchange. | false | boolean | *contentCache* (producer) | Sets whether to use resource content cache or not. | false | boolean diff --git a/components/camel-sql/src/main/docs/sql-component.adoc b/components/camel-sql/src/main/docs/sql-component.adoc index 66a0499..ab9763c 100644 --- a/components/camel-sql/src/main/docs/sql-component.adoc +++ b/components/camel-sql/src/main/docs/sql-component.adoc @@ -646,6 +646,25 @@ CREATE TABLE aggregation_completed ( ); ----- +**Note:** There is a new version called `JdbcAggregationRepositoryNew` which supports the optimistic locking +also on a dstributed environment (multiple JVMs or independent threads). This new implementation is built on +top of the old and needs an additional `version` column: + +[source,sql] +----- +CREATE TABLE aggregation ( + id varchar(255) NOT NULL, + exchange blob NOT NULL, + version BIGINT NOT NULL, + constraint aggregation_pk PRIMARY KEY (id) +); +CREATE TABLE aggregation_completed ( + id varchar(255) NOT NULL, + exchange blob NOT NULL, + version BIGINT NOT NULL, + constraint aggregation_completed_pk PRIMARY KEY (id) +); +----- == Storing body and headers as text diff --git a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryNew.java b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryNew.java new file mode 100644 index 0000000..d559af8 --- /dev/null +++ b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryNew.java @@ -0,0 +1,626 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.processor.aggregate.jdbc; + +import java.io.IOException; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Types; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import javax.sql.DataSource; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.spi.OptimisticLockingAggregationRepository; +import org.apache.camel.spi.RecoverableAggregationRepository; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.core.Constants; +import org.springframework.dao.EmptyResultDataAccessException; +import org.springframework.jdbc.core.JdbcTemplate; +import org.springframework.jdbc.core.RowMapper; +import org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback; +import org.springframework.jdbc.support.lob.DefaultLobHandler; +import org.springframework.jdbc.support.lob.LobCreator; +import org.springframework.jdbc.support.lob.LobHandler; +import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionStatus; +import org.springframework.transaction.support.DefaultTransactionDefinition; +import org.springframework.transaction.support.TransactionCallback; +import org.springframework.transaction.support.TransactionCallbackWithoutResult; +import org.springframework.transaction.support.TransactionTemplate; + +/** + * JDBC based {@link org.apache.camel.spi.AggregationRepository} + * JdbcAggregationRepository will only preserve any Serializable compatible + * data types. If a data type is not such a type its dropped and a WARN is + * logged. And it only persists the Message body and the Message headers. + * The Exchange properties are not persisted. + * + * Note: This new version is built from the old one and simply adds the version + * property to properly support the optimistic locking also on a distributed + * environment (multiple JVMs or independent threads). + */ +public class JdbcAggregationRepositoryNew extends ServiceSupport implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository { + + private static final Logger LOG = LoggerFactory.getLogger(JdbcAggregationRepositoryNew.class); + + protected static final String EXCHANGE = "exchange"; + protected static final String ID = "id"; + protected static final String BODY = "body"; + + // version identifier needed to avoid the lost update problem + private static final String VERSION = "version"; + private static final String VERSION_PROPERTY = "CamelOptimisticLockVersion"; + + private static final Constants PROPAGATION_CONSTANTS = new Constants(TransactionDefinition.class); + + private JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper = new DefaultJdbcOptimisticLockingExceptionMapper(); + private PlatformTransactionManager transactionManager; + private DataSource dataSource; + private TransactionTemplate transactionTemplate; + private TransactionTemplate transactionTemplateReadOnly; + private int propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRED; + private JdbcTemplate jdbcTemplate; + private LobHandler lobHandler = new DefaultLobHandler(); + private String repositoryName; + private boolean returnOldExchange; + private JdbcCamelCodec codec = new JdbcCamelCodec(); + private long recoveryInterval = 5000; + private boolean useRecovery = true; + private int maximumRedeliveries; + private String deadLetterUri; + private List<String> headersToStoreAsText; + private boolean storeBodyAsText; + private boolean allowSerializedHeaders; + + /** + * Creates an aggregation repository + */ + public JdbcAggregationRepositoryNew() { + } + + /** + * Creates an aggregation repository with the three mandatory parameters + */ + public JdbcAggregationRepositoryNew(PlatformTransactionManager transactionManager, String repositoryName, DataSource dataSource) { + this.setRepositoryName(repositoryName); + this.setTransactionManager(transactionManager); + this.setDataSource(dataSource); + } + + /** + * Sets the name of the repository + */ + public final void setRepositoryName(String repositoryName) { + this.repositoryName = repositoryName; + } + + public final void setTransactionManager(PlatformTransactionManager transactionManager) { + this.transactionManager = transactionManager; + } + + /** + * Sets the DataSource to use for accessing the database + */ + public final void setDataSource(DataSource dataSource) { + this.dataSource = dataSource; + + jdbcTemplate = new JdbcTemplate(dataSource); + } + + @Override + public Exchange add(final CamelContext camelContext, final String correlationId, + final Exchange oldExchange, final Exchange newExchange) throws OptimisticLockingException { + + try { + return add(camelContext, correlationId, newExchange); + } catch (Exception e) { + if (jdbcOptimisticLockingExceptionMapper != null && jdbcOptimisticLockingExceptionMapper.isOptimisticLocking(e)) { + throw new OptimisticLockingException(); + } else { + throw ObjectHelper.wrapRuntimeCamelException(e); + } + } + } + + @Override + public Exchange add(final CamelContext camelContext, final String correlationId, final Exchange exchange) { + return transactionTemplate.execute(new TransactionCallback<Exchange>() { + + public Exchange doInTransaction(TransactionStatus status) { + Exchange result = null; + final String key = correlationId; + + try { + LOG.debug("Adding exchange with key {}", key); + + boolean present = jdbcTemplate.queryForObject( + "SELECT COUNT(1) FROM " + getRepositoryName() + " WHERE " + ID + " = ?", Integer.class, key) != 0; + + // Recover existing exchange with that ID + if (isReturnOldExchange() && present) { + result = get(key, getRepositoryName(), camelContext); + } + + if (present) { + long version = exchange.getProperty(VERSION_PROPERTY, Long.class); + LOG.debug("Updating record with key {} and version {}", key, version); + update(camelContext, correlationId, exchange, getRepositoryName(), version); + } else { + LOG.debug("Inserting record with key {}"); + insert(camelContext, correlationId, exchange, getRepositoryName(), 1L); + } + + } catch (Exception e) { + throw new RuntimeException("Error adding to repository " + repositoryName + " with key " + key, e); + } + + return result; + } + }); + } + + /** + * Updates the current exchange details in the given repository table. + * + * @param camelContext Current CamelContext + * @param key Correlation key + * @param exchange Aggregated exchange + * @param repositoryName Table's name + * @param version Version identifier + */ + protected void update(final CamelContext camelContext, final String key, final Exchange exchange, String repositoryName, Long version) throws Exception { + StringBuilder queryBuilder = new StringBuilder() + .append("UPDATE ").append(repositoryName) + .append(" SET ") + .append(EXCHANGE).append(" = ?") + .append(", ") + .append(VERSION).append(" = ?"); + if (storeBodyAsText) { + queryBuilder.append(", ").append(BODY).append(" = ?"); + } + + if (hasHeadersToStoreAsText()) { + for (String headerName : headersToStoreAsText) { + queryBuilder.append(", ").append(headerName).append(" = ?"); + } + } + + queryBuilder.append(" WHERE ") + .append(ID).append(" = ?") + .append(" AND ") + .append(VERSION).append(" = ?"); + + String sql = queryBuilder.toString(); + updateHelper(camelContext, key, exchange, sql, version); + } + + /** + * Inserts a new record into the given repository table. + * Note: the exchange properties are NOT persisted. + * + * @param camelContext Current CamelContext + * @param correlationId Correlation key + * @param exchange Aggregated exchange to insert + * @param repositoryName Table's name + * @param version Version identifier + */ + protected void insert(final CamelContext camelContext, final String correlationId, final Exchange exchange, String repositoryName, Long version) throws Exception { + // The default totalParameterIndex is 3 for ID, Exchange and version. Depending on logic this will be increased. + int totalParameterIndex = 3; + StringBuilder queryBuilder = new StringBuilder() + .append("INSERT INTO ").append(repositoryName) + .append('(').append(EXCHANGE) + .append(", ").append(ID) + .append(", ").append(VERSION); + + if (storeBodyAsText) { + queryBuilder.append(", ").append(BODY); + totalParameterIndex++; + } + + if (hasHeadersToStoreAsText()) { + for (String headerName : headersToStoreAsText) { + queryBuilder.append(", ").append(headerName); + totalParameterIndex++; + } + } + + queryBuilder.append(") VALUES ("); + + for (int i = 0; i < totalParameterIndex - 1; i++) { + queryBuilder.append("?, "); + } + queryBuilder.append("?)"); + + String sql = queryBuilder.toString(); + + insertHelper(camelContext, correlationId, exchange, sql, version); + } + + protected int insertHelper(final CamelContext camelContext, final String key, final Exchange exchange, String sql, final Long version) throws Exception { + final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders); + Integer insertCount = jdbcTemplate.execute(sql, + new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) { + @Override + protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException { + int totalParameterIndex = 0; + lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, data); + ps.setString(++totalParameterIndex, key); + ps.setLong(++totalParameterIndex, version); + if (storeBodyAsText) { + ps.setString(++totalParameterIndex, exchange.getIn().getBody(String.class)); + } + if (hasHeadersToStoreAsText()) { + for (String headerName : headersToStoreAsText) { + String headerValue = exchange.getIn().getHeader(headerName, String.class); + ps.setString(++totalParameterIndex, headerValue); + } + } + } + }); + return insertCount == null ? 0 : insertCount; + } + + protected int updateHelper(final CamelContext camelContext, final String key, final Exchange exchange, String sql, final Long version) throws Exception { + final byte[] data = codec.marshallExchange(camelContext, exchange, allowSerializedHeaders); + Integer updateCount = jdbcTemplate.execute(sql, + new AbstractLobCreatingPreparedStatementCallback(getLobHandler()) { + @Override + protected void setValues(PreparedStatement ps, LobCreator lobCreator) throws SQLException { + int totalParameterIndex = 0; + lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, data); + ps.setLong(++totalParameterIndex, version + 1); + if (storeBodyAsText) { + ps.setString(++totalParameterIndex, exchange.getIn().getBody(String.class)); + } + if (hasHeadersToStoreAsText()) { + for (String headerName : headersToStoreAsText) { + String headerValue = exchange.getIn().getHeader(headerName, String.class); + ps.setString(++totalParameterIndex, headerValue); + } + } + ps.setString(++totalParameterIndex, key); + ps.setLong(++totalParameterIndex, version); + } + }); + if (updateCount == 1) { + return updateCount; + } else { + throw new RuntimeException(String.format("Stale version: error updating record with key %s and version %s", key, version)); + } + } + + @Override + public Exchange get(final CamelContext camelContext, final String correlationId) { + final String key = correlationId; + Exchange result = get(key, getRepositoryName(), camelContext); + LOG.debug("Getting key {} -> {}", key, result); + return result; + } + + private Exchange get(final String key, final String repositoryName, final CamelContext camelContext) { + return transactionTemplateReadOnly.execute(new TransactionCallback<Exchange>() { + public Exchange doInTransaction(TransactionStatus status) { + try { + + Map<String, Object> columns = jdbcTemplate.queryForMap( + String.format("SELECT %1$s, %2$s FROM %3$s WHERE %4$s=?", EXCHANGE, VERSION, repositoryName, ID), + new Object[]{key}, new int[]{Types.VARCHAR}); + + byte[] marshalledExchange = (byte[]) columns.get(EXCHANGE); + long version = (long) columns.get(VERSION); + + Exchange result = codec.unmarshallExchange(camelContext, marshalledExchange); + result.setProperty(VERSION_PROPERTY, version); + return result; + + } catch (EmptyResultDataAccessException ex) { + return null; + } catch (IOException ex) { + // Rollback the transaction + throw new RuntimeException("Error getting key " + key + " from repository " + repositoryName, ex); + } catch (ClassNotFoundException ex) { + // Rollback the transaction + throw new RuntimeException(ex); + } + } + }); + } + + @Override + public void remove(final CamelContext camelContext, final String correlationId, final Exchange exchange) { + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + final String key = correlationId; + final String confirmKey = exchange.getExchangeId(); + final long version = exchange.getProperty(VERSION_PROPERTY, Long.class); + try { + LOG.debug("Removing key {}", key); + + jdbcTemplate.update("DELETE FROM " + getRepositoryName() + " WHERE " + ID + " = ? AND " + VERSION + " = ?", key, version); + + insert(camelContext, confirmKey, exchange, getRepositoryNameCompleted(), version); + + } catch (Exception e) { + throw new RuntimeException("Error removing key " + key + " from repository " + repositoryName, e); + } + } + }); + } + + @Override + public void confirm(final CamelContext camelContext, final String exchangeId) { + transactionTemplate.execute(new TransactionCallbackWithoutResult() { + protected void doInTransactionWithoutResult(TransactionStatus status) { + LOG.debug("Confirming exchangeId {}", exchangeId); + final String confirmKey = exchangeId; + + jdbcTemplate.update("DELETE FROM " + getRepositoryNameCompleted() + " WHERE " + ID + " = ?", + new Object[]{confirmKey}); + + } + }); + } + + @Override + public Set<String> getKeys() { + return getKeys(getRepositoryName()); + } + + @Override + public Set<String> scan(CamelContext camelContext) { + return getKeys(getRepositoryNameCompleted()); + } + + /** + * Returns the keys in the given repository + * + * @param repositoryName The name of the table + * @return Set of keys in the given repository name + */ + protected Set<String> getKeys(final String repositoryName) { + return transactionTemplateReadOnly.execute(new TransactionCallback<LinkedHashSet<String>>() { + public LinkedHashSet<String> doInTransaction(TransactionStatus status) { + List<String> keys = jdbcTemplate.query("SELECT " + ID + " FROM " + repositoryName, + new RowMapper<String>() { + public String mapRow(ResultSet rs, int rowNum) throws SQLException { + String id = rs.getString(ID); + LOG.trace("getKey {}", id); + return id; + } + }); + return new LinkedHashSet<>(keys); + } + }); + } + + @Override + public Exchange recover(CamelContext camelContext, String exchangeId) { + final String key = exchangeId; + Exchange answer = get(key, getRepositoryNameCompleted(), camelContext); + LOG.debug("Recovering exchangeId {} -> {}", key, answer); + return answer; + } + + /** + * If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover + * and resubmit. By default this interval is 5000 millis. + */ + @Override + public void setRecoveryInterval(long interval, TimeUnit timeUnit) { + this.recoveryInterval = timeUnit.toMillis(interval); + } + + @Override + public void setRecoveryInterval(long interval) { + this.recoveryInterval = interval; + } + + @Override + public long getRecoveryIntervalInMillis() { + return recoveryInterval; + } + + @Override + public boolean isUseRecovery() { + return useRecovery; + } + + /** + * Whether or not recovery is enabled. This option is by default true. When enabled the Camel + * Aggregator automatic recover failed aggregated exchange and have them resubmitted. + */ + @Override + public void setUseRecovery(boolean useRecovery) { + this.useRecovery = useRecovery; + } + + @Override + public int getMaximumRedeliveries() { + return maximumRedeliveries; + } + + @Override + public void setMaximumRedeliveries(int maximumRedeliveries) { + this.maximumRedeliveries = maximumRedeliveries; + } + + @Override + public String getDeadLetterUri() { + return deadLetterUri; + } + + /** + * An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be + * moved. If this option is used then the maximumRedeliveries option must also be provided. + * Important note : if the deadletter route throws an exception, it will be send again to DLQ until it succeed ! + */ + public void setDeadLetterUri(String deadLetterUri) { + this.deadLetterUri = deadLetterUri; + } + + public boolean isReturnOldExchange() { + return returnOldExchange; + } + + /** + * Whether the get operation should return the old existing Exchange if any existed. + * By default this option is false to optimize as we do not need the old exchange when aggregating. + */ + public void setReturnOldExchange(boolean returnOldExchange) { + this.returnOldExchange = returnOldExchange; + } + + public void setJdbcCamelCodec(JdbcCamelCodec codec) { + this.codec = codec; + } + + public boolean hasHeadersToStoreAsText() { + return this.headersToStoreAsText != null && !this.headersToStoreAsText.isEmpty(); + } + + public List<String> getHeadersToStoreAsText() { + return headersToStoreAsText; + } + + /** + * Allows to store headers as String which is human readable. By default this option is disabled, + * storing the headers in binary format. + * + * @param headersToStoreAsText the list of headers to store as String + */ + public void setHeadersToStoreAsText(List<String> headersToStoreAsText) { + this.headersToStoreAsText = headersToStoreAsText; + } + + public boolean isStoreBodyAsText() { + return storeBodyAsText; + } + + /** + * Whether to store the message body as String which is human readable. + * By default this option is false storing the body in binary format. + */ + public void setStoreBodyAsText(boolean storeBodyAsText) { + this.storeBodyAsText = storeBodyAsText; + } + + public boolean isAllowSerializedHeaders() { + return allowSerializedHeaders; + } + + public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { + this.allowSerializedHeaders = allowSerializedHeaders; + } + + public int getPropagationBehavior() { + return propagationBehavior; + } + + /** + * Sets propagation behavior to use with spring transaction templates which are used for database access. + * The default is TransactionDefinition.PROPAGATION_REQUIRED. + */ + public void setPropagationBehavior(int propagationBehavior) { + this.propagationBehavior = propagationBehavior; + } + + /** + * Sets propagation behavior to use with spring transaction templates which are used for database access. + * The default is TransactionDefinition.PROPAGATION_REQUIRED. This setter accepts names of the constants, like + * "PROPAGATION_REQUIRED". + * @param propagationBehaviorName + */ + public void setPropagationBehaviorName(String propagationBehaviorName) { + if (!propagationBehaviorName.startsWith(DefaultTransactionDefinition.PREFIX_PROPAGATION)) { + throw new IllegalArgumentException("Only propagation constants allowed"); + } + setPropagationBehavior(PROPAGATION_CONSTANTS.asNumber(propagationBehaviorName).intValue()); + } + + public LobHandler getLobHandler() { + return lobHandler; + } + + /** + * Sets a custom LobHandler to use + */ + public void setLobHandler(LobHandler lobHandler) { + this.lobHandler = lobHandler; + } + + public JdbcOptimisticLockingExceptionMapper getJdbcOptimisticLockingExceptionMapper() { + return jdbcOptimisticLockingExceptionMapper; + } + + public void setJdbcOptimisticLockingExceptionMapper(JdbcOptimisticLockingExceptionMapper jdbcOptimisticLockingExceptionMapper) { + this.jdbcOptimisticLockingExceptionMapper = jdbcOptimisticLockingExceptionMapper; + } + + public String getRepositoryName() { + return repositoryName; + } + + public String getRepositoryNameCompleted() { + return getRepositoryName() + "_completed"; + } + + @Override + protected void doStart() throws Exception { + ObjectHelper.notNull(repositoryName, "RepositoryName"); + ObjectHelper.notNull(transactionManager, "TransactionManager"); + ObjectHelper.notNull(dataSource, "DataSource"); + + transactionTemplate = new TransactionTemplate(transactionManager); + transactionTemplate.setPropagationBehavior(propagationBehavior); + + transactionTemplateReadOnly = new TransactionTemplate(transactionManager); + transactionTemplateReadOnly.setPropagationBehavior(propagationBehavior); + transactionTemplateReadOnly.setReadOnly(true); + + // log number of existing exchanges + int current = getKeys().size(); + int completed = scan(null).size(); + + if (current > 0) { + LOG.info("On startup there are " + current + " aggregate exchanges (not completed) in repository: " + getRepositoryName()); + } else { + LOG.info("On startup there are no existing aggregate exchanges (not completed) in repository: {}", getRepositoryName()); + } + if (completed > 0) { + LOG.warn("On startup there are " + completed + " completed exchanges to be recovered in repository: " + getRepositoryNameCompleted()); + } else { + LOG.info("On startup there are no completed exchanges to be recovered in repository: {}", getRepositoryNameCompleted()); + } + } + + @Override + protected void doStop() throws Exception { + // noop + } + +} diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java index 94d4777..1f7229d 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractJdbcAggregationTestSupport.java @@ -30,14 +30,14 @@ public abstract class AbstractJdbcAggregationTestSupport extends CamelSpringTest @Override public void postProcessTest() throws Exception { super.postProcessTest(); - + repo = applicationContext.getBean("repo1", JdbcAggregationRepository.class); configureJdbcAggregationRepository(); } - + void configureJdbcAggregationRepository() { } - + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -54,16 +54,23 @@ public abstract class AbstractJdbcAggregationTestSupport extends CamelSpringTest // END SNIPPET: e1 }; } - + long getCompletionInterval() { return 5000; } - + @Override protected AbstractApplicationContext createApplicationContext() { return new ClassPathXmlApplicationContext("org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml"); } + protected Exchange repoAddAndGet(String key, Exchange exchange) { + repo.add(context, key, exchange); + // recover the exchange with the new version to be able to add again + exchange = repo.get(context, key); + return exchange; + } + public static class MyAggregationStrategy implements AggregationStrategy { public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java index 840184e..b58812b 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateLoadAndRecoverTest.java @@ -38,7 +38,7 @@ public class JdbcAggregateLoadAndRecoverTest extends AbstractJdbcAggregationTest public void testLoadAndRecoverJdbcAggregate() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(SIZE / 10); - mock.setResultWaitTime(50 * 1000); + mock.setResultWaitTime(5_000); LOG.info("Staring to send " + SIZE + " messages."); @@ -73,13 +73,17 @@ public class JdbcAggregateLoadAndRecoverTest extends AbstractJdbcAggregationTest return new RouteBuilder() { @Override public void configure() throws Exception { + onException(IllegalStateException.class) + .maximumRedeliveries(3) + .redeliveryDelay(100L); + from("seda:start?size=" + SIZE) .to("log:input?groupSize=500") .aggregate(header("id"), new MyAggregationStrategy()) .aggregationRepository(repo) .completionSize(10) .to("log:output?showHeaders=true") - // have every 10th exchange fail which should then be recovered + // have every 10th exchange fail which should then be recovered .process(new Processor() { public void process(Exchange exchange) throws Exception { //Avoid same message to be discarded twice diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java index 0c75a88..8722d8d 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateNotLostTest.java @@ -35,11 +35,11 @@ public class JdbcAggregateNotLostTest extends AbstractJdbcAggregationTestSupport template.sendBodyAndHeader("direct:start", "D", "id", 123); template.sendBodyAndHeader("direct:start", "E", "id", 123); - assertMockEndpointsSatisfied(30, TimeUnit.SECONDS); + assertMockEndpointsSatisfied(5, TimeUnit.SECONDS); String exchangeId = getMockEndpoint("mock:aggregated").getReceivedExchanges().get(0).getExchangeId(); - // the exchange should be in the completed repo where we should be able to find it + Exchange completed = repo.recover(context, exchangeId); // assert the exchange was not lost and we got all the information still assertNotNull(completed); @@ -63,7 +63,7 @@ public class JdbcAggregateNotLostTest extends AbstractJdbcAggregationTestSupport .completionSize(5).aggregationRepository(repo) .log("aggregated exchange id ${exchangeId} with ${body}") .to("mock:aggregated") - // throw an exception to fail, which we then will loose this message + // throw an exception to fail, which we then will loose this message .throwException(new IllegalArgumentException("Damn")) .to("mock:result") .end(); diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java index af26a56..39f6b2e 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregateRecoverDeadLetterChannelTest.java @@ -69,4 +69,4 @@ public class JdbcAggregateRecoverDeadLetterChannelTest extends AbstractJdbcAggre } }; } -} \ No newline at end of file +} diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java index 5025c40..bd3af20 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryAlotDataTest.java @@ -67,4 +67,4 @@ public class JdbcAggregationRepositoryAlotDataTest extends AbstractJdbcAggregati assertEquals("counter:" + i, actual.getIn().getBody()); } } -} \ No newline at end of file +} diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java index be07b7b..e41ef7f 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryMultipleRepoTest.java @@ -53,8 +53,9 @@ public class JdbcAggregationRepositoryMultipleRepoTest extends CamelSpringTestSu assertEquals("counter:1", actual.getIn().getBody()); assertEquals(null, repo2.get(context, "foo")); - // Change it.. + // Change it after reading the current exchange with version Exchange exchange2 = new DefaultExchange(context); + exchange2 = repo1.get(context, "foo"); exchange2.getIn().setBody("counter:2"); actual = repo1.add(context, "foo", exchange2); // the old one diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java index 1a2361f..f750f7d 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryRecoverExistingTest.java @@ -28,7 +28,7 @@ public class JdbcAggregationRepositoryRecoverExistingTest extends AbstractJdbcAg repo.setReturnOldExchange(true); repo.setUseRecovery(true); } - + @Override protected RouteBuilder createRouteBuilder() throws Exception { return new RouteBuilder() { @@ -48,7 +48,8 @@ public class JdbcAggregationRepositoryRecoverExistingTest extends AbstractJdbcAg Exchange actual = repo.add(context, "foo", exchange1); assertEquals(null, actual); - // Remove it, which makes it in the pre confirm stage + // Get and remove it, which makes it in the pre confirm stage + exchange1 = repo.get(context, "foo"); repo.remove(context, "foo", exchange1); String id = exchange1.getExchangeId(); diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java index 69b377a..c2daf52 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepositoryTest.java @@ -25,7 +25,7 @@ public class JdbcAggregationRepositoryTest extends AbstractJdbcAggregationTestSu void configureJdbcAggregationRepository() { repo.setReturnOldExchange(true); } - + @Test public void testOperations() { // Can't get something we have not put in... @@ -42,8 +42,9 @@ public class JdbcAggregationRepositoryTest extends AbstractJdbcAggregationTestSu actual = repo.get(context, "foo"); assertEquals("counter:1", actual.getIn().getBody()); - // Change it.. + // Change it after reading the current exchange with version Exchange exchange2 = new DefaultExchange(context); + exchange2 = repo.get(context, "foo"); exchange2.getIn().setBody("counter:2"); actual = repo.add(context, "foo", exchange2); // the old one diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java index 7f16fe0..de25da9 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcExchangeSerializationTest.java @@ -26,6 +26,7 @@ public class JdbcExchangeSerializationTest extends AbstractJdbcAggregationTestSu @Test public void testExchangeSerialization() { + final String key = "foo"; Exchange exchange = new DefaultExchange(context); exchange.getIn().setBody("Hello World"); exchange.getIn().setHeader("name", "Olivier"); @@ -35,9 +36,9 @@ public class JdbcExchangeSerializationTest extends AbstractJdbcAggregationTestSu Date now = new Date(); exchange.getIn().setHeader("date", now); - repo.add(context, "foo", exchange); + exchange = repoAddAndGet(key, exchange); - Exchange actual = repo.get(context, "foo"); + Exchange actual = repo.get(context, key); assertEquals("Hello World", actual.getIn().getBody()); assertEquals("Olivier", actual.getIn().getHeader("name")); assertEquals(123, actual.getIn().getHeader("number")); @@ -53,9 +54,9 @@ public class JdbcExchangeSerializationTest extends AbstractJdbcAggregationTestSu exchange.getIn().setHeader("name", "Thomas"); exchange.getIn().removeHeader("date"); - repo.add(context, "foo", exchange); + exchange = repoAddAndGet(key, exchange); - actual = repo.get(context, "foo"); + actual = repo.get(context, key); assertEquals("Bye World", actual.getIn().getBody()); assertEquals("Thomas", actual.getIn().getHeader("name")); assertEquals(123, actual.getIn().getHeader("number")); diff --git a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java index 66ffd37..b919b25 100644 --- a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java +++ b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/JdbcGrowIssueTest.java @@ -31,8 +31,8 @@ public class JdbcGrowIssueTest extends AbstractJdbcAggregationTestSupport { for (int i = 0; i < SIZE; i++) { sb.append("X"); } - Exchange item = new DefaultExchange(context); - item.getIn().setBody(sb.toString(), String.class); + Exchange exchange = new DefaultExchange(context); + exchange.getIn().setBody(sb.toString(), String.class); // the key final String key = "foo"; @@ -40,7 +40,7 @@ public class JdbcGrowIssueTest extends AbstractJdbcAggregationTestSupport { // we update using the same key, which means we should be able to do this within the file size limit for (int i = 0; i < SIZE; i++) { log.debug("Updating " + i); - repo.add(context, key, item); + exchange = repoAddAndGet(key, exchange); } // get the last diff --git a/components/camel-sql/src/test/resources/sql/init.sql b/components/camel-sql/src/test/resources/sql/init.sql index 5396248..271e72e 100644 --- a/components/camel-sql/src/test/resources/sql/init.sql +++ b/components/camel-sql/src/test/resources/sql/init.sql @@ -24,4 +24,4 @@ CREATE TABLE aggregationRepo1_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregationRepo1_completed_pk PRIMARY KEY (id) -); \ No newline at end of file +); diff --git a/components/camel-sql/src/test/resources/sql/init2.sql b/components/camel-sql/src/test/resources/sql/init2.sql index 515179a..18f17c0 100644 --- a/components/camel-sql/src/test/resources/sql/init2.sql +++ b/components/camel-sql/src/test/resources/sql/init2.sql @@ -24,4 +24,4 @@ CREATE TABLE aggregationRepo2_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregationRepo2_completed_pk PRIMARY KEY (id) -); \ No newline at end of file +); diff --git a/components/camel-sql/src/test/resources/sql/init3.sql b/components/camel-sql/src/test/resources/sql/init3.sql index 2bef291..69450d0 100644 --- a/components/camel-sql/src/test/resources/sql/init3.sql +++ b/components/camel-sql/src/test/resources/sql/init3.sql @@ -30,4 +30,4 @@ CREATE TABLE aggregationRepo3_completed ( companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_completed_pk PRIMARY KEY (id) -); \ No newline at end of file +);