This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0-k8s
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0-k8s by this push:
     new 6581af4  [3.0 DNS] Move distribute metadata fetch to api module (#7092)
6581af4 is described below

commit 6581af46997b2ff8d12dcdc74247afc17d8a0096
Author: Albumen Kevin <[email protected]>
AuthorDate: Fri Jan 15 23:42:54 2021 +0800

    [3.0 DNS] Move distribute metadata fetch to api module (#7092)
---
 .../java/org/apache/dubbo/registry/Constants.java  |  10 +
 .../client/SelfHostMetaServiceDiscovery.java}      | 293 ++++++++-------------
 .../dubbo/registry/dns/DNSServiceDiscovery.java    | 233 +---------------
 .../dubbo/registry/dns/util/DNSClientConst.java    |  24 +-
 .../registry/dns/DNSServiceDiscoveryTest.java      |   5 +-
 5 files changed, 153 insertions(+), 412 deletions(-)

diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java
index 1bf4168..eee0665 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java
@@ -88,4 +88,14 @@ public interface Constants {
     String REGISTRY_RETRY_PERIOD_KEY = "retry.period";
 
     String SESSION_TIMEOUT_KEY = "session";
+
+    /**
+     * To decide the frequency of checking Distributed Service Discovery Registry callback hook (in ms)
+     */
+    String ECHO_POLLING_CYCLE_KEY = "echoPollingCycle";
+
+    /**
+     * Default value for check frequency: 60000 (ms)
+     */
+    int DEFAULT_ECHO_POLLING_CYCLE = 60000;
 }
diff --git a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
similarity index 53%
copy from dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
copy to dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
index 35596cf..25ab77e 100644
--- a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.dubbo.registry.dns;
+package org.apache.dubbo.registry.client;
 
 import org.apache.dubbo.common.URL;
 import org.apache.dubbo.common.constants.CommonConstants;
@@ -27,22 +27,15 @@ import org.apache.dubbo.metadata.InstanceMetadataChangedListener;
 import org.apache.dubbo.metadata.MetadataService;
 import org.apache.dubbo.metadata.RevisionResolver;
 import org.apache.dubbo.metadata.WritableMetadataService;
-import org.apache.dubbo.registry.client.DefaultServiceInstance;
-import org.apache.dubbo.registry.client.ServiceDiscovery;
-import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.Constants;
 import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
 import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
 import org.apache.dubbo.registry.client.metadata.MetadataUtils;
-import org.apache.dubbo.registry.dns.util.DNSClientConst;
-import org.apache.dubbo.registry.dns.util.DNSResolver;
-import org.apache.dubbo.registry.dns.util.ResolveResult;
 import org.apache.dubbo.rpc.RpcException;
 import org.apache.dubbo.rpc.model.ApplicationModel;
 
 import com.alibaba.fastjson.JSONObject;
 
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -52,10 +45,9 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-public class DNSServiceDiscovery implements ServiceDiscovery {
+public abstract class SelfHostMetaServiceDiscovery implements ServiceDiscovery {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
@@ -65,7 +57,7 @@ public class DNSServiceDiscovery implements ServiceDiscovery {
      * Echo check if consumer is still work
      * echo task may take a lot of time when consumer offline, create a new 
ScheduledThreadPool
      */
-    private final ScheduledExecutorService echoCheckExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-DNS-EchoCheck"));
+    private final ScheduledExecutorService echoCheckExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-Registry-EchoCheck-Consumer"));
 
     // =================================== Provider side 
=================================== //
 
@@ -79,15 +71,6 @@ public class DNSServiceDiscovery implements ServiceDiscovery {
     // =================================== Consumer side 
=================================== //
 
     /**
-     * DNS properties
-     */
-
-    private String addressPrefix;
-    private String addressSuffix;
-    private long pollingCycle;
-    private DNSResolver dnsResolver;
-
-    /**
      * Local Cache of {@link ServiceInstance} Metadata
      * <p>
      * Key - {@link ServiceInstance} ID ( usually ip + port )
@@ -112,32 +95,11 @@ public class DNSServiceDiscovery implements ServiceDiscovery {
      */
     private final ConcurrentHashMap<String, String> serviceInstanceRevisionMap 
= new ConcurrentHashMap<>();
 
-    /**
-     * Polling task ScheduledFuture, used to stop task when destroy
-     */
-    private final ConcurrentHashMap<String, ScheduledFuture<?>> 
pollingExecutorMap = new ConcurrentHashMap<>();
-
-    /**
-     * Polling check provider ExecutorService
-     */
-    private ScheduledExecutorService pollingExecutorService;
-
     @Override
     public void initialize(URL registryURL) throws Exception {
         this.registryURL = registryURL;
-        this.addressPrefix = 
registryURL.getParameter(DNSClientConst.ADDRESS_PREFIX, "");
-        this.addressSuffix = 
registryURL.getParameter(DNSClientConst.ADDRESS_SUFFIX, "");
-        this.pollingCycle = 
registryURL.getParameter(DNSClientConst.POLLING_CYCLE, 5000);
-        long echoPollingCycle = 
registryURL.getParameter(DNSClientConst.ECHO_POLLING_CYCLE, 60000);
-        int scheduledThreadPoolSize = 
registryURL.getParameter(DNSClientConst.SCHEDULED_THREAD_POOL_SIZE, 1);
-
-        String nameserver = registryURL.getHost();
-        int port = registryURL.getPort();
-        int maxQueriesPerResolve = 
registryURL.getParameter(DNSClientConst.MAX_QUERIES_PER_RESOLVE, 10);
-        this.dnsResolver = new DNSResolver(nameserver, port, 
maxQueriesPerResolve);
-
-        // polling task may take a lot of time, create a new 
ScheduledThreadPool
-        pollingExecutorService = 
Executors.newScheduledThreadPool(scheduledThreadPoolSize, new 
NamedThreadFactory("Dubbo-DNS-EchoCheck"));
+        doInitialize(registryURL);
+        long echoPollingCycle = registryURL.getParameter(Constants.ECHO_POLLING_CYCLE_KEY, Constants.DEFAULT_ECHO_POLLING_CYCLE);
 
         // Echo check: test if consumer is offline, remove 
MetadataChangeListener,
         // reduce the probability of failure when metadata update
@@ -158,18 +120,48 @@ public class DNSServiceDiscovery implements 
ServiceDiscovery {
                 }
             }
         }, echoPollingCycle, echoPollingCycle, TimeUnit.MILLISECONDS);
-
     }
 
     @Override
     public void destroy() throws Exception {
+        doDestroy();
         metadataMap.clear();
         serviceInstanceRevisionMap.clear();
-        pollingExecutorMap.forEach((serviceName, scheduledFuture) -> 
scheduledFuture.cancel(true));
-        pollingExecutorMap.clear();
         echoCheckExecutor.shutdown();
-        pollingExecutorService.shutdown();
-        dnsResolver.destroy();
+    }
+
+    private void updateMetadata(ServiceInstance serviceInstance) {
+        WritableMetadataService metadataService = 
WritableMetadataService.getDefaultExtension();
+        String metadataString = 
JSONObject.toJSONString(serviceInstance.getMetadata());
+        String metadataRevision = RevisionResolver.calRevision(metadataString);
+
+        // check if metadata updated
+        if (!metadataRevision.equalsIgnoreCase(lastMetadataRevision)) {
+            logger.info("Update Service Instance Metadata of DNS registry. 
Newer metadata: " + metadataString);
+            if (logger.isDebugEnabled()) {
+                logger.debug("Update Service Instance Metadata of DNS 
registry. Newer metadata: " + metadataString);
+            }
+
+            lastMetadataRevision = metadataRevision;
+
+            // save newest metadata to local
+            metadataService.exportInstanceMetadata(metadataString);
+
+            // notify to consumer
+            Map<String, InstanceMetadataChangedListener> listenerMap = 
metadataService.getInstanceMetadataChangedListenerMap();
+            Iterator<Map.Entry<String, InstanceMetadataChangedListener>> 
iterator = listenerMap.entrySet().iterator();
+
+            while (iterator.hasNext()) {
+                Map.Entry<String, InstanceMetadataChangedListener> entry = 
iterator.next();
+                try {
+                    entry.getValue().onEvent(metadataString);
+                } catch (RpcException e) {
+                    logger.warn("Notify to consumer error. Possible cause: 
consumer is offline.");
+                    // remove listener if consumer is offline
+                    iterator.remove();
+                }
+            }
+        }
     }
 
     @Override
@@ -177,6 +169,8 @@ public class DNSServiceDiscovery implements 
ServiceDiscovery {
         this.serviceInstance = serviceInstance;
 
         updateMetadata(serviceInstance);
+
+        doRegister(serviceInstance);
     }
 
     @Override
@@ -184,10 +178,14 @@ public class DNSServiceDiscovery implements 
ServiceDiscovery {
         this.serviceInstance = serviceInstance;
 
         updateMetadata(serviceInstance);
+
+        doUpdate(serviceInstance);
     }
 
     @Override
     public void unregister(ServiceInstance serviceInstance) throws 
RuntimeException {
+        doUnregister(serviceInstance);
+
         this.serviceInstance = null;
 
         // notify empty message to consumer
@@ -198,166 +196,95 @@ public class DNSServiceDiscovery implements 
ServiceDiscovery {
     }
 
     @Override
-    public Set<String> getServices() {
-        // it is impossible for dns to discover service names
-        return Collections.singleton("Unsupported Method");
+    public ServiceInstance getLocalInstance() {
+        return serviceInstance;
     }
 
     @Override
-    public List<ServiceInstance> getInstances(String serviceName) throws 
NullPointerException {
-
-        String serviceAddress = addressPrefix + serviceName + addressSuffix;
-
-        ResolveResult resolveResult = dnsResolver.resolve(serviceAddress);
-
-        return toServiceInstance(serviceName, resolveResult);
+    public URL getUrl() {
+        return registryURL;
     }
 
-    @Override
-    public void 
addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) 
throws NullPointerException, IllegalArgumentException {
-        listener.getServiceNames().forEach(serviceName -> {
-            ScheduledFuture<?> scheduledFuture = 
pollingExecutorService.scheduleAtFixedRate(() -> {
-                        List<ServiceInstance> instances = 
getInstances(serviceName);
-                        
instances.sort(Comparator.comparingInt(ServiceInstance::hashCode));
-
-                        String serviceInstanceRevision = 
RevisionResolver.calRevision(JSONObject.toJSONString(instances));
-                        boolean changed = 
!serviceInstanceRevision.equalsIgnoreCase(
-                                serviceInstanceRevisionMap.put(serviceName, 
serviceInstanceRevision));
-
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Poll DNS data. Service Instance 
changed: " + changed + " Service Name: " + serviceName);
+    @SuppressWarnings("unchecked")
+    public final void fillServiceInstance(DefaultServiceInstance 
serviceInstance) {
+        String hostId = serviceInstance.getId();
+        if (metadataMap.containsKey(hostId)) {
+            // Use cached metadata.
+            // Metadata will be updated by provider callback
+
+            String metadataString = metadataMap.get(hostId);
+            serviceInstance.setMetadata(JSONObject.parseObject(metadataString, 
Map.class));
+        } else {
+            // refer from MetadataUtils, this proxy is different from the one 
used to refer exportedURL
+            MetadataService metadataService = 
MetadataUtils.getMetadataServiceProxy(serviceInstance, this);
+
+            String consumerId = ApplicationModel.getName() + 
NetUtils.getLocalHost();
+            String metadata = metadataService.getAndListenInstanceMetadata(
+                    consumerId, metadataString -> {
+                        logger.info("Receive callback: " + metadataString + 
serviceInstance);
+                        if (StringUtils.isEmpty(metadataString)) {
+                            // provider is shutdown
+                            metadataMap.remove(hostId);
+                        } else {
+                            metadataMap.put(hostId, metadataString);
                         }
+                    });
+            metadataMap.put(hostId, metadata);
+            serviceInstance.setMetadata(JSONObject.parseObject(metadata, 
Map.class));
+        }
+    }
 
-                        if (changed) {
-                            List<ServiceInstance> oldServiceInstances = 
cachedServiceInstances.getOrDefault(serviceName, new LinkedList<>());
+    public final void notifyListener(String serviceName, 
ServiceInstancesChangedListener listener, List<ServiceInstance> instances) {
+        String serviceInstanceRevision = 
RevisionResolver.calRevision(JSONObject.toJSONString(instances));
+        boolean changed = !serviceInstanceRevision.equalsIgnoreCase(
+                serviceInstanceRevisionMap.put(serviceName, 
serviceInstanceRevision));
 
-                            // remove expired invoker
-                            Set<ServiceInstance> allServiceInstances = new 
HashSet<>(oldServiceInstances.size() + instances.size());
-                            allServiceInstances.addAll(oldServiceInstances);
-                            allServiceInstances.addAll(instances);
+        if (logger.isDebugEnabled()) {
+            logger.debug("Poll DNS data. Service Instance changed: " + changed 
+ " Service Name: " + serviceName);
+        }
 
-                            allServiceInstances.removeAll(oldServiceInstances);
+        if (changed) {
+            List<ServiceInstance> oldServiceInstances = 
cachedServiceInstances.getOrDefault(serviceName, new LinkedList<>());
 
-                            allServiceInstances.forEach(removedServiceInstance 
-> {
-                                
MetadataUtils.destroyMetadataServiceProxy(removedServiceInstance, this);
-                            });
+            // remove expired invoker
+            Set<ServiceInstance> allServiceInstances = new 
HashSet<>(oldServiceInstances.size() + instances.size());
+            allServiceInstances.addAll(oldServiceInstances);
+            allServiceInstances.addAll(instances);
 
-                            cachedServiceInstances.put(serviceName, instances);
-                            listener.onEvent(new 
ServiceInstancesChangedEvent(serviceName, instances));
-                        }
+            allServiceInstances.removeAll(oldServiceInstances);
 
-                    },
-                    pollingCycle, pollingCycle, TimeUnit.MILLISECONDS);
+            allServiceInstances.forEach(removedServiceInstance -> {
+                
MetadataUtils.destroyMetadataServiceProxy(removedServiceInstance, this);
+            });
 
-            pollingExecutorMap.put(serviceName, scheduledFuture);
-        });
+            cachedServiceInstances.put(serviceName, instances);
+            listener.onEvent(new ServiceInstancesChangedEvent(serviceName, 
instances));
+        }
     }
 
-    @Override
-    public ServiceInstance getLocalInstance() {
-        return serviceInstance;
+    public void doInitialize(URL registryURL) throws Exception {
     }
 
-    @Override
-    public URL getUrl() {
-        return registryURL;
+    public void doDestroy() throws Exception {
     }
 
-    /**
-     * UT used only
-     */
-    @Deprecated
-    public void setDnsResolver(DNSResolver dnsResolver) {
-        this.dnsResolver = dnsResolver;
-    }
+    public void doRegister(ServiceInstance serviceInstance) throws 
RuntimeException {
 
-    /**
-     * UT used only
-     */
-    @Deprecated
-    public ConcurrentHashMap<String, List<ServiceInstance>> 
getCachedServiceInstances() {
-        return cachedServiceInstances;
     }
 
-    @SuppressWarnings("unchecked")
-    private List<ServiceInstance> toServiceInstance(String serviceName, 
ResolveResult resolveResult) {
-
-        int port;
+    public void doUpdate(ServiceInstance serviceInstance) throws 
RuntimeException {
 
-        if (resolveResult.getPort().size() > 0) {
-            // use first as default
-            port = resolveResult.getPort().get(0);
-        } else {
-            // not support SRV record
-            port = 20880;
-        }
-
-        List<ServiceInstance> instanceList = new LinkedList<>();
-
-        for (String host : resolveResult.getHostnameList()) {
-            DefaultServiceInstance serviceInstance = new 
DefaultServiceInstance(serviceName, host, port);
-            String hostId = serviceInstance.getId();
-            if (metadataMap.containsKey(hostId)) {
-                // Use cached metadata.
-                // Metadata will be updated by provider callback
-
-                String metadataString = metadataMap.get(hostId);
-                
serviceInstance.setMetadata(JSONObject.parseObject(metadataString, Map.class));
-            } else {
-                // refer from MetadataUtils, this proxy is different from the 
one used to refer exportedURL
-                MetadataService metadataService = 
MetadataUtils.getMetadataServiceProxy(serviceInstance, this);
-
-                String consumerId = ApplicationModel.getName() + 
NetUtils.getLocalHost();
-                String metadata = metadataService.getAndListenInstanceMetadata(
-                        consumerId, metadataString -> {
-                            logger.info("Receive callback: " + metadataString 
+ serviceInstance);
-                            if (StringUtils.isEmpty(metadataString)) {
-                                // provider is shutdown
-                                metadataMap.remove(hostId);
-                            } else {
-                                metadataMap.put(hostId, metadataString);
-                            }
-                        });
-                metadataMap.put(hostId, metadata);
-                serviceInstance.setMetadata(JSONObject.parseObject(metadata, 
Map.class));
-            }
-            instanceList.add(serviceInstance);
-        }
-
-        return instanceList;
     }
 
-    private void updateMetadata(ServiceInstance serviceInstance) {
-        WritableMetadataService metadataService = 
WritableMetadataService.getDefaultExtension();
-        String metadataString = 
JSONObject.toJSONString(serviceInstance.getMetadata());
-        String metadataRevision = RevisionResolver.calRevision(metadataString);
-
-        // check if metadata updated
-        if (!metadataRevision.equalsIgnoreCase(lastMetadataRevision)) {
-            logger.info("Update Service Instance Metadata of DNS registry. 
Newer metadata: " + metadataString);
-            if (logger.isDebugEnabled()) {
-                logger.debug("Update Service Instance Metadata of DNS 
registry. Newer metadata: " + metadataString);
-            }
-
-            lastMetadataRevision = metadataRevision;
+    public void doUnregister(ServiceInstance serviceInstance) throws 
RuntimeException {
 
-            // save newest metadata to local
-            metadataService.exportInstanceMetadata(metadataString);
-
-            // notify to consumer
-            Map<String, InstanceMetadataChangedListener> listenerMap = 
metadataService.getInstanceMetadataChangedListenerMap();
-            Iterator<Map.Entry<String, InstanceMetadataChangedListener>> 
iterator = listenerMap.entrySet().iterator();
+    }
 
-            while (iterator.hasNext()) {
-                Map.Entry<String, InstanceMetadataChangedListener> entry = 
iterator.next();
-                try {
-                    entry.getValue().onEvent(metadataString);
-                } catch (RpcException e) {
-                    logger.warn("Notify to consumer error. Possible cause: 
consumer is offline.");
-                    // remove listener if consumer is offline
-                    iterator.remove();
-                }
-            }
-        }
+    /**
+     * UT used only
+     */
+    @Deprecated
+    public final ConcurrentHashMap<String, List<ServiceInstance>> 
getCachedServiceInstances() {
+        return cachedServiceInstances;
     }
 }
diff --git a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java b/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
index 35596cf..a4d30d6 100644
--- a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
@@ -17,37 +17,21 @@
 package org.apache.dubbo.registry.dns;
 
 import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
 import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.common.utils.NetUtils;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.metadata.InstanceMetadataChangedListener;
-import org.apache.dubbo.metadata.MetadataService;
-import org.apache.dubbo.metadata.RevisionResolver;
-import org.apache.dubbo.metadata.WritableMetadataService;
 import org.apache.dubbo.registry.client.DefaultServiceInstance;
-import org.apache.dubbo.registry.client.ServiceDiscovery;
+import org.apache.dubbo.registry.client.SelfHostMetaServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
-import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
 import 
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
-import org.apache.dubbo.registry.client.metadata.MetadataUtils;
 import org.apache.dubbo.registry.dns.util.DNSClientConst;
 import org.apache.dubbo.registry.dns.util.DNSResolver;
 import org.apache.dubbo.registry.dns.util.ResolveResult;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.model.ApplicationModel;
-
-import com.alibaba.fastjson.JSONObject;
 
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
@@ -55,29 +39,10 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-public class DNSServiceDiscovery implements ServiceDiscovery {
+public class DNSServiceDiscovery extends SelfHostMetaServiceDiscovery {
 
     private final Logger logger = LoggerFactory.getLogger(getClass());
 
-    private URL registryURL;
-
-    /**
-     * Echo check if consumer is still work
-     * echo task may take a lot of time when consumer offline, create a new 
ScheduledThreadPool
-     */
-    private final ScheduledExecutorService echoCheckExecutor = 
Executors.newScheduledThreadPool(1, new 
NamedThreadFactory("Dubbo-DNS-EchoCheck"));
-
-    // =================================== Provider side 
=================================== //
-
-    private ServiceInstance serviceInstance;
-
-    /**
-     * Local {@link ServiceInstance} Metadata's revision
-     */
-    private String lastMetadataRevision;
-
-    // =================================== Consumer side 
=================================== //
-
     /**
      * DNS properties
      */
@@ -88,31 +53,6 @@ public class DNSServiceDiscovery implements ServiceDiscovery 
{
     private DNSResolver dnsResolver;
 
     /**
-     * Local Cache of {@link ServiceInstance} Metadata
-     * <p>
-     * Key - {@link ServiceInstance} ID ( usually ip + port )
-     * Value - Json processed metadata string
-     */
-    private final ConcurrentHashMap<String, String> metadataMap = new 
ConcurrentHashMap<>();
-
-    /**
-     * Local Cache of {@link ServiceInstance}
-     * <p>
-     * Key - Service Name
-     * Value - List {@link ServiceInstance}
-     */
-    private final ConcurrentHashMap<String, List<ServiceInstance>> 
cachedServiceInstances = new ConcurrentHashMap<>();
-
-    /**
-     * Local Cache of Service's {@link ServiceInstance} list revision,
-     * used to check if {@link ServiceInstance} list has been updated
-     * <p>
-     * Key - ServiceName
-     * Value - a revision calculate from {@link List} of {@link 
ServiceInstance}
-     */
-    private final ConcurrentHashMap<String, String> serviceInstanceRevisionMap 
= new ConcurrentHashMap<>();
-
-    /**
      * Polling task ScheduledFuture, used to stop task when destroy
      */
     private final ConcurrentHashMap<String, ScheduledFuture<?>> 
pollingExecutorMap = new ConcurrentHashMap<>();
@@ -123,78 +63,30 @@ public class DNSServiceDiscovery implements 
ServiceDiscovery {
     private ScheduledExecutorService pollingExecutorService;
 
     @Override
-    public void initialize(URL registryURL) throws Exception {
-        this.registryURL = registryURL;
+    public void doInitialize(URL registryURL) throws Exception {
         this.addressPrefix = 
registryURL.getParameter(DNSClientConst.ADDRESS_PREFIX, "");
         this.addressSuffix = 
registryURL.getParameter(DNSClientConst.ADDRESS_SUFFIX, "");
-        this.pollingCycle = 
registryURL.getParameter(DNSClientConst.POLLING_CYCLE, 5000);
-        long echoPollingCycle = 
registryURL.getParameter(DNSClientConst.ECHO_POLLING_CYCLE, 60000);
-        int scheduledThreadPoolSize = 
registryURL.getParameter(DNSClientConst.SCHEDULED_THREAD_POOL_SIZE, 1);
+        this.pollingCycle = 
registryURL.getParameter(DNSClientConst.DNS_POLLING_CYCLE, 
DNSClientConst.DEFAULT_DNS_POLLING_CYCLE);
 
         String nameserver = registryURL.getHost();
         int port = registryURL.getPort();
         int maxQueriesPerResolve = 
registryURL.getParameter(DNSClientConst.MAX_QUERIES_PER_RESOLVE, 10);
         this.dnsResolver = new DNSResolver(nameserver, port, 
maxQueriesPerResolve);
 
-        // polling task may take a lot of time, create a new 
ScheduledThreadPool
-        pollingExecutorService = 
Executors.newScheduledThreadPool(scheduledThreadPoolSize, new 
NamedThreadFactory("Dubbo-DNS-EchoCheck"));
 
-        // Echo check: test if consumer is offline, remove 
MetadataChangeListener,
-        // reduce the probability of failure when metadata update
-        echoCheckExecutor.scheduleAtFixedRate(() -> {
-            WritableMetadataService metadataService = 
WritableMetadataService.getDefaultExtension();
-            Map<String, InstanceMetadataChangedListener> listenerMap = 
metadataService.getInstanceMetadataChangedListenerMap();
-            Iterator<Map.Entry<String, InstanceMetadataChangedListener>> 
iterator = listenerMap.entrySet().iterator();
+        int scheduledThreadPoolSize = 
registryURL.getParameter(DNSClientConst.DNS_POLLING_POOL_SIZE_KEY, 
DNSClientConst.DEFAULT_DNS_POLLING_POOL_SIZE);
 
-            while (iterator.hasNext()) {
-                Map.Entry<String, InstanceMetadataChangedListener> entry = 
iterator.next();
-                try {
-                    entry.getValue().echo(CommonConstants.DUBBO);
-                } catch (RpcException e) {
-                    if (logger.isInfoEnabled()) {
-                        logger.info("Send echo message to consumer error. 
Possible cause: consumer is offline.");
-                    }
-                    iterator.remove();
-                }
-            }
-        }, echoPollingCycle, echoPollingCycle, TimeUnit.MILLISECONDS);
+        // polling task may take a lot of time, create a new 
ScheduledThreadPool
+        pollingExecutorService = 
Executors.newScheduledThreadPool(scheduledThreadPoolSize, new 
NamedThreadFactory("Dubbo-DNS-Poll"));
 
     }
 
     @Override
-    public void destroy() throws Exception {
-        metadataMap.clear();
-        serviceInstanceRevisionMap.clear();
+    public void doDestroy() throws Exception {
+        dnsResolver.destroy();
         pollingExecutorMap.forEach((serviceName, scheduledFuture) -> 
scheduledFuture.cancel(true));
         pollingExecutorMap.clear();
-        echoCheckExecutor.shutdown();
         pollingExecutorService.shutdown();
-        dnsResolver.destroy();
-    }
-
-    @Override
-    public void register(ServiceInstance serviceInstance) throws 
RuntimeException {
-        this.serviceInstance = serviceInstance;
-
-        updateMetadata(serviceInstance);
-    }
-
-    @Override
-    public void update(ServiceInstance serviceInstance) throws 
RuntimeException {
-        this.serviceInstance = serviceInstance;
-
-        updateMetadata(serviceInstance);
-    }
-
-    @Override
-    public void unregister(ServiceInstance serviceInstance) throws 
RuntimeException {
-        this.serviceInstance = null;
-
-        // notify empty message to consumer
-        WritableMetadataService metadataService = 
WritableMetadataService.getDefaultExtension();
-        metadataService.exportInstanceMetadata("");
-        
metadataService.getInstanceMetadataChangedListenerMap().forEach((consumerId, 
listener) -> listener.onEvent(""));
-        metadataService.getInstanceMetadataChangedListenerMap().clear();
     }
 
     @Override
@@ -219,33 +111,7 @@ public class DNSServiceDiscovery implements 
ServiceDiscovery {
             ScheduledFuture<?> scheduledFuture = 
pollingExecutorService.scheduleAtFixedRate(() -> {
                         List<ServiceInstance> instances = 
getInstances(serviceName);
                         
instances.sort(Comparator.comparingInt(ServiceInstance::hashCode));
-
-                        String serviceInstanceRevision = 
RevisionResolver.calRevision(JSONObject.toJSONString(instances));
-                        boolean changed = 
!serviceInstanceRevision.equalsIgnoreCase(
-                                serviceInstanceRevisionMap.put(serviceName, 
serviceInstanceRevision));
-
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Poll DNS data. Service Instance 
changed: " + changed + " Service Name: " + serviceName);
-                        }
-
-                        if (changed) {
-                            List<ServiceInstance> oldServiceInstances = 
cachedServiceInstances.getOrDefault(serviceName, new LinkedList<>());
-
-                            // remove expired invoker
-                            Set<ServiceInstance> allServiceInstances = new 
HashSet<>(oldServiceInstances.size() + instances.size());
-                            allServiceInstances.addAll(oldServiceInstances);
-                            allServiceInstances.addAll(instances);
-
-                            allServiceInstances.removeAll(oldServiceInstances);
-
-                            allServiceInstances.forEach(removedServiceInstance 
-> {
-                                
MetadataUtils.destroyMetadataServiceProxy(removedServiceInstance, this);
-                            });
-
-                            cachedServiceInstances.put(serviceName, instances);
-                            listener.onEvent(new 
ServiceInstancesChangedEvent(serviceName, instances));
-                        }
-
+                        notifyListener(serviceName, listener, instances);
                     },
                     pollingCycle, pollingCycle, TimeUnit.MILLISECONDS);
 
@@ -253,16 +119,6 @@ public class DNSServiceDiscovery implements 
ServiceDiscovery {
         });
     }
 
-    @Override
-    public ServiceInstance getLocalInstance() {
-        return serviceInstance;
-    }
-
-    @Override
-    public URL getUrl() {
-        return registryURL;
-    }
-
     /**
      * UT used only
      */
@@ -271,15 +127,6 @@ public class DNSServiceDiscovery implements 
ServiceDiscovery {
         this.dnsResolver = dnsResolver;
     }
 
-    /**
-     * UT used only
-     */
-    @Deprecated
-    public ConcurrentHashMap<String, List<ServiceInstance>> 
getCachedServiceInstances() {
-        return cachedServiceInstances;
-    }
-
-    @SuppressWarnings("unchecked")
     private List<ServiceInstance> toServiceInstance(String serviceName, 
ResolveResult resolveResult) {
 
         int port;
@@ -296,68 +143,10 @@ public class DNSServiceDiscovery implements 
ServiceDiscovery {
 
         for (String host : resolveResult.getHostnameList()) {
             DefaultServiceInstance serviceInstance = new 
DefaultServiceInstance(serviceName, host, port);
-            String hostId = serviceInstance.getId();
-            if (metadataMap.containsKey(hostId)) {
-                // Use cached metadata.
-                // Metadata will be updated by provider callback
-
-                String metadataString = metadataMap.get(hostId);
-                
serviceInstance.setMetadata(JSONObject.parseObject(metadataString, Map.class));
-            } else {
-                // refer from MetadataUtils, this proxy is different from the 
one used to refer exportedURL
-                MetadataService metadataService = 
MetadataUtils.getMetadataServiceProxy(serviceInstance, this);
-
-                String consumerId = ApplicationModel.getName() + 
NetUtils.getLocalHost();
-                String metadata = metadataService.getAndListenInstanceMetadata(
-                        consumerId, metadataString -> {
-                            logger.info("Receive callback: " + metadataString 
+ serviceInstance);
-                            if (StringUtils.isEmpty(metadataString)) {
-                                // provider is shutdown
-                                metadataMap.remove(hostId);
-                            } else {
-                                metadataMap.put(hostId, metadataString);
-                            }
-                        });
-                metadataMap.put(hostId, metadata);
-                serviceInstance.setMetadata(JSONObject.parseObject(metadata, 
Map.class));
-            }
+            fillServiceInstance(serviceInstance);
             instanceList.add(serviceInstance);
         }
 
         return instanceList;
     }
-
-    private void updateMetadata(ServiceInstance serviceInstance) {
-        WritableMetadataService metadataService = 
WritableMetadataService.getDefaultExtension();
-        String metadataString = 
JSONObject.toJSONString(serviceInstance.getMetadata());
-        String metadataRevision = RevisionResolver.calRevision(metadataString);
-
-        // check if metadata updated
-        if (!metadataRevision.equalsIgnoreCase(lastMetadataRevision)) {
-            logger.info("Update Service Instance Metadata of DNS registry. 
Newer metadata: " + metadataString);
-            if (logger.isDebugEnabled()) {
-                logger.debug("Update Service Instance Metadata of DNS 
registry. Newer metadata: " + metadataString);
-            }
-
-            lastMetadataRevision = metadataRevision;
-
-            // save newest metadata to local
-            metadataService.exportInstanceMetadata(metadataString);
-
-            // notify to consumer
-            Map<String, InstanceMetadataChangedListener> listenerMap = 
metadataService.getInstanceMetadataChangedListenerMap();
-            Iterator<Map.Entry<String, InstanceMetadataChangedListener>> 
iterator = listenerMap.entrySet().iterator();
-
-            while (iterator.hasNext()) {
-                Map.Entry<String, InstanceMetadataChangedListener> entry = 
iterator.next();
-                try {
-                    entry.getValue().onEvent(metadataString);
-                } catch (RpcException e) {
-                    logger.warn("Notify to consumer error. Possible cause: 
consumer is offline.");
-                    // remove listener if consumer is offline
-                    iterator.remove();
-                }
-            }
-        }
-    }
 }
diff --git 
a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/util/DNSClientConst.java
 
b/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/util/DNSClientConst.java
index c01fa56..0fc8ada 100644
--- 
a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/util/DNSClientConst.java
+++ 
b/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/util/DNSClientConst.java
@@ -24,10 +24,24 @@ public class DNSClientConst {
 
     public final static String MAX_QUERIES_PER_RESOLVE = 
"maxQueriesPerResolve";
 
-    public final static String POLLING_CYCLE = "pollingCycle";
-
-    public final static String ECHO_POLLING_CYCLE = "echoPollingCycle";
-
-    public final static String SCHEDULED_THREAD_POOL_SIZE = 
"scheduledThreadPoolSize";
+    /**
+     * Interval between successive DNS polls (in ms)
+     */
+    public final static String DNS_POLLING_CYCLE = "dnsPollingCycle";
+
+    /**
+     * Default DNS polling cycle: 60000 (ms)
+     */
+    public final static int DEFAULT_DNS_POLLING_CYCLE = 60000;
+
+    /**
+     * Number of threads used to execute DNS polling
+     */
+    public final static String DNS_POLLING_POOL_SIZE_KEY = 
"dnsPollingPoolSize";
+
+    /**
+     * Default size of the DNS polling thread pool: 1
+     */
+    public final static int DEFAULT_DNS_POLLING_POOL_SIZE = 1;
 
 }
diff --git 
a/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
 
b/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
index c36ced1..cc6610b 100644
--- 
a/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
+++ 
b/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.config.ServiceConfig;
 import org.apache.dubbo.metadata.InstanceMetadataChangedListener;
 import org.apache.dubbo.metadata.MetadataService;
 import org.apache.dubbo.metadata.WritableMetadataService;
+import org.apache.dubbo.registry.Constants;
 import org.apache.dubbo.registry.client.DefaultServiceInstance;
 import org.apache.dubbo.registry.client.ServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
@@ -107,8 +108,8 @@ public class DNSServiceDiscoveryTest {
         DNSServiceDiscovery dnsServiceDiscovery = new DNSServiceDiscovery();
 
         URL registryURL = URL.valueOf("dns://")
-                .addParameter(DNSClientConst.POLLING_CYCLE, 100)
-                .addParameter(DNSClientConst.ECHO_POLLING_CYCLE, 100);
+                .addParameter(DNSClientConst.DNS_POLLING_CYCLE, 100)
+                .addParameter(Constants.ECHO_POLLING_CYCLE_KEY, 100);
         ApplicationModel.getEnvironment().getAppExternalConfigurationMap()
                 .put(METADATA_PROXY_TIMEOUT_KEY, String.valueOf(500));
         dnsServiceDiscovery.initialize(registryURL);

Reply via email to