liubao68 commented on a change in pull request #2691:
URL: 
https://github.com/apache/servicecomb-java-chassis/pull/2691#discussion_r790092219



##########
File path: 
service-registry/registry-service-center/src/main/java/org/apache/servicecomb/serviceregistry/client/IpPortManager.java
##########
@@ -75,47 +119,202 @@ public IpPortManager(ServiceRegistryConfig 
serviceRegistryConfig) {
     currentAvailableIndex = new AtomicInteger(initialIndex);
     LOGGER.info("Initial service center address is {}", getAvailableAddress());
     maxRetryTimes = defaultIpPort.size();
+    ServiceCenterEventBus.getEventBus().register(this);
   }
 
   // we have to do this operation after the first time setup has already done
   public void initAutoDiscovery() {
     if (!autoDiscoveryInited && 
this.serviceRegistryConfig.isRegistryAutoDiscovery()) {
-      InstanceCache cache = instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
-          REGISTRY_SERVICE_NAME,
-          DefinitionConst.VERSION_RULE_LATEST);
-      if (cache.getInstanceMap().size() > 0) {
-        setAutoDiscoveryInited(true);
-      } else {
-        setAutoDiscoveryInited(false);
+      for (Type type : Type.values()) {
+        InitEndPoint(type.name());
       }
     }
   }
 
