This is an automated email from the ASF dual-hosted git repository.

victory pushed a commit to branch cloud-native
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit cfbe5af378381681661bafaf1d11dc54754ccba7
Author: cvictory <[email protected]>
AuthorDate: Thu Jul 11 19:19:17 2019 +0800

    store temperily
---
 .../bootstrap/DubboServiceConsumerBootstrap.java   |  16 ++--
 .../dubbo/registry/etcd/EtcdServiceDiscovery.java  | 100 +++++++++++++++------
 .../zookeeper/ZookeeperServiceDiscovery.java       |   2 +-
 3 files changed, 84 insertions(+), 34 deletions(-)

diff --git 
a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java
 
b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java
index 4e03c1b..48ac788 100644
--- 
a/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java
+++ 
b/dubbo-bootstrap/src/test/java/org/apache/dubbo/bootstrap/DubboServiceConsumerBootstrap.java
@@ -16,9 +16,11 @@
  */
 package org.apache.dubbo.bootstrap;
 
+import org.apache.dubbo.config.ReferenceConfig;
 import org.apache.dubbo.config.builders.ApplicationBuilder;
 import org.apache.dubbo.config.builders.ReferenceBuilder;
 import org.apache.dubbo.config.builders.RegistryBuilder;
+import org.apache.dubbo.config.utils.ReferenceConfigCache;
 
 /**
  * Dubbo Provider Bootstrap
@@ -38,14 +40,12 @@ public class DubboServiceConsumerBootstrap {
                 .await();
 
         // TODO,
-//        ReferenceConfig<EchoService> referenceConfig = 
ReferenceConfigCache.getCache().get(EchoService.class.getName(), 
EchoService.class);
-//
-//        EchoService echoService = referenceConfig.get();
-//
-//        for (int i = 0; i < 500; i++) {
-//            Thread.sleep(2000L);
-//            System.out.println(echoService.echo("Hello,World"));
-//        }
+        EchoService echoService = 
ReferenceConfigCache.getCache().get(EchoService.class.getName(), 
EchoService.class);
+
+        for (int i = 0; i < 500; i++) {
+            Thread.sleep(2000L);
+            System.out.println(echoService.echo("Hello,World"));
+        }
 
     }
 }
diff --git 
a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
 
b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
index 60075d4..e5a1a62 100644
--- 
a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
+++ 
b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
@@ -27,12 +27,22 @@ import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
 import 
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
 import org.apache.dubbo.remoting.etcd.ChildListener;
+import org.apache.dubbo.remoting.etcd.EtcdClient;
 import org.apache.dubbo.remoting.etcd.EtcdTransporter;
+import org.apache.dubbo.remoting.etcd.StateListener;
+import org.apache.dubbo.remoting.etcd.option.OptionUtil;
+import org.apache.dubbo.rpc.RpcException;
 
+import com.google.gson.Gson;
+
+import java.io.File;
+import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import static org.apache.dubbo.common.constants.RegistryConstants.DYNAMIC_KEY;
+
 /**
  * @author cvictory ON 2019-07-08
  */
@@ -40,33 +50,28 @@ public class EtcdServiceDiscovery implements 
ServiceDiscovery, EventListener<Ser
 
     private final static Logger logger = 
LoggerFactory.getLogger(EtcdServiceDiscovery.class);
 
-//    private final String root;
+    private final String root = "/services";
 
-    private final Set<String> anyServices = new ConcurrentHashSet<>();
+    private final Set<String> services = new ConcurrentHashSet<>();
 
     private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, 
