This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit f4379741300a1ffc193dd942470d139e8eb91ef0 Author: Claus Ibsen <[email protected]> AuthorDate: Wed Aug 26 11:57:26 2020 +0200 CAMEL-15469: AggregationStrategy using etcd3 as datastore --- components/camel-etcd3/pom.xml | 20 +- .../services/org/apache/camel/other.properties | 7 + .../camel-etcd3/src/generated/resources/etcd3.json | 15 + components/camel-etcd3/src/main/docs/etcd3.adoc | 11 + .../aggregate/Etcd3AggregationRepository.java | 919 +++++++++++---------- .../camel/component/etcd3/AggregateEtcd3Test.java | 41 +- .../component/etcd3/MyAggregationStrategy.java | 17 +- .../src/test/resources/log4j2.properties | 28 + components/camel-redis/pom.xml | 13 + .../aggregate/RedisAggregationRepository.java | 4 + 10 files changed, 605 insertions(+), 470 deletions(-) diff --git a/components/camel-etcd3/pom.xml b/components/camel-etcd3/pom.xml index 6edc0d8..5b0ed80 100644 --- a/components/camel-etcd3/pom.xml +++ b/components/camel-etcd3/pom.xml @@ -30,19 +30,35 @@ <artifactId>camel-etcd3</artifactId> <packaging>jar</packaging> <name>Camel :: Etcd3</name> - <description>Camel Etcd3 support</description> + <description>Aggregation repository using EtcD as datastore</description> + + <properties> + <firstVersion>3.5.0</firstVersion> + <label>database</label> + </properties> <dependencies> <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-support</artifactId> + </dependency> + + <dependency> <groupId>io.etcd</groupId> <artifactId>jetcd-all</artifactId> <version>${jetcd-version}</version> </dependency> + <!-- testing --> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-test-spring-junit5</artifactId> + <artifactId>camel-test-junit5</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.logging.log4j</groupId> + <artifactId>log4j-slf4j-impl</artifactId> <scope>test</scope> </dependency> diff --git a/components/camel-etcd3/src/generated/resources/META-INF/services/org/apache/camel/other.properties b/components/camel-etcd3/src/generated/resources/META-INF/services/org/apache/camel/other.properties new file mode 100644 index 0000000..60f199b --- /dev/null +++ b/components/camel-etcd3/src/generated/resources/META-INF/services/org/apache/camel/other.properties @@ -0,0 +1,7 @@ +# Generated by camel build tools - do NOT edit this file! +name=etcd3 +groupId=org.apache.camel +artifactId=camel-etcd3 +version=3.5.0-SNAPSHOT +projectName=Camel :: Etcd3 +projectDescription=Aggregation repository using EtcD as datastore diff --git a/components/camel-etcd3/src/generated/resources/etcd3.json b/components/camel-etcd3/src/generated/resources/etcd3.json new file mode 100644 index 0000000..c854b52 --- /dev/null +++ b/components/camel-etcd3/src/generated/resources/etcd3.json @@ -0,0 +1,15 @@ +{ + "other": { + "kind": "other", + "name": "etcd3", + "title": "Etcd3", + "description": "Aggregation repository using EtcD as datastore", + "deprecated": false, + "firstVersion": "3.5.0", + "label": "database", + "supportLevel": "Preview", + "groupId": "org.apache.camel", + "artifactId": "camel-etcd3", + "version": "3.5.0-SNAPSHOT" + } +} diff --git a/components/camel-etcd3/src/main/docs/etcd3.adoc b/components/camel-etcd3/src/main/docs/etcd3.adoc new file mode 100644 index 0000000..8b2dfc3 --- /dev/null +++ b/components/camel-etcd3/src/main/docs/etcd3.adoc @@ -0,0 +1,11 @@ +[[etcd3-component]] += Etcd3 Component +:docTitle: Etcd3 +:artifactId: camel-etcd3 +:description: Aggregation repository using EtcD as datastore +:since: 3.5 +:supportLevel: Preview + +*Since Camel {since}* + +The Etcd3 component provides an `AggregationStrategy` to use Etcd3 as the backend datastore. diff --git a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java index d9cd429..db5f84d 100644 --- a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java +++ b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java @@ -14,7 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.camel.component.etcd3.processor.aggregate; import java.io.ByteArrayInputStream; @@ -24,25 +23,13 @@ import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.util.Collections; import java.util.List; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import javax.annotation.PostConstruct; - -import org.apache.camel.CamelContext; -import org.apache.camel.Exchange; -import org.apache.camel.impl.DefaultExchange; -import org.apache.camel.impl.DefaultExchangeHolder; -import org.apache.camel.spi.OptimisticLockingAggregationRepository; -import org.apache.camel.spi.RecoverableAggregationRepository; -import org.apache.camel.support.ServiceSupport; -import org.apache.camel.util.StringHelper; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import io.etcd.jetcd.ByteSequence; import io.etcd.jetcd.Client; import io.etcd.jetcd.KV; @@ -57,430 +44,486 @@ import io.etcd.jetcd.op.Op; import io.etcd.jetcd.options.DeleteOption; import io.etcd.jetcd.options.GetOption; import io.etcd.jetcd.options.PutOption; +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.spi.OptimisticLockingAggregationRepository; +import org.apache.camel.spi.RecoverableAggregationRepository; +import org.apache.camel.support.DefaultExchange; +import org.apache.camel.support.DefaultExchangeHolder; +import org.apache.camel.support.service.ServiceSupport; +import org.apache.camel.util.StringHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class Etcd3AggregationRepository extends ServiceSupport - implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository { - private static final Logger LOG = LoggerFactory.getLogger(Etcd3AggregationRepository.class); - private static final String COMPLETED_SUFFIX = "-completed"; - - private boolean optimistic; - private boolean useRecovery = true; - private String endpoint; - private Client client; - private KV kvClient; - private String prefixName; - private String persistencePrefixName; - private String deadLetterChannel; - private long recoveryInterval = 5000; - private int maximumRedeliveries = 3; - private boolean allowSerializedHeaders; - - public Etcd3AggregationRepository(final String prefixName, final String endpoint) { - this.prefixName = prefixName; - this.persistencePrefixName = String.format("%s%s", prefixName, COMPLETED_SUFFIX); - this.optimistic = false; - this.endpoint = endpoint; - } - - public Etcd3AggregationRepository(final String prefixName, final String persistencePrefixName, - final String endpoint) { - this.prefixName = prefixName; - this.persistencePrefixName = persistencePrefixName; - this.optimistic = false; - this.endpoint = endpoint; - } - - public Etcd3AggregationRepository(final String prefixName, final String endpoint, boolean optimistic) { - this(prefixName, endpoint); - this.optimistic = optimistic; - } - - public Etcd3AggregationRepository(final String repositoryName, final String persistentRepositoryName, - final String endpoint, boolean optimistic) { - this(repositoryName, persistentRepositoryName, endpoint); - this.optimistic = optimistic; - } - - @Override - public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) - throws OptimisticLockingException { - if (!optimistic) { - throw new UnsupportedOperationException(); - } - LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", newExchange.getExchangeId(), - key); - try { - if (oldExchange == null) { - DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders); - CompletableFuture<GetResponse> completableGetResponse = kvClient - .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); - GetResponse getResponse = completableGetResponse.get(); - List<KeyValue> keyValues = getResponse.getKvs(); - if (keyValues.isEmpty()) { - CompletableFuture<PutResponse> completablePutResponse = kvClient.put( - ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), - convertToEtcd3Format(holder)); - completablePutResponse.get(); - } else { - DefaultExchangeHolder misbehaviorHolder = (DefaultExchangeHolder) convertFromEtcd3Format(keyValues.get(0).getValue()); - Exchange misbehaviorEx = unmarshallExchange(camelContext, misbehaviorHolder); - LOG.error( - "Optimistic locking failed for exchange with key {}: kvClient.get returned Exchange with ID {}, while it's expected no exchanges to be returned", - key, misbehaviorEx != null ? misbehaviorEx.getExchangeId() : "<null>"); - throw new OptimisticLockingException(); - } - } else { - DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(newExchange, true, - allowSerializedHeaders); - CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient - .delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); - DeleteResponse deleteResponse = completableDeleteResponse.get(); - if (deleteResponse.getDeleted() == 0) { - LOG.error( - "Optimistic locking failed for exchange with key {}: kvClient.get returned no Exchanges, while it's expected to replace one", - key); - throw new OptimisticLockingException(); - } - CompletableFuture<PutResponse> completablePutResponse = kvClient.put( - ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), convertToEtcd3Format(newHolder)); - completablePutResponse.get(); - } - } catch (InterruptedException | ExecutionException | IOException | ClassNotFoundException e) { - LOG.error(e.getMessage(), e); - throw new OptimisticLockingException(); - } - LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", newExchange.getExchangeId(), key); - return oldExchange; - } - - @Override - public Exchange add(CamelContext camelContext, String key, Exchange exchange) { - if (optimistic) { - throw new UnsupportedOperationException(); - } - LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); - DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); - CompletableFuture<GetResponse> completableResponse = kvClient - .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); - try { - GetResponse getResponse = completableResponse.get(); - long modRevision = 0; - if(!getResponse.getKvs().isEmpty()){ - modRevision = getResponse.getKvs().get(0).getModRevision(); - } - Txn transaction = kvClient.txn(); - transaction - .If(new Cmp(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), - Cmp.Op.EQUAL, - CmpTarget.ModRevisionCmpTarget.modRevision(modRevision))) - .Then(Op.put(ByteSequence - .from(String.format("%s/%s", prefixName, key).getBytes()), convertToEtcd3Format(newHolder), PutOption.DEFAULT)) - .commit(); - } catch (InterruptedException | ExecutionException | IOException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(),e); - } - return unmarshallExchange(camelContext, newHolder); - } - - @Override - public Set<String> scan(CamelContext camelContext) { - if (useRecovery) { - LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName()); - CompletableFuture<GetResponse> completableGetResponse = kvClient.get( - ByteSequence.from(persistencePrefixName.getBytes()), - GetOption.newBuilder().withPrefix(ByteSequence.from(persistencePrefixName.getBytes())).build()); - try { - GetResponse getResponse = completableGetResponse.get(); - Set<String> keys = new TreeSet<>(); - getResponse.getKvs().forEach(kv -> keys.add(new String(kv.getKey().getBytes()))); - Set<String> scanned = Collections.unmodifiableSet(keys); - LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), - camelContext.getName()); - return scanned; - } catch (InterruptedException | ExecutionException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(),e); - } - } else { - LOG.warn( - "What for to run recovery scans in {} context while prefix {} is running in non-recoverable aggregation repository mode?!", - camelContext.getName(), prefixName); - return Collections.emptySet(); - } - } - - @Override - public Exchange recover(CamelContext camelContext, String exchangeId) { - LOG.trace("Recovering an Exchange with ID {}.", exchangeId); - CompletableFuture<GetResponse> completableResponse = kvClient - .get(ByteSequence.from(String.format("%s/%s", persistencePrefixName, exchangeId).getBytes())); - try { - GetResponse getResponse = completableResponse.get(); - DefaultExchangeHolder holder = (DefaultExchangeHolder) convertFromEtcd3Format(getResponse.getKvs().get(0).getValue()); - return useRecovery ? unmarshallExchange(camelContext, holder) : null; - } catch (InterruptedException | ExecutionException | IOException | ClassNotFoundException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(),e); - } - } - - @Override - public void setRecoveryInterval(long interval, TimeUnit timeUnit) { - this.recoveryInterval = timeUnit.toMillis(interval); - } - - @Override - public void setRecoveryInterval(long interval) { - this.recoveryInterval = interval; - } - - @Override - public long getRecoveryIntervalInMillis() { - return recoveryInterval; - } - - @Override - public void setUseRecovery(boolean useRecovery) { - this.useRecovery = useRecovery; - } - - @Override - public boolean isUseRecovery() { - return useRecovery; - } - - @Override - public void setDeadLetterUri(String deadLetterUri) { - this.deadLetterChannel = deadLetterUri; - } - - @Override - public String getDeadLetterUri() { - return deadLetterChannel; - } - - @Override - public void setMaximumRedeliveries(int maximumRedeliveries) { - this.maximumRedeliveries = maximumRedeliveries; - } - - @Override - public int getMaximumRedeliveries() { - return maximumRedeliveries; - } - - @Override - public Exchange get(CamelContext camelContext, String key) { - CompletableFuture<GetResponse> completableResponse = kvClient - .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); - try { - GetResponse getResponse = completableResponse.get(); - DefaultExchangeHolder holder = null; - if(!getResponse.getKvs().isEmpty()) { - holder = (DefaultExchangeHolder) convertFromEtcd3Format(getResponse.getKvs().get(0).getValue()); - } - return unmarshallExchange(camelContext, holder); - } catch (InterruptedException | ExecutionException | IOException | ClassNotFoundException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(),e); - } - } - - public boolean isAllowSerializedHeaders() { - return allowSerializedHeaders; - } - - public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { - this.allowSerializedHeaders = allowSerializedHeaders; - } - - @Override - public void remove(CamelContext camelContext, String key, Exchange exchange) { - DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); - if (optimistic) { - LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), - key); - try { - CompletableFuture<GetResponse> completableGetResponse = kvClient - .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); - GetResponse getResponse = completableGetResponse.get(); - List<KeyValue> keyValueList = getResponse.getKvs(); - boolean optimisticLockingError = keyValueList.isEmpty(); - if (!optimisticLockingError) { - DefaultExchangeHolder holderFound = (DefaultExchangeHolder) convertFromEtcd3Format( - keyValueList.get(0).getValue()); - optimisticLockingError = !Objects.equals(holder, holderFound); - if (!optimisticLockingError) { - CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient - .delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); - DeleteResponse deleteResponse = completableDeleteResponse.get(); - optimisticLockingError = deleteResponse.getDeleted() == 0; - } - } - if (optimisticLockingError) { - LOG.error( - "Optimistic locking failed for exchange with key {}: kvClient.delete removed no Exchanges, while it's expected to remove one.", - key); - throw new OptimisticLockingException(); - } - - } catch (InterruptedException | ExecutionException | ClassNotFoundException | IOException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(), e); - } - LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), - key); - if (useRecovery) { - LOG.trace( - "Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", - exchange.getExchangeId(), key); - try { - CompletableFuture<PutResponse> completablePutResponse = kvClient.put( - ByteSequence.from(String.format("%s/%s", persistencePrefixName, key).getBytes()), - convertToEtcd3Format(holder)); - completablePutResponse.get(); - LOG.trace( - "Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", - exchange.getExchangeId(), key); - } catch (IOException | InterruptedException | ExecutionException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(),e); - } - - } - } else { - if (useRecovery) { - LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", - exchange.getExchangeId(), key); - Txn transaction = kvClient.txn(); - try { - CompletableFuture<GetResponse> completableResponse = kvClient - .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); - GetResponse getResponse = completableResponse.get(); - DefaultExchangeHolder removedHolder = (DefaultExchangeHolder) convertFromEtcd3Format(getResponse.getKvs().get(0).getValue()); - transaction - .If(new Cmp(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), - Cmp.Op.EQUAL, - CmpTarget.value( - ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())))) - .Then(Op.delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), - DeleteOption.DEFAULT), - Op.put(ByteSequence - .from(String.format("%s/%s", persistencePrefixName, key).getBytes()), - convertToEtcd3Format(removedHolder), PutOption.DEFAULT)) - .commit(); - LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", - exchange.getExchangeId(), key); - LOG.trace( - "Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", - exchange.getExchangeId(), key); - } catch (Throwable throwable) { - throw new RuntimeException(throwable.getMessage(),throwable); - } - } else { - CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient - .delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); - try { - completableDeleteResponse.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(),e); - } - } - } - } - - @Override - public void confirm(CamelContext camelContext, String exchangeId) { - LOG.trace("Confirming an exchange with ID {}.", exchangeId); - if (useRecovery) { - CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient - .delete(ByteSequence.from(String.format("%s/%s", persistencePrefixName, exchangeId).getBytes())); - try { - completableDeleteResponse.get(); - } catch (InterruptedException | ExecutionException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(),e); - } - } - } - - @Override - public Set<String> getKeys() { - CompletableFuture<GetResponse> completableGetResponse = kvClient.get(ByteSequence.from(prefixName.getBytes()), - GetOption.newBuilder().withRange(ByteSequence.from(prefixName.getBytes())).build()); - Set<String> scanned = Collections.unmodifiableSet(new TreeSet<>()); - try { - GetResponse getResponse = completableGetResponse.get(); - Set<String> keys = new TreeSet<>(); - getResponse.getKvs().forEach(kv -> keys.add(new String(kv.getKey().getBytes()))); - scanned = Collections.unmodifiableSet(keys); - } catch (InterruptedException | ExecutionException e) { - LOG.error(e.getMessage(), e); - throw new RuntimeException(e.getMessage(),e); - } - return scanned; - } - - @PostConstruct - void init() throws Exception { - if (maximumRedeliveries < 0) { - throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); - } - if (recoveryInterval < 0) { - throw new IllegalArgumentException("Recovery interval must be zero or a positive integer."); - } - - } - - @Override - protected void doStart() throws Exception { - StringHelper.notEmpty(prefixName, "prefixName"); - client = Client.builder().endpoints(endpoint).build(); - kvClient = client.getKVClient(); - } - - @Override - protected void doStop() throws Exception { - if(client!=null) { - client.close(); - } - } - - protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) { - Exchange exchange = null; - if (holder != null) { - exchange = new DefaultExchange(camelContext); - DefaultExchangeHolder.unmarshal(exchange, holder); - } - return exchange; - } - - private Object convertFromEtcd3Format(ByteSequence value) throws IOException, ClassNotFoundException { - byte[] data = value.getBytes(); - ByteArrayInputStream in = new ByteArrayInputStream(data); - ObjectInputStream is; - try { - is = new ObjectInputStream(in); - return is.readObject(); - } catch (IOException | ClassNotFoundException e) { - LOG.error(e.getMessage(),e); - throw e; - } - } - - private ByteSequence convertToEtcd3Format(Object value) throws IOException { - ByteArrayOutputStream bos = new ByteArrayOutputStream(); - ObjectOutputStream oos; - try { - oos = new ObjectOutputStream(bos); - oos.writeObject(value); - oos.flush(); - return ByteSequence.from(bos.toByteArray()); - } catch (IOException e) { - LOG.error(e.getMessage(),e); - throw e; - } - } + implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository { + private static final Logger LOG = LoggerFactory.getLogger(Etcd3AggregationRepository.class); + private static final String COMPLETED_SUFFIX = "-completed"; + + private boolean optimistic; + private boolean useRecovery = true; + private String endpoint; + private Client client; + private boolean shutdownClient; + private KV kvClient; + private String prefixName; + private String persistencePrefixName; + private String deadLetterChannel; + private long recoveryInterval = 5000; + private int maximumRedeliveries = 3; + private boolean allowSerializedHeaders; + + public Etcd3AggregationRepository() { + } + + public Etcd3AggregationRepository(final String prefixName, final String endpoint) { + this.prefixName = prefixName; + this.persistencePrefixName = String.format("%s%s", prefixName, COMPLETED_SUFFIX); + this.optimistic = false; + this.endpoint = endpoint; + } + + public Etcd3AggregationRepository(final String prefixName, final String persistencePrefixName, + final String endpoint) { + this.prefixName = prefixName; + this.persistencePrefixName = persistencePrefixName; + this.optimistic = false; + this.endpoint = endpoint; + } + + public Etcd3AggregationRepository(final String prefixName, final String endpoint, boolean optimistic) { + this(prefixName, endpoint); + this.optimistic = optimistic; + } + + public Etcd3AggregationRepository(final String repositoryName, final String persistentRepositoryName, + final String endpoint, boolean optimistic) { + this(repositoryName, persistentRepositoryName, endpoint); + this.optimistic = optimistic; + } + + @Override + public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) + throws OptimisticLockingException { + if (!optimistic) { + throw new UnsupportedOperationException(); + } + LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", newExchange.getExchangeId(), + key); + try { + if (oldExchange == null) { + DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders); + CompletableFuture<GetResponse> completableGetResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + GetResponse getResponse = completableGetResponse.get(); + List<KeyValue> keyValues = getResponse.getKvs(); + if (keyValues.isEmpty()) { + CompletableFuture<PutResponse> completablePutResponse = kvClient.put( + ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), + convertToEtcd3Format(holder)); + completablePutResponse.get(); + } else { + DefaultExchangeHolder misbehaviorHolder + = (DefaultExchangeHolder) convertFromEtcd3Format(keyValues.get(0).getValue()); + Exchange misbehaviorEx = unmarshallExchange(camelContext, misbehaviorHolder); + LOG.warn( + "Optimistic locking failed for exchange with key {}: kvClient.get returned Exchange with ID {}, while it's expected no exchanges to be returned", + key, misbehaviorEx != null ? misbehaviorEx.getExchangeId() : "<null>"); + throw new OptimisticLockingException(); + } + } else { + DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(newExchange, true, + allowSerializedHeaders); + CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient + .delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + DeleteResponse deleteResponse = completableDeleteResponse.get(); + if (deleteResponse.getDeleted() == 0) { + LOG.warn( + "Optimistic locking failed for exchange with key {}: kvClient.get returned no Exchanges, while it's expected to replace one", + key); + throw new OptimisticLockingException(); + } + CompletableFuture<PutResponse> completablePutResponse = kvClient.put( + ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), convertToEtcd3Format(newHolder)); + completablePutResponse.get(); + } + } catch (InterruptedException | ExecutionException | IOException | ClassNotFoundException e) { + LOG.error(e.getMessage(), e); + throw new OptimisticLockingException(); + } + LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", newExchange.getExchangeId(), key); + return oldExchange; + } + + @Override + public Exchange add(CamelContext camelContext, String key, Exchange exchange) { + if (optimistic) { + throw new UnsupportedOperationException(); + } + LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); + CompletableFuture<GetResponse> completableResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + try { + GetResponse getResponse = completableResponse.get(); + long modRevision = 0; + if (!getResponse.getKvs().isEmpty()) { + modRevision = getResponse.getKvs().get(0).getModRevision(); + } + Txn transaction = kvClient.txn(); + transaction + .If(new Cmp( + ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), + Cmp.Op.EQUAL, + CmpTarget.ModRevisionCmpTarget.modRevision(modRevision))) + .Then(Op.put(ByteSequence + .from(String.format("%s/%s", prefixName, key).getBytes()), convertToEtcd3Format(newHolder), + PutOption.DEFAULT)) + .commit(); + } catch (InterruptedException | ExecutionException | IOException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + return unmarshallExchange(camelContext, newHolder); + } + + @Override + public Set<String> scan(CamelContext camelContext) { + if (useRecovery) { + LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName()); + CompletableFuture<GetResponse> completableGetResponse = kvClient.get( + ByteSequence.from(persistencePrefixName.getBytes()), + GetOption.newBuilder().withPrefix(ByteSequence.from(persistencePrefixName.getBytes())).build()); + try { + GetResponse getResponse = completableGetResponse.get(); + Set<String> keys = new TreeSet<>(); + getResponse.getKvs().forEach(kv -> keys.add(new String(kv.getKey().getBytes()))); + Set<String> scanned = Collections.unmodifiableSet(keys); + LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), + camelContext.getName()); + return scanned; + } catch (InterruptedException | ExecutionException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + } else { + LOG.warn( + "What for to run recovery scans in {} context while prefix {} is running in non-recoverable aggregation repository mode?!", + camelContext.getName(), prefixName); + return Collections.emptySet(); + } + } + + @Override + public Exchange recover(CamelContext camelContext, String exchangeId) { + LOG.trace("Recovering an Exchange with ID {}.", exchangeId); + CompletableFuture<GetResponse> completableResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", persistencePrefixName, exchangeId).getBytes())); + try { + GetResponse getResponse = completableResponse.get(); + DefaultExchangeHolder holder + = (DefaultExchangeHolder) convertFromEtcd3Format(getResponse.getKvs().get(0).getValue()); + return useRecovery ? unmarshallExchange(camelContext, holder) : null; + } catch (InterruptedException | ExecutionException | IOException | ClassNotFoundException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public void setRecoveryInterval(long interval, TimeUnit timeUnit) { + this.recoveryInterval = timeUnit.toMillis(interval); + } + + @Override + public void setRecoveryInterval(long interval) { + this.recoveryInterval = interval; + } + + @Override + public long getRecoveryIntervalInMillis() { + return recoveryInterval; + } + + @Override + public void setUseRecovery(boolean useRecovery) { + this.useRecovery = useRecovery; + } + + @Override + public boolean isUseRecovery() { + return useRecovery; + } + + @Override + public void setDeadLetterUri(String deadLetterUri) { + this.deadLetterChannel = deadLetterUri; + } + + @Override + public String getDeadLetterUri() { + return deadLetterChannel; + } + + @Override + public void setMaximumRedeliveries(int maximumRedeliveries) { + this.maximumRedeliveries = maximumRedeliveries; + } + + @Override + public int getMaximumRedeliveries() { + return maximumRedeliveries; + } + + public boolean isAllowSerializedHeaders() { + return allowSerializedHeaders; + } + + public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { + this.allowSerializedHeaders = allowSerializedHeaders; + } + + public boolean isOptimistic() { + return optimistic; + } + + public void setOptimistic(boolean optimistic) { + this.optimistic = optimistic; + } + + public String getEndpoint() { + return endpoint; + } + + public void setEndpoint(String endpoint) { + this.endpoint = endpoint; + } + + public Client getClient() { + return client; + } + + public void setClient(Client client) { + this.client = client; + } + + public String getPrefixName() { + return prefixName; + } + + public void setPrefixName(String prefixName) { + this.prefixName = prefixName; + } + + @Override + public Exchange get(CamelContext camelContext, String key) { + CompletableFuture<GetResponse> completableResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + try { + GetResponse getResponse = completableResponse.get(); + DefaultExchangeHolder holder = null; + if (!getResponse.getKvs().isEmpty()) { + holder = (DefaultExchangeHolder) convertFromEtcd3Format(getResponse.getKvs().get(0).getValue()); + } + return unmarshallExchange(camelContext, holder); + } catch (InterruptedException | ExecutionException | IOException | ClassNotFoundException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + } + + @Override + public void remove(CamelContext camelContext, String key, Exchange exchange) { + DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); + if (optimistic) { + LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), + key); + try { + CompletableFuture<GetResponse> completableGetResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + GetResponse getResponse = completableGetResponse.get(); + List<KeyValue> keyValueList = getResponse.getKvs(); + boolean optimisticLockingError = keyValueList.isEmpty(); + if (!optimisticLockingError) { + DefaultExchangeHolder holderFound = (DefaultExchangeHolder) convertFromEtcd3Format( + keyValueList.get(0).getValue()); + optimisticLockingError = !Objects.equals(holder, holderFound); + if (!optimisticLockingError) { + CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient + .delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + DeleteResponse deleteResponse = completableDeleteResponse.get(); + optimisticLockingError = deleteResponse.getDeleted() == 0; + } + } + if (optimisticLockingError) { + LOG.warn( + "Optimistic locking failed for exchange with key {}: kvClient.delete removed no Exchanges, while it's expected to remove one.", + key); + throw new OptimisticLockingException(); + } + + } catch (InterruptedException | ExecutionException | ClassNotFoundException | IOException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), + key); + if (useRecovery) { + LOG.trace( + "Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", + exchange.getExchangeId(), key); + try { + CompletableFuture<PutResponse> completablePutResponse = kvClient.put( + ByteSequence.from(String.format("%s/%s", persistencePrefixName, key).getBytes()), + convertToEtcd3Format(holder)); + completablePutResponse.get(); + LOG.trace( + "Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", + exchange.getExchangeId(), key); + } catch (IOException | InterruptedException | ExecutionException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + + } + } else { + if (useRecovery) { + LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", + exchange.getExchangeId(), key); + Txn transaction = kvClient.txn(); + try { + CompletableFuture<GetResponse> completableResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + GetResponse getResponse = completableResponse.get(); + DefaultExchangeHolder removedHolder + = (DefaultExchangeHolder) convertFromEtcd3Format(getResponse.getKvs().get(0).getValue()); + transaction + .If(new Cmp( + ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), + Cmp.Op.EQUAL, + CmpTarget.value( + ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())))) + .Then(Op.delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), + DeleteOption.DEFAULT), + Op.put(ByteSequence + .from(String.format("%s/%s", persistencePrefixName, key).getBytes()), + convertToEtcd3Format(removedHolder), PutOption.DEFAULT)) + .commit(); + LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", + exchange.getExchangeId(), key); + LOG.trace( + "Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", + exchange.getExchangeId(), key); + } catch (Throwable throwable) { + throw new RuntimeException(throwable.getMessage(), throwable); + } + } else { + CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient + .delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + try { + completableDeleteResponse.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + } + } + } + + @Override + public void confirm(CamelContext camelContext, String exchangeId) { + LOG.trace("Confirming an exchange with ID {}.", exchangeId); + if (useRecovery) { + CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient + .delete(ByteSequence.from(String.format("%s/%s", persistencePrefixName, exchangeId).getBytes())); + try { + completableDeleteResponse.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + } + } + + @Override + public Set<String> getKeys() { + CompletableFuture<GetResponse> completableGetResponse = kvClient.get(ByteSequence.from(prefixName.getBytes()), + GetOption.newBuilder().withRange(ByteSequence.from(prefixName.getBytes())).build()); + Set<String> scanned = Collections.unmodifiableSet(new TreeSet<>()); + try { + GetResponse getResponse = completableGetResponse.get(); + Set<String> keys = new TreeSet<>(); + getResponse.getKvs().forEach(kv -> keys.add(new String(kv.getKey().getBytes()))); + scanned = Collections.unmodifiableSet(keys); + } catch (InterruptedException | ExecutionException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + return scanned; + } + + @Override + protected void doInit() throws Exception { + StringHelper.notEmpty(prefixName, "prefixName"); + if (maximumRedeliveries < 0) { + throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); + } + if (recoveryInterval < 0) { + throw new IllegalArgumentException("Recovery interval must be zero or a positive integer."); + } + + } + + @Override + protected void doStart() throws Exception { + if (client == null) { + client = Client.builder().endpoints(endpoint).build(); + shutdownClient = true; + } + kvClient = client.getKVClient(); + } + + @Override + protected void doStop() throws Exception { + if (client != null && shutdownClient) { + client.close(); + client = null; + } + } + + protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) { + Exchange exchange = null; + if (holder != null) { + exchange = new DefaultExchange(camelContext); + DefaultExchangeHolder.unmarshal(exchange, holder); + } + return exchange; + } + + private Object convertFromEtcd3Format(ByteSequence value) throws IOException, ClassNotFoundException { + byte[] data = value.getBytes(); + ByteArrayInputStream in = new ByteArrayInputStream(data); + ObjectInputStream is; + try { + is = new ObjectInputStream(in); + return is.readObject(); + } catch (IOException | ClassNotFoundException e) { + LOG.error(e.getMessage(), e); + throw e; + } + } + + private ByteSequence convertToEtcd3Format(Object value) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos; + try { + oos = new ObjectOutputStream(bos); + oos.writeObject(value); + oos.flush(); + return ByteSequence.from(bos.toByteArray()); + } catch (IOException e) { + LOG.error(e.getMessage(), e); + throw e; + } + } } diff --git a/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/AggregateEtcd3Test.java b/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/AggregateEtcd3Test.java index b532462..be2b596 100644 --- a/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/AggregateEtcd3Test.java +++ b/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/AggregateEtcd3Test.java @@ -1,48 +1,47 @@ package org.apache.camel.component.etcd3; import org.apache.camel.builder.RouteBuilder; -import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.test.junit4.CamelTestSupport; import org.apache.camel.component.etcd3.processor.aggregate.Etcd3AggregationRepository; -import org.junit.Test; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit5.CamelTestSupport; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Test; /** * The ABC example for using the Aggregator EIP. * <p/> - * This example have 4 messages send to the aggregator, by which one - * message is published which contains the aggregation of message 1,2 and 4 - * as they use the same correlation key. + * This example have 4 messages send to the aggregator, by which one message is published which contains the aggregation + * of message 1,2 and 4 as they use the same correlation key. * <p/> - * See the class {@link camelinaction.MyAggregationStrategy} for how the messages - * are actually aggregated together. - * - * @see MyAggregationStrategy */ +@Disabled("Requires manually testing") public class AggregateEtcd3Test extends CamelTestSupport { - - private String endpoint = System.getProperty("endpoint"); //http://ip:port - - @Test + + // TODO: use docker test-containers for testing + + private String endpoint = System.getProperty("endpoint"); //http://ip:port + + @Test public void testABC() throws Exception { MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedBodiesReceived("ABC"); - template.sendBodyAndHeader("direct:start", "A", "myId", 1); + template.sendBodyAndHeader("direct:start", "A", "myId", 1); template.sendBodyAndHeader("direct:start", "B", "myId", 1); template.sendBodyAndHeader("direct:start", "F", "myId", 2); template.sendBodyAndHeader("direct:start", "C", "myId", 1); - assertMockEndpointsSatisfied(); + assertMockEndpointsSatisfied(); } @Override protected RouteBuilder createRouteBuilder() throws Exception { - return new RouteBuilder() { + return new RouteBuilder() { @Override public void configure() throws Exception { from("direct:start") - .log("Sending ${body} with correlation key ${header.myId}") - .aggregate(header("myId"), new MyAggregationStrategy()) - .aggregationRepository(new Etcd3AggregationRepository("aggregation", endpoint)) - .completionSize(3) + .log("Sending ${body} with correlation key ${header.myId}") + .aggregate(header("myId"), new MyAggregationStrategy()) + .aggregationRepository(new Etcd3AggregationRepository("aggregation", endpoint)) + .completionSize(3) .log("Sending out ${body}") .to("mock:result"); } diff --git a/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/MyAggregationStrategy.java b/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/MyAggregationStrategy.java index 184f605..b9c4bea 100644 --- a/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/MyAggregationStrategy.java +++ b/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/MyAggregationStrategy.java @@ -1,22 +1,21 @@ package org.apache.camel.component.etcd3; +import org.apache.camel.AggregationStrategy; import org.apache.camel.Exchange; -import org.apache.camel.processor.aggregate.AggregationStrategy; /** - * This is the aggregation strategy which is java code for <i>aggregating</i> - * incoming messages with the existing aggregated message. In other words - * you use this strategy to <i>merge</i> the messages together. + * This is the aggregation strategy which is java code for <i>aggregating</i> incoming messages with the existing + * aggregated message. In other words you use this strategy to <i>merge</i> the messages together. */ public class MyAggregationStrategy implements AggregationStrategy { /** * Aggregates the messages. * - * @param oldExchange the existing aggregated message. Is <tt>null</tt> the - * very first time as there are no existing message. - * @param newExchange the incoming message. This is never <tt>null</tt>. - * @return the aggregated message. + * @param oldExchange the existing aggregated message. Is <tt>null</tt> the very first time as there are no + * existing message. + * @param newExchange the incoming message. This is never <tt>null</tt>. + * @return the aggregated message. */ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { // the first time there are no existing message and therefore @@ -42,5 +41,5 @@ public class MyAggregationStrategy implements AggregationStrategy { // and return it return oldExchange; } - + } diff --git a/components/camel-etcd3/src/test/resources/log4j2.properties b/components/camel-etcd3/src/test/resources/log4j2.properties new file mode 100644 index 0000000..2c2904a --- /dev/null +++ b/components/camel-etcd3/src/test/resources/log4j2.properties @@ -0,0 +1,28 @@ +## --------------------------------------------------------------------------- +## Licensed to the Apache Software Foundation (ASF) under one or more +## contributor license agreements. See the NOTICE file distributed with +## this work for additional information regarding copyright ownership. +## The ASF licenses this file to You under the Apache License, Version 2.0 +## (the "License"); you may not use this file except in compliance with +## the License. You may obtain a copy of the License at +## +## http://www.apache.org/licenses/LICENSE-2.0 +## +## Unless required by applicable law or agreed to in writing, software +## distributed under the License is distributed on an "AS IS" BASIS, +## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +## See the License for the specific language governing permissions and +## limitations under the License. +## --------------------------------------------------------------------------- + +appender.file.type = File +appender.file.name = file +appender.file.fileName = target/camel-etcd3-test.log +appender.file.layout.type = PatternLayout +appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +appender.out.type = Console +appender.out.name = out +appender.out.layout.type = PatternLayout +appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n +rootLogger.level = INFO +rootLogger.appenderRef.file.ref = file diff --git a/components/camel-redis/pom.xml b/components/camel-redis/pom.xml index 50d73cb..3dedbc1 100644 --- a/components/camel-redis/pom.xml +++ b/components/camel-redis/pom.xml @@ -65,4 +65,17 @@ </dependencies> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <endpoint>${endpoint}</endpoint> + </systemPropertyVariables> + </configuration> + </plugin> + </plugins> + </build> + </project> diff --git a/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java index 49934b4..393764a 100644 --- a/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java +++ b/components/camel-redis/src/main/java/org/apache/camel/component/redis/processor/aggregate/RedisAggregationRepository.java @@ -61,6 +61,9 @@ public class RedisAggregationRepository extends ServiceSupport private int maximumRedeliveries = 3; private boolean allowSerializedHeaders; + public RedisAggregationRepository() { + } + public RedisAggregationRepository(final String mapName, final String endpoint) { this.mapName = mapName; this.persistenceMapName = String.format("%s%s", mapName, COMPLETED_SUFFIX); @@ -368,6 +371,7 @@ public class RedisAggregationRepository extends ServiceSupport protected void doStop() throws Exception { if (redisson != null && shutdownRedisson) { redisson.shutdown(); + redisson = null; } }
