This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0-k8s
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0-k8s by this push:
new 6581af4 [3.0 DNS] Move distribute metadata fetch to api module (#7092)
6581af4 is described below
commit 6581af46997b2ff8d12dcdc74247afc17d8a0096
Author: Albumen Kevin <[email protected]>
AuthorDate: Fri Jan 15 23:42:54 2021 +0800
[3.0 DNS] Move distribute metadata fetch to api module (#7092)
---
.../java/org/apache/dubbo/registry/Constants.java | 10 +
.../client/SelfHostMetaServiceDiscovery.java} | 293 ++++++++-------------
.../dubbo/registry/dns/DNSServiceDiscovery.java | 233 +---------------
.../dubbo/registry/dns/util/DNSClientConst.java | 24 +-
.../registry/dns/DNSServiceDiscoveryTest.java | 5 +-
5 files changed, 153 insertions(+), 412 deletions(-)
diff --git
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java
index 1bf4168..eee0665 100644
---
a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/Constants.java
@@ -88,4 +88,14 @@ public interface Constants {
String REGISTRY_RETRY_PERIOD_KEY = "retry.period";
String SESSION_TIMEOUT_KEY = "session";
+
+ /**
+ * To decide the frequency of checking Distributed Service Discovery
Registry callback hook (in ms)
+ */
+ String ECHO_POLLING_CYCLE_KEY = "echoPollingCycle";
+
+ /**
+ * Default value for check frequency: 60000 (ms)
+ */
+ int DEFAULT_ECHO_POLLING_CYCLE = 60000;
}
diff --git
a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
similarity index 53%
copy from
dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
copy to
dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
index 35596cf..25ab77e 100644
---
a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
+++
b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.registry.dns;
+package org.apache.dubbo.registry.client;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.constants.CommonConstants;
@@ -27,22 +27,15 @@ import
org.apache.dubbo.metadata.InstanceMetadataChangedListener;
import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.metadata.RevisionResolver;
import org.apache.dubbo.metadata.WritableMetadataService;
-import org.apache.dubbo.registry.client.DefaultServiceInstance;
-import org.apache.dubbo.registry.client.ServiceDiscovery;
-import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.Constants;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.client.metadata.MetadataUtils;
-import org.apache.dubbo.registry.dns.util.DNSClientConst;
-import org.apache.dubbo.registry.dns.util.DNSResolver;
-import org.apache.dubbo.registry.dns.util.ResolveResult;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.ApplicationModel;
import com.alibaba.fastjson.JSONObject;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@@ -52,10 +45,9 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-public class DNSServiceDiscovery implements ServiceDiscovery {
+public abstract class SelfHostMetaServiceDiscovery implements ServiceDiscovery
{
private final Logger logger = LoggerFactory.getLogger(getClass());
@@ -65,7 +57,7 @@ public class DNSServiceDiscovery implements ServiceDiscovery {
* Echo check if consumer is still work
* echo task may take a lot of time when consumer offline, create a new
ScheduledThreadPool
*/
- private final ScheduledExecutorService echoCheckExecutor =
Executors.newScheduledThreadPool(1, new
NamedThreadFactory("Dubbo-DNS-EchoCheck"));
+ private final ScheduledExecutorService echoCheckExecutor =
Executors.newScheduledThreadPool(1, new
NamedThreadFactory("Dubbo-Registry-EchoCheck-Consumer"));
// =================================== Provider side
=================================== //
@@ -79,15 +71,6 @@ public class DNSServiceDiscovery implements ServiceDiscovery
{
// =================================== Consumer side
=================================== //
/**
- * DNS properties
- */
-
- private String addressPrefix;
- private String addressSuffix;
- private long pollingCycle;
- private DNSResolver dnsResolver;
-
- /**
* Local Cache of {@link ServiceInstance} Metadata
* <p>
* Key - {@link ServiceInstance} ID ( usually ip + port )
@@ -112,32 +95,11 @@ public class DNSServiceDiscovery implements
ServiceDiscovery {
*/
private final ConcurrentHashMap<String, String> serviceInstanceRevisionMap
= new ConcurrentHashMap<>();
- /**
- * Polling task ScheduledFuture, used to stop task when destroy
- */
- private final ConcurrentHashMap<String, ScheduledFuture<?>>
pollingExecutorMap = new ConcurrentHashMap<>();
-
- /**
- * Polling check provider ExecutorService
- */
- private ScheduledExecutorService pollingExecutorService;
-
@Override
public void initialize(URL registryURL) throws Exception {
this.registryURL = registryURL;
- this.addressPrefix =
registryURL.getParameter(DNSClientConst.ADDRESS_PREFIX, "");
- this.addressSuffix =
registryURL.getParameter(DNSClientConst.ADDRESS_SUFFIX, "");
- this.pollingCycle =
registryURL.getParameter(DNSClientConst.POLLING_CYCLE, 5000);
- long echoPollingCycle =
registryURL.getParameter(DNSClientConst.ECHO_POLLING_CYCLE, 60000);
- int scheduledThreadPoolSize =
registryURL.getParameter(DNSClientConst.SCHEDULED_THREAD_POOL_SIZE, 1);
-
- String nameserver = registryURL.getHost();
- int port = registryURL.getPort();
- int maxQueriesPerResolve =
registryURL.getParameter(DNSClientConst.MAX_QUERIES_PER_RESOLVE, 10);
- this.dnsResolver = new DNSResolver(nameserver, port,
maxQueriesPerResolve);
-
- // polling task may take a lot of time, create a new
ScheduledThreadPool
- pollingExecutorService =
Executors.newScheduledThreadPool(scheduledThreadPoolSize, new
NamedThreadFactory("Dubbo-DNS-EchoCheck"));
+ doInitialize(registryURL);
+ long echoPollingCycle =
registryURL.getParameter(Constants.ECHO_POLLING_CYCLE_KEY,
Constants.DEFAULT_ECHO_POLLING_CYCLE);
// Echo check: test if consumer is offline, remove
MetadataChangeListener,
// reduce the probability of failure when metadata update
@@ -158,18 +120,48 @@ public class DNSServiceDiscovery implements
ServiceDiscovery {
}
}
}, echoPollingCycle, echoPollingCycle, TimeUnit.MILLISECONDS);
-
}
@Override
public void destroy() throws Exception {
+ doDestroy();
metadataMap.clear();
serviceInstanceRevisionMap.clear();
- pollingExecutorMap.forEach((serviceName, scheduledFuture) ->
scheduledFuture.cancel(true));
- pollingExecutorMap.clear();
echoCheckExecutor.shutdown();
- pollingExecutorService.shutdown();
- dnsResolver.destroy();
+ }
+
+ private void updateMetadata(ServiceInstance serviceInstance) {
+ WritableMetadataService metadataService =
WritableMetadataService.getDefaultExtension();
+ String metadataString =
JSONObject.toJSONString(serviceInstance.getMetadata());
+ String metadataRevision = RevisionResolver.calRevision(metadataString);
+
+ // check if metadata updated
+ if (!metadataRevision.equalsIgnoreCase(lastMetadataRevision)) {
+ logger.info("Update Service Instance Metadata of DNS registry.
Newer metadata: " + metadataString);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Update Service Instance Metadata of DNS
registry. Newer metadata: " + metadataString);
+ }
+
+ lastMetadataRevision = metadataRevision;
+
+ // save newest metadata to local
+ metadataService.exportInstanceMetadata(metadataString);
+
+ // notify to consumer
+ Map<String, InstanceMetadataChangedListener> listenerMap =
metadataService.getInstanceMetadataChangedListenerMap();
+ Iterator<Map.Entry<String, InstanceMetadataChangedListener>>
iterator = listenerMap.entrySet().iterator();
+
+ while (iterator.hasNext()) {
+ Map.Entry<String, InstanceMetadataChangedListener> entry =
iterator.next();
+ try {
+ entry.getValue().onEvent(metadataString);
+ } catch (RpcException e) {
+ logger.warn("Notify to consumer error. Possible cause:
consumer is offline.");
+ // remove listener if consumer is offline
+ iterator.remove();
+ }
+ }
+ }
}
@Override
@@ -177,6 +169,8 @@ public class DNSServiceDiscovery implements
ServiceDiscovery {
this.serviceInstance = serviceInstance;
updateMetadata(serviceInstance);
+
+ doRegister(serviceInstance);
}
@Override
@@ -184,10 +178,14 @@ public class DNSServiceDiscovery implements
ServiceDiscovery {
this.serviceInstance = serviceInstance;
updateMetadata(serviceInstance);
+
+ doUpdate(serviceInstance);
}
@Override
public void unregister(ServiceInstance serviceInstance) throws
RuntimeException {
+ doUnregister(serviceInstance);
+
this.serviceInstance = null;
// notify empty message to consumer
@@ -198,166 +196,95 @@ public class DNSServiceDiscovery implements
ServiceDiscovery {
}
@Override
- public Set<String> getServices() {
- // it is impossible for dns to discover service names
- return Collections.singleton("Unsupported Method");
+ public ServiceInstance getLocalInstance() {
+ return serviceInstance;
}
@Override
- public List<ServiceInstance> getInstances(String serviceName) throws
NullPointerException {
-
- String serviceAddress = addressPrefix + serviceName + addressSuffix;
-
- ResolveResult resolveResult = dnsResolver.resolve(serviceAddress);
-
- return toServiceInstance(serviceName, resolveResult);
+ public URL getUrl() {
+ return registryURL;
}
- @Override
- public void
addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
throws NullPointerException, IllegalArgumentException {
- listener.getServiceNames().forEach(serviceName -> {
- ScheduledFuture<?> scheduledFuture =
pollingExecutorService.scheduleAtFixedRate(() -> {
- List<ServiceInstance> instances =
getInstances(serviceName);
-
instances.sort(Comparator.comparingInt(ServiceInstance::hashCode));
-
- String serviceInstanceRevision =
RevisionResolver.calRevision(JSONObject.toJSONString(instances));
- boolean changed =
!serviceInstanceRevision.equalsIgnoreCase(
- serviceInstanceRevisionMap.put(serviceName,
serviceInstanceRevision));
-
- if (logger.isDebugEnabled()) {
- logger.debug("Poll DNS data. Service Instance
changed: " + changed + " Service Name: " + serviceName);
+ @SuppressWarnings("unchecked")
+ public final void fillServiceInstance(DefaultServiceInstance
serviceInstance) {
+ String hostId = serviceInstance.getId();
+ if (metadataMap.containsKey(hostId)) {
+ // Use cached metadata.
+ // Metadata will be updated by provider callback
+
+ String metadataString = metadataMap.get(hostId);
+ serviceInstance.setMetadata(JSONObject.parseObject(metadataString,
Map.class));
+ } else {
+ // refer from MetadataUtils, this proxy is different from the one
used to refer exportedURL
+ MetadataService metadataService =
MetadataUtils.getMetadataServiceProxy(serviceInstance, this);
+
+ String consumerId = ApplicationModel.getName() +
NetUtils.getLocalHost();
+ String metadata = metadataService.getAndListenInstanceMetadata(
+ consumerId, metadataString -> {
+ logger.info("Receive callback: " + metadataString +
serviceInstance);
+ if (StringUtils.isEmpty(metadataString)) {
+ // provider is shutdown
+ metadataMap.remove(hostId);
+ } else {
+ metadataMap.put(hostId, metadataString);
}
+ });
+ metadataMap.put(hostId, metadata);
+ serviceInstance.setMetadata(JSONObject.parseObject(metadata,
Map.class));
+ }
+ }
- if (changed) {
- List<ServiceInstance> oldServiceInstances =
cachedServiceInstances.getOrDefault(serviceName, new LinkedList<>());
+ public final void notifyListener(String serviceName,
ServiceInstancesChangedListener listener, List<ServiceInstance> instances) {
+ String serviceInstanceRevision =
RevisionResolver.calRevision(JSONObject.toJSONString(instances));
+ boolean changed = !serviceInstanceRevision.equalsIgnoreCase(
+ serviceInstanceRevisionMap.put(serviceName,
serviceInstanceRevision));
- // remove expired invoker
- Set<ServiceInstance> allServiceInstances = new
HashSet<>(oldServiceInstances.size() + instances.size());
- allServiceInstances.addAll(oldServiceInstances);
- allServiceInstances.addAll(instances);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Poll DNS data. Service Instance changed: " + changed
+ " Service Name: " + serviceName);
+ }
- allServiceInstances.removeAll(oldServiceInstances);
+ if (changed) {
+ List<ServiceInstance> oldServiceInstances =
cachedServiceInstances.getOrDefault(serviceName, new LinkedList<>());
- allServiceInstances.forEach(removedServiceInstance
-> {
-
MetadataUtils.destroyMetadataServiceProxy(removedServiceInstance, this);
- });
+ // remove expired invoker
+ Set<ServiceInstance> allServiceInstances = new
HashSet<>(oldServiceInstances.size() + instances.size());
+ allServiceInstances.addAll(oldServiceInstances);
+ allServiceInstances.addAll(instances);
- cachedServiceInstances.put(serviceName, instances);
- listener.onEvent(new
ServiceInstancesChangedEvent(serviceName, instances));
- }
+ allServiceInstances.removeAll(oldServiceInstances);
- },
- pollingCycle, pollingCycle, TimeUnit.MILLISECONDS);
+ allServiceInstances.forEach(removedServiceInstance -> {
+
MetadataUtils.destroyMetadataServiceProxy(removedServiceInstance, this);
+ });
- pollingExecutorMap.put(serviceName, scheduledFuture);
- });
+ cachedServiceInstances.put(serviceName, instances);
+ listener.onEvent(new ServiceInstancesChangedEvent(serviceName,
instances));
+ }
}
- @Override
- public ServiceInstance getLocalInstance() {
- return serviceInstance;
+ public void doInitialize(URL registryURL) throws Exception {
}
- @Override
- public URL getUrl() {
- return registryURL;
+ public void doDestroy() throws Exception {
}
- /**
- * UT used only
- */
- @Deprecated
- public void setDnsResolver(DNSResolver dnsResolver) {
- this.dnsResolver = dnsResolver;
- }
+ public void doRegister(ServiceInstance serviceInstance) throws
RuntimeException {
- /**
- * UT used only
- */
- @Deprecated
- public ConcurrentHashMap<String, List<ServiceInstance>>
getCachedServiceInstances() {
- return cachedServiceInstances;
}
- @SuppressWarnings("unchecked")
- private List<ServiceInstance> toServiceInstance(String serviceName,
ResolveResult resolveResult) {
-
- int port;
+ public void doUpdate(ServiceInstance serviceInstance) throws
RuntimeException {
- if (resolveResult.getPort().size() > 0) {
- // use first as default
- port = resolveResult.getPort().get(0);
- } else {
- // not support SRV record
- port = 20880;
- }
-
- List<ServiceInstance> instanceList = new LinkedList<>();
-
- for (String host : resolveResult.getHostnameList()) {
- DefaultServiceInstance serviceInstance = new
DefaultServiceInstance(serviceName, host, port);
- String hostId = serviceInstance.getId();
- if (metadataMap.containsKey(hostId)) {
- // Use cached metadata.
- // Metadata will be updated by provider callback
-
- String metadataString = metadataMap.get(hostId);
-
serviceInstance.setMetadata(JSONObject.parseObject(metadataString, Map.class));
- } else {
- // refer from MetadataUtils, this proxy is different from the
one used to refer exportedURL
- MetadataService metadataService =
MetadataUtils.getMetadataServiceProxy(serviceInstance, this);
-
- String consumerId = ApplicationModel.getName() +
NetUtils.getLocalHost();
- String metadata = metadataService.getAndListenInstanceMetadata(
- consumerId, metadataString -> {
- logger.info("Receive callback: " + metadataString
+ serviceInstance);
- if (StringUtils.isEmpty(metadataString)) {
- // provider is shutdown
- metadataMap.remove(hostId);
- } else {
- metadataMap.put(hostId, metadataString);
- }
- });
- metadataMap.put(hostId, metadata);
- serviceInstance.setMetadata(JSONObject.parseObject(metadata,
Map.class));
- }
- instanceList.add(serviceInstance);
- }
-
- return instanceList;
}
- private void updateMetadata(ServiceInstance serviceInstance) {
- WritableMetadataService metadataService =
WritableMetadataService.getDefaultExtension();
- String metadataString =
JSONObject.toJSONString(serviceInstance.getMetadata());
- String metadataRevision = RevisionResolver.calRevision(metadataString);
-
- // check if metadata updated
- if (!metadataRevision.equalsIgnoreCase(lastMetadataRevision)) {
- logger.info("Update Service Instance Metadata of DNS registry.
Newer metadata: " + metadataString);
- if (logger.isDebugEnabled()) {
- logger.debug("Update Service Instance Metadata of DNS
registry. Newer metadata: " + metadataString);
- }
-
- lastMetadataRevision = metadataRevision;
+ public void doUnregister(ServiceInstance serviceInstance) throws
RuntimeException {
- // save newest metadata to local
- metadataService.exportInstanceMetadata(metadataString);
-
- // notify to consumer
- Map<String, InstanceMetadataChangedListener> listenerMap =
metadataService.getInstanceMetadataChangedListenerMap();
- Iterator<Map.Entry<String, InstanceMetadataChangedListener>>
iterator = listenerMap.entrySet().iterator();
+ }
- while (iterator.hasNext()) {
- Map.Entry<String, InstanceMetadataChangedListener> entry =
iterator.next();
- try {
- entry.getValue().onEvent(metadataString);
- } catch (RpcException e) {
- logger.warn("Notify to consumer error. Possible cause:
consumer is offline.");
- // remove listener if consumer is offline
- iterator.remove();
- }
- }
- }
+ /**
+ * UT used only
+ */
+ @Deprecated
+ public final ConcurrentHashMap<String, List<ServiceInstance>>
getCachedServiceInstances() {
+ return cachedServiceInstances;
}
}
diff --git
a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
b/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
index 35596cf..a4d30d6 100644
---
a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
+++
b/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/DNSServiceDiscovery.java
@@ -17,37 +17,21 @@
package org.apache.dubbo.registry.dns;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.constants.CommonConstants;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.common.utils.NetUtils;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.metadata.InstanceMetadataChangedListener;
-import org.apache.dubbo.metadata.MetadataService;
-import org.apache.dubbo.metadata.RevisionResolver;
-import org.apache.dubbo.metadata.WritableMetadataService;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
-import org.apache.dubbo.registry.client.ServiceDiscovery;
+import org.apache.dubbo.registry.client.SelfHostMetaServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
-import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import
org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
-import org.apache.dubbo.registry.client.metadata.MetadataUtils;
import org.apache.dubbo.registry.dns.util.DNSClientConst;
import org.apache.dubbo.registry.dns.util.DNSResolver;
import org.apache.dubbo.registry.dns.util.ResolveResult;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.model.ApplicationModel;
-
-import com.alibaba.fastjson.JSONObject;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
@@ -55,29 +39,10 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-public class DNSServiceDiscovery implements ServiceDiscovery {
+public class DNSServiceDiscovery extends SelfHostMetaServiceDiscovery {
private final Logger logger = LoggerFactory.getLogger(getClass());
- private URL registryURL;
-
- /**
- * Echo check if consumer is still work
- * echo task may take a lot of time when consumer offline, create a new
ScheduledThreadPool
- */
- private final ScheduledExecutorService echoCheckExecutor =
Executors.newScheduledThreadPool(1, new
NamedThreadFactory("Dubbo-DNS-EchoCheck"));
-
- // =================================== Provider side
=================================== //
-
- private ServiceInstance serviceInstance;
-
- /**
- * Local {@link ServiceInstance} Metadata's revision
- */
- private String lastMetadataRevision;
-
- // =================================== Consumer side
=================================== //
-
/**
* DNS properties
*/
@@ -88,31 +53,6 @@ public class DNSServiceDiscovery implements ServiceDiscovery
{
private DNSResolver dnsResolver;
/**
- * Local Cache of {@link ServiceInstance} Metadata
- * <p>
- * Key - {@link ServiceInstance} ID ( usually ip + port )
- * Value - Json processed metadata string
- */
- private final ConcurrentHashMap<String, String> metadataMap = new
ConcurrentHashMap<>();
-
- /**
- * Local Cache of {@link ServiceInstance}
- * <p>
- * Key - Service Name
- * Value - List {@link ServiceInstance}
- */
- private final ConcurrentHashMap<String, List<ServiceInstance>>
cachedServiceInstances = new ConcurrentHashMap<>();
-
- /**
- * Local Cache of Service's {@link ServiceInstance} list revision,
- * used to check if {@link ServiceInstance} list has been updated
- * <p>
- * Key - ServiceName
- * Value - a revision calculate from {@link List} of {@link
ServiceInstance}
- */
- private final ConcurrentHashMap<String, String> serviceInstanceRevisionMap
= new ConcurrentHashMap<>();
-
- /**
* Polling task ScheduledFuture, used to stop task when destroy
*/
private final ConcurrentHashMap<String, ScheduledFuture<?>>
pollingExecutorMap = new ConcurrentHashMap<>();
@@ -123,78 +63,30 @@ public class DNSServiceDiscovery implements
ServiceDiscovery {
private ScheduledExecutorService pollingExecutorService;
@Override
- public void initialize(URL registryURL) throws Exception {
- this.registryURL = registryURL;
+ public void doInitialize(URL registryURL) throws Exception {
this.addressPrefix =
registryURL.getParameter(DNSClientConst.ADDRESS_PREFIX, "");
this.addressSuffix =
registryURL.getParameter(DNSClientConst.ADDRESS_SUFFIX, "");
- this.pollingCycle =
registryURL.getParameter(DNSClientConst.POLLING_CYCLE, 5000);
- long echoPollingCycle =
registryURL.getParameter(DNSClientConst.ECHO_POLLING_CYCLE, 60000);
- int scheduledThreadPoolSize =
registryURL.getParameter(DNSClientConst.SCHEDULED_THREAD_POOL_SIZE, 1);
+ this.pollingCycle =
registryURL.getParameter(DNSClientConst.DNS_POLLING_CYCLE,
DNSClientConst.DEFAULT_DNS_POLLING_CYCLE);
String nameserver = registryURL.getHost();
int port = registryURL.getPort();
int maxQueriesPerResolve =
registryURL.getParameter(DNSClientConst.MAX_QUERIES_PER_RESOLVE, 10);
this.dnsResolver = new DNSResolver(nameserver, port,
maxQueriesPerResolve);
- // polling task may take a lot of time, create a new
ScheduledThreadPool
- pollingExecutorService =
Executors.newScheduledThreadPool(scheduledThreadPoolSize, new
NamedThreadFactory("Dubbo-DNS-EchoCheck"));
- // Echo check: test if consumer is offline, remove
MetadataChangeListener,
- // reduce the probability of failure when metadata update
- echoCheckExecutor.scheduleAtFixedRate(() -> {
- WritableMetadataService metadataService =
WritableMetadataService.getDefaultExtension();
- Map<String, InstanceMetadataChangedListener> listenerMap =
metadataService.getInstanceMetadataChangedListenerMap();
- Iterator<Map.Entry<String, InstanceMetadataChangedListener>>
iterator = listenerMap.entrySet().iterator();
+ int scheduledThreadPoolSize =
registryURL.getParameter(DNSClientConst.DNS_POLLING_POOL_SIZE_KEY,
DNSClientConst.DEFAULT_DNS_POLLING_POOL_SIZE);
- while (iterator.hasNext()) {
- Map.Entry<String, InstanceMetadataChangedListener> entry =
iterator.next();
- try {
- entry.getValue().echo(CommonConstants.DUBBO);
- } catch (RpcException e) {
- if (logger.isInfoEnabled()) {
- logger.info("Send echo message to consumer error.
Possible cause: consumer is offline.");
- }
- iterator.remove();
- }
- }
- }, echoPollingCycle, echoPollingCycle, TimeUnit.MILLISECONDS);
+ // polling task may take a lot of time, create a new
ScheduledThreadPool
+ pollingExecutorService =
Executors.newScheduledThreadPool(scheduledThreadPoolSize, new
NamedThreadFactory("Dubbo-DNS-Poll"));
}
@Override
- public void destroy() throws Exception {
- metadataMap.clear();
- serviceInstanceRevisionMap.clear();
+ public void doDestroy() throws Exception {
+ dnsResolver.destroy();
pollingExecutorMap.forEach((serviceName, scheduledFuture) ->
scheduledFuture.cancel(true));
pollingExecutorMap.clear();
- echoCheckExecutor.shutdown();
pollingExecutorService.shutdown();
- dnsResolver.destroy();
- }
-
- @Override
- public void register(ServiceInstance serviceInstance) throws
RuntimeException {
- this.serviceInstance = serviceInstance;
-
- updateMetadata(serviceInstance);
- }
-
- @Override
- public void update(ServiceInstance serviceInstance) throws
RuntimeException {
- this.serviceInstance = serviceInstance;
-
- updateMetadata(serviceInstance);
- }
-
- @Override
- public void unregister(ServiceInstance serviceInstance) throws
RuntimeException {
- this.serviceInstance = null;
-
- // notify empty message to consumer
- WritableMetadataService metadataService =
WritableMetadataService.getDefaultExtension();
- metadataService.exportInstanceMetadata("");
-
metadataService.getInstanceMetadataChangedListenerMap().forEach((consumerId,
listener) -> listener.onEvent(""));
- metadataService.getInstanceMetadataChangedListenerMap().clear();
}
@Override
@@ -219,33 +111,7 @@ public class DNSServiceDiscovery implements
ServiceDiscovery {
ScheduledFuture<?> scheduledFuture =
pollingExecutorService.scheduleAtFixedRate(() -> {
List<ServiceInstance> instances =
getInstances(serviceName);
instances.sort(Comparator.comparingInt(ServiceInstance::hashCode));
-
- String serviceInstanceRevision =
RevisionResolver.calRevision(JSONObject.toJSONString(instances));
- boolean changed =
!serviceInstanceRevision.equalsIgnoreCase(
- serviceInstanceRevisionMap.put(serviceName,
serviceInstanceRevision));
-
- if (logger.isDebugEnabled()) {
- logger.debug("Poll DNS data. Service Instance
changed: " + changed + " Service Name: " + serviceName);
- }
-
- if (changed) {
- List<ServiceInstance> oldServiceInstances =
cachedServiceInstances.getOrDefault(serviceName, new LinkedList<>());
-
- // remove expired invoker
- Set<ServiceInstance> allServiceInstances = new
HashSet<>(oldServiceInstances.size() + instances.size());
- allServiceInstances.addAll(oldServiceInstances);
- allServiceInstances.addAll(instances);
-
- allServiceInstances.removeAll(oldServiceInstances);
-
- allServiceInstances.forEach(removedServiceInstance
-> {
-
MetadataUtils.destroyMetadataServiceProxy(removedServiceInstance, this);
- });
-
- cachedServiceInstances.put(serviceName, instances);
- listener.onEvent(new
ServiceInstancesChangedEvent(serviceName, instances));
- }
-
+ notifyListener(serviceName, listener, instances);
},
pollingCycle, pollingCycle, TimeUnit.MILLISECONDS);
@@ -253,16 +119,6 @@ public class DNSServiceDiscovery implements
ServiceDiscovery {
});
}
- @Override
- public ServiceInstance getLocalInstance() {
- return serviceInstance;
- }
-
- @Override
- public URL getUrl() {
- return registryURL;
- }
-
/**
* UT used only
*/
@@ -271,15 +127,6 @@ public class DNSServiceDiscovery implements
ServiceDiscovery {
this.dnsResolver = dnsResolver;
}
- /**
- * UT used only
- */
- @Deprecated
- public ConcurrentHashMap<String, List<ServiceInstance>>
getCachedServiceInstances() {
- return cachedServiceInstances;
- }
-
- @SuppressWarnings("unchecked")
private List<ServiceInstance> toServiceInstance(String serviceName,
ResolveResult resolveResult) {
int port;
@@ -296,68 +143,10 @@ public class DNSServiceDiscovery implements
ServiceDiscovery {
for (String host : resolveResult.getHostnameList()) {
DefaultServiceInstance serviceInstance = new
DefaultServiceInstance(serviceName, host, port);
- String hostId = serviceInstance.getId();
- if (metadataMap.containsKey(hostId)) {
- // Use cached metadata.
- // Metadata will be updated by provider callback
-
- String metadataString = metadataMap.get(hostId);
-
serviceInstance.setMetadata(JSONObject.parseObject(metadataString, Map.class));
- } else {
- // refer from MetadataUtils, this proxy is different from the
one used to refer exportedURL
- MetadataService metadataService =
MetadataUtils.getMetadataServiceProxy(serviceInstance, this);
-
- String consumerId = ApplicationModel.getName() +
NetUtils.getLocalHost();
- String metadata = metadataService.getAndListenInstanceMetadata(
- consumerId, metadataString -> {
- logger.info("Receive callback: " + metadataString
+ serviceInstance);
- if (StringUtils.isEmpty(metadataString)) {
- // provider is shutdown
- metadataMap.remove(hostId);
- } else {
- metadataMap.put(hostId, metadataString);
- }
- });
- metadataMap.put(hostId, metadata);
- serviceInstance.setMetadata(JSONObject.parseObject(metadata,
Map.class));
- }
+ fillServiceInstance(serviceInstance);
instanceList.add(serviceInstance);
}
return instanceList;
}
-
- private void updateMetadata(ServiceInstance serviceInstance) {
- WritableMetadataService metadataService =
WritableMetadataService.getDefaultExtension();
- String metadataString =
JSONObject.toJSONString(serviceInstance.getMetadata());
- String metadataRevision = RevisionResolver.calRevision(metadataString);
-
- // check if metadata updated
- if (!metadataRevision.equalsIgnoreCase(lastMetadataRevision)) {
- logger.info("Update Service Instance Metadata of DNS registry.
Newer metadata: " + metadataString);
- if (logger.isDebugEnabled()) {
- logger.debug("Update Service Instance Metadata of DNS
registry. Newer metadata: " + metadataString);
- }
-
- lastMetadataRevision = metadataRevision;
-
- // save newest metadata to local
- metadataService.exportInstanceMetadata(metadataString);
-
- // notify to consumer
- Map<String, InstanceMetadataChangedListener> listenerMap =
metadataService.getInstanceMetadataChangedListenerMap();
- Iterator<Map.Entry<String, InstanceMetadataChangedListener>>
iterator = listenerMap.entrySet().iterator();
-
- while (iterator.hasNext()) {
- Map.Entry<String, InstanceMetadataChangedListener> entry =
iterator.next();
- try {
- entry.getValue().onEvent(metadataString);
- } catch (RpcException e) {
- logger.warn("Notify to consumer error. Possible cause:
consumer is offline.");
- // remove listener if consumer is offline
- iterator.remove();
- }
- }
- }
- }
}
diff --git
a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/util/DNSClientConst.java
b/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/util/DNSClientConst.java
index c01fa56..0fc8ada 100644
---
a/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/util/DNSClientConst.java
+++
b/dubbo-registry/dubbo-registry-dns/src/main/java/org/apache/dubbo/registry/dns/util/DNSClientConst.java
@@ -24,10 +24,24 @@ public class DNSClientConst {
public final static String MAX_QUERIES_PER_RESOLVE =
"maxQueriesPerResolve";
- public final static String POLLING_CYCLE = "pollingCycle";
-
- public final static String ECHO_POLLING_CYCLE = "echoPollingCycle";
-
- public final static String SCHEDULED_THREAD_POOL_SIZE =
"scheduledThreadPoolSize";
+ /**
+ * To decide the frequency of execute DNS poll (in ms)
+ */
+ public final static String DNS_POLLING_CYCLE = "dnsPollingCycle";
+
+ /**
+ * Default value for check frequency: 60000 (ms)
+ */
+ public final static int DEFAULT_DNS_POLLING_CYCLE = 60000;
+
+ /**
+ * To decide how many threads used to execute DNS poll
+ */
+ public final static String DNS_POLLING_POOL_SIZE_KEY =
"dnsPollingPoolSize";
+
+ /**
+ * Default value for DNS pool thread: 1
+ */
+ public final static int DEFAULT_DNS_POLLING_POOL_SIZE = 1;
}
diff --git
a/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
b/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
index c36ced1..cc6610b 100644
---
a/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
+++
b/dubbo-registry/dubbo-registry-dns/src/test/java/org/apache/dubbo/registry/dns/DNSServiceDiscoveryTest.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.config.ServiceConfig;
import org.apache.dubbo.metadata.InstanceMetadataChangedListener;
import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.metadata.WritableMetadataService;
+import org.apache.dubbo.registry.Constants;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
@@ -107,8 +108,8 @@ public class DNSServiceDiscoveryTest {
DNSServiceDiscovery dnsServiceDiscovery = new DNSServiceDiscovery();
URL registryURL = URL.valueOf("dns://")
- .addParameter(DNSClientConst.POLLING_CYCLE, 100)
- .addParameter(DNSClientConst.ECHO_POLLING_CYCLE, 100);
+ .addParameter(DNSClientConst.DNS_POLLING_CYCLE, 100)
+ .addParameter(Constants.ECHO_POLLING_CYCLE_KEY, 100);
ApplicationModel.getEnvironment().getAppExternalConfigurationMap()
.put(METADATA_PROXY_TIMEOUT_KEY, String.valueOf(500));
dnsServiceDiscovery.initialize(registryURL);