This is an automated email from the ASF dual-hosted git repository.

liubao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git

commit eb7b31912bec28f86132476d56f97e49c6266651
Author: heyile <[email protected]>
AuthorDate: Sat Nov 24 10:37:29 2018 +0800

    [SCB-1039]Add interface to compatible with ServiceCenter Aggregator
---
 .../definition/schema/ConsumerSchemaFactory.java   |  2 +-
 .../servicecomb/serviceregistry/RegistryUtils.java |  2 +-
 .../serviceregistry/ServiceRegistry.java           |  9 +++
 .../cache/MicroserviceInstanceCache.java           |  2 +-
 .../client/LocalServiceRegistryClientImpl.java     | 15 ++++
 .../client/ServiceRegistryClient.java              | 18 +++++
 .../client/http/ServiceRegistryClientImpl.java     | 52 +++++++++++++-
 .../registry/AbstractServiceRegistry.java          |  5 ++
 .../cache/TestMicroserviceInstanceCache.java       |  4 +-
 .../client/http/TestServiceRegistryClientImpl.java | 79 +++++++++++++++++++++-
 10 files changed, 179 insertions(+), 9 deletions(-)

diff --git 
a/core/src/main/java/org/apache/servicecomb/core/definition/schema/ConsumerSchemaFactory.java
 
b/core/src/main/java/org/apache/servicecomb/core/definition/schema/ConsumerSchemaFactory.java
index a3dbbdb..c9b8ed2 100644
--- 
a/core/src/main/java/org/apache/servicecomb/core/definition/schema/ConsumerSchemaFactory.java
+++ 
b/core/src/main/java/org/apache/servicecomb/core/definition/schema/ConsumerSchemaFactory.java
@@ -65,7 +65,7 @@ public class ConsumerSchemaFactory extends 
AbstractSchemaFactory<ConsumerSchemaC
     }
 
     ServiceRegistryClient client = RegistryUtils.getServiceRegistryClient();
