This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/main by this push:
     new 82681c5  [CAMEL-4271]  JDBCAggregateRepository and Recovery in a 
Cluster (#6812)
82681c5 is described below

commit 82681c5505ddb7c8fcb2dad59d7fdcf5eb78d437
Author: Benjamin BONNET <[email protected]>
AuthorDate: Wed Jan 26 06:53:22 2022 +0100

    [CAMEL-4271]  JDBCAggregateRepository and Recovery in a Cluster (#6812)
    
    * CAMEL-4271 : recover task in cluster
    
    * Specializing JdbcAggregationRepository for cluster
---
 .../camel-sql/src/main/docs/sql-component.adoc     |  11 ++
 .../jdbc/ClusteredJdbcAggregationRepository.java   | 203 +++++++++++++++++++++
 ...=> ClusteredPostgresAggregationRepository.java} |  14 +-
 .../aggregate/jdbc/JdbcAggregationRepository.java  |  24 ++-
 .../jdbc/PostgresAggregationRepository.java        |   4 +-
 ...bstractClusteredJdbcAggregationTestSupport.java |  65 +++++++
 .../jdbc/ClusteredJdbcAggregateRecoverTest.java    |  87 +++++++++
 .../aggregate/jdbc/JdbcSpringDataSource.xml        |  22 ++-
 .../camel-sql/src/test/resources/sql/init5.sql     |  30 +++
 9 files changed, 442 insertions(+), 18 deletions(-)

diff --git a/components/camel-sql/src/main/docs/sql-component.adoc 
b/components/camel-sql/src/main/docs/sql-component.adoc
index 82ba8c0..6988467 100644
--- a/components/camel-sql/src/main/docs/sql-component.adoc
+++ b/components/camel-sql/src/main/docs/sql-component.adoc
@@ -747,6 +747,17 @@ 
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
 Propagation is specified by constants of 
`org.springframework.transaction.TransactionDefinition` interface,
 so `propagationBehaviorName` is convenient setter that allows to use names of 
the constants.
 
+=== Clustering
+JdbcAggregationRepository does not provide recovery in a clustered environment.
+
+You may use ClusteredJdbcAggregationRepository that provides a limited support 
for recovery in a clustered environment : recovery mechanism is dealt 
separately by members of the cluster, i.e. a member may only recover exchanges 
that it completed itself.
+
+To enable this behaviour, property `recoverByInstance` must be set to true, 
and `instanceId` property must be defined using a unique identifier (a string) 
for each member of the cluster.
+
+Besides, completed table must have a `instance_id VARCHAR(255)` column.
+
+N.B. : since each member is the only responsible for the recovery of its 
completed exchanges, if a member is stopped, its completed exchanges will not 
be recovered until it is restarted, unless you update completed table to affect 
them to another member (by changing `instance_id` for those completed 
exchanges).
+
 === PostgreSQL case
 
 There's special database that may cause problems with optimistic locking used 
by `JdbcAggregationRepository`.
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
new file mode 100644
index 0000000..b65f35d
--- /dev/null
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregationRepository.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Set;
+
+import javax.sql.DataSource;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.jdbc.core.RowMapper;
+import 
org.springframework.jdbc.core.support.AbstractLobCreatingPreparedStatementCallback;
+import org.springframework.jdbc.support.lob.LobCreator;
+import org.springframework.transaction.PlatformTransactionManager;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.transaction.support.TransactionCallback;
+import 
org.springframework.transaction.support.TransactionCallbackWithoutResult;
+
+/**
+ * JDBC based {@link org.apache.camel.spi.AggregationRepository} 
JdbcAggregationRepository will only preserve any
+ * Serializable compatible data types. If a data type is not such a type its 
dropped and a WARN is logged. And it only
+ * persists the Message body and the Message headers. The Exchange properties 
are not persisted.
+ */
+public class ClusteredJdbcAggregationRepository extends 
JdbcAggregationRepository {
+
+    private static final String INSTANCE_ID = "instance_id";
+    private static final Logger LOG = 
LoggerFactory.getLogger(ClusteredJdbcAggregationRepository.class);
+
+    private String instanceId = "DEFAULT";
+    private boolean recoveryByInstance;
+
+    /**
+     * Creates an aggregation repository
+     */
+    public ClusteredJdbcAggregationRepository() {
+    }
+
+    /**
+     * Creates an aggregation repository with the three mandatory parameters
+     */
+    public ClusteredJdbcAggregationRepository(PlatformTransactionManager 
transactionManager, String repositoryName,
+                                              DataSource dataSource) {
+        this.setRepositoryName(repositoryName);
+        this.setTransactionManager(transactionManager);
+        this.setDataSource(dataSource);
+    }
+
+    @Override
+    public void remove(final CamelContext camelContext, final String 
correlationId, final Exchange exchange) {
+        transactionTemplate.execute(new TransactionCallbackWithoutResult() {
+            protected void doInTransactionWithoutResult(TransactionStatus 
status) {
+                final String key = correlationId;
+                final String confirmKey = exchange.getExchangeId();
+                final long version = exchange.getProperty(VERSION_PROPERTY, 
Long.class);
+                try {
+                    LOG.debug("Removing key {}", key);
+
+                    jdbcTemplate.update("DELETE FROM " + getRepositoryName() + 
" WHERE " + ID + " = ? AND " + VERSION + " = ?",
+                            key, version);
+
+                    insert(camelContext, confirmKey, exchange, 
getRepositoryNameCompleted(), version, true);
+
+                } catch (Exception e) {
+                    throw new RuntimeException("Error removing key " + key + " 
from repository " + getRepositoryName(), e);
+                }
+            }
+        });
+    }
+
+    /**
+     * Inserts a new record into the given repository table. Note: the 
exchange properties are NOT persisted.
+     *
+     * @param camelContext   Current CamelContext
+     * @param correlationId  Correlation key
+     * @param exchange       Aggregated exchange to insert
+     * @param repositoryName Table's name
+     * @param version        Version identifier
+     */
+    protected void insert(
+            final CamelContext camelContext, final String correlationId, final 
Exchange exchange,
+            final String repositoryName, final Long version, final boolean 
completed)
+            throws Exception {
+        // The default totalParameterIndex is 3 for ID, Exchange and version. 
Depending
+        // on logic this will be increased.
+        int totalParameterIndex = 3;
+        StringBuilder queryBuilder = new StringBuilder().append("INSERT INTO 
").append(repositoryName).append('(')
+                .append(EXCHANGE).append(", ").append(ID).append(", 
").append(VERSION);
+
+        if (isStoreBodyAsText()) {
+            queryBuilder.append(", ").append(BODY);
+            totalParameterIndex++;
+        }
+
+        if (hasHeadersToStoreAsText()) {
+            for (String headerName : getHeadersToStoreAsText()) {
+                queryBuilder.append(", ").append(headerName);
+                totalParameterIndex++;
+            }
+        }
+        if (completed && isRecoveryByInstance()) {
+            queryBuilder.append(", ").append(INSTANCE_ID);
+            totalParameterIndex++;
+        }
+        queryBuilder.append(") VALUES (");
+
+        for (int i = 0; i < totalParameterIndex - 1; i++) {
+            queryBuilder.append("?, ");
+        }
+        queryBuilder.append("?)");
+
+        String sql = queryBuilder.toString();
+
+        insertHelper(camelContext, correlationId, exchange, sql, version, 
completed);
+    }
+
+    protected int insertHelper(
+            final CamelContext camelContext, final String key, final Exchange 
exchange,
+            final String sql, final Long version, final boolean completed)
+            throws Exception {
+        final byte[] data = codec.marshallExchange(camelContext, exchange, 
allowSerializedHeaders);
+        Integer insertCount = super.jdbcTemplate.execute(sql,
+                new 
AbstractLobCreatingPreparedStatementCallback(getLobHandler()) {
+                    @Override
+                    protected void setValues(PreparedStatement ps, LobCreator 
lobCreator) throws SQLException {
+                        int totalParameterIndex = 0;
+                        lobCreator.setBlobAsBytes(ps, ++totalParameterIndex, 
data);
+                        ps.setString(++totalParameterIndex, key);
+                        ps.setLong(++totalParameterIndex, version);
+                        if (isStoreBodyAsText()) {
+                            ps.setString(++totalParameterIndex, 
exchange.getIn().getBody(String.class));
+                        }
+                        if (hasHeadersToStoreAsText()) {
+                            for (String headerName : 
getHeadersToStoreAsText()) {
+                                String headerValue = 
exchange.getIn().getHeader(headerName, String.class);
+                                ps.setString(++totalParameterIndex, 
headerValue);
+                            }
+                        }
+                        if (completed && isRecoveryByInstance()) {
+                            ps.setString(++totalParameterIndex, instanceId);
+                        }
+                    }
+                });
+        return insertCount == null ? 0 : insertCount;
+    }
+
+    @Override
+    public Set<String> scan(final CamelContext camelContext) {
+        return transactionTemplateReadOnly.execute(new 
TransactionCallback<LinkedHashSet<String>>() {
+            public LinkedHashSet<String> doInTransaction(final 
TransactionStatus status) {
+                final List<String> keys = jdbcTemplate.query(
+                        "SELECT " + ID + " FROM " + 
getRepositoryNameCompleted()
+                                                             + 
(isRecoveryByInstance()
+                                                                     ? " WHERE 
INSTANCE_ID='" + instanceId + "'" : ""),
+                        new RowMapper<String>() {
+                            public String mapRow(final ResultSet rs, final int 
rowNum) throws SQLException {
+                                final String id = rs.getString(ID);
+                                LOG.trace("getKey {}", id);
+                                return id;
+                            }
+                        });
+                return new LinkedHashSet<>(keys);
+            }
+        });
+    }
+
+    public String getInstanceId() {
+        return instanceId;
+    }
+
+    public void setInstanceId(final String instanceId) {
+        this.instanceId = instanceId;
+    }
+
+    public boolean isRecoveryByInstance() {
+        return recoveryByInstance;
+    }
+
+    public void setRecoveryByInstance(final boolean recoveryByInstance) {
+        this.recoveryByInstance = recoveryByInstance;
+    }
+
+}
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredPostgresAggregationRepository.java
similarity index 86%
copy from 
components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
copy to 
components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredPostgresAggregationRepository.java
index 28b48f3..017fa0d 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/ClusteredPostgresAggregationRepository.java
@@ -27,19 +27,19 @@ import 
org.springframework.transaction.PlatformTransactionManager;
  * PostgreSQL specific {@link JdbcAggregationRepository} that deals with SQL 
Violation Exceptions using special
  * {@code INSERT INTO .. ON CONFLICT DO NOTHING} claues.
  */
-public class PostgresAggregationRepository extends JdbcAggregationRepository {
+public class ClusteredPostgresAggregationRepository extends 
ClusteredJdbcAggregationRepository {
 
     /**
      * Creates an aggregation repository
      */
-    public PostgresAggregationRepository() {
+    public ClusteredPostgresAggregationRepository() {
     }
 
     /**
      * Creates an aggregation repository with the three mandatory parameters
      */
-    public PostgresAggregationRepository(PlatformTransactionManager 
transactionManager, String repositoryName,
-                                         DataSource dataSource) {
+    public ClusteredPostgresAggregationRepository(PlatformTransactionManager 
transactionManager, String repositoryName,
+                                                  DataSource dataSource) {
         super(transactionManager, repositoryName, dataSource);
     }
 
@@ -51,8 +51,10 @@ public class PostgresAggregationRepository extends 
JdbcAggregationRepository {
      * @param exchange       the aggregated exchange
      * @param repositoryName The name of the table
      */
+    @Override
     protected void insert(
-            final CamelContext camelContext, final String correlationId, final 
Exchange exchange, String repositoryName)
+            final CamelContext camelContext, final String correlationId, final 
Exchange exchange, String repositoryName,
+            final Long version, final boolean completed)
             throws Exception {
         // The default totalParameterIndex is 2 for ID and Exchange. Depending 
on logic this will be increased
         int totalParameterIndex = 2;
@@ -85,7 +87,7 @@ public class PostgresAggregationRepository extends 
JdbcAggregationRepository {
 
         String sql = queryBuilder.toString();
 
-        int updateCount = insertHelper(camelContext, correlationId, exchange, 
sql, 1L);
+        int updateCount = insertHelper(camelContext, correlationId, exchange, 
sql, 1L, completed);
         if (updateCount == 0 && getRepositoryName().equals(repositoryName)) {
             throw new DataIntegrityViolationException("No row was inserted due 
to data violation");
         }
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
index 5cc88a6..1629fb6 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/JdbcAggregationRepository.java
@@ -69,31 +69,32 @@ public class JdbcAggregationRepository extends 
ServiceSupport
     protected static final String BODY = "body";
 
     // optimistic locking: version identifier needed to avoid the lost update 
problem
-    private static final String VERSION = "version";
-    private static final String VERSION_PROPERTY = 
"CamelOptimisticLockVersion";
+    protected static final String VERSION = "version";
+    protected static final String VERSION_PROPERTY = 
"CamelOptimisticLockVersion";
 
     private static final Logger LOG = 
LoggerFactory.getLogger(JdbcAggregationRepository.class);
     private static final Constants PROPAGATION_CONSTANTS = new 
Constants(TransactionDefinition.class);
 
+    protected JdbcCamelCodec codec = new JdbcCamelCodec();
+    protected JdbcTemplate jdbcTemplate;
+    protected TransactionTemplate transactionTemplate;
+    protected TransactionTemplate transactionTemplateReadOnly;
+    protected boolean allowSerializedHeaders;
+
     private JdbcOptimisticLockingExceptionMapper 
jdbcOptimisticLockingExceptionMapper
             = new DefaultJdbcOptimisticLockingExceptionMapper();
     private PlatformTransactionManager transactionManager;
     private DataSource dataSource;
-    private TransactionTemplate transactionTemplate;
-    private TransactionTemplate transactionTemplateReadOnly;
     private int propagationBehavior = 
TransactionDefinition.PROPAGATION_REQUIRED;
-    private JdbcTemplate jdbcTemplate;
     private LobHandler lobHandler = new DefaultLobHandler();
     private String repositoryName;
     private boolean returnOldExchange;
-    private JdbcCamelCodec codec = new JdbcCamelCodec();
     private long recoveryInterval = 5000;
     private boolean useRecovery = true;
     private int maximumRedeliveries;
     private String deadLetterUri;
     private List<String> headersToStoreAsText;
     private boolean storeBodyAsText;
-    private boolean allowSerializedHeaders;
 
     /**
      * Creates an aggregation repository
@@ -402,9 +403,12 @@ public class JdbcAggregationRepository extends 
ServiceSupport
             protected void doInTransactionWithoutResult(TransactionStatus 
status) {
                 LOG.debug("Confirming exchangeId {}", exchangeId);
                 final String confirmKey = exchangeId;
-
-                jdbcTemplate.update("DELETE FROM " + 
getRepositoryNameCompleted() + " WHERE " + ID + " = ?",
-                        confirmKey);
+                final int mustBeOne = jdbcTemplate
+                        .update("DELETE FROM " + getRepositoryNameCompleted() 
+ " WHERE " + ID + " = ?", confirmKey);
+                if (mustBeOne != 1) {
+                    LOG.error("problem removing row " + confirmKey + " from " 
+ getRepositoryNameCompleted()
+                              + " - DELETE statement did not return 1 but " + 
mustBeOne);
+                }
 
             }
         });
diff --git 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
index 28b48f3..4f1da5b 100644
--- 
a/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
+++ 
b/components/camel-sql/src/main/java/org/apache/camel/processor/aggregate/jdbc/PostgresAggregationRepository.java
@@ -51,8 +51,10 @@ public class PostgresAggregationRepository extends 
JdbcAggregationRepository {
      * @param exchange       the aggregated exchange
      * @param repositoryName The name of the table
      */
+    @Override
     protected void insert(
-            final CamelContext camelContext, final String correlationId, final 
Exchange exchange, String repositoryName)
+            final CamelContext camelContext, final String correlationId, final 
Exchange exchange, String repositoryName,
+            final Long version)
             throws Exception {
         // The default totalParameterIndex is 2 for ID and Exchange. Depending 
on logic this will be increased
         int totalParameterIndex = 2;
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractClusteredJdbcAggregationTestSupport.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractClusteredJdbcAggregationTestSupport.java
new file mode 100644
index 0000000..649c3ea
--- /dev/null
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/AbstractClusteredJdbcAggregationTestSupport.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import org.apache.camel.AggregationStrategy;
+import org.apache.camel.Exchange;
+import org.apache.camel.test.spring.junit5.CamelSpringTestSupport;
+import org.springframework.context.support.AbstractApplicationContext;
+
+public abstract class AbstractClusteredJdbcAggregationTestSupport extends 
CamelSpringTestSupport {
+
+    ClusteredJdbcAggregationRepository repo;
+    ClusteredJdbcAggregationRepository repobis;
+
+    @Override
+    public void postProcessTest() throws Exception {
+        super.postProcessTest();
+
+        repo = applicationContext.getBean("repo5", 
ClusteredJdbcAggregationRepository.class);
+        repobis = applicationContext.getBean("repo6", 
ClusteredJdbcAggregationRepository.class);
+        configureJdbcAggregationRepository();
+    }
+
+    void configureJdbcAggregationRepository() {
+    }
+
+    long getCompletionInterval() {
+        return 5000;
+    }
+
+    @Override
+    protected AbstractApplicationContext createApplicationContext() {
+        return newAppContext(
+                "JdbcSpringDataSource.xml", "JdbcSpringDataSource.xml");
+    }
+
+    public static class MyAggregationStrategy implements AggregationStrategy {
+
+        @Override
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+            String body1 = oldExchange.getIn().getBody(String.class);
+            String body2 = newExchange.getIn().getBody(String.class);
+
+            oldExchange.getIn().setBody(body1 + body2);
+            return oldExchange;
+        }
+    }
+}
diff --git 
a/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregateRecoverTest.java
 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregateRecoverTest.java
new file mode 100644
index 0000000..4df1263
--- /dev/null
+++ 
b/components/camel-sql/src/test/java/org/apache/camel/processor/aggregate/jdbc/ClusteredJdbcAggregateRecoverTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregate.jdbc;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.junit.jupiter.api.Test;
+
+public class ClusteredJdbcAggregateRecoverTest extends 
AbstractClusteredJdbcAggregationTestSupport {
+
+    private static AtomicInteger counter = new AtomicInteger();
+
+    @Override
+    void configureJdbcAggregationRepository() {
+        // enable recovery
+        repo.setUseRecovery(true);
+        // check faster
+        repo.setRecoveryInterval(500, TimeUnit.MILLISECONDS);
+        repo.setRecoveryByInstance(true);
+        repo.setInstanceId("INSTANCE1");
+        repobis.setUseRecovery(true);
+        repobis.setRecoveryInterval(50, TimeUnit.MILLISECONDS);
+        repobis.setRecoveryByInstance(true);
+        repobis.setInstanceId("INSTANCE2");
+
+    }
+
+    @Test
+    public void testJdbcAggregateRecover() throws Exception {
+        // should fail the first 2 times and then recover
+        getMockEndpoint("mock:aggregated").expectedMessageCount(3);
+        getMockEndpoint("mock:result").expectedBodiesReceived("ABCDE");
+        // should be marked as redelivered
+        
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERED).isEqualTo(Boolean.TRUE);
+        // on the 2nd redelivery attempt we success
+        
getMockEndpoint("mock:result").message(0).header(Exchange.REDELIVERY_COUNTER).isEqualTo(2);
+
+        template.sendBodyAndHeader("direct:start", "A", "id", 123);
+        template.sendBodyAndHeader("direct:start", "B", "id", 123);
+        template.sendBodyAndHeader("direct:start", "C", "id", 123);
+        template.sendBodyAndHeader("direct:start", "D", "id", 123);
+        template.sendBodyAndHeader("direct:start", "E", "id", 123);
+
+        assertMockEndpointsSatisfied(30, TimeUnit.SECONDS);
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start").aggregate(header("id"), new 
MyAggregationStrategy()).completionSize(5)
+                        .aggregationRepository(repo)
+                        .log("aggregated exchange id ${exchangeId} with 
${body}").to("mock:aggregated").delay(1000)
+                        // simulate errors the first two times
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws 
Exception {
+                                int count = counter.incrementAndGet();
+                                if (count <= 2) {
+                                    throw new IllegalArgumentException("Damn");
+                                }
+                            }
+                        }).to("mock:result").end();
+                from("direct:tutu").aggregate(header("id"), new 
MyAggregationStrategy()).completionSize(5).aggregationRepository(repobis)
+                        .log("aggregated exchange id ${exchangeId} with 
${body}").log("recover bis!!!!!!!!!!!!!!!!!").end();
+            }
+        };
+    }
+}
diff --git 
a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
 
b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
index a04a404..45648cb 100644
--- 
a/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
+++ 
b/components/camel-sql/src/test/resources/org/apache/camel/processor/aggregate/jdbc/JdbcSpringDataSource.xml
@@ -55,12 +55,17 @@
     <jdbc:embedded-database id="{{testClassSimpleName}}-dataSource4" 
type="DERBY">
         <jdbc:script location="classpath:/sql/init4.sql"/>
     </jdbc:embedded-database>
+    
+    <!-- In Memory Database #5 -->
+    <jdbc:embedded-database id="{{testClassSimpleName}}-dataSource5" 
type="DERBY">
+        <jdbc:script location="classpath:/sql/init5.sql"/>
+    </jdbc:embedded-database>
 
     <bean id="repo1" 
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
         <property name="repositoryName" value="aggregationRepo1"/>
         <property name="transactionManager" ref="txManager1"/>
         <property name="dataSource" ref="{{testClassSimpleName}}-dataSource1"/>
-    </bean>
+    </bean>    
 
     <bean id="repo2" 
class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository">
         <property name="repositoryName" value="aggregationRepo2"/>
@@ -97,6 +102,17 @@
             </list>
         </property>
     </bean>
+    
+    <bean id="repo5" 
class="org.apache.camel.processor.aggregate.jdbc.ClusteredJdbcAggregationRepository">
+        <property name="repositoryName" value="aggregationRepo5"/>
+        <property name="transactionManager" ref="txManager5"/>
+        <property name="dataSource" ref="{{testClassSimpleName}}-dataSource5"/>
+    </bean>
+    <bean id="repo6" 
class="org.apache.camel.processor.aggregate.jdbc.ClusteredJdbcAggregationRepository">
+        <property name="repositoryName" value="aggregationRepo5"/>
+        <property name="transactionManager" ref="txManager5"/>
+        <property name="dataSource" ref="{{testClassSimpleName}}-dataSource5"/>
+    </bean>
 
     <bean id="txManager1" 
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
         <property name="dataSource" ref="{{testClassSimpleName}}-dataSource1"/>
@@ -113,5 +129,9 @@
     <bean id="txManager4" 
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
         <property name="dataSource" ref="{{testClassSimpleName}}-dataSource4"/>
     </bean>
+    
+    <bean id="txManager5" 
class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
+        <property name="dataSource" ref="{{testClassSimpleName}}-dataSource5"/>
+    </bean>
 
 </beans>
diff --git a/components/camel-sql/src/test/resources/sql/init5.sql 
b/components/camel-sql/src/test/resources/sql/init5.sql
new file mode 100644
index 0000000..02c264d
--- /dev/null
+++ b/components/camel-sql/src/test/resources/sql/init5.sql
@@ -0,0 +1,30 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+CREATE TABLE aggregationRepo5 (
+    id varchar(255) NOT NULL,
+    exchange blob NOT NULL,
+    version bigint NOT NULL,
+    constraint aggregationRepo1_pk PRIMARY KEY (id)
+);
+CREATE TABLE aggregationRepo5_completed (
+    id varchar(255) NOT NULL,
+    exchange blob NOT NULL,
+    version bigint NOT NULL,
+    instance_id varchar(255),
+    constraint aggregationRepo1_completed_pk PRIMARY KEY (id)
+);
\ No newline at end of file

Reply via email to