This is an automated email from the ASF dual-hosted git repository. yaohaishi pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit d4e8db6970c4a8e5b383c0b9ebc114118497757e Author: yhs0092 <[email protected]> AuthorDate: Sat Feb 15 19:07:07 2020 +0800 [SCB-1691] ServiceRegistry use serviceRegistryCache --- .../servicecomb/serviceregistry/RegistryUtils.java | 33 +++++++++ .../consumer/MicroserviceVersions.java | 2 +- .../registry/AbstractServiceRegistry.java | 80 ++++++++++------------ .../registry/RemoteServiceRegistry.java | 5 +- .../serviceregistry/RegistryUtilsTest.java | 77 +++++++++++++++++++++ 5 files changed, 151 insertions(+), 46 deletions(-) diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java index 46de489..2f93bd6 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java @@ -20,6 +20,7 @@ package org.apache.servicecomb.serviceregistry; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Objects; @@ -33,6 +34,7 @@ import org.apache.servicecomb.foundation.common.net.IpPort; import org.apache.servicecomb.foundation.common.net.NetUtils; import org.apache.servicecomb.serviceregistry.api.registry.Microservice; import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; +import org.apache.servicecomb.serviceregistry.api.response.FindInstancesResponse; import org.apache.servicecomb.serviceregistry.cache.InstanceCacheManager; import org.apache.servicecomb.serviceregistry.cache.InstanceCacheManagerNew; import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient; @@ -41,6 +43,7 @@ import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig; import org.apache.servicecomb.serviceregistry.consumer.AppManager; import org.apache.servicecomb.serviceregistry.definition.MicroserviceDefinition; import org.apache.servicecomb.serviceregistry.registry.ServiceRegistryFactory; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache; import org.apache.servicecomb.serviceregistry.swagger.SwaggerLoader; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -250,6 +253,36 @@ public final class RegistryUtils { return serviceRegistry.findServiceInstances(appId, serviceName, versionRule, revision); } + /** + * for compatibility + */ + public static MicroserviceInstances convertCacheToMicroserviceInstances(MicroserviceCache microserviceCache) { + MicroserviceInstances microserviceInstances = new MicroserviceInstances(); + switch (microserviceCache.getStatus()) { + case SERVICE_NOT_FOUND: + microserviceInstances.setMicroserviceNotExist(true); + microserviceInstances.setNeedRefresh(false); + microserviceInstances.setRevision(""); + microserviceInstances.setInstancesResponse(null); + return microserviceInstances; + case NO_CHANGE: + microserviceInstances.setMicroserviceNotExist(false); + microserviceInstances.setNeedRefresh(false); + microserviceInstances.setRevision(microserviceCache.getRevisionId()); + return microserviceInstances; + case REFRESHED: + microserviceInstances.setMicroserviceNotExist(false); + microserviceInstances.setNeedRefresh(true); + microserviceInstances.setRevision(microserviceCache.getRevisionId()); + FindInstancesResponse findInstancesResponse = new FindInstancesResponse(); + findInstancesResponse.setInstances(new ArrayList<>(microserviceCache.getInstances())); + microserviceInstances.setInstancesResponse(findInstancesResponse); + return microserviceInstances; + default: + return null; + } + } + public static String calcSchemaSummary(String schemaContent) { return Hashing.sha256().newHasher().putString(schemaContent, Charsets.UTF_8).hash().toString(); } diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java index 8802a15..5dc1cec 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/consumer/MicroserviceVersions.java @@ -166,7 +166,7 @@ public class MicroserviceVersions { return; } - if (!microserviceInstances.isNeedRefresh()) { + if (null != revision && revision.equals(microserviceInstances.getRevision())) { return; } diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java index a37fdbc..214968a 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; +import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper; import org.apache.servicecomb.serviceregistry.Features; import org.apache.servicecomb.serviceregistry.RegistryUtils; import org.apache.servicecomb.serviceregistry.ServiceRegistry; @@ -48,6 +49,9 @@ import org.apache.servicecomb.serviceregistry.definition.MicroserviceDefinition; import org.apache.servicecomb.serviceregistry.definition.MicroserviceNameParser; import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache; import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheKey; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCacheRefreshedEvent; +import org.apache.servicecomb.serviceregistry.registry.cache.RefreshableServiceRegistryCache; +import org.apache.servicecomb.serviceregistry.registry.cache.ServiceRegistryCache; import org.apache.servicecomb.serviceregistry.task.MicroserviceServiceCenterTask; import org.apache.servicecomb.serviceregistry.task.ServiceCenterTask; import org.apache.servicecomb.serviceregistry.task.event.RecoveryEvent; @@ -85,6 +89,8 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry { private String name; + RefreshableServiceRegistryCache serviceRegistryCache; + public AbstractServiceRegistry(EventBus eventBus, ServiceRegistryConfig serviceRegistryConfig, MicroserviceDefinition microserviceDefinition) { setName(serviceRegistryConfig.getRegistryName()); @@ -104,6 +110,14 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry { createServiceCenterTask(); eventBus.register(this); + + initCache(); + } + + private void initCache() { + serviceRegistryCache = new RefreshableServiceRegistryCache(microservice, srClient); + serviceRegistryCache.setCacheRefreshedWatcher( + caches -> eventBus.post(new MicroserviceCacheRefreshedEvent(caches))); } @Override @@ -207,50 +221,15 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry { public MicroserviceInstances findServiceInstances(String appId, String serviceName, String versionRule, String revision) { - MicroserviceInstances microserviceInstances = srClient.findServiceInstances(microservice.getServiceId(), - appId, - serviceName, - versionRule, - revision); - - if (microserviceInstances == null) { - LOGGER.error("Can not find any instances from service center due to previous errors. service={}/{}/{}", - appId, - serviceName, - versionRule); - return null; - } - - if (microserviceInstances.isMicroserviceNotExist()) { - return microserviceInstances; - } - - if (!microserviceInstances.isNeedRefresh()) { - LOGGER.debug("instances revision is not changed, service={}/{}/{}", appId, serviceName, versionRule); - return microserviceInstances; - } - - List<MicroserviceInstance> instances = microserviceInstances.getInstancesResponse().getInstances(); - LOGGER.info("find instances[{}] from service center success. service={}/{}/{}, old revision={}, new revision={}", - instances.size(), - appId, - serviceName, - versionRule, - revision, - microserviceInstances.getRevision()); - for (MicroserviceInstance instance : instances) { - LOGGER.info("service id={}, instance id={}, endpoints={}", - instance.getServiceId(), - instance.getInstanceId(), - instance.getEndpoints()); - } - return microserviceInstances; + MicroserviceCache microserviceCache = serviceRegistryCache + .findServiceCache(MicroserviceCacheKey.builder() + .serviceName(serviceName).appId(appId).env(microservice.getEnvironment()).build()); + return RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache); } @Override public MicroserviceCache findMicroserviceCache(MicroserviceCacheKey microserviceCacheKey) { - // TODO find MicroserviceCache from ServiceRegistryCache - return null; + return serviceRegistryCache.findServiceCache(microserviceCacheKey); } @Override @@ -333,6 +312,10 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry { this.name = name; } + public ServiceRegistryCache getServiceRegistryCache() { + return serviceRegistryCache; + } + @Subscribe public void onShutdown(ShutdownEvent event) { LOGGER.info("service center task is shutdown."); @@ -342,17 +325,28 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry { // post from watch eventloop, should refresh the exact microservice instances immediately @Subscribe public void onMicroserviceInstanceChanged(MicroserviceInstanceChangedEvent changedEvent) { - executorService.execute(() -> RegistryUtils.getAppManager().onMicroserviceInstanceChanged(changedEvent)); + executorService.execute(new SuppressedRunnableWrapper( + () -> { + serviceRegistryCache.onMicroserviceInstanceChanged(changedEvent); + RegistryUtils.getAppManager().onMicroserviceInstanceChanged(changedEvent); + })); } // post from watch eventloop, should refresh all instances immediately @Subscribe public void serviceRegistryRecovery(RecoveryEvent event) { - executorService.execute(RegistryUtils.getAppManager()::pullInstances); + executorService.execute(() -> { + serviceRegistryCache.forceRefreshCache(); + RegistryUtils.getAppManager().pullInstances(); + }); } @Subscribe public void onSafeModeChanged(SafeModeChangeEvent modeChangeEvent) { - executorService.execute(() -> RegistryUtils.getAppManager().onSafeModeChanged(modeChangeEvent)); + executorService.execute(() -> { + LOGGER.warn("receive SafeModeChangeEvent, current mode={}", modeChangeEvent.getCurrentMode()); + serviceRegistryCache.onSafeModeChanged(modeChangeEvent); + RegistryUtils.getAppManager().onSafeModeChanged(modeChangeEvent); + }); } } diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java index b6af226..2002177 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/RemoteServiceRegistry.java @@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit; import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper; import org.apache.servicecomb.foundation.common.utils.SPIServiceUtils; -import org.apache.servicecomb.serviceregistry.RegistryUtils; import org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient; import org.apache.servicecomb.serviceregistry.client.http.ServiceRegistryClientImpl; import org.apache.servicecomb.serviceregistry.config.ServiceRegistryConfig; @@ -85,7 +84,9 @@ public class RemoteServiceRegistry extends AbstractServiceRegistry { TimeUnit.SECONDS); taskPool.scheduleAtFixedRate( - new SuppressedRunnableWrapper(RegistryUtils.getAppManager()::pullInstances), + new SuppressedRunnableWrapper(() -> { + serviceRegistryCache.refreshCache(); + }), serviceRegistryConfig.getInstancePullInterval(), serviceRegistryConfig.getInstancePullInterval(), TimeUnit.SECONDS); diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/RegistryUtilsTest.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/RegistryUtilsTest.java new file mode 100644 index 0000000..ab7efc5 --- /dev/null +++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/RegistryUtilsTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.servicecomb.serviceregistry; + +import java.util.Collections; + +import org.apache.servicecomb.serviceregistry.api.registry.MicroserviceInstance; +import org.apache.servicecomb.serviceregistry.client.http.MicroserviceInstances; +import org.apache.servicecomb.serviceregistry.registry.cache.MicroserviceCache.MicroserviceCacheStatus; +import org.apache.servicecomb.serviceregistry.registry.cache.MockedMicroserviceCache; +import org.junit.Assert; +import org.junit.Test; + +public class RegistryUtilsTest { + @Test + public void convertCacheToMicroserviceInstances() { + MockedMicroserviceCache microserviceCache = new MockedMicroserviceCache(); + microserviceCache.setStatus(MicroserviceCacheStatus.CLIENT_ERROR); + MicroserviceInstances microserviceInstances = RegistryUtils + .convertCacheToMicroserviceInstances(microserviceCache); + Assert.assertNull(microserviceInstances); + + microserviceCache = new MockedMicroserviceCache(); + microserviceCache.setStatus(MicroserviceCacheStatus.SETTING_CACHE_ERROR); + microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache); + Assert.assertNull(microserviceInstances); + + microserviceCache = new MockedMicroserviceCache(); + microserviceCache.setStatus(MicroserviceCacheStatus.INIT); + microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache); + Assert.assertNull(microserviceInstances); + + microserviceCache = new MockedMicroserviceCache(); + microserviceCache.setStatus(MicroserviceCacheStatus.SERVICE_NOT_FOUND); + microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache); + Assert.assertTrue(microserviceInstances.isMicroserviceNotExist()); + Assert.assertFalse(microserviceInstances.isNeedRefresh()); + Assert.assertEquals("", microserviceInstances.getRevision()); + Assert.assertNull(microserviceInstances.getInstancesResponse()); + + microserviceCache = new MockedMicroserviceCache(); + microserviceCache.setStatus(MicroserviceCacheStatus.REFRESHED); + microserviceCache.setRevisionId("0166f3c18702617d5e55cf911e4e412cc8760dab"); + MicroserviceInstance microserviceInstance = new MicroserviceInstance(); + microserviceCache.setInstances(Collections.singletonList(microserviceInstance)); + microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache); + Assert.assertFalse(microserviceInstances.isMicroserviceNotExist()); + Assert.assertTrue(microserviceInstances.isNeedRefresh()); + Assert.assertEquals("0166f3c18702617d5e55cf911e4e412cc8760dab", microserviceInstances.getRevision()); + Assert.assertEquals(1, microserviceInstances.getInstancesResponse().getInstances().size()); + Assert.assertSame(microserviceInstance, microserviceInstances.getInstancesResponse().getInstances().get(0)); + + microserviceCache = new MockedMicroserviceCache(); + microserviceCache.setStatus(MicroserviceCacheStatus.NO_CHANGE); + microserviceCache.setRevisionId("0166f3c18702617d5e55cf911e4e412cc8760dab"); + microserviceInstances = RegistryUtils.convertCacheToMicroserviceInstances(microserviceCache); + Assert.assertFalse(microserviceInstances.isMicroserviceNotExist()); + Assert.assertFalse(microserviceInstances.isNeedRefresh()); + Assert.assertEquals("0166f3c18702617d5e55cf911e4e412cc8760dab", microserviceInstances.getRevision()); + Assert.assertNull(microserviceInstances.getInstancesResponse()); + } +} \ No newline at end of file
