This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch cloud-native
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 2eaa132b20eae50f18e391a7418ec24f4cff439e
Author: ken.lj <[email protected]>
AuthorDate: Mon Aug 12 16:52:46 2019 +0800

    consul complement
---
 .../registry/consul/ConsulServiceDiscovery.java    | 112 ++++++++++++++++++---
 ...e.dubbo.registry.client.ServiceDiscoveryFactory |   1 +
 2 files changed, 99 insertions(+), 14 deletions(-)

diff --git 
a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
 
b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
index 3f8f6bd..ce7efc6 100644
--- 
a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
+++ 
b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
@@ -17,18 +17,20 @@
 package org.apache.dubbo.registry.consul;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.event.EventDispatcher;
 import org.apache.dubbo.event.EventListener;
-import org.apache.dubbo.registry.NotifyListener;
 import org.apache.dubbo.registry.client.ServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
 import 
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
 
 import com.ecwid.consul.v1.ConsulClient;
+import com.ecwid.consul.v1.QueryParams;
 import com.ecwid.consul.v1.Response;
 import com.ecwid.consul.v1.agent.model.NewService;
+import com.ecwid.consul.v1.health.HealthServicesRequest;
 import com.ecwid.consul.v1.health.model.HealthService;
 
 import java.util.Collections;
@@ -36,15 +38,14 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static java.util.concurrent.Executors.newCachedThreadPool;
-import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
 import static 
org.apache.dubbo.registry.consul.AbstractConsulRegistry.CHECK_PASS_INTERVAL;
 import static 
org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_CHECK_PASS_INTERVAL;
 import static 
org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_DEREGISTER_TIME;
@@ -59,22 +60,20 @@ import static 
org.apache.dubbo.registry.consul.AbstractConsulRegistry.WATCH_TIME
  * 2019-07-31
  */
 public class ConsulServiceDiscovery implements ServiceDiscovery, 
EventListener<ServiceInstancesChangedEvent> {
+    private static final Logger logger = 
LoggerFactory.getLogger(ConsulServiceDiscovery.class);
 
     private ConsulClient client;
-    private long checkPassInterval;
     private ExecutorService notifierExecutor = newCachedThreadPool(
             new NamedThreadFactory("dubbo-consul-notifier", true));
-    private ScheduledExecutorService ttlConsulCheckExecutor;
-
+    private TtlScheduler ttlScheduler;
+    private long checkPassInterval;
 
     public ConsulServiceDiscovery(URL url) {
         String host = url.getHost();
         int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT;
-        client = new ConsulClient(host, port);
         checkPassInterval = url.getParameter(CHECK_PASS_INTERVAL, 
DEFAULT_CHECK_PASS_INTERVAL);
-        ttlConsulCheckExecutor = Executors.newSingleThreadScheduledExecutor();
-//        ttlConsulCheckExecutor.scheduleAtFixedRate(this::checkPass, 
checkPassInterval / 8,
-//                checkPassInterval / 8, TimeUnit.MILLISECONDS);
+        client = new ConsulClient(host, port);
+        ttlScheduler = new TtlScheduler(checkPassInterval, client);
     }
 
     @Override
@@ -94,7 +93,9 @@ public class ConsulServiceDiscovery implements 
ServiceDiscovery, EventListener<S
 
     @Override
     public void register(ServiceInstance serviceInstance) throws 
RuntimeException {
-        client.agentServiceRegister(buildService(serviceInstance));
+        NewService consulService = buildService(serviceInstance);
+        ttlScheduler.add(consulService.getId());
+        client.agentServiceRegister(consulService);
     }
 
     @Override
@@ -104,7 +105,9 @@ public class ConsulServiceDiscovery implements 
ServiceDiscovery, EventListener<S
 
     @Override
     public void unregister(ServiceInstance serviceInstance) throws 
RuntimeException {
-        client.agentServiceDeregister(buildId(serviceInstance));
+        String id = buildId(serviceInstance);
+        ttlScheduler.remove(id);
+        client.agentServiceDeregister(id);
     }
 
     @Override
@@ -117,6 +120,22 @@ public class ConsulServiceDiscovery implements 
ServiceDiscovery, EventListener<S
 
     }
 
