CAMEL-10072: camel-etcd: implement watching ServiceCallServerListStrategy
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/615bc235 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/615bc235 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/615bc235 Branch: refs/heads/master Commit: 615bc2355a1152c8b7957d05058f48ef04be4e9c Parents: 181e8f8 Author: lburgazzoli <[email protected]> Authored: Mon Jun 20 15:36:25 2016 +0200 Committer: lburgazzoli <[email protected]> Committed: Mon Jun 20 15:57:33 2016 +0200 ---------------------------------------------------------------------- .../camel/component/etcd/EtcdConfiguration.java | 7 +- .../apache/camel/component/etcd/EtcdHelper.java | 18 ++ .../component/etcd/policy/EtcdRoutePolicy.java | 32 ++-- .../remote/EtcdServiceCallProcessor.java | 5 +- .../remote/EtcdServiceCallProcessorFactory.java | 7 +- .../EtcdServiceCallServerListStrategies.java | 172 ++++++++++++++++--- .../EtcdServiceCallServerListStrategy.java | 3 +- .../camel/component/etcd/EtcdKeysTest.java | 1 - .../camel/component/etcd/EtcdStatsTest.java | 1 - .../camel/component/etcd/EtcdWatchTest.java | 1 - .../remote/EtcdServiceCallRouteTest.java | 2 - .../EtcdServiceCallServerListStrategyTest.java | 110 ++++++++++++ 12 files changed, 297 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java index 263f0b4..aeb089a 100644 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java +++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java @@ -183,9 +183,10 @@ public class EtcdConfiguration { URI[] etcdUriList = new URI[uris.length]; - int i = 0; - for (String uri : uris) { - etcdUriList[i++] = URI.create(camelContext.resolvePropertyPlaceholders(uri)); + for (int i = 0; i < uris.length; i++) { + etcdUriList[i] = camelContext != null + ? URI.create(camelContext.resolvePropertyPlaceholders(uris[i])) + : URI.create(uris[i]); } return new EtcdClient( http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java index b222037..a2954bc 100644 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java @@ -16,13 +16,19 @@ */ package org.apache.camel.component.etcd; +import java.util.concurrent.atomic.AtomicLong; + import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import mousio.etcd4j.responses.EtcdErrorCode; import mousio.etcd4j.responses.EtcdException; +import mousio.etcd4j.responses.EtcdKeysResponse; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class EtcdHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(EtcdHelper.class); private static final String OUTDATED_EVENT_MSG = "requested index is outdated and cleared"; private EtcdHelper() { @@ -41,4 +47,16 @@ public final class EtcdHelper { .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) 
.setSerializationInclusion(JsonInclude.Include.NON_NULL); } + + + + public static void setIndex(AtomicLong index, EtcdKeysResponse response) { + if (response != null && response.node != null) { + index.set(response.node.modifiedIndex + 1); + LOGGER.debug("Index received={}, next={}", response.node.modifiedIndex, index.get()); + } else { + index.set(response.etcdIndex + 1); + LOGGER.debug("Index received={}, next={}", response.node.modifiedIndex, index.get()); + } + } } http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java index 32e95df..115af13 100644 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java @@ -250,7 +250,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi } else { try { EtcdKeysResponse response = promise.get(); - setIndex(response); + EtcdHelper.setIndex(index, response); if (response.node.value == null) { setLeader(tryTakeLeadership()); @@ -267,20 +267,20 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi } if (throwable == null) { - try { - watch(); - } catch (Exception e) { - throw new RuntimeCamelException(e); - } + watch(); } else { throw new RuntimeCamelException(throwable); } } - private void watch() throws Exception { - if (isRunAllowed()) { + private void watch() { + if (!isRunAllowed()) { + return; + } + + try { if (leader.get()) { - setIndex(client.refresh(servicePath, ttl) + EtcdHelper.setIndex(index, client.refresh(servicePath, ttl) .send() .get() ); 
@@ -293,6 +293,8 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi .timeout(ttl / 3, TimeUnit.SECONDS) .send() .addListener(this); + } catch (Exception e) { + throw new RuntimeCamelException(e); } } @@ -308,7 +310,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi .get(); result = ObjectHelper.equal(serviceName, response.node.value); - setIndex(response); + EtcdHelper.setIndex(index, response); } catch (EtcdException e) { if (!e.isErrorCode(EtcdErrorCode.NodeExist)) { throw e; @@ -317,14 +319,4 @@ public class EtcdRoutePolicy extends RoutePolicySupport implements ResponsePromi return result; } - - private void setIndex(EtcdKeysResponse response) { - if (response != null && response.node != null) { - index.set(response.node.modifiedIndex + 1); - LOGGER.debug("Index received={}, next={}", response.node.modifiedIndex, index.get()); - } else { - index.set(response.etcdIndex + 1); - LOGGER.debug("Index received={}, next={}", response.node.modifiedIndex, index.get()); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java index 3251ae5..f1200a2 100644 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java @@ -21,19 +21,18 @@ import org.apache.camel.ExchangePattern; import org.apache.camel.component.etcd.EtcdConfiguration; import 
org.apache.camel.impl.remote.DefaultServiceCallProcessor; import org.apache.camel.spi.ProcessorFactory; -import org.apache.camel.spi.ServiceCallServer; import org.apache.camel.spi.ServiceCallServerListStrategy; /** * {@link ProcessorFactory} that creates the Etcd implementation of the ServiceCall EIP. */ -public class EtcdServiceCallProcessor extends DefaultServiceCallProcessor<ServiceCallServer> { +public class EtcdServiceCallProcessor extends DefaultServiceCallProcessor<EtcdServiceCallServer> { public EtcdServiceCallProcessor(String name, String scheme, String uri, ExchangePattern exchangePattern, EtcdConfiguration conf) { super(name, scheme, uri, exchangePattern); } @Override - public void setServerListStrategy(ServiceCallServerListStrategy<ServiceCallServer> serverListStrategy) { + public void setServerListStrategy(ServiceCallServerListStrategy<EtcdServiceCallServer> serverListStrategy) { if (!(serverListStrategy instanceof EtcdServiceCallServerListStrategy)) { throw new IllegalArgumentException("ServerListStrategy is not an instance of EtcdServiceCallServerListStrategy"); } http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java index 9f25738..370916e 100644 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java @@ -26,14 +26,13 @@ import org.apache.camel.impl.remote.DefaultServiceCallProcessor; import 
org.apache.camel.impl.remote.DefaultServiceCallProcessorFactory; import org.apache.camel.spi.ProcessorFactory; import org.apache.camel.spi.RouteContext; -import org.apache.camel.spi.ServiceCallServer; import org.apache.camel.spi.ServiceCallServerListStrategy; import org.apache.camel.util.ObjectHelper; /** * {@link ProcessorFactory} that creates the Etcd implementation of the ServiceCall EIP. */ -public class EtcdServiceCallProcessorFactory extends DefaultServiceCallProcessorFactory<EtcdConfiguration, ServiceCallServer> { +public class EtcdServiceCallProcessorFactory extends DefaultServiceCallProcessorFactory<EtcdConfiguration, EtcdServiceCallServer> { @Override protected EtcdConfiguration createConfiguration(RouteContext routeContext) throws Exception { return new EtcdConfiguration(routeContext.getCamelContext()); @@ -56,13 +55,15 @@ public class EtcdServiceCallProcessorFactory extends DefaultServiceCallProcessor ServiceCallServerListStrategy strategy = null; if (ObjectHelper.equal("ondemand", name, true)) { strategy = new EtcdServiceCallServerListStrategies.OnDemand(conf); + } else if (ObjectHelper.equal("watch", name, true)) { + strategy = new EtcdServiceCallServerListStrategies.OnDemand(conf); } return Optional.ofNullable(strategy); } @Override - protected ServiceCallServerListStrategy<ServiceCallServer> createDefaultServerListStrategy(EtcdConfiguration conf) throws Exception { + protected ServiceCallServerListStrategy<EtcdServiceCallServer> createDefaultServerListStrategy(EtcdConfiguration conf) throws Exception { return new EtcdServiceCallServerListStrategies.OnDemand(conf); } } http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java ---------------------------------------------------------------------- diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java index e4b4a8a..d65ced9 100644 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java @@ -20,56 +20,172 @@ import java.util.Collections; import java.util.List; import java.util.Objects; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; import java.util.stream.Collectors; +import mousio.client.promises.ResponsePromise; import mousio.etcd4j.requests.EtcdKeyGetRequest; +import mousio.etcd4j.responses.EtcdException; import mousio.etcd4j.responses.EtcdKeysResponse; import org.apache.camel.RuntimeCamelException; import org.apache.camel.component.etcd.EtcdConfiguration; -import org.apache.camel.spi.ServiceCallServer; +import org.apache.camel.component.etcd.EtcdHelper; import org.apache.camel.util.ObjectHelper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public final class EtcdServiceCallServerListStrategies { + + private abstract static class AbstractStrategy extends EtcdServiceCallServerListStrategy { + AbstractStrategy(EtcdConfiguration configuration) throws Exception { + super(configuration); + } + + protected List<EtcdServiceCallServer> getServers() { + return getServers(s -> true); + } + + protected List<EtcdServiceCallServer> getServers(Predicate<EtcdServiceCallServer> filter) { + List<EtcdServiceCallServer> servers = Collections.emptyList(); + + if (isRunAllowed()) { + try { + final EtcdConfiguration conf = 
getConfiguration(); + final EtcdKeyGetRequest request = getClient().get(conf.getServicePath()).recursive(); + if (conf.hasTimeout()) { + request.timeout(conf.getTimeout(), TimeUnit.SECONDS); + } + + final EtcdKeysResponse response = request.send().get(); + + if (Objects.nonNull(response.node) && !response.node.nodes.isEmpty()) { + servers = response.node.nodes.stream() + .map(node -> node.value) + .filter(ObjectHelper::isNotEmpty) + .map(this::nodeFromString) + .filter(Objects::nonNull) + .filter(filter) + .sorted(EtcdServiceCallServer.COMPARATOR) + .collect(Collectors.toList()); + } + } catch (Exception e) { + throw new RuntimeCamelException(e); + } + } + + return servers; + } + } + private EtcdServiceCallServerListStrategies() { } - public static final class OnDemand extends EtcdServiceCallServerListStrategy { + public static final class OnDemand extends AbstractStrategy { public OnDemand(EtcdConfiguration configuration) throws Exception { super(configuration); } @Override - public List<ServiceCallServer> getUpdatedListOfServers(String name) { - List<ServiceCallServer> servers = Collections.emptyList(); - try { - final EtcdConfiguration conf = getConfiguration(); - final EtcdKeyGetRequest request = getClient().get(conf.getServicePath()).recursive(); - if (conf.hasTimeout()) { - request.timeout(conf.getTimeout(), TimeUnit.SECONDS); - } + public List<EtcdServiceCallServer> getUpdatedListOfServers(String name) { + return getServers(s -> name.equalsIgnoreCase(s.getName())); + } - final EtcdKeysResponse response = request.send().get(); - - if (Objects.nonNull(response.node) && !response.node.nodes.isEmpty()) { - servers = response.node.nodes.stream() - .map(node -> node.value) - .filter(ObjectHelper::isNotEmpty) - .map(this::nodeFromString) - .filter(Objects::nonNull) - .filter(s -> name.equalsIgnoreCase(s.getName())) - .sorted(EtcdServiceCallServer.COMPARATOR) - .collect(Collectors.toList()); - } - } catch (Exception e) { - throw new RuntimeCamelException(e); + 
@Override + public String toString() { + return "EtcdServiceCallServerListStrategy.OnDemand"; + } + } + + public static final class Watch extends AbstractStrategy + implements ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse> { + + private static final Logger LOGGER = LoggerFactory.getLogger(Watch.class); + private final AtomicReference<List<EtcdServiceCallServer>> serversRef; + private final AtomicLong index; + private final String servicePath; + + public Watch(EtcdConfiguration configuration) throws Exception { + super(configuration); + + this.serversRef = new AtomicReference<>(); + this.index = new AtomicLong(0); + this.servicePath = ObjectHelper.notNull(configuration.getServicePath(), "servicePath"); + } + + @Override + public List<EtcdServiceCallServer> getUpdatedListOfServers(String name) { + List<EtcdServiceCallServer> servers = serversRef.get(); + if (servers == null) { + serversRef.set(getServers()); + watch(); } - return servers; + return serversRef.get().stream() + .filter(s -> name.equalsIgnoreCase(s.getName())) + .collect(Collectors.toList()); } @Override public String toString() { - return "OnDemand"; + return "EtcdServiceCallServerListStrategy.Watch"; + } + + // ************************************************************************* + // Watch + // ************************************************************************* + + @Override + public void onResponse(ResponsePromise<EtcdKeysResponse> promise) { + if (!isRunAllowed()) { + return; + } + + Throwable throwable = promise.getException(); + if (throwable != null && throwable instanceof EtcdException) { + EtcdException exception = (EtcdException) throwable; + if (EtcdHelper.isOutdatedIndexException(exception)) { + LOGGER.debug("Outdated index, key={}, cause={}", servicePath, exception.etcdCause); + index.set(exception.index + 1); + } + } else { + try { + EtcdKeysResponse response = promise.get(); + EtcdHelper.setIndex(index, response); + + serversRef.set(getServers()); + } catch 
(TimeoutException e) { + LOGGER.debug("Timeout watching for {}", getConfiguration().getServicePath()); + throwable = null; + } catch (Exception e) { + throwable = e; + } + } + + if (throwable == null) { + watch(); + } else { + throw new RuntimeCamelException(throwable); + } + } + + private void watch() { + if (!isRunAllowed()) { + return; + } + + try { + getClient().get(servicePath) + .recursive() + .waitForChange(index.get()) + .timeout(1, TimeUnit.SECONDS) + .send() + .addListener(this); + } catch (Exception e) { + throw new RuntimeCamelException(e); + } } } @@ -80,4 +196,8 @@ public final class EtcdServiceCallServerListStrategies { public static EtcdServiceCallServerListStrategy onDemand(EtcdConfiguration configuration) throws Exception { return new OnDemand(configuration); } + + public static EtcdServiceCallServerListStrategy watch(EtcdConfiguration configuration) throws Exception { + return new Watch(configuration); + } } http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java index 38f6139..a91156a 100644 --- a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java +++ b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java @@ -21,11 +21,10 @@ import mousio.etcd4j.EtcdClient; import org.apache.camel.component.etcd.EtcdConfiguration; import org.apache.camel.component.etcd.EtcdHelper; import org.apache.camel.impl.remote.DefaultServiceCallServerListStrategy; -import 
org.apache.camel.spi.ServiceCallServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class EtcdServiceCallServerListStrategy extends DefaultServiceCallServerListStrategy<ServiceCallServer> { +public class EtcdServiceCallServerListStrategy extends DefaultServiceCallServerListStrategy<EtcdServiceCallServer> { private static final Logger LOGGER = LoggerFactory.getLogger(EtcdServiceCallServerListStrategy.class); private static final ObjectMapper MAPPER = EtcdHelper.createObjectMapper(); http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java index c2bc507..8f188e9 100644 --- a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java +++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java @@ -29,7 +29,6 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.junit.Test; -//@Ignore("Etcd must be started manually") public class EtcdKeysTest extends EtcdTestSupport { @Test(expected = EtcdException.class) http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java index b3eeb13..6af0148 100644 --- a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java +++ 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java @@ -25,7 +25,6 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.junit.Test; -//@Ignore("Etcd must be started manually") public class EtcdStatsTest extends EtcdTestSupport { @Test http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java index 39aca62..fad8687 100644 --- a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java +++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java @@ -23,7 +23,6 @@ import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.mock.MockEndpoint; import org.junit.Test; -//@Ignore("Etcd must be started manually") public class EtcdWatchTest extends EtcdTestSupport { @Test http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallRouteTest.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallRouteTest.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallRouteTest.java index a20bd02..00d75d3 100644 --- a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallRouteTest.java +++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallRouteTest.java @@ -29,10 +29,8 @@ import 
org.apache.camel.RoutesBuilder; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.component.etcd.EtcdHelper; import org.apache.camel.component.etcd.EtcdTestSupport; -import org.junit.Ignore; import org.junit.Test; -@Ignore public class EtcdServiceCallRouteTest extends EtcdTestSupport { private static final ObjectMapper MAPPER = EtcdHelper.createObjectMapper(); private static final String SERVICE_NAME = "http-service"; http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategyTest.java ---------------------------------------------------------------------- diff --git a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategyTest.java b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategyTest.java new file mode 100644 index 0000000..55423be --- /dev/null +++ b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategyTest.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.camel.component.etcd.processor.remote; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import com.fasterxml.jackson.databind.ObjectMapper; +import mousio.etcd4j.EtcdClient; +import mousio.etcd4j.responses.EtcdException; +import org.apache.camel.component.etcd.EtcdConfiguration; +import org.apache.camel.component.etcd.EtcdHelper; +import org.apache.camel.component.etcd.EtcdTestSupport; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore +public class EtcdServiceCallServerListStrategyTest extends EtcdTestSupport { + private static final ObjectMapper MAPPER = EtcdHelper.createObjectMapper(); + private static final EtcdConfiguration CONFIGURATION = new EtcdConfiguration(null); + private static final AtomicInteger PORT = new AtomicInteger(0); + + private EtcdClient client; + + @Override + public void doPreSetup() throws Exception { + client = getClient(); + try { + client.deleteDir(CONFIGURATION.getServicePath()).recursive().send().get(); + } catch (EtcdException e) { + // Ignore + } + } + + @Override + public void tearDown() throws Exception { + try { + client.deleteDir(CONFIGURATION.getServicePath()).recursive().send().get(); + client.close(); + client = null; + } catch (EtcdException e) { + // Ignore + } + } + + @Test + public void testOnDemandStrategy() throws Exception { + for (int i = 0; i < 3; i++) { + addServer(client, "serviceType-1"); + } + for (int i = 0; i < 2; i++) { + addServer(client, "serviceType-2"); + } + + EtcdServiceCallServerListStrategy strategy = EtcdServiceCallServerListStrategies.onDemand(CONFIGURATION); + strategy.start(); + + assertEquals(3, strategy.getUpdatedListOfServers("serviceType-1").size()); + assertEquals(2, strategy.getUpdatedListOfServers("serviceType-2").size()); + + strategy.stop(); + } + + @Test + public void testWatchStrategy() throws Exception { + addServer(client, "serviceType-3"); + + EtcdServiceCallServerListStrategy strategy = 
EtcdServiceCallServerListStrategies.watch(CONFIGURATION); + strategy.start(); + + assertEquals(1, strategy.getUpdatedListOfServers("serviceType-3").size()); + + addServer(client, "serviceType-3"); + addServer(client, "serviceType-3"); + addServer(client, "serviceType-4"); + + Thread.sleep(250); + + assertEquals(3, strategy.getUpdatedListOfServers("serviceType-3").size()); + + strategy.stop(); + } + + private void addServer(EtcdClient client, String name) throws Exception { + int port = PORT.incrementAndGet(); + + Map<String, Object> server = new HashMap<>(); + server.put("name", name); + server.put("address", "127.0.0.1"); + server.put("port", 8000 + port); + + client.put(CONFIGURATION.getServicePath() + "service-" + port, MAPPER.writeValueAsString(server)).send().get(); + } +}
