This is an automated email from the ASF dual-hosted git repository. victory pushed a commit to branch cloud-native in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 1aae6956dcca2b9d4982d8ec107dbf4ae2481fda Author: cvictory <[email protected]> AuthorDate: Mon Jul 22 16:08:33 2019 +0800 temporily commit --- .../bootstrap/DubboServiceProviderBootstrap.java | 4 ++-- .../dubbo/registry/etcd/EtcdServiceDiscovery.java | 19 +++++++++++++++++-- .../org/apache/dubbo/remoting/etcd/EtcdClient.java | 8 ++++++++ .../dubbo/remoting/etcd/jetcd/JEtcdClient.java | 5 +++++ .../remoting/etcd/jetcd/JEtcdClientWrapper.java | 20 ++++++++++++++++++++ 5 files changed, 52 insertions(+), 4 deletions(-) diff --git a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java index c68fbd0..4e46a67 100644 --- a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java +++ b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceProviderBootstrap.java @@ -38,8 +38,8 @@ public class DubboServiceProviderBootstrap { // .metadataReport(MetadataReportBuilder.newBuilder().address("zookeeper://127.0.0.1:2181").build()) .metadataReport(MetadataReportBuilder.newBuilder().address("zookeeper://127.0.0.1:2181").build()) // .application(ApplicationBuilder.newBuilder().name("dubbo-provider-demo").build()) -// .registry(RegistryBuilder.newBuilder().address("zookeeper://127.0.0.1:2181?registry-type=service").build()) - .registry(RegistryBuilder.newBuilder().address("etcd3://127.0.0.1:2379?registry-type=service").build()) + .registry(RegistryBuilder.newBuilder().address("zookeeper://127.0.0.1:2181?registry-type=service").build()) +// .registry(RegistryBuilder.newBuilder().address("etcd3://127.0.0.1:2379?registry-type=service").build()) .protocol(ProtocolBuilder.newBuilder().port(-1).name("dubbo").build()) .service(ServiceBuilder.newBuilder().id("test").interfaceClass(EchoService.class).ref(new EchoServiceImpl()).build()) .start() diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java index 9fcc5d4..4cce645 100644 --- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java @@ -38,6 +38,8 @@ import com.google.gson.Gson; import java.io.File; import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -62,6 +64,7 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<>(); private final EtcdClient etcdClient; private final EventDispatcher dispatcher; + private ServiceInstance serviceInstance; public EtcdServiceDiscovery(URL url, EtcdTransporter etcdTransporter) { if (url.isAnyHost()) { @@ -72,7 +75,7 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser etcdClient.addStateListener(state -> { if (state == StateListener.CONNECTED) { try { -// recover(); + recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } @@ -101,8 +104,9 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser @Override public void register(ServiceInstance serviceInstance) throws RuntimeException { try { + this.serviceInstance = serviceInstance; String path = toPath(serviceInstance); - etcdClient.put(path, new Gson().toJson(serviceInstance)); + etcdClient.putEphemeral(path, new Gson().toJson(serviceInstance)); services.add(serviceInstance.getServiceName()); } catch (Throwable e) { throw new RpcException("Failed to register " + serviceInstance + " to etcd " + etcdClient.getUrl() @@ -137,6 +141,7 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser String path = toPath(serviceInstance); etcdClient.delete(path); services.remove(serviceInstance.getServiceName()); + this.serviceInstance = null; } catch (Throwable e) { throw new RpcException("Failed to unregister " + serviceInstance + " to etcd " + etcdClient.getUrl() + ", cause: " + e.getMessage(), e); } @@ -175,4 +180,14 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser */ List<String> children = etcdClient.addChildListener(path, childListener); } + + private void recover() throws Exception { + // register + if (serviceInstance != null) { + if (logger.isInfoEnabled()) { + logger.info("Recover application register: " + serviceInstance); + } + register(serviceInstance); + } + } } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java index 286be93..e23b870 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/EtcdClient.java @@ -180,4 +180,12 @@ public interface EtcdClient { */ boolean put(String key, String value); + /** + * Put the key value pair to etcd (Ephemeral) + * @param key the specified key + * @param value the paired value + * @return true if put success + */ + boolean putEphemeral(String key, String value); + } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java index 4c055b4..01a6025 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClient.java @@ -203,6 +203,11 @@ public class JEtcdClient extends AbstractEtcdClient<JEtcdClient.EtcdWatcher> { return clientWrapper.put(key, value); } + @Override + public boolean putEphemeral(String key, String value) { + return clientWrapper.put(key, value); + } + public ManagedChannel getChannel() { return clientWrapper.getChannel(); } diff --git a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java index 7ef6777..f635005 100644 --- a/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java +++ b/dubbo-remoting/dubbo-remoting-etcd3/src/main/java/org/apache/dubbo/remoting/etcd/jetcd/JEtcdClientWrapper.java @@ -667,6 +667,26 @@ public class JEtcdClientWrapper { return false; } + public boolean putEphemeral(final String key, String value) { + try { + return RetryLoops.invokeWithRetry( + () -> { + requiredNotNull(client, failed); + // recovery an retry + keepAlive(); + final long leaseId = globalLeaseId; + client.getKVClient() + .put(ByteSequence.from(key, UTF_8) + , ByteSequence.from(String.valueOf(value), UTF_8) + , PutOption.newBuilder().withLeaseId(leaseId).build()) + .get(DEFAULT_REQUEST_TIMEOUT, TimeUnit.MILLISECONDS); + return true; + }, retryPolicy); + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } + private void retry() { if (!failedRegistered.isEmpty()) { Set<String> failed = new HashSet<String>(failedRegistered);