+    @Override
+    public List<ServiceInstance> getInstances(String serviceName) throws 
NullPointerException {
+        List<ServiceInstance> instances;
+        Response<List<HealthService>> response = 
getHealthServices(serviceName, -1, buildWatchTimeout(url));
+        urls = convert(response.getValue(), url);
+    }
+
+    private Response<List<HealthService>> getHealthServices(String service, 
long index, int watchTimeout) {
+        HealthServicesRequest request = HealthServicesRequest.newBuilder()
+                .setTag(SERVICE_TAG)
+                .setQueryParams(new QueryParams(watchTimeout, index))
+                .setPassing(true)
+                .build();
+        return client.getHealthServices(service, request);
+    }
+
     private NewService buildService(ServiceInstance serviceInstance) {
         NewService service = new NewService();
         service.setAddress(serviceInstance.getHost());
@@ -130,7 +149,6 @@ public class ConsulServiceDiscovery implements 
ServiceDiscovery, EventListener<S
     }
 
     private String buildId(ServiceInstance serviceInstance) {
-        // let's simply use url's hashcode to generate unique service id for 
now
         return Integer.toHexString(serviceInstance.hashCode());
     }
 
@@ -193,4 +211,70 @@ public class ConsulServiceDiscovery implements 
ServiceDiscovery, EventListener<S
             this.running = false;
         }
     }
+
+    private static class TtlScheduler {
+
+        private static final Logger logger = 
LoggerFactory.getLogger(TtlScheduler.class);
+
+        private final Map<String, ScheduledFuture> serviceHeartbeats = new 
ConcurrentHashMap<>();
+
+        private ScheduledExecutorService scheduler = 
Executors.newSingleThreadScheduledExecutor();
+        ;
+
+        private long checkInterval;
+
+        private ConsulClient client;
+
+        public TtlScheduler(long checkInterval, ConsulClient client) {
+            this.checkInterval = checkInterval;
+            this.client = client;
+        }
+
+        /**
+         * Add a service to the checks loop.
+         *
+         * @param instanceId instance id
+         */
+        public void add(String instanceId) {
+            ScheduledFuture task = this.scheduler.scheduleAtFixedRate(
+                    new ConsulHeartbeatTask(instanceId),
+                    checkInterval / 8,
+                    checkInterval / 8,
+                    TimeUnit.MILLISECONDS);
+            ScheduledFuture previousTask = 
this.serviceHeartbeats.put(instanceId, task);
+            if (previousTask != null) {
+                previousTask.cancel(true);
+            }
+        }
+
+        public void remove(String instanceId) {
+            ScheduledFuture task = this.serviceHeartbeats.get(instanceId);
+            if (task != null) {
+                task.cancel(true);
+            }
+            this.serviceHeartbeats.remove(instanceId);
+        }
+
+        private class ConsulHeartbeatTask implements Runnable {
+
+            private String checkId;
+
+            ConsulHeartbeatTask(String serviceId) {
+                this.checkId = serviceId;
+                if (!this.checkId.startsWith("service:")) {
+                    this.checkId = "service:" + this.checkId;
+                }
+            }
+
+            @Override
+            public void run() {
+                TtlScheduler.this.client.agentCheckPass(this.checkId);
+                if (logger.isInfoEnabled()) {
+                    logger.info("Sending consul heartbeat for: " + 
this.checkId);
+                }
+            }
+
+        }
+
+    }
 }
diff --git 
a/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
 
b/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..a0f1252
--- /dev/null
+++ 
b/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+consul=org.apache.dubbo.registry.consul.ConsulServiceDiscoveryFactory
\ No newline at end of file

Reply via email to