This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit c60eae2a5c98cdfef653f80da14c005ef942f7ec Author: Marco Longobardi <[email protected]> AuthorDate: Wed Jul 8 18:23:56 2020 +0200 Added camel-etcd3 component and Etcd3AggregationRepository using jetcd Removed unused properties Added some fixes to Etcd3AggregationRepository Fixed java.lang.IndexOutOfBoundsException on Etcd3AggregationRepository Added test class for camel-etcd3 component Fixed indentation problems Added test class and fixed some bugs Modified test class Modified remove method on Etcd3AggregationRepository Rebase Modified Etcd3AggregationRepository --- bom/camel-bom/pom.xml | 5 + camel-dependencies/pom.xml | 3 +- components/camel-etcd/pom.xml | 2 +- components/camel-etcd3/pom.xml | 64 +++ .../aggregate/Etcd3AggregationRepository.java | 486 +++++++++++++++++++++ .../camel/component/etcd3/AggregateEtcd3Test.java | 51 +++ .../component/etcd3/MyAggregationStrategy.java | 46 ++ components/pom.xml | 1 + parent/pom.xml | 10 +- 9 files changed, 662 insertions(+), 6 deletions(-) diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml index 21320c9..0a94e8e 100644 --- a/bom/camel-bom/pom.xml +++ b/bom/camel-bom/pom.xml @@ -704,6 +704,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-etcd3</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-exec</artifactId> <version>${project.version}</version> </dependency> diff --git a/camel-dependencies/pom.xml b/camel-dependencies/pom.xml index e66d598..8dd7a46 100644 --- a/camel-dependencies/pom.xml +++ b/camel-dependencies/pom.xml @@ -334,6 +334,7 @@ <jcr-version>2.0</jcr-version> <jedis-client-version>3.1.0</jedis-client-version> <jersey-version>2.28</jersey-version> + <jetcd-version>0.5.3</jetcd-version> <jettison-version>1.4.1</jettison-version> <jetty-plugin-version>${jetty-version}</jetty-plugin-version> <jetty-runner-groupId>org.eclipse.jetty</jetty-runner-groupId> @@ -592,4 +593,4 @@ <zookeeper-version>3.5.7</zookeeper-version> <zxing-version>3.4.0</zxing-version> </properties> -</project> +</project> \ No newline at end of file diff --git a/components/camel-etcd/pom.xml b/components/camel-etcd/pom.xml index 23f8902..6f740c0 100644 --- a/components/camel-etcd/pom.xml +++ b/components/camel-etcd/pom.xml @@ -50,7 +50,7 @@ <artifactId>commons-lang3</artifactId> <version>${commons-lang3-version}</version> </dependency> - + <!-- logging --> <dependency> <groupId>org.apache.logging.log4j</groupId> diff --git a/components/camel-etcd3/pom.xml b/components/camel-etcd3/pom.xml new file mode 100644 index 0000000..6edc0d8 --- /dev/null +++ b/components/camel-etcd3/pom.xml @@ -0,0 +1,64 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.camel</groupId> + <artifactId>components</artifactId> + <version>3.5.0-SNAPSHOT</version> + </parent> + + <artifactId>camel-etcd3</artifactId> + <packaging>jar</packaging> + <name>Camel :: Etcd3</name> + <description>Camel Etcd3 support</description> + + <dependencies> + + <dependency> + <groupId>io.etcd</groupId> + <artifactId>jetcd-all</artifactId> + <version>${jetcd-version}</version> + </dependency> + + <dependency> + <groupId>org.apache.camel</groupId> + <artifactId>camel-test-spring-junit5</artifactId> + <scope>test</scope> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <systemPropertyVariables> + <endpoint>${endpoint}</endpoint> + </systemPropertyVariables> + </configuration> + </plugin> + </plugins> + </build> + +</project> diff --git a/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java new file mode 100644 index 0000000..d9cd429 --- /dev/null +++ b/components/camel-etcd3/src/main/java/org/apache/camel/component/etcd3/processor/aggregate/Etcd3AggregationRepository.java @@ -0,0 +1,486 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.camel.component.etcd3.processor.aggregate; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; + +import javax.annotation.PostConstruct; + +import org.apache.camel.CamelContext; +import org.apache.camel.Exchange; +import org.apache.camel.impl.DefaultExchange; +import org.apache.camel.impl.DefaultExchangeHolder; +import org.apache.camel.spi.OptimisticLockingAggregationRepository; +import org.apache.camel.spi.RecoverableAggregationRepository; +import org.apache.camel.support.ServiceSupport; +import org.apache.camel.util.StringHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.etcd.jetcd.ByteSequence; +import io.etcd.jetcd.Client; +import io.etcd.jetcd.KV; +import io.etcd.jetcd.KeyValue; +import io.etcd.jetcd.Txn; +import io.etcd.jetcd.kv.DeleteResponse; +import io.etcd.jetcd.kv.GetResponse; +import io.etcd.jetcd.kv.PutResponse; +import io.etcd.jetcd.op.Cmp; +import io.etcd.jetcd.op.CmpTarget; +import io.etcd.jetcd.op.Op; +import io.etcd.jetcd.options.DeleteOption; +import io.etcd.jetcd.options.GetOption; +import io.etcd.jetcd.options.PutOption; + +public class Etcd3AggregationRepository extends ServiceSupport + implements RecoverableAggregationRepository, OptimisticLockingAggregationRepository { + private static final Logger LOG = LoggerFactory.getLogger(Etcd3AggregationRepository.class); + private static final String COMPLETED_SUFFIX = "-completed"; + + private boolean optimistic; + private boolean useRecovery = true; + private String endpoint; + private Client client; + private KV kvClient; + private String prefixName; + private String persistencePrefixName; + private String deadLetterChannel; + private long recoveryInterval = 5000; + private int maximumRedeliveries = 3; + private boolean allowSerializedHeaders; + + public Etcd3AggregationRepository(final String prefixName, final String endpoint) { + this.prefixName = prefixName; + this.persistencePrefixName = String.format("%s%s", prefixName, COMPLETED_SUFFIX); + this.optimistic = false; + this.endpoint = endpoint; + } + + public Etcd3AggregationRepository(final String prefixName, final String persistencePrefixName, + final String endpoint) { + this.prefixName = prefixName; + this.persistencePrefixName = persistencePrefixName; + this.optimistic = false; + this.endpoint = endpoint; + } + + public Etcd3AggregationRepository(final String prefixName, final String endpoint, boolean optimistic) { + this(prefixName, endpoint); + this.optimistic = optimistic; + } + + public Etcd3AggregationRepository(final String repositoryName, final String persistentRepositoryName, + final String endpoint, boolean optimistic) { + this(repositoryName, persistentRepositoryName, endpoint); + this.optimistic = optimistic; + } + + @Override + public Exchange add(CamelContext camelContext, String key, Exchange oldExchange, Exchange newExchange) + throws OptimisticLockingException { + if (!optimistic) { + throw new UnsupportedOperationException(); + } + LOG.trace("Adding an Exchange with ID {} for key {} in an optimistic manner.", newExchange.getExchangeId(), + key); + try { + if (oldExchange == null) { + DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(newExchange, true, allowSerializedHeaders); + CompletableFuture<GetResponse> completableGetResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + GetResponse getResponse = completableGetResponse.get(); + List<KeyValue> keyValues = getResponse.getKvs(); + if (keyValues.isEmpty()) { + CompletableFuture<PutResponse> completablePutResponse = kvClient.put( + ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), + convertToEtcd3Format(holder)); + completablePutResponse.get(); + } else { + DefaultExchangeHolder misbehaviorHolder = (DefaultExchangeHolder) convertFromEtcd3Format(keyValues.get(0).getValue()); + Exchange misbehaviorEx = unmarshallExchange(camelContext, misbehaviorHolder); + LOG.error( + "Optimistic locking failed for exchange with key {}: kvClient.get returned Exchange with ID {}, while it's expected no exchanges to be returned", + key, misbehaviorEx != null ? misbehaviorEx.getExchangeId() : "<null>"); + throw new OptimisticLockingException(); + } + } else { + DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(newExchange, true, + allowSerializedHeaders); + CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient + .delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + DeleteResponse deleteResponse = completableDeleteResponse.get(); + if (deleteResponse.getDeleted() == 0) { + LOG.error( + "Optimistic locking failed for exchange with key {}: kvClient.get returned no Exchanges, while it's expected to replace one", + key); + throw new OptimisticLockingException(); + } + CompletableFuture<PutResponse> completablePutResponse = kvClient.put( + ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), convertToEtcd3Format(newHolder)); + completablePutResponse.get(); + } + } catch (InterruptedException | ExecutionException | IOException | ClassNotFoundException e) { + LOG.error(e.getMessage(), e); + throw new OptimisticLockingException(); + } + LOG.trace("Added an Exchange with ID {} for key {} in optimistic manner.", newExchange.getExchangeId(), key); + return oldExchange; + } + + @Override + public Exchange add(CamelContext camelContext, String key, Exchange exchange) { + if (optimistic) { + throw new UnsupportedOperationException(); + } + LOG.trace("Adding an Exchange with ID {} for key {} in a thread-safe manner.", exchange.getExchangeId(), key); + DefaultExchangeHolder newHolder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); + CompletableFuture<GetResponse> completableResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + try { + GetResponse getResponse = completableResponse.get(); + long modRevision = 0; + if(!getResponse.getKvs().isEmpty()){ + modRevision = getResponse.getKvs().get(0).getModRevision(); + } + Txn transaction = kvClient.txn(); + transaction + .If(new Cmp(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), + Cmp.Op.EQUAL, + CmpTarget.ModRevisionCmpTarget.modRevision(modRevision))) + .Then(Op.put(ByteSequence + .from(String.format("%s/%s", prefixName, key).getBytes()), convertToEtcd3Format(newHolder), PutOption.DEFAULT)) + .commit(); + } catch (InterruptedException | ExecutionException | IOException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(),e); + } + return unmarshallExchange(camelContext, newHolder); + } + + @Override + public Set<String> scan(CamelContext camelContext) { + if (useRecovery) { + LOG.trace("Scanning for exchanges to recover in {} context", camelContext.getName()); + CompletableFuture<GetResponse> completableGetResponse = kvClient.get( + ByteSequence.from(persistencePrefixName.getBytes()), + GetOption.newBuilder().withPrefix(ByteSequence.from(persistencePrefixName.getBytes())).build()); + try { + GetResponse getResponse = completableGetResponse.get(); + Set<String> keys = new TreeSet<>(); + getResponse.getKvs().forEach(kv -> keys.add(new String(kv.getKey().getBytes()))); + Set<String> scanned = Collections.unmodifiableSet(keys); + LOG.trace("Found {} keys for exchanges to recover in {} context", scanned.size(), + camelContext.getName()); + return scanned; + } catch (InterruptedException | ExecutionException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(),e); + } + } else { + LOG.warn( + "What for to run recovery scans in {} context while prefix {} is running in non-recoverable aggregation repository mode?!", + camelContext.getName(), prefixName); + return Collections.emptySet(); + } + } + + @Override + public Exchange recover(CamelContext camelContext, String exchangeId) { + LOG.trace("Recovering an Exchange with ID {}.", exchangeId); + CompletableFuture<GetResponse> completableResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", persistencePrefixName, exchangeId).getBytes())); + try { + GetResponse getResponse = completableResponse.get(); + DefaultExchangeHolder holder = (DefaultExchangeHolder) convertFromEtcd3Format(getResponse.getKvs().get(0).getValue()); + return useRecovery ? unmarshallExchange(camelContext, holder) : null; + } catch (InterruptedException | ExecutionException | IOException | ClassNotFoundException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(),e); + } + } + + @Override + public void setRecoveryInterval(long interval, TimeUnit timeUnit) { + this.recoveryInterval = timeUnit.toMillis(interval); + } + + @Override + public void setRecoveryInterval(long interval) { + this.recoveryInterval = interval; + } + + @Override + public long getRecoveryIntervalInMillis() { + return recoveryInterval; + } + + @Override + public void setUseRecovery(boolean useRecovery) { + this.useRecovery = useRecovery; + } + + @Override + public boolean isUseRecovery() { + return useRecovery; + } + + @Override + public void setDeadLetterUri(String deadLetterUri) { + this.deadLetterChannel = deadLetterUri; + } + + @Override + public String getDeadLetterUri() { + return deadLetterChannel; + } + + @Override + public void setMaximumRedeliveries(int maximumRedeliveries) { + this.maximumRedeliveries = maximumRedeliveries; + } + + @Override + public int getMaximumRedeliveries() { + return maximumRedeliveries; + } + + @Override + public Exchange get(CamelContext camelContext, String key) { + CompletableFuture<GetResponse> completableResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + try { + GetResponse getResponse = completableResponse.get(); + DefaultExchangeHolder holder = null; + if(!getResponse.getKvs().isEmpty()) { + holder = (DefaultExchangeHolder) convertFromEtcd3Format(getResponse.getKvs().get(0).getValue()); + } + return unmarshallExchange(camelContext, holder); + } catch (InterruptedException | ExecutionException | IOException | ClassNotFoundException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(),e); + } + } + + public boolean isAllowSerializedHeaders() { + return allowSerializedHeaders; + } + + public void setAllowSerializedHeaders(boolean allowSerializedHeaders) { + this.allowSerializedHeaders = allowSerializedHeaders; + } + + @Override + public void remove(CamelContext camelContext, String key, Exchange exchange) { + DefaultExchangeHolder holder = DefaultExchangeHolder.marshal(exchange, true, allowSerializedHeaders); + if (optimistic) { + LOG.trace("Removing an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), + key); + try { + CompletableFuture<GetResponse> completableGetResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + GetResponse getResponse = completableGetResponse.get(); + List<KeyValue> keyValueList = getResponse.getKvs(); + boolean optimisticLockingError = keyValueList.isEmpty(); + if (!optimisticLockingError) { + DefaultExchangeHolder holderFound = (DefaultExchangeHolder) convertFromEtcd3Format( + keyValueList.get(0).getValue()); + optimisticLockingError = !Objects.equals(holder, holderFound); + if (!optimisticLockingError) { + CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient + .delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + DeleteResponse deleteResponse = completableDeleteResponse.get(); + optimisticLockingError = deleteResponse.getDeleted() == 0; + } + } + if (optimisticLockingError) { + LOG.error( + "Optimistic locking failed for exchange with key {}: kvClient.delete removed no Exchanges, while it's expected to remove one.", + key); + throw new OptimisticLockingException(); + } + + } catch (InterruptedException | ExecutionException | ClassNotFoundException | IOException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(), e); + } + LOG.trace("Removed an exchange with ID {} for key {} in an optimistic manner.", exchange.getExchangeId(), + key); + if (useRecovery) { + LOG.trace( + "Putting an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", + exchange.getExchangeId(), key); + try { + CompletableFuture<PutResponse> completablePutResponse = kvClient.put( + ByteSequence.from(String.format("%s/%s", persistencePrefixName, key).getBytes()), + convertToEtcd3Format(holder)); + completablePutResponse.get(); + LOG.trace( + "Put an exchange with ID {} for key {} into a recoverable storage in an optimistic manner.", + exchange.getExchangeId(), key); + } catch (IOException | InterruptedException | ExecutionException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(),e); + } + + } + } else { + if (useRecovery) { + LOG.trace("Removing an exchange with ID {} for key {} in a thread-safe manner.", + exchange.getExchangeId(), key); + Txn transaction = kvClient.txn(); + try { + CompletableFuture<GetResponse> completableResponse = kvClient + .get(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + GetResponse getResponse = completableResponse.get(); + DefaultExchangeHolder removedHolder = (DefaultExchangeHolder) convertFromEtcd3Format(getResponse.getKvs().get(0).getValue()); + transaction + .If(new Cmp(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), + Cmp.Op.EQUAL, + CmpTarget.value( + ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())))) + .Then(Op.delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes()), + DeleteOption.DEFAULT), + Op.put(ByteSequence + .from(String.format("%s/%s", persistencePrefixName, key).getBytes()), + convertToEtcd3Format(removedHolder), PutOption.DEFAULT)) + .commit(); + LOG.trace("Removed an exchange with ID {} for key {} in a thread-safe manner.", + exchange.getExchangeId(), key); + LOG.trace( + "Put an exchange with ID {} for key {} into a recoverable storage in a thread-safe manner.", + exchange.getExchangeId(), key); + } catch (Throwable throwable) { + throw new RuntimeException(throwable.getMessage(),throwable); + } + } else { + CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient + .delete(ByteSequence.from(String.format("%s/%s", prefixName, key).getBytes())); + try { + completableDeleteResponse.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(),e); + } + } + } + } + + @Override + public void confirm(CamelContext camelContext, String exchangeId) { + LOG.trace("Confirming an exchange with ID {}.", exchangeId); + if (useRecovery) { + CompletableFuture<DeleteResponse> completableDeleteResponse = kvClient + .delete(ByteSequence.from(String.format("%s/%s", persistencePrefixName, exchangeId).getBytes())); + try { + completableDeleteResponse.get(); + } catch (InterruptedException | ExecutionException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(),e); + } + } + } + + @Override + public Set<String> getKeys() { + CompletableFuture<GetResponse> completableGetResponse = kvClient.get(ByteSequence.from(prefixName.getBytes()), + GetOption.newBuilder().withRange(ByteSequence.from(prefixName.getBytes())).build()); + Set<String> scanned = Collections.unmodifiableSet(new TreeSet<>()); + try { + GetResponse getResponse = completableGetResponse.get(); + Set<String> keys = new TreeSet<>(); + getResponse.getKvs().forEach(kv -> keys.add(new String(kv.getKey().getBytes()))); + scanned = Collections.unmodifiableSet(keys); + } catch (InterruptedException | ExecutionException e) { + LOG.error(e.getMessage(), e); + throw new RuntimeException(e.getMessage(),e); + } + return scanned; + } + + @PostConstruct + void init() throws Exception { + if (maximumRedeliveries < 0) { + throw new IllegalArgumentException("Maximum redelivery retries must be zero or a positive integer."); + } + if (recoveryInterval < 0) { + throw new IllegalArgumentException("Recovery interval must be zero or a positive integer."); + } + + } + + @Override + protected void doStart() throws Exception { + StringHelper.notEmpty(prefixName, "prefixName"); + client = Client.builder().endpoints(endpoint).build(); + kvClient = client.getKVClient(); + } + + @Override + protected void doStop() throws Exception { + if(client!=null) { + client.close(); + } + } + + protected Exchange unmarshallExchange(CamelContext camelContext, DefaultExchangeHolder holder) { + Exchange exchange = null; + if (holder != null) { + exchange = new DefaultExchange(camelContext); + DefaultExchangeHolder.unmarshal(exchange, holder); + } + return exchange; + } + + private Object convertFromEtcd3Format(ByteSequence value) throws IOException, ClassNotFoundException { + byte[] data = value.getBytes(); + ByteArrayInputStream in = new ByteArrayInputStream(data); + ObjectInputStream is; + try { + is = new ObjectInputStream(in); + return is.readObject(); + } catch (IOException | ClassNotFoundException e) { + LOG.error(e.getMessage(),e); + throw e; + } + } + + private ByteSequence convertToEtcd3Format(Object value) throws IOException { + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + ObjectOutputStream oos; + try { + oos = new ObjectOutputStream(bos); + oos.writeObject(value); + oos.flush(); + return ByteSequence.from(bos.toByteArray()); + } catch (IOException e) { + LOG.error(e.getMessage(),e); + throw e; + } + } +} diff --git a/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/AggregateEtcd3Test.java b/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/AggregateEtcd3Test.java new file mode 100644 index 0000000..b532462 --- /dev/null +++ b/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/AggregateEtcd3Test.java @@ -0,0 +1,51 @@ +package org.apache.camel.component.etcd3; + +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.test.junit4.CamelTestSupport; +import org.apache.camel.component.etcd3.processor.aggregate.Etcd3AggregationRepository; +import org.junit.Test; + +/** + * The ABC example for using the Aggregator EIP. + * <p/> + * This example have 4 messages send to the aggregator, by which one + * message is published which contains the aggregation of message 1,2 and 4 + * as they use the same correlation key. + * <p/> + * See the class {@link camelinaction.MyAggregationStrategy} for how the messages + * are actually aggregated together. + * + * @see MyAggregationStrategy + */ +public class AggregateEtcd3Test extends CamelTestSupport { + + private String endpoint = System.getProperty("endpoint"); //http://ip:port + + @Test + public void testABC() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:result"); + mock.expectedBodiesReceived("ABC"); + template.sendBodyAndHeader("direct:start", "A", "myId", 1); + template.sendBodyAndHeader("direct:start", "B", "myId", 1); + template.sendBodyAndHeader("direct:start", "F", "myId", 2); + template.sendBodyAndHeader("direct:start", "C", "myId", 1); + assertMockEndpointsSatisfied(); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .log("Sending ${body} with correlation key ${header.myId}") + .aggregate(header("myId"), new MyAggregationStrategy()) + .aggregationRepository(new Etcd3AggregationRepository("aggregation", endpoint)) + .completionSize(3) + .log("Sending out ${body}") + .to("mock:result"); + } + }; + } +} diff --git a/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/MyAggregationStrategy.java b/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/MyAggregationStrategy.java new file mode 100644 index 0000000..184f605 --- /dev/null +++ b/components/camel-etcd3/src/test/java/org/apache/camel/component/etcd3/MyAggregationStrategy.java @@ -0,0 +1,46 @@ +package org.apache.camel.component.etcd3; + +import org.apache.camel.Exchange; +import org.apache.camel.processor.aggregate.AggregationStrategy; + +/** + * This is the aggregation strategy which is java code for <i>aggregating</i> + * incoming messages with the existing aggregated message. In other words + * you use this strategy to <i>merge</i> the messages together. + */ +public class MyAggregationStrategy implements AggregationStrategy { + + /** + * Aggregates the messages. + * + * @param oldExchange the existing aggregated message. Is <tt>null</tt> the + * very first time as there are no existing message. + * @param newExchange the incoming message. This is never <tt>null</tt>. + * @return the aggregated message. + */ + public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { + // the first time there are no existing message and therefore + // the oldExchange is null. In these cases we just return + // the newExchange + if (oldExchange == null) { + return newExchange; + } + + // now we have both an existing message (oldExchange) + // and a incoming message (newExchange) + // we want to merge together. + + // in this example we add their bodies + String oldBody = oldExchange.getIn().getBody(String.class).trim(); + String newBody = newExchange.getIn().getBody(String.class).trim(); + + // the body should be the two bodies added together + String body = oldBody + newBody; + + // update the existing message with the added body + oldExchange.getIn().setBody(body); + // and return it + return oldExchange; + } + +} diff --git a/components/pom.xml b/components/pom.xml index 5d89fe7..c4e6189 100644 --- a/components/pom.xml +++ b/components/pom.xml @@ -190,6 +190,7 @@ <module>camel-elsql</module> <module>camel-elytron</module> <module>camel-etcd</module> + <module>camel-etcd3</module> <module>camel-exec</module> <module>camel-facebook</module> <module>camel-fastjson</module> diff --git a/parent/pom.xml b/parent/pom.xml index 444086d..e88be59 100644 --- a/parent/pom.xml +++ b/parent/pom.xml @@ -318,6 +318,7 @@ <jcr-version>2.0</jcr-version> <jedis-client-version>3.1.0</jedis-client-version> <jersey-version>2.28</jersey-version> + <jetcd-version>0.5.3</jetcd-version> <jetty9-version>9.4.31.v20200723</jetty9-version> <jetty-version>${jetty9-version}</jetty-version> <jetty-plugin-version>${jetty-version}</jetty-plugin-version> @@ -613,7 +614,6 @@ <camel.osgi.provide.capability /> <camel.osgi.manifest>${project.build.outputDirectory}/META-INF/MANIFEST.MF</camel.osgi.manifest> - <sourcecheckExcludes></sourcecheckExcludes> <sourcecheckExcludesComma></sourcecheckExcludesComma> </properties> @@ -1343,6 +1343,11 @@ </dependency> <dependency> <groupId>org.apache.camel</groupId> + <artifactId>camel-etcd3</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.camel</groupId> <artifactId>camel-exec</artifactId> <version>${project.version}</version> </dependency> @@ -2664,7 +2669,6 @@ <version>${project.version}</version> <type>test-jar</type> </dependency> - <!-- cassandra --> <dependency> <groupId>org.apache.cassandra</groupId> @@ -2681,7 +2685,6 @@ <artifactId>java-driver-query-builder</artifactId> <version>${cassandra-driver-version}</version> </dependency> - <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-annotations</artifactId> @@ -4465,7 +4468,6 @@ </plugins> </build> </profile> - <profile> <id>revapi</id> <build>
