This is an automated email from the ASF dual-hosted git repository. victory pushed a commit to branch cloud-native in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit cfbe5af378381681661bafaf1d11dc54754ccba7 Author: cvictory <[email protected]> AuthorDate: Thu Jul 11 19:19:17 2019 +0800 store temperily --- .../bootstrap/DubboServiceConsumerBootstrap.java | 16 ++-- .../dubbo/registry/etcd/EtcdServiceDiscovery.java | 100 +++++++++++++++------ .../zookeeper/ZookeeperServiceDiscovery.java | 2 +- 3 files changed, 84 insertions(+), 34 deletions(-) diff --git a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java index 4e03c1b..48ac788 100644 --- a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java +++ b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java @@ -16,9 +16,11 @@ */ package org.apache.dubbo.bootstrap; +import org.apache.dubbo.config.ReferenceConfig; import org.apache.dubbo.config.builders.ApplicationBuilder; import org.apache.dubbo.config.builders.ReferenceBuilder; import org.apache.dubbo.config.builders.RegistryBuilder; +import org.apache.dubbo.config.utils.ReferenceConfigCache; /** * Dubbo Provider Bootstrap @@ -38,14 +40,12 @@ public class DubboServiceConsumerBootstrap { .await(); // TODO, -// ReferenceConfig<EchoService> referenceConfig = ReferenceConfigCache.getCache().get(EchoService.class.getName(), EchoService.class); -// -// EchoService echoService = referenceConfig.get(); -// -// for (int i = 0; i < 500; i++) { -// Thread.sleep(2000L); -// System.out.println(echoService.echo("Hello,World")); -// } + EchoService echoService = ReferenceConfigCache.getCache().get(EchoService.class.getName(), EchoService.class); + + for (int i = 0; i < 500; i++) { + Thread.sleep(2000L); + System.out.println(echoService.echo("Hello,World")); + } } } diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java index 60075d4..e5a1a62 100644 --- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java @@ -27,12 +27,22 @@ import org.apache.dubbo.registry.client.ServiceInstance; import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent; import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener; import org.apache.dubbo.remoting.etcd.ChildListener; +import org.apache.dubbo.remoting.etcd.EtcdClient; import org.apache.dubbo.remoting.etcd.EtcdTransporter; +import org.apache.dubbo.remoting.etcd.StateListener; +import org.apache.dubbo.remoting.etcd.option.OptionUtil; +import org.apache.dubbo.rpc.RpcException; +import com.google.gson.Gson; + +import java.io.File; +import java.util.Collections; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY; + /** * @author cvictory ON 2019-07-08 */ @@ -40,33 +50,28 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser private final static Logger logger = LoggerFactory.getLogger(EtcdServiceDiscovery.class); -// private final String root; + private final String root = "/services"; - private final Set<String> anyServices = new ConcurrentHashSet<>(); + private final Set<String> services = new ConcurrentHashSet<>(); private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> etcdListeners = new ConcurrentHashMap<>(); -// private final EtcdClient etcdClient; + private final EtcdClient etcdClient; public EtcdServiceDiscovery(URL url, EtcdTransporter etcdTransporter) { -// if (url.isAnyHost()) { -// throw new IllegalStateException("Service discovery address is invalid, actual: '" + url.getHost() + "'"); -// } -//// String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT); -// if (!group.startsWith(PATH_SEPARATOR)) { -// group = PATH_SEPARATOR + group; -// } -// this.root = group; -// etcdClient = etcdTransporter.connect(url); -// -// etcdClient.addStateListener(state -> { -// if (state == StateListener.CONNECTED) { -// try { -//// recover(); -// } catch (Exception e) { -// logger.error(e.getMessage(), e); -// } -// } -// }); + if (url.isAnyHost()) { + throw new IllegalStateException("Service discovery address is invalid, actual: '" + url.getHost() + "'"); + } + etcdClient = etcdTransporter.connect(url); + + etcdClient.addStateListener(state -> { + if (state == StateListener.CONNECTED) { + try { +// recover(); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + } + }); } @Override @@ -86,26 +91,71 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser @Override public void register(ServiceInstance serviceInstance) throws RuntimeException { + try { + String path = toPath(serviceInstance); + etcdClient.put(path, new Gson().toJson(serviceInstance)); + services.add(serviceInstance.getServiceName()); + } catch (Throwable e) { + throw new RpcException("Failed to register " + serviceInstance + " to etcd " + etcdClient.getUrl() + + ", cause: " + (OptionUtil.isProtocolError(e) + ? "etcd3 registry may not be supported yet or etcd3 registry is not available." + : e.getMessage()), e); + } + } + String toPath(ServiceInstance serviceInstance) { + return root + File.separator + serviceInstance.getServiceName() + File.separator + serviceInstance.getId(); } @Override public void update(ServiceInstance serviceInstance) throws RuntimeException { - + try { + String path = toPath(serviceInstance); + etcdClient.put(path, new Gson().toJson(serviceInstance)); + services.add(serviceInstance.getServiceName()); + } catch (Throwable e) { + throw new RpcException("Failed to register " + serviceInstance + " to etcd " + etcdClient.getUrl() + + ", cause: " + (OptionUtil.isProtocolError(e) + ? "etcd3 registry may not be supported yet or etcd3 registry is not available." + : e.getMessage()), e); + } } @Override public void unregister(ServiceInstance serviceInstance) throws RuntimeException { - + try { + String path = toPath(serviceInstance); + etcdClient.delete(path); + services.remove(serviceInstance.getServiceName()); + } catch (Throwable e) { + throw new RpcException("Failed to unregister " + serviceInstance + " to etcd " + etcdClient.getUrl() + ", cause: " + e.getMessage(), e); + } } @Override public Set<String> getServices() { - return null; + return Collections.unmodifiableSet(services); } @Override public void addServiceInstancesChangedListener(String serviceName, ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException { } + + protected void registerServiceWatcher(String serviceName) { + String path = buildServicePath(serviceName); + CuratorWatcher watcher = watcherCaches.computeIfAbsent(path, key -> + new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, dispatcher)); + try { + etcdClient. + curatorFramework.getChildren().usingWatcher(watcher).forPath(path); + } catch (KeeperException.NoNodeException e) { + // ignored + if (logger.isErrorEnabled()) { + logger.error(e.getMessage()); + } + } catch (Exception e) { + throw new IllegalStateException(e.getMessage(), e); + } + } } diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java index 6776a74..888972a 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java @@ -156,4 +156,4 @@ public class ZookeeperServiceDiscovery implements ServiceDiscovery, EventListene // re-register again registerServiceWatcher(serviceName); } -} \ No newline at end of file +}
