This is an automated email from the ASF dual-hosted git repository. liubao pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
commit eb7b31912bec28f86132476d56f97e49c6266651 Author: heyile <[email protected]> AuthorDate: Sat Nov 24 10:37:29 2018 +0800 [SCB-1039]Add interface to compatible with ServiceCenter Aggregator --- .../definition/schema/ConsumerSchemaFactory.java | 2 +- .../servicecomb/serviceregistry/RegistryUtils.java | 2 +- .../serviceregistry/ServiceRegistry.java | 9 +++ .../cache/MicroserviceInstanceCache.java | 2 +- .../client/LocalServiceRegistryClientImpl.java | 15 ++++ .../client/ServiceRegistryClient.java | 18 +++++ .../client/http/ServiceRegistryClientImpl.java | 52 +++++++++++++- .../registry/AbstractServiceRegistry.java | 5 ++ .../cache/TestMicroserviceInstanceCache.java | 4 +- .../client/http/TestServiceRegistryClientImpl.java | 79 +++++++++++++++++++++- 10 files changed, 179 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/org/apache/servicecomb/core/definition/schema/ConsumerSchemaFactory.java b/core/src/main/java/org/apache/servicecomb/core/definition/schema/ConsumerSchemaFactory.java index a3dbbdb..c9b8ed2 100644 --- a/core/src/main/java/org/apache/servicecomb/core/definition/schema/ConsumerSchemaFactory.java +++ b/core/src/main/java/org/apache/servicecomb/core/definition/schema/ConsumerSchemaFactory.java @@ -65,7 +65,7 @@ public class ConsumerSchemaFactory extends AbstractSchemaFactory<ConsumerSchemaC } ServiceRegistryClient client = RegistryUtils.getServiceRegistryClient(); - String schemaContent = client.getSchema(context.getMicroservice().getServiceId(), context.getSchemaId()); + String schemaContent = client.getAggregatedSchema(context.getMicroservice().getServiceId(), context.getSchemaId()); LOGGER.info("load schema from service center, microservice={}:{}:{}, schemaId={}, result={}", context.getMicroservice().getAppId(), context.getMicroservice().getServiceName(), diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java index 1853578..78f0d46 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java @@ -216,7 +216,7 @@ public final class RegistryUtils { } public static Microservice getMicroservice(String microserviceId) { - return serviceRegistry.getRemoteMicroservice(microserviceId); + return serviceRegistry.getAggregatedRemoteMicroervice(microserviceId); } public static MicroserviceInstances findServiceInstances(String appId, String serviceName, diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java index ead2ccc..c858c5a 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java @@ -58,6 +58,15 @@ public interface ServiceRegistry { Microservice getRemoteMicroservice(String microserviceId); + /** + * <p> + * if connect to simple ServiceCenter, same with the method + * {@linkplain org.apache.servicecomb.serviceregistry.ServiceRegistry#getRemoteMicroservice(String)} } + * if connect to ServiceCenter Aggregator, not only contain the target ServiceCenter but also other ServiceCenter cluster + * </p> + */ + Microservice getAggregatedRemoteMicroervice(String microserviceId); + Features getFeatures(); /** diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/cache/MicroserviceInstanceCache.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/cache/MicroserviceInstanceCache.java index eb6828d..16a0381 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/cache/MicroserviceInstanceCache.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/cache/MicroserviceInstanceCache.java @@ -52,7 +52,7 @@ public class MicroserviceInstanceCache { public static Microservice getOrCreate(String serviceId) { try { return microservices.get(serviceId, () -> { - Microservice microservice = RegistryUtils.getServiceRegistryClient().getMicroservice(serviceId); + Microservice microservice = RegistryUtils.getServiceRegistryClient().getAggregatedMicroservice(serviceId); if (microservice == null) { throw new IllegalArgumentException("service id not exists."); } diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/LocalServiceRegistryClientImpl.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/LocalServiceRegistryClientImpl.java index 196cd24..9112038 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/LocalServiceRegistryClientImpl.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/LocalServiceRegistryClientImpl.java @@ -189,6 +189,11 @@ public class LocalServiceRegistryClientImpl implements ServiceRegistryClient { } @Override + public Microservice getAggregatedMicroservice(String microserviceId) { + return microserviceIdMap.get(microserviceId); + } + + @Override public String registerMicroserviceInstance(MicroserviceInstance instance) { Map<String, MicroserviceInstance> instanceMap = microserviceInstanceMap.get(instance.getServiceId()); if (instanceMap == null) { @@ -354,6 +359,16 @@ public class LocalServiceRegistryClientImpl implements ServiceRegistryClient { } @Override + public String getAggregatedSchema(String microserviceId, String schemaId) { + Microservice microservice = microserviceIdMap.get(microserviceId); + if (microservice == null) { + throw new IllegalArgumentException("Invalid serviceId, serviceId=" + microserviceId); + } + + return microservice.getSchemaMap().get(schemaId); + } + + @Override public Holder<List<GetSchemaResponse>> getSchemas(String microserviceId) { Microservice microservice = microserviceIdMap.get(microserviceId); if (microservice == null) { diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/ServiceRegistryClient.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/ServiceRegistryClient.java index 4af6d70..8e08ae4 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/ServiceRegistryClient.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/ServiceRegistryClient.java @@ -57,6 +57,15 @@ public interface ServiceRegistryClient { Microservice getMicroservice(String microserviceId); /** + * <p> + * if connect to simple ServiceCenter, same with the method + * {@linkplain org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient#getMicroservice(String)} + * if connect to ServiceCenter Aggregator, not only contain the target ServiceCenter but also other ServiceCenter clusters + * </p> + */ + Microservice getAggregatedMicroservice(String microserviceId); + + /** * 更新微服务properties */ boolean updateMicroserviceProperties(String microserviceId, Map<String, String> serviceProperties); @@ -80,6 +89,15 @@ public interface ServiceRegistryClient { String getSchema(String microserviceId, String schemaId); /** + * <p> + * if connect to simple ServiceCenter, same with the method + * {@linkplain org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient#getSchema(String, String)} + * if connect to ServiceCenter Aggregator, not only contain the target ServiceCenter but also other ServiceCenter clusters + * </p> + */ + String getAggregatedSchema(String microserviceId, String schemaId); + + /** * * 批量获取schemas内容 */ diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java index b5e2436..a9082f6 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java @@ -358,6 +358,30 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient { countDownLatch.await(); } catch (Exception e) { LOGGER.error("query schema exist {}/{} failed", + schemaId, + e); + } + if (holder.value != null) { + return holder.value.getSchema(); + } + + return null; + } + + @Override + public String getAggregatedSchema(String microserviceId, String schemaId) { + Holder<GetSchemaResponse> holder = new Holder<>(); + IpPort ipPort = ipPortManager.getAvailableAddress(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + RestUtils.get(ipPort, + String.format(Const.REGISTRY_API.MICROSERVICE_SCHEMA, microserviceId, schemaId), + new RequestParam().addQueryParam("global", "true"), + syncHandler(countDownLatch, GetSchemaResponse.class, holder)); + try { + countDownLatch.await(); + } catch (Exception e) { + LOGGER.error("query schema exist {}/{} failed", microserviceId, schemaId, e); @@ -453,6 +477,27 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient { } @Override + public Microservice getAggregatedMicroservice(String microserviceId) { + Holder<GetServiceResponse> holder = new Holder<>(); + IpPort ipPort = ipPortManager.getAvailableAddress(); + + CountDownLatch countDownLatch = new CountDownLatch(1); + RestUtils.get(ipPort, + String.format(Const.REGISTRY_API.MICROSERVICE_OPERATION_ONE, microserviceId), + new RequestParam().addQueryParam("global", "true"), + syncHandler(countDownLatch, GetServiceResponse.class, holder)); + try { + countDownLatch.await(); + if (holder.value != null) { + return holder.value.getService(); + } + } catch (Exception e) { + LOGGER.error("query microservice {} failed", microserviceId, e); + } + return null; + } + + @Override public String registerMicroserviceInstance(MicroserviceInstance instance) { Holder<RegisterInstanceResponse> holder = new Holder<>(); IpPort ipPort = ipPortManager.getAvailableAddress(); @@ -489,7 +534,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient { CountDownLatch countDownLatch = new CountDownLatch(1); RestUtils.get(ipPort, String.format(Const.REGISTRY_API.MICROSERVICE_INSTANCE_OPERATION_ALL, providerId), - new RequestParam().addHeader("X-ConsumerId", consumerId), + new RequestParam().addHeader("X-ConsumerId", consumerId).addQueryParam("global", "true"), syncHandler(countDownLatch, GetInstancesResponse.class, holder)); try { countDownLatch.await(); @@ -650,6 +695,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient { RequestParam requestParam = new RequestParam().addQueryParam("appId", appId) .addQueryParam("serviceName", serviceName) + .addQueryParam("global", "true") .addQueryParam("version", versionRule) .addHeader("X-ConsumerId", consumerId); if (revision != null) { @@ -774,7 +820,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient { CountDownLatch countDownLatch = new CountDownLatch(1); RestUtils.get(ipPort, String.format(Const.REGISTRY_API.MICROSERVICE_INSTANCE_OPERATION_ONE, serviceId, instanceId), - new RequestParam().addHeader("X-ConsumerId", serviceId), + new RequestParam().addHeader("X-ConsumerId", serviceId).addQueryParam("global", "true"), syncHandler(countDownLatch, MicroserviceInstanceResponse.class, holder)); countDownLatch.await(); if (null != holder.value) { @@ -795,7 +841,7 @@ public final class ServiceRegistryClientImpl implements ServiceRegistryClient { CountDownLatch countDownLatch = new CountDownLatch(1); RestUtils.get(ipPort, Const.REGISTRY_API.SERVICECENTER_VERSION, - new RequestParam(), + new RequestParam().addQueryParam("global", "true"), syncHandler(countDownLatch, ServiceCenterInfo.class, holder)); try { countDownLatch.await(); diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java index be33fda..2e6e100 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java @@ -298,6 +298,11 @@ public abstract class AbstractServiceRegistry implements ServiceRegistry { return srClient.getMicroservice(microserviceId); } + @Override + public Microservice getAggregatedRemoteMicroervice(String microserviceId) { + return srClient.getAggregatedMicroservice(microserviceId); + } + public Microservice getMicroservice() { return microservice; } diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/cache/TestMicroserviceInstanceCache.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/cache/TestMicroserviceInstanceCache.java index 5e55e6d..85da7fb 100644 --- a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/cache/TestMicroserviceInstanceCache.java +++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/cache/TestMicroserviceInstanceCache.java @@ -35,9 +35,9 @@ public class TestMicroserviceInstanceCache { { RegistryUtils.getServiceRegistryClient(); result = client; - client.getMicroservice("forkedid"); + client.getAggregatedMicroservice("forkedid"); result = microservice; - client.getMicroservice("forkedidNull"); + client.getAggregatedMicroservice("forkedidNull"); result = null; } }; diff --git a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/client/http/TestServiceRegistryClientImpl.java b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/client/http/TestServiceRegistryClientImpl.java index 1c238dc..80b7d58 100644 --- a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/client/http/TestServiceRegistryClientImpl.java +++ b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/client/http/TestServiceRegistryClientImpl.java @@ -36,6 +36,7 @@ import org.apache.servicecomb.serviceregistry.api.registry.ServiceCenterInfo; import org.apache.servicecomb.serviceregistry.api.response.GetExistenceResponse; import org.apache.servicecomb.serviceregistry.api.response.GetSchemaResponse; import org.apache.servicecomb.serviceregistry.api.response.GetSchemasResponse; +import org.apache.servicecomb.serviceregistry.api.response.GetServiceResponse; import org.apache.servicecomb.serviceregistry.client.ClientException; import org.apache.servicecomb.serviceregistry.client.IpPortManager; import org.apache.servicecomb.serviceregistry.client.http.ServiceRegistryClientImpl.ResponseWrapper; @@ -50,6 +51,7 @@ import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpClientOptions; import io.vertx.core.http.HttpClientResponse; +import io.vertx.core.http.HttpMethod; import io.vertx.core.json.Json; import mockit.Deencapsulation; import mockit.Expectations; @@ -299,6 +301,53 @@ public class TestServiceRegistryClientImpl { } @Test + public void getAggregatedMicroservice() { + String microserviceId = "msId"; + new MockUp<RestUtils>() { + @Mock + void httpDo(RequestContext requestContext, Handler<RestResponse> responseHandler) { + Holder<GetServiceResponse> holder = Deencapsulation.getField(responseHandler, "arg$4"); + GetServiceResponse serviceResp = Json + .decodeValue( + "{\"service\":{\"serviceId\":\"serviceId\",\"framework\":null" + + ",\"registerBy\":null,\"environment\":null,\"appId\":\"appId\",\"serviceName\":null," + + "\"alias\":null,\"version\":null,\"description\":null,\"level\":null,\"schemas\":[]," + + "\"paths\":[],\"status\":\"UP\",\"properties\":{},\"intance\":null}}", + GetServiceResponse.class); + holder.value = serviceResp; + RequestParam requestParam = requestContext.getParams(); + Assert.assertEquals("global=true", requestParam.getQueryParams()); + } + }; + Microservice aggregatedMicroservice = oClient.getAggregatedMicroservice(microserviceId); + Assert.assertEquals("serviceId", aggregatedMicroservice.getServiceId()); + Assert.assertEquals("appId", aggregatedMicroservice.getAppId()); + } + + @Test + public void getAggregatedSchema() { + String microserviceId = "msId"; + String schemaId = "schemaId"; + + new MockUp<RestUtils>() { + + @Mock + void httpDo(RequestContext requestContext, Handler<RestResponse> responseHandler) { + Holder<GetSchemaResponse> holder = Deencapsulation.getField(responseHandler, "arg$4"); + GetSchemaResponse schemasResp = Json + .decodeValue( + "{ \"schema\": \"schema\", \"schemaId\":\"metricsEndpoint\",\"summary\":\"c1188d709631a9038874f9efc6eb894f\"}", + GetSchemaResponse.class); + holder.value = schemasResp; + RequestParam requestParam = requestContext.getParams(); + Assert.assertEquals("global=true", requestParam.getQueryParams()); + } + }; + String str = oClient.getAggregatedSchema(microserviceId, schemaId); + Assert.assertEquals("schema", str); + } + + @Test public void getSchemas() { String microserviceId = "msId"; @@ -363,6 +412,13 @@ public class TestServiceRegistryClientImpl { @Test public void testFindServiceInstance() { + new MockUp<RestUtils>() { + @Mock + void get(IpPort ipPort, String uri, RequestParam requestParam, + Handler<RestResponse> responseHandler) { + Assert.assertEquals("global=true", requestParam.getQueryParams()); + } + }; Assert.assertNull(oClient.findServiceInstance("aaa", "bbb")); } @@ -374,7 +430,13 @@ public class TestServiceRegistryClientImpl { throw new Error("must not invoke this."); } }; - + new MockUp<RestUtils>() { + @Mock + void get(IpPort ipPort, String uri, RequestParam requestParam, + Handler<RestResponse> responseHandler) { + Assert.assertEquals("global=true", requestParam.getQueryParams()); + } + }; Assert.assertNull(oClient.findServiceInstance(null, "appId", "serviceName", "1.0.0+")); } @@ -396,6 +458,14 @@ public class TestServiceRegistryClientImpl { RestResponse restResponse = new RestResponse(requestContext, response); new MockUp<RestUtils>() { @Mock + void get(IpPort ipPort, String uri, RequestParam requestParam, + Handler<RestResponse> responseHandler) { + Assert.assertEquals("appId=appId&global=true&serviceName=serviceName&version=0.0.0%2B", + requestParam.getQueryParams()); + httpDo(RestUtils.createRequestContext(HttpMethod.GET, ipPort, uri, requestParam), responseHandler); + } + + @Mock void httpDo(RequestContext requestContext, Handler<RestResponse> responseHandler) { responseHandler.handle(restResponse); } @@ -418,6 +488,13 @@ public class TestServiceRegistryClientImpl { new MockUp<RestUtils>() { @Mock + void get(IpPort ipPort, String uri, RequestParam requestParam, + Handler<RestResponse> responseHandler) { + Assert.assertEquals("global=true", requestParam.getQueryParams()); + httpDo(RestUtils.createRequestContext(HttpMethod.GET, ipPort, uri, requestParam), responseHandler); + } + + @Mock void httpDo(RequestContext requestContext, Handler<RestResponse> responseHandler) { Holder<ServiceCenterInfo> holder = Deencapsulation.getField(responseHandler, "arg$4"); holder.value = serviceCenterInfo;