-    String schemaContent = 
client.getSchema(context.getMicroservice().getServiceId(), 
context.getSchemaId());
+    String schemaContent = 
client.getAggregatedSchema(context.getMicroservice().getServiceId(), 
context.getSchemaId());
     LOGGER.info("load schema from service center, microservice={}:{}:{}, 
schemaId={}, result={}",
         context.getMicroservice().getAppId(),
         context.getMicroservice().getServiceName(),
diff --git 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
index 1853578..78f0d46 100644
--- 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
+++ 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/RegistryUtils.java
@@ -216,7 +216,7 @@ public final class RegistryUtils {
   }
 
   public static Microservice getMicroservice(String microserviceId) {
-    return serviceRegistry.getRemoteMicroservice(microserviceId);
+    return serviceRegistry.getAggregatedRemoteMicroervice(microserviceId);
   }
 
   public static MicroserviceInstances findServiceInstances(String appId, 
String serviceName,
diff --git 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java
 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java
index ead2ccc..c858c5a 100644
--- 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java
+++ 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/ServiceRegistry.java
@@ -58,6 +58,15 @@ public interface ServiceRegistry {
 
   Microservice getRemoteMicroservice(String microserviceId);
 
+  /**
+   * <p>
+   *    When connected to a simple ServiceCenter, this behaves the same as
+   *    {@linkplain 
org.apache.servicecomb.serviceregistry.ServiceRegistry#getRemoteMicroservice(String)}.
+   *    When connected to a ServiceCenter Aggregator, the lookup covers not only 
the target ServiceCenter but also the other ServiceCenter clusters.
+   * </p>
+   */
+  Microservice getAggregatedRemoteMicroervice(String microserviceId);
+
   Features getFeatures();
 
   /**
diff --git 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/cache/MicroserviceInstanceCache.java
 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/cache/MicroserviceInstanceCache.java
index eb6828d..16a0381 100644
--- 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/cache/MicroserviceInstanceCache.java
+++ 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/cache/MicroserviceInstanceCache.java
@@ -52,7 +52,7 @@ public class MicroserviceInstanceCache {
   public static Microservice getOrCreate(String serviceId) {
     try {
       return microservices.get(serviceId, () -> {
-        Microservice microservice = 
RegistryUtils.getServiceRegistryClient().getMicroservice(serviceId);
+        Microservice microservice = 
RegistryUtils.getServiceRegistryClient().getAggregatedMicroservice(serviceId);
         if (microservice == null) {
           throw new IllegalArgumentException("service id not exists.");
         }
diff --git 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/LocalServiceRegistryClientImpl.java
 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/LocalServiceRegistryClientImpl.java
index 196cd24..9112038 100644
--- 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/LocalServiceRegistryClientImpl.java
+++ 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/LocalServiceRegistryClientImpl.java
@@ -189,6 +189,11 @@ public class LocalServiceRegistryClientImpl implements 
ServiceRegistryClient {
   }
 
   @Override
+  public Microservice getAggregatedMicroservice(String microserviceId) {
+    return microserviceIdMap.get(microserviceId);
+  }
+
+  @Override
   public String registerMicroserviceInstance(MicroserviceInstance instance) {
     Map<String, MicroserviceInstance> instanceMap = 
microserviceInstanceMap.get(instance.getServiceId());
     if (instanceMap == null) {
@@ -354,6 +359,16 @@ public class LocalServiceRegistryClientImpl implements 
ServiceRegistryClient {
   }
 
   @Override
+  public String getAggregatedSchema(String microserviceId, String schemaId) {
+    Microservice microservice = microserviceIdMap.get(microserviceId);
+    if (microservice == null) {
+      throw new IllegalArgumentException("Invalid serviceId, serviceId=" + 
microserviceId);
+    }
+
+    return microservice.getSchemaMap().get(schemaId);
+  }
+
+  @Override
   public Holder<List<GetSchemaResponse>> getSchemas(String microserviceId) {
     Microservice microservice = microserviceIdMap.get(microserviceId);
     if (microservice == null) {
diff --git 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/ServiceRegistryClient.java
 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/ServiceRegistryClient.java
index 4af6d70..8e08ae4 100644
--- 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/ServiceRegistryClient.java
+++ 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/ServiceRegistryClient.java
@@ -57,6 +57,15 @@ public interface ServiceRegistryClient {
   Microservice getMicroservice(String microserviceId);
 
   /**
+   * <p>
+   *    When connected to a simple ServiceCenter, this behaves the same as
+   *    {@linkplain 
org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient#getMicroservice(String)}.
+   *    When connected to a ServiceCenter Aggregator, the lookup covers not only 
the target ServiceCenter but also the other ServiceCenter clusters.
+   * </p>
+   */
+  Microservice getAggregatedMicroservice(String microserviceId);
+
+  /**
    * 更新微服务properties
    */
   boolean updateMicroserviceProperties(String microserviceId, Map<String, 
String> serviceProperties);
@@ -80,6 +89,15 @@ public interface ServiceRegistryClient {
   String getSchema(String microserviceId, String schemaId);
 
   /**
+   * <p>
+   *    When connected to a simple ServiceCenter, this behaves the same as
+   *    {@linkplain 
org.apache.servicecomb.serviceregistry.client.ServiceRegistryClient#getSchema(String,
 String)}.
+   *    When connected to a ServiceCenter Aggregator, the lookup covers not only 
the target ServiceCenter but also the other ServiceCenter clusters.
+   * </p>
+   */
+  String getAggregatedSchema(String microserviceId, String schemaId);
+
+  /**
    *
    * 批量获取schemas内容
    */
diff --git 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
index b5e2436..a9082f6 100644
--- 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
+++ 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/ServiceRegistryClientImpl.java
@@ -358,6 +358,30 @@ public final class ServiceRegistryClientImpl implements 
ServiceRegistryClient {
       countDownLatch.await();
     } catch (Exception e) {
       LOGGER.error("query schema exist {}/{} failed",
+          schemaId,
+          e);
+    }
+    if (holder.value != null) {
+      return holder.value.getSchema();
+    }
+
+    return null;
+  }
+
+  @Override
+  public String getAggregatedSchema(String microserviceId, String schemaId) {
+    Holder<GetSchemaResponse> holder = new Holder<>();
+    IpPort ipPort = ipPortManager.getAvailableAddress();
+
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    RestUtils.get(ipPort,
+        String.format(Const.REGISTRY_API.MICROSERVICE_SCHEMA, microserviceId, 
schemaId),
+        new RequestParam().addQueryParam("global", "true"),
+        syncHandler(countDownLatch, GetSchemaResponse.class, holder));
+    try {
+      countDownLatch.await();
+    } catch (Exception e) {
+      LOGGER.error("query schema exist {}/{} failed",
           microserviceId,
           schemaId,
           e);
@@ -453,6 +477,27 @@ public final class ServiceRegistryClientImpl implements 
ServiceRegistryClient {
   }
 
   @Override
+  public Microservice getAggregatedMicroservice(String microserviceId) {
+    Holder<GetServiceResponse> holder = new Holder<>();
+    IpPort ipPort = ipPortManager.getAvailableAddress();
+
+    CountDownLatch countDownLatch = new CountDownLatch(1);
+    RestUtils.get(ipPort,
+        String.format(Const.REGISTRY_API.MICROSERVICE_OPERATION_ONE, 
microserviceId),
+        new RequestParam().addQueryParam("global", "true"),
+        syncHandler(countDownLatch, GetServiceResponse.class, holder));
+    try {
+      countDownLatch.await();
+      if (holder.value != null) {
+        return holder.value.getService();
+      }
+    } catch (Exception e) {
+      LOGGER.error("query microservice {} failed", microserviceId, e);
+    }
+    return null;
+  }
+
+  @Override
   public String registerMicroserviceInstance(MicroserviceInstance instance) {
     Holder<RegisterInstanceResponse> holder = new Holder<>();
     IpPort ipPort = ipPortManager.getAvailableAddress();
@@ -489,7 +534,7 @@ public final class ServiceRegistryClientImpl implements 
ServiceRegistryClient {
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
         String.format(Const.REGISTRY_API.MICROSERVICE_INSTANCE_OPERATION_ALL, 
providerId),
-        new RequestParam().addHeader("X-ConsumerId", consumerId),
+        new RequestParam().addHeader("X-ConsumerId", 
consumerId).addQueryParam("global", "true"),
         syncHandler(countDownLatch, GetInstancesResponse.class, holder));
     try {
       countDownLatch.await();
@@ -650,6 +695,7 @@ public final class ServiceRegistryClientImpl implements 
ServiceRegistryClient {
 
     RequestParam requestParam = new RequestParam().addQueryParam("appId", 
appId)
         .addQueryParam("serviceName", serviceName)
+        .addQueryParam("global", "true")
         .addQueryParam("version", versionRule)
         .addHeader("X-ConsumerId", consumerId);
     if (revision != null) {
@@ -774,7 +820,7 @@ public final class ServiceRegistryClientImpl implements 
ServiceRegistryClient {
       CountDownLatch countDownLatch = new CountDownLatch(1);
       RestUtils.get(ipPort,
           
String.format(Const.REGISTRY_API.MICROSERVICE_INSTANCE_OPERATION_ONE, 
serviceId, instanceId),
-          new RequestParam().addHeader("X-ConsumerId", serviceId),
+          new RequestParam().addHeader("X-ConsumerId", 
serviceId).addQueryParam("global", "true"),
           syncHandler(countDownLatch, MicroserviceInstanceResponse.class, 
holder));
       countDownLatch.await();
       if (null != holder.value) {
@@ -795,7 +841,7 @@ public final class ServiceRegistryClientImpl implements 
ServiceRegistryClient {
     CountDownLatch countDownLatch = new CountDownLatch(1);
     RestUtils.get(ipPort,
         Const.REGISTRY_API.SERVICECENTER_VERSION,
-        new RequestParam(),
+        new RequestParam().addQueryParam("global", "true"),
         syncHandler(countDownLatch, ServiceCenterInfo.class, holder));
     try {
       countDownLatch.await();
diff --git 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
index be33fda..2e6e100 100644
--- 
a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
+++ 
b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/registry/AbstractServiceRegistry.java
@@ -298,6 +298,11 @@ public abstract class AbstractServiceRegistry implements 
ServiceRegistry {
     return srClient.getMicroservice(microserviceId);
   }
 
+  @Override
+  public Microservice getAggregatedRemoteMicroervice(String microserviceId) {
+    return srClient.getAggregatedMicroservice(microserviceId);
+  }
+
   public Microservice getMicroservice() {
     return microservice;
   }
diff --git 
a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/cache/TestMicroserviceInstanceCache.java
 
b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/cache/TestMicroserviceInstanceCache.java
index 5e55e6d..85da7fb 100644
--- 
a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/cache/TestMicroserviceInstanceCache.java
+++ 
b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/cache/TestMicroserviceInstanceCache.java
@@ -35,9 +35,9 @@ public class TestMicroserviceInstanceCache {
       {
         RegistryUtils.getServiceRegistryClient();
         result = client;
-        client.getMicroservice("forkedid");
+        client.getAggregatedMicroservice("forkedid");
         result = microservice;
-        client.getMicroservice("forkedidNull");
+        client.getAggregatedMicroservice("forkedidNull");
         result = null;
       }
     };
diff --git 
a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/client/http/TestServiceRegistryClientImpl.java
 
b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/client/http/TestServiceRegistryClientImpl.java
index 1c238dc..80b7d58 100644
--- 
a/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/client/http/TestServiceRegistryClientImpl.java
+++ 
b/service-registry/src/test/java/org/apache/servicecomb/serviceregistry/client/http/TestServiceRegistryClientImpl.java
@@ -36,6 +36,7 @@ import 
org.apache.servicecomb.serviceregistry.api.registry.ServiceCenterInfo;
 import 
org.apache.servicecomb.serviceregistry.api.response.GetExistenceResponse;
 import org.apache.servicecomb.serviceregistry.api.response.GetSchemaResponse;
 import org.apache.servicecomb.serviceregistry.api.response.GetSchemasResponse;
+import org.apache.servicecomb.serviceregistry.api.response.GetServiceResponse;
 import org.apache.servicecomb.serviceregistry.client.ClientException;
 import org.apache.servicecomb.serviceregistry.client.IpPortManager;
 import 
org.apache.servicecomb.serviceregistry.client.http.ServiceRegistryClientImpl.ResponseWrapper;
@@ -50,6 +51,7 @@ import io.vertx.core.Handler;
 import io.vertx.core.buffer.Buffer;
 import io.vertx.core.http.HttpClientOptions;
 import io.vertx.core.http.HttpClientResponse;
+import io.vertx.core.http.HttpMethod;
 import io.vertx.core.json.Json;
 import mockit.Deencapsulation;
 import mockit.Expectations;
@@ -299,6 +301,53 @@ public class TestServiceRegistryClientImpl {
   }
 
   @Test
+  public void getAggregatedMicroservice() {
+    String microserviceId = "msId";
+    new MockUp<RestUtils>() {
+      @Mock
+      void httpDo(RequestContext requestContext, Handler<RestResponse> 
responseHandler) {
+        Holder<GetServiceResponse> holder = 
Deencapsulation.getField(responseHandler, "arg$4");
+        GetServiceResponse serviceResp = Json
+            .decodeValue(
+                "{\"service\":{\"serviceId\":\"serviceId\",\"framework\":null"
+                    + 
",\"registerBy\":null,\"environment\":null,\"appId\":\"appId\",\"serviceName\":null,"
+                    + 
"\"alias\":null,\"version\":null,\"description\":null,\"level\":null,\"schemas\":[],"
+                    + 
"\"paths\":[],\"status\":\"UP\",\"properties\":{},\"intance\":null}}",
+                GetServiceResponse.class);
+        holder.value = serviceResp;
+        RequestParam requestParam = requestContext.getParams();
+        Assert.assertEquals("global=true", requestParam.getQueryParams());
+      }
+    };
+    Microservice aggregatedMicroservice = 
oClient.getAggregatedMicroservice(microserviceId);
+    Assert.assertEquals("serviceId", aggregatedMicroservice.getServiceId());
+    Assert.assertEquals("appId", aggregatedMicroservice.getAppId());
+  }
+
+  @Test
+  public void getAggregatedSchema() {
+    String microserviceId = "msId";
+    String schemaId = "schemaId";
+
+    new MockUp<RestUtils>() {
+
+      @Mock
+      void httpDo(RequestContext requestContext, Handler<RestResponse> 
responseHandler) {
+        Holder<GetSchemaResponse> holder = 
Deencapsulation.getField(responseHandler, "arg$4");
+        GetSchemaResponse schemasResp = Json
+            .decodeValue(
+                "{ \"schema\": \"schema\", 
\"schemaId\":\"metricsEndpoint\",\"summary\":\"c1188d709631a9038874f9efc6eb894f\"}",
+                GetSchemaResponse.class);
+        holder.value = schemasResp;
+        RequestParam requestParam = requestContext.getParams();
+        Assert.assertEquals("global=true", requestParam.getQueryParams());
+      }
+    };
+    String str = oClient.getAggregatedSchema(microserviceId, schemaId);
+    Assert.assertEquals("schema", str);
+  }
+
+  @Test
   public void getSchemas() {
     String microserviceId = "msId";
 
@@ -363,6 +412,13 @@ public class TestServiceRegistryClientImpl {
 
   @Test
   public void testFindServiceInstance() {
+    new MockUp<RestUtils>() {
+      @Mock
+      void get(IpPort ipPort, String uri, RequestParam requestParam,
+          Handler<RestResponse> responseHandler) {
+        Assert.assertEquals("global=true", requestParam.getQueryParams());
+      }
+    };
     Assert.assertNull(oClient.findServiceInstance("aaa", "bbb"));
   }
 
@@ -374,7 +430,13 @@ public class TestServiceRegistryClientImpl {
         throw new Error("must not invoke this.");
       }
     };
-
+    new MockUp<RestUtils>() {
+      @Mock
+      void get(IpPort ipPort, String uri, RequestParam requestParam,
+          Handler<RestResponse> responseHandler) {
+        Assert.assertEquals("global=true", requestParam.getQueryParams());
+      }
+    };
     Assert.assertNull(oClient.findServiceInstance(null, "appId", 
"serviceName", "1.0.0+"));
   }
 
@@ -396,6 +458,14 @@ public class TestServiceRegistryClientImpl {
     RestResponse restResponse = new RestResponse(requestContext, response);
     new MockUp<RestUtils>() {
       @Mock
+      void get(IpPort ipPort, String uri, RequestParam requestParam,
+          Handler<RestResponse> responseHandler) {
+        
Assert.assertEquals("appId=appId&global=true&serviceName=serviceName&version=0.0.0%2B",
+            requestParam.getQueryParams());
+        httpDo(RestUtils.createRequestContext(HttpMethod.GET, ipPort, uri, 
requestParam), responseHandler);
+      }
+
+      @Mock
       void httpDo(RequestContext requestContext, Handler<RestResponse> 
responseHandler) {
         responseHandler.handle(restResponse);
       }
@@ -418,6 +488,13 @@ public class TestServiceRegistryClientImpl {
 
     new MockUp<RestUtils>() {
       @Mock
+      void get(IpPort ipPort, String uri, RequestParam requestParam,
+          Handler<RestResponse> responseHandler) {
+        Assert.assertEquals("global=true", requestParam.getQueryParams());
+        httpDo(RestUtils.createRequestContext(HttpMethod.GET, ipPort, uri, 
requestParam), responseHandler);
+      }
+
+      @Mock
       void httpDo(RequestContext requestContext, Handler<RestResponse> 
responseHandler) {
         Holder<ServiceCenterInfo> holder = 
Deencapsulation.getField(responseHandler, "arg$4");
         holder.value = serviceCenterInfo;

Reply via email to