CAMEL-10072: camel-etcd : implement watching ServiceCallServerListStrategy

Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/615bc235
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/615bc235
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/615bc235

Branch: refs/heads/master
Commit: 615bc2355a1152c8b7957d05058f48ef04be4e9c
Parents: 181e8f8
Author: lburgazzoli <[email protected]>
Authored: Mon Jun 20 15:36:25 2016 +0200
Committer: lburgazzoli <[email protected]>
Committed: Mon Jun 20 15:57:33 2016 +0200

----------------------------------------------------------------------
 .../camel/component/etcd/EtcdConfiguration.java |   7 +-
 .../apache/camel/component/etcd/EtcdHelper.java |  18 ++
 .../component/etcd/policy/EtcdRoutePolicy.java  |  32 ++--
 .../remote/EtcdServiceCallProcessor.java        |   5 +-
 .../remote/EtcdServiceCallProcessorFactory.java |   7 +-
 .../EtcdServiceCallServerListStrategies.java    | 172 ++++++++++++++++---
 .../EtcdServiceCallServerListStrategy.java      |   3 +-
 .../camel/component/etcd/EtcdKeysTest.java      |   1 -
 .../camel/component/etcd/EtcdStatsTest.java     |   1 -
 .../camel/component/etcd/EtcdWatchTest.java     |   1 -
 .../remote/EtcdServiceCallRouteTest.java        |   2 -
 .../EtcdServiceCallServerListStrategyTest.java  | 110 ++++++++++++
 12 files changed, 297 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