-  public IpPort getAvailableAddress() {
-    return getAvailableAddress(currentAvailableIndex.incrementAndGet());
+  private void InitEndPoint(String typeName) {
+    Map<String, List<String>> zoneAndRegion = 
generateZoneAndRegionAddress(typeName);
+    if (zoneAndRegion == null) {
+      return;
+    }
+    if (typeName.equals(REGISTRY_SERVICE_NAME)) {
+      setAutoDiscoveryInited(true);
+      sameAZ.addAll(zoneAndRegion.get("sameZone"));
+      sameRegion.addAll(zoneAndRegion.get("sameRegion"));
+    }
+    CommonEventManager.post(new RefreshEndpointEvent(zoneAndRegion, typeName));
   }
 
-  private IpPort getAvailableAddress(int index) {
-    if (index < defaultIpPort.size()) {
-      return defaultIpPort.get(index);
+  @Subscribe
+  public void onMicroserviceCacheRefreshed(MicroserviceCacheRefreshedEvent 
event) {
+    List<MicroserviceCache> microserviceCaches = event.getMicroserviceCaches();
+    if (null == microserviceCaches || microserviceCaches.isEmpty()) {
+      return;
     }
-    List<CacheEndpoint> endpoints = getDiscoveredIpPort();
-    if (endpoints == null || (index >= defaultIpPort.size() + 
endpoints.size())) {
-      currentAvailableIndex.set(0);
-      return defaultIpPort.get(0);
+
+    for (MicroserviceCache microserviceCache : microserviceCaches) {
+      if (microserviceCache.getKey().toString().equals(SC_KEY)) {
+        refreshEndPoints(microserviceCache, REGISTRY_SERVICE_NAME);
+      }
+      if (microserviceCache.getKey().toString().equals(CC_KEY)) {
+        refreshEndPoints(microserviceCache, KIE_NAME);
+      }
+      if (microserviceCache.getKey().toString().equals(KIE_KEY)) {
+        refreshEndPoints(microserviceCache, CONFIG_CENTER_NAME);
+      }
+      if (microserviceCache.getKey().toString().equals(MONITORING_KEY)) {
+        refreshEndPoints(microserviceCache, CSE_MONITORING_NAME);
+      }
     }
-    maxRetryTimes = defaultIpPort.size() + endpoints.size();
-    CacheEndpoint nextEndpoint = endpoints.get(index - defaultIpPort.size());
-    return new URIEndpointObject(nextEndpoint.getEndpoint());
   }
 
-  private List<CacheEndpoint> getDiscoveredIpPort() {
-    if (!autoDiscoveryInited || 
!this.serviceRegistryConfig.isRegistryAutoDiscovery()) {
-      return null;
+  private void refreshEndPoints(MicroserviceCache microserviceCache, String 
name) {
+    Map<String, List<String>> zoneAndRegion = 
refreshEndPoint(microserviceCache);
+    if (name.equals(REGISTRY_SERVICE_NAME)) {
+      sameAZ = zoneAndRegion.get("sameZone");
+      sameRegion = zoneAndRegion.get("sameRegion");
     }
+    CommonEventManager.post(new RefreshEndpointEvent(zoneAndRegion, name));
+  }
+
+  private Map<String, List<String>> refreshEndPoint(MicroserviceCache 
microserviceCache) {
+    List<String> sameZone = new ArrayList<>();
+    List<String> sameRegion = new ArrayList<>();
+    Map<String, List<String>> zoneAndRegion = new HashMap<>();
+
+    List<MicroserviceInstance> microserviceCacheInstances = 
microserviceCache.getInstances();
+
+    microserviceCacheInstances.forEach(microserviceInstance -> {
+      String endPoint = microserviceInstance.getEndpoints().get(0);
+      availableIpCache.put(getUri(endPoint), true);
+      if (regionAndAZMatch(dataCenterInfo, microserviceInstance)) {
+        sameZone.add(endPoint);
+      } else {
+        sameRegion.add(endPoint);
+      }
+    });
+    zoneAndRegion.put("sameZone", sameZone);
+    zoneAndRegion.put("sameRegion", sameRegion);
+    return zoneAndRegion;
+  }
+
+  public IpPort getAvailableAddress() {
+    return getAvailableIpPort();
+  }
+
+  private List<CacheEndpoint> getDiscoveredIpPort() {
     InstanceCache instanceCache = 
instanceCacheManager.getOrCreate(REGISTRY_APP_ID,
         REGISTRY_SERVICE_NAME,
         DefinitionConst.VERSION_RULE_LATEST);
     return instanceCache.getOrCreateTransportMap().get(defaultTransport);
   }
+
+  private IpPort getAvailableIpPort() {
+    IpPort ipPort = null;
+    if (!autoDiscoveryInited) {
+      ipPort = getDefaultIpPort();
+    } else {
+      List<String> addresses = getAvailableZoneIpPorts();
+      if (addresses.isEmpty()) {
+        ipPort = getDefaultIpPort();
+      } else {
+        if (index.get() >= addresses.size()) {
+          index.set(0);
+        }
+        ipPort = new URIEndpointObject(addresses.get(index.get()));
+      }
+    }
+    index.getAndIncrement();
+    return ipPort;
+  }
+
+  private List<String> getAvailableZoneIpPorts() {
+    List<String> results = new ArrayList<>();
+    if (!getAvailableAddress(sameAZ).isEmpty()) {
+      results.addAll(getAvailableAddress(sameAZ));
+    } else {
+      results.addAll(getAvailableAddress(sameRegion));
+    }
+    return results;
+  }
+
+  private IpPort getDefaultIpPort() {
+    if (index.get() >= defaultIpPort.size()) {
+      index.set(0);
+    }
+    return defaultIpPort.get(index.get());
+  }
+
+  private List<String> getAvailableAddress(List<String> endpoints) {
+    List<String> result = new ArrayList<>();
+    for (String endpoint : endpoints) {
+      try {
+        if (availableIpCache.get(getUri(endpoint), () -> true)) {
+          result.add(endpoint);
+        }
+      } catch (ExecutionException e) {
+        LOGGER.error("Not expected to happen, maybe a bug.", e);
+      }
+    }
+    return result;
+  }
+
+  private String getUri(String endpoint) {
+    return StringUtils.split(endpoint, "//")[1];
+  }
+
+  private Map<String, List<String>> generateZoneAndRegionAddress(String key) {
+    InstanceCache KieCaches = instanceCacheManager
+        .getOrCreate(REGISTRY_APP_ID, key, 
DefinitionConst.VERSION_RULE_LATEST);
+    List<CacheEndpoint> CacheEndpoints = new ArrayList<>();
+    if (REGISTRY_SERVICE_NAME.equals(key)) {
+      CacheEndpoints = 
KieCaches.getOrCreateTransportMap().get(defaultTransport);
+      maxRetryTimes = CacheEndpoints.size();
+    } else {
+      if (KieCaches.getInstanceMap().size() <= 0) {
+        return null;
+      }
+      CacheEndpoints = 
KieCaches.getOrCreateTransportMap().get(defaultTransport);
+    }
+    Map<String, List<String>> zoneAndRegion = new HashMap<>();
+    dataCenterInfo = findRegion(CacheEndpoints);
+
+    List<String> sameZone = new ArrayList<>();
+    List<String> sameRegion = new ArrayList<>();
+    for (CacheEndpoint cacheEndpoint : CacheEndpoints) {
+      if (regionAndAZMatch(dataCenterInfo, cacheEndpoint.getInstance())) {
+        sameZone.add(cacheEndpoint.getEndpoint());
+      } else {
+        sameRegion.add(cacheEndpoint.getEndpoint());
+      }
+    }
+    zoneAndRegion.put("sameZone", sameZone);
+    zoneAndRegion.put("sameRegion", sameRegion);
+    return zoneAndRegion;
+  }
+
+  private DataCenterInfo findRegion(List<CacheEndpoint> CacheEndpoints) {
+    MicroserviceInstance myself = 
RegistrationManager.INSTANCE.getMicroserviceInstance();
+    if (myself.getDataCenterInfo() == null) {
+      return null;
+    }

Review comment:
       This logic can be hidden inside AddressManager.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to