This is an automated email from the ASF dual-hosted git repository. liujun pushed a commit to branch 3.0 in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 3d234d29fcd7dc67ffbd542edd7053d68f35c555 Author: ken.lj <[email protected]> AuthorDate: Thu Jul 9 18:01:19 2020 +0800 service discovery --- .../org.apache.dubbo.rpc.cluster.RouterFactory | 1 - .../dubbo/common/config/ConfigurationUtils.java | 4 ++ .../dubbo/config/bootstrap/DubboBootstrap.java | 58 ++++++++------------ .../apache/dubbo/metadata/MetadataConstants.java | 2 + .../org/apache/dubbo/metadata/MetadataInfo.java | 7 +++ .../dubbo/qos/command/impl/PublishMetadata.java | 63 ++++++++++++++++++++++ .../org.apache.dubbo.qos.command.BaseCommand | 1 + .../client/EventPublishingServiceDiscovery.java | 5 ++ .../client/FileSystemServiceDiscovery.java | 8 +++ .../dubbo/registry/client/ServiceDiscovery.java | 2 + .../client/ServiceDiscoveryRegistryDirectory.java | 2 +- .../listener/ServiceInstancesChangedListener.java | 14 +++-- .../metadata/ServiceInstanceMetadataUtils.java | 37 +++++++++++++ .../store/InMemoryWritableMetadataService.java | 12 +++++ .../metadata/store/RemoteMetadataServiceImpl.java | 6 +-- .../registry/support/AbstractRegistryFactory.java | 12 +++++ .../registry/client/InMemoryServiceDiscovery.java | 8 +++ .../zookeeper/ZookeeperServiceDiscovery.java | 18 +++++-- 18 files changed, 211 insertions(+), 49 deletions(-) diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory index 13e307b..2a807f0 100644 --- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory +++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory @@ -5,4 +5,3 @@ service=org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouterFactor app=org.apache.dubbo.rpc.cluster.router.condition.config.AppRouterFactory tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory mock=org.apache.dubbo.rpc.cluster.router.mock.MockRouterFactory -instance=org.apache.dubbo.rpc.cluster.router.service.InstanceRouterFactory diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java index 70fd9f1..a7c0693 100644 --- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java +++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java @@ -100,6 +100,10 @@ public class ConfigurationUtils { return StringUtils.trim(getGlobalConfiguration().getString(property, defaultValue)); } + public static int get(String property, int defaultValue) { + return getGlobalConfiguration().getInt(property, defaultValue); + } + public static Map<String, String> parseProperties(String content) throws IOException { Map<String, String> map = new HashMap<>(); if (StringUtils.isEmpty(content)) { diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java index 455535c6..a9f2bcf 100644 --- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java +++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java @@ -17,6 +17,7 @@ package org.apache.dubbo.config.bootstrap; import org.apache.dubbo.common.URL; +import org.apache.dubbo.common.config.ConfigurationUtils; import org.apache.dubbo.common.config.Environment; import org.apache.dubbo.common.config.configcenter.DynamicConfiguration; import org.apache.dubbo.common.config.configcenter.wrapper.CompositeDynamicConfiguration; @@ -58,17 +59,16 @@ import org.apache.dubbo.config.utils.ReferenceConfigCache; import org.apache.dubbo.event.EventDispatcher; import org.apache.dubbo.event.EventListener; import org.apache.dubbo.event.GenericEventListener; -import org.apache.dubbo.metadata.MetadataInfo; import org.apache.dubbo.metadata.MetadataService; import org.apache.dubbo.metadata.MetadataServiceExporter; import org.apache.dubbo.metadata.WritableMetadataService; import org.apache.dubbo.metadata.report.MetadataReportInstance; import org.apache.dubbo.registry.client.DefaultServiceInstance; -import org.apache.dubbo.registry.client.ServiceDiscovery; -import org.apache.dubbo.registry.client.ServiceDiscoveryRegistry; import org.apache.dubbo.registry.client.ServiceInstance; import org.apache.dubbo.registry.client.ServiceInstanceCustomizer; import org.apache.dubbo.registry.client.metadata.MetadataUtils; +import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils; +import org.apache.dubbo.registry.client.metadata.store.InMemoryWritableMetadataService; import org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl; import org.apache.dubbo.registry.support.AbstractRegistryFactory; import org.apache.dubbo.rpc.model.ApplicationModel; @@ -89,7 +89,6 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; -import java.util.stream.Collectors; import static java.util.Arrays.asList; import static java.util.concurrent.Executors.newSingleThreadExecutor; @@ -99,11 +98,13 @@ import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_METADATA import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE; import static org.apache.dubbo.common.function.ThrowableAction.execute; import static org.apache.dubbo.common.utils.StringUtils.isNotEmpty; +import static org.apache.dubbo.metadata.MetadataConstants.DEFAULT_METADATA_PUBLISH_DELAY; +import static org.apache.dubbo.metadata.MetadataConstants.METADATA_PUBLISH_DELAY_KEY; import static org.apache.dubbo.metadata.WritableMetadataService.getDefaultExtension; -import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME; +import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.calInstanceRevision; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.setMetadataStorageType; +import static org.apache.dubbo.registry.support.AbstractRegistryFactory.getServiceDiscoveries; import static org.apache.dubbo.remoting.Constants.CLIENT_KEY; -import static org.apache.dubbo.rpc.Constants.ID_KEY; /** * See {@link ApplicationModel} and {@link ExtensionLoader} for why this class is designed to be singleton. @@ -734,15 +735,6 @@ public class DubboBootstrap extends GenericEventListener { addEventListener(this); } - private List<ServiceDiscovery> getServiceDiscoveries() { - return AbstractRegistryFactory.getRegistries() - .stream() - .filter(registry -> registry instanceof ServiceDiscoveryRegistry) - .map(registry -> (ServiceDiscoveryRegistry) registry) - .map(ServiceDiscoveryRegistry::getServiceDiscovery) - .collect(Collectors.toList()); - } - /** * Start the bootstrap */ @@ -1024,6 +1016,18 @@ public class DubboBootstrap extends GenericEventListener { ServiceInstance serviceInstance = createServiceInstance(serviceName, host, port); + doRegisterServiceInstance(serviceInstance); + + // scheduled task for updating Metadata and ServiceInstance + executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> { + InMemoryWritableMetadataService localMetadataService = (InMemoryWritableMetadataService) WritableMetadataService.getDefaultExtension(); + localMetadataService.blockUntilUpdated(); + ServiceInstanceMetadataUtils.refreshMetadataAndInstance(); + }, 0, ConfigurationUtils.get(METADATA_PUBLISH_DELAY_KEY, DEFAULT_METADATA_PUBLISH_DELAY), TimeUnit.MICROSECONDS); + } + + private void doRegisterServiceInstance(ServiceInstance serviceInstance) { + //FIXME publishMetadataToRemote(serviceInstance); getServiceDiscoveries().forEach(serviceDiscovery -> @@ -1032,31 +1036,13 @@ public class DubboBootstrap extends GenericEventListener { // register metadata serviceDiscovery.register(serviceInstance); }); - - // scheduled task for updating Metadata and ServiceInstance - executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> { - publishMetadataToRemote(serviceInstance); - - getServiceDiscoveries().forEach(serviceDiscovery -> - { - calInstanceRevision(serviceDiscovery, serviceInstance); - // register metadata - serviceDiscovery.register(serviceInstance); - }); - }, 0, 5000, TimeUnit.MICROSECONDS); } private void publishMetadataToRemote(ServiceInstance serviceInstance) { +// InMemoryWritableMetadataService localMetadataService = (InMemoryWritableMetadataService)WritableMetadataService.getDefaultExtension(); +// localMetadataService.blockUntilUpdated(); RemoteMetadataServiceImpl remoteMetadataService = MetadataUtils.getRemoteMetadataService(); - remoteMetadataService.publishMetadata(serviceInstance); - } - - private void calInstanceRevision(ServiceDiscovery serviceDiscovery, ServiceInstance instance) { - String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY); - MetadataInfo metadataInfo = WritableMetadataService.getDefaultExtension().getMetadataInfos().get(registryCluster); - if (metadataInfo != null) { - instance.getMetadata().put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, metadataInfo.getRevision()); - } + remoteMetadataService.publishMetadata(serviceInstance.getServiceName()); } private URL selectMetadataServiceExportedURL() { diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java index e03ddd6..7ba0a43 100644 --- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java +++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java @@ -23,4 +23,6 @@ public class MetadataConstants { public static final String META_DATA_STORE_TAG = ".metaData"; public static final String SERVICE_META_DATA_STORE_TAG = ".smd"; public static final String CONSUMER_META_DATA_STORE_TAG = ".cmd"; + public static final String METADATA_PUBLISH_DELAY_KEY = "dubbo.application.metadata.delay"; + public static final int DEFAULT_METADATA_PUBLISH_DELAY = 5000; } diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java index 73e736f..4db0298 100644 --- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java +++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java @@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL; import org.apache.dubbo.common.compiler.support.ClassUtils; import org.apache.dubbo.common.extension.ExtensionLoader; import org.apache.dubbo.common.utils.ArrayUtils; +import org.apache.dubbo.common.utils.CollectionUtils; import org.apache.dubbo.common.utils.StringUtils; import java.io.Serializable; @@ -39,6 +40,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY; import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; public class MetadataInfo implements Serializable { + public static String DEFAULT_REVISION = "0"; private String app; private String revision; private Map<String, ServiceInfo> services; @@ -85,6 +87,11 @@ public class MetadataInfo implements Serializable { if (revision != null && hasReported()) { return revision; } + + if (CollectionUtils.isEmptyMap(services)) { + return DEFAULT_REVISION; + } + StringBuilder sb = new StringBuilder(); sb.append(app); for (Map.Entry<String, ServiceInfo> entry : services.entrySet()) { diff --git a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/PublishMetadata.java b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/PublishMetadata.java new file mode 100644 index 0000000..990854c --- /dev/null +++ b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/PublishMetadata.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.qos.command.impl; + +import org.apache.dubbo.common.extension.ExtensionLoader; +import org.apache.dubbo.common.logger.Logger; +import org.apache.dubbo.common.logger.LoggerFactory; +import org.apache.dubbo.common.threadpool.manager.ExecutorRepository; +import org.apache.dubbo.common.utils.ArrayUtils; +import org.apache.dubbo.qos.command.BaseCommand; +import org.apache.dubbo.qos.command.CommandContext; +import org.apache.dubbo.qos.command.annotation.Cmd; +import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +@Cmd(name = "publishMetadata", summary = "update service metadata and service instance", example = { + "publishMetadata", + "publishMetadata 5" +}) +public class PublishMetadata implements BaseCommand { + private static final Logger logger = LoggerFactory.getLogger(PublishMetadata.class); + private final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension(); + private ScheduledFuture future; + + @Override + public String execute(CommandContext commandContext, String[] args) { + logger.info("received publishMetadata command."); + + if (ArrayUtils.isEmpty(args)) { + ServiceInstanceMetadataUtils.refreshMetadataAndInstance(); + return "publish metadata succeeded."; + } + + try { + int delay = Integer.parseInt(args[0]); + if (future == null || future.isDone() || future.isCancelled()) { + future = executorRepository.nextScheduledExecutor() + .scheduleWithFixedDelay(ServiceInstanceMetadataUtils::refreshMetadataAndInstance, 0, delay, TimeUnit.MILLISECONDS); + } + } catch (NumberFormatException e) { + logger.error("Wrong delay param", e); + return "publishMetadata failed! Wrong delay param!"; + } + return "publish task submitted, will publish in " + args[0] + " seconds."; + } + +} diff --git a/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand b/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand index cb6e9a7..b92b6b2 100644 --- a/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand +++ b/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand @@ -5,3 +5,4 @@ ls=org.apache.dubbo.qos.command.impl.Ls offline=org.apache.dubbo.qos.command.impl.Offline ready=org.apache.dubbo.qos.command.impl.Ready version=org.apache.dubbo.qos.command.impl.Version +publish-metadata=org.apache.dubbo.qos.command.impl.PublishMetadata diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java index b5517b7..ee99000 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java @@ -229,6 +229,11 @@ final class EventPublishingServiceDiscovery implements ServiceDiscovery { } @Override + public ServiceInstance getLocalInstance() { + return serviceDiscovery.getLocalInstance(); + } + + @Override public void initialize(URL registryURL) { assertInitialized(INITIALIZE_ACTION); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java index ba8d7d3..2a51168 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java @@ -62,6 +62,8 @@ public class FileSystemServiceDiscovery implements ServiceDiscovery, EventListen private FileSystemDynamicConfiguration dynamicConfiguration; + private ServiceInstance serviceInstance; + @Override public void onEvent(ServiceInstancesChangedEvent event) { @@ -134,7 +136,13 @@ public class FileSystemServiceDiscovery implements ServiceDiscovery, EventListen } @Override + public ServiceInstance getLocalInstance() { + return serviceInstance; + } + + @Override public void register(ServiceInstance serviceInstance) throws RuntimeException { + this.serviceInstance = serviceInstance; String serviceInstanceId = getServiceInstanceId(serviceInstance); String serviceName = getServiceName(serviceInstance); String content = toJSONString(serviceInstance); diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java index f9e667b..9800c35 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java @@ -267,6 +267,8 @@ public interface ServiceDiscovery extends Prioritized { return null; } + ServiceInstance getLocalInstance(); + /** * A human-readable description of the implementation * diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java index 9890988..3fd7713 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java @@ -64,7 +64,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im } private void refreshInvoker(List<URL> invokerUrls) { - Assert.notNull(invokerUrls, "invokerUrls should not be null"); + Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty:// to clear address."); if (invokerUrls.size() == 1 && invokerUrls.get(0) != null diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java index 1901e34..8af48a3 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java @@ -45,6 +45,7 @@ import java.util.TreeSet; import static org.apache.dubbo.common.constants.CommonConstants.REGISTER_KEY; import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE; +import static org.apache.dubbo.metadata.MetadataInfo.DEFAULT_REVISION; import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision; /** @@ -86,11 +87,17 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener String appName = event.getServiceName(); allInstances.put(appName, event.getServiceInstances()); + Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>(); + Map<String, Set<String>> localServiceToRevisions = new HashMap<>(); + Map<Set<String>, List<URL>> revisionsToUrls = new HashMap(); for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) { List<ServiceInstance> instances = entry.getValue(); for (ServiceInstance instance : instances) { String revision = getExportedServicesRevision(instance); - Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>(); + if (DEFAULT_REVISION.equals(revision)) { + logger.info("Find instance without valid service metadata: " + instance.getAddress()); + continue; + } List<ServiceInstance> subInstances = revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>()); subInstances.add(instance); @@ -104,7 +111,6 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener } } - Map<String, Set<String>> localServiceToRevisions = new HashMap<>(); if (metadata != null) { parseMetadata(revision, metadata, localServiceToRevisions); ((DefaultServiceInstance) instance).setServiceMetadata(metadata); @@ -115,7 +121,6 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener // set.add(revision); // } - Map<Set<String>, List<URL>> revisionsToUrls = new HashMap(); localServiceToRevisions.forEach((serviceKey, revisions) -> { List<URL> urls = revisionsToUrls.get(revisions); if (urls != null) { @@ -140,8 +145,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener private Map<String, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<String, Set<String>> localServiceToRevisions) { Map<String, ServiceInfo> serviceInfos = metadata.getServices(); for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) { - String serviceKey = entry.getValue().getServiceKey(); - Set<String> set = localServiceToRevisions.computeIfAbsent(serviceKey, k -> new TreeSet<>()); + Set<String> set = localServiceToRevisions.computeIfAbsent(entry.getKey(), k -> new TreeSet<>()); set.add(revision); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java index 75903ee..57010cf 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java @@ -18,9 +18,14 @@ package org.apache.dubbo.registry.client.metadata; import org.apache.dubbo.common.URL; import org.apache.dubbo.common.utils.StringUtils; +import org.apache.dubbo.metadata.MetadataInfo; import org.apache.dubbo.metadata.MetadataService; import org.apache.dubbo.metadata.WritableMetadataService; +import org.apache.dubbo.registry.client.ServiceDiscovery; import org.apache.dubbo.registry.client.ServiceInstance; +import org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl; +import org.apache.dubbo.registry.support.AbstractRegistryFactory; +import org.apache.dubbo.rpc.model.ApplicationModel; import com.alibaba.fastjson.JSON; @@ -39,6 +44,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY; import static org.apache.dubbo.common.utils.StringUtils.isBlank; import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_PROVIDER_KEYS; import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY; +import static org.apache.dubbo.rpc.Constants.ID_KEY; /** * The Utilities class for the {@link ServiceInstance#getMetadata() metadata of the service instance} @@ -87,6 +93,8 @@ public class ServiceInstanceMetadataUtils { public static String METADATA_CLUSTER_PROPERTY_NAME = "dubbo.metadata.cluster"; + public static String INSTANCE_REVISION_UPDATED_KEY = "dubbo.instance.revision.updated"; + /** * Get the multiple {@link URL urls'} parameters of {@link MetadataService MetadataService's} Metadata * @@ -247,6 +255,35 @@ public class ServiceInstanceMetadataUtils { return null; } + public static void calInstanceRevision(ServiceDiscovery serviceDiscovery, ServiceInstance instance) { + String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY); + MetadataInfo metadataInfo = WritableMetadataService.getDefaultExtension().getMetadataInfos().get(registryCluster); + if (metadataInfo != null) { + String existingInstanceRevision = instance.getMetadata().get(EXPORTED_SERVICES_REVISION_PROPERTY_NAME); + if (!metadataInfo.getRevision().equals(existingInstanceRevision)) { + instance.getMetadata().put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, metadataInfo.getRevision()); + if (existingInstanceRevision != null) {// skip the first registration. + instance.getExtendParams().put(INSTANCE_REVISION_UPDATED_KEY, "true"); + } + } + } + } + + public static boolean isInstanceUpdated(ServiceInstance instance) { + return "true".equals(instance.getExtendParams().get(INSTANCE_REVISION_UPDATED_KEY)); + } + + public static void refreshMetadataAndInstance() { + RemoteMetadataServiceImpl remoteMetadataService = MetadataUtils.getRemoteMetadataService(); + remoteMetadataService.publishMetadata(ApplicationModel.getName()); + + AbstractRegistryFactory.getServiceDiscoveries().forEach(serviceDiscovery -> { + calInstanceRevision(serviceDiscovery, serviceDiscovery.getLocalInstance()); + // update service instance revision + serviceDiscovery.update(serviceDiscovery.getLocalInstance()); + }); + } + /** * Set the default parameters via the specified {@link URL providerURL} * diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java index 8605eaa..2b747de 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java @@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -74,6 +75,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService */ ConcurrentNavigableMap<String, SortedSet<URL>> exportedServiceURLs = new ConcurrentSkipListMap<>(); ConcurrentMap<String, MetadataInfo> metadataInfos; + final Semaphore metadataSemaphore = new Semaphore(1); // ==================================================================================== // @@ -131,6 +133,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService }); metadataInfo.addService(new ServiceInfo(url)); } + metadataSemaphore.release(); return addURL(exportedServiceURLs, url); } @@ -145,6 +148,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService metadataInfos.remove(key); } } + metadataSemaphore.release(); return removeURL(exportedServiceURLs, url); } @@ -202,6 +206,14 @@ public class InMemoryWritableMetadataService implements WritableMetadataService return null; } + public void blockUntilUpdated() { + try { + metadataSemaphore.acquire(); + } catch (InterruptedException e) { + logger.warn("metadata refresh thread has been interrupted unexpectedly while wating for update.", e); + } + } + public Map<String, MetadataInfo> getMetadataInfos() { return metadataInfos; } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java index ee41050..00aa92a 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java @@ -47,7 +47,6 @@ import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY; import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY; public class RemoteMetadataServiceImpl { - protected final Logger logger = LoggerFactory.getLogger(getClass()); private WritableMetadataService localMetadataService; @@ -59,11 +58,11 @@ public class RemoteMetadataServiceImpl { return MetadataReportInstance.getMetadataReports(true); } - public void publishMetadata(ServiceInstance instance) { + public void publishMetadata(String serviceName) { Map<String, MetadataInfo> metadataInfos = localMetadataService.getMetadataInfos(); metadataInfos.forEach((registryKey, metadataInfo) -> { if (!metadataInfo.hasReported()) { - SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(), metadataInfo.getRevision()); + SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(serviceName, metadataInfo.getRevision()); metadataInfo.getRevision(); metadataInfo.getExtendParams().put(REGISTRY_KEY, registryKey); MetadataReport metadataReport = getMetadataReports().get(registryKey); @@ -71,6 +70,7 @@ public class RemoteMetadataServiceImpl { metadataReport = getMetadataReports().entrySet().iterator().next().getValue(); } metadataReport.publishAppMetadata(identifier, metadataInfo); + metadataInfo.markReported(); } }); } diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java index 7ea5559..541c22a 100644 --- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java +++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java @@ -24,6 +24,8 @@ import org.apache.dubbo.registry.NotifyListener; import org.apache.dubbo.registry.Registry; import org.apache.dubbo.registry.RegistryFactory; import org.apache.dubbo.registry.RegistryService; +import org.apache.dubbo.registry.client.ServiceDiscovery; +import org.apache.dubbo.registry.client.ServiceDiscoveryRegistry; import java.util.Collection; import java.util.Collections; @@ -33,6 +35,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; +import java.util.stream.Collectors; import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY; import static org.apache.dubbo.rpc.cluster.Constants.EXPORT_KEY; @@ -69,6 +72,15 @@ public abstract class AbstractRegistryFactory implements RegistryFactory { return REGISTRIES.get(key); } + public static List<ServiceDiscovery> getServiceDiscoveries() { + return AbstractRegistryFactory.getRegistries() + .stream() + .filter(registry -> registry instanceof ServiceDiscoveryRegistry) + .map(registry -> (ServiceDiscoveryRegistry) registry) + .map(ServiceDiscoveryRegistry::getServiceDiscovery) + .collect(Collectors.toList()); + } + /** * Close all created registries */ diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java index fbc410b..93043ba 100644 --- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java @@ -42,6 +42,8 @@ public class InMemoryServiceDiscovery implements ServiceDiscovery { private Map<String, List<ServiceInstance>> repository = new HashMap<>(); + private ServiceInstance serviceInstance; + @Override public Set<String> getServices() { return repository.keySet(); @@ -68,12 +70,18 @@ public class InMemoryServiceDiscovery implements ServiceDiscovery { return new DefaultPage<>(offset, pageSize, data, totalSize); } + @Override + public ServiceInstance getLocalInstance() { + return serviceInstance; + } + public String toString() { return "InMemoryServiceDiscovery"; } @Override public void register(ServiceInstance serviceInstance) throws RuntimeException { + this.serviceInstance = serviceInstance; String serviceName = serviceInstance.getServiceName(); List<ServiceInstance> serviceInstances = repository.computeIfAbsent(serviceName, s -> new LinkedList<>()); if (!serviceInstances.contains(serviceInstance)) { diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java index 29e6790..a4cdd6c 100644 --- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java +++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java @@ -41,6 +41,7 @@ import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import static org.apache.dubbo.common.function.ThrowableFunction.execute; +import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.isInstanceUpdated; import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.ROOT_PATH; import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.build; import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.buildCuratorFramework; @@ -64,6 +65,8 @@ public class ZookeeperServiceDiscovery implements ServiceDiscovery { private org.apache.curator.x.discovery.ServiceDiscovery<ZookeeperInstance> serviceDiscovery; + private ServiceInstance serviceInstance; + /** * The Key is watched Zookeeper path, the value is an instance of {@link CuratorWatcher} */ @@ -87,16 +90,25 @@ public class ZookeeperServiceDiscovery implements ServiceDiscovery { serviceDiscovery.close(); } + @Override + public ServiceInstance getLocalInstance() { + return serviceInstance; + } + public void register(ServiceInstance serviceInstance) throws RuntimeException { + this.serviceInstance = serviceInstance; doInServiceRegistry(serviceDiscovery -> { serviceDiscovery.registerService(build(serviceInstance)); }); } public void update(ServiceInstance serviceInstance) throws RuntimeException { - doInServiceRegistry(serviceDiscovery -> { - serviceDiscovery.updateService(build(serviceInstance)); - }); + this.serviceInstance = serviceInstance; + if (isInstanceUpdated(serviceInstance)) { + doInServiceRegistry(serviceDiscovery -> { + serviceDiscovery.updateService(build(serviceInstance)); + }); + } } public void unregister(ServiceInstance serviceInstance) throws RuntimeException {