index 263f0b4..aeb089a 100644
--- 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdConfiguration.java
@@ -183,9 +183,10 @@ public class EtcdConfiguration {
 
         URI[] etcdUriList = new URI[uris.length];
 
-        int i = 0;
-        for (String uri : uris) {
-            etcdUriList[i++] = 
URI.create(camelContext.resolvePropertyPlaceholders(uri));
+        for (int i = 0; i < uris.length; i++) {
+            etcdUriList[i] = camelContext != null
+                ? URI.create(camelContext.resolvePropertyPlaceholders(uris[i]))
+                : URI.create(uris[i]);
         }
 
         return new EtcdClient(

http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
index b222037..a2954bc 100644
--- 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/EtcdHelper.java
@@ -16,13 +16,19 @@
  */
 package org.apache.camel.component.etcd;
 
+import java.util.concurrent.atomic.AtomicLong;
+
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import mousio.etcd4j.responses.EtcdErrorCode;
 import mousio.etcd4j.responses.EtcdException;
+import mousio.etcd4j.responses.EtcdKeysResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class EtcdHelper  {
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(EtcdHelper.class);
     private static final String OUTDATED_EVENT_MSG = "requested index is 
outdated and cleared";
 
     private EtcdHelper() {
@@ -41,4 +47,16 @@ public final class EtcdHelper  {
             .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, 
false)
             .setSerializationInclusion(JsonInclude.Include.NON_NULL);
     }
+
+
+
+    public static void setIndex(AtomicLong index, EtcdKeysResponse response) {
+        if (response != null && response.node != null) {
+            index.set(response.node.modifiedIndex + 1);
+            LOGGER.debug("Index received={}, next={}", 
response.node.modifiedIndex, index.get());
+        } else {
+            index.set(response.etcdIndex + 1);
+            LOGGER.debug("Index received={}, next={}", 
response.node.modifiedIndex, index.get());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
index 32e95df..115af13 100644
--- 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/policy/EtcdRoutePolicy.java
@@ -250,7 +250,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport 
implements ResponsePromi
         } else {
             try {
                 EtcdKeysResponse response = promise.get();
-                setIndex(response);
+                EtcdHelper.setIndex(index, response);
 
                 if (response.node.value == null) {
                     setLeader(tryTakeLeadership());
@@ -267,20 +267,20 @@ public class EtcdRoutePolicy extends RoutePolicySupport 
implements ResponsePromi
         }
 
         if (throwable == null) {
-            try {
-                watch();
-            } catch (Exception e) {
-                throw new RuntimeCamelException(e);
-            }
+            watch();
         } else {
             throw new RuntimeCamelException(throwable);
         }
     }
 
-    private void watch() throws Exception {
-        if (isRunAllowed()) {
+    private void watch() {
+        if (!isRunAllowed()) {
+            return;
+        }
+
+        try {
             if (leader.get()) {
-                setIndex(client.refresh(servicePath, ttl)
+                EtcdHelper.setIndex(index, client.refresh(servicePath, ttl)
                     .send()
                     .get()
                 );
@@ -293,6 +293,8 @@ public class EtcdRoutePolicy extends RoutePolicySupport 
implements ResponsePromi
                 .timeout(ttl / 3, TimeUnit.SECONDS)
                 .send()
                 .addListener(this);
+        } catch (Exception e) {
+            throw new RuntimeCamelException(e);
         }
     }
 
@@ -308,7 +310,7 @@ public class EtcdRoutePolicy extends RoutePolicySupport 
implements ResponsePromi
                 .get();
 
             result = ObjectHelper.equal(serviceName, response.node.value);
-            setIndex(response);
+            EtcdHelper.setIndex(index, response);
         } catch (EtcdException e) {
             if (!e.isErrorCode(EtcdErrorCode.NodeExist)) {
                 throw e;
@@ -317,14 +319,4 @@ public class EtcdRoutePolicy extends RoutePolicySupport 
implements ResponsePromi
 
         return result;
     }
-
-    private void setIndex(EtcdKeysResponse response) {
-        if (response != null && response.node != null) {
-            index.set(response.node.modifiedIndex + 1);
-            LOGGER.debug("Index received={}, next={}", 
response.node.modifiedIndex, index.get());
-        } else {
-            index.set(response.etcdIndex + 1);
-            LOGGER.debug("Index received={}, next={}", 
response.node.modifiedIndex, index.get());
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java
index 3251ae5..f1200a2 100644
--- 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessor.java
@@ -21,19 +21,18 @@ import org.apache.camel.ExchangePattern;
 import org.apache.camel.component.etcd.EtcdConfiguration;
 import org.apache.camel.impl.remote.DefaultServiceCallProcessor;
 import org.apache.camel.spi.ProcessorFactory;
-import org.apache.camel.spi.ServiceCallServer;
 import org.apache.camel.spi.ServiceCallServerListStrategy;
 
 /**
  * {@link ProcessorFactory} that creates the Etcd implementation of the 
ServiceCall EIP.
  */
-public class EtcdServiceCallProcessor extends 
DefaultServiceCallProcessor<ServiceCallServer> {
+public class EtcdServiceCallProcessor extends 
DefaultServiceCallProcessor<EtcdServiceCallServer> {
     public EtcdServiceCallProcessor(String name, String scheme, String uri, 
ExchangePattern exchangePattern, EtcdConfiguration conf) {
         super(name, scheme, uri, exchangePattern);
     }
 
     @Override
-    public void 
setServerListStrategy(ServiceCallServerListStrategy<ServiceCallServer> 
serverListStrategy) {
+    public void 
setServerListStrategy(ServiceCallServerListStrategy<EtcdServiceCallServer> 
serverListStrategy) {
         if (!(serverListStrategy instanceof 
EtcdServiceCallServerListStrategy)) {
             throw new IllegalArgumentException("ServerListStrategy is not an 
instance of EtcdServiceCallServerListStrategy");
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java
index 9f25738..370916e 100644
--- 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallProcessorFactory.java
@@ -26,14 +26,13 @@ import 
org.apache.camel.impl.remote.DefaultServiceCallProcessor;
 import org.apache.camel.impl.remote.DefaultServiceCallProcessorFactory;
 import org.apache.camel.spi.ProcessorFactory;
 import org.apache.camel.spi.RouteContext;
-import org.apache.camel.spi.ServiceCallServer;
 import org.apache.camel.spi.ServiceCallServerListStrategy;
 import org.apache.camel.util.ObjectHelper;
 
 /**
  * {@link ProcessorFactory} that creates the Etcd implementation of the 
ServiceCall EIP.
  */
-public class EtcdServiceCallProcessorFactory extends 
DefaultServiceCallProcessorFactory<EtcdConfiguration, ServiceCallServer> {
+public class EtcdServiceCallProcessorFactory extends 
DefaultServiceCallProcessorFactory<EtcdConfiguration, EtcdServiceCallServer> {
     @Override
     protected EtcdConfiguration createConfiguration(RouteContext routeContext) 
throws Exception {
         return new EtcdConfiguration(routeContext.getCamelContext());
@@ -56,13 +55,15 @@ public class EtcdServiceCallProcessorFactory extends 
DefaultServiceCallProcessor
         ServiceCallServerListStrategy strategy = null;
         if (ObjectHelper.equal("ondemand", name, true)) {
             strategy = new EtcdServiceCallServerListStrategies.OnDemand(conf);
+        } else if (ObjectHelper.equal("watch", name, true)) {
+            strategy = new EtcdServiceCallServerListStrategies.OnDemand(conf);
         }
 
         return Optional.ofNullable(strategy);
     }
 
     @Override
-    protected ServiceCallServerListStrategy<ServiceCallServer> 
createDefaultServerListStrategy(EtcdConfiguration conf) throws Exception {
+    protected ServiceCallServerListStrategy<EtcdServiceCallServer> 
createDefaultServerListStrategy(EtcdConfiguration conf) throws Exception {
         return new EtcdServiceCallServerListStrategies.OnDemand(conf);
     }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java
index e4b4a8a..d65ced9 100644
--- 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategies.java
@@ -20,56 +20,172 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
+import mousio.client.promises.ResponsePromise;
 import mousio.etcd4j.requests.EtcdKeyGetRequest;
+import mousio.etcd4j.responses.EtcdException;
 import mousio.etcd4j.responses.EtcdKeysResponse;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.etcd.EtcdConfiguration;
-import org.apache.camel.spi.ServiceCallServer;
+import org.apache.camel.component.etcd.EtcdHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public final class EtcdServiceCallServerListStrategies {
+
+    private abstract static class AbstractStrategy extends 
EtcdServiceCallServerListStrategy {
+        AbstractStrategy(EtcdConfiguration configuration) throws Exception {
+            super(configuration);
+        }
+
+        protected List<EtcdServiceCallServer> getServers() {
+            return getServers(s -> true);
+        }
+
+        protected List<EtcdServiceCallServer> 
getServers(Predicate<EtcdServiceCallServer> filter) {
+            List<EtcdServiceCallServer> servers = Collections.emptyList();
+
+            if (isRunAllowed()) {
+                try {
+                    final EtcdConfiguration conf = getConfiguration();
+                    final EtcdKeyGetRequest request = 
getClient().get(conf.getServicePath()).recursive();
+                    if (conf.hasTimeout()) {
+                        request.timeout(conf.getTimeout(), TimeUnit.SECONDS);
+                    }
+
+                    final EtcdKeysResponse response = request.send().get();
+
+                    if (Objects.nonNull(response.node) && 
!response.node.nodes.isEmpty()) {
+                        servers = response.node.nodes.stream()
+                            .map(node -> node.value)
+                            .filter(ObjectHelper::isNotEmpty)
+                            .map(this::nodeFromString)
+                            .filter(Objects::nonNull)
+                            .filter(filter)
+                            .sorted(EtcdServiceCallServer.COMPARATOR)
+                            .collect(Collectors.toList());
+                    }
+                } catch (Exception e) {
+                    throw new RuntimeCamelException(e);
+                }
+            }
+
+            return servers;
+        }
+    }
+
     private EtcdServiceCallServerListStrategies() {
     }
 
-    public static final class OnDemand extends 
EtcdServiceCallServerListStrategy {
+    public static final class OnDemand extends AbstractStrategy {
         public OnDemand(EtcdConfiguration configuration) throws Exception {
             super(configuration);
         }
 
         @Override
-        public List<ServiceCallServer> getUpdatedListOfServers(String name) {
-            List<ServiceCallServer> servers = Collections.emptyList();
-            try {
-                final EtcdConfiguration conf = getConfiguration();
-                final EtcdKeyGetRequest request = 
getClient().get(conf.getServicePath()).recursive();
-                if (conf.hasTimeout()) {
-                    request.timeout(conf.getTimeout(), TimeUnit.SECONDS);
-                }
+        public List<EtcdServiceCallServer> getUpdatedListOfServers(String 
name) {
+            return getServers(s -> name.equalsIgnoreCase(s.getName()));
+        }
 
-                final EtcdKeysResponse response = request.send().get();
-
-                if (Objects.nonNull(response.node) && 
!response.node.nodes.isEmpty()) {
-                    servers = response.node.nodes.stream()
-                        .map(node -> node.value)
-                        .filter(ObjectHelper::isNotEmpty)
-                        .map(this::nodeFromString)
-                        .filter(Objects::nonNull)
-                        .filter(s -> name.equalsIgnoreCase(s.getName()))
-                        .sorted(EtcdServiceCallServer.COMPARATOR)
-                        .collect(Collectors.toList());
-                }
-            } catch (Exception e) {
-                throw new RuntimeCamelException(e);
+        @Override
+        public String toString() {
+            return "EtcdServiceCallServerListStrategy.OnDemand";
+        }
+    }
+
+    public static final class Watch extends AbstractStrategy
+            implements 
ResponsePromise.IsSimplePromiseResponseHandler<EtcdKeysResponse> {
+
+        private static final Logger LOGGER = 
LoggerFactory.getLogger(Watch.class);
+        private final AtomicReference<List<EtcdServiceCallServer>> serversRef;
+        private final AtomicLong index;
+        private final String servicePath;
+
+        public Watch(EtcdConfiguration configuration) throws Exception {
+            super(configuration);
+
+            this.serversRef = new AtomicReference<>();
+            this.index = new AtomicLong(0);
+            this.servicePath = 
ObjectHelper.notNull(configuration.getServicePath(), "servicePath");
+        }
+
+        @Override
+        public List<EtcdServiceCallServer> getUpdatedListOfServers(String 
name) {
+            List<EtcdServiceCallServer> servers = serversRef.get();
+            if (servers == null) {
+                serversRef.set(getServers());
+                watch();
             }
 
-            return servers;
+            return serversRef.get().stream()
+                .filter(s -> name.equalsIgnoreCase(s.getName()))
+                .collect(Collectors.toList());
         }
 
         @Override
         public String toString() {
-            return "OnDemand";
+            return "EtcdServiceCallServerListStrategy.Watch";
+        }
+
+        // 
*************************************************************************
+        // Watch
+        // 
*************************************************************************
+
+        @Override
+        public void onResponse(ResponsePromise<EtcdKeysResponse> promise) {
+            if (!isRunAllowed()) {
+                return;
+            }
+
+            Throwable throwable = promise.getException();
+            if (throwable != null && throwable instanceof EtcdException) {
+                EtcdException exception = (EtcdException) throwable;
+                if (EtcdHelper.isOutdatedIndexException(exception)) {
+                    LOGGER.debug("Outdated index, key={}, cause={}", 
servicePath, exception.etcdCause);
+                    index.set(exception.index + 1);
+                }
+            } else {
+                try {
+                    EtcdKeysResponse response = promise.get();
+                    EtcdHelper.setIndex(index, response);
+
+                    serversRef.set(getServers());
+                } catch (TimeoutException e) {
+                    LOGGER.debug("Timeout watching for {}", 
getConfiguration().getServicePath());
+                    throwable = null;
+                } catch (Exception e) {
+                    throwable = e;
+                }
+            }
+
+            if (throwable == null) {
+                watch();
+            } else {
+                throw new RuntimeCamelException(throwable);
+            }
+        }
+
+        private void watch() {
+            if (!isRunAllowed()) {
+                return;
+            }
+
+            try {
+                getClient().get(servicePath)
+                    .recursive()
+                    .waitForChange(index.get())
+                    .timeout(1, TimeUnit.SECONDS)
+                    .send()
+                    .addListener(this);
+            } catch (Exception e) {
+                throw new RuntimeCamelException(e);
+            }
         }
     }
 
@@ -80,4 +196,8 @@ public final class EtcdServiceCallServerListStrategies {
     public static EtcdServiceCallServerListStrategy onDemand(EtcdConfiguration 
configuration) throws Exception {
         return new OnDemand(configuration);
     }
+
+    public static EtcdServiceCallServerListStrategy watch(EtcdConfiguration 
configuration) throws Exception {
+        return new Watch(configuration);
+    }
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java
 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java
index 38f6139..a91156a 100644
--- 
a/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java
+++ 
b/components/camel-etcd/src/main/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategy.java
@@ -21,11 +21,10 @@ import mousio.etcd4j.EtcdClient;
 import org.apache.camel.component.etcd.EtcdConfiguration;
 import org.apache.camel.component.etcd.EtcdHelper;
 import org.apache.camel.impl.remote.DefaultServiceCallServerListStrategy;
-import org.apache.camel.spi.ServiceCallServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class EtcdServiceCallServerListStrategy extends 
DefaultServiceCallServerListStrategy<ServiceCallServer> {
+public class EtcdServiceCallServerListStrategy extends 
DefaultServiceCallServerListStrategy<EtcdServiceCallServer> {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(EtcdServiceCallServerListStrategy.class);
     private static final ObjectMapper MAPPER = EtcdHelper.createObjectMapper();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java
 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java
index c2bc507..8f188e9 100644
--- 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java
+++ 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdKeysTest.java
@@ -29,7 +29,6 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 
-//@Ignore("Etcd must be started manually")
 public class EtcdKeysTest extends EtcdTestSupport {
 
     @Test(expected = EtcdException.class)

http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java
 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java
index b3eeb13..6af0148 100644
--- 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java
+++ 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdStatsTest.java
@@ -25,7 +25,6 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 
-//@Ignore("Etcd must be started manually")
 public class EtcdStatsTest extends EtcdTestSupport {
 
     @Test

http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java
 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java
index 39aca62..fad8687 100644
--- 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java
+++ 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/EtcdWatchTest.java
@@ -23,7 +23,6 @@ import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.junit.Test;
 
-//@Ignore("Etcd must be started manually")
 public class EtcdWatchTest extends EtcdTestSupport {
 
     @Test

http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallRouteTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallRouteTest.java
 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallRouteTest.java
index a20bd02..00d75d3 100644
--- 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallRouteTest.java
+++ 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallRouteTest.java
@@ -29,10 +29,8 @@ import org.apache.camel.RoutesBuilder;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.etcd.EtcdHelper;
 import org.apache.camel.component.etcd.EtcdTestSupport;
-import org.junit.Ignore;
 import org.junit.Test;
 
-@Ignore
 public class EtcdServiceCallRouteTest extends EtcdTestSupport {
     private static final ObjectMapper MAPPER = EtcdHelper.createObjectMapper();
     private static final String SERVICE_NAME = "http-service";

http://git-wip-us.apache.org/repos/asf/camel/blob/615bc235/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategyTest.java
----------------------------------------------------------------------
diff --git 
a/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategyTest.java
 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategyTest.java
new file mode 100644
index 0000000..55423be
--- /dev/null
+++ 
b/components/camel-etcd/src/test/java/org/apache/camel/component/etcd/processor/remote/EtcdServiceCallServerListStrategyTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.camel.component.etcd.processor.remote;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import mousio.etcd4j.EtcdClient;
+import mousio.etcd4j.responses.EtcdException;
+import org.apache.camel.component.etcd.EtcdConfiguration;
+import org.apache.camel.component.etcd.EtcdHelper;
+import org.apache.camel.component.etcd.EtcdTestSupport;
+import org.junit.Ignore;
+import org.junit.Test;
+
+@Ignore
+public class EtcdServiceCallServerListStrategyTest extends EtcdTestSupport {
+    private static final ObjectMapper MAPPER = EtcdHelper.createObjectMapper();
+    private static final EtcdConfiguration CONFIGURATION = new 
EtcdConfiguration(null);
+    private static final AtomicInteger PORT = new AtomicInteger(0);
+
+    private EtcdClient client;
+
+    @Override
+    public void doPreSetup() throws Exception {
+        client = getClient();
+        try {
+            
client.deleteDir(CONFIGURATION.getServicePath()).recursive().send().get();
+        } catch (EtcdException e) {
+            // Ignore
+        }
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        try {
+            
client.deleteDir(CONFIGURATION.getServicePath()).recursive().send().get();
+            client.close();
+            client = null;
+        } catch (EtcdException e) {
+            // Ignore
+        }
+    }
+
+    @Test
+    public void testOnDemandStrategy() throws Exception {
+        for (int i = 0; i < 3; i++) {
+            addServer(client, "serviceType-1");
+        }
+        for (int i = 0; i < 2; i++) {
+            addServer(client, "serviceType-2");
+        }
+
+        EtcdServiceCallServerListStrategy strategy = 
EtcdServiceCallServerListStrategies.onDemand(CONFIGURATION);
+        strategy.start();
+
+        assertEquals(3, 
strategy.getUpdatedListOfServers("serviceType-1").size());
+        assertEquals(2, 
strategy.getUpdatedListOfServers("serviceType-2").size());
+
+        strategy.stop();
+    }
+
+    @Test
+    public void testWatchStrategy() throws Exception {
+        addServer(client, "serviceType-3");
+
+        EtcdServiceCallServerListStrategy strategy = 
EtcdServiceCallServerListStrategies.watch(CONFIGURATION);
+        strategy.start();
+
+        assertEquals(1, 
strategy.getUpdatedListOfServers("serviceType-3").size());
+
+        addServer(client, "serviceType-3");
+        addServer(client, "serviceType-3");
+        addServer(client, "serviceType-4");
+
+        Thread.sleep(250);
+
+        assertEquals(3, 
strategy.getUpdatedListOfServers("serviceType-3").size());
+
+        strategy.stop();
+    }
+
+    private void addServer(EtcdClient client, String name) throws Exception {
+        int port = PORT.incrementAndGet();
+
+        Map<String, Object> server = new HashMap<>();
+        server.put("name", name);
+        server.put("address", "127.0.0.1");
+        server.put("port", 8000 + port);
+
+        client.put(CONFIGURATION.getServicePath() + "service-" + port, 
MAPPER.writeValueAsString(server)).send().get();
+    }
+}

Reply via email to