This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 42f0529054187227dedd153b14d2b7b4605b8ff4 Author: ken.lj <[email protected]> AuthorDate: Tue Jul 14 01:51:51 2020 +0800 can basically work with InstanceAddressURL --- .../src/main/java/org/apache/dubbo/common/URL.java | 12 +- .../org/apache/dubbo/metadata/MetadataInfo.java | 118 ++++++++++++++++++-- .../registry/client/DefaultServiceInstance.java | 27 +++-- .../dubbo/registry/client/InstanceAddressURL.java | 123 +++++++++++++++++---- .../registry/client/ServiceDiscoveryRegistry.java | 4 +- .../client/ServiceDiscoveryRegistryDirectory.java | 69 +++++++----- .../metadata/ServiceInstanceMetadataUtils.java | 18 +-- .../registry/integration/DynamicDirectory.java | 9 +- .../registry/integration/RegistryDirectory.java | 2 +- .../apache/dubbo/remoting/exchange/Exchangers.java | 2 +- .../dubbo/remoting/transport/AbstractEndpoint.java | 2 +- .../dubbo/rpc/proxy/InvokerInvocationHandler.java | 1 + .../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 5 +- .../dubbo/rpc/protocol/thrift/ThriftProtocol.java | 2 +- 14 files changed, 292 insertions(+), 102 deletions(-) diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java index 5f811fa..9742ebb 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java @@ -93,19 +93,19 @@ class URL implements Serializable { private static final long serialVersionUID = -1985165475234910535L; - private final String protocol; + protected String protocol; - private final String username; + protected String username; - private final String password; + protected String password; // by default, host to registry - private final String host; + protected String host; // by default, port to registry - private final int port; + protected int port; - private final String path; + protected String path; private final Map<String, String> parameters; diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java index 4db0298..931a331 100644 --- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java +++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java @@ -29,6 +29,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.atomic.AtomicBoolean; @@ -154,7 +155,13 @@ public class MetadataInfo implements Serializable { if (serviceInfo == null) { return Collections.emptyMap(); } - return serviceInfo.getParams(); + return serviceInfo.getAllParams(); + } + + @Override + public String toString() { + // FIXME + return super.toString(); } public static class ServiceInfo implements Serializable { @@ -163,9 +170,12 @@ public class MetadataInfo implements Serializable { private String group; private String version; private String protocol; + private String path; // most of the time, path is the same with the interface name. private Map<String, String> params; + private transient Map<String, String> consumerParams; private transient Map<String, Map<String, String>> methodParams; + private transient Map<String, Map<String, String>> consumerMethodParams; private transient String serviceKey; private transient String matchKey; @@ -173,14 +183,7 @@ public class MetadataInfo implements Serializable { } public ServiceInfo(URL url) { - // FIXME, how to match registry. - this( - url.getServiceInterface(), - url.getParameter(GROUP_KEY), - url.getParameter(VERSION_KEY), - url.getProtocol(), - null - ); + this(url.getServiceInterface(), url.getParameter(GROUP_KEY), url.getParameter(VERSION_KEY), url.getProtocol(), url.getPath(), null); Map<String, String> params = new HashMap<>(); List<MetadataParamsFilter> filters = loader.getActivateExtension(url, "params-filter"); @@ -207,11 +210,12 @@ public class MetadataInfo implements Serializable { this.params = params; } - public ServiceInfo(String name, String group, String version, String protocol, Map<String, String> params) { + public ServiceInfo(String name, String group, String version, String protocol, String path, Map<String, String> params) { this.name = name; this.group = group; this.version = version; this.protocol = protocol; + this.path = path; this.params = params == null ? new HashMap<>() : params; this.serviceKey = URL.buildKey(name, group, version); @@ -266,6 +270,14 @@ public class MetadataInfo implements Serializable { this.version = version; } + public String getPath() { + return path; + } + + public void setPath(String path) { + this.path = path; + } + public Map<String, String> getParams() { if (params == null) { return Collections.emptyMap(); @@ -277,16 +289,42 @@ public class MetadataInfo implements Serializable { this.params = params; } + public Map<String, String> getAllParams() { + if (consumerParams != null) { + Map<String, String> allParams = new HashMap<>((int) ((params.size() + consumerParams.size()) / 0.75f + 1)); + allParams.putAll(params); + allParams.putAll(consumerParams); + return allParams; + } + return params; + } + public String getParameter(String key) { + if (consumerParams != null) { + String value = consumerParams.get(key); + if (value != null) { + return value; + } + } return params.get(key); } public String getMethodParameter(String method, String key, String defaultValue) { if (methodParams == null) { methodParams = URL.toMethodParameters(params); + consumerMethodParams = URL.toMethodParameters(consumerParams); + } + + String value = getMethodParameter(method, key, consumerMethodParams); + if (value != null) { + return value; } + value = getMethodParameter(method, key, methodParams); + return value == null ? defaultValue : value; + } - Map<String, String> keyMap = methodParams.get(method); + private String getMethodParameter(String method, String key, Map<String, Map<String, String>> map) { + Map<String, String> keyMap = map.get(method); String value = null; if (keyMap != null) { value = keyMap.get(key); @@ -294,7 +332,21 @@ public class MetadataInfo implements Serializable { if (StringUtils.isEmpty(value)) { value = getParameter(key); } - return value == null ? defaultValue : value; + return value; + } + + public boolean hasMethodParameter(String method, String key) { + String value = this.getMethodParameter(method, key, (String) null); + return StringUtils.isNotEmpty(value); + } + + public boolean hasMethodParameter(String method) { + if (methodParams == null) { + methodParams = URL.toMethodParameters(params); + consumerMethodParams = URL.toMethodParameters(consumerParams); + } + + return consumerMethodParams.containsKey(method) || methodParams.containsKey(method); } public String toDescString() { @@ -310,5 +362,47 @@ public class MetadataInfo implements Serializable { } return methodStrings.toString(); } + + public void addParameter(String key, String value) { + if (consumerParams != null) { + this.consumerParams.put(key, value); + } + } + + public void addParameterIfAbsent(String key, String value) { + if (consumerParams != null) { + this.consumerParams.putIfAbsent(key, value); + } + } + + public void addConsumerParams(Map<String, String> params) { + // copy once for one service subscription + if (consumerParams == null) { + consumerParams = new HashMap<>(params); + } + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (!(obj instanceof ServiceInfo)) { + return false; + } + + ServiceInfo serviceInfo = (ServiceInfo) obj; + return this.getMatchKey().equals(serviceInfo.getMatchKey()) && this.getParams().equals(serviceInfo.getParams()); + } + + @Override + public int hashCode() { + return Objects.hash(getMatchKey(), getParams()); + } + + @Override + public String toString() { + return super.toString(); + } } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java index e44bd4f..3ee5dd2 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java @@ -22,6 +22,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import static org.apache.dubbo.common.constants.CommonConstants.REVISION_KEY; + /** * The default implementation of {@link ServiceInstance}. * @@ -166,18 +168,29 @@ public class DefaultServiceInstance implements ServiceInstance { if (this == o) return true; if (!(o instanceof DefaultServiceInstance)) return false; DefaultServiceInstance that = (DefaultServiceInstance) o; - return isEnabled() == that.isEnabled() && - isHealthy() == that.isHealthy() && - Objects.equals(getId(), that.getId()) && - Objects.equals(getServiceName(), that.getServiceName()) && + boolean equals = Objects.equals(getServiceName(), that.getServiceName()) && Objects.equals(getHost(), that.getHost()) && - Objects.equals(getPort(), that.getPort()) && - Objects.equals(getMetadata(), that.getMetadata()); + Objects.equals(getPort(), that.getPort()); + for (Map.Entry<String, String> entry : this.getMetadata().entrySet()) { + if (entry.getKey().equals(REVISION_KEY)) { + continue; + } + equals = equals && !entry.getValue().equals(that.getMetadata().get(entry.getKey())); + } + + return equals; } @Override public int hashCode() { - return Objects.hash(getId(), getServiceName(), getHost(), getPort(), isEnabled(), isHealthy(), getMetadata()); + int result = Objects.hash(getServiceName(), getHost(), getPort()); + for (Map.Entry<String, String> entry : this.getMetadata().entrySet()) { + if (entry.getKey().equals(REVISION_KEY)) { + continue; + } + result = 31 * result + (entry.getValue() == null ? 0 : entry.getValue().hashCode()); + } + return result; } @Override diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java index 18632bf..149c6e2 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java @@ -28,6 +28,9 @@ import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; +/** + * FIXME, replace RpcContext operations with explicitly defined APIs + */ public class InstanceAddressURL extends URL { private ServiceInstance instance; private MetadataInfo metadataInfo; @@ -36,10 +39,20 @@ public class InstanceAddressURL extends URL { ServiceInstance instance, MetadataInfo metadataInfo ) { +// super() this.instance = instance; this.metadataInfo = metadataInfo; - this.setHost(instance.getHost()); - this.setPort(instance.getPort()); + this.host = instance.getHost(); + this.port = instance.getPort(); + } + + + public ServiceInstance getInstance() { + return instance; + } + + public MetadataInfo getMetadataInfo() { + return metadataInfo; } @Override @@ -71,6 +84,12 @@ public class InstanceAddressURL extends URL { } @Override + public String getPath() { + MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey()); + return serviceInfo.getPath(); + } + + @Override public String getParameter(String key) { if (VERSION_KEY.equals(key)) { return getVersion(); @@ -80,10 +99,7 @@ public class InstanceAddressURL extends URL { return getServiceInterface(); } - String value = getConsumerParameters().get(key); - if (StringUtils.isEmpty(value)) { - value = getInstanceMetadata().get(key); - } + String value = getInstanceMetadata().get(key); if (StringUtils.isEmpty(value) && metadataInfo != null) { value = metadataInfo.getParameter(key, RpcContext.getContext().getProtocolServiceKey()); } @@ -109,15 +125,52 @@ public class InstanceAddressURL extends URL { @Override public String getMethodParameter(String method, String key) { - String value = getMethodParameter(method, key); + MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey()); + String value = serviceInfo.getMethodParameter(method, key, null); if (StringUtils.isNotEmpty(value)) { return value; } - MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getServiceKey()); - return serviceInfo.getMethodParameter(method, key, null); + return getParameter(key); } @Override + public boolean hasMethodParameter(String method, String key) { + MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey()); + + if (method == null) { + String suffix = "." + key; + for (String fullKey : getParameters().keySet()) { + if (fullKey.endsWith(suffix)) { + return true; + } + } + return false; + } + if (key == null) { + String prefix = method + "."; + for (String fullKey : getParameters().keySet()) { + if (fullKey.startsWith(prefix)) { + return true; + } + } + return false; + } + + return serviceInfo.hasMethodParameter(method, key); + } + + @Override + public boolean hasMethodParameter(String method) { + MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey()); + return serviceInfo.hasMethodParameter(method); + } + + /** + * Avoid calling this method in RPC call. + * + * @return + */ + @Override public Map<String, String> getParameters() { Map<String, String> instanceParams = getInstanceMetadata(); Map<String, String> metadataParams = (metadataInfo == null ? new HashMap<>() : metadataInfo.getParameters(RpcContext.getContext().getProtocolServiceKey())); @@ -130,8 +183,6 @@ public class InstanceAddressURL extends URL { if (metadataParams != null) { params.putAll(metadataParams); } - - params.putAll(getConsumerParameters()); return params; } @@ -139,32 +190,56 @@ public class InstanceAddressURL extends URL { return this.instance.getMetadata(); } - private Map<String, String> getConsumerParameters() { - return RpcContext.getContext().getConsumerUrl().getParameters(); - } + @Override + public URL addParameter(String key, String value) { + if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) { + return this; + } - private String getConsumerParameter(String key) { - return RpcContext.getContext().getConsumerUrl().getParameter(key); + String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey(); + getMetadataInfo().getServiceInfo(protocolServiceKey).addParameter(key, value); + return this; } - private String getConsumerMethodParameter(String method, String key) { - return RpcContext.getContext().getConsumerUrl().getMethodParameter(method, key); + @Override + public URL addParameterIfAbsent(String key, String value) { + if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) { + return this; + } + + String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey(); + getMetadataInfo().getServiceInfo(protocolServiceKey).addParameterIfAbsent(key, value); + return this; } - @Override - public URL addParameter(String key, String value) { - throw new UnsupportedOperationException(""); + public URL addConsumerParams(Map<String, String> params) { + String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey(); + getMetadataInfo().getServiceInfo(protocolServiceKey).addConsumerParams(params); + return this; } @Override public boolean equals(Object obj) { // instance metadata equals - // service metadata equals - return super.equals(obj); + if (obj == null) { + return false; + } + if (!(obj instanceof InstanceAddressURL)) { + return false; + } + + InstanceAddressURL that = (InstanceAddressURL) obj; + + return this.getInstance().equals(that.getInstance()); } @Override public int hashCode() { - return super.hashCode(); + return getInstance().hashCode(); + } + + @Override + public String toString() { + return super.toString(); } } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java index 9b129f9..7d4d48e 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java @@ -52,6 +52,8 @@ import static java.util.Collections.emptySet; import static java.util.Collections.unmodifiableSet; import static java.util.stream.Collectors.toSet; import static java.util.stream.Stream.of; +import static org.apache.dubbo.common.constants.CommonConstants.DUBBO; +import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPERATOR; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.MAPPING_KEY; @@ -316,7 +318,7 @@ public class ServiceDiscoveryRegistry implements Registry { List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName); serviceListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances)); }); - listener.notify(serviceListener.getUrls(url.getProtocolServiceKey())); + listener.notify(serviceListener.getUrls(url.getServiceKey() + GROUP_CHAR_SEPERATOR + url.getParameter(PROTOCOL_KEY, DUBBO))); serviceListener.addListener(url.getProtocolServiceKey(), listener); registerServiceInstancesChangedListener(url, serviceListener); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java index 3fd7713..e01a36e 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java @@ -45,7 +45,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im private static final Logger logger = LoggerFactory.getLogger(ServiceDiscoveryRegistryDirectory.class); // Map<url, Invoker> cache service url to invoker mapping. - private volatile Map<URL, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference + private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference private ServiceInstancesChangedListener listener; @@ -75,11 +75,12 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im destroyAllInvokers(); // Close all invokers } else { this.forbidden = false; // Allow to access - Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference + Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (CollectionUtils.isEmpty(invokerUrls)) { return; } - Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map + + Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) { logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")")); @@ -109,51 +110,63 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im * @param urls * @return invokers */ - private Map<URL, Invoker<T>> toInvokers(List<URL> urls) { - Map<URL, Invoker<T>> newUrlInvokerMap = new HashMap<>(); + private Map<String, Invoker<T>> toInvokers(List<URL> urls) { + Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>(); if (urls == null || urls.isEmpty()) { return newUrlInvokerMap; } for (URL url : urls) { - if (EMPTY_PROTOCOL.equals(url.getProtocol())) { + InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url; + if (EMPTY_PROTOCOL.equals(instanceAddressURL.getProtocol())) { continue; } - if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(url.getProtocol())) { - logger.error(new IllegalStateException("Unsupported protocol " + url.getProtocol() + - " in notified url: " + url + " from registry " + getUrl().getAddress() + + if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(instanceAddressURL.getProtocol())) { + logger.error(new IllegalStateException("Unsupported protocol " + instanceAddressURL.getProtocol() + + " in notified url: " + instanceAddressURL + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " + ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions())); continue; } - if (urlInvokerMap != null && urlInvokerMap.containsKey(url)) { // Repeated url - continue; - } - Invoker<T> invoker = urlInvokerMap == null ? null : urlInvokerMap.get(url); - if (invoker == null) { // Not in the cache, refer again + // FIXME, some keys may need to be removed. + instanceAddressURL.addConsumerParams(queryMap); + + Invoker<T> invoker = urlInvokerMap == null ? null : urlInvokerMap.get(instanceAddressURL.getAddress()); + if (invoker == null || urlChanged(invoker, instanceAddressURL)) { // Not in the cache, refer again try { boolean enabled = true; - if (url.hasParameter(DISABLED_KEY)) { - enabled = !url.getParameter(DISABLED_KEY, false); + if (instanceAddressURL.hasParameter(DISABLED_KEY)) { + enabled = !instanceAddressURL.getParameter(DISABLED_KEY, false); } else { - enabled = url.getParameter(ENABLED_KEY, true); + enabled = instanceAddressURL.getParameter(ENABLED_KEY, true); } if (enabled) { - invoker = protocol.refer(serviceType, url); + invoker = protocol.refer(serviceType, instanceAddressURL); } } catch (Throwable t) { - logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t); + logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t); } if (invoker != null) { // Put new invoker in cache - newUrlInvokerMap.put(url, invoker); + newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker); } } else { - newUrlInvokerMap.put(url, invoker); + newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker); } } return newUrlInvokerMap; } + private boolean urlChanged(Invoker<T> invoker, InstanceAddressURL newURL) { + InstanceAddressURL oldURL = (InstanceAddressURL) invoker.getUrl(); + + if (!newURL.getInstance().equals(oldURL.getInstance())) { + return true; + } + + return !oldURL.getMetadataInfo().getServiceInfo(getConsumerUrl().getProtocolServiceKey()) + .equals(newURL.getMetadataInfo().getServiceInfo(getConsumerUrl().getProtocolServiceKey())); + } + private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) { return invokers; } @@ -162,7 +175,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im * Close all invokers */ private void destroyAllInvokers() { - Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference + Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference if (localUrlInvokerMap != null) { for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) { try { @@ -183,16 +196,16 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im * @param oldUrlInvokerMap * @param newUrlInvokerMap */ - private void destroyUnusedInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, Map<URL, Invoker<T>> newUrlInvokerMap) { + private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) { if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) { destroyAllInvokers(); return; } // check deleted invoker - List<URL> deleted = null; + List<String> deleted = null; if (oldUrlInvokerMap != null) { Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values(); - for (Map.Entry<URL, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) { + for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) { if (!newInvokers.contains(entry.getValue())) { if (deleted == null) { deleted = new ArrayList<>(); @@ -203,9 +216,9 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im } if (deleted != null) { - for (URL url : deleted) { - if (url != null) { - Invoker<T> invoker = oldUrlInvokerMap.remove(url); + for (String addressKey : deleted) { + if (addressKey != null) { + Invoker<T> invoker = oldUrlInvokerMap.remove(addressKey); if (invoker != null) { try { invoker.destroy(); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java index 57010cf..1afb486 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java @@ -79,12 +79,7 @@ public class ServiceInstanceMetadataUtils { /** * The property name of The revision for all exported Dubbo services. */ - public static String EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision"; - - /** - * The property name of The revision for all subscribed Dubbo services. - */ - public static String SUBSCRIBER_SERVICES_REVISION_PROPERTY_NAME = "dubbo.subscribed-services.revision"; + public static String EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.metadata.revision"; /** * The property name of metadata storage type. @@ -163,17 +158,6 @@ public class ServiceInstanceMetadataUtils { } /** - * The revision for all subscribed Dubbo services from the specified {@link ServiceInstance}. - * - * @param serviceInstance the specified {@link ServiceInstance} - * @return <code>null</code> if not exits - */ - public static String getSubscribedServicesRevision(ServiceInstance serviceInstance) { - Map<String, String> metadata = serviceInstance.getMetadata(); - return metadata.get(SUBSCRIBER_SERVICES_REVISION_PROPERTY_NAME); - } - - /** * Get metadata's storage type * * @param registryURL the {@link URL} to connect the registry diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java index 8268361..80aadc8 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java @@ -43,10 +43,14 @@ import java.util.Map; import java.util.Set; import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE; +import static org.apache.dubbo.common.constants.CommonConstants.DUBBO; import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY; +import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY; +import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY; import static org.apache.dubbo.registry.Constants.REGISTER_KEY; import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY; import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS; @@ -122,7 +126,10 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement private URL turnRegistryUrlToConsumerUrl(URL url) { return URLBuilder.from(url) - .setPath(url.getServiceInterface()) + .setHost(queryMap.get(REGISTER_IP_KEY)) + .setPort(0) + .setProtocol(queryMap.get(PROTOCOL_KEY) == null ? DUBBO : queryMap.get(PROTOCOL_KEY)) + .setPath(queryMap.get(INTERFACE_KEY)) .clearParameters() .addParameters(queryMap) .removeParameter(MONITOR_KEY) diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java index 7a06106..1062768 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java @@ -394,7 +394,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> implements NotifyL providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker! // The combination of directoryUrl and override is at the end of notify, which can't be handled here - this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters +// this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters if ((providerUrl.getPath() == null || providerUrl.getPath() .length() == 0) && DUBBO_PROTOCOL.equals(providerUrl.getProtocol())) { // Compatible version 1.0 diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java index 36bcc74..1e3b278 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java @@ -105,7 +105,7 @@ public class Exchangers { if (handler == null) { throw new IllegalArgumentException("handler == null"); } - url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); +// url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).connect(url, handler); } diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java index 94738a8..53ce72f 100644 --- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java +++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java @@ -51,7 +51,7 @@ public abstract class AbstractEndpoint extends AbstractPeer implements Resetable } protected static Codec2 getChannelCodec(URL url) { - String codecName = url.getParameter(Constants.CODEC_KEY, "telnet"); + String codecName = url.getProtocol(); // codec extension name must stay the same with protocol name if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) { return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName); } else { diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java index b5c9e67..0eae664 100644 --- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java +++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java @@ -67,6 +67,7 @@ public class InvokerInvocationHandler implements InvocationHandler { String serviceKey = invoker.getUrl().getServiceKey(); rpcInvocation.setTargetServiceUniqueName(serviceKey); + // invoker.getUrl() returns consumer url. RpcContext.setRpcContext(invoker.getUrl()); if (consumerModel != null) { diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java index 83190d2..e095523 100644 --- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java +++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java @@ -572,9 +572,10 @@ public class DubboProtocol extends AbstractProtocol { // client type setting. String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT)); - url = url.addParameter(CODEC_KEY, DubboCodec.NAME); +// url = url.addParameter(CODEC_KEY, DubboCodec.NAME); // enable heartbeat by default - url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)); + // FIXME, +// url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT)); // BIO is not allowed since it has severe performance issue. if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java index 1737d54..2758c5e 100644 --- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java +++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java @@ -191,7 +191,7 @@ public class ThriftProtocol extends AbstractProtocol { ExchangeClient client; - url = url.addParameter(CODEC_KEY, ThriftCodec.NAME); +// url = url.addParameter(CODEC_KEY, ThriftCodec.NAME); try { client = Exchangers.connect(url);
