This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch cloud-native in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 2eaa132b20eae50f18e391a7418ec24f4cff439e Author: ken.lj <[email protected]> AuthorDate: Mon Aug 12 16:52:46 2019 +0800 consul complement --- .../registry/consul/ConsulServiceDiscovery.java | 112 ++++++++++++++++++--- ...e.dubbo.registry.client.ServiceDiscoveryFactory | 1 + 2 files changed, 99 insertions(+), 14 deletions(-) diff --git a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java index 3f8f6bd..ce7efc6 100644 --- a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java @@ -17,18 +17,20 @@ package org.apache.dubbo.registry.consul; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; import org.apache.dubbo.common.utils.NamedThreadFactory; -import org.apache.dubbo.event.EventDispatcher; import org.apache.dubbo.event.EventListener; -import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.client.ServiceDiscovery; import org.apache.dubbo.registry.client.ServiceInstance; import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; import com.ecwid.consul.v1.ConsulClient; +import com.ecwid.consul.v1.QueryParams; import com.ecwid.consul.v1.Response; import com.ecwid.consul.v1.agent.model.NewService; +import com.ecwid.consul.v1.health.HealthServicesRequest; import com.ecwid.consul.v1.health.model.HealthService; import java.util.Collections; @@ -36,15 +38,14 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static java.util.concurrent.Executors.newCachedThreadPool; -import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE; import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.CHECK_PASS_INTERVAL; import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_CHECK_PASS_INTERVAL; import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.DEFAULT_DEREGISTER_TIME; @@ -59,22 +60,20 @@ import static org.apache.dubbo.registry.consul.AbstractConsulRegistry.WATCH_TIME * 2019-07-31 */ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<ServiceInstancesChangedEvent> { + private static final Logger logger = LoggerFactory.getLogger(ConsulServiceDiscovery.class); private ConsulClient client; - private long checkPassInterval; private ExecutorService notifierExecutor = newCachedThreadPool( new NamedThreadFactory("dubbo-consul-notifier", true)); - private ScheduledExecutorService ttlConsulCheckExecutor; - + private TtlScheduler ttlScheduler; + private long checkPassInterval; public ConsulServiceDiscovery(URL url) { String host = url.getHost(); int port = url.getPort() != 0 ? url.getPort() : DEFAULT_PORT; - client = new ConsulClient(host, port); checkPassInterval = url.getParameter(CHECK_PASS_INTERVAL, DEFAULT_CHECK_PASS_INTERVAL); - ttlConsulCheckExecutor = Executors.newSingleThreadScheduledExecutor(); -// ttlConsulCheckExecutor.scheduleAtFixedRate(this::checkPass, checkPassInterval / 8, -// checkPassInterval / 8, TimeUnit.MILLISECONDS); + client = new ConsulClient(host, port); + ttlScheduler = new TtlScheduler(checkPassInterval, client); } @Override @@ -94,7 +93,9 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S @Override public void register(ServiceInstance serviceInstance) throws RuntimeException { - client.agentServiceRegister(buildService(serviceInstance)); + NewService consulService = buildService(serviceInstance); + ttlScheduler.add(consulService.getId()); + client.agentServiceRegister(consulService); } @Override @@ -104,7 +105,9 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S @Override public void unregister(ServiceInstance serviceInstance) throws RuntimeException { - client.agentServiceDeregister(buildId(serviceInstance)); + String id = buildId(serviceInstance); + ttlScheduler.remove(id); + client.agentServiceDeregister(id); } @Override @@ -117,6 +120,22 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S } + @Override + public List<ServiceInstance> getInstances(String serviceName) throws NullPointerException { + List<ServiceInstance> instances; + Response<List<HealthService>> response = getHealthServices(serviceName, -1, buildWatchTimeout(url)); + urls = convert(response.getValue(), url); + } + + private Response<List<HealthService>> getHealthServices(String service, long index, int watchTimeout) { + HealthServicesRequest request = HealthServicesRequest.newBuilder() + .setTag(SERVICE_TAG) + .setQueryParams(new QueryParams(watchTimeout, index)) + .setPassing(true) + .build(); + return client.getHealthServices(service, request); + } + private NewService buildService(ServiceInstance serviceInstance) { NewService service = new NewService(); service.setAddress(serviceInstance.getHost()); @@ -130,7 +149,6 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S } private String buildId(ServiceInstance serviceInstance) { - // let's simply use url's hashcode to generate unique service id for now return Integer.toHexString(serviceInstance.hashCode()); } @@ -193,4 +211,70 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S this.running = false; } } + + private static class TtlScheduler { + + private static final Logger logger = LoggerFactory.getLogger(TtlScheduler.class); + + private final Map<String, ScheduledFuture> serviceHeartbeats = new ConcurrentHashMap<>(); + + private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + ; + + private long checkInterval; + + private ConsulClient client; + + public TtlScheduler(long checkInterval, ConsulClient client) { + this.checkInterval = checkInterval; + this.client = client; + } + + /** + * Add a service to the checks loop. + * + * @param instanceId instance id + */ + public void add(String instanceId) { + ScheduledFuture task = this.scheduler.scheduleAtFixedRate( + new ConsulHeartbeatTask(instanceId), + checkInterval / 8, + checkInterval / 8, + TimeUnit.MILLISECONDS); + ScheduledFuture previousTask = this.serviceHeartbeats.put(instanceId, task); + if (previousTask != null) { + previousTask.cancel(true); + } + } + + public void remove(String instanceId) { + ScheduledFuture task = this.serviceHeartbeats.get(instanceId); + if (task != null) { + task.cancel(true); + } + this.serviceHeartbeats.remove(instanceId); + } + + private class ConsulHeartbeatTask implements Runnable { + + private String checkId; + + ConsulHeartbeatTask(String serviceId) { + this.checkId = serviceId; + if (!this.checkId.startsWith("service:")) { + this.checkId = "service:" + this.checkId; + } + } + + @Override + public void run() { + TtlScheduler.this.client.agentCheckPass(this.checkId); + if (logger.isInfoEnabled()) { + logger.info("Sending consul heartbeat for: " + this.checkId); + } + } + + } + + } } diff --git a/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory new file mode 100644 index 0000000..a0f1252 --- /dev/null +++ b/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory @@ -0,0 +1 @@ +consul=org.apache.dubbo.registry.consul.ConsulServiceDiscoveryFactory \ No newline at end of file