ChildListener>> etcdListeners = new ConcurrentHashMap<>();
-//    private final EtcdClient etcdClient;
+    private final EtcdClient etcdClient;
 
     public EtcdServiceDiscovery(URL url, EtcdTransporter etcdTransporter) {
-//        if (url.isAnyHost()) {
-//            throw new IllegalStateException("Service discovery address is 
invalid, actual: '" + url.getHost() + "'");
-//        }
-////        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
-//        if (!group.startsWith(PATH_SEPARATOR)) {
-//            group = PATH_SEPARATOR + group;
-//        }
-//        this.root = group;
-//        etcdClient = etcdTransporter.connect(url);
-//
-//        etcdClient.addStateListener(state -> {
-//            if (state == StateListener.CONNECTED) {
-//                try {
-////                    recover();
-//                } catch (Exception e) {
-//                    logger.error(e.getMessage(), e);
-//                }
-//            }
-//        });
+        if (url.isAnyHost()) {
+            throw new IllegalStateException("Service discovery address is 
invalid, actual: '" + url.getHost() + "'");
+        }
+        etcdClient = etcdTransporter.connect(url);
+
+        etcdClient.addStateListener(state -> {
+            if (state == StateListener.CONNECTED) {
+                try {
+//                    recover();
+                } catch (Exception e) {
+                    logger.error(e.getMessage(), e);
+                }
+            }
+        });
     }
 
     @Override
@@ -86,26 +91,71 @@ public class EtcdServiceDiscovery implements 
ServiceDiscovery, EventListener<Ser
 
     @Override
     public void register(ServiceInstance serviceInstance) throws 
RuntimeException {
+        try {
+            String path = toPath(serviceInstance);
+            etcdClient.put(path, new Gson().toJson(serviceInstance));
+            services.add(serviceInstance.getServiceName());
+        } catch (Throwable e) {
+            throw new RpcException("Failed to register " + serviceInstance + " 
to etcd " + etcdClient.getUrl()
+                    + ", cause: " + (OptionUtil.isProtocolError(e)
+                    ? "etcd3 registry may not be supported yet or etcd3 
registry is not available."
+                    : e.getMessage()), e);
+        }
+    }
 
+    String toPath(ServiceInstance serviceInstance) {
+        return root + File.separator + serviceInstance.getServiceName() + 
File.separator + serviceInstance.getId();
     }
 
     @Override
     public void update(ServiceInstance serviceInstance) throws 
RuntimeException {
-
+        try {
+            String path = toPath(serviceInstance);
+            etcdClient.put(path, new Gson().toJson(serviceInstance));
+            services.add(serviceInstance.getServiceName());
+        } catch (Throwable e) {
+            throw new RpcException("Failed to register " + serviceInstance + " 
to etcd " + etcdClient.getUrl()
+                    + ", cause: " + (OptionUtil.isProtocolError(e)
+                    ? "etcd3 registry may not be supported yet or etcd3 
registry is not available."
+                    : e.getMessage()), e);
+        }
     }
 
     @Override
     public void unregister(ServiceInstance serviceInstance) throws 
RuntimeException {
-
+        try {
+            String path = toPath(serviceInstance);
+            etcdClient.delete(path);
+            services.remove(serviceInstance.getServiceName());
+        } catch (Throwable e) {
+            throw new RpcException("Failed to unregister " + serviceInstance + 
" to etcd " + etcdClient.getUrl() + ", cause: " + e.getMessage(), e);
+        }
     }
 
     @Override
     public Set<String> getServices() {
-        return null;
+        return Collections.unmodifiableSet(services);
     }
 
     @Override
     public void addServiceInstancesChangedListener(String serviceName, 
ServiceInstancesChangedListener listener) throws NullPointerException, 
IllegalArgumentException {
 
     }
+
+    protected void registerServiceWatcher(String serviceName) {
+        String path = buildServicePath(serviceName);
+        CuratorWatcher watcher = watcherCaches.computeIfAbsent(path, key ->
+                new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, 
dispatcher));
+        try {
+            etcdClient.
+            curatorFramework.getChildren().usingWatcher(watcher).forPath(path);
+        } catch (KeeperException.NoNodeException e) {
+            // ignored
+            if (logger.isErrorEnabled()) {
+                logger.error(e.getMessage());
+            }
+        } catch (Exception e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
 }
diff --git 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 6776a74..888972a 100644
--- 
a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ 
b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -156,4 +156,4 @@ public class ZookeeperServiceDiscovery implements 
ServiceDiscovery, EventListene
         // re-register again
         registerServiceWatcher(serviceName);
     }
-}
\ No newline at end of file
+}

Reply via email to